Implementação do fluxo do pedido com a fila
Esta seção consiste em explicitar dois serviços separados que interagem entre si usando RabbitMQ. Um serviço publica pedidos em uma fila, enquanto o outro consome esses pedidos e os armazena em um banco de dados PostgreSQL.
Estrutura do Projeto
O projeto está organizado em dois serviços principais:
- Serviço 1 - main.py: Publica pedidos na fila RabbitMQ a partir de uma rota HTTP.
- Serviço 2 - consume.py: Consome os pedidos da fila e os armazena em um banco de dados.
Serviço 1 - Publicador de Pedidos
Este serviço expõe uma API HTTP que permite a criação de pedidos. Quando um pedido é criado, ele é publicado na fila RabbitMQ.
Estrutura de Diretórios
app/
controllers/
queue.py
- Define as funções de controle para criar pedidos.
services/
queueConfig.py
- Define funções para conexão com RabbitMQ e publicação na fila.
routes/
queue.py
- Define as rotas da API para publicação das mensagens na fila.
schemas/
order.py
- Define a estrutura de dados a serem enviados pra fila e armazenados no banco.
main.py
- Instancia a aplicação FastAPI e configura as rotas.
Arquivos Importantes
queueConfig.py
Este arquivo contém funções para configurar a conexão com RabbitMQ e publicar mensagens na fila.
import os
import pika
import json
from .order import OrderService
import asyncio
def get_connection():
rabbitmq_host = os.getenv('RABBITMQ_HOST')
rabbitmq_port = int(os.getenv('RABBITMQ_PORT'))
rabbitmq_user = os.getenv('RABBITMQ_DEFAULT_USER')
rabbitmq_pass = os.getenv('RABBITMQ_DEFAULT_PASS')
credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_pass)
parameters = pika.ConnectionParameters(
host=rabbitmq_host,
port=rabbitmq_port,
credentials=credentials
)
connection = pika.BlockingConnection(parameters)
return connection
def publish_order(queue_name, message):
connection = get_connection()
channel = connection.channel()
channel.queue_declare(queue=queue_name, durable=True) # Durable to ensure it persists
channel.basic_publish(
exchange='',
routing_key=queue_name,
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2, # Make message persistent
))
connection.close()
def create_order_queue(payload):
publish_order('create_order_queue', payload)
queue.py
(Controller)
Este arquivo define as funções de controle para criar pedidos.
from fastapi import HTTPException
from services.queueConfig import create_order_queue
async def handle_create_order(payload):
try:
order = create_order_queue(payload)
return {"message": "Order created successfully", "order": order}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
queue.py
(Routes)
Define as rotas da API.
from fastapi import APIRouter, Body
from schemas.order import UpdateOrder, CreateOrder
from controllers.queue import handle_create_order
router = APIRouter(prefix="/queue", tags=["queue"])
@router.post("/neworder")
async def create_order_endpoint(request: CreateOrder = Body()):
payload = request.dict()
await handle_create_order(payload)
return {"message":"Order queued succesfully"}
main.py
Instancia a aplicação FastAPI e configura as rotas.
from fastapi import FastAPI
from app.routes.queue import router as queue_router
app = FastAPI()
app.include_router(queue_router)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Serviço 2 - Consumer de Pedidos
Este serviço consome os pedidos da fila RabbitMQ e os armazena em um banco de dados.
Estrutura de Diretórios
app/
services/
queueConfig.py
- Define a função para consumir mensagens da fila.order.py
- Define a lógica de negócios para armazenar pedidos no banco de dados.
consume.py
- Instancia a aplicação FastAPI e inicia a thread para consumir mensagens.
Arquivos Importantes
queueConfig.py
Define a função para consumir mensagens da fila.
import os
import pika
import json
import asyncio
from .order import OrderService
def get_connection():
rabbitmq_host = os.getenv('RABBITMQ_HOST')
rabbitmq_port = int(os.getenv('RABBITMQ_PORT'))
rabbitmq_user = os.getenv('RABBITMQ_DEFAULT_USER')
rabbitmq_pass = os.getenv('RABBITMQ_DEFAULT_PASS')
credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_pass)
parameters = pika.ConnectionParameters(
host=rabbitmq_host,
port=rabbitmq_port,
credentials=credentials
)
connection = pika.BlockingConnection(parameters)
return connection
def consume_order_queue():
connection = get_connection()
channel = connection.channel()
channel.queue_declare(queue='create_order_queue', durable=True)
order_service = OrderService() # Instância do OrderService
def callback(ch, method, properties, body):
payload = json.loads(body)
print(body)
asyncio.run(order_service.create_in_db(payload))
channel.basic_consume(queue='create_order_queue', on_message_callback=callback, auto_ack=True)
channel.start_consuming()
order.py
(Service)
Define a lógica de negócios para armazenar pedidos no banco de dados.
from prisma import Prisma
from datetime import datetime
from contextlib import asynccontextmanager
class OrderService:
def __init__(self):
self.db = Prisma()
self.createdAt = datetime.now()
@asynccontextmanager
async def database_connection(self):
await self.db.connect()
try:
yield
finally:
await self.db.disconnect()
async def create_in_db(self, payload):
async with self.database_connection():
try:
new_order = await self.db.order.create(
data={
"medicationId": payload['medicationId'],
"sender_userId": payload['sender_userId'],
"status": payload.get('status', ['PENDING']),
"createdAt": self.createdAt,
"updatedAt": self.createdAt,
}
)
print(f"Order stored successfully: {payload['medicationId']}")
except Exception as e:
print(f"Failed to store order: {e}")
consume.py
Instancia a aplicação FastAPI e inicia a thread para consumir mensagens.
from fastapi import FastAPI
from contextlib import asynccontextmanager
from fastapi.middleware.cors import CORSMiddleware
import threading
import os
import uvicorn
from services.queueConfig import consume_order_queue
@asynccontextmanager
async def lifespan(app: FastAPI):
from prismaClient import prismaClient # Importa aqui para evitar problemas de importação circular
await prismaClient.connect()
yield
await prismaClient.disconnect()
app = FastAPI(lifespan=lifespan)
# Middleware de CORS (caso necessário)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
if __name__ == "__main__":
if "RABBITMQ_HOST" in os.environ:
try:
# Cria uma thread para receber as mensagens do RabbitMQ
thread = threading.Thread(target=consume_order_queue)
thread.start()
uvicorn.run(app, host="0.0.0.0", port=8001)
except Exception as e:
print(f"Finalizando a execução da thread: {e}")
thread.join()
else:
raise Exception("HOST and PORT must be defined in environment variables")
Como Executar os Serviços
Serviço 1
- Configure as variáveis de ambiente necessárias (RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_DEFAULT_USER, RABBITMQ_DEFAULT_PASS).
- Inicie o serviço:
python3 app/main.py
Serviço 2
- Configure as variáveis de ambiente necessárias (RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_DEFAULT_USER, RABBITMQ_DEFAULT_PASS).
- Inicie o serviço:
python3 app/consume.py
Conclusão
Esta seção demonstra como dois serviços inicialmente interagem usando RabbitMQ para comunicar e processar pedidos. Um serviço publica pedidos na fila, enquanto o outro consome esses pedidos e os armazena em um banco de dados.