diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index b70ba1d..c1a76c2 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -1,5 +1,12 @@ # Arquitectura de VoxPopuli Microservices +## Resumen General + +VoxPopuli es un sistema de microservicios **basado en eventos asincronos** con comunicación mediante RabbitMQ. El sistema ejecuta múltiples componentes en paralelo: +- 2 APIs REST independientes (Usuarios y Reportes) +- 2 Consumidores de mensajes para procesamiento asincrónico +- Bases de datos separadas por dominio (MySQL para usuarios, MongoDB para reportes) + ## Patrones Arquitectónicos Usados ### 1. Clean Architecture (Arquitectura Limpia) @@ -20,31 +27,100 @@ La aplicación está organizada en capas independientes: │ - Reglas de negocio independientes │ ├─────────────────────────────────────────────────────────┤ │ Infrastructure Layer (Infraestructura) │ -│ - Adaptadores (Repositorios) │ +│ - Adaptadores (Repositorios, RabbitMQ) │ │ - Acceso a Datos (MySQL, MongoDB) │ +│ - Almacenamiento de archivos │ └─────────────────────────────────────────────────────────┘ ``` -### 2. Microservicios +### 2. Arquitectura Orientada a Eventos (Event-Driven Architecture) -Dos microservicios independientes: +En lugar de procesamiento sincrónico, los servicios se comunican mediante eventos: -- **Usuarios API** (Puerto 8000) - Gestión de usuarios y credenciales -- **Reportes API** (Puerto 8001) - Gestión de reportes comunitarios +``` +┌──────────────────────────────────────────────────────────────┐ +│ API REST (FastAPI) │ +│ (Recibe solicitudes HTTP) │ +└────────┬─────────────────────────────────┬───────────────────┘ + │ Valida y envía evento │ + ▼ ▼ + ┌─────────────────────────────────────────────┐ + │ RabbitMQ Message Queue │ + │ - users_queue (eventos de usuario) │ + │ - reports_queue (eventos de reporte) │ + └────────┬────────────────────┬───────────────┘ + │ │ + ▼ ▼ + ┌─────────────┐ ┌──────────────┐ + │ User │ │ Report │ + │ Consumer │ │ Consumer │ + │ (Thread) │ │ (Thread) │ + └─────┬───────┘ └──────┬───────┘ + │ Procesa evento │ Procesa evento + ▼ ▼ + ┌──────────────┐ ┌──────────────┐ + │ MySQL (BD │ │ MongoDB (BD │ + │ de Usuarios) │ │ de Reportes) │ + └──────────────┘ └──────────────┘ +``` -Cada microservicio: -- Tiene su propia base de datos -- Es escalable independientemente -- Se puede desplegar por separado -- Expone su propia API REST +**Ventajas:** +- Bajo acoplamiento entre servicios +- Mayor escalabilidad +- Mejor tolerancia a fallos +- Procesamiento asincrónico -### 3. Patrón Repository +### 3. Microservicios + +El sistema consta de dos microservicios independientes que ejecutan en paralelo: + +#### **Usuarios API** (Puerto 8000) +- Gestión de usuarios y autenticación +- Base de datos: MySQL con SQLAlchemy +- Endpoints: POST/GET/PUT/DELETE /users/ + +#### **Reportes API** (Puerto 8001) +- Gestión de reportes comunitarios +- Base de datos: MongoDB +- Endpoints: POST/GET/PUT/DELETE /reports/ +- Almacenamiento: Imágenes en WebP (directorio `/storage/reports_images/`) + +Cada API: +- Valida solicitudes y envía eventos a RabbitMQ +- Expone documentación automática en `/docs` +- Es independientemente escalable + +### 4. Consumidores de Eventos (Message Consumers) + +Dos consumidores ejecutan como threads separados: + +#### **User Consumer** +- Escucha la cola `users_queue` en RabbitMQ +- Eventos procesados: + - `user.create` → Guarda usuario en MySQL + - `user.update` → Actualiza usuario + - `user.delete` → Elimina usuario + +#### **Report Consumer** +- Escucha la cola `reports_queue` en RabbitMQ +- Eventos procesados: + - `report.create` → Guarda reporte en MongoDB, incrementa contador de usuario + - `report.update_visibility` → Actualiza puntuación comunitaria + - `report.delete` → Elimina reporte + +**Beneficios:** +- Desacoplamiento de API y procesamiento +- Reintentos automáticos si falla la BD +- Procesamiento en background + +### 5. Patrón Repository Abstracción para acceso a datos: ``` ┌──────────────────┐ │ Service Layer │ +│ (API Handler) │ └────────┬─────────┘ │ depends on ▼ @@ -67,7 +143,7 @@ Abstracción para acceso a datos: - Fácil de testear (usar mocks) - Reutilizable en diferentes contextos -### 4. Inversión de Dependencias (Dependency Inversion) +### 6. Inversión de Dependencias (Dependency Inversion) Los servicios dependen de **abstracciones (interfaces)**, no de implementaciones concretas: @@ -84,120 +160,360 @@ service = CreateUser(UserRepositoryMock()) # Para testing ## Flujo de Solicitud -### Crear Usuario: +### Crear Usuario (Asincrónico): ``` 1. HTTP POST /users/ - └─> FastAPI Handler - └─> CreateUser Use Case - └─> UserRepository.save() - └─> UserRepositorySQL - └─> SQLAlchemy - └─> MySQL Database + └─> Validar datos (FastAPI Handler) + └─> Crear objeto UserMessage + └─> Enviar mensaje a RabbitMQ (users_queue) + └─> Retornar respuesta HTTP inmediatamente + +[En paralelo, el User Consumer procesa:] +2. User Consumer recibe evento user.create + └─> UserRepositorySQL.save() + └─> SQLAlchemy + └─> MySQL Database ``` -### Crear Reporte: +### Crear Reporte (Asincrónico): ``` 1. HTTP POST /reports/ - └─> FastAPI Handler - └─> CreateReport Use Case - └─> ReportRepository.save() - └─> UserRepository.increment_reports() - └─> ReportRepositoryMongo - └─> UserRepositorySQL - └─> MongoDB - └─> MySQL + └─> Validar datos y usuario existe (FastAPI Handler) + └─> Crear objeto ReportMessage + └─> Enviar mensaje a RabbitMQ (reports_queue) + └─> Retornar respuesta HTTP inmediatamente + +[En paralelo, el Report Consumer procesa:] +2. Report Consumer recibe evento report.create + └─> ReportRepositoryMongo.save() + └─> UserRepositorySQL.increment_reports(id_usuario) + └─> MongoDB + MySQL Database ``` ## Capas Detalladas ### Domain Layer (src/domain/) -**Entidades puras de negocio**: -- `User`: Representa un usuario del sistema -- `Report`: Representa un reporte comunitario - -No tienen dependencias externas. Solo representan conceptos de negocio. +**Entidades puras de negocio** (sin dependencias externas): +#### User ```python @dataclass class User: user_id: int nombre: str + apellido: str email: str - # ... más campos + contraseña_hash: Optional[str] = None + fecha_nacimiento: datetime = None + fecha_creacion: datetime = None + calificacion: float = 50.0 # 0-100 + numero_reportes: int = 0 + url_foto_perfil: Optional[str] = None + biografia: Optional[str] = None +``` + +#### Report +```python +@dataclass +class Report: + id_reporte: str + id_usuario: int + tipo_reporte: int # Número que representa el tipo + descripcion: str + ubicacion: Optional[str] + lat: Optional[float] = None + lng: Optional[float] = None + image_filename: Optional[str] = None # WebP format + visibilidad: float = 0.0 # Puntuación comunitaria 0-100 + estado: Literal["en proceso", "no resuelto", "resuelto"] = "en proceso" + fecha_creacion: Optional[datetime] = None ``` ### Application Layer (src/application/) -**Use Cases y Servicios**: +Contiene la lógica de negocio encapsulada en **Use Cases** y **Puertos**. -#### Ports (Interfaces): -- `UserRepository`: Contrato para acceso a datos de usuarios -- `ReportRepository`: Contrato para acceso a datos de reportes +#### Ports (Interfaces - src/application/ports/) -#### Services (Use Cases): -- `CreateUser`, `GetUserById`, `UpdateUser`, `DeleteUser` -- `CreateReport`, `GetReportById`, `UpdateReportVisibility` +**UserRepository (Interface)** +```python +class UserRepository: + def save(self, user: User) -> User + def find_by_id(self, user_id: int) -> Optional[User] + def find_by_email(self, email: str) -> Optional[User] + def update(self, user: User) -> User + def delete(self, user_id: int) -> bool + def increment_reports(self, user_id: int) -> bool +``` -Contienen la lógica de negocio: +**ReportRepository (Interface)** +```python +class ReportRepository: + def save(self, report: Report) -> Report + def find_by_id(self, id_reporte: str) -> Optional[Report] + def find_by_user(self, id_usuario: int) -> List[Report] + def update(self, report: Report) -> Report + def delete(self, id_reporte: str) -> bool + def update_visibility(self, id_reporte: str, visibilidad: float) -> bool +``` +#### Services / Use Cases (src/application/services/) + +**User Services:** +- `CreateUser` - Valida datos y envía evento a RabbitMQ +- `GetUser` - Recupera usuario de la BD +- `UpdateUser` - Actualiza usuario +- `DeleteUser` - Elimina usuario + +**Report Services:** +- `CreateReport` - Valida usuario y reporte, envía evento a RabbitMQ +- `GetReport` - Recupera reporte +- `UpdateReportVisibility` - Actualiza puntuación comunitaria +- `DeleteReport` - Elimina reporte + +**Ejemplo de CreateReport:** ```python class CreateReport: - def execute(self, id_usuario, tipo_reporte, ...): + def __init__(self, repo: ReportRepository, user_repo: UserRepository): + self.repo = repo + self.user_repo = user_repo + + def execute(self, id_usuario: int, tipo_reporte: int, descripcion: str, ...): # 1. Validar que usuario existe - user = self.user_repo.find_by_id(id_usuario) + if not self.user_repo.find_by_id(id_usuario): + return {"status": "error", "message": "Usuario no encontrado"} - # 2. Crear reporte - report = Report(...) + # 2. Validar descripción + if not descripcion.strip(): + return {"status": "error", "message": "Descripción requerida"} - # 3. Guardar en BD - self.repo.save(report) + # 3. Crear mensaje de evento + message = ReportMessage( + event_type=ReportEventType.CREATE, + id_usuario=id_usuario, + tipo_reporte=tipo_reporte, + descripcion=descripcion, + ... + ) - # 4. Actualizar contador de usuario - self.user_repo.increment_reports(id_usuario) + # 4. Enviar a RabbitMQ + send_to_queue(message) + + return {"status": "success", "message": "Reporte enviado a procesar"} ``` ### Infrastructure Layer (src/infrastructure/) -#### Persistence Adapters: +#### API Handlers (src/infrastructure/api/) -**user_repository_sql.py**: -- Implementa `UserRepository` usando SQLAlchemy +**Users API (puerto 8000)** +- `users.py` - Endpoints REST para CRUD de usuarios +- `auth_service.py` - Autenticación JWT +- `schemas.py` - Esquemas Pydantic para validación +- `router.py` - Rutas de FastAPI +- `app.py` - Aplicación FastAPI + +**Reports API (puerto 8001)** +- `reports.py` - Endpoints REST para CRUD de reportes +- `schemas.py` - Esquemas Pydantic +- `router.py` - Rutas +- `app.py` - Aplicación FastAPI + +#### Persistence Adapters (src/infrastructure/adapters/persistence/) + +**user_repository_sql.py** (Implementación de UserRepository) +- Usa SQLAlchemy ORM +- Implementa todas las operaciones CRUD en MySQL - Convierte entre modelo de dominio y modelo de BD -**report_repository_mongo.py**: -- Implementa `ReportRepository` usando PyMongo +**report_repository_mongo.py** (Implementación de ReportRepository) +- Usa PyMongo para MongoDB +- Implementa todas las operaciones CRUD - Convierte entre modelo de dominio y documento MongoDB -#### API Handlers: +**db.py** +- Configuración de conexiones +- Instancias de motor de BD -**users/users.py**: -- Endpoints HTTP para gestión de usuarios -- Usa esquemas Pydantic para validación +**models.py** +- Modelos SQLAlchemy para MySQL +- Esquemas para MongoDB -**reports/reports.py**: -- Endpoints HTTP para gestión de reportes -- Mapeo de solicitudes a use cases +#### RabbitMQ Adapters (src/infrastructure/adapters/rabbitmq/) + +**sender.py** +- Función `send_to_queue()` para enviar mensajes +- Serialización de objetos a JSON + +**consumer.py** (Base) +- Clase `RabbitMQConsumer` para consumir mensajes +- Conexión y suscripción a colas +- Manejo de excepciones + +**messages.py** +- `UserMessage` - Schema para eventos de usuario +- `ReportMessage` - Schema para eventos de reporte +- `UserEventType` - Enumeración de tipos de eventos +- `ReportEventType` - Enumeración de tipos de eventos + +#### File Storage (src/infrastructure/adapters/file_storage.py) + +- `image_storage` - Manejo de almacenamiento de imágenes +- Convierte imágenes a formato WebP +- Almacena en `/storage/reports_images/` +- Limpieza automática si reporte es eliminado + +### Consumers (src/consumers/) + +Ejecutan como threads separados en paralelo con las APIs. + +#### User Consumer (src/consumers/user_consumer.py) + +```python +class UserConsumer: + def __init__(self): + self.repo = UserRepositorySQL() + self.consumer = RabbitMQConsumer(queue_name='users_queue') + self.consumer.set_callback(self.process_message) + + def process_message(self, message_dict: dict): + # Reconvertir a UserMessage + message = UserMessage.from_dict(message_dict) + + if message.event_type == UserEventType.CREATE: + self._handle_create_user(message) + elif message.event_type == UserEventType.UPDATE: + self._handle_update_user(message) + # ... etc + + def start(self): + # Inicia la conexión y escucha + self.consumer.start_consuming() +``` + +#### Report Consumer (src/consumers/report_consumer.py) + +Similar a User Consumer, pero: +- Escucha `reports_queue` +- Procesa `ReportMessage` con `ReportEventType` +- Interactúa con `ReportRepositoryMongo` y `UserRepositorySQL` +- Maneja almacenamiento de imágenes ### Core Layer (src/core/) -Configuración centralizada: +**config.py** - Configuración centralizada: - Variables de entorno - Configuración de logging -- Configuración de bases de datos +- Configuración de conexiones de BD +- Configuración de RabbitMQ +- Parámetros de JWT + +## Punto de Entrada (src/main.py) + +Orquesta todos los componentes en paralelo: + +```python +def run(): + """Inicia los 4 componentes en threads separados""" + users_thread = threading.Thread(target=run_users_api) + reports_thread = threading.Thread(target=run_reports_api) + user_consumer_thread = threading.Thread(target=run_user_consumer) + report_consumer_thread = threading.Thread(target=run_reports_consumer) + + users_thread.start() + reports_thread.start() + user_consumer_thread.start() + report_consumer_thread.start() +``` + +Esto inicia: +- API de Usuarios en puerto 8000 +- API de Reportes en puerto 8001 +- User Consumer escuchando `users_queue` +- Report Consumer escuchando `reports_queue` + +## Diagrama de Despliegue + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Docker Container (VoxPopuli) │ +├─────────────────────────────────────────────────────────────┤ +│ │ +│ ┌──────────────────────────────────────────────────────┐ │ +│ │ main.py (Orquestador) │ │ +│ │ - Lanza 4 threads │ │ +│ └───┬──────────┬──────────┬──────────────────────────┬─┘ │ +│ │ │ │ │ │ +│ ┌───▼──┐ ┌───▼──┐ ┌──▼────┐ ┌─────▼──────┐ │ │ +│ │Users │ │Report│ │User │ │Report │ │ │ +│ │API │ │API │ │Consume│ │Consumer │ │ │ +│ │:8000 │ │:8001 │ │r │ │ │ │ │ +│ └───┬──┘ └───┬──┘ └──┬────┘ └─────┬──────┘ │ │ +│ │ │ │ │ │ │ +└─────┼─────────┼─────────┼────────────┼─────────────┘ │ + │ │ │ │ │ + └────┬────┴────┬────┴────────────┘ │ + │ │ │ + ┌─────▼──┐ ┌───▼──────┐ │ + │RabbitMQ│ │MySQL + │ │ + │:5672 │ │MongoDB │ │ + └────────┘ └──────────┘ │ +``` ## Testing -La arquitectura facilita el testing: +La arquitectura facilita el testing en múltiples niveles: + +### Unit Tests +- Mockear repositorios para testear Use Cases +- Testear lógica de negocio en Domain Layer ```python -# Mock Repository para testing -class UserRepositoryMock(UserRepository): - def __init__(self): - self.users = {} - +class TestCreateReport: + def test_report_creation(self): + mock_repo = ReportRepositoryMock() + service = CreateReport(mock_repo) + result = service.execute(...) + assert result['status'] == 'success' +``` + +### Integration Tests +- Usar DB reales de prueba +- Testear flujo completo API → RabbitMQ → Consumer → DB + +### E2E Tests +- Testear desde HTTP request hasta persistencia +- Validar consumidores procesan eventos correctamente + +## Ventajas de la Arquitectura Actual + +1. **Bajo Acoplamiento** - Servicios se comunican por eventos +2. **Escalabilidad Horizontal** - Consumidores pueden replicarse +3. **Resiliencia** - RabbitMQ reintenta entregas fallidas +4. **Independencia de BD** - Abstractos por puertos +5. **Testabilidad** - Inyección de dependencias +6. **Mantenibilidad** - Capas claramente separadas +7. **Asincronía** - APIs responden rápidamente +8. **Extensibilidad** - Nuevos tipos de eventos fácilmente + +## Stack Tecnológico + +| Componente | Tecnología | +|-----------|-----------| +| Framework Web | FastAPI | +| Servidor ASGI | Uvicorn | +| ORM SQL | SQLAlchemy | +| Driver MongoDB | PyMongo | +| Message Queue | RabbitMQ | +| Autenticación | JWT | +| Validación | Pydantic | +| Base Datos SQL | MySQL | +| Base Datos NoSQL | MongoDB | +| Almacenamiento | WebP (imágenes) | +| Concurrencia | threading | def save(self, user): self.users[user.user_id] = user return user diff --git a/docker-compose.yaml b/docker-compose.yaml index 539acd6..e0e4dde 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -34,6 +34,22 @@ services: timeout: 5s retries: 5 + mongodb_nots: + image: mongo:7.0 + container_name: voxpopuli_mongo_2 + environment: + MONGO_INITDB_DATABASE: voxpopuli_notifications + ports: + - "27018:27017" + volumes: + - mongo_data_notifications:/data/db + healthcheck: + test: ["CMD", "mongosh", "--eval", "db.adminCommand('ping')"] + interval: 10s + timeout: 5s + retries: 5 + volumes: mysql_data: - mongo_data: \ No newline at end of file + mongo_data: + mongo_data_notifications: diff --git a/src/application/ports/notification_repository.py b/src/application/ports/notification_repository.py new file mode 100644 index 0000000..4c549f8 --- /dev/null +++ b/src/application/ports/notification_repository.py @@ -0,0 +1,53 @@ +"""Port (interface) for Notification Repository""" +from abc import ABC, abstractmethod +from domain.notifications import Notification +from typing import List, Optional + + +class NotificationRepository(ABC): + """Puerto (interfaz) para el repositorio de Notificaciones""" + + @abstractmethod + def save(self, notification: Notification) -> Notification: + """Guarda una notificación en la base de datos""" + pass + + @abstractmethod + def find_by_id(self, notification_id: str) -> Optional[Notification]: + """Obtiene una notificación por ID""" + pass + + @abstractmethod + def find_by_user_id(self, user_id: int) -> List[Notification]: + """Obtiene todas las notificaciones de un usuario""" + pass + + @abstractmethod + def find_unread_by_user_id(self, user_id: int) -> List[Notification]: + """Obtiene todas las notificaciones no leídas de un usuario""" + pass + + @abstractmethod + def find_all(self) -> List[Notification]: + """Obtiene todas las notificaciones""" + pass + + @abstractmethod + def mark_as_read(self, notification_id: str) -> bool: + """Marca una notificación como leída""" + pass + + @abstractmethod + def mark_all_as_read(self, user_id: int) -> bool: + """Marca todas las notificaciones de un usuario como leídas""" + pass + + @abstractmethod + def delete(self, notification_id: str) -> bool: + """Elimina una notificación""" + pass + + @abstractmethod + def delete_all_by_user(self, user_id: int) -> bool: + """Elimina todas las notificaciones de un usuario""" + pass diff --git a/src/application/services/notification_services.py b/src/application/services/notification_services.py new file mode 100644 index 0000000..7aa6ff3 --- /dev/null +++ b/src/application/services/notification_services.py @@ -0,0 +1,173 @@ +"""Notification services implementing the business logic""" +import uuid +from datetime import datetime +from typing import Optional, List, Dict +from domain.notifications import Notification +from application.ports.notification_repository import NotificationRepository +import logging + +logger = logging.getLogger(__name__) + + +class CreateNotification: + """Use case: Create a new notification""" + + def __init__(self, notification_repo: NotificationRepository): + self.notification_repo = notification_repo + + def execute(self, id_usuario: int, tipo_notificacion: str, titulo: str, + mensaje: str, id_reporte: Optional[str] = None, + estado_reporte: Optional[str] = None) -> Dict: + """ + Creates a new notification + + Args: + id_usuario: User ID who receives the notification + tipo_notificacion: Type of notification + titulo: Notification title + mensaje: Notification message + id_reporte: Optional report ID related to notification + estado_reporte: Optional report status + + Returns: + Dictionary with status and message + """ + try: + # Validate inputs + if not id_usuario or id_usuario <= 0: + return {"status": "error", "message": "Usuario inválido"} + if not tipo_notificacion or not titulo or not mensaje: + return {"status": "error", "message": "Campos obligatorios faltantes"} + + # Create notification domain object + notification = Notification( + id_notificacion=str(uuid.uuid4()), + id_usuario=id_usuario, + tipo_notificacion=tipo_notificacion, + titulo=titulo, + mensaje=mensaje, + id_reporte=id_reporte, + estado_reporte=estado_reporte, + leida=False, + fecha_creacion=datetime.utcnow() + ) + + # Save to repository + saved_notification = self.notification_repo.save(notification) + logger.info(f"Notification created: {saved_notification.id_notificacion}") + + return { + "status": "success", + "message": "Notificación creada exitosamente", + "id_notificacion": saved_notification.id_notificacion + } + except Exception as e: + logger.error(f"Error creating notification: {e}", exc_info=True) + return {"status": "error", "message": f"Error al crear notificación: {str(e)}"} + + +class GetNotificationById: + """Use case: Get a notification by ID""" + + def __init__(self, notification_repo: NotificationRepository): + self.notification_repo = notification_repo + + def execute(self, notification_id: str) -> Optional[Notification]: + """Retrieves a notification by ID""" + return self.notification_repo.find_by_id(notification_id) + + +class GetUserNotifications: + """Use case: Get all notifications for a user""" + + def __init__(self, notification_repo: NotificationRepository): + self.notification_repo = notification_repo + + def execute(self, user_id: int) -> List[Notification]: + """Retrieves all notifications for a user""" + return self.notification_repo.find_by_user_id(user_id) + + +class GetUnreadUserNotifications: + """Use case: Get unread notifications for a user""" + + def __init__(self, notification_repo: NotificationRepository): + self.notification_repo = notification_repo + + def execute(self, user_id: int) -> List[Notification]: + """Retrieves unread notifications for a user""" + return self.notification_repo.find_unread_by_user_id(user_id) + + +class MarkNotificationAsRead: + """Use case: Mark a notification as read""" + + def __init__(self, notification_repo: NotificationRepository): + self.notification_repo = notification_repo + + def execute(self, notification_id: str) -> Dict: + """Marks a notification as read""" + try: + success = self.notification_repo.mark_as_read(notification_id) + if success: + logger.info(f"Notification marked as read: {notification_id}") + return {"status": "success", "message": "Notificación marcada como leída"} + else: + return {"status": "error", "message": "Notificación no encontrada"} + except Exception as e: + logger.error(f"Error marking notification as read: {e}", exc_info=True) + return {"status": "error", "message": f"Error: {str(e)}"} + + +class MarkAllUserNotificationsAsRead: + """Use case: Mark all notifications for a user as read""" + + def __init__(self, notification_repo: NotificationRepository): + self.notification_repo = notification_repo + + def execute(self, user_id: int) -> Dict: + """Marks all notifications for a user as read""" + try: + success = self.notification_repo.mark_all_as_read(user_id) + logger.info(f"All notifications marked as read for user: {user_id}") + return {"status": "success", "message": "Todas las notificaciones marcadas como leídas"} + except Exception as e: + logger.error(f"Error marking notifications as read: {e}", exc_info=True) + return {"status": "error", "message": f"Error: {str(e)}"} + + +class DeleteNotification: + """Use case: Delete a notification""" + + def __init__(self, notification_repo: NotificationRepository): + self.notification_repo = notification_repo + + def execute(self, notification_id: str) -> Dict: + """Deletes a notification""" + try: + success = self.notification_repo.delete(notification_id) + if success: + logger.info(f"Notification deleted: {notification_id}") + return {"status": "success", "message": "Notificación eliminada"} + else: + return {"status": "error", "message": "Notificación no encontrada"} + except Exception as e: + logger.error(f"Error deleting notification: {e}", exc_info=True) + return {"status": "error", "message": f"Error: {str(e)}"} + + +class DeleteAllUserNotifications: + """Use case: Delete all notifications for a user""" + + def __init__(self, notification_repo: NotificationRepository): + self.notification_repo = notification_repo + + def execute(self, user_id: int) -> Dict: + """Deletes all notifications for a user""" + try: + success = self.notification_repo.delete_all_by_user(user_id) + logger.info(f"All notifications deleted for user: {user_id}") + return {"status": "success", "message": "Todas las notificaciones eliminadas"} + except Exception as e: + logger.error(f"Error deleting notifications: {e}", exc_info=True) + return {"status": "error", "message": f"Error: {str(e)}"} diff --git a/src/consumers/notification_consumer.py b/src/consumers/notification_consumer.py new file mode 100644 index 0000000..ad9b4c1 --- /dev/null +++ b/src/consumers/notification_consumer.py @@ -0,0 +1,135 @@ +"""Notification RabbitMQ Consumer - Processes notification events""" +import sys +import os +import logging +from datetime import datetime + +# Add src to path to import modules +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +from infrastructure.adapters.rabbitmq.consumer import RabbitMQConsumer +from infrastructure.adapters.rabbitmq.messages import NotificationMessage, NotificationEventType +from infrastructure.adapters.persistence.notification_repository_mongo import NotificationRepositoryMongo + +# Set up logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +class NotificationConsumer: + """Consumer for notification events from RabbitMQ""" + + def __init__(self): + self.repo = NotificationRepositoryMongo() + self.consumer = RabbitMQConsumer(queue_name='notifications_queue') + self.consumer.set_callback(self.process_message) + + def process_message(self, message_dict: dict): + """ + Processes a notification event message from RabbitMQ + + Args: + message_dict: Dictionary containing the message data + """ + try: + # Reconstruct the NotificationMessage object + message = NotificationMessage.from_dict(message_dict) + + if message.event_type == NotificationEventType.CREATE: + self._handle_create_notification(message) + elif message.event_type == NotificationEventType.REPORT_STATUS_CHANGE: + self._handle_report_status_change(message) + else: + logger.warning(f"Unknown event type: {message.event_type}") + + except Exception as e: + logger.error(f"Error processing notification message: {e}", exc_info=True) + raise + + def _handle_create_notification(self, message: NotificationMessage): + """Handle notification create event""" + try: + logger.info(f"Creating notification for user: {message.id_usuario}") + + # Parse datetime string if provided + fecha_creacion = None + if message.fecha_creacion: + try: + fecha_creacion = datetime.fromisoformat(message.fecha_creacion) + except (ValueError, TypeError): + fecha_creacion = datetime.utcnow() + else: + fecha_creacion = datetime.utcnow() + + from domain.notifications import Notification + + # Create Notification domain object + notification = Notification( + id_notificacion=message.id_notificacion, + id_usuario=message.id_usuario, + tipo_notificacion=message.tipo_notificacion, + titulo=message.titulo, + mensaje=message.mensaje, + id_reporte=message.id_reporte, + estado_reporte=message.estado_reporte, + leida=False, + fecha_creacion=fecha_creacion + ) + + # Save to MongoDB repository + saved_notification = self.repo.save(notification) + logger.info(f"Notification created successfully: {message.id_notificacion}") + + except Exception as e: + logger.error(f"Error creating notification: {e}", exc_info=True) + raise + + def _handle_report_status_change(self, message: NotificationMessage): + """Handle report status change notification""" + try: + logger.info(f"Processing report status change notification for user: {message.id_usuario}") + + # Parse datetime string if provided + fecha_creacion = None + if message.fecha_creacion: + try: + fecha_creacion = datetime.fromisoformat(message.fecha_creacion) + except (ValueError, TypeError): + fecha_creacion = datetime.utcnow() + else: + fecha_creacion = datetime.utcnow() + + from domain.notifications import Notification + + # Create Notification domain object for status change + notification = Notification( + id_notificacion=message.id_notificacion, + id_usuario=message.id_usuario, + tipo_notificacion=message.tipo_notificacion, + titulo=message.titulo, + mensaje=message.mensaje, + id_reporte=message.id_reporte, + estado_reporte=message.estado_reporte, + leida=False, + fecha_creacion=fecha_creacion + ) + + # Save to MongoDB repository + saved_notification = self.repo.save(notification) + logger.info(f"Report status change notification created: {message.id_notificacion}") + + except Exception as e: + logger.error(f"Error processing report status change: {e}", exc_info=True) + raise + + def start(self): + """Start consuming messages from RabbitMQ""" + try: + logger.info("Starting Notification Consumer...") + self.consumer.start() + except Exception as e: + logger.error(f"Error starting notification consumer: {e}", exc_info=True) + raise diff --git a/src/core/config.py b/src/core/config.py index 2a89b8f..b6f0b50 100644 --- a/src/core/config.py +++ b/src/core/config.py @@ -20,6 +20,16 @@ class Settings(BaseSettings): default="voxpopuli_reports", description="Base de datos MongoDB" ) + + # Base de datos MongoDB para Notificaciones + mongodb_notifications_url: str = Field( + default=os.getenv("MONGODB_NOTIFICATIONS_URL", "mongodb://localhost:27018"), + description="URL de conexión a MongoDB para API de Notificaciones" + ) + mongodb_notifications_db: str = Field( + default="voxpopuli_notifications", + description="Base de datos MongoDB para notificaciones" + ) rabbitmq: str = Field ( default=os.getenv("RABBITMQ_URI", "localhost") diff --git a/src/domain/notifications.py b/src/domain/notifications.py new file mode 100644 index 0000000..3e432f9 --- /dev/null +++ b/src/domain/notifications.py @@ -0,0 +1,34 @@ +"""Notification domain model""" +from dataclasses import dataclass +from datetime import datetime +from typing import Optional + + +@dataclass +class Notification: + """Domain model for notifications""" + id_notificacion: str + id_usuario: int # Usuario que recibe la notificación + tipo_notificacion: str # "report_status_change", etc. + titulo: str + mensaje: str + id_reporte: Optional[str] = None + estado_reporte: Optional[str] = None # "en proceso", "resuelto", "no resuelto" + leida: bool = False + fecha_creacion: Optional[datetime] = None + fecha_lectura: Optional[datetime] = None + + def __post_init__(self): + """Validations after initialization""" + if not self.id_notificacion: + raise ValueError("id_notificacion is required") + if not self.id_usuario or self.id_usuario <= 0: + raise ValueError("id_usuario must be a positive integer") + if not self.tipo_notificacion: + raise ValueError("tipo_notificacion is required") + if not self.titulo: + raise ValueError("titulo is required") + if not self.mensaje: + raise ValueError("mensaje is required") + if self.fecha_creacion is None: + self.fecha_creacion = datetime.now() diff --git a/src/infrastructure/adapters/persistence/mongodb.py b/src/infrastructure/adapters/persistence/mongodb.py index 974f734..b104758 100644 --- a/src/infrastructure/adapters/persistence/mongodb.py +++ b/src/infrastructure/adapters/persistence/mongodb.py @@ -6,6 +6,14 @@ from core.config import ConfSettings mongo_client = MongoClient(ConfSettings.mongodb_url) mongodb = mongo_client[ConfSettings.mongodb_db] +# Conexión a MongoDB para Notificaciones +mongo_client_notifications = MongoClient(ConfSettings.mongodb_notifications_url) +mongodb_notifications = mongo_client_notifications[ConfSettings.mongodb_notifications_db] + def get_reports_collection() -> Collection: """Obtiene la colección de reportes desde MongoDB""" return mongodb["reportes"] + +def get_notifications_collection() -> Collection: + """Obtiene la colección de notificaciones desde MongoDB""" + return mongodb_notifications["notificaciones"] diff --git a/src/infrastructure/adapters/persistence/notification_repository_mongo.py b/src/infrastructure/adapters/persistence/notification_repository_mongo.py new file mode 100644 index 0000000..0ab5405 --- /dev/null +++ b/src/infrastructure/adapters/persistence/notification_repository_mongo.py @@ -0,0 +1,103 @@ +"""Notification Repository Implementation using MongoDB""" +from application.ports.notification_repository import NotificationRepository +from domain.notifications import Notification +from infrastructure.adapters.persistence.mongodb import get_notifications_collection +from typing import List, Optional +from datetime import datetime +import uuid + + +class NotificationRepositoryMongo(NotificationRepository): + """Implementación del repositorio de Notificaciones usando MongoDB""" + + def __init__(self): + self.collection = get_notifications_collection() + + def save(self, notification: Notification) -> Notification: + """Guarda una nueva notificación""" + notification_dict = { + "id_notificacion": notification.id_notificacion, + "id_usuario": notification.id_usuario, + "tipo_notificacion": notification.tipo_notificacion, + "titulo": notification.titulo, + "mensaje": notification.mensaje, + "id_reporte": notification.id_reporte, + "estado_reporte": notification.estado_reporte, + "leida": notification.leida, + "fecha_creacion": notification.fecha_creacion or datetime.utcnow(), + "fecha_lectura": notification.fecha_lectura + } + result = self.collection.insert_one(notification_dict) + return notification + + def find_by_id(self, notification_id: str) -> Optional[Notification]: + """Obtiene una notificación por ID""" + doc = self.collection.find_one({"id_notificacion": notification_id}) + if doc: + return self._to_domain(doc) + return None + + def find_by_user_id(self, user_id: int) -> List[Notification]: + """Obtiene todas las notificaciones de un usuario, ordenadas por fecha descendente""" + docs = self.collection.find({"id_usuario": user_id}).sort("fecha_creacion", -1) + return [self._to_domain(doc) for doc in docs] + + def find_unread_by_user_id(self, user_id: int) -> List[Notification]: + """Obtiene todas las notificaciones no leídas de un usuario""" + docs = self.collection.find({ + "id_usuario": user_id, + "leida": False + }).sort("fecha_creacion", -1) + return [self._to_domain(doc) for doc in docs] + + def find_all(self) -> List[Notification]: + """Obtiene todas las notificaciones""" + docs = self.collection.find().sort("fecha_creacion", -1) + return [self._to_domain(doc) for doc in docs] + + def mark_as_read(self, notification_id: str) -> bool: + """Marca una notificación como leída""" + result = self.collection.update_one( + {"id_notificacion": notification_id}, + {"$set": { + "leida": True, + "fecha_lectura": datetime.utcnow() + }} + ) + return result.modified_count > 0 + + def mark_all_as_read(self, user_id: int) -> bool: + """Marca todas las notificaciones de un usuario como leídas""" + result = self.collection.update_many( + {"id_usuario": user_id, "leida": False}, + {"$set": { + "leida": True, + "fecha_lectura": datetime.utcnow() + }} + ) + return result.modified_count > 0 + + def delete(self, notification_id: str) -> bool: + """Elimina una notificación""" + result = self.collection.delete_one({"id_notificacion": notification_id}) + return result.deleted_count > 0 + + def delete_all_by_user(self, user_id: int) -> bool: + """Elimina todas las notificaciones de un usuario""" + result = self.collection.delete_many({"id_usuario": user_id}) + return result.deleted_count > 0 + + def _to_domain(self, doc: dict) -> Notification: + """Convierte un documento de MongoDB a un objeto de dominio""" + return Notification( + id_notificacion=doc.get("id_notificacion"), + id_usuario=doc.get("id_usuario"), + tipo_notificacion=doc.get("tipo_notificacion"), + titulo=doc.get("titulo"), + mensaje=doc.get("mensaje"), + id_reporte=doc.get("id_reporte"), + estado_reporte=doc.get("estado_reporte"), + leida=doc.get("leida", False), + fecha_creacion=doc.get("fecha_creacion"), + fecha_lectura=doc.get("fecha_lectura") + ) diff --git a/src/infrastructure/adapters/rabbitmq/messages.py b/src/infrastructure/adapters/rabbitmq/messages.py index 18724f4..da649a5 100644 --- a/src/infrastructure/adapters/rabbitmq/messages.py +++ b/src/infrastructure/adapters/rabbitmq/messages.py @@ -18,6 +18,13 @@ class ReportEventType(str, Enum): CREATE = "report.create" UPDATE_VISIBILITY = "report.update_visibility" DELETE = "report.delete" + UPDATE_STATUS = "report.update_status" + + +class NotificationEventType(str, Enum): + """Types of notification events""" + CREATE = "notification.create" + REPORT_STATUS_CHANGE = "notification.report_status_change" @dataclass @@ -85,3 +92,33 @@ class ReportMessage: """Create from dictionary""" data['event_type'] = ReportEventType(data['event_type']) return ReportMessage(**data) + + +@dataclass +class NotificationMessage: + """Message for notification events""" + event_type: NotificationEventType + id_notificacion: Optional[str] = None + id_usuario: Optional[int] = None + tipo_notificacion: Optional[str] = None + titulo: Optional[str] = None + mensaje: Optional[str] = None + id_reporte: Optional[str] = None + estado_reporte: Optional[str] = None # Estado del reporte que cambió + fecha_creacion: Optional[str] = None # ISO format datetime string + + def to_dict(self): + """Convert to dictionary""" + data = asdict(self) + data['event_type'] = self.event_type.value + return data + + def to_json(self) -> str: + """Convert to JSON string""" + return json.dumps(self.to_dict()) + + @staticmethod + def from_dict(data: dict) -> 'NotificationMessage': + """Create from dictionary""" + data['event_type'] = NotificationEventType(data['event_type']) + return NotificationMessage(**data) diff --git a/src/infrastructure/api/notifications/__init__.py b/src/infrastructure/api/notifications/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/infrastructure/api/notifications/app.py b/src/infrastructure/api/notifications/app.py new file mode 100644 index 0000000..b5a5b64 --- /dev/null +++ b/src/infrastructure/api/notifications/app.py @@ -0,0 +1,16 @@ +"""Notifications API Application Factory""" +from fastapi import FastAPI +from core.config import ConfSettings +from infrastructure.api.notifications.router import router + + +def create_app() -> FastAPI: + """Factory para crear la aplicación de Notificaciones""" + app = FastAPI( + title="Notificaciones Microservice", + version="1.0.0", + description="Microservicio de gestión de notificaciones" + ) + app.include_router(router) + + return app diff --git a/src/infrastructure/api/notifications/notifications.py b/src/infrastructure/api/notifications/notifications.py new file mode 100644 index 0000000..0b62d14 --- /dev/null +++ b/src/infrastructure/api/notifications/notifications.py @@ -0,0 +1,176 @@ +"""Notifications API endpoints""" +from fastapi import APIRouter, HTTPException, status +from infrastructure.api.notifications.schemas import NotificationResponse, NotificationListResponse, MarkNotificationAsReadRequest +from application.services.notification_services import ( + GetUserNotifications, GetUnreadUserNotifications, MarkNotificationAsRead, + MarkAllUserNotificationsAsRead, DeleteNotification, DeleteAllUserNotifications +) +from infrastructure.adapters.persistence.notification_repository_mongo import NotificationRepositoryMongo +import logging + +router = APIRouter() +notification_repo = NotificationRepositoryMongo() +logger = logging.getLogger(__name__) + + +def _notification_to_response(notification) -> dict: + """Convierte un objeto Notification a dict de respuesta""" + return { + "id_notificacion": notification.id_notificacion, + "id_usuario": notification.id_usuario, + "tipo_notificacion": notification.tipo_notificacion, + "titulo": notification.titulo, + "mensaje": notification.mensaje, + "id_reporte": notification.id_reporte, + "estado_reporte": notification.estado_reporte, + "leida": notification.leida, + "fecha_creacion": notification.fecha_creacion, + "fecha_lectura": notification.fecha_lectura + } + + +@router.get("/user/{user_id}") +async def get_user_notifications(user_id: int): + """Obtiene todas las notificaciones de un usuario""" + try: + get_use_case = GetUserNotifications(notification_repo) + notifications = get_use_case.execute(user_id) + unread_use_case = GetUnreadUserNotifications(notification_repo) + unread = unread_use_case.execute(user_id) + + return NotificationListResponse( + total=len(notifications), + unread=len(unread), + notifications=[_notification_to_response(n) for n in notifications] + ) + except Exception as e: + logger.error(f"Error al obtener notificaciones del usuario {user_id}: {e}", exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Error interno del servidor" + ) + + +@router.get("/user/{user_id}/unread") +async def get_unread_user_notifications(user_id: int): + """Obtiene todas las notificaciones no leídas de un usuario""" + try: + get_use_case = GetUnreadUserNotifications(notification_repo) + notifications = get_use_case.execute(user_id) + + return NotificationListResponse( + total=len(notifications), + unread=len(notifications), + notifications=[_notification_to_response(n) for n in notifications] + ) + except Exception as e: + logger.error(f"Error al obtener notificaciones no leídas del usuario {user_id}: {e}", exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Error interno del servidor" + ) + + +@router.get("/{notification_id}") +async def get_notification(notification_id: str): + """Obtiene una notificación por ID""" + try: + notification = notification_repo.find_by_id(notification_id) + if not notification: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Notificación con ID {notification_id} no encontrada" + ) + return _notification_to_response(notification) + except HTTPException: + raise + except Exception as e: + logger.error(f"Error al obtener notificación {notification_id}: {e}", exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Error interno del servidor" + ) + + +@router.put("/{notification_id}/read", status_code=status.HTTP_200_OK) +async def mark_notification_as_read(notification_id: str): + """Marca una notificación como leída""" + try: + mark_use_case = MarkNotificationAsRead(notification_repo) + result = mark_use_case.execute(notification_id) + + if result["status"] == "error": + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=result["message"] + ) + + return result + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error al marcar notificación como leída {notification_id}: {e}", exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Error interno del servidor" + ) + + +@router.put("/user/{user_id}/read-all", status_code=status.HTTP_200_OK) +async def mark_all_user_notifications_as_read(user_id: int): + """Marca todas las notificaciones de un usuario como leídas""" + try: + mark_use_case = MarkAllUserNotificationsAsRead(notification_repo) + result = mark_use_case.execute(user_id) + + return result + + except Exception as e: + logger.error(f"Error al marcar todas las notificaciones como leídas para el usuario {user_id}: {e}", exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Error interno del servidor" + ) + + +@router.delete("/{notification_id}", status_code=status.HTTP_200_OK) +async def delete_notification(notification_id: str): + """Elimina una notificación""" + try: + delete_use_case = DeleteNotification(notification_repo) + result = delete_use_case.execute(notification_id) + + if result["status"] == "error": + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=result["message"] + ) + + return result + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error al eliminar notificación {notification_id}: {e}", exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Error interno del servidor" + ) + + +@router.delete("/user/{user_id}/delete-all", status_code=status.HTTP_200_OK) +async def delete_all_user_notifications(user_id: int): + """Elimina todas las notificaciones de un usuario""" + try: + delete_use_case = DeleteAllUserNotifications(notification_repo) + result = delete_use_case.execute(user_id) + + return result + + except Exception as e: + logger.error(f"Error al eliminar todas las notificaciones del usuario {user_id}: {e}", exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Error interno del servidor" + ) diff --git a/src/infrastructure/api/notifications/root.py b/src/infrastructure/api/notifications/root.py new file mode 100644 index 0000000..380c6f7 --- /dev/null +++ b/src/infrastructure/api/notifications/root.py @@ -0,0 +1,14 @@ +"""Notifications API Root endpoint""" +from fastapi import APIRouter + +router = APIRouter() + + +@router.get("/") +async def root(): + """Root endpoint for Notifications API""" + return { + "message": "Notificaciones API", + "version": "1.0.0", + "status": "running" + } diff --git a/src/infrastructure/api/notifications/router.py b/src/infrastructure/api/notifications/router.py new file mode 100644 index 0000000..c22f677 --- /dev/null +++ b/src/infrastructure/api/notifications/router.py @@ -0,0 +1,18 @@ +"""Notifications API Router""" +from fastapi import APIRouter +from infrastructure.api.notifications.notifications import router as notifications_router +from infrastructure.api.notifications.root import router as root_router + +router = APIRouter() + +router.include_router( + notifications_router, + prefix="/notifications", + tags=["notifications"] +) + +router.include_router( + root_router, + prefix='', + tags=["root"] +) diff --git a/src/infrastructure/api/notifications/schemas.py b/src/infrastructure/api/notifications/schemas.py new file mode 100644 index 0000000..4fcd7d4 --- /dev/null +++ b/src/infrastructure/api/notifications/schemas.py @@ -0,0 +1,33 @@ +"""Notification API Schemas""" +from pydantic import BaseModel +from typing import Optional +from datetime import datetime + + +class NotificationResponse(BaseModel): + """Schema for notification response""" + id_notificacion: str + id_usuario: int + tipo_notificacion: str + titulo: str + mensaje: str + id_reporte: Optional[str] = None + estado_reporte: Optional[str] = None + leida: bool + fecha_creacion: datetime + fecha_lectura: Optional[datetime] = None + + class Config: + from_attributes = True + + +class NotificationListResponse(BaseModel): + """Schema for notification list response""" + total: int + unread: int + notifications: list[NotificationResponse] + + +class MarkNotificationAsReadRequest(BaseModel): + """Schema for marking notification as read""" + pass diff --git a/src/infrastructure/api/reports/reports.py b/src/infrastructure/api/reports/reports.py index 0cc71ff..9f16d8a 100644 --- a/src/infrastructure/api/reports/reports.py +++ b/src/infrastructure/api/reports/reports.py @@ -7,8 +7,12 @@ from application.services.report_services import ( from infrastructure.adapters.persistence.report_repository_mongo import ReportRepositoryMongo from infrastructure.adapters.persistence.user_repository_sql import UserRepositorySQL from infrastructure.adapters.file_storage import image_storage +from infrastructure.adapters.rabbitmq.sender import send_to_queue +from infrastructure.adapters.rabbitmq.messages import NotificationMessage, NotificationEventType import logging from typing import Optional +from datetime import datetime +import uuid router = APIRouter() report_repo = ReportRepositoryMongo() @@ -231,8 +235,17 @@ async def delete_report(report_id: str): @router.put("/{report_id}/status", status_code=status.HTTP_200_OK) async def update_report_status(report_id: str, status_data: ReportUpdateStatusRequest): - """Actualiza el estado de un reporte""" + """Actualiza el estado de un reporte y envía notificación al usuario""" try: + # Obtener el reporte actual para saber el usuario creador + report = report_repo.find_by_id(report_id) + if not report: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Reporte con ID {report_id} no encontrado" + ) + + # Actualizar el estado update_use_case = UpdateReportStatus(report_repo) result = update_use_case.execute( report_id=report_id, @@ -254,6 +267,30 @@ async def update_report_status(report_id: str, status_data: ReportUpdateStatusRe detail=message ) + # Enviar notificación al usuario creador del reporte + try: + notification_message = NotificationMessage( + event_type=NotificationEventType.REPORT_STATUS_CHANGE, + id_notificacion=str(uuid.uuid4()), + id_usuario=report.id_usuario, + tipo_notificacion="report_status_change", + titulo="Tu reporte ha cambiado de estado", + mensaje=f"El estado de tu reporte ha sido actualizado a: {status_data.estado}", + id_reporte=report_id, + estado_reporte=status_data.estado, + fecha_creacion=datetime.utcnow().isoformat() + ) + + # Enviar a la cola de notificaciones + send_to_queue( + queue_name='notifications_queue', + message=notification_message.to_dict() + ) + logger.info(f"Notification sent to user {report.id_usuario} for report {report_id}") + except Exception as notification_error: + logger.warning(f"Error sending notification for report {report_id}: {notification_error}") + # No fallar la actualización si hay error en notificación + # 200 OK: estado actualizado correctamente return result diff --git a/src/infrastructure/api/users/auth_service.py b/src/infrastructure/api/users/auth_service.py index 581802c..3546d0e 100644 --- a/src/infrastructure/api/users/auth_service.py +++ b/src/infrastructure/api/users/auth_service.py @@ -4,13 +4,9 @@ from passlib.context import CryptContext from typing import Optional, Dict from core.config import ConfSettings -# Configurar contexto para hashing de contraseñas -# Soporta argon2 (nuevo) y bcrypt (antiguo) para compatibilidad hacia atrás -pwd_context = CryptContext( - schemes=["argon2"], - deprecated=["bcrypt"], - argon2__rounds=3 -) +# Configurar contexto para hashing de contraseñas con argon2 +# argon2 es más moderno y más seguro que bcrypt +pwd_context = CryptContext(schemes=["argon2"]) class AuthService: diff --git a/src/main.py b/src/main.py index 7f608f9..2b55630 100644 --- a/src/main.py +++ b/src/main.py @@ -1,11 +1,13 @@ """ Punto de entrada principal para VoxPopuli Microservices -Ejecuta dos APIs en paralelo: Usuarios (puerto 8000) y Reportes (puerto 8001) +Ejecuta dos APIs en paralelo: Usuarios (puerto 8000) y Reportes (puerto 8001), Notificaciones (puerto 8002) """ from infrastructure.api.users.app import create_app as create_users_app from infrastructure.api.reports.app import create_app as create_reports_app +from infrastructure.api.notifications.app import create_app as create_notifications_app from consumers.report_consumer import ReportConsumer from consumers.user_consumer import UserConsumer +from consumers.notification_consumer import NotificationConsumer from core.config import ConfSettings import threading import uvicorn @@ -32,6 +34,17 @@ def run_reports_api(): log_level=ConfSettings.log_level, ) +def run_notifications_api(): + """Ejecuta la API de Notificaciones en puerto 8002""" + app_notifications = create_notifications_app() + uvicorn.run( + app_notifications, + host=ConfSettings.host, + port=8002, + reload=False, + log_level=ConfSettings.log_level, + ) + def run_user_consumer(): consumer = UserConsumer() consumer.start() @@ -40,34 +53,45 @@ def run_reports_consumer(): consumer = ReportConsumer() consumer.start() +def run_notifications_consumer(): + consumer = NotificationConsumer() + consumer.start() + def run(): - """Inicia ambas APIs en threads separados""" + """Inicia todas las APIs en threads separados""" print("=" * 60) print("Iniciando VoxPopuli Microservices...") print("=" * 60) users_thread = threading.Thread(target=run_users_api, daemon=True, name="Users-API") reports_thread = threading.Thread(target=run_reports_api, daemon=True, name="Reports-API") + notifications_thread = threading.Thread(target=run_notifications_api, daemon=True, name="Notifications-API") user_consumer_thread = threading.Thread(target=run_user_consumer, daemon=True, name="Users-Consumer") report_consumer_thread = threading.Thread(target=run_reports_consumer, daemon=True, name="Reports-Consumer") + notifications_consumer_thread = threading.Thread(target=run_notifications_consumer, daemon=True, name="Notifications-Consumer") users_thread.start() reports_thread.start() + notifications_thread.start() user_consumer_thread.start() report_consumer_thread.start() + notifications_consumer_thread.start() print("\n✓ API de Usuarios ejecutándose en http://0.0.0.0:8000") print("✓ API de Reportes ejecutándose en http://0.0.0.0:8001") + print("✓ API de Notificaciones ejecutándose en http://0.0.0.0:8002") print("\nDocumentación disponible en:") print(" - Usuarios: http://localhost:8000/docs") print(" - Reportes: http://localhost:8001/docs") + print(" - Notificaciones: http://localhost:8002/docs") print("\n" + "=" * 60 + "\n") try: users_thread.join() reports_thread.join() + notifications_thread.join() except KeyboardInterrupt: print("\n\nRecibiendo señal de salida...") print("Cerrando APIs...")