외부 모듈을 위한 Python Microservice 구축기 (최종)
Python, FastAPI, asyncio 를 활용해 분산 작업 관리자를 직접 만들어 서비스 개선하기
이전 글에서는 Celery 를 사용해서 간단하게 Worker 를 띄워봤습니다. 하지만 데이터 포맷으로 인한 이슈로 인해 이번에는 새롭게 분산 작업 관리자 시스템을 만들어보고자 합니다. 이 시스템을 직접 만들게 된 이유는 설명 중간에 언급하겠습니다.
구현에 사용된 핵심적인 기술은 FastAPI 의 생명주기 함수와 asyncio 라는 Python 표준 비동기 프로그래밍을 위한 라이브러리 입니다.
파이썬 3.4 부터 공식적으로 표준으로 채택된 비동기 프로그래밍이 가능한 라이브러리 입니다. 제너레이터(Generator) 기반으로 코루틴을 구현했으며, async/await 구문으로 쉽게 사용할 수 있습니다. 사용하는 방법에 대해서는 JS 의 async/await 와 비슷하다고도 할 수 있습니다. 이것에 대해서는 잘 설명된 블로그가 있기에 읽어보시면 도움이 될 것 같습니다.
분산 작업 관리자 설계하기
분산 작업 관리자를 구현하기 위해서는 다음과 같은 컴포넌트가 필요합니다.
Task
Job
Router
Worker
Client
Task
Job 에서 메시지 처리를 위한 핵심 로직이 담긴 메서드 입니다.
Celery 에서는 이 Task 에 해당하는 메서드에 @app.task
와 같이 어노테이션을 달아 등록합니다. Celery 의 Worker 는 자체 메시지 프로토콜을 해석해서, 이렇게 등록한 각 Task 에 메시지를 라우팅하여 작업을 분배합니다.
# Task 메서드 구현 예시
# name='request_withdrawal'
# queue='request_withdrawal.fifo'
def task_request_withdrawal(payload):
try:
logging.info("[Task Start] - task_request_withdrawal ---")
# 메시지 Deserialize, DTO Model 객체 Mapping
model_json_data = json.loads(payload)
body: TaskRequestWithdrawal = TaskRequestWithdrawal.model_validate(model_json_data)
response = service.request_withdrawal(
symbol=body.symbol,
fee=body.fee,
address=body.address,
amount=body.amount
)
logging.info(f"[Task Response] : {dump(response)}")
except Exception as e:
raise e # 상위 계층으로 에러 전파
finally:
logging.info("[Task End] - task_request_withdrawal ---")
Router (Task Routing Table)
API Server 에서 사용되는 Router 와는 조금 다릅니다. 여기서 Router 는 Job 에 할당할 Task 를 라우팅 해준다는 의미로 사용됩니다. 위에서 정의한 Task 는 이 Routing Table 를 통해서 할당합니다.
저의 경우에는 Job 이 Polling 할 Queue 의 이름을 Key 로 사용하고, Task 로 사용되는 메서드를 Value 로 사용했습니다. 이 테이블의 사용은 후에 설명합니다.
task_routing_table = {
'create-profile.fifo': task_create_profile,
'request-withdrawal.fifo': task_request_withdrawal
}
Job
용어 그대로, “작업” 도메인 자체에 응집력을 가지는 유닛 입니다. Job 하나당 Task 하나를 갖습니다. Job 이 하는 일은, Client 를 통해 별도의 스레드에서 네트워크 I/O 를 하는 것 입니다. (SQS Long-Polling)
receive_message 함수로 Long-Polling 을 시작하면, 네트워크 I/O 이기 때문에 응답이 올 때 까지는 Blocking 상태가 됩니다. 그렇게 되면 CPU 가 불필요하게 놀고 있기 때문에 to_thread 함수로 별도의 스레드에서 대기하도록 하고 이어서 작업을 할 수 있도록 해야 CPU 자원을 이용하는데 효율적입니다.
원격 Queue 에서 메시지를 수신하는 데 성공하면, Job 은 메시지를 가지고 있는 Task 에 전달하여 작업을 처리합니다. 이렇게 보면 Job 자체에 로직이 있어도 문제는 없어보이지만 좀 더 Job 구현이 복잡해지고, 관심사와 객체 자체의 응집력을 위해 컴포넌트를 분리했습니다.
# Job 컴포넌트 구현 예시
# 실제 사용된 구현은 훨씬 구조적이기 때문에, 구현에 있어 필수적인 부분만 소개합니다.
class Job:
def __init__(self, client, queue_name, url, task):
self.client = client
self.queue_name = queue_name
self.task = task
async def receive(self):
# 네트워크 I/O 로 인한 Blocking 을 방지하기 위해, 다른 스레드에서 응답대기 하도록 한다.
response = await asyncio.to_thread(
self.client.receive_message,
QueueUrl=url,
WaitTimeSeconds=self.queue.conf.wait_for_seconds
)
for message in messages:
body = message['Body']
try:
if body:
self.run(body)
except Exception as e:
logging.error(f"[{self.queue_name}] Failed message processing: {e}")
continue
# 메시지 소비 후 삭제하지 않으면 큐에 반환되고, 다른 컨슈머가 메시지를 다시 소비하게 된다.
# 이에 다른 컨슈머가 소비하지 않게 하기 위해 직접 삭제 처리해야 한다.
await asyncio.to_thread(
self.client.delete_message,
QueueUrl=url,
ReceiptHandle=message['ReceiptHandle']
)
def run(self, payload):
self.task(payload)
Worker
Spring Boot 에서 SQS 를 사용할 경우, starter-sqs
라이브러리를 사용할 것 입니다. 여기에는 @SqsListener
라는 편리한 AOP 기반 Task 별 Polling 전략이 들어있어 별도 구현이 필요하지 않고 필요한 메서드에 달아주기만 하면 끝입니다.
하지만, 결정적으로 Python 생태계 에서는 SQS 메시지 수신을 위한 잘 구현된 쓸만한 솔루션이 거의 존재하지 않아 (작은 규모의 오픈소스가 있기는 하지만, boto3 기반이 아닌 경우도 있고 Celery 를 제외하면 대부분 잘 유지되지 않거나 오래된 솔루션이 많다) 직접 필요에 따라 만들어 사용하는 것도 가볍게 사용하기에 나쁘지 않을 것이라 판단했기 때문에 Worker 를 직접 구현했습니다.
# Worker 구현 예시
class Worker:
def __init__(self, config, task_router_table):
self.q: Queue = Queue.of(config, task_router_table)
self.interrupt_flag: bool = False
# 루프를 자연스럽게 인터럽트 하기 위한 이벤트 락 객체
self.shutdown_event_lock: asyncio.Event = None
async def run(self):
while True:
if self.interrupt_flag:
break
# 모든 Job 은 코루틴이고, gather 함수로 호출하면 결과를 기다리는 Future 객체가 된다.
job_futures = [job.receive() for job in self.q.jobs]
await asyncio.gather(*job_futures)
if self.shutdown_event_lock:
self.shutdown_event_lock.set()
# 정지 명령을 하면 다음 루프에서 break 플래그가 켜지고, 마지막 대기가 끝날 때 이벤트 락이 해제된다.
async def stop(self):
self.interrupt_flag = True
self.shutdown_event_lock = asyncio.Event()
await self.shutdown_event_lock.wait()
FastAPI 와 동시에 실행되도록 Worker 실행하기
Worker 작업 루프는 Uvicorn 에서 관리하는 이벤트 루프에서 처리하도록 하기 위해 FastAPI 의 생명주기인 on_event
Handler 를 사용해서 직접 태워야 합니다. 이 때 주의할 점은, startup
이벤트 메서드에 async 를 붙여서 코루틴으로 만들면 안됩니다. async 를 붙이게 되면 Worker 클래스의 startup_event 또한 코루틴인데, 여기에 await 를 붙여서 루프가 끝날 때 까지 기다리게 만들기 때문에, Worker 루프는 동작하지만 정작 FastAPI 서버는 띄워지지 않습니다.
반대로, shutdown
이벤트 메서드에서는 이벤트 락을 사용해서 자연스레 끝날 때 까지 대기하도록 하기 위해 async, await 를 붙여 코루틴을 대기하도록 만들었습니다.
worker_app 을 시작할 때는, create_task
메서드를 사용해서 Task 로서 실행되도록 예약해야 합니다. 그래야 ASGI 웹 서버의 이벤트 루프에서 백그라운드 작업으로 실행되기 때문입니다.
app = FastAPI()
worker_app = WorkerApp()
# 앱 시작시 무한루프가 들어있는 작업은 절대 async 를 붙이면 안된다!!!
@app.on_event("startup")
def startup_event():
asyncio.create_task(worker_app.startup_event())
@app.on_event("shutdown")
async def shutdown_event():
await worker_app.shutdown_event()
처음에는 이 Worker 를 실행하기 위해 멀티프로세싱으로 구현했습니다. 이 멀티프로세싱 (하위 프로세스 fork) 방식은 추후 Worker Process 를 별도의 인스턴스에서 띄우는 것을 고려하기도 했고, Apache MPM Worker 모델을 모티브로 따로 main 진입점을 가지는 Worker App 을 만들어 subprocess 로써 사용했으나, 구독/발행 이벤트 처리 규모가 앞으로도 크게 변화가 없을 것 같아, FastAPI 에서 사용하는 Uvicorn 의 비동기 이벤트 루프를 적극 활용하기 위해 지금과 같은 구조로 만들어졌습니다.
결과적으로 배포 및 테스트까지 성공적으로 완료되었으나, 다음과 같은 개선이 필요한 부분이 남아있습니다.
네트워크 I/O 처리를 하는 스레드의 적절한 관리
현재는 구독하는 모든 Queue 에 대해 Long-Polling 을 수행하고 있습니다. 구독하는 Queue 가 몇개 되지 않는 것도 있지만, 수요에 대해서는 어떻게 될지 아무도 모르기 때문에, 적절하게 스레드를 할당해서 효율적인 Polling 을 수행하는 구조로 바꿀 필요가 있습니다.
Job 역할에 대한 모호성
이것은 의미론적으로 고찰중인 부분인데, 이 구현에서의 Job 은 Queue 에 대해 receive_message 를 호출하고 Task 로 메시지를 전달하는 역할만 수행합니다. 이렇게 Polling 만 해서 메시지 상하차만 하는 것이 진정 Job 이라고 의미를 붙일 수 있는 것일까? 하는 의문이 들었습니다.
Last updated