외부 모듈을 위한 Python Microservice 구축기 (최종)

Python, FastAPI, asyncio 를 활용해 분산 작업 관리자를 직접 만들어 서비스 개선하기

이전 글에서는 Celery 를 사용해서 간단하게 Worker 를 띄워봤습니다. 하지만 데이터 포맷으로 인한 이슈로 인해 이번에는 새롭게 분산 작업 관리자 시스템을 만들어보고자 합니다. 이 시스템을 직접 만들게 된 이유는 설명 중간에 언급하겠습니다.

구현에 사용된 핵심적인 기술은 FastAPI 의 생명주기 함수와 asyncio 라는 Python 표준 비동기 프로그래밍을 위한 라이브러리 입니다.

파이썬 3.4 부터 공식적으로 표준으로 채택된 비동기 프로그래밍이 가능한 라이브러리 입니다. 제너레이터(Generator) 기반으로 코루틴을 구현했으며, async/await 구문으로 쉽게 사용할 수 있습니다. 사용하는 방법에 대해서는 JS 의 async/await 와 비슷하다고도 할 수 있습니다. 이것에 대해서는 잘 설명된 블로그가 있기에 읽어보시면 도움이 될 것 같습니다.

분산 작업 관리자 설계하기

분산 작업 관리자를 구현하기 위해서는 다음과 같은 컴포넌트가 필요합니다.

  • Task

  • Job

  • Router

  • Worker

  • Client

Task

Job 에서 메시지 처리를 위한 핵심 로직이 담긴 메서드 입니다.

Celery 에서는 이 Task 에 해당하는 메서드에 @app.task 와 같이 어노테이션을 달아 등록합니다. Celery 의 Worker 는 자체 메시지 프로토콜을 해석해서, 이렇게 등록한 각 Task 에 메시지를 라우팅하여 작업을 분배합니다.

# Task 메서드 구현 예시

# name='request_withdrawal'
# queue='request_withdrawal.fifo'
def task_request_withdrawal(payload):
    try:
        logging.info("[Task Start] - task_request_withdrawal ---")

	# 메시지 Deserialize, DTO Model 객체 Mapping
        model_json_data = json.loads(payload)
        body: TaskRequestWithdrawal = TaskRequestWithdrawal.model_validate(model_json_data)

        response = service.request_withdrawal(
            symbol=body.symbol,
            fee=body.fee,
            address=body.address,
            amount=body.amount
        )

        logging.info(f"[Task Response] : {dump(response)}")
    except Exception as e:
        raise e # 상위 계층으로 에러 전파
    finally:
        logging.info("[Task End] - task_request_withdrawal ---")

Router (Task Routing Table)

API Server 에서 사용되는 Router 와는 조금 다릅니다. 여기서 Router 는 Job 에 할당할 Task 를 라우팅 해준다는 의미로 사용됩니다. 위에서 정의한 Task 는 이 Routing Table 를 통해서 할당합니다.

저의 경우에는 Job 이 Polling 할 Queue 의 이름을 Key 로 사용하고, Task 로 사용되는 메서드를 Value 로 사용했습니다. 이 테이블의 사용은 후에 설명합니다.

task_routing_table = {
    'create-profile.fifo': task_create_profile,
    'request-withdrawal.fifo': task_request_withdrawal
}

Job

용어 그대로, “작업” 도메인 자체에 응집력을 가지는 유닛 입니다. Job 하나당 Task 하나를 갖습니다. Job 이 하는 일은, Client 를 통해 별도의 스레드에서 네트워크 I/O 를 하는 것 입니다. (SQS Long-Polling)

receive_message 함수로 Long-Polling 을 시작하면, 네트워크 I/O 이기 때문에 응답이 올 때 까지는 Blocking 상태가 됩니다. 그렇게 되면 CPU 가 불필요하게 놀고 있기 때문에 to_thread 함수로 별도의 스레드에서 대기하도록 하고 이어서 작업을 할 수 있도록 해야 CPU 자원을 이용하는데 효율적입니다.

원격 Queue 에서 메시지를 수신하는 데 성공하면, Job 은 메시지를 가지고 있는 Task 에 전달하여 작업을 처리합니다. 이렇게 보면 Job 자체에 로직이 있어도 문제는 없어보이지만 좀 더 Job 구현이 복잡해지고, 관심사와 객체 자체의 응집력을 위해 컴포넌트를 분리했습니다.

# Job 컴포넌트 구현 예시

# 실제 사용된 구현은 훨씬 구조적이기 때문에, 구현에 있어 필수적인 부분만 소개합니다.
class Job:
    def __init__(self, client, queue_name, url, task):
        self.client = client
        self.queue_name = queue_name
        self.task = task

    async def receive(self):
	
	# 네트워크 I/O 로 인한 Blocking 을 방지하기 위해, 다른 스레드에서 응답대기 하도록 한다.
        response = await asyncio.to_thread(
            self.client.receive_message,
            QueueUrl=url,
            WaitTimeSeconds=self.queue.conf.wait_for_seconds
        )
        
        for message in messages:
            body = message['Body']
            
            try:
                if body:
                    self.run(body)
                    
            except Exception as e:
                logging.error(f"[{self.queue_name}] Failed message processing: {e}")
                continue

	# 메시지 소비 후 삭제하지 않으면 큐에 반환되고, 다른 컨슈머가 메시지를 다시 소비하게 된다.
	# 이에 다른 컨슈머가 소비하지 않게 하기 위해 직접 삭제 처리해야 한다.
	await asyncio.to_thread(
            self.client.delete_message,
            QueueUrl=url,
            ReceiptHandle=message['ReceiptHandle']
        )

    def run(self, payload):
        self.task(payload)

Worker

Spring Boot 에서 SQS 를 사용할 경우, starter-sqs 라이브러리를 사용할 것 입니다. 여기에는 @SqsListener 라는 편리한 AOP 기반 Task 별 Polling 전략이 들어있어 별도 구현이 필요하지 않고 필요한 메서드에 달아주기만 하면 끝입니다.

하지만, 결정적으로 Python 생태계 에서는 SQS 메시지 수신을 위한 잘 구현된 쓸만한 솔루션이 거의 존재하지 않아 (작은 규모의 오픈소스가 있기는 하지만, boto3 기반이 아닌 경우도 있고 Celery 를 제외하면 대부분 잘 유지되지 않거나 오래된 솔루션이 많다) 직접 필요에 따라 만들어 사용하는 것도 가볍게 사용하기에 나쁘지 않을 것이라 판단했기 때문에 Worker 를 직접 구현했습니다.

# Worker 구현 예시

class Worker:
    def __init__(self, config, task_router_table):
        self.q: Queue = Queue.of(config, task_router_table)
        self.interrupt_flag: bool = False
        
        # 루프를 자연스럽게 인터럽트 하기 위한 이벤트 락 객체
        self.shutdown_event_lock: asyncio.Event = None

    async def run(self):
        while True:
            if self.interrupt_flag:
                break

	    # 모든 Job 은 코루틴이고, gather 함수로 호출하면 결과를 기다리는 Future 객체가 된다.
            job_futures = [job.receive() for job in self.q.jobs]
            await asyncio.gather(*job_futures)

        if self.shutdown_event_lock:
            self.shutdown_event_lock.set()

    # 정지 명령을 하면 다음 루프에서 break 플래그가 켜지고, 마지막 대기가 끝날 때 이벤트 락이 해제된다.
    async def stop(self):
        self.interrupt_flag = True
        self.shutdown_event_lock = asyncio.Event()
        await self.shutdown_event_lock.wait()

FastAPI 와 동시에 실행되도록 Worker 실행하기

Worker 작업 루프는 Uvicorn 에서 관리하는 이벤트 루프에서 처리하도록 하기 위해 FastAPI 의 생명주기인 on_event Handler 를 사용해서 직접 태워야 합니다. 이 때 주의할 점은, startup 이벤트 메서드에 async 를 붙여서 코루틴으로 만들면 안됩니다. async 를 붙이게 되면 Worker 클래스의 startup_event 또한 코루틴인데, 여기에 await 를 붙여서 루프가 끝날 때 까지 기다리게 만들기 때문에, Worker 루프는 동작하지만 정작 FastAPI 서버는 띄워지지 않습니다.

반대로, shutdown 이벤트 메서드에서는 이벤트 락을 사용해서 자연스레 끝날 때 까지 대기하도록 하기 위해 async, await 를 붙여 코루틴을 대기하도록 만들었습니다.

worker_app 을 시작할 때는, create_task 메서드를 사용해서 Task 로서 실행되도록 예약해야 합니다. 그래야 ASGI 웹 서버의 이벤트 루프에서 백그라운드 작업으로 실행되기 때문입니다.

app = FastAPI()
worker_app = WorkerApp()

# 앱 시작시 무한루프가 들어있는 작업은 절대 async 를 붙이면 안된다!!!
@app.on_event("startup")
def startup_event():
    asyncio.create_task(worker_app.startup_event())

@app.on_event("shutdown")
async def shutdown_event():
    await worker_app.shutdown_event()

처음에는 이 Worker 를 실행하기 위해 멀티프로세싱으로 구현했습니다. 이 멀티프로세싱 (하위 프로세스 fork) 방식은 추후 Worker Process 를 별도의 인스턴스에서 띄우는 것을 고려하기도 했고, Apache MPM Worker 모델을 모티브로 따로 main 진입점을 가지는 Worker App 을 만들어 subprocess 로써 사용했으나, 구독/발행 이벤트 처리 규모가 앞으로도 크게 변화가 없을 것 같아, FastAPI 에서 사용하는 Uvicorn 의 비동기 이벤트 루프를 적극 활용하기 위해 지금과 같은 구조로 만들어졌습니다.

결과적으로 배포 및 테스트까지 성공적으로 완료되었으나, 다음과 같은 개선이 필요한 부분이 남아있습니다.

  1. 네트워크 I/O 처리를 하는 스레드의 적절한 관리

    현재는 구독하는 모든 Queue 에 대해 Long-Polling 을 수행하고 있습니다. 구독하는 Queue 가 몇개 되지 않는 것도 있지만, 수요에 대해서는 어떻게 될지 아무도 모르기 때문에, 적절하게 스레드를 할당해서 효율적인 Polling 을 수행하는 구조로 바꿀 필요가 있습니다.

  2. Job 역할에 대한 모호성

    이것은 의미론적으로 고찰중인 부분인데, 이 구현에서의 Job 은 Queue 에 대해 receive_message 를 호출하고 Task 로 메시지를 전달하는 역할만 수행합니다. 이렇게 Polling 만 해서 메시지 상하차만 하는 것이 진정 Job 이라고 의미를 붙일 수 있는 것일까? 하는 의문이 들었습니다.

Last updated