Compare commits

...

2 Commits

18 changed files with 1277 additions and 74 deletions

View File

@@ -1,5 +1,12 @@
# Arquitectura de VoxPopuli Microservices
## Resumen General
VoxPopuli es un sistema de microservicios **basado en eventos asincronos** con comunicación mediante RabbitMQ. El sistema ejecuta múltiples componentes en paralelo:
- 2 APIs REST independientes (Usuarios y Reportes)
- 2 Consumidores de mensajes para procesamiento asincrónico
- Bases de datos separadas por dominio (MySQL para usuarios, MongoDB para reportes)
## Patrones Arquitectónicos Usados
### 1. Clean Architecture (Arquitectura Limpia)
@@ -20,31 +27,100 @@ La aplicación está organizada en capas independientes:
│ - Reglas de negocio independientes │
├─────────────────────────────────────────────────────────┤
│ Infrastructure Layer (Infraestructura) │
│ - Adaptadores (Repositorios)
│ - Adaptadores (Repositorios, RabbitMQ)
│ - Acceso a Datos (MySQL, MongoDB) │
│ - Almacenamiento de archivos │
└─────────────────────────────────────────────────────────┘
```
### 2. Microservicios
### 2. Arquitectura Orientada a Eventos (Event-Driven Architecture)
Dos microservicios independientes:
En lugar de procesamiento sincrónico, los servicios se comunican mediante eventos:
- **Usuarios API** (Puerto 8000) - Gestión de usuarios y credenciales
- **Reportes API** (Puerto 8001) - Gestión de reportes comunitarios
```
┌──────────────────────────────────────────────────────────────┐
│ API REST (FastAPI) │
│ (Recibe solicitudes HTTP) │
└────────┬─────────────────────────────────┬───────────────────┘
│ Valida y envía evento │
▼ ▼
┌─────────────────────────────────────────────┐
│ RabbitMQ Message Queue │
│ - users_queue (eventos de usuario) │
│ - reports_queue (eventos de reporte) │
└────────┬────────────────────┬───────────────┘
│ │
▼ ▼
┌─────────────┐ ┌──────────────┐
│ User │ │ Report │
│ Consumer │ │ Consumer │
│ (Thread) │ │ (Thread) │
└─────┬───────┘ └──────┬───────┘
│ Procesa evento │ Procesa evento
▼ ▼
┌──────────────┐ ┌──────────────┐
│ MySQL (BD │ │ MongoDB (BD │
│ de Usuarios) │ │ de Reportes) │
└──────────────┘ └──────────────┘
```
Cada microservicio:
- Tiene su propia base de datos
- Es escalable independientemente
- Se puede desplegar por separado
- Expone su propia API REST
**Ventajas:**
- Bajo acoplamiento entre servicios
- Mayor escalabilidad
- Mejor tolerancia a fallos
- Procesamiento asincrónico
### 3. Patrón Repository
### 3. Microservicios
El sistema consta de dos microservicios independientes que ejecutan en paralelo:
#### **Usuarios API** (Puerto 8000)
- Gestión de usuarios y autenticación
- Base de datos: MySQL con SQLAlchemy
- Endpoints: POST/GET/PUT/DELETE /users/
#### **Reportes API** (Puerto 8001)
- Gestión de reportes comunitarios
- Base de datos: MongoDB
- Endpoints: POST/GET/PUT/DELETE /reports/
- Almacenamiento: Imágenes en WebP (directorio `/storage/reports_images/`)
Cada API:
- Valida solicitudes y envía eventos a RabbitMQ
- Expone documentación automática en `/docs`
- Es independientemente escalable
### 4. Consumidores de Eventos (Message Consumers)
Dos consumidores ejecutan como threads separados:
#### **User Consumer**
- Escucha la cola `users_queue` en RabbitMQ
- Eventos procesados:
- `user.create` → Guarda usuario en MySQL
- `user.update` → Actualiza usuario
- `user.delete` → Elimina usuario
#### **Report Consumer**
- Escucha la cola `reports_queue` en RabbitMQ
- Eventos procesados:
- `report.create` → Guarda reporte en MongoDB, incrementa contador de usuario
- `report.update_visibility` → Actualiza puntuación comunitaria
- `report.delete` → Elimina reporte
**Beneficios:**
- Desacoplamiento de API y procesamiento
- Reintentos automáticos si falla la BD
- Procesamiento en background
### 5. Patrón Repository
Abstracción para acceso a datos:
```
┌──────────────────┐
│ Service Layer │
│ (API Handler) │
└────────┬─────────┘
│ depends on
@@ -67,7 +143,7 @@ Abstracción para acceso a datos:
- Fácil de testear (usar mocks)
- Reutilizable en diferentes contextos
### 4. Inversión de Dependencias (Dependency Inversion)
### 6. Inversión de Dependencias (Dependency Inversion)
Los servicios dependen de **abstracciones (interfaces)**, no de implementaciones concretas:
@@ -84,120 +160,360 @@ service = CreateUser(UserRepositoryMock()) # Para testing
## Flujo de Solicitud
### Crear Usuario:
### Crear Usuario (Asincrónico):
```
1. HTTP POST /users/
└─> FastAPI Handler
└─> CreateUser Use Case
└─> UserRepository.save()
└─> UserRepositorySQL
└─> SQLAlchemy
└─> MySQL Database
└─> Validar datos (FastAPI Handler)
└─> Crear objeto UserMessage
└─> Enviar mensaje a RabbitMQ (users_queue)
└─> Retornar respuesta HTTP inmediatamente
[En paralelo, el User Consumer procesa:]
2. User Consumer recibe evento user.create
└─> UserRepositorySQL.save()
└─> SQLAlchemy
└─> MySQL Database
```
### Crear Reporte:
### Crear Reporte (Asincrónico):
```
1. HTTP POST /reports/
└─> FastAPI Handler
└─> CreateReport Use Case
└─> ReportRepository.save()
└─> UserRepository.increment_reports()
└─> ReportRepositoryMongo
└─> UserRepositorySQL
└─> MongoDB
└─> MySQL
└─> Validar datos y usuario existe (FastAPI Handler)
└─> Crear objeto ReportMessage
└─> Enviar mensaje a RabbitMQ (reports_queue)
└─> Retornar respuesta HTTP inmediatamente
[En paralelo, el Report Consumer procesa:]
2. Report Consumer recibe evento report.create
└─> ReportRepositoryMongo.save()
└─> UserRepositorySQL.increment_reports(id_usuario)
└─> MongoDB + MySQL Database
```
## Capas Detalladas
### Domain Layer (src/domain/)
**Entidades puras de negocio**:
- `User`: Representa un usuario del sistema
- `Report`: Representa un reporte comunitario
No tienen dependencias externas. Solo representan conceptos de negocio.
**Entidades puras de negocio** (sin dependencias externas):
#### User
```python
@dataclass
class User:
user_id: int
nombre: str
apellido: str
email: str
# ... más campos
contraseña_hash: Optional[str] = None
fecha_nacimiento: datetime = None
fecha_creacion: datetime = None
calificacion: float = 50.0 # 0-100
numero_reportes: int = 0
url_foto_perfil: Optional[str] = None
biografia: Optional[str] = None
```
#### Report
```python
@dataclass
class Report:
id_reporte: str
id_usuario: int
tipo_reporte: int # Número que representa el tipo
descripcion: str
ubicacion: Optional[str]
lat: Optional[float] = None
lng: Optional[float] = None
image_filename: Optional[str] = None # WebP format
visibilidad: float = 0.0 # Puntuación comunitaria 0-100
estado: Literal["en proceso", "no resuelto", "resuelto"] = "en proceso"
fecha_creacion: Optional[datetime] = None
```
### Application Layer (src/application/)
**Use Cases y Servicios**:
Contiene la lógica de negocio encapsulada en **Use Cases** y **Puertos**.
#### Ports (Interfaces):
- `UserRepository`: Contrato para acceso a datos de usuarios
- `ReportRepository`: Contrato para acceso a datos de reportes
#### Ports (Interfaces - src/application/ports/)
#### Services (Use Cases):
- `CreateUser`, `GetUserById`, `UpdateUser`, `DeleteUser`
- `CreateReport`, `GetReportById`, `UpdateReportVisibility`
**UserRepository (Interface)**
```python
class UserRepository:
def save(self, user: User) -> User
def find_by_id(self, user_id: int) -> Optional[User]
def find_by_email(self, email: str) -> Optional[User]
def update(self, user: User) -> User
def delete(self, user_id: int) -> bool
def increment_reports(self, user_id: int) -> bool
```
Contienen la lógica de negocio:
**ReportRepository (Interface)**
```python
class ReportRepository:
def save(self, report: Report) -> Report
def find_by_id(self, id_reporte: str) -> Optional[Report]
def find_by_user(self, id_usuario: int) -> List[Report]
def update(self, report: Report) -> Report
def delete(self, id_reporte: str) -> bool
def update_visibility(self, id_reporte: str, visibilidad: float) -> bool
```
#### Services / Use Cases (src/application/services/)
**User Services:**
- `CreateUser` - Valida datos y envía evento a RabbitMQ
- `GetUser` - Recupera usuario de la BD
- `UpdateUser` - Actualiza usuario
- `DeleteUser` - Elimina usuario
**Report Services:**
- `CreateReport` - Valida usuario y reporte, envía evento a RabbitMQ
- `GetReport` - Recupera reporte
- `UpdateReportVisibility` - Actualiza puntuación comunitaria
- `DeleteReport` - Elimina reporte
**Ejemplo de CreateReport:**
```python
class CreateReport:
def execute(self, id_usuario, tipo_reporte, ...):
def __init__(self, repo: ReportRepository, user_repo: UserRepository):
self.repo = repo
self.user_repo = user_repo
def execute(self, id_usuario: int, tipo_reporte: int, descripcion: str, ...):
# 1. Validar que usuario existe
user = self.user_repo.find_by_id(id_usuario)
if not self.user_repo.find_by_id(id_usuario):
return {"status": "error", "message": "Usuario no encontrado"}
# 2. Crear reporte
report = Report(...)
# 2. Validar descripción
if not descripcion.strip():
return {"status": "error", "message": "Descripción requerida"}
# 3. Guardar en BD
self.repo.save(report)
# 3. Crear mensaje de evento
message = ReportMessage(
event_type=ReportEventType.CREATE,
id_usuario=id_usuario,
tipo_reporte=tipo_reporte,
descripcion=descripcion,
...
)
# 4. Actualizar contador de usuario
self.user_repo.increment_reports(id_usuario)
# 4. Enviar a RabbitMQ
send_to_queue(message)
return {"status": "success", "message": "Reporte enviado a procesar"}
```
### Infrastructure Layer (src/infrastructure/)
#### Persistence Adapters:
#### API Handlers (src/infrastructure/api/)
**user_repository_sql.py**:
- Implementa `UserRepository` usando SQLAlchemy
**Users API (puerto 8000)**
- `users.py` - Endpoints REST para CRUD de usuarios
- `auth_service.py` - Autenticación JWT
- `schemas.py` - Esquemas Pydantic para validación
- `router.py` - Rutas de FastAPI
- `app.py` - Aplicación FastAPI
**Reports API (puerto 8001)**
- `reports.py` - Endpoints REST para CRUD de reportes
- `schemas.py` - Esquemas Pydantic
- `router.py` - Rutas
- `app.py` - Aplicación FastAPI
#### Persistence Adapters (src/infrastructure/adapters/persistence/)
**user_repository_sql.py** (Implementación de UserRepository)
- Usa SQLAlchemy ORM
- Implementa todas las operaciones CRUD en MySQL
- Convierte entre modelo de dominio y modelo de BD
**report_repository_mongo.py**:
- Implementa `ReportRepository` usando PyMongo
**report_repository_mongo.py** (Implementación de ReportRepository)
- Usa PyMongo para MongoDB
- Implementa todas las operaciones CRUD
- Convierte entre modelo de dominio y documento MongoDB
#### API Handlers:
**db.py**
- Configuración de conexiones
- Instancias de motor de BD
**users/users.py**:
- Endpoints HTTP para gestión de usuarios
- Usa esquemas Pydantic para validación
**models.py**
- Modelos SQLAlchemy para MySQL
- Esquemas para MongoDB
**reports/reports.py**:
- Endpoints HTTP para gestión de reportes
- Mapeo de solicitudes a use cases
#### RabbitMQ Adapters (src/infrastructure/adapters/rabbitmq/)
**sender.py**
- Función `send_to_queue()` para enviar mensajes
- Serialización de objetos a JSON
**consumer.py** (Base)
- Clase `RabbitMQConsumer` para consumir mensajes
- Conexión y suscripción a colas
- Manejo de excepciones
**messages.py**
- `UserMessage` - Schema para eventos de usuario
- `ReportMessage` - Schema para eventos de reporte
- `UserEventType` - Enumeración de tipos de eventos
- `ReportEventType` - Enumeración de tipos de eventos
#### File Storage (src/infrastructure/adapters/file_storage.py)
- `image_storage` - Manejo de almacenamiento de imágenes
- Convierte imágenes a formato WebP
- Almacena en `/storage/reports_images/`
- Limpieza automática si reporte es eliminado
### Consumers (src/consumers/)
Ejecutan como threads separados en paralelo con las APIs.
#### User Consumer (src/consumers/user_consumer.py)
```python
class UserConsumer:
def __init__(self):
self.repo = UserRepositorySQL()
self.consumer = RabbitMQConsumer(queue_name='users_queue')
self.consumer.set_callback(self.process_message)
def process_message(self, message_dict: dict):
# Reconvertir a UserMessage
message = UserMessage.from_dict(message_dict)
if message.event_type == UserEventType.CREATE:
self._handle_create_user(message)
elif message.event_type == UserEventType.UPDATE:
self._handle_update_user(message)
# ... etc
def start(self):
# Inicia la conexión y escucha
self.consumer.start_consuming()
```
#### Report Consumer (src/consumers/report_consumer.py)
Similar a User Consumer, pero:
- Escucha `reports_queue`
- Procesa `ReportMessage` con `ReportEventType`
- Interactúa con `ReportRepositoryMongo` y `UserRepositorySQL`
- Maneja almacenamiento de imágenes
### Core Layer (src/core/)
Configuración centralizada:
**config.py** - Configuración centralizada:
- Variables de entorno
- Configuración de logging
- Configuración de bases de datos
- Configuración de conexiones de BD
- Configuración de RabbitMQ
- Parámetros de JWT
## Punto de Entrada (src/main.py)
Orquesta todos los componentes en paralelo:
```python
def run():
"""Inicia los 4 componentes en threads separados"""
users_thread = threading.Thread(target=run_users_api)
reports_thread = threading.Thread(target=run_reports_api)
user_consumer_thread = threading.Thread(target=run_user_consumer)
report_consumer_thread = threading.Thread(target=run_reports_consumer)
users_thread.start()
reports_thread.start()
user_consumer_thread.start()
report_consumer_thread.start()
```
Esto inicia:
- API de Usuarios en puerto 8000
- API de Reportes en puerto 8001
- User Consumer escuchando `users_queue`
- Report Consumer escuchando `reports_queue`
## Diagrama de Despliegue
```
┌─────────────────────────────────────────────────────────────┐
│ Docker Container (VoxPopuli) │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ main.py (Orquestador) │ │
│ │ - Lanza 4 threads │ │
│ └───┬──────────┬──────────┬──────────────────────────┬─┘ │
│ │ │ │ │ │
│ ┌───▼──┐ ┌───▼──┐ ┌──▼────┐ ┌─────▼──────┐ │ │
│ │Users │ │Report│ │User │ │Report │ │ │
│ │API │ │API │ │Consume│ │Consumer │ │ │
│ │:8000 │ │:8001 │ │r │ │ │ │ │
│ └───┬──┘ └───┬──┘ └──┬────┘ └─────┬──────┘ │ │
│ │ │ │ │ │ │
└─────┼─────────┼─────────┼────────────┼─────────────┘ │
│ │ │ │ │
└────┬────┴────┬────┴────────────┘ │
│ │ │
┌─────▼──┐ ┌───▼──────┐ │
│RabbitMQ│ │MySQL + │ │
│:5672 │ │MongoDB │ │
└────────┘ └──────────┘ │
```
## Testing
La arquitectura facilita el testing:
La arquitectura facilita el testing en múltiples niveles:
### Unit Tests
- Mockear repositorios para testear Use Cases
- Testear lógica de negocio en Domain Layer
```python
# Mock Repository para testing
class UserRepositoryMock(UserRepository):
def __init__(self):
self.users = {}
class TestCreateReport:
def test_report_creation(self):
mock_repo = ReportRepositoryMock()
service = CreateReport(mock_repo)
result = service.execute(...)
assert result['status'] == 'success'
```
### Integration Tests
- Usar DB reales de prueba
- Testear flujo completo API → RabbitMQ → Consumer → DB
### E2E Tests
- Testear desde HTTP request hasta persistencia
- Validar consumidores procesan eventos correctamente
## Ventajas de la Arquitectura Actual
1. **Bajo Acoplamiento** - Servicios se comunican por eventos
2. **Escalabilidad Horizontal** - Consumidores pueden replicarse
3. **Resiliencia** - RabbitMQ reintenta entregas fallidas
4. **Independencia de BD** - Abstractos por puertos
5. **Testabilidad** - Inyección de dependencias
6. **Mantenibilidad** - Capas claramente separadas
7. **Asincronía** - APIs responden rápidamente
8. **Extensibilidad** - Nuevos tipos de eventos fácilmente
## Stack Tecnológico
| Componente | Tecnología |
|-----------|-----------|
| Framework Web | FastAPI |
| Servidor ASGI | Uvicorn |
| ORM SQL | SQLAlchemy |
| Driver MongoDB | PyMongo |
| Message Queue | RabbitMQ |
| Autenticación | JWT |
| Validación | Pydantic |
| Base Datos SQL | MySQL |
| Base Datos NoSQL | MongoDB |
| Almacenamiento | WebP (imágenes) |
| Concurrencia | threading |
def save(self, user):
self.users[user.user_id] = user
return user

View File

@@ -34,6 +34,22 @@ services:
timeout: 5s
retries: 5
mongodb_nots:
image: mongo:7.0
container_name: voxpopuli_mongo_2
environment:
MONGO_INITDB_DATABASE: voxpopuli_notifications
ports:
- "27018:27017"
volumes:
- mongo_data_notifications:/data/db
healthcheck:
test: ["CMD", "mongosh", "--eval", "db.adminCommand('ping')"]
interval: 10s
timeout: 5s
retries: 5
volumes:
mysql_data:
mongo_data:
mongo_data_notifications:

View File

@@ -0,0 +1,53 @@
"""Port (interface) for Notification Repository"""
from abc import ABC, abstractmethod
from domain.notifications import Notification
from typing import List, Optional
class NotificationRepository(ABC):
"""Puerto (interfaz) para el repositorio de Notificaciones"""
@abstractmethod
def save(self, notification: Notification) -> Notification:
"""Guarda una notificación en la base de datos"""
pass
@abstractmethod
def find_by_id(self, notification_id: str) -> Optional[Notification]:
"""Obtiene una notificación por ID"""
pass
@abstractmethod
def find_by_user_id(self, user_id: int) -> List[Notification]:
"""Obtiene todas las notificaciones de un usuario"""
pass
@abstractmethod
def find_unread_by_user_id(self, user_id: int) -> List[Notification]:
"""Obtiene todas las notificaciones no leídas de un usuario"""
pass
@abstractmethod
def find_all(self) -> List[Notification]:
"""Obtiene todas las notificaciones"""
pass
@abstractmethod
def mark_as_read(self, notification_id: str) -> bool:
"""Marca una notificación como leída"""
pass
@abstractmethod
def mark_all_as_read(self, user_id: int) -> bool:
"""Marca todas las notificaciones de un usuario como leídas"""
pass
@abstractmethod
def delete(self, notification_id: str) -> bool:
"""Elimina una notificación"""
pass
@abstractmethod
def delete_all_by_user(self, user_id: int) -> bool:
"""Elimina todas las notificaciones de un usuario"""
pass

View File

@@ -0,0 +1,173 @@
"""Notification services implementing the business logic"""
import uuid
from datetime import datetime
from typing import Optional, List, Dict
from domain.notifications import Notification
from application.ports.notification_repository import NotificationRepository
import logging
logger = logging.getLogger(__name__)
class CreateNotification:
"""Use case: Create a new notification"""
def __init__(self, notification_repo: NotificationRepository):
self.notification_repo = notification_repo
def execute(self, id_usuario: int, tipo_notificacion: str, titulo: str,
mensaje: str, id_reporte: Optional[str] = None,
estado_reporte: Optional[str] = None) -> Dict:
"""
Creates a new notification
Args:
id_usuario: User ID who receives the notification
tipo_notificacion: Type of notification
titulo: Notification title
mensaje: Notification message
id_reporte: Optional report ID related to notification
estado_reporte: Optional report status
Returns:
Dictionary with status and message
"""
try:
# Validate inputs
if not id_usuario or id_usuario <= 0:
return {"status": "error", "message": "Usuario inválido"}
if not tipo_notificacion or not titulo or not mensaje:
return {"status": "error", "message": "Campos obligatorios faltantes"}
# Create notification domain object
notification = Notification(
id_notificacion=str(uuid.uuid4()),
id_usuario=id_usuario,
tipo_notificacion=tipo_notificacion,
titulo=titulo,
mensaje=mensaje,
id_reporte=id_reporte,
estado_reporte=estado_reporte,
leida=False,
fecha_creacion=datetime.utcnow()
)
# Save to repository
saved_notification = self.notification_repo.save(notification)
logger.info(f"Notification created: {saved_notification.id_notificacion}")
return {
"status": "success",
"message": "Notificación creada exitosamente",
"id_notificacion": saved_notification.id_notificacion
}
except Exception as e:
logger.error(f"Error creating notification: {e}", exc_info=True)
return {"status": "error", "message": f"Error al crear notificación: {str(e)}"}
class GetNotificationById:
"""Use case: Get a notification by ID"""
def __init__(self, notification_repo: NotificationRepository):
self.notification_repo = notification_repo
def execute(self, notification_id: str) -> Optional[Notification]:
"""Retrieves a notification by ID"""
return self.notification_repo.find_by_id(notification_id)
class GetUserNotifications:
"""Use case: Get all notifications for a user"""
def __init__(self, notification_repo: NotificationRepository):
self.notification_repo = notification_repo
def execute(self, user_id: int) -> List[Notification]:
"""Retrieves all notifications for a user"""
return self.notification_repo.find_by_user_id(user_id)
class GetUnreadUserNotifications:
"""Use case: Get unread notifications for a user"""
def __init__(self, notification_repo: NotificationRepository):
self.notification_repo = notification_repo
def execute(self, user_id: int) -> List[Notification]:
"""Retrieves unread notifications for a user"""
return self.notification_repo.find_unread_by_user_id(user_id)
class MarkNotificationAsRead:
"""Use case: Mark a notification as read"""
def __init__(self, notification_repo: NotificationRepository):
self.notification_repo = notification_repo
def execute(self, notification_id: str) -> Dict:
"""Marks a notification as read"""
try:
success = self.notification_repo.mark_as_read(notification_id)
if success:
logger.info(f"Notification marked as read: {notification_id}")
return {"status": "success", "message": "Notificación marcada como leída"}
else:
return {"status": "error", "message": "Notificación no encontrada"}
except Exception as e:
logger.error(f"Error marking notification as read: {e}", exc_info=True)
return {"status": "error", "message": f"Error: {str(e)}"}
class MarkAllUserNotificationsAsRead:
"""Use case: Mark all notifications for a user as read"""
def __init__(self, notification_repo: NotificationRepository):
self.notification_repo = notification_repo
def execute(self, user_id: int) -> Dict:
"""Marks all notifications for a user as read"""
try:
success = self.notification_repo.mark_all_as_read(user_id)
logger.info(f"All notifications marked as read for user: {user_id}")
return {"status": "success", "message": "Todas las notificaciones marcadas como leídas"}
except Exception as e:
logger.error(f"Error marking notifications as read: {e}", exc_info=True)
return {"status": "error", "message": f"Error: {str(e)}"}
class DeleteNotification:
"""Use case: Delete a notification"""
def __init__(self, notification_repo: NotificationRepository):
self.notification_repo = notification_repo
def execute(self, notification_id: str) -> Dict:
"""Deletes a notification"""
try:
success = self.notification_repo.delete(notification_id)
if success:
logger.info(f"Notification deleted: {notification_id}")
return {"status": "success", "message": "Notificación eliminada"}
else:
return {"status": "error", "message": "Notificación no encontrada"}
except Exception as e:
logger.error(f"Error deleting notification: {e}", exc_info=True)
return {"status": "error", "message": f"Error: {str(e)}"}
class DeleteAllUserNotifications:
"""Use case: Delete all notifications for a user"""
def __init__(self, notification_repo: NotificationRepository):
self.notification_repo = notification_repo
def execute(self, user_id: int) -> Dict:
"""Deletes all notifications for a user"""
try:
success = self.notification_repo.delete_all_by_user(user_id)
logger.info(f"All notifications deleted for user: {user_id}")
return {"status": "success", "message": "Todas las notificaciones eliminadas"}
except Exception as e:
logger.error(f"Error deleting notifications: {e}", exc_info=True)
return {"status": "error", "message": f"Error: {str(e)}"}

View File

@@ -0,0 +1,135 @@
"""Notification RabbitMQ Consumer - Processes notification events"""
import sys
import os
import logging
from datetime import datetime
# Add src to path to import modules
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from infrastructure.adapters.rabbitmq.consumer import RabbitMQConsumer
from infrastructure.adapters.rabbitmq.messages import NotificationMessage, NotificationEventType
from infrastructure.adapters.persistence.notification_repository_mongo import NotificationRepositoryMongo
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class NotificationConsumer:
"""Consumer for notification events from RabbitMQ"""
def __init__(self):
self.repo = NotificationRepositoryMongo()
self.consumer = RabbitMQConsumer(queue_name='notifications_queue')
self.consumer.set_callback(self.process_message)
def process_message(self, message_dict: dict):
"""
Processes a notification event message from RabbitMQ
Args:
message_dict: Dictionary containing the message data
"""
try:
# Reconstruct the NotificationMessage object
message = NotificationMessage.from_dict(message_dict)
if message.event_type == NotificationEventType.CREATE:
self._handle_create_notification(message)
elif message.event_type == NotificationEventType.REPORT_STATUS_CHANGE:
self._handle_report_status_change(message)
else:
logger.warning(f"Unknown event type: {message.event_type}")
except Exception as e:
logger.error(f"Error processing notification message: {e}", exc_info=True)
raise
def _handle_create_notification(self, message: NotificationMessage):
"""Handle notification create event"""
try:
logger.info(f"Creating notification for user: {message.id_usuario}")
# Parse datetime string if provided
fecha_creacion = None
if message.fecha_creacion:
try:
fecha_creacion = datetime.fromisoformat(message.fecha_creacion)
except (ValueError, TypeError):
fecha_creacion = datetime.utcnow()
else:
fecha_creacion = datetime.utcnow()
from domain.notifications import Notification
# Create Notification domain object
notification = Notification(
id_notificacion=message.id_notificacion,
id_usuario=message.id_usuario,
tipo_notificacion=message.tipo_notificacion,
titulo=message.titulo,
mensaje=message.mensaje,
id_reporte=message.id_reporte,
estado_reporte=message.estado_reporte,
leida=False,
fecha_creacion=fecha_creacion
)
# Save to MongoDB repository
saved_notification = self.repo.save(notification)
logger.info(f"Notification created successfully: {message.id_notificacion}")
except Exception as e:
logger.error(f"Error creating notification: {e}", exc_info=True)
raise
def _handle_report_status_change(self, message: NotificationMessage):
"""Handle report status change notification"""
try:
logger.info(f"Processing report status change notification for user: {message.id_usuario}")
# Parse datetime string if provided
fecha_creacion = None
if message.fecha_creacion:
try:
fecha_creacion = datetime.fromisoformat(message.fecha_creacion)
except (ValueError, TypeError):
fecha_creacion = datetime.utcnow()
else:
fecha_creacion = datetime.utcnow()
from domain.notifications import Notification
# Create Notification domain object for status change
notification = Notification(
id_notificacion=message.id_notificacion,
id_usuario=message.id_usuario,
tipo_notificacion=message.tipo_notificacion,
titulo=message.titulo,
mensaje=message.mensaje,
id_reporte=message.id_reporte,
estado_reporte=message.estado_reporte,
leida=False,
fecha_creacion=fecha_creacion
)
# Save to MongoDB repository
saved_notification = self.repo.save(notification)
logger.info(f"Report status change notification created: {message.id_notificacion}")
except Exception as e:
logger.error(f"Error processing report status change: {e}", exc_info=True)
raise
def start(self):
"""Start consuming messages from RabbitMQ"""
try:
logger.info("Starting Notification Consumer...")
self.consumer.start()
except Exception as e:
logger.error(f"Error starting notification consumer: {e}", exc_info=True)
raise

View File

@@ -21,6 +21,16 @@ class Settings(BaseSettings):
description="Base de datos MongoDB"
)
# Base de datos MongoDB para Notificaciones
mongodb_notifications_url: str = Field(
default=os.getenv("MONGODB_NOTIFICATIONS_URL", "mongodb://localhost:27018"),
description="URL de conexión a MongoDB para API de Notificaciones"
)
mongodb_notifications_db: str = Field(
default="voxpopuli_notifications",
description="Base de datos MongoDB para notificaciones"
)
rabbitmq: str = Field (
default=os.getenv("RABBITMQ_URI", "localhost")

View File

@@ -0,0 +1,34 @@
"""Notification domain model"""
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
@dataclass
class Notification:
"""Domain model for notifications"""
id_notificacion: str
id_usuario: int # Usuario que recibe la notificación
tipo_notificacion: str # "report_status_change", etc.
titulo: str
mensaje: str
id_reporte: Optional[str] = None
estado_reporte: Optional[str] = None # "en proceso", "resuelto", "no resuelto"
leida: bool = False
fecha_creacion: Optional[datetime] = None
fecha_lectura: Optional[datetime] = None
def __post_init__(self):
"""Validations after initialization"""
if not self.id_notificacion:
raise ValueError("id_notificacion is required")
if not self.id_usuario or self.id_usuario <= 0:
raise ValueError("id_usuario must be a positive integer")
if not self.tipo_notificacion:
raise ValueError("tipo_notificacion is required")
if not self.titulo:
raise ValueError("titulo is required")
if not self.mensaje:
raise ValueError("mensaje is required")
if self.fecha_creacion is None:
self.fecha_creacion = datetime.now()

View File

@@ -6,6 +6,14 @@ from core.config import ConfSettings
mongo_client = MongoClient(ConfSettings.mongodb_url)
mongodb = mongo_client[ConfSettings.mongodb_db]
# Conexión a MongoDB para Notificaciones
mongo_client_notifications = MongoClient(ConfSettings.mongodb_notifications_url)
mongodb_notifications = mongo_client_notifications[ConfSettings.mongodb_notifications_db]
def get_reports_collection() -> Collection:
"""Obtiene la colección de reportes desde MongoDB"""
return mongodb["reportes"]
def get_notifications_collection() -> Collection:
"""Obtiene la colección de notificaciones desde MongoDB"""
return mongodb_notifications["notificaciones"]

View File

@@ -0,0 +1,103 @@
"""Notification Repository Implementation using MongoDB"""
from application.ports.notification_repository import NotificationRepository
from domain.notifications import Notification
from infrastructure.adapters.persistence.mongodb import get_notifications_collection
from typing import List, Optional
from datetime import datetime
import uuid
class NotificationRepositoryMongo(NotificationRepository):
"""Implementación del repositorio de Notificaciones usando MongoDB"""
def __init__(self):
self.collection = get_notifications_collection()
def save(self, notification: Notification) -> Notification:
"""Guarda una nueva notificación"""
notification_dict = {
"id_notificacion": notification.id_notificacion,
"id_usuario": notification.id_usuario,
"tipo_notificacion": notification.tipo_notificacion,
"titulo": notification.titulo,
"mensaje": notification.mensaje,
"id_reporte": notification.id_reporte,
"estado_reporte": notification.estado_reporte,
"leida": notification.leida,
"fecha_creacion": notification.fecha_creacion or datetime.utcnow(),
"fecha_lectura": notification.fecha_lectura
}
result = self.collection.insert_one(notification_dict)
return notification
def find_by_id(self, notification_id: str) -> Optional[Notification]:
"""Obtiene una notificación por ID"""
doc = self.collection.find_one({"id_notificacion": notification_id})
if doc:
return self._to_domain(doc)
return None
def find_by_user_id(self, user_id: int) -> List[Notification]:
"""Obtiene todas las notificaciones de un usuario, ordenadas por fecha descendente"""
docs = self.collection.find({"id_usuario": user_id}).sort("fecha_creacion", -1)
return [self._to_domain(doc) for doc in docs]
def find_unread_by_user_id(self, user_id: int) -> List[Notification]:
"""Obtiene todas las notificaciones no leídas de un usuario"""
docs = self.collection.find({
"id_usuario": user_id,
"leida": False
}).sort("fecha_creacion", -1)
return [self._to_domain(doc) for doc in docs]
def find_all(self) -> List[Notification]:
"""Obtiene todas las notificaciones"""
docs = self.collection.find().sort("fecha_creacion", -1)
return [self._to_domain(doc) for doc in docs]
def mark_as_read(self, notification_id: str) -> bool:
"""Marca una notificación como leída"""
result = self.collection.update_one(
{"id_notificacion": notification_id},
{"$set": {
"leida": True,
"fecha_lectura": datetime.utcnow()
}}
)
return result.modified_count > 0
def mark_all_as_read(self, user_id: int) -> bool:
"""Marca todas las notificaciones de un usuario como leídas"""
result = self.collection.update_many(
{"id_usuario": user_id, "leida": False},
{"$set": {
"leida": True,
"fecha_lectura": datetime.utcnow()
}}
)
return result.modified_count > 0
def delete(self, notification_id: str) -> bool:
"""Elimina una notificación"""
result = self.collection.delete_one({"id_notificacion": notification_id})
return result.deleted_count > 0
def delete_all_by_user(self, user_id: int) -> bool:
"""Elimina todas las notificaciones de un usuario"""
result = self.collection.delete_many({"id_usuario": user_id})
return result.deleted_count > 0
def _to_domain(self, doc: dict) -> Notification:
"""Convierte un documento de MongoDB a un objeto de dominio"""
return Notification(
id_notificacion=doc.get("id_notificacion"),
id_usuario=doc.get("id_usuario"),
tipo_notificacion=doc.get("tipo_notificacion"),
titulo=doc.get("titulo"),
mensaje=doc.get("mensaje"),
id_reporte=doc.get("id_reporte"),
estado_reporte=doc.get("estado_reporte"),
leida=doc.get("leida", False),
fecha_creacion=doc.get("fecha_creacion"),
fecha_lectura=doc.get("fecha_lectura")
)

View File

@@ -18,6 +18,13 @@ class ReportEventType(str, Enum):
CREATE = "report.create"
UPDATE_VISIBILITY = "report.update_visibility"
DELETE = "report.delete"
UPDATE_STATUS = "report.update_status"
class NotificationEventType(str, Enum):
"""Types of notification events"""
CREATE = "notification.create"
REPORT_STATUS_CHANGE = "notification.report_status_change"
@dataclass
@@ -85,3 +92,33 @@ class ReportMessage:
"""Create from dictionary"""
data['event_type'] = ReportEventType(data['event_type'])
return ReportMessage(**data)
@dataclass
class NotificationMessage:
"""Message for notification events"""
event_type: NotificationEventType
id_notificacion: Optional[str] = None
id_usuario: Optional[int] = None
tipo_notificacion: Optional[str] = None
titulo: Optional[str] = None
mensaje: Optional[str] = None
id_reporte: Optional[str] = None
estado_reporte: Optional[str] = None # Estado del reporte que cambió
fecha_creacion: Optional[str] = None # ISO format datetime string
def to_dict(self):
"""Convert to dictionary"""
data = asdict(self)
data['event_type'] = self.event_type.value
return data
def to_json(self) -> str:
"""Convert to JSON string"""
return json.dumps(self.to_dict())
@staticmethod
def from_dict(data: dict) -> 'NotificationMessage':
"""Create from dictionary"""
data['event_type'] = NotificationEventType(data['event_type'])
return NotificationMessage(**data)

View File

@@ -0,0 +1,16 @@
"""Notifications API Application Factory"""
from fastapi import FastAPI
from core.config import ConfSettings
from infrastructure.api.notifications.router import router
def create_app() -> FastAPI:
"""Factory para crear la aplicación de Notificaciones"""
app = FastAPI(
title="Notificaciones Microservice",
version="1.0.0",
description="Microservicio de gestión de notificaciones"
)
app.include_router(router)
return app

View File

@@ -0,0 +1,176 @@
"""Notifications API endpoints"""
from fastapi import APIRouter, HTTPException, status
from infrastructure.api.notifications.schemas import NotificationResponse, NotificationListResponse, MarkNotificationAsReadRequest
from application.services.notification_services import (
GetUserNotifications, GetUnreadUserNotifications, MarkNotificationAsRead,
MarkAllUserNotificationsAsRead, DeleteNotification, DeleteAllUserNotifications
)
from infrastructure.adapters.persistence.notification_repository_mongo import NotificationRepositoryMongo
import logging
router = APIRouter()
notification_repo = NotificationRepositoryMongo()
logger = logging.getLogger(__name__)
def _notification_to_response(notification) -> dict:
"""Convierte un objeto Notification a dict de respuesta"""
return {
"id_notificacion": notification.id_notificacion,
"id_usuario": notification.id_usuario,
"tipo_notificacion": notification.tipo_notificacion,
"titulo": notification.titulo,
"mensaje": notification.mensaje,
"id_reporte": notification.id_reporte,
"estado_reporte": notification.estado_reporte,
"leida": notification.leida,
"fecha_creacion": notification.fecha_creacion,
"fecha_lectura": notification.fecha_lectura
}
@router.get("/user/{user_id}")
async def get_user_notifications(user_id: int):
"""Obtiene todas las notificaciones de un usuario"""
try:
get_use_case = GetUserNotifications(notification_repo)
notifications = get_use_case.execute(user_id)
unread_use_case = GetUnreadUserNotifications(notification_repo)
unread = unread_use_case.execute(user_id)
return NotificationListResponse(
total=len(notifications),
unread=len(unread),
notifications=[_notification_to_response(n) for n in notifications]
)
except Exception as e:
logger.error(f"Error al obtener notificaciones del usuario {user_id}: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
@router.get("/user/{user_id}/unread")
async def get_unread_user_notifications(user_id: int):
"""Obtiene todas las notificaciones no leídas de un usuario"""
try:
get_use_case = GetUnreadUserNotifications(notification_repo)
notifications = get_use_case.execute(user_id)
return NotificationListResponse(
total=len(notifications),
unread=len(notifications),
notifications=[_notification_to_response(n) for n in notifications]
)
except Exception as e:
logger.error(f"Error al obtener notificaciones no leídas del usuario {user_id}: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
@router.get("/{notification_id}")
async def get_notification(notification_id: str):
"""Obtiene una notificación por ID"""
try:
notification = notification_repo.find_by_id(notification_id)
if not notification:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Notificación con ID {notification_id} no encontrada"
)
return _notification_to_response(notification)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error al obtener notificación {notification_id}: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
@router.put("/{notification_id}/read", status_code=status.HTTP_200_OK)
async def mark_notification_as_read(notification_id: str):
"""Marca una notificación como leída"""
try:
mark_use_case = MarkNotificationAsRead(notification_repo)
result = mark_use_case.execute(notification_id)
if result["status"] == "error":
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=result["message"]
)
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Error al marcar notificación como leída {notification_id}: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
@router.put("/user/{user_id}/read-all", status_code=status.HTTP_200_OK)
async def mark_all_user_notifications_as_read(user_id: int):
"""Marca todas las notificaciones de un usuario como leídas"""
try:
mark_use_case = MarkAllUserNotificationsAsRead(notification_repo)
result = mark_use_case.execute(user_id)
return result
except Exception as e:
logger.error(f"Error al marcar todas las notificaciones como leídas para el usuario {user_id}: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
@router.delete("/{notification_id}", status_code=status.HTTP_200_OK)
async def delete_notification(notification_id: str):
"""Elimina una notificación"""
try:
delete_use_case = DeleteNotification(notification_repo)
result = delete_use_case.execute(notification_id)
if result["status"] == "error":
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=result["message"]
)
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Error al eliminar notificación {notification_id}: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
@router.delete("/user/{user_id}/delete-all", status_code=status.HTTP_200_OK)
async def delete_all_user_notifications(user_id: int):
"""Elimina todas las notificaciones de un usuario"""
try:
delete_use_case = DeleteAllUserNotifications(notification_repo)
result = delete_use_case.execute(user_id)
return result
except Exception as e:
logger.error(f"Error al eliminar todas las notificaciones del usuario {user_id}: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)

View File

@@ -0,0 +1,14 @@
"""Notifications API Root endpoint"""
from fastapi import APIRouter
router = APIRouter()
@router.get("/")
async def root():
"""Root endpoint for Notifications API"""
return {
"message": "Notificaciones API",
"version": "1.0.0",
"status": "running"
}

View File

@@ -0,0 +1,18 @@
"""Notifications API Router"""
from fastapi import APIRouter
from infrastructure.api.notifications.notifications import router as notifications_router
from infrastructure.api.notifications.root import router as root_router
router = APIRouter()
router.include_router(
notifications_router,
prefix="/notifications",
tags=["notifications"]
)
router.include_router(
root_router,
prefix='',
tags=["root"]
)

View File

@@ -0,0 +1,33 @@
"""Notification API Schemas"""
from pydantic import BaseModel
from typing import Optional
from datetime import datetime
class NotificationResponse(BaseModel):
"""Schema for notification response"""
id_notificacion: str
id_usuario: int
tipo_notificacion: str
titulo: str
mensaje: str
id_reporte: Optional[str] = None
estado_reporte: Optional[str] = None
leida: bool
fecha_creacion: datetime
fecha_lectura: Optional[datetime] = None
class Config:
from_attributes = True
class NotificationListResponse(BaseModel):
"""Schema for notification list response"""
total: int
unread: int
notifications: list[NotificationResponse]
class MarkNotificationAsReadRequest(BaseModel):
"""Schema for marking notification as read"""
pass

View File

@@ -7,8 +7,12 @@ from application.services.report_services import (
from infrastructure.adapters.persistence.report_repository_mongo import ReportRepositoryMongo
from infrastructure.adapters.persistence.user_repository_sql import UserRepositorySQL
from infrastructure.adapters.file_storage import image_storage
from infrastructure.adapters.rabbitmq.sender import send_to_queue
from infrastructure.adapters.rabbitmq.messages import NotificationMessage, NotificationEventType
import logging
from typing import Optional
from datetime import datetime
import uuid
router = APIRouter()
report_repo = ReportRepositoryMongo()
@@ -231,8 +235,17 @@ async def delete_report(report_id: str):
@router.put("/{report_id}/status", status_code=status.HTTP_200_OK)
async def update_report_status(report_id: str, status_data: ReportUpdateStatusRequest):
"""Actualiza el estado de un reporte"""
"""Actualiza el estado de un reporte y envía notificación al usuario"""
try:
# Obtener el reporte actual para saber el usuario creador
report = report_repo.find_by_id(report_id)
if not report:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Reporte con ID {report_id} no encontrado"
)
# Actualizar el estado
update_use_case = UpdateReportStatus(report_repo)
result = update_use_case.execute(
report_id=report_id,
@@ -254,6 +267,30 @@ async def update_report_status(report_id: str, status_data: ReportUpdateStatusRe
detail=message
)
# Enviar notificación al usuario creador del reporte
try:
notification_message = NotificationMessage(
event_type=NotificationEventType.REPORT_STATUS_CHANGE,
id_notificacion=str(uuid.uuid4()),
id_usuario=report.id_usuario,
tipo_notificacion="report_status_change",
titulo="Tu reporte ha cambiado de estado",
mensaje=f"El estado de tu reporte ha sido actualizado a: {status_data.estado}",
id_reporte=report_id,
estado_reporte=status_data.estado,
fecha_creacion=datetime.utcnow().isoformat()
)
# Enviar a la cola de notificaciones
send_to_queue(
queue_name='notifications_queue',
message=notification_message.to_dict()
)
logger.info(f"Notification sent to user {report.id_usuario} for report {report_id}")
except Exception as notification_error:
logger.warning(f"Error sending notification for report {report_id}: {notification_error}")
# No fallar la actualización si hay error en notificación
# 200 OK: estado actualizado correctamente
return result

View File

@@ -1,11 +1,13 @@
"""
Punto de entrada principal para VoxPopuli Microservices
Ejecuta dos APIs en paralelo: Usuarios (puerto 8000) y Reportes (puerto 8001)
Ejecuta dos APIs en paralelo: Usuarios (puerto 8000) y Reportes (puerto 8001), Notificaciones (puerto 8002)
"""
from infrastructure.api.users.app import create_app as create_users_app
from infrastructure.api.reports.app import create_app as create_reports_app
from infrastructure.api.notifications.app import create_app as create_notifications_app
from consumers.report_consumer import ReportConsumer
from consumers.user_consumer import UserConsumer
from consumers.notification_consumer import NotificationConsumer
from core.config import ConfSettings
import threading
import uvicorn
@@ -32,6 +34,17 @@ def run_reports_api():
log_level=ConfSettings.log_level,
)
def run_notifications_api():
"""Ejecuta la API de Notificaciones en puerto 8002"""
app_notifications = create_notifications_app()
uvicorn.run(
app_notifications,
host=ConfSettings.host,
port=8002,
reload=False,
log_level=ConfSettings.log_level,
)
def run_user_consumer():
consumer = UserConsumer()
consumer.start()
@@ -40,34 +53,45 @@ def run_reports_consumer():
consumer = ReportConsumer()
consumer.start()
def run_notifications_consumer():
consumer = NotificationConsumer()
consumer.start()
def run():
"""Inicia ambas APIs en threads separados"""
"""Inicia todas las APIs en threads separados"""
print("=" * 60)
print("Iniciando VoxPopuli Microservices...")
print("=" * 60)
users_thread = threading.Thread(target=run_users_api, daemon=True, name="Users-API")
reports_thread = threading.Thread(target=run_reports_api, daemon=True, name="Reports-API")
notifications_thread = threading.Thread(target=run_notifications_api, daemon=True, name="Notifications-API")
user_consumer_thread = threading.Thread(target=run_user_consumer, daemon=True, name="Users-Consumer")
report_consumer_thread = threading.Thread(target=run_reports_consumer, daemon=True, name="Reports-Consumer")
notifications_consumer_thread = threading.Thread(target=run_notifications_consumer, daemon=True, name="Notifications-Consumer")
users_thread.start()
reports_thread.start()
notifications_thread.start()
user_consumer_thread.start()
report_consumer_thread.start()
notifications_consumer_thread.start()
print("\n✓ API de Usuarios ejecutándose en http://0.0.0.0:8000")
print("✓ API de Reportes ejecutándose en http://0.0.0.0:8001")
print("✓ API de Notificaciones ejecutándose en http://0.0.0.0:8002")
print("\nDocumentación disponible en:")
print(" - Usuarios: http://localhost:8000/docs")
print(" - Reportes: http://localhost:8001/docs")
print(" - Notificaciones: http://localhost:8002/docs")
print("\n" + "=" * 60 + "\n")
try:
users_thread.join()
reports_thread.join()
notifications_thread.join()
except KeyboardInterrupt:
print("\n\nRecibiendo señal de salida...")
print("Cerrando APIs...")