RabbitMQ Integration Guide
Overview
This project uses RabbitMQ to process create, update, and delete operations asynchronously. The architecture follows the producer-consumer pattern:
- Senders (Producers): API endpoints publish messages to RabbitMQ queues
- Receivers (Consumers): Separate consumer processes listen to the queues and write the changes to the databases
Architecture
Message Flow
API Endpoint → Service → RabbitMQ Queue → Consumer → Database
Queues
- users_queue: Receives user events (create, update, delete)
- reports_queue: Receives report events (create, update_visibility, delete)
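As a rough illustration of the producer side, the sketch below serializes an event and publishes it to one of the queues above with pika. The helper names (encode_event, publish_event), the localhost host, and the use of durable queues with persistent messages are assumptions made for this example, not the project's actual service code.

```python
import json
from typing import Any, Dict

USERS_QUEUE = "users_queue"
REPORTS_QUEUE = "reports_queue"


def encode_event(event_type: str, payload: Dict[str, Any]) -> bytes:
    """Serialize an event as UTF-8 JSON, always setting event_type."""
    message = {**payload, "event_type": event_type}
    return json.dumps(message, ensure_ascii=False).encode("utf-8")


def publish_event(queue: str, body: bytes, host: str = "localhost") -> None:
    """Publish one message to `queue` through the default exchange."""
    # Imported lazily so encode_event can be used without pika installed.
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    try:
        channel = connection.channel()
        # Assumption: queues are durable; consumers must declare them the same way.
        channel.queue_declare(queue=queue, durable=True)
        channel.basic_publish(
            exchange="",
            routing_key=queue,
            body=body,
            properties=pika.BasicProperties(
                delivery_mode=2,  # ask the broker to persist the message
                content_type="application/json",
            ),
        )
    finally:
        connection.close()
```

A service could then call publish_event(USERS_QUEUE, encode_event("user.create", {"email": "john@example.com"})). Opening a connection per message keeps the sketch short; a long-lived connection is usually preferable under load.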
Setup and Configuration
1. Install Dependencies
pip install -r requirements.txt
This includes the pika package for RabbitMQ communication.
2. Start RabbitMQ
Make sure a RabbitMQ broker is running and reachable on port 5672. The Docker command below also publishes the management UI on port 15672:
# Using Docker
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
# Or using local installation
rabbitmq-server
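To confirm the broker is reachable before starting the API or the consumers, a small check like the following may help. It is only a sketch: the function name broker_port_open is made up for this example, and it tests the TCP port only, not the AMQP handshake or credentials.

```python
import socket


def broker_port_open(host: str = "localhost", port: int = 5672, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout`."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False
```

Calling broker_port_open() with the defaults checks localhost:5672, which matches the port published by the Docker command above.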
Running the Application
Start the API Server
cd src
python main.py
The API will be available at http://localhost:8000
Start Consumers
In separate terminal windows, run the consumers:
User Consumer
cd src
python -m consumers.user_consumer
Report Consumer
cd src
python -m consumers.report_consumer
Usage Examples
Creating a User
curl -X POST http://localhost:8000/users/ \
-H "Content-Type: application/json" \
-d '{
"nombre": "John",
"apellido": "Doe",
"email": "john@example.com",
"fecha_nacimiento": "1990-01-01T00:00:00",
"url_foto_perfil": "http://example.com/photo.jpg",
"biografia": "A test user"
}'
Response (Immediate):
{
"status": "queued",
"message": "Usuario enviado a cola para procesamiento",
"email": "john@example.com"
}
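The response only confirms that the message was queued. The User Consumer saves the user to the database afterwards, so the user may not exist yet when the response arrives.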
Creating a Report
curl -X POST http://localhost:8000/reports/ \
-H "Content-Type: application/json" \
-d '{
"id_usuario": 1,
"tipo_reporte": 1,
"descripcion": "Issue description",
"ubicacion": "Location info"
}'
Response (Immediate):
{
"status": "queued",
"message": "Reporte enviado a cola para procesamiento",
"id_reporte": "uuid-string"
}
As with users, the response only confirms that the message was queued. The Report Consumer saves the report to the database afterwards.
Message Formats
User Event Messages
{
"event_type": "user.create|user.update|user.delete",
"user_id": Optional[int],
"nombre": Optional[str],
"apellido": Optional[str],
"email": Optional[str],
"fecha_nacimiento": Optional[str], # ISO format
"fecha_creacion": Optional[str], # ISO format
"calificacion": Optional[float],
"numero_reportes": Optional[int],
"url_foto_perfil": Optional[str],
"biografia": Optional[str]
}
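One way to express the user event schema above in Python is a dataclass that rejects unknown event types. This is a sketch only: the class name UserEvent and the to_message method are assumptions, and the project may build these messages differently.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict, Optional

USER_EVENT_TYPES = {"user.create", "user.update", "user.delete"}


@dataclass
class UserEvent:
    event_type: str
    user_id: Optional[int] = None
    nombre: Optional[str] = None
    apellido: Optional[str] = None
    email: Optional[str] = None
    fecha_nacimiento: Optional[str] = None  # ISO format
    fecha_creacion: Optional[str] = None  # ISO format
    calificacion: Optional[float] = None
    numero_reportes: Optional[int] = None
    url_foto_perfil: Optional[str] = None
    biografia: Optional[str] = None

    def __post_init__(self) -> None:
        # Fail early on a typo instead of queueing an event no consumer handles.
        if self.event_type not in USER_EVENT_TYPES:
            raise ValueError(f"unknown user event type: {self.event_type!r}")

    def to_message(self) -> Dict[str, Any]:
        """Return the event as a plain dict ready for json.dumps."""
        return asdict(self)
```

For example, UserEvent(event_type="user.create", nombre="John", email="john@example.com").to_message() yields a dict with all eleven keys, where unset fields are None.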
Report Event Messages
{
"event_type": "report.create|report.update_visibility|report.delete",
"id_reporte": Optional[str],
"id_usuario": Optional[int],
"tipo_reporte": Optional[int],
"descripcion": Optional[str],
"ubicacion": Optional[str],
"visibilidad": Optional[float],
"fecha_creacion": Optional[str], # ISO format
"penalize_author": Optional[bool]
}
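On the consuming side, a report message could be decoded and checked against the schema above before any database work starts. The sketch below assumes messages are UTF-8 JSON objects; the function name parse_report_event and the choice to reject unexpected fields are illustrative, not taken from the project.

```python
import json
from typing import Any, Dict

REPORT_EVENT_TYPES = {"report.create", "report.update_visibility", "report.delete"}
REPORT_FIELDS = (
    "event_type", "id_reporte", "id_usuario", "tipo_reporte", "descripcion",
    "ubicacion", "visibilidad", "fecha_creacion", "penalize_author",
)


def parse_report_event(body: bytes) -> Dict[str, Any]:
    """Decode a report message and fill absent optional fields with None."""
    data = json.loads(body.decode("utf-8"))
    if not isinstance(data, dict):
        raise ValueError("report event must be a JSON object")
    if data.get("event_type") not in REPORT_EVENT_TYPES:
        raise ValueError(f"unknown report event type: {data.get('event_type')!r}")
    unknown = set(data) - set(REPORT_FIELDS)
    if unknown:
        raise ValueError(f"unexpected fields: {sorted(unknown)}")
    return {field: data.get(field) for field in REPORT_FIELDS}
```

Normalizing every message to the same nine keys lets the handlers read fields without checking for their presence first.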
Consumer Implementation Details
User Consumer (src/consumers/user_consumer.py)
Processes three types of user events:
- CREATE: Saves a new user to the database
- UPDATE: Updates existing user fields
- DELETE: Removes a user from the database
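The three event types above map naturally onto a dispatch table. The sketch below uses an in-memory dict as a stand-in for the MySQL users table, and it assumes every event carries a user_id (the real consumer may let the database assign one on create). All function names here are hypothetical.

```python
from typing import Any, Callable, Dict

# Stand-in for the users table, keyed by user_id.
UserStore = Dict[int, Dict[str, Any]]
_META_KEYS = ("event_type", "user_id")


def create_user(store: UserStore, event: Dict[str, Any]) -> None:
    store[event["user_id"]] = {k: v for k, v in event.items() if k not in _META_KEYS}


def update_user(store: UserStore, event: Dict[str, Any]) -> None:
    # Only fields that are present and not None overwrite stored values.
    changes = {
        k: v for k, v in event.items() if k not in _META_KEYS and v is not None
    }
    store[event["user_id"]].update(changes)


def delete_user(store: UserStore, event: Dict[str, Any]) -> None:
    del store[event["user_id"]]


HANDLERS: Dict[str, Callable[[UserStore, Dict[str, Any]], None]] = {
    "user.create": create_user,
    "user.update": update_user,
    "user.delete": delete_user,
}


def handle_user_event(store: UserStore, event: Dict[str, Any]) -> None:
    """Route an event to its handler; unknown types raise ValueError."""
    handler = HANDLERS.get(event.get("event_type"))
    if handler is None:
        raise ValueError(f"unsupported event type: {event.get('event_type')!r}")
    handler(store, event)
```

Adding a new event type then means writing one function and registering it in HANDLERS, without touching the routing logic.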
Report Consumer (src/consumers/report_consumer.py)
Processes three types of report events:
- CREATE: Saves a new report to MongoDB and increments the user's report counter
- UPDATE_VISIBILITY: Updates report visibility and optionally penalizes the author
- DELETE: Removes a report from the database
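The report handling described above could look roughly like this. Plain dicts stand in for the MongoDB collection and the user report counters, and the penalty itself is passed in as a callback because this guide does not say what penalizing an author involves. The function name and parameters are assumptions for illustration.

```python
from typing import Any, Callable, Dict, Optional


def handle_report_event(
    reports: Dict[str, Dict[str, Any]],
    report_counts: Dict[int, int],
    event: Dict[str, Any],
    penalize: Optional[Callable[[int], None]] = None,
) -> None:
    """Apply one report event to the in-memory stand-ins."""
    kind = event["event_type"]
    report_id = event["id_reporte"]
    if kind == "report.create":
        reports[report_id] = {k: v for k, v in event.items() if k != "event_type"}
        user_id = event["id_usuario"]
        # Mirrors "increments the user's report counter".
        report_counts[user_id] = report_counts.get(user_id, 0) + 1
    elif kind == "report.update_visibility":
        reports[report_id]["visibilidad"] = event["visibilidad"]
        if event.get("penalize_author") and penalize is not None:
            penalize(reports[report_id]["id_usuario"])
    elif kind == "report.delete":
        reports.pop(report_id, None)
    else:
        raise ValueError(f"unsupported event type: {kind!r}")
```

Note that a create event touches two stores. In the real system those are MongoDB and MySQL, so a failure between the two writes can leave them inconsistent, which is the gap that distributed transaction handling would address.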
Benefits of This Architecture
- Asynchronous Processing: API responds immediately without waiting for database operations
- Scalability: Consumers can be scaled independently
- Reliability: Messages survive a broker restart, provided the queues are declared durable and messages are published as persistent
- Decoupling: Services are decoupled from database operations
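- Message Ordering: Each queue delivers messages in FIFO order, so a single consumer per queue processes operations in the order they were published; running several consumers or requeueing failed messages can change that order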
Error Handling
- Messages are acknowledged only after successful processing
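- Failed messages are automatically requeued for retry; there is no retry limit or dead-letter queue yet, so a message that always fails keeps being redelivered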
- All operations are logged for debugging and monitoring
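The acknowledge-after-success and requeue-on-failure behavior can be sketched as a wrapper around a pika message callback. The names make_callback and handler are assumptions for this example; the channel calls used (basic_ack, basic_nack) are part of pika's channel API.

```python
import json
import logging
from typing import Any, Callable, Dict

logger = logging.getLogger("consumer")


def make_callback(handler: Callable[[Dict[str, Any]], None]) -> Callable[..., None]:
    """Wrap `handler` in a pika-style on_message_callback with manual acks."""

    def on_message(channel, method, properties, body) -> None:
        try:
            handler(json.loads(body))
        except Exception:
            # Processing failed: log the traceback and put the message back on the queue.
            logger.exception("processing failed, requeueing message %s", method.delivery_tag)
            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        else:
            # Acknowledge only after the handler returned without error.
            channel.basic_ack(delivery_tag=method.delivery_tag)
            logger.info("message %s processed", method.delivery_tag)

    return on_message
```

With pika this would be registered as channel.basic_consume(queue="users_queue", on_message_callback=make_callback(handler), auto_ack=False), followed by channel.start_consuming(). Manual acknowledgement only works if auto_ack stays False.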
Database Compatibility
- Users: MySQL (via SQLAlchemy)
- Reports: MongoDB
Future Enhancements
- Add retry policies with exponential backoff
- Implement dead-letter queues for failed messages
- Add message monitoring and analytics
- Implement distributed transaction handling
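As a starting point for the first two items, the sketch below computes capped exponential backoff delays and builds the queue arguments RabbitMQ uses for dead-lettering. The function names, the default delays, and the exchange and routing key names in the usage note are placeholders, not existing project settings.

```python
from typing import Any, Dict


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Seconds to wait before retry number `attempt` (0-based), capped at `cap`."""
    if attempt < 0:
        raise ValueError("attempt must be non-negative")
    return min(cap, base * (2 ** attempt))


def dead_letter_arguments(dlx: str, routing_key: str) -> Dict[str, Any]:
    """Queue arguments that send rejected or expired messages to exchange `dlx`."""
    return {
        "x-dead-letter-exchange": dlx,
        "x-dead-letter-routing-key": routing_key,
    }
```

With the defaults, attempts 0 through 5 wait 1, 2, 4, 8, 16, and 30 seconds. The arguments would be passed when declaring a queue, for example channel.queue_declare(queue="users_queue", durable=True, arguments=dead_letter_arguments("dlx", "users_queue.dead")). A message reaches the dead-letter exchange only when it is rejected with requeue=False, and an existing queue has to be deleted and re-created before its arguments can change.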