Files
VoxPopuli/RABBITMQ_SETUP.md

4.8 KiB

RabbitMQ Integration Guide

Overview

This project now uses RabbitMQ for asynchronous message queue processing. The architecture follows the producer-consumer pattern:

  • Senders (Producers): API endpoints send messages to RabbitMQ queues
  • Receivers (Consumers): Separate consumer processes listen to queues and save to databases

Architecture

Message Flow

API Endpoint → Service → RabbitMQ Queue → Consumer → Database

Queues

  • users_queue: Receives user events (create, update, delete)
  • reports_queue: Receives report events (create, update_visibility, delete)

Setup and Configuration

1. Install Dependencies

pip install -r requirements.txt

This includes the pika package for RabbitMQ communication.

2. Start RabbitMQ

Ensure RabbitMQ is running on your system:

# Using Docker
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

# Or using local installation
rabbitmq-server

Running the Application

Start the API Server

cd src
python main.py

The API will be available at http://localhost:8000

Start Consumers

In separate terminal windows, run the consumers:

User Consumer

cd src
python -m consumers.user_consumer

Report Consumer

cd src
python -m consumers.report_consumer

Usage Examples

Creating a User

curl -X POST http://localhost:8000/users/ \
  -H "Content-Type: application/json" \
  -d '{
    "nombre": "John",
    "apellido": "Doe",
    "email": "john@example.com",
    "fecha_nacimiento": "1990-01-01T00:00:00",
    "url_foto_perfil": "http://example.com/photo.jpg",
    "biografia": "A test user"
  }'

Response (Immediate):

{
  "status": "queued",
  "message": "Usuario enviado a cola para procesamiento",
  "email": "john@example.com"
}

The user will be saved to the database by the User Consumer.

Creating a Report

curl -X POST http://localhost:8000/reports/ \
  -H "Content-Type: application/json" \
  -d '{
    "id_usuario": 1,
    "tipo_reporte": 1,
    "descripcion": "Issue description",
    "ubicacion": "Location info"
  }'

Response (Immediate):

{
  "status": "queued",
  "message": "Reporte enviado a cola para procesamiento",
  "id_reporte": "uuid-string"
}

The report will be saved to the database by the Report Consumer.

Message Formats

User Event Messages

{
  "event_type": "user.create|user.update|user.delete",
  "user_id": Optional[int],
  "nombre": Optional[str],
  "apellido": Optional[str],
  "email": Optional[str],
  "fecha_nacimiento": Optional[str],  # ISO format
  "fecha_creacion": Optional[str],    # ISO format
  "calificacion": Optional[float],
  "numero_reportes": Optional[int],
  "url_foto_perfil": Optional[str],
  "biografia": Optional[str]
}

Report Event Messages

{
  "event_type": "report.create|report.update_visibility|report.delete",
  "id_reporte": Optional[str],
  "id_usuario": Optional[int],
  "tipo_reporte": Optional[int],
  "descripcion": Optional[str],
  "ubicacion": Optional[str],
  "visibilidad": Optional[float],
  "fecha_creacion": Optional[str],  # ISO format
  "penalize_author": Optional[bool]
}

Consumer Implementation Details

User Consumer (src/consumers/user_consumer.py)

Processes three types of user events:

  1. CREATE: Saves a new user to the database
  2. UPDATE: Updates existing user fields
  3. DELETE: Removes a user from the database

Report Consumer (src/consumers/report_consumer.py)

Processes three types of report events:

  1. CREATE: Saves a new report to MongoDB and increments user's report counter
  2. UPDATE_VISIBILITY: Updates report visibility and optionally penalizes the author
  3. DELETE: Removes a report from the database

Benefits of This Architecture

  1. Asynchronous Processing: API responds immediately without waiting for database operations
  2. Scalability: Consumers can be scaled independently
  3. Reliability: Messages are persistent and won't be lost
  4. Decoupling: Services are decoupled from database operations
  5. Message Ordering: FIFO guarantee ensures operations are processed in order

Error Handling

  • Messages are acknowledged only after successful processing
  • Failed messages are automatically requeued for retry
  • All operations are logged for debugging and monitoring

Database Compatibility

  • Users: MySQL (via SQLAlchemy)
  • Reports: MongoDB

Future Enhancements

  • Add retry policies with exponential backoff
  • Implement dead-letter queues for failed messages
  • Add message monitoring and analytics
  • Implement distributed transaction handling