From 30efa0e0983cc82e00cf22006183e9ae4ce6d1ea Mon Sep 17 00:00:00 2001 From: "Juan M. Ley" Date: Sun, 19 Apr 2026 19:08:43 -0600 Subject: [PATCH] =?UTF-8?q?a=C3=B1adido=20los=20estados=20de=20reporte?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- RABBITMQ_SETUP.md | 404 ++++++++-------- docker-compose.yaml | 76 +-- enable-venv.sh | 104 ++--- src/application/ports/report_repository.py | 5 + src/application/services/report_services.py | 66 ++- src/consumers/__init__.py | 2 +- src/consumers/report_consumer.py | 440 +++++++++--------- src/consumers/user_consumer.py | 314 ++++++------- src/domain/reports.py | 3 +- src/infrastructure/adapters/file_storage.py | 308 ++++++------ .../persistence/report_repository_mongo.py | 9 + .../adapters/rabbitmq/__init__.py | 2 +- .../adapters/rabbitmq/consumer.py | 114 ++--- .../adapters/rabbitmq/messages.py | 171 +++---- .../adapters/rabbitmq/sender.py | 148 +++--- src/infrastructure/api/reports/reports.py | 46 +- src/infrastructure/api/reports/schemas.py | 8 +- 17 files changed, 1173 insertions(+), 1047 deletions(-) diff --git a/RABBITMQ_SETUP.md b/RABBITMQ_SETUP.md index f8dead6..4fa5a70 100644 --- a/RABBITMQ_SETUP.md +++ b/RABBITMQ_SETUP.md @@ -1,202 +1,202 @@ -# RabbitMQ Integration Guide - -## Overview - -This project now uses RabbitMQ for asynchronous message queue processing. The architecture follows the **producer-consumer pattern**: - -- **Senders (Producers)**: API endpoints send messages to RabbitMQ queues -- **Receivers (Consumers)**: Separate consumer processes listen to queues and save to databases - -## Architecture - -### Message Flow - -``` -API Endpoint → Service → RabbitMQ Queue → Consumer → Database -``` - -### Queues - -- **users_queue**: Receives user events (create, update, delete) -- **reports_queue**: Receives report events (create, update_visibility, delete) - -## Setup and Configuration - -### 1. Install Dependencies - -```bash -pip install -r requirements.txt -``` - -This includes the `pika` package for RabbitMQ communication. - -### 2. Start RabbitMQ - -Ensure RabbitMQ is running on your system: - -```bash -# Using Docker -docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management - -# Or using local installation -rabbitmq-server -``` - -## Running the Application - -### Start the API Server - -```bash -cd src -python main.py -``` - -The API will be available at `http://localhost:8000` - -### Start Consumers - -In separate terminal windows, run the consumers: - -#### User Consumer -```bash -cd src -python -m consumers.user_consumer -``` - -#### Report Consumer -```bash -cd src -python -m consumers.report_consumer -``` - -## Usage Examples - -### Creating a User - -```bash -curl -X POST http://localhost:8000/users/ \ - -H "Content-Type: application/json" \ - -d '{ - "nombre": "John", - "apellido": "Doe", - "email": "john@example.com", - "fecha_nacimiento": "1990-01-01T00:00:00", - "url_foto_perfil": "http://example.com/photo.jpg", - "biografia": "A test user" - }' -``` - -**Response** (Immediate): -```json -{ - "status": "queued", - "message": "Usuario enviado a cola para procesamiento", - "email": "john@example.com" -} -``` - -The user will be saved to the database by the User Consumer. - -### Creating a Report - -```bash -curl -X POST http://localhost:8000/reports/ \ - -H "Content-Type: application/json" \ - -d '{ - "id_usuario": 1, - "tipo_reporte": 1, - "descripcion": "Issue description", - "ubicacion": "Location info" - }' -``` - -**Response** (Immediate): -```json -{ - "status": "queued", - "message": "Reporte enviado a cola para procesamiento", - "id_reporte": "uuid-string" -} -``` - -The report will be saved to the database by the Report Consumer. - -## Message Formats - -### User Event Messages - -```python -{ - "event_type": "user.create|user.update|user.delete", - "user_id": Optional[int], - "nombre": Optional[str], - "apellido": Optional[str], - "email": Optional[str], - "fecha_nacimiento": Optional[str], # ISO format - "fecha_creacion": Optional[str], # ISO format - "calificacion": Optional[float], - "numero_reportes": Optional[int], - "url_foto_perfil": Optional[str], - "biografia": Optional[str] -} -``` - -### Report Event Messages - -```python -{ - "event_type": "report.create|report.update_visibility|report.delete", - "id_reporte": Optional[str], - "id_usuario": Optional[int], - "tipo_reporte": Optional[int], - "descripcion": Optional[str], - "ubicacion": Optional[str], - "visibilidad": Optional[float], - "fecha_creacion": Optional[str], # ISO format - "penalize_author": Optional[bool] -} -``` - -## Consumer Implementation Details - -### User Consumer (`src/consumers/user_consumer.py`) - -Processes three types of user events: - -1. **CREATE**: Saves a new user to the database -2. **UPDATE**: Updates existing user fields -3. **DELETE**: Removes a user from the database - -### Report Consumer (`src/consumers/report_consumer.py`) - -Processes three types of report events: - -1. **CREATE**: Saves a new report to MongoDB and increments user's report counter -2. **UPDATE_VISIBILITY**: Updates report visibility and optionally penalizes the author -3. **DELETE**: Removes a report from the database - -## Benefits of This Architecture - -1. **Asynchronous Processing**: API responds immediately without waiting for database operations -2. **Scalability**: Consumers can be scaled independently -3. **Reliability**: Messages are persistent and won't be lost -4. **Decoupling**: Services are decoupled from database operations -5. **Message Ordering**: FIFO guarantee ensures operations are processed in order - -## Error Handling - -- Messages are acknowledged only after successful processing -- Failed messages are automatically requeued for retry -- All operations are logged for debugging and monitoring - -## Database Compatibility - -- **Users**: MySQL (via SQLAlchemy) -- **Reports**: MongoDB - -## Future Enhancements - -- Add retry policies with exponential backoff -- Implement dead-letter queues for failed messages -- Add message monitoring and analytics -- Implement distributed transaction handling +# RabbitMQ Integration Guide + +## Overview + +This project now uses RabbitMQ for asynchronous message queue processing. The architecture follows the **producer-consumer pattern**: + +- **Senders (Producers)**: API endpoints send messages to RabbitMQ queues +- **Receivers (Consumers)**: Separate consumer processes listen to queues and save to databases + +## Architecture + +### Message Flow + +``` +API Endpoint → Service → RabbitMQ Queue → Consumer → Database +``` + +### Queues + +- **users_queue**: Receives user events (create, update, delete) +- **reports_queue**: Receives report events (create, update_visibility, delete) + +## Setup and Configuration + +### 1. Install Dependencies + +```bash +pip install -r requirements.txt +``` + +This includes the `pika` package for RabbitMQ communication. + +### 2. Start RabbitMQ + +Ensure RabbitMQ is running on your system: + +```bash +# Using Docker +docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management + +# Or using local installation +rabbitmq-server +``` + +## Running the Application + +### Start the API Server + +```bash +cd src +python main.py +``` + +The API will be available at `http://localhost:8000` + +### Start Consumers + +In separate terminal windows, run the consumers: + +#### User Consumer +```bash +cd src +python -m consumers.user_consumer +``` + +#### Report Consumer +```bash +cd src +python -m consumers.report_consumer +``` + +## Usage Examples + +### Creating a User + +```bash +curl -X POST http://localhost:8000/users/ \ + -H "Content-Type: application/json" \ + -d '{ + "nombre": "John", + "apellido": "Doe", + "email": "john@example.com", + "fecha_nacimiento": "1990-01-01T00:00:00", + "url_foto_perfil": "http://example.com/photo.jpg", + "biografia": "A test user" + }' +``` + +**Response** (Immediate): +```json +{ + "status": "queued", + "message": "Usuario enviado a cola para procesamiento", + "email": "john@example.com" +} +``` + +The user will be saved to the database by the User Consumer. + +### Creating a Report + +```bash +curl -X POST http://localhost:8000/reports/ \ + -H "Content-Type: application/json" \ + -d '{ + "id_usuario": 1, + "tipo_reporte": 1, + "descripcion": "Issue description", + "ubicacion": "Location info" + }' +``` + +**Response** (Immediate): +```json +{ + "status": "queued", + "message": "Reporte enviado a cola para procesamiento", + "id_reporte": "uuid-string" +} +``` + +The report will be saved to the database by the Report Consumer. + +## Message Formats + +### User Event Messages + +```python +{ + "event_type": "user.create|user.update|user.delete", + "user_id": Optional[int], + "nombre": Optional[str], + "apellido": Optional[str], + "email": Optional[str], + "fecha_nacimiento": Optional[str], # ISO format + "fecha_creacion": Optional[str], # ISO format + "calificacion": Optional[float], + "numero_reportes": Optional[int], + "url_foto_perfil": Optional[str], + "biografia": Optional[str] +} +``` + +### Report Event Messages + +```python +{ + "event_type": "report.create|report.update_visibility|report.delete", + "id_reporte": Optional[str], + "id_usuario": Optional[int], + "tipo_reporte": Optional[int], + "descripcion": Optional[str], + "ubicacion": Optional[str], + "visibilidad": Optional[float], + "fecha_creacion": Optional[str], # ISO format + "penalize_author": Optional[bool] +} +``` + +## Consumer Implementation Details + +### User Consumer (`src/consumers/user_consumer.py`) + +Processes three types of user events: + +1. **CREATE**: Saves a new user to the database +2. **UPDATE**: Updates existing user fields +3. **DELETE**: Removes a user from the database + +### Report Consumer (`src/consumers/report_consumer.py`) + +Processes three types of report events: + +1. **CREATE**: Saves a new report to MongoDB and increments user's report counter +2. **UPDATE_VISIBILITY**: Updates report visibility and optionally penalizes the author +3. **DELETE**: Removes a report from the database + +## Benefits of This Architecture + +1. **Asynchronous Processing**: API responds immediately without waiting for database operations +2. **Scalability**: Consumers can be scaled independently +3. **Reliability**: Messages are persistent and won't be lost +4. **Decoupling**: Services are decoupled from database operations +5. **Message Ordering**: FIFO guarantee ensures operations are processed in order + +## Error Handling + +- Messages are acknowledged only after successful processing +- Failed messages are automatically requeued for retry +- All operations are logged for debugging and monitoring + +## Database Compatibility + +- **Users**: MySQL (via SQLAlchemy) +- **Reports**: MongoDB + +## Future Enhancements + +- Add retry policies with exponential backoff +- Implement dead-letter queues for failed messages +- Add message monitoring and analytics +- Implement distributed transaction handling diff --git a/docker-compose.yaml b/docker-compose.yaml index 4d9309c..539acd6 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,39 +1,39 @@ -version: '3.8' - -services: - mysql: - image: mysql:8.0 - container_name: voxpopuli_mysql - environment: - MYSQL_ROOT_PASSWORD: rootpassword - MYSQL_DATABASE: voxpopuli_users - MYSQL_USER: voxpopuli - MYSQL_PASSWORD: voxpopuli_pass - ports: - - "3306:3306" - volumes: - - mysql_data:/var/lib/mysql - healthcheck: - test: ["CMD", "mysqladmin", "ping", "-h", "localhost"] - interval: 10s - timeout: 5s - retries: 5 - - mongodb: - image: mongo:7.0 - container_name: voxpopuli_mongo - environment: - MONGO_INITDB_DATABASE: voxpopuli_reports - ports: - - "27017:27017" - volumes: - - mongo_data:/data/db - healthcheck: - test: ["CMD", "mongosh", "--eval", "db.adminCommand('ping')"] - interval: 10s - timeout: 5s - retries: 5 - -volumes: - mysql_data: +version: '3.8' + +services: + mysql: + image: mysql:8.0 + container_name: voxpopuli_mysql + environment: + MYSQL_ROOT_PASSWORD: rootpassword + MYSQL_DATABASE: voxpopuli_users + MYSQL_USER: voxpopuli + MYSQL_PASSWORD: voxpopuli_pass + ports: + - "3306:3306" + volumes: + - mysql_data:/var/lib/mysql + healthcheck: + test: ["CMD", "mysqladmin", "ping", "-h", "localhost"] + interval: 10s + timeout: 5s + retries: 5 + + mongodb: + image: mongo:7.0 + container_name: voxpopuli_mongo + environment: + MONGO_INITDB_DATABASE: voxpopuli_reports + ports: + - "27017:27017" + volumes: + - mongo_data:/data/db + healthcheck: + test: ["CMD", "mongosh", "--eval", "db.adminCommand('ping')"] + interval: 10s + timeout: 5s + retries: 5 + +volumes: + mysql_data: mongo_data: \ No newline at end of file diff --git a/enable-venv.sh b/enable-venv.sh index 28b619d..95492a7 100755 --- a/enable-venv.sh +++ b/enable-venv.sh @@ -1,52 +1,52 @@ -#!/bin/bash - -# Script para habilitar el entorno virtual de VoxPopuli Microservices - -# Colores para output -GREEN='\033[0;32m' -YELLOW='\033[1;33m' -NC='\033[0m' # No Color - -echo -e "${YELLOW}================================${NC}" -echo -e "${YELLOW}VoxPopuli Microservices Setup${NC}" -echo -e "${YELLOW}================================${NC}" - -# Crear entorno virtual si no existe -if [ ! -d "venv" ]; then - echo -e "${YELLOW}Creando entorno virtual...${NC}" - python3 -m venv venv - echo -e "${GREEN}✓ Entorno virtual creado${NC}" -else - echo -e "${GREEN}✓ Entorno virtual ya existe${NC}" -fi - -# Activar entorno virtual -echo -e "${YELLOW}Activando entorno virtual...${NC}" -source venv/bin/activate -echo -e "${GREEN}✓ Entorno virtual activado${NC}" - -# Instalar dependencias -if [ -f "requirements.txt" ]; then - echo -e "${YELLOW}Instalando dependencias...${NC}" - pip install -r requirements.txt - echo -e "${GREEN}✓ Dependencias instaladas${NC}" -fi - -# Verificar archivo .env -if [ ! -f ".env" ]; then - if [ -f ".env.example" ]; then - echo -e "${YELLOW}Creando archivo .env desde .env.example...${NC}" - cp .env.example .env - echo -e "${YELLOW}⚠ Por favor, actualiza .env con tus credenciales${NC}" - fi -fi - -echo -echo -e "${GREEN}================================${NC}" -echo -e "${GREEN}✓ Setup completado${NC}" -echo -e "${GREEN}================================${NC}" -echo -echo -e "${YELLOW}Para ejecutar los microservicios:${NC}" -echo "cd src" -echo "python main.py" -echo +#!/bin/bash + +# Script para habilitar el entorno virtual de VoxPopuli Microservices + +# Colores para output +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' # No Color + +echo -e "${YELLOW}================================${NC}" +echo -e "${YELLOW}VoxPopuli Microservices Setup${NC}" +echo -e "${YELLOW}================================${NC}" + +# Crear entorno virtual si no existe +if [ ! -d "venv" ]; then + echo -e "${YELLOW}Creando entorno virtual...${NC}" + python3 -m venv venv + echo -e "${GREEN}✓ Entorno virtual creado${NC}" +else + echo -e "${GREEN}✓ Entorno virtual ya existe${NC}" +fi + +# Activar entorno virtual +echo -e "${YELLOW}Activando entorno virtual...${NC}" +source venv/bin/activate +echo -e "${GREEN}✓ Entorno virtual activado${NC}" + +# Instalar dependencias +if [ -f "requirements.txt" ]; then + echo -e "${YELLOW}Instalando dependencias...${NC}" + pip install -r requirements.txt + echo -e "${GREEN}✓ Dependencias instaladas${NC}" +fi + +# Verificar archivo .env +if [ ! -f ".env" ]; then + if [ -f ".env.example" ]; then + echo -e "${YELLOW}Creando archivo .env desde .env.example...${NC}" + cp .env.example .env + echo -e "${YELLOW}⚠ Por favor, actualiza .env con tus credenciales${NC}" + fi +fi + +echo +echo -e "${GREEN}================================${NC}" +echo -e "${GREEN}✓ Setup completado${NC}" +echo -e "${GREEN}================================${NC}" +echo +echo -e "${YELLOW}Para ejecutar los microservicios:${NC}" +echo "cd src" +echo "python main.py" +echo diff --git a/src/application/ports/report_repository.py b/src/application/ports/report_repository.py index b19b08b..89e3660 100644 --- a/src/application/ports/report_repository.py +++ b/src/application/ports/report_repository.py @@ -35,6 +35,11 @@ class ReportRepository(ABC): """Actualiza la visibilidad de un reporte""" pass + @abstractmethod + def update_estado(self, report_id: str, new_estado: str) -> None: + """Actualiza el estado de un reporte""" + pass + @abstractmethod def delete(self, report_id: str) -> bool: """Elimina un reporte""" diff --git a/src/application/services/report_services.py b/src/application/services/report_services.py index 67c91fa..2c6cb47 100644 --- a/src/application/services/report_services.py +++ b/src/application/services/report_services.py @@ -19,7 +19,8 @@ class CreateReport: def execute(self, id_usuario: int, tipo_reporte: int, descripcion: str, ubicacion: Optional[str] = None, lat: Optional[float] = None, - lng: Optional[float] = None, image_filename: Optional[str] = None) -> Dict[str, Any]: + lng: Optional[float] = None, image_filename: Optional[str] = None, + estado: str = "en proceso") -> Dict[str, Any]: """ Sends a create report message to RabbitMQ. Valida previamente: @@ -44,6 +45,14 @@ class CreateReport: "message": "El tipo de reporte debe estar entre 1 y 5" } + # Validación: estado válido + estados_validos = ["en proceso", "no resuelto", "resuelto"] + if estado not in estados_validos: + return { + "status": "error", + "message": f"El estado del reporte debe ser uno de: {', '.join(estados_validos)}" + } + # Validación: usuario existe (CRÍTICO) try: user = self.user_repo.find_by_id(id_usuario) @@ -73,6 +82,7 @@ class CreateReport: lng=lng, image_filename=image_filename, visibilidad=50.0, # Visibilidad inicial neutral + estado=estado, fecha_creacion=fecha_creacion.isoformat() ) @@ -244,3 +254,57 @@ class DeleteReport: "status": "error", "message": "Error al enviar eliminación del reporte a la cola de procesamiento" } + +class UpdateReportStatus: + """Use case para actualizar el estado de un reporte""" + def __init__(self, repo: ReportRepository): + if not isinstance(repo, ReportRepository): + raise TypeError("repo must implement ReportRepository") + self.repo = repo + + def execute(self, report_id: str, new_estado: str) -> Dict[str, Any]: + """ + Actualiza el estado de un reporte. + Valida previamente: + - Reporte existe + - Estado es válido + + Returns: + Dictionary with status and message + """ + # Validación: estado válido + estados_validos = ["en proceso", "no resuelto", "resuelto"] + if new_estado not in estados_validos: + return { + "status": "error", + "message": f"El estado del reporte debe ser uno de: {', '.join(estados_validos)}" + } + + # Validación: reporte existe + try: + report = self.repo.find_by_id(report_id) + if not report: + return { + "status": "error", + "message": f"Reporte con ID {report_id} no existe" + } + except Exception as e: + return { + "status": "error", + "message": f"Error al buscar reporte: {str(e)}" + } + + # Actualizar estado + try: + self.repo.update_estado(report_id, new_estado) + return { + "status": "success", + "message": f"Estado del reporte actualizado a '{new_estado}'", + "id_reporte": report_id, + "nuevo_estado": new_estado + } + except Exception as e: + return { + "status": "error", + "message": f"Error al actualizar estado del reporte: {str(e)}" + } diff --git a/src/consumers/__init__.py b/src/consumers/__init__.py index 632de3a..082839e 100644 --- a/src/consumers/__init__.py +++ b/src/consumers/__init__.py @@ -1 +1 @@ -"""RabbitMQ Consumer implementations""" +"""RabbitMQ Consumer implementations""" diff --git a/src/consumers/report_consumer.py b/src/consumers/report_consumer.py index 149aaf3..8109f77 100644 --- a/src/consumers/report_consumer.py +++ b/src/consumers/report_consumer.py @@ -1,220 +1,220 @@ -"""Report RabbitMQ Consumer - Processes report events and saves to database""" -import sys -import os -import logging -from datetime import datetime -from pathlib import Path -from shutil import move - -# Add src to path to import modules -sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) - -from infrastructure.adapters.rabbitmq.consumer import RabbitMQConsumer -from infrastructure.adapters.rabbitmq.messages import ReportMessage, ReportEventType -from infrastructure.adapters.persistence.report_repository_mongo import ReportRepositoryMongo -from infrastructure.adapters.persistence.user_repository_sql import UserRepositorySQL -from infrastructure.adapters.file_storage import image_storage -from domain.reports import Report -from core.config import ConfSettings - -# Set up logging -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' -) -logger = logging.getLogger(__name__) - - -class ReportConsumer: - """Consumer for report events from RabbitMQ""" - - def __init__(self): - self.repo = ReportRepositoryMongo() - self.user_repo = UserRepositorySQL() - self.consumer = RabbitMQConsumer(queue_name='reports_queue') - self.consumer.set_callback(self.process_message) - - def process_message(self, message_dict: dict): - """ - Processes a report event message from RabbitMQ - - Args: - message_dict: Dictionary containing the message data - """ - try: - # Reconstruct the ReportMessage object - message = ReportMessage.from_dict(message_dict) - - if message.event_type == ReportEventType.CREATE: - self._handle_create_report(message) - elif message.event_type == ReportEventType.UPDATE_VISIBILITY: - self._handle_update_visibility(message) - elif message.event_type == ReportEventType.DELETE: - self._handle_delete_report(message) - else: - logger.warning(f"Unknown event type: {message.event_type}") - - except Exception as e: - logger.error(f"Error processing report message: {e}", exc_info=True) - # Rollback explícito en caso de error - self.user_repo.db.rollback() - raise - - def _handle_create_report(self, message: ReportMessage): - """Handle report create event con manejo de transacciones cruzadas""" - try: - logger.info(f"Creating report: {message.id_reporte} from user {message.id_usuario}") - - # Parse datetime string - fecha_creacion = datetime.fromisoformat(message.fecha_creacion) - - # Renombrar imagen temporal si existe - final_image_filename = None - if message.image_filename: - try: - # Renombrar de temp_userid_type a report_id - temp_path = image_storage.get_image_path(message.image_filename) - final_filename = f"{message.id_reporte}.webp" - final_path = image_storage.get_image_path(final_filename) - - if temp_path.exists(): - move(str(temp_path), str(final_path)) - final_image_filename = final_filename - logger.info(f"Image renamed from {message.image_filename} to {final_filename}") - else: - logger.warning(f"Temporary image not found: {message.image_filename}") - except Exception as img_error: - logger.error(f"Error renaming image: {img_error}", exc_info=True) - # Continuar sin imagen si falla el renombramiento - - # Create Report domain object - report = Report( - id_reporte=message.id_reporte, - id_usuario=message.id_usuario, - tipo_reporte=message.tipo_reporte, - descripcion=message.descripcion, - ubicacion=message.ubicacion, - lat=message.lat, - lng=message.lng, - image_filename=final_image_filename, - visibilidad=message.visibilidad, - fecha_creacion=fecha_creacion - ) - - try: - # Save to MongoDB repository - saved_report = self.repo.save(report) - logger.info(f"Report created successfully in MongoDB: {message.id_reporte}") - except Exception as mongo_error: - logger.error(f"Error saving report to MongoDB: {mongo_error}", exc_info=True) - raise - - try: - # Increment user's report counter in MySQL - self.user_repo.increment_reports(message.id_usuario) - logger.info(f"Incremented report counter for user: {message.id_usuario}") - except Exception as sql_error: - logger.error(f"Error incrementing report counter: {sql_error}", exc_info=True) - # Rollback SQL transaction - self.user_repo.db.rollback() - # Note: MongoDB save cannot be rolled back, log for manual cleanup - logger.critical(f"INCONSISTENCY: Report {message.id_reporte} saved to MongoDB but user counter not incremented for user {message.id_usuario}") - raise - - except Exception as e: - logger.error(f"Error creating report: {e}", exc_info=True) - self.user_repo.db.rollback() - raise - - def _handle_update_visibility(self, message: ReportMessage): - """Handle report visibility update event con manejo de transacciones""" - try: - logger.info(f"Updating visibility for report: {message.id_reporte}") - - # Find the report - report = self.repo.find_by_id(message.id_reporte) - if not report: - logger.warning(f"Report not found: {message.id_reporte}") - return - - # Update visibility in MongoDB - try: - self.repo.update_visibility(message.id_reporte, message.visibilidad) - logger.info(f"Report visibility updated: {message.id_reporte} -> {message.visibilidad}") - except Exception as mongo_error: - logger.error(f"Error updating report visibility in MongoDB: {mongo_error}", exc_info=True) - raise - - # Penalize author if visibility is very low (shadowban) - if message.penalize_author and message.visibilidad < 20: - try: - user = self.user_repo.find_by_id(report.id_usuario) - if user: - # Reduce user's rating - new_rating = max(0, user.calificacion - 5) - self.user_repo.update_rating(report.id_usuario, new_rating) - logger.info(f"Author penalized: user {report.id_usuario} rating reduced to {new_rating}") - else: - logger.warning(f"User not found for penalty: {report.id_usuario}") - except Exception as penalty_error: - logger.error(f"Error penalizing author: {penalty_error}", exc_info=True) - self.user_repo.db.rollback() - logger.critical(f"INCONSISTENCY: Report {message.id_reporte} visibility updated but author penalty failed") - raise - - except Exception as e: - logger.error(f"Error updating report visibility: {e}", exc_info=True) - self.user_repo.db.rollback() - raise - - def _handle_delete_report(self, message: ReportMessage): - """Handle report delete event""" - try: - logger.info(f"Deleting report: {message.id_reporte}") - - # Obtener reportepara acceder a image_filename antes de eliminarlo - report = self.repo.find_by_id(message.id_reporte) - - # Eliminar del MongoDB - success = self.repo.delete(message.id_reporte) - if success: - logger.info(f"Report deleted successfully from MongoDB: {message.id_reporte}") - - # Eliminar imagen del almacenamiento - if report and report.image_filename: - deleted = image_storage.delete_image(report.image_filename) - if deleted: - logger.info(f"Image deleted: {report.image_filename}") - else: - logger.warning(f"Could not delete image: {report.image_filename}") - else: - logger.warning(f"Failed to delete report: {message.id_reporte}") - - except Exception as e: - logger.error(f"Error deleting report: {e}", exc_info=True) - self.user_repo.db.rollback() - raise - - def start(self): - """Start consuming messages""" - logger.info("Starting Report Consumer...") - logger.info("[*] Waiting for report events. Ctrl+C to exit.") - try: - self.consumer.start_consuming() - except KeyboardInterrupt: - logger.info("Report Consumer stopped by user") - except Exception as e: - logger.error(f"Consumer error: {e}", exc_info=True) - raise - finally: - # Asegurar cierre de sesión SQL - if self.user_repo.db: - try: - self.user_repo.db.close() - except Exception as e: - logger.error(f"Error closing database session: {e}") - - -if __name__ == '__main__': - consumer = ReportConsumer() - consumer.start() +"""Report RabbitMQ Consumer - Processes report events and saves to database""" +import sys +import os +import logging +from datetime import datetime +from pathlib import Path +from shutil import move + +# Add src to path to import modules +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +from infrastructure.adapters.rabbitmq.consumer import RabbitMQConsumer +from infrastructure.adapters.rabbitmq.messages import ReportMessage, ReportEventType +from infrastructure.adapters.persistence.report_repository_mongo import ReportRepositoryMongo +from infrastructure.adapters.persistence.user_repository_sql import UserRepositorySQL +from infrastructure.adapters.file_storage import image_storage +from domain.reports import Report +from core.config import ConfSettings + +# Set up logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +class ReportConsumer: + """Consumer for report events from RabbitMQ""" + + def __init__(self): + self.repo = ReportRepositoryMongo() + self.user_repo = UserRepositorySQL() + self.consumer = RabbitMQConsumer(queue_name='reports_queue') + self.consumer.set_callback(self.process_message) + + def process_message(self, message_dict: dict): + """ + Processes a report event message from RabbitMQ + + Args: + message_dict: Dictionary containing the message data + """ + try: + # Reconstruct the ReportMessage object + message = ReportMessage.from_dict(message_dict) + + if message.event_type == ReportEventType.CREATE: + self._handle_create_report(message) + elif message.event_type == ReportEventType.UPDATE_VISIBILITY: + self._handle_update_visibility(message) + elif message.event_type == ReportEventType.DELETE: + self._handle_delete_report(message) + else: + logger.warning(f"Unknown event type: {message.event_type}") + + except Exception as e: + logger.error(f"Error processing report message: {e}", exc_info=True) + # Rollback explícito en caso de error + self.user_repo.db.rollback() + raise + + def _handle_create_report(self, message: ReportMessage): + """Handle report create event con manejo de transacciones cruzadas""" + try: + logger.info(f"Creating report: {message.id_reporte} from user {message.id_usuario}") + + # Parse datetime string + fecha_creacion = datetime.fromisoformat(message.fecha_creacion) + + # Renombrar imagen temporal si existe + final_image_filename = None + if message.image_filename: + try: + # Renombrar de temp_userid_type a report_id + temp_path = image_storage.get_image_path(message.image_filename) + final_filename = f"{message.id_reporte}.webp" + final_path = image_storage.get_image_path(final_filename) + + if temp_path.exists(): + move(str(temp_path), str(final_path)) + final_image_filename = final_filename + logger.info(f"Image renamed from {message.image_filename} to {final_filename}") + else: + logger.warning(f"Temporary image not found: {message.image_filename}") + except Exception as img_error: + logger.error(f"Error renaming image: {img_error}", exc_info=True) + # Continuar sin imagen si falla el renombramiento + + # Create Report domain object + report = Report( + id_reporte=message.id_reporte, + id_usuario=message.id_usuario, + tipo_reporte=message.tipo_reporte, + descripcion=message.descripcion, + ubicacion=message.ubicacion, + lat=message.lat, + lng=message.lng, + image_filename=final_image_filename, + visibilidad=message.visibilidad, + fecha_creacion=fecha_creacion + ) + + try: + # Save to MongoDB repository + saved_report = self.repo.save(report) + logger.info(f"Report created successfully in MongoDB: {message.id_reporte}") + except Exception as mongo_error: + logger.error(f"Error saving report to MongoDB: {mongo_error}", exc_info=True) + raise + + try: + # Increment user's report counter in MySQL + self.user_repo.increment_reports(message.id_usuario) + logger.info(f"Incremented report counter for user: {message.id_usuario}") + except Exception as sql_error: + logger.error(f"Error incrementing report counter: {sql_error}", exc_info=True) + # Rollback SQL transaction + self.user_repo.db.rollback() + # Note: MongoDB save cannot be rolled back, log for manual cleanup + logger.critical(f"INCONSISTENCY: Report {message.id_reporte} saved to MongoDB but user counter not incremented for user {message.id_usuario}") + raise + + except Exception as e: + logger.error(f"Error creating report: {e}", exc_info=True) + self.user_repo.db.rollback() + raise + + def _handle_update_visibility(self, message: ReportMessage): + """Handle report visibility update event con manejo de transacciones""" + try: + logger.info(f"Updating visibility for report: {message.id_reporte}") + + # Find the report + report = self.repo.find_by_id(message.id_reporte) + if not report: + logger.warning(f"Report not found: {message.id_reporte}") + return + + # Update visibility in MongoDB + try: + self.repo.update_visibility(message.id_reporte, message.visibilidad) + logger.info(f"Report visibility updated: {message.id_reporte} -> {message.visibilidad}") + except Exception as mongo_error: + logger.error(f"Error updating report visibility in MongoDB: {mongo_error}", exc_info=True) + raise + + # Penalize author if visibility is very low (shadowban) + if message.penalize_author and message.visibilidad < 20: + try: + user = self.user_repo.find_by_id(report.id_usuario) + if user: + # Reduce user's rating + new_rating = max(0, user.calificacion - 5) + self.user_repo.update_rating(report.id_usuario, new_rating) + logger.info(f"Author penalized: user {report.id_usuario} rating reduced to {new_rating}") + else: + logger.warning(f"User not found for penalty: {report.id_usuario}") + except Exception as penalty_error: + logger.error(f"Error penalizing author: {penalty_error}", exc_info=True) + self.user_repo.db.rollback() + logger.critical(f"INCONSISTENCY: Report {message.id_reporte} visibility updated but author penalty failed") + raise + + except Exception as e: + logger.error(f"Error updating report visibility: {e}", exc_info=True) + self.user_repo.db.rollback() + raise + + def _handle_delete_report(self, message: ReportMessage): + """Handle report delete event""" + try: + logger.info(f"Deleting report: {message.id_reporte}") + + # Obtener reportepara acceder a image_filename antes de eliminarlo + report = self.repo.find_by_id(message.id_reporte) + + # Eliminar del MongoDB + success = self.repo.delete(message.id_reporte) + if success: + logger.info(f"Report deleted successfully from MongoDB: {message.id_reporte}") + + # Eliminar imagen del almacenamiento + if report and report.image_filename: + deleted = image_storage.delete_image(report.image_filename) + if deleted: + logger.info(f"Image deleted: {report.image_filename}") + else: + logger.warning(f"Could not delete image: {report.image_filename}") + else: + logger.warning(f"Failed to delete report: {message.id_reporte}") + + except Exception as e: + logger.error(f"Error deleting report: {e}", exc_info=True) + self.user_repo.db.rollback() + raise + + def start(self): + """Start consuming messages""" + logger.info("Starting Report Consumer...") + logger.info("[*] Waiting for report events. Ctrl+C to exit.") + try: + self.consumer.start_consuming() + except KeyboardInterrupt: + logger.info("Report Consumer stopped by user") + except Exception as e: + logger.error(f"Consumer error: {e}", exc_info=True) + raise + finally: + # Asegurar cierre de sesión SQL + if self.user_repo.db: + try: + self.user_repo.db.close() + except Exception as e: + logger.error(f"Error closing database session: {e}") + + +if __name__ == '__main__': + consumer = ReportConsumer() + consumer.start() diff --git a/src/consumers/user_consumer.py b/src/consumers/user_consumer.py index c0a0679..198ad23 100644 --- a/src/consumers/user_consumer.py +++ b/src/consumers/user_consumer.py @@ -1,157 +1,157 @@ -"""User RabbitMQ Consumer - Processes user events and saves to database""" -import sys -import os -import logging -from datetime import datetime - -# Add src to path to import modules -sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) - -from infrastructure.adapters.rabbitmq.consumer import RabbitMQConsumer -from infrastructure.adapters.rabbitmq.messages import UserMessage, UserEventType -from infrastructure.adapters.persistence.user_repository_sql import UserRepositorySQL -from domain.users import User - -# Set up logging -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' -) -logger = logging.getLogger(__name__) - - -class UserConsumer: - """Consumer for user events from RabbitMQ""" - - def __init__(self): - self.repo = UserRepositorySQL() - self.consumer = RabbitMQConsumer(queue_name='users_queue') - self.consumer.set_callback(self.process_message) - - def process_message(self, message_dict: dict): - """ - Processes a user event message from RabbitMQ - - Args: - message_dict: Dictionary containing the message data - """ - try: - # Reconstruct the UserMessage object - message = UserMessage.from_dict(message_dict) - - if message.event_type == UserEventType.CREATE: - self._handle_create_user(message) - elif message.event_type == UserEventType.UPDATE: - self._handle_update_user(message) - elif message.event_type == UserEventType.DELETE: - self._handle_delete_user(message) - else: - logger.warning(f"Unknown event type: {message.event_type}") - - except Exception as e: - logger.error(f"Error processing user message: {e}", exc_info=True) - # Rollback en caso de error en el procesamiento del mensaje - self.repo.db.rollback() - raise - - def _handle_create_user(self, message: UserMessage): - """Handle user create event""" - try: - logger.info(f"Creating user: {message.email}") - - # Parse datetime strings - fecha_nacimiento = datetime.fromisoformat(message.fecha_nacimiento) - fecha_creacion = datetime.fromisoformat(message.fecha_creacion) - - # Create User domain object - user = User( - user_id=0, # Will be auto-generated by DB - nombre=message.nombre, - apellido=message.apellido, - email=message.email, - fecha_nacimiento=fecha_nacimiento, - fecha_creacion=fecha_creacion, - calificacion=message.calificacion, - numero_reportes=message.numero_reportes, - url_foto_perfil=message.url_foto_perfil, - biografia=message.biografia - ) - - # Save to repository with transaction handling - saved_user = self.repo.save(user) - logger.info(f"User created successfully: {saved_user.user_id} - {saved_user.email}") - - except Exception as e: - logger.error(f"Error creating user: {e}", exc_info=True) - self.repo.db.rollback() - raise - - def _handle_update_user(self, message: UserMessage): - """Handle user update event""" - try: - logger.info(f"Updating user: {message.user_id}") - - # Find the user - user = self.repo.find_by_id(message.user_id) - if not user: - logger.warning(f"User not found: {message.user_id}") - return - - # Update fields if provided - if message.nombre: - user.nombre = message.nombre - if message.apellido: - user.apellido = message.apellido - if message.url_foto_perfil is not None: - user.url_foto_perfil = message.url_foto_perfil - if message.biografia is not None: - user.biografia = message.biografia - - # Save to repository with transaction handling - updated_user = self.repo.update(user) - logger.info(f"User updated successfully: {message.user_id}") - - except Exception as e: - logger.error(f"Error updating user: {e}", exc_info=True) - self.repo.db.rollback() - raise - - def _handle_delete_user(self, message: UserMessage): - """Handle user delete event""" - try: - logger.info(f"Deleting user: {message.user_id}") - - success = self.repo.delete(message.user_id) - if success: - logger.info(f"User deleted successfully: {message.user_id}") - else: - logger.warning(f"Failed to delete user: {message.user_id}") - - except Exception as e: - logger.error(f"Error deleting user: {e}", exc_info=True) - self.repo.db.rollback() - raise - - def start(self): - """Start consuming messages""" - logger.info("Starting User Consumer...") - logger.info("[*] Waiting for user events. Ctrl+C to exit.") - try: - self.consumer.start_consuming() - except KeyboardInterrupt: - logger.info("User Consumer stopped by user") - except Exception as e: - logger.error(f"Consumer error: {e}", exc_info=True) - raise - finally: - # Asegurar cierre de sesión - if self.repo.db: - try: - self.repo.db.close() - except Exception as e: - logger.error(f"Error closing database session: {e}") - - -if __name__ == '__main__': - consumer = UserConsumer() - consumer.start() +"""User RabbitMQ Consumer - Processes user events and saves to database""" +import sys +import os +import logging +from datetime import datetime + +# Add src to path to import modules +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +from infrastructure.adapters.rabbitmq.consumer import RabbitMQConsumer +from infrastructure.adapters.rabbitmq.messages import UserMessage, UserEventType +from infrastructure.adapters.persistence.user_repository_sql import UserRepositorySQL +from domain.users import User + +# Set up logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +class UserConsumer: + """Consumer for user events from RabbitMQ""" + + def __init__(self): + self.repo = UserRepositorySQL() + self.consumer = RabbitMQConsumer(queue_name='users_queue') + self.consumer.set_callback(self.process_message) + + def process_message(self, message_dict: dict): + """ + Processes a user event message from RabbitMQ + + Args: + message_dict: Dictionary containing the message data + """ + try: + # Reconstruct the UserMessage object + message = UserMessage.from_dict(message_dict) + + if message.event_type == UserEventType.CREATE: + self._handle_create_user(message) + elif message.event_type == UserEventType.UPDATE: + self._handle_update_user(message) + elif message.event_type == UserEventType.DELETE: + self._handle_delete_user(message) + else: + logger.warning(f"Unknown event type: {message.event_type}") + + except Exception as e: + logger.error(f"Error processing user message: {e}", exc_info=True) + # Rollback en caso de error en el procesamiento del mensaje + self.repo.db.rollback() + raise + + def _handle_create_user(self, message: UserMessage): + """Handle user create event""" + try: + logger.info(f"Creating user: {message.email}") + + # Parse datetime strings + fecha_nacimiento = datetime.fromisoformat(message.fecha_nacimiento) + fecha_creacion = datetime.fromisoformat(message.fecha_creacion) + + # Create User domain object + user = User( + user_id=0, # Will be auto-generated by DB + nombre=message.nombre, + apellido=message.apellido, + email=message.email, + fecha_nacimiento=fecha_nacimiento, + fecha_creacion=fecha_creacion, + calificacion=message.calificacion, + numero_reportes=message.numero_reportes, + url_foto_perfil=message.url_foto_perfil, + biografia=message.biografia + ) + + # Save to repository with transaction handling + saved_user = self.repo.save(user) + logger.info(f"User created successfully: {saved_user.user_id} - {saved_user.email}") + + except Exception as e: + logger.error(f"Error creating user: {e}", exc_info=True) + self.repo.db.rollback() + raise + + def _handle_update_user(self, message: UserMessage): + """Handle user update event""" + try: + logger.info(f"Updating user: {message.user_id}") + + # Find the user + user = self.repo.find_by_id(message.user_id) + if not user: + logger.warning(f"User not found: {message.user_id}") + return + + # Update fields if provided + if message.nombre: + user.nombre = message.nombre + if message.apellido: + user.apellido = message.apellido + if message.url_foto_perfil is not None: + user.url_foto_perfil = message.url_foto_perfil + if message.biografia is not None: + user.biografia = message.biografia + + # Save to repository with transaction handling + updated_user = self.repo.update(user) + logger.info(f"User updated successfully: {message.user_id}") + + except Exception as e: + logger.error(f"Error updating user: {e}", exc_info=True) + self.repo.db.rollback() + raise + + def _handle_delete_user(self, message: UserMessage): + """Handle user delete event""" + try: + logger.info(f"Deleting user: {message.user_id}") + + success = self.repo.delete(message.user_id) + if success: + logger.info(f"User deleted successfully: {message.user_id}") + else: + logger.warning(f"Failed to delete user: {message.user_id}") + + except Exception as e: + logger.error(f"Error deleting user: {e}", exc_info=True) + self.repo.db.rollback() + raise + + def start(self): + """Start consuming messages""" + logger.info("Starting User Consumer...") + logger.info("[*] Waiting for user events. Ctrl+C to exit.") + try: + self.consumer.start_consuming() + except KeyboardInterrupt: + logger.info("User Consumer stopped by user") + except Exception as e: + logger.error(f"Consumer error: {e}", exc_info=True) + raise + finally: + # Asegurar cierre de sesión + if self.repo.db: + try: + self.repo.db.close() + except Exception as e: + logger.error(f"Error closing database session: {e}") + + +if __name__ == '__main__': + consumer = UserConsumer() + consumer.start() diff --git a/src/domain/reports.py b/src/domain/reports.py index 5e460ae..42609b8 100644 --- a/src/domain/reports.py +++ b/src/domain/reports.py @@ -1,6 +1,6 @@ from dataclasses import dataclass from datetime import datetime -from typing import Optional +from typing import Optional, Literal @dataclass class Report: @@ -14,4 +14,5 @@ class Report: lng: Optional[float] = None image_filename: Optional[str] = None # Nombre del archivo de imagen WebP visibilidad: float = 0.0 # 0-100 (puntuación comunitaria) + estado: Literal["en proceso", "no resuelto", "resuelto"] = "en proceso" # Estado del reporte fecha_creacion: Optional[datetime] = None diff --git a/src/infrastructure/adapters/file_storage.py b/src/infrastructure/adapters/file_storage.py index 3b97c7e..a53d2cb 100644 --- a/src/infrastructure/adapters/file_storage.py +++ b/src/infrastructure/adapters/file_storage.py @@ -1,154 +1,154 @@ -"""File storage utilities for report images""" -import os -import logging -from pathlib import Path -from fastapi import UploadFile, HTTPException, status -from PIL import Image -from io import BytesIO -from core.config import ConfSettings - -logger = logging.getLogger(__name__) - - -class ImageStorageManager: - """Maneja almacenamiento, compresión y eliminación de imágenes de reportes""" - - def __init__(self): - self.storage_path = Path(ConfSettings.storage_base_path) / ConfSettings.images_dir - self.max_size_bytes = ConfSettings.images_max_size_mb * 1024 * 1024 - self.allowed_types = ConfSettings.images_allowed_types - self.compression_quality = ConfSettings.images_compression_quality - - # Crear directorio si no existe - self.storage_path.mkdir(parents=True, exist_ok=True) - logger.info(f"ImageStorageManager initialized with path: {self.storage_path}") - - def validate_and_save_image(self, file: UploadFile, report_id: str) -> str: - """ - Valida y guarda una imagen, comprimiendo a WebP. - - Args: - file: Archivo subido (UploadFile) - report_id: ID del reporte para nombrado del archivo - - Returns: - Nombre del archivo guardado (sin ruta) - - Raises: - HTTPException: Si hay error en validación o guardado - """ - try: - # Validar tipo MIME - if file.content_type not in self.allowed_types: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=f"Tipo de archivo no permitido. Permitidos: {', '.join(self.allowed_types)}" - ) - - # Leer contenido - content = file.file.read() - - # Validar tamaño - if len(content) > self.max_size_bytes: - raise HTTPException( - status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE, - detail=f"Archivo demasiado grande. Máximo: {ConfSettings.images_max_size_mb}MB" - ) - - # Abrir imagen con Pillow - try: - image = Image.open(BytesIO(content)) - image.verify() # Verificar que sea una imagen válida - - # Reabrir después de verify() que la cierra - image = Image.open(BytesIO(content)) - - # Convertir a RGB si tiene canal alpha (RGBA) - if image.mode in ('RGBA', 'LA', 'P'): - rgb_image = Image.new('RGB', image.size, (255, 255, 255)) - rgb_image.paste(image, mask=image.split()[-1] if image.mode == 'RGBA' else None) - image = rgb_image - - except Exception as e: - logger.error(f"Error validating image: {e}") - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="Archivo no es una imagen válida" - ) - - # Guardar como WebP comprimido - filename = f"{report_id}.webp" - filepath = self.storage_path / filename - - try: - image.save( - filepath, - "WEBP", - quality=self.compression_quality, - method=6 - ) - logger.info(f"Image saved: {filename} ({len(open(filepath, 'rb').read())} bytes)") - return filename - - except Exception as e: - logger.error(f"Error saving image to disk: {e}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Error al guardar la imagen" - ) - - except HTTPException: - raise - except Exception as e: - logger.error(f"Unexpected error processing image: {e}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Error procesando la imagen" - ) - - def get_image_path(self, filename: str) -> Path: - """Obtiene la ruta completa de una imagen""" - return self.storage_path / filename - - def image_exists(self, filename: str) -> bool: - """Verifica si una imagen existe""" - if not filename: - return False - filepath = self.get_image_path(filename) - return filepath.exists() - - def delete_image(self, filename: str) -> bool: - """ - Elimina una imagen del almacenamiento. - - Args: - filename: Nombre del archivo a eliminar - - Returns: - True si se eliminó, False si no existía - """ - if not filename: - return False - - filepath = self.get_image_path(filename) - try: - if filepath.exists(): - filepath.unlink() # Eliminar archivo - logger.info(f"Image deleted: {filename}") - return True - else: - logger.warning(f"Image not found for deletion: {filename}") - return False - except Exception as e: - logger.error(f"Error deleting image {filename}: {e}") - return False - - def get_image_url(self, filename: str) -> str: - """Genera la URL pública para una imagen""" - if not filename: - return None - return f"/images/{filename}" - - -# Instancia global -image_storage = ImageStorageManager() +"""File storage utilities for report images""" +import os +import logging +from pathlib import Path +from fastapi import UploadFile, HTTPException, status +from PIL import Image +from io import BytesIO +from core.config import ConfSettings + +logger = logging.getLogger(__name__) + + +class ImageStorageManager: + """Maneja almacenamiento, compresión y eliminación de imágenes de reportes""" + + def __init__(self): + self.storage_path = Path(ConfSettings.storage_base_path) / ConfSettings.images_dir + self.max_size_bytes = ConfSettings.images_max_size_mb * 1024 * 1024 + self.allowed_types = ConfSettings.images_allowed_types + self.compression_quality = ConfSettings.images_compression_quality + + # Crear directorio si no existe + self.storage_path.mkdir(parents=True, exist_ok=True) + logger.info(f"ImageStorageManager initialized with path: {self.storage_path}") + + def validate_and_save_image(self, file: UploadFile, report_id: str) -> str: + """ + Valida y guarda una imagen, comprimiendo a WebP. + + Args: + file: Archivo subido (UploadFile) + report_id: ID del reporte para nombrado del archivo + + Returns: + Nombre del archivo guardado (sin ruta) + + Raises: + HTTPException: Si hay error en validación o guardado + """ + try: + # Validar tipo MIME + if file.content_type not in self.allowed_types: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Tipo de archivo no permitido. Permitidos: {', '.join(self.allowed_types)}" + ) + + # Leer contenido + content = file.file.read() + + # Validar tamaño + if len(content) > self.max_size_bytes: + raise HTTPException( + status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE, + detail=f"Archivo demasiado grande. Máximo: {ConfSettings.images_max_size_mb}MB" + ) + + # Abrir imagen con Pillow + try: + image = Image.open(BytesIO(content)) + image.verify() # Verificar que sea una imagen válida + + # Reabrir después de verify() que la cierra + image = Image.open(BytesIO(content)) + + # Convertir a RGB si tiene canal alpha (RGBA) + if image.mode in ('RGBA', 'LA', 'P'): + rgb_image = Image.new('RGB', image.size, (255, 255, 255)) + rgb_image.paste(image, mask=image.split()[-1] if image.mode == 'RGBA' else None) + image = rgb_image + + except Exception as e: + logger.error(f"Error validating image: {e}") + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Archivo no es una imagen válida" + ) + + # Guardar como WebP comprimido + filename = f"{report_id}.webp" + filepath = self.storage_path / filename + + try: + image.save( + filepath, + "WEBP", + quality=self.compression_quality, + method=6 + ) + logger.info(f"Image saved: {filename} ({len(open(filepath, 'rb').read())} bytes)") + return filename + + except Exception as e: + logger.error(f"Error saving image to disk: {e}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Error al guardar la imagen" + ) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Unexpected error processing image: {e}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Error procesando la imagen" + ) + + def get_image_path(self, filename: str) -> Path: + """Obtiene la ruta completa de una imagen""" + return self.storage_path / filename + + def image_exists(self, filename: str) -> bool: + """Verifica si una imagen existe""" + if not filename: + return False + filepath = self.get_image_path(filename) + return filepath.exists() + + def delete_image(self, filename: str) -> bool: + """ + Elimina una imagen del almacenamiento. + + Args: + filename: Nombre del archivo a eliminar + + Returns: + True si se eliminó, False si no existía + """ + if not filename: + return False + + filepath = self.get_image_path(filename) + try: + if filepath.exists(): + filepath.unlink() # Eliminar archivo + logger.info(f"Image deleted: {filename}") + return True + else: + logger.warning(f"Image not found for deletion: {filename}") + return False + except Exception as e: + logger.error(f"Error deleting image {filename}: {e}") + return False + + def get_image_url(self, filename: str) -> str: + """Genera la URL pública para una imagen""" + if not filename: + return None + return f"/images/{filename}" + + +# Instancia global +image_storage = ImageStorageManager() diff --git a/src/infrastructure/adapters/persistence/report_repository_mongo.py b/src/infrastructure/adapters/persistence/report_repository_mongo.py index 9be65ac..322057a 100644 --- a/src/infrastructure/adapters/persistence/report_repository_mongo.py +++ b/src/infrastructure/adapters/persistence/report_repository_mongo.py @@ -23,6 +23,7 @@ class ReportRepositoryMongo(ReportRepository): "lng": report.lng, "image_filename": report.image_filename, "visibilidad": report.visibilidad, + "estado": report.estado, "fecha_creacion": report.fecha_creacion or datetime.utcnow() } result = self.collection.insert_one(report_dict) @@ -59,6 +60,13 @@ class ReportRepositoryMongo(ReportRepository): {"$set": {"visibilidad": new_visibility}} ) + def update_estado(self, report_id: str, new_estado: str) -> None: + """Actualiza el estado de un reporte""" + self.collection.update_one( + {"id_reporte": report_id}, + {"$set": {"estado": new_estado}} + ) + def delete(self, report_id: str) -> bool: """Elimina un reporte""" result = self.collection.delete_one({"id_reporte": report_id}) @@ -83,5 +91,6 @@ class ReportRepositoryMongo(ReportRepository): lng=doc.get("lng"), image_filename=doc.get("image_filename"), visibilidad=doc.get("visibilidad"), + estado=doc.get("estado", "en proceso"), fecha_creacion=doc.get("fecha_creacion") ) diff --git a/src/infrastructure/adapters/rabbitmq/__init__.py b/src/infrastructure/adapters/rabbitmq/__init__.py index 752d002..a765284 100644 --- a/src/infrastructure/adapters/rabbitmq/__init__.py +++ b/src/infrastructure/adapters/rabbitmq/__init__.py @@ -1 +1 @@ -"""RabbitMQ adapters for message publishing and consuming""" +"""RabbitMQ adapters for message publishing and consuming""" diff --git a/src/infrastructure/adapters/rabbitmq/consumer.py b/src/infrastructure/adapters/rabbitmq/consumer.py index 7449eda..8ad582e 100644 --- a/src/infrastructure/adapters/rabbitmq/consumer.py +++ b/src/infrastructure/adapters/rabbitmq/consumer.py @@ -1,58 +1,58 @@ -import pika -import json -from typing import Callable, Dict, Any -import logging -from sqlalchemy.exc import IntegrityError - -logger = logging.getLogger(__name__) - -class RabbitMQConsumer: - def __init__(self, queue_name: str, host: str = 'localhost', port: int = 5672): - self.queue_name = queue_name - self.host = host - self.port = port - self.callback = None - - def set_callback(self, callback: Callable[[Dict[str, Any]], None]) -> None: - self.callback = callback - - def start_consuming(self) -> None: - try: - connection = pika.BlockingConnection( - pika.ConnectionParameters(host=self.host, port=self.port) - ) - channel = connection.channel() - channel.queue_declare(queue=self.queue_name, durable=True) - - def callback_wrapper(ch, method, properties, body): - try: - message = json.loads(body.decode('utf-8')) - logger.info(f"Received message from queue '{self.queue_name}': {message}") - - if self.callback: - self.callback(message) - - ch.basic_ack(delivery_tag=method.delivery_tag) - - except IntegrityError as e: - # Error de negocio: no tiene sentido reintentar - logger.warning(f"Business error, discarding message: {e}") - ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) - - except Exception as e: - # Error transitorio (red, DB caída): sí puede resolverse solo - logger.error(f"Transient error processing message: {e}") - ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) - - channel.basic_consume( - queue=self.queue_name, - on_message_callback=callback_wrapper, - auto_ack=False - ) - - logger.info(f"[*] Waiting for messages in queue '{self.queue_name}'. Ctrl+C to exit.") - channel.start_consuming() - - except Exception as e: - logger.error(f"Error in consumer: {e}") +import pika +import json +from typing import Callable, Dict, Any +import logging +from sqlalchemy.exc import IntegrityError + +logger = logging.getLogger(__name__) + +class RabbitMQConsumer: + def __init__(self, queue_name: str, host: str = 'localhost', port: int = 5672): + self.queue_name = queue_name + self.host = host + self.port = port + self.callback = None + + def set_callback(self, callback: Callable[[Dict[str, Any]], None]) -> None: + self.callback = callback + + def start_consuming(self) -> None: + try: + connection = pika.BlockingConnection( + pika.ConnectionParameters(host=self.host, port=self.port) + ) + channel = connection.channel() + channel.queue_declare(queue=self.queue_name, durable=True) + + def callback_wrapper(ch, method, properties, body): + try: + message = json.loads(body.decode('utf-8')) + logger.info(f"Received message from queue '{self.queue_name}': {message}") + + if self.callback: + self.callback(message) + + ch.basic_ack(delivery_tag=method.delivery_tag) + + except IntegrityError as e: + # Error de negocio: no tiene sentido reintentar + logger.warning(f"Business error, discarding message: {e}") + ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) + + except Exception as e: + # Error transitorio (red, DB caída): sí puede resolverse solo + logger.error(f"Transient error processing message: {e}") + ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) + + channel.basic_consume( + queue=self.queue_name, + on_message_callback=callback_wrapper, + auto_ack=False + ) + + logger.info(f"[*] Waiting for messages in queue '{self.queue_name}'. Ctrl+C to exit.") + channel.start_consuming() + + except Exception as e: + logger.error(f"Error in consumer: {e}") raise \ No newline at end of file diff --git a/src/infrastructure/adapters/rabbitmq/messages.py b/src/infrastructure/adapters/rabbitmq/messages.py index e5d2420..61ce6ed 100644 --- a/src/infrastructure/adapters/rabbitmq/messages.py +++ b/src/infrastructure/adapters/rabbitmq/messages.py @@ -1,85 +1,86 @@ -"""Message schemas for RabbitMQ communication""" -from dataclasses import dataclass, asdict -from datetime import datetime -from typing import Optional -from enum import Enum -import json - - -class UserEventType(str, Enum): - """Types of user events""" - CREATE = "user.create" - UPDATE = "user.update" - DELETE = "user.delete" - - -class ReportEventType(str, Enum): - """Types of report events""" - CREATE = "report.create" - UPDATE_VISIBILITY = "report.update_visibility" - DELETE = "report.delete" - - -@dataclass -class UserMessage: - """Message for user events""" - event_type: UserEventType - user_id: Optional[int] = None - nombre: Optional[str] = None - apellido: Optional[str] = None - email: Optional[str] = None - fecha_nacimiento: Optional[str] = None # ISO format datetime string - fecha_creacion: Optional[str] = None # ISO format datetime string - calificacion: Optional[float] = None - numero_reportes: Optional[int] = None - url_foto_perfil: Optional[str] = None - biografia: Optional[str] = None - - def to_dict(self): - """Convert to dictionary""" - data = asdict(self) - data['event_type'] = self.event_type.value - return data - - def to_json(self) -> str: - """Convert to JSON string""" - return json.dumps(self.to_dict()) - - @staticmethod - def from_dict(data: dict) -> 'UserMessage': - """Create from dictionary""" - data['event_type'] = UserEventType(data['event_type']) - return UserMessage(**data) - - -@dataclass -class ReportMessage: - """Message for report events""" - event_type: ReportEventType - id_reporte: Optional[str] = None - id_usuario: Optional[int] = None - tipo_reporte: Optional[int] = None - descripcion: Optional[str] = None - ubicacion: Optional[str] = None - lat: Optional[float] = None - lng: Optional[float] = None - image_filename: Optional[str] = None # Nombre del archivo de imagen guardado - visibilidad: Optional[float] = None - fecha_creacion: Optional[str] = None # ISO format datetime string - penalize_author: Optional[bool] = None # For update_visibility event - - def to_dict(self): - """Convert to dictionary""" - data = asdict(self) - data['event_type'] = self.event_type.value - return data - - def to_json(self) -> str: - """Convert to JSON string""" - return json.dumps(self.to_dict()) - - @staticmethod - def from_dict(data: dict) -> 'ReportMessage': - """Create from dictionary""" - data['event_type'] = ReportEventType(data['event_type']) - return ReportMessage(**data) +"""Message schemas for RabbitMQ communication""" +from dataclasses import dataclass, asdict +from datetime import datetime +from typing import Optional +from enum import Enum +import json + + +class UserEventType(str, Enum): + """Types of user events""" + CREATE = "user.create" + UPDATE = "user.update" + DELETE = "user.delete" + + +class ReportEventType(str, Enum): + """Types of report events""" + CREATE = "report.create" + UPDATE_VISIBILITY = "report.update_visibility" + DELETE = "report.delete" + + +@dataclass +class UserMessage: + """Message for user events""" + event_type: UserEventType + user_id: Optional[int] = None + nombre: Optional[str] = None + apellido: Optional[str] = None + email: Optional[str] = None + fecha_nacimiento: Optional[str] = None # ISO format datetime string + fecha_creacion: Optional[str] = None # ISO format datetime string + calificacion: Optional[float] = None + numero_reportes: Optional[int] = None + url_foto_perfil: Optional[str] = None + biografia: Optional[str] = None + + def to_dict(self): + """Convert to dictionary""" + data = asdict(self) + data['event_type'] = self.event_type.value + return data + + def to_json(self) -> str: + """Convert to JSON string""" + return json.dumps(self.to_dict()) + + @staticmethod + def from_dict(data: dict) -> 'UserMessage': + """Create from dictionary""" + data['event_type'] = UserEventType(data['event_type']) + return UserMessage(**data) + + +@dataclass +class ReportMessage: + """Message for report events""" + event_type: ReportEventType + id_reporte: Optional[str] = None + id_usuario: Optional[int] = None + tipo_reporte: Optional[int] = None + descripcion: Optional[str] = None + ubicacion: Optional[str] = None + lat: Optional[float] = None + lng: Optional[float] = None + image_filename: Optional[str] = None # Nombre del archivo de imagen guardado + visibilidad: Optional[float] = None + estado: Optional[str] = None # Estado del reporte: "en proceso", "no resuelto", "resuelto" + fecha_creacion: Optional[str] = None # ISO format datetime string + penalize_author: Optional[bool] = None # For update_visibility event + + def to_dict(self): + """Convert to dictionary""" + data = asdict(self) + data['event_type'] = self.event_type.value + return data + + def to_json(self) -> str: + """Convert to JSON string""" + return json.dumps(self.to_dict()) + + @staticmethod + def from_dict(data: dict) -> 'ReportMessage': + """Create from dictionary""" + data['event_type'] = ReportEventType(data['event_type']) + return ReportMessage(**data) diff --git a/src/infrastructure/adapters/rabbitmq/sender.py b/src/infrastructure/adapters/rabbitmq/sender.py index fca1961..f412277 100644 --- a/src/infrastructure/adapters/rabbitmq/sender.py +++ b/src/infrastructure/adapters/rabbitmq/sender.py @@ -1,74 +1,74 @@ -"""RabbitMQ message sender""" -import pika -import json -from typing import Any, Dict -import logging - -logger = logging.getLogger(__name__) - - -class RabbitMQSender: - """Generic RabbitMQ sender for publishing messages to queues""" - - def __init__(self, host: str = 'localhost', port: int = 5672): - self.host = host - self.port = port - - def send_message(self, queue_name: str, message: Dict[str, Any]) -> bool: - """ - Sends a message to a RabbitMQ queue - - Args: - queue_name: Name of the queue to send to - message: Dictionary containing the message data - - Returns: - True if successful, False otherwise - """ - try: - connection = pika.BlockingConnection( - pika.ConnectionParameters(host=self.host, port=self.port) - ) - channel = connection.channel() - - # Declare queue to ensure it exists - channel.queue_declare(queue=queue_name, durable=True) - - # Convert message to JSON - message_json = json.dumps(message) - - # Publish the message - channel.basic_publish( - exchange='', - routing_key=queue_name, - body=message_json, - properties=pika.BasicProperties( - delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE - ) - ) - - connection.close() - logger.info(f"Message sent to queue '{queue_name}': {message_json}") - return True - - except Exception as e: - logger.error(f"Error sending message to RabbitMQ: {e}") - return False - - -def send_to_queue(queue_name: str, message: Dict[str, Any], - host: str = 'localhost', port: int = 5672) -> bool: - """ - Convenience function to send a message to RabbitMQ - - Args: - queue_name: Name of the queue - message: Message dictionary - host: RabbitMQ host - port: RabbitMQ port - - Returns: - True if successful, False otherwise - """ - sender = RabbitMQSender(host=host, port=port) - return sender.send_message(queue_name, message) +"""RabbitMQ message sender""" +import pika +import json +from typing import Any, Dict +import logging + +logger = logging.getLogger(__name__) + + +class RabbitMQSender: + """Generic RabbitMQ sender for publishing messages to queues""" + + def __init__(self, host: str = 'localhost', port: int = 5672): + self.host = host + self.port = port + + def send_message(self, queue_name: str, message: Dict[str, Any]) -> bool: + """ + Sends a message to a RabbitMQ queue + + Args: + queue_name: Name of the queue to send to + message: Dictionary containing the message data + + Returns: + True if successful, False otherwise + """ + try: + connection = pika.BlockingConnection( + pika.ConnectionParameters(host=self.host, port=self.port) + ) + channel = connection.channel() + + # Declare queue to ensure it exists + channel.queue_declare(queue=queue_name, durable=True) + + # Convert message to JSON + message_json = json.dumps(message) + + # Publish the message + channel.basic_publish( + exchange='', + routing_key=queue_name, + body=message_json, + properties=pika.BasicProperties( + delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE + ) + ) + + connection.close() + logger.info(f"Message sent to queue '{queue_name}': {message_json}") + return True + + except Exception as e: + logger.error(f"Error sending message to RabbitMQ: {e}") + return False + + +def send_to_queue(queue_name: str, message: Dict[str, Any], + host: str = 'localhost', port: int = 5672) -> bool: + """ + Convenience function to send a message to RabbitMQ + + Args: + queue_name: Name of the queue + message: Message dictionary + host: RabbitMQ host + port: RabbitMQ port + + Returns: + True if successful, False otherwise + """ + sender = RabbitMQSender(host=host, port=port) + return sender.send_message(queue_name, message) diff --git a/src/infrastructure/api/reports/reports.py b/src/infrastructure/api/reports/reports.py index 125ca97..0cc71ff 100644 --- a/src/infrastructure/api/reports/reports.py +++ b/src/infrastructure/api/reports/reports.py @@ -1,8 +1,8 @@ from fastapi import APIRouter, HTTPException, status, UploadFile, File, Form -from infrastructure.api.reports.schemas import ReportCreateRequest, ReportUpdateVisibilityRequest, ReportResponse +from infrastructure.api.reports.schemas import ReportCreateRequest, ReportUpdateVisibilityRequest, ReportUpdateStatusRequest, ReportResponse from application.services.report_services import ( CreateReport, GetReportById, GetReportsByUser, ListAllReports, - UpdateReportVisibility, GetShadowbannedReports, DeleteReport + UpdateReportVisibility, GetShadowbannedReports, DeleteReport, UpdateReportStatus ) from infrastructure.adapters.persistence.report_repository_mongo import ReportRepositoryMongo from infrastructure.adapters.persistence.user_repository_sql import UserRepositorySQL @@ -28,6 +28,7 @@ def _report_to_response(report) -> dict: "lng": report.lng, "image_url": image_storage.get_image_url(report.image_filename) if report.image_filename else None, "visibilidad": report.visibilidad, + "estado": report.estado, "fecha_creacion": report.fecha_creacion } @@ -39,6 +40,7 @@ async def create_report( ubicacion: Optional[str] = Form(None), lat: Optional[float] = Form(None), lng: Optional[float] = Form(None), + estado: str = Form("en proceso"), file: Optional[UploadFile] = File(None) ): """Crea un nuevo reporte - envía a cola de procesamiento con validaciones previas""" @@ -57,7 +59,8 @@ async def create_report( ubicacion=ubicacion, lat=lat, lng=lng, - image_filename=image_filename + image_filename=image_filename, + estado=estado ) if result["status"] == "error": @@ -225,3 +228,40 @@ async def delete_report(report_id: str): status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor" ) + +@router.put("/{report_id}/status", status_code=status.HTTP_200_OK) +async def update_report_status(report_id: str, status_data: ReportUpdateStatusRequest): + """Actualiza el estado de un reporte""" + try: + update_use_case = UpdateReportStatus(report_repo) + result = update_use_case.execute( + report_id=report_id, + new_estado=status_data.estado + ) + + if result["status"] == "error": + message = result["message"] + if "no existe" in message: + # 404 Not Found: reporte no existe + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=message + ) + else: + # 400 Bad Request: error de validación + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=message + ) + + # 200 OK: estado actualizado correctamente + return result + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error al actualizar estado del reporte {report_id}: {e}", exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Error interno del servidor" + ) diff --git a/src/infrastructure/api/reports/schemas.py b/src/infrastructure/api/reports/schemas.py index d35f3a3..8946ae0 100644 --- a/src/infrastructure/api/reports/schemas.py +++ b/src/infrastructure/api/reports/schemas.py @@ -1,6 +1,6 @@ from pydantic import BaseModel from datetime import datetime -from typing import Optional +from typing import Optional, Literal from fastapi import Form, UploadFile, File class ReportCreateRequest(BaseModel): @@ -11,6 +11,7 @@ class ReportCreateRequest(BaseModel): ubicacion: Optional[str] = None lat: Optional[float] = None lng: Optional[float] = None + estado: Literal["en proceso", "no resuelto", "resuelto"] = "en proceso" # Estado del reporte # file se recibe como UploadFile en el endpoint, no en el modelo class ReportUpdateVisibilityRequest(BaseModel): @@ -18,6 +19,10 @@ class ReportUpdateVisibilityRequest(BaseModel): new_visibility: float penalize_author: bool = False +class ReportUpdateStatusRequest(BaseModel): + """Solicitud para actualizar el estado de un reporte""" + estado: Literal["en proceso", "no resuelto", "resuelto"] + class ReportResponse(BaseModel): """Respuesta con datos de reporte""" id_reporte: str @@ -29,6 +34,7 @@ class ReportResponse(BaseModel): lng: Optional[float] image_url: Optional[str] = None # URL pública para acceder a la imagen visibilidad: float + estado: Literal["en proceso", "no resuelto", "resuelto"] = "en proceso" # Estado del reporte fecha_creacion: Optional[datetime] class Config: