본문 바로가기

파이썬 코딩의 기술

파이썬코딩의기술 - BW55 : Queue를 사용해 스레드 사이의 작업을 조율하라

파이썬 프로그램이 동시에 여러 일을 수행한다면, 각 작업을 잘 조율해야 합니다.

    - 동시성 작업을 처리할 때 가장 유용한 방식함수 파이프라인 입니다.

    - 파이프라인은 공장 조립 라인처럼 작동함

    - 파이프라인에는 순차적으로 실행해야 하는 여러 단계가 있고, 각 단계마다 실행할 구체적인 함수가 정해짐

    - 파이프라인의 한쪽 끝에는 새로운 작업이 계속 추가됨 

    - 각 함수는 동시에 실행될 수 있고 각 단계에서 처리해야 하는 일을 담당함

 

작업은 매 단계 함수가 완료될 때마다 다음 단계롤 전달되며, 더 이상 실행할 단계가 없을 때 끝납니다. 

    - 이런 접근 방법은 작업 처리에 블로킹 I/O하위 프로세스가 포함되는 경우특히 좋음

    - 파이썬에서는 블로킹 I/O나 하위 프로세스를 더 쉽게 병렬화할 수 있기 때문

 

(예) 디지털 카메라에서 이미지 스트림을 계속 가져와 이미지 크기를 변경하고 온라인 포토 갤러리에 저장하고 싶다고 하자

    - 3단계 파이프라인으로 나눠 프로그램을 구성할 수 있음

    -  첫 번째 단계에서 새 이미지를 얻고, 얻은 이미즈는 

    - 두 번째 단계의 크기 변환 함수로 보내 처리함

    - 세 번째 크기가 조정된 이미지를 마지막 단계의 업로드 함수에 전달하여 처리함

 

각 단계를 처리하는 함수는 download, resize, upload 라는 파이선 함수로 작성합니다.

def download(item):
    ...
    
def resize(item):
    ...

def upload(item):
     ...

가장 먼저 필요한 기능파이프라인의 단계마다 작업을 전달할 방법 입니다.

    - 스레드 안전한 생산자-소비자(producer-consumer)를 사용해 이를 모델링할 수 있음

from collections import deque
from threading import Lock

class MyQueue:
    def __init__(self):
        self.items = deque()
        self.lock = Lock()

    - 생산자인 디지털 카메라는  미처리 작업을 표현하는 deque의 끝에 새로운 이미지를 추가함

    def put(self, item):
        with self.lock:
        self.items.append(item)

    - 파이프라인의 첫 번째 단계인 소비자는 미처리 작업을 표현하는 deque의 맨 앞에서 이미지를 제거함

    def get(self):
        with self.lock:
            return self.items.popleft()

아래 코드는 큐(queue)에서 가져온 작업에 함수를 적용하고, 그 결과를 다른 큐에 넣는 스레드를 통해 파이프라인의 각 단계를 구현합니다.

 

그리고 각 작업자가 얼마나 많이 새로운 입력을 검사(폴링)했고 얼마나 많이 작업을 완료했는지 추적합니다.

from threading import Thread
import time

class Worker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.polled_count = 0
        self.work_done = 0

 

가장 까다로운 곳입력 큐가 비어있는 경우를 작업자 스레드가 제대로 처리하는 부분 입니다.

    - 큐가 비어있다는 말은 이전 단계가 아직 작업을 완료하지 못했다는 뜻

 

아래 코드에서 IndexError 예외를 잡아내는 부분이 바로 이런 경우 입니다.

    - 이런 조립 라인을 일시 중단시키는 것으로 생각할 수 있음

 

def run(self):
    while True:
        self.polled_count += 1
        try:
            item = self.in_queue.get()
        except IndexError:
            time.sleep(0.01) # 할 일이 없음
        else:
            result = self.func(item)
            self.out_queue.put(result)
            self.work_done += 1

 

파이프라인을 조율하기 위한 조율 지점 역할을 할 수 있도록 각 단계별로 큐를 생성하고 각 단계에 맞는 작업 스레드를 만들어서 서로 연결할 수 있음

 

download_queue = MyQueue()
resize_queue = MyQueue()
upload_queue = MyQueue()

done_queue = MyQueue()
threads = [
    Worker(download, download_queue, resize_queue),
    Worker(resize, resize_queue, upload_queue),
    Worker(upload, upload_queue, done_queue),
]

 

각 단계를 처리하기 위해 세 가지 스레드를 시작하고,  파이프라인의 첫 번째 단계에 원하는 만큼 작업을 넣습니다.

아래 코드는 download 함수에 필요한 실제 데이터 대신 간단한 object를 사용합니다.

 

for thread in threads:
    thread.start()

for _ in range(1000):
    download_queue.put(object())

이제 done_queue를 지켜보면서 파이프라인이 모든 원소를 처리할 때까지 기다립니다.

while len(done_queue.items) < 1000:
    # 기다리는 동안 유용한 작업을 수행함
    ...

(문제) 스레드들이 새로운 작업을 기다리면서 큐를 폴링하기 때문에 부작용이 생깁니다. 

(문제상황) run 메서드 안의 까다로운 부분인 IndexError 예외를 잡아내는 부분이 상당히 많이 실행됨

 

processed = len(don_queue.items)
polled = sum(t.polled_count for t in threads)
print(f'{processed} 개의 아이템을 처리했습니다,' f'이떄 폴링을 {polled} 번 했습니다.')

>>> 
1000개의 아이템을 처리했습니다, 이때 폴링을 3009번 했습니다.

 (문제) 작업자 함수의 속도가 달라지면 앞에 있는 단계가 그보다 더 뒤에 있는 단계의 진행을 방해하면서 파이프라인을 막을 수 있습니다.

    - 이로 인해 뒤에 있는 단계는 작업을 받지 못하는 기아 상태(starvation)가 돼서 처리할 작업이 없으므로, 루프를 빠르게 돌며 새로운 작업이 들어왔는지 자신의 입력 큐를 계속 검사함.

    - 작업자 스레드가 유용하지 않은 일을 하느라 CPU 시간을 잡아먹게 됨

 

(문제) 문제점이 3가지나 더 존재합니다.

첫째, 모든 작업이 다 끝났는지 검사하기 위해 추가로 done_queue에 대해 바쁜 대기(busy waiting)를 수행해야함

둘째, Worker의 run 메서드가 루프를 무한히 반복함 (현재 구현에서는 작업자스레드에게 루프를 중단할 시점임을 알려줄 뚜렷한 방법이 없음)

셋째, 무엇보다 최악인 점은 파이프라인 진행이 막히면 프로그램이 임의로 중단될 수 있음 

    - 시간과 입력 데이터가 충분히 많으면, 언젠가는 메모리를 다 소모하고 프로그램이 죽어버릴 것 입니다.

 

(교훈) 파이프라인이 나쁘다는 것이 아니라, 제대로 작동하는 생산자-소비자 큐를 직접 구현하기가 어려움

따라서 굳이 직접 할 필요가 없습니다 .

 

대안: Queue

queue 내장 모듈에 있는 Queue 클래스앞에서 설명한 모든 문제를 해결할 수 있는 기능을 제공합니다.

 

Queue는 새로운 데이터가 나타날 때까지 get 메서드가 블록되게 만들어서 작업자의 바쁜 대기 문제를 해결함

    - (예) 아래 코드는 큐에 입력 데이터가 들어오기를 기다리는 스레드를 하나 시작함

from queue import Queue

my_queue = Queue()
def consumer():
    print('소비자 대기')
    my_queue.get() # 다음에 보여줄 put()이 실행된 다음에 실행된다
    print('소비자 완료')
    
thread = Thread(target=consumer)
thread.start()

이 스레드가 더 먼저 실행되지만, Queue 인스턴스에 원소가 put돼서 get 메서드가 반환할 원소가 생기기 전까지

이 스레드는 끝나지 않음.

print('생산자 데이터 추가')
my_queue.put(object()) # 앞에서 본 get()이 실행되기 전에 실행된다
print('생산자 완료')
thread.join()

>>> 
소비자 대기
생산자 데이터 추가
생산자 완료
소비자 완료

파이프라인 중간이 막히는 경우를 해결하기 위해

Queue 클래스에선느 두 단계 사이에 허용할 수 있는 미완성 작업의 최대 개수를 지정할 수 있음

 

이렇게 버퍼 크기를 정하면 큐가 이미 가득 찬 경우 put이 블록됩니다.

    - (예) 아래 코드는 큐 원소가 소비될 때까지 대기하는 스레드를 정의함

 

my_queue = Queue(1) # 버퍼 크기 1

def consumer():
    time.sleep(0.1) # 대기
    my_queue.get() # 두번째로 실행됨
    print('소비자 1')
    my_queue.get() # 네번째로 실행됨
    print('소비자 2')
    print('소비자 완료')
    
thread = Thread(target=consumer)
thread.start()

큐에 원소가 없을 경우 소비자 스레드가 대기하므로, 

생산자 스레드는 소비자 스레드가 get을 호출했는지 여부와 관계없이 put을 두 번 호출해 객체를 큐에 추가할 수 있음

 

하지만!! 이 코드에서 Queue의 크기는 1

 이는 생산자가 두 번째로 호출한 put이 큐에 두 번째 원소를 넣으려면 소비자가 최소 한번이라도 get을 호출할 때까지 기다려야 한다는 뜻임.

my_queue.put(object()) # 첫번째로 실행됨
print('생산자 1')
my_queue.put(object()) # 세번쨰로 실행됨
print('생산자2')
print('생산자 완료')
thread.join()

>>> 
생산자 1
소비자 1
생산자 2
생산자 완료
소비자 2
소비자 완료