diff --git a/RABBITMQ_SETUP.md b/RABBITMQ_SETUP.md new file mode 100644 index 0000000..f8dead6 --- /dev/null +++ b/RABBITMQ_SETUP.md @@ -0,0 +1,202 @@ +# RabbitMQ Integration Guide + +## Overview + +This project now uses RabbitMQ for asynchronous message queue processing. The architecture follows the **producer-consumer pattern**: + +- **Senders (Producers)**: API endpoints send messages to RabbitMQ queues +- **Receivers (Consumers)**: Separate consumer processes listen to queues and save to databases + +## Architecture + +### Message Flow + +``` +API Endpoint → Service → RabbitMQ Queue → Consumer → Database +``` + +### Queues + +- **users_queue**: Receives user events (create, update, delete) +- **reports_queue**: Receives report events (create, update_visibility, delete) + +## Setup and Configuration + +### 1. Install Dependencies + +```bash +pip install -r requirements.txt +``` + +This includes the `pika` package for RabbitMQ communication. + +### 2. Start RabbitMQ + +Ensure RabbitMQ is running on your system: + +```bash +# Using Docker +docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management + +# Or using local installation +rabbitmq-server +``` + +## Running the Application + +### Start the API Server + +```bash +cd src +python main.py +``` + +The API will be available at `http://localhost:8000` + +### Start Consumers + +In separate terminal windows, run the consumers: + +#### User Consumer +```bash +cd src +python -m consumers.user_consumer +``` + +#### Report Consumer +```bash +cd src +python -m consumers.report_consumer +``` + +## Usage Examples + +### Creating a User + +```bash +curl -X POST http://localhost:8000/users/ \ + -H "Content-Type: application/json" \ + -d '{ + "nombre": "John", + "apellido": "Doe", + "email": "john@example.com", + "fecha_nacimiento": "1990-01-01T00:00:00", + "url_foto_perfil": "http://example.com/photo.jpg", + "biografia": "A test user" + }' +``` + +**Response** (Immediate): +```json +{ + "status": "queued", + "message": "Usuario enviado a cola para procesamiento", + "email": "john@example.com" +} +``` + +The user will be saved to the database by the User Consumer. + +### Creating a Report + +```bash +curl -X POST http://localhost:8000/reports/ \ + -H "Content-Type: application/json" \ + -d '{ + "id_usuario": 1, + "tipo_reporte": 1, + "descripcion": "Issue description", + "ubicacion": "Location info" + }' +``` + +**Response** (Immediate): +```json +{ + "status": "queued", + "message": "Reporte enviado a cola para procesamiento", + "id_reporte": "uuid-string" +} +``` + +The report will be saved to the database by the Report Consumer. + +## Message Formats + +### User Event Messages + +```python +{ + "event_type": "user.create|user.update|user.delete", + "user_id": Optional[int], + "nombre": Optional[str], + "apellido": Optional[str], + "email": Optional[str], + "fecha_nacimiento": Optional[str], # ISO format + "fecha_creacion": Optional[str], # ISO format + "calificacion": Optional[float], + "numero_reportes": Optional[int], + "url_foto_perfil": Optional[str], + "biografia": Optional[str] +} +``` + +### Report Event Messages + +```python +{ + "event_type": "report.create|report.update_visibility|report.delete", + "id_reporte": Optional[str], + "id_usuario": Optional[int], + "tipo_reporte": Optional[int], + "descripcion": Optional[str], + "ubicacion": Optional[str], + "visibilidad": Optional[float], + "fecha_creacion": Optional[str], # ISO format + "penalize_author": Optional[bool] +} +``` + +## Consumer Implementation Details + +### User Consumer (`src/consumers/user_consumer.py`) + +Processes three types of user events: + +1. **CREATE**: Saves a new user to the database +2. **UPDATE**: Updates existing user fields +3. **DELETE**: Removes a user from the database + +### Report Consumer (`src/consumers/report_consumer.py`) + +Processes three types of report events: + +1. **CREATE**: Saves a new report to MongoDB and increments user's report counter +2. **UPDATE_VISIBILITY**: Updates report visibility and optionally penalizes the author +3. **DELETE**: Removes a report from the database + +## Benefits of This Architecture + +1. **Asynchronous Processing**: API responds immediately without waiting for database operations +2. **Scalability**: Consumers can be scaled independently +3. **Reliability**: Messages are persistent and won't be lost +4. **Decoupling**: Services are decoupled from database operations +5. **Message Ordering**: FIFO guarantee ensures operations are processed in order + +## Error Handling + +- Messages are acknowledged only after successful processing +- Failed messages are automatically requeued for retry +- All operations are logged for debugging and monitoring + +## Database Compatibility + +- **Users**: MySQL (via SQLAlchemy) +- **Reports**: MongoDB + +## Future Enhancements + +- Add retry policies with exponential backoff +- Implement dead-letter queues for failed messages +- Add message monitoring and analytics +- Implement distributed transaction handling diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..4d9309c --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,39 @@ +version: '3.8' + +services: + mysql: + image: mysql:8.0 + container_name: voxpopuli_mysql + environment: + MYSQL_ROOT_PASSWORD: rootpassword + MYSQL_DATABASE: voxpopuli_users + MYSQL_USER: voxpopuli + MYSQL_PASSWORD: voxpopuli_pass + ports: + - "3306:3306" + volumes: + - mysql_data:/var/lib/mysql + healthcheck: + test: ["CMD", "mysqladmin", "ping", "-h", "localhost"] + interval: 10s + timeout: 5s + retries: 5 + + mongodb: + image: mongo:7.0 + container_name: voxpopuli_mongo + environment: + MONGO_INITDB_DATABASE: voxpopuli_reports + ports: + - "27017:27017" + volumes: + - mongo_data:/data/db + healthcheck: + test: ["CMD", "mongosh", "--eval", "db.adminCommand('ping')"] + interval: 10s + timeout: 5s + retries: 5 + +volumes: + mysql_data: + mongo_data: \ No newline at end of file diff --git a/enable-venv.sh b/enable-venv.sh old mode 100644 new mode 100755 diff --git a/requirements.txt b/requirements.txt index ef9d18b..c76ee35 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,4 @@ pydantic pydantic-settings pymongo python-dotenv +pika diff --git a/src/application/services/report_services.py b/src/application/services/report_services.py index 1413920..f97bbee 100644 --- a/src/application/services/report_services.py +++ b/src/application/services/report_services.py @@ -1,12 +1,14 @@ from domain.reports import Report from application.ports.report_repository import ReportRepository from application.ports.user_repository import UserRepository +from infrastructure.adapters.rabbitmq.sender import send_to_queue +from infrastructure.adapters.rabbitmq.messages import ReportMessage, ReportEventType from datetime import datetime -from typing import List, Optional +from typing import List, Optional, Dict, Any from uuid import uuid4 class CreateReport: - """Use case para crear un nuevo reporte""" + """Use case para crear un nuevo reporte - envía mensaje a RabbitMQ""" def __init__(self, repo: ReportRepository, user_repo: UserRepository): if not isinstance(repo, ReportRepository): raise TypeError("repo must implement ReportRepository") @@ -16,26 +18,51 @@ class CreateReport: self.user_repo = user_repo def execute(self, id_usuario: int, tipo_reporte: int, descripcion: str, - ubicacion: Optional[str] = None) -> Report: - # Verificar que el usuario existe + ubicacion: Optional[str] = None) -> Dict[str, Any]: + """ + Sends a create report message to RabbitMQ. + The actual database save will be done by the consumer. + + Returns: + Dictionary with status and message + """ + # Verify user exists (we still need to check this before queuing) user = self.user_repo.find_by_id(id_usuario) if not user: - raise ValueError(f"Usuario con ID {id_usuario} no existe") + return { + "status": "error", + "message": f"Usuario con ID {id_usuario} no existe" + } - report = Report( - id_reporte=str(uuid4()), + id_reporte = str(uuid4()) + fecha_creacion = datetime.now() + + # Create message object + message = ReportMessage( + event_type=ReportEventType.CREATE, + id_reporte=id_reporte, id_usuario=id_usuario, tipo_reporte=tipo_reporte, descripcion=descripcion, ubicacion=ubicacion, visibilidad=50.0, # Visibilidad inicial neutral - fecha_creacion=datetime.now() + fecha_creacion=fecha_creacion.isoformat() ) - # Incrementar contador de reportes del usuario - self.user_repo.increment_reports(id_usuario) + # Send to RabbitMQ + success = send_to_queue("reports_queue", message.to_dict()) - return self.repo.save(report) + if success: + return { + "status": "queued", + "message": "Reporte enviado a cola para procesamiento", + "id_reporte": id_reporte + } + else: + return { + "status": "error", + "message": "Error al enviar reporte a la cola de procesamiento" + } class GetReportById: """Use case para obtener un reporte por ID""" @@ -68,31 +95,51 @@ class ListAllReports: return self.repo.find_all() class UpdateReportVisibility: - """Use case para actualizar la visibilidad de un reporte basado en votación comunitaria""" + """Use case para actualizar la visibilidad de un reporte basado en votación comunitaria - envía mensaje a RabbitMQ""" def __init__(self, repo: ReportRepository, user_repo: UserRepository): if not isinstance(repo, ReportRepository): raise TypeError("repo must implement ReportRepository") self.repo = repo self.user_repo = user_repo - def execute(self, report_id: str, new_visibility: float, penalize_author: bool = False) -> None: + def execute(self, report_id: str, new_visibility: float, penalize_author: bool = False) -> Dict[str, Any]: + """ + Sends an update report visibility message to RabbitMQ. + The actual database update will be done by the consumer. + + Returns: + Dictionary with status and message + """ # Validar rango de visibilidad if new_visibility < 0 or new_visibility > 100: - raise ValueError("La visibilidad debe estar entre 0 y 100") + return { + "status": "error", + "message": "La visibilidad debe estar entre 0 y 100" + } - report = self.repo.find_by_id(report_id) - if not report: - raise ValueError(f"Reporte con ID {report_id} no existe") + # Create message object + message = ReportMessage( + event_type=ReportEventType.UPDATE_VISIBILITY, + id_reporte=report_id, + visibilidad=new_visibility, + penalize_author=penalize_author + ) - self.repo.update_visibility(report_id, new_visibility) + # Send to RabbitMQ + success = send_to_queue("reports_queue", message.to_dict()) - # Si la visibilidad es muy baja (shadowban), penalizar al autor - if penalize_author and new_visibility < 20: - user = self.user_repo.find_by_id(report.id_usuario) - if user: - # Reducir calificación del usuario - new_rating = max(0, user.calificacion - 5) - self.user_repo.update_rating(report.id_usuario, new_rating) + if success: + return { + "status": "queued", + "message": "Actualización de visibilidad enviada a cola para procesamiento", + "report_id": report_id, + "new_visibility": new_visibility + } + else: + return { + "status": "error", + "message": "Error al enviar actualización de visibilidad a la cola de procesamiento" + } class GetShadowbannedReports: """Use case para obtener reportes shadowbaneados (baja visibilidad)""" @@ -105,11 +152,37 @@ class GetShadowbannedReports: return self.repo.find_shadowbanned(visibility_threshold) class DeleteReport: - """Use case para eliminar un reporte""" + """Use case para eliminar un reporte - envía mensaje a RabbitMQ""" def __init__(self, repo: ReportRepository): if not isinstance(repo, ReportRepository): raise TypeError("repo must implement ReportRepository") self.repo = repo - def execute(self, report_id: str) -> bool: - return self.repo.delete(report_id) + def execute(self, report_id: str) -> Dict[str, Any]: + """ + Sends a delete report message to RabbitMQ. + The actual database deletion will be done by the consumer. + + Returns: + Dictionary with status and message + """ + # Create message object + message = ReportMessage( + event_type=ReportEventType.DELETE, + id_reporte=report_id + ) + + # Send to RabbitMQ + success = send_to_queue("reports_queue", message.to_dict()) + + if success: + return { + "status": "queued", + "message": f"Reporte {report_id} enviado a cola para eliminación", + "id_reporte": report_id + } + else: + return { + "status": "error", + "message": "Error al enviar eliminación del reporte a la cola de procesamiento" + } diff --git a/src/application/services/user_services.py b/src/application/services/user_services.py index 6afc5cb..7ae7c09 100644 --- a/src/application/services/user_services.py +++ b/src/application/services/user_services.py @@ -1,10 +1,12 @@ from domain.users import User from application.ports.user_repository import UserRepository +from infrastructure.adapters.rabbitmq.sender import send_to_queue +from infrastructure.adapters.rabbitmq.messages import UserMessage, UserEventType from datetime import datetime -from typing import List, Optional +from typing import List, Optional, Dict, Any class CreateUser: - """Use case para crear un nuevo usuario""" + """Use case para crear un nuevo usuario - envía mensaje a RabbitMQ""" def __init__(self, repo: UserRepository): if not isinstance(repo, UserRepository): raise TypeError("repo must implement UserRepository") @@ -12,20 +14,44 @@ class CreateUser: def execute(self, nombre: str, apellido: str, email: str, fecha_nacimiento: datetime, url_foto_perfil: Optional[str] = None, - biografia: Optional[str] = None) -> User: - user = User( - user_id=0, + biografia: Optional[str] = None) -> Dict[str, Any]: + """ + Sends a create user message to RabbitMQ. + The actual database save will be done by the consumer. + + Returns: + Dictionary with status and message + """ + fecha_creacion = datetime.now() + + # Create message object + message = UserMessage( + event_type=UserEventType.CREATE, nombre=nombre, apellido=apellido, email=email, - fecha_nacimiento=fecha_nacimiento, - fecha_creacion=datetime.now(), - calificacion=50.0, # Puntuación inicial + fecha_nacimiento=fecha_nacimiento.isoformat(), + fecha_creacion=fecha_creacion.isoformat(), + calificacion=50.0, numero_reportes=0, url_foto_perfil=url_foto_perfil, biografia=biografia ) - return self.repo.save(user) + + # Send to RabbitMQ + success = send_to_queue("users_queue", message.to_dict()) + + if success: + return { + "status": "queued", + "message": "Usuario enviado a cola para procesamiento", + "email": email + } + else: + return { + "status": "error", + "message": "Error al enviar usuario a la cola de procesamiento" + } class GetUserById: """Use case para obtener un usuario por ID""" @@ -58,33 +84,78 @@ class ListAllUsers: return self.repo.find_all() class UpdateUser: - """Use case para actualizar un usuario""" + """Use case para actualizar un usuario - envía mensaje a RabbitMQ""" def __init__(self, repo: UserRepository): if not isinstance(repo, UserRepository): raise TypeError("repo must implement UserRepository") self.repo = repo def execute(self, user_id: int, nombre: str = None, apellido: str = None, - url_foto_perfil: str = None, biografia: str = None) -> Optional[User]: - user = self.repo.find_by_id(user_id) - if user: - if nombre: - user.nombre = nombre - if apellido: - user.apellido = apellido - if url_foto_perfil is not None: - user.url_foto_perfil = url_foto_perfil - if biografia is not None: - user.biografia = biografia - return self.repo.update(user) - return None + url_foto_perfil: str = None, biografia: str = None) -> Dict[str, Any]: + """ + Sends an update user message to RabbitMQ. + The actual database update will be done by the consumer. + + Returns: + Dictionary with status and message + """ + # Create message object with only the fields to update + message = UserMessage( + event_type=UserEventType.UPDATE, + user_id=user_id, + nombre=nombre, + apellido=apellido, + url_foto_perfil=url_foto_perfil, + biografia=biografia + ) + + # Send to RabbitMQ + success = send_to_queue("users_queue", message.to_dict()) + + if success: + return { + "status": "queued", + "message": f"Usuario {user_id} enviado a cola para actualización", + "user_id": user_id + } + else: + return { + "status": "error", + "message": "Error al enviar actualización del usuario a la cola de procesamiento" + } class DeleteUser: - """Use case para eliminar un usuario""" + """Use case para eliminar un usuario - envía mensaje a RabbitMQ""" def __init__(self, repo: UserRepository): if not isinstance(repo, UserRepository): raise TypeError("repo must implement UserRepository") self.repo = repo - def execute(self, user_id: int) -> bool: - return self.repo.delete(user_id) + def execute(self, user_id: int) -> Dict[str, Any]: + """ + Sends a delete user message to RabbitMQ. + The actual database deletion will be done by the consumer. + + Returns: + Dictionary with status and message + """ + # Create message object + message = UserMessage( + event_type=UserEventType.DELETE, + user_id=user_id + ) + + # Send to RabbitMQ + success = send_to_queue("users_queue", message.to_dict()) + + if success: + return { + "status": "queued", + "message": f"Usuario {user_id} enviado a cola para eliminación", + "user_id": user_id + } + else: + return { + "status": "error", + "message": "Error al enviar eliminación del usuario a la cola de procesamiento" + } diff --git a/src/consumers/__init__.py b/src/consumers/__init__.py new file mode 100644 index 0000000..632de3a --- /dev/null +++ b/src/consumers/__init__.py @@ -0,0 +1 @@ +"""RabbitMQ Consumer implementations""" diff --git a/src/consumers/report_consumer.py b/src/consumers/report_consumer.py new file mode 100644 index 0000000..94df7c0 --- /dev/null +++ b/src/consumers/report_consumer.py @@ -0,0 +1,149 @@ +"""Report RabbitMQ Consumer - Processes report events and saves to database""" +import sys +import os +import logging +from datetime import datetime + +# Add src to path to import modules +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +from infrastructure.adapters.rabbitmq.consumer import RabbitMQConsumer +from infrastructure.adapters.rabbitmq.messages import ReportMessage, ReportEventType +from infrastructure.adapters.persistence.report_repository_mongo import ReportRepositoryMongo +from infrastructure.adapters.persistence.user_repository_sql import UserRepositorySQL +from domain.reports import Report + +# Set up logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +class ReportConsumer: + """Consumer for report events from RabbitMQ""" + + def __init__(self): + self.repo = ReportRepositoryMongo() + self.user_repo = UserRepositorySQL() + self.consumer = RabbitMQConsumer(queue_name='reports_queue') + self.consumer.set_callback(self.process_message) + + def process_message(self, message_dict: dict): + """ + Processes a report event message from RabbitMQ + + Args: + message_dict: Dictionary containing the message data + """ + try: + # Reconstruct the ReportMessage object + message = ReportMessage.from_dict(message_dict) + + if message.event_type == ReportEventType.CREATE: + self._handle_create_report(message) + elif message.event_type == ReportEventType.UPDATE_VISIBILITY: + self._handle_update_visibility(message) + elif message.event_type == ReportEventType.DELETE: + self._handle_delete_report(message) + else: + logger.warning(f"Unknown event type: {message.event_type}") + + except Exception as e: + logger.error(f"Error processing report message: {e}", exc_info=True) + raise + + def _handle_create_report(self, message: ReportMessage): + """Handle report create event""" + try: + logger.info(f"Creating report: {message.id_reporte} from user {message.id_usuario}") + + # Parse datetime string + fecha_creacion = datetime.fromisoformat(message.fecha_creacion) + + # Create Report domain object + report = Report( + id_reporte=message.id_reporte, + id_usuario=message.id_usuario, + tipo_reporte=message.tipo_reporte, + descripcion=message.descripcion, + ubicacion=message.ubicacion, + visibilidad=message.visibilidad, + fecha_creacion=fecha_creacion + ) + + # Save to repository + saved_report = self.repo.save(report) + logger.info(f"Report created successfully: {message.id_reporte}") + + # Increment user's report counter + self.user_repo.increment_reports(message.id_usuario) + logger.info(f"Incremented report counter for user: {message.id_usuario}") + + except Exception as e: + logger.error(f"Error creating report: {e}", exc_info=True) + raise + + def _handle_update_visibility(self, message: ReportMessage): + """Handle report visibility update event""" + try: + logger.info(f"Updating visibility for report: {message.id_reporte}") + + # Find the report + report = self.repo.find_by_id(message.id_reporte) + if not report: + logger.warning(f"Report not found: {message.id_reporte}") + return + + # Update visibility + self.repo.update_visibility(message.id_reporte, message.visibilidad) + logger.info(f"Report visibility updated: {message.id_reporte} -> {message.visibilidad}") + + # Penalize author if visibility is very low (shadowban) + if message.penalize_author and message.visibilidad < 20: + try: + user = self.user_repo.find_by_id(report.id_usuario) + if user: + # Reduce user's rating + new_rating = max(0, user.calificacion - 5) + self.user_repo.update_rating(report.id_usuario, new_rating) + logger.info(f"Author penalized: user {report.id_usuario} rating reduced to {new_rating}") + except Exception as e: + logger.error(f"Error penalizing author: {e}") + + except Exception as e: + logger.error(f"Error updating report visibility: {e}", exc_info=True) + raise + + def _handle_delete_report(self, message: ReportMessage): + """Handle report delete event""" + try: + logger.info(f"Deleting report: {message.id_reporte}") + + success = self.repo.delete(message.id_reporte) + if success: + logger.info(f"Report deleted successfully: {message.id_reporte}") + else: + logger.warning(f"Failed to delete report: {message.id_reporte}") + + except Exception as e: + logger.error(f"Error deleting report: {e}", exc_info=True) + raise + + def start(self): + """Start consuming messages""" + logger.info("Starting Report Consumer...") + logger.info("[*] Waiting for report events. Ctrl+C to exit.") + try: + self.consumer.start_consuming() + except KeyboardInterrupt: + logger.info("Report Consumer stopped by user") + except Exception as e: + logger.error(f"Consumer error: {e}", exc_info=True) + raise + + +if __name__ == '__main__': + consumer = ReportConsumer() + consumer.start() diff --git a/src/consumers/user_consumer.py b/src/consumers/user_consumer.py new file mode 100644 index 0000000..983f434 --- /dev/null +++ b/src/consumers/user_consumer.py @@ -0,0 +1,145 @@ +"""User RabbitMQ Consumer - Processes user events and saves to database""" +import sys +import os +import logging +from datetime import datetime + +# Add src to path to import modules +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +from infrastructure.adapters.rabbitmq.consumer import RabbitMQConsumer +from infrastructure.adapters.rabbitmq.messages import UserMessage, UserEventType +from infrastructure.adapters.persistence.user_repository_sql import UserRepositorySQL +from domain.users import User + +# Set up logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +class UserConsumer: + """Consumer for user events from RabbitMQ""" + + def __init__(self): + self.repo = UserRepositorySQL() + self.consumer = RabbitMQConsumer(queue_name='users_queue') + self.consumer.set_callback(self.process_message) + + def process_message(self, message_dict: dict): + """ + Processes a user event message from RabbitMQ + + Args: + message_dict: Dictionary containing the message data + """ + try: + # Reconstruct the UserMessage object + message = UserMessage.from_dict(message_dict) + + if message.event_type == UserEventType.CREATE: + self._handle_create_user(message) + elif message.event_type == UserEventType.UPDATE: + self._handle_update_user(message) + elif message.event_type == UserEventType.DELETE: + self._handle_delete_user(message) + else: + logger.warning(f"Unknown event type: {message.event_type}") + + except Exception as e: + logger.error(f"Error processing user message: {e}", exc_info=True) + raise + + def _handle_create_user(self, message: UserMessage): + """Handle user create event""" + try: + logger.info(f"Creating user: {message.email}") + + # Parse datetime strings + fecha_nacimiento = datetime.fromisoformat(message.fecha_nacimiento) + fecha_creacion = datetime.fromisoformat(message.fecha_creacion) + + # Create User domain object + user = User( + user_id=0, # Will be auto-generated by DB + nombre=message.nombre, + apellido=message.apellido, + email=message.email, + fecha_nacimiento=fecha_nacimiento, + fecha_creacion=fecha_creacion, + calificacion=message.calificacion, + numero_reportes=message.numero_reportes, + url_foto_perfil=message.url_foto_perfil, + biografia=message.biografia + ) + + # Save to repository + saved_user = self.repo.save(user) + logger.info(f"User created successfully: {saved_user.user_id} - {saved_user.email}") + + except Exception as e: + logger.error(f"Error creating user: {e}", exc_info=True) + raise + + def _handle_update_user(self, message: UserMessage): + """Handle user update event""" + try: + logger.info(f"Updating user: {message.user_id}") + + # Find the user + user = self.repo.find_by_id(message.user_id) + if not user: + logger.warning(f"User not found: {message.user_id}") + return + + # Update fields if provided + if message.nombre: + user.nombre = message.nombre + if message.apellido: + user.apellido = message.apellido + if message.url_foto_perfil is not None: + user.url_foto_perfil = message.url_foto_perfil + if message.biografia is not None: + user.biografia = message.biografia + + # Save to repository + updated_user = self.repo.update(user) + logger.info(f"User updated successfully: {message.user_id}") + + except Exception as e: + logger.error(f"Error updating user: {e}", exc_info=True) + raise + + def _handle_delete_user(self, message: UserMessage): + """Handle user delete event""" + try: + logger.info(f"Deleting user: {message.user_id}") + + success = self.repo.delete(message.user_id) + if success: + logger.info(f"User deleted successfully: {message.user_id}") + else: + logger.warning(f"Failed to delete user: {message.user_id}") + + except Exception as e: + logger.error(f"Error deleting user: {e}", exc_info=True) + raise + + def start(self): + """Start consuming messages""" + logger.info("Starting User Consumer...") + logger.info("[*] Waiting for user events. Ctrl+C to exit.") + try: + self.consumer.start_consuming() + except KeyboardInterrupt: + logger.info("User Consumer stopped by user") + except Exception as e: + logger.error(f"Consumer error: {e}", exc_info=True) + raise + + +if __name__ == '__main__': + consumer = UserConsumer() + consumer.start() diff --git a/src/core/config.py b/src/core/config.py index dc6fc4b..06c7386 100644 --- a/src/core/config.py +++ b/src/core/config.py @@ -1,3 +1,4 @@ +import os from pydantic_settings import BaseSettings from pydantic import Field @@ -6,19 +7,24 @@ class Settings(BaseSettings): # Base de datos MySQL mysql_url: str = Field( - default="mysql+pymysql://user:password@localhost/voxpopuli_users", + default=os.getenv("MYSQL_URL", "mysql+pymysql://voxpopuli:voxpopuli_pass@localhost:3306/voxpopuli_users"), description="URL de conexión a MySQL para API de Usuarios" ) # Base de datos MongoDB mongodb_url: str = Field( - default="mongodb://localhost:27017", + default=os.getenv("MONGODB_URL", "mongodb://localhost:27017"), description="URL de conexión a MongoDB para API de Reportes" ) mongodb_db: str = Field( default="voxpopuli_reports", description="Base de datos MongoDB" ) + + rabbitmq: str = Field ( + default=os.getenv("RABBITMQ_URI", "localhost") + + ) # API api_title: str = "VoxPopuli Microservices" diff --git a/src/infrastructure/adapters/rabbitmq/__init__.py b/src/infrastructure/adapters/rabbitmq/__init__.py new file mode 100644 index 0000000..752d002 --- /dev/null +++ b/src/infrastructure/adapters/rabbitmq/__init__.py @@ -0,0 +1 @@ +"""RabbitMQ adapters for message publishing and consuming""" diff --git a/src/infrastructure/adapters/rabbitmq/consumer.py b/src/infrastructure/adapters/rabbitmq/consumer.py new file mode 100644 index 0000000..36b1c33 --- /dev/null +++ b/src/infrastructure/adapters/rabbitmq/consumer.py @@ -0,0 +1,71 @@ +"""RabbitMQ message consumer base""" +import pika +import json +from typing import Callable, Dict, Any +import logging + +logger = logging.getLogger(__name__) + + +class RabbitMQConsumer: + """Generic RabbitMQ consumer for consuming messages from queues""" + + def __init__(self, queue_name: str, host: str = 'localhost', port: int = 5672): + self.queue_name = queue_name + self.host = host + self.port = port + self.callback = None + + def set_callback(self, callback: Callable[[Dict[str, Any]], None]) -> None: + """ + Sets the callback function to be called when a message is received + + Args: + callback: Function that takes a message dictionary as argument + """ + self.callback = callback + + def start_consuming(self) -> None: + """ + Starts consuming messages from the queue + """ + try: + connection = pika.BlockingConnection( + pika.ConnectionParameters(host=self.host, port=self.port) + ) + channel = connection.channel() + + # Declare queue to ensure it exists + channel.queue_declare(queue=self.queue_name, durable=True) + + def callback_wrapper(ch, method, properties, body): + try: + # Decode the message + message = json.loads(body.decode('utf-8')) + logger.info(f"Received message from queue '{self.queue_name}': {message}") + + # Call the user's callback function + if self.callback: + self.callback(message) + + # Acknowledge the message + ch.basic_ack(delivery_tag=method.delivery_tag) + + except Exception as e: + logger.error(f"Error processing message: {e}") + # Negative acknowledge to requeue the message + ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) + + # Set up the consumer with manual acknowledgment + channel.basic_consume( + queue=self.queue_name, + on_message_callback=callback_wrapper, + auto_ack=False + ) + + logger.info(f"[*] Waiting for messages in queue '{self.queue_name}'. Ctrl+C to exit.") + channel.start_consuming() + + except Exception as e: + logger.error(f"Error in consumer: {e}") + raise diff --git a/src/infrastructure/adapters/rabbitmq/messages.py b/src/infrastructure/adapters/rabbitmq/messages.py new file mode 100644 index 0000000..21775ae --- /dev/null +++ b/src/infrastructure/adapters/rabbitmq/messages.py @@ -0,0 +1,82 @@ +"""Message schemas for RabbitMQ communication""" +from dataclasses import dataclass, asdict +from datetime import datetime +from typing import Optional +from enum import Enum +import json + + +class UserEventType(str, Enum): + """Types of user events""" + CREATE = "user.create" + UPDATE = "user.update" + DELETE = "user.delete" + + +class ReportEventType(str, Enum): + """Types of report events""" + CREATE = "report.create" + UPDATE_VISIBILITY = "report.update_visibility" + DELETE = "report.delete" + + +@dataclass +class UserMessage: + """Message for user events""" + event_type: UserEventType + user_id: Optional[int] = None + nombre: Optional[str] = None + apellido: Optional[str] = None + email: Optional[str] = None + fecha_nacimiento: Optional[str] = None # ISO format datetime string + fecha_creacion: Optional[str] = None # ISO format datetime string + calificacion: Optional[float] = None + numero_reportes: Optional[int] = None + url_foto_perfil: Optional[str] = None + biografia: Optional[str] = None + + def to_dict(self): + """Convert to dictionary""" + data = asdict(self) + data['event_type'] = self.event_type.value + return data + + def to_json(self) -> str: + """Convert to JSON string""" + return json.dumps(self.to_dict()) + + @staticmethod + def from_dict(data: dict) -> 'UserMessage': + """Create from dictionary""" + data['event_type'] = UserEventType(data['event_type']) + return UserMessage(**data) + + +@dataclass +class ReportMessage: + """Message for report events""" + event_type: ReportEventType + id_reporte: Optional[str] = None + id_usuario: Optional[int] = None + tipo_reporte: Optional[int] = None + descripcion: Optional[str] = None + ubicacion: Optional[str] = None + visibilidad: Optional[float] = None + fecha_creacion: Optional[str] = None # ISO format datetime string + penalize_author: Optional[bool] = None # For update_visibility event + + def to_dict(self): + """Convert to dictionary""" + data = asdict(self) + data['event_type'] = self.event_type.value + return data + + def to_json(self) -> str: + """Convert to JSON string""" + return json.dumps(self.to_dict()) + + @staticmethod + def from_dict(data: dict) -> 'ReportMessage': + """Create from dictionary""" + data['event_type'] = ReportEventType(data['event_type']) + return ReportMessage(**data) diff --git a/src/infrastructure/adapters/rabbitmq/sender.py b/src/infrastructure/adapters/rabbitmq/sender.py new file mode 100644 index 0000000..fca1961 --- /dev/null +++ b/src/infrastructure/adapters/rabbitmq/sender.py @@ -0,0 +1,74 @@ +"""RabbitMQ message sender""" +import pika +import json +from typing import Any, Dict +import logging + +logger = logging.getLogger(__name__) + + +class RabbitMQSender: + """Generic RabbitMQ sender for publishing messages to queues""" + + def __init__(self, host: str = 'localhost', port: int = 5672): + self.host = host + self.port = port + + def send_message(self, queue_name: str, message: Dict[str, Any]) -> bool: + """ + Sends a message to a RabbitMQ queue + + Args: + queue_name: Name of the queue to send to + message: Dictionary containing the message data + + Returns: + True if successful, False otherwise + """ + try: + connection = pika.BlockingConnection( + pika.ConnectionParameters(host=self.host, port=self.port) + ) + channel = connection.channel() + + # Declare queue to ensure it exists + channel.queue_declare(queue=queue_name, durable=True) + + # Convert message to JSON + message_json = json.dumps(message) + + # Publish the message + channel.basic_publish( + exchange='', + routing_key=queue_name, + body=message_json, + properties=pika.BasicProperties( + delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE + ) + ) + + connection.close() + logger.info(f"Message sent to queue '{queue_name}': {message_json}") + return True + + except Exception as e: + logger.error(f"Error sending message to RabbitMQ: {e}") + return False + + +def send_to_queue(queue_name: str, message: Dict[str, Any], + host: str = 'localhost', port: int = 5672) -> bool: + """ + Convenience function to send a message to RabbitMQ + + Args: + queue_name: Name of the queue + message: Message dictionary + host: RabbitMQ host + port: RabbitMQ port + + Returns: + True if successful, False otherwise + """ + sender = RabbitMQSender(host=host, port=port) + return sender.send_message(queue_name, message) diff --git a/src/infrastructure/api/reports/reports.py b/src/infrastructure/api/reports/reports.py index e665d50..686a242 100644 --- a/src/infrastructure/api/reports/reports.py +++ b/src/infrastructure/api/reports/reports.py @@ -11,23 +11,25 @@ router = APIRouter() report_repo = ReportRepositoryMongo() user_repo = UserRepositorySQL() -@router.post("/", response_model=ReportResponse, status_code=status.HTTP_201_CREATED) +@router.post("/", status_code=status.HTTP_202_ACCEPTED) async def create_report(report_data: ReportCreateRequest): - """Crea un nuevo reporte""" + """Crea un nuevo reporte - envía a cola de procesamiento""" try: create_use_case = CreateReport(report_repo, user_repo) - report = create_use_case.execute( + result = create_use_case.execute( id_usuario=report_data.id_usuario, tipo_reporte=report_data.tipo_reporte, descripcion=report_data.descripcion, ubicacion=report_data.ubicacion ) - return report - except ValueError as e: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=str(e) - ) + if result["status"] == "error": + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=result["message"] + ) + return result + except HTTPException: + raise except Exception as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, @@ -65,35 +67,38 @@ async def get_shadowbanned_reports(threshold: float = 20): get_use_case = GetShadowbannedReports(report_repo) return get_use_case.execute(threshold) -@router.put("/{report_id}/visibility", status_code=status.HTTP_200_OK) +@router.put("/{report_id}/visibility", status_code=status.HTTP_202_ACCEPTED) async def update_report_visibility(report_id: str, visibility_data: ReportUpdateVisibilityRequest): - """Actualiza la visibilidad de un reporte (basado en votación comunitaria)""" + """Actualiza la visibilidad de un reporte - envía a cola de procesamiento""" try: update_use_case = UpdateReportVisibility(report_repo, user_repo) - update_use_case.execute( + result = update_use_case.execute( report_id=report_id, new_visibility=visibility_data.new_visibility, penalize_author=visibility_data.penalize_author ) - return { - "message": "Visibilidad actualizada exitosamente", - "report_id": report_id, - "new_visibility": visibility_data.new_visibility - } - except ValueError as e: + if result["status"] == "error": + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=result["message"] + ) + return result + except HTTPException: + raise + except Exception as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, - detail=str(e) + detail=f"Error al actualizar visibilidad: {str(e)}" ) -@router.delete("/{report_id}", status_code=status.HTTP_204_NO_CONTENT) +@router.delete("/{report_id}", status_code=status.HTTP_202_ACCEPTED) async def delete_report(report_id: str): - """Elimina un reporte""" + """Elimina un reporte - envía a cola de procesamiento""" delete_use_case = DeleteReport(report_repo) - success = delete_use_case.execute(report_id) - if not success: + result = delete_use_case.execute(report_id) + if result["status"] == "error": raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Reporte con ID {report_id} no encontrado" + status_code=status.HTTP_400_BAD_REQUEST, + detail=result["message"] ) - return None + return result diff --git a/src/infrastructure/api/users/users.py b/src/infrastructure/api/users/users.py index 90d1154..79f2557 100644 --- a/src/infrastructure/api/users/users.py +++ b/src/infrastructure/api/users/users.py @@ -8,12 +8,12 @@ from infrastructure.adapters.persistence.user_repository_sql import UserReposito router = APIRouter() user_repo = UserRepositorySQL() -@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED) +@router.post("/", status_code=status.HTTP_202_ACCEPTED) async def create_user(user_data: UserCreateRequest): - """Crea un nuevo usuario""" + """Crea un nuevo usuario - envía a cola de procesamiento""" try: create_use_case = CreateUser(user_repo) - user = create_use_case.execute( + result = create_use_case.execute( nombre=user_data.nombre, apellido=user_data.apellido, email=user_data.email, @@ -21,7 +21,14 @@ async def create_user(user_data: UserCreateRequest): url_foto_perfil=user_data.url_foto_perfil, biografia=user_data.biografia ) - return user + if result["status"] == "error": + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=result["message"] + ) + return result + except HTTPException: + raise except Exception as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, @@ -58,32 +65,32 @@ async def list_users(): list_use_case = ListAllUsers(user_repo) return list_use_case.execute() -@router.put("/{user_id}", response_model=UserResponse) +@router.put("/{user_id}", status_code=status.HTTP_202_ACCEPTED) async def update_user(user_id: int, user_data: UserUpdateRequest): - """Actualiza un usuario""" + """Actualiza un usuario - envía a cola de procesamiento""" update_use_case = UpdateUser(user_repo) - user = update_use_case.execute( + result = update_use_case.execute( user_id=user_id, nombre=user_data.nombre, apellido=user_data.apellido, url_foto_perfil=user_data.url_foto_perfil, biografia=user_data.biografia ) - if not user: + if result["status"] == "error": raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Usuario con ID {user_id} no encontrado" + status_code=status.HTTP_400_BAD_REQUEST, + detail=result["message"] ) - return user + return result -@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT) +@router.delete("/{user_id}", status_code=status.HTTP_202_ACCEPTED) async def delete_user(user_id: int): - """Elimina un usuario""" + """Elimina un usuario - envía a cola de procesamiento""" delete_use_case = DeleteUser(user_repo) - success = delete_use_case.execute(user_id) - if not success: + result = delete_use_case.execute(user_id) + if result["status"] == "error": raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Usuario con ID {user_id} no encontrado" + status_code=status.HTTP_400_BAD_REQUEST, + detail=result["message"] ) - return None + return result