21 KiB
Arquitectura de VoxPopuli Microservices
Resumen General
VoxPopuli es un sistema de microservicios basado en eventos asincronos con comunicación mediante RabbitMQ. El sistema ejecuta múltiples componentes en paralelo:
- 2 APIs REST independientes (Usuarios y Reportes)
- 2 Consumidores de mensajes para procesamiento asincrónico
- Bases de datos separadas por dominio (MySQL para usuarios, MongoDB para reportes)
Patrones Arquitectónicos Usados
1. Clean Architecture (Arquitectura Limpia)
La aplicación está organizada en capas independientes:
┌─────────────────────────────────────────────────────────┐
│ API REST (FastAPI) │
│ Infraestructura API (HTTP Handlers) │
├─────────────────────────────────────────────────────────┤
│ Application Layer (Servicios) │
│ - Use Cases / Application Services │
│ - Lógica de negocio de la aplicación │
├─────────────────────────────────────────────────────────┤
│ Domain Layer (Dominio) │
│ - Entidades de negocio puras │
│ - Reglas de negocio independientes │
├─────────────────────────────────────────────────────────┤
│ Infrastructure Layer (Infraestructura) │
│ - Adaptadores (Repositorios, RabbitMQ) │
│ - Acceso a Datos (MySQL, MongoDB) │
│ - Almacenamiento de archivos │
└─────────────────────────────────────────────────────────┘
2. Arquitectura Orientada a Eventos (Event-Driven Architecture)
En lugar de procesamiento sincrónico, los servicios se comunican mediante eventos:
┌──────────────────────────────────────────────────────────────┐
│ API REST (FastAPI) │
│ (Recibe solicitudes HTTP) │
└────────┬─────────────────────────────────┬───────────────────┘
│ Valida y envía evento │
▼ ▼
┌─────────────────────────────────────────────┐
│ RabbitMQ Message Queue │
│ - users_queue (eventos de usuario) │
│ - reports_queue (eventos de reporte) │
└────────┬────────────────────┬───────────────┘
│ │
▼ ▼
┌─────────────┐ ┌──────────────┐
│ User │ │ Report │
│ Consumer │ │ Consumer │
│ (Thread) │ │ (Thread) │
└─────┬───────┘ └──────┬───────┘
│ Procesa evento │ Procesa evento
▼ ▼
┌──────────────┐ ┌──────────────┐
│ MySQL (BD │ │ MongoDB (BD │
│ de Usuarios) │ │ de Reportes) │
└──────────────┘ └──────────────┘
Ventajas:
- Bajo acoplamiento entre servicios
- Mayor escalabilidad
- Mejor tolerancia a fallos
- Procesamiento asincrónico
3. Microservicios
El sistema consta de dos microservicios independientes que ejecutan en paralelo:
Usuarios API (Puerto 8000)
- Gestión de usuarios y autenticación
- Base de datos: MySQL con SQLAlchemy
- Endpoints: POST/GET/PUT/DELETE /users/
Reportes API (Puerto 8001)
- Gestión de reportes comunitarios
- Base de datos: MongoDB
- Endpoints: POST/GET/PUT/DELETE /reports/
- Almacenamiento: Imágenes en WebP (directorio
/storage/reports_images/)
Cada API:
- Valida solicitudes y envía eventos a RabbitMQ
- Expone documentación automática en
/docs - Es independientemente escalable
4. Consumidores de Eventos (Message Consumers)
Dos consumidores ejecutan como threads separados:
User Consumer
- Escucha la cola
users_queueen RabbitMQ - Eventos procesados:
user.create→ Guarda usuario en MySQLuser.update→ Actualiza usuariouser.delete→ Elimina usuario
Report Consumer
- Escucha la cola
reports_queueen RabbitMQ - Eventos procesados:
report.create→ Guarda reporte en MongoDB, incrementa contador de usuarioreport.update_visibility→ Actualiza puntuación comunitariareport.delete→ Elimina reporte
Beneficios:
- Desacoplamiento de API y procesamiento
- Reintentos automáticos si falla la BD
- Procesamiento en background
5. Patrón Repository
Abstracción para acceso a datos:
┌──────────────────┐
│ Service Layer │
│ (API Handler) │
└────────┬─────────┘
│ depends on
▼
┌──────────────────────────┐
│ Port (Interface) │
│ UserRepository │
│ ReportRepository │
└────────┬─────────────────┘
│ implemented by
▼
┌──────────────────────────┐
│ Concrete Adapters │
│ UserRepositorySQL │
│ ReportRepositoryMongo │
└──────────────────────────┘
Beneficios:
- Independencia de la implementación de BD
- Fácil de testear (usar mocks)
- Reutilizable en diferentes contextos
6. Inversión de Dependencias (Dependency Inversion)
Los servicios dependen de abstracciones (interfaces), no de implementaciones concretas:
class CreateUser:
def __init__(self, repo: UserRepository): # Depende de interfaz
self.repo = repo
# Puede usar cualquier implementación de UserRepository
service = CreateUser(UserRepositorySQL()) # Implementación SQL
# o
service = CreateUser(UserRepositoryMock()) # Para testing
Flujo de Solicitud
Crear Usuario (Asincrónico):
1. HTTP POST /users/
└─> Validar datos (FastAPI Handler)
└─> Crear objeto UserMessage
└─> Enviar mensaje a RabbitMQ (users_queue)
└─> Retornar respuesta HTTP inmediatamente
[En paralelo, el User Consumer procesa:]
2. User Consumer recibe evento user.create
└─> UserRepositorySQL.save()
└─> SQLAlchemy
└─> MySQL Database
Crear Reporte (Asincrónico):
1. HTTP POST /reports/
└─> Validar datos y usuario existe (FastAPI Handler)
└─> Crear objeto ReportMessage
└─> Enviar mensaje a RabbitMQ (reports_queue)
└─> Retornar respuesta HTTP inmediatamente
[En paralelo, el Report Consumer procesa:]
2. Report Consumer recibe evento report.create
└─> ReportRepositoryMongo.save()
└─> UserRepositorySQL.increment_reports(id_usuario)
└─> MongoDB + MySQL Database
Capas Detalladas
Domain Layer (src/domain/)
Entidades puras de negocio (sin dependencias externas):
User
@dataclass
class User:
user_id: int
nombre: str
apellido: str
email: str
contraseña_hash: Optional[str] = None
fecha_nacimiento: datetime = None
fecha_creacion: datetime = None
calificacion: float = 50.0 # 0-100
numero_reportes: int = 0
url_foto_perfil: Optional[str] = None
biografia: Optional[str] = None
Report
@dataclass
class Report:
id_reporte: str
id_usuario: int
tipo_reporte: int # Número que representa el tipo
descripcion: str
ubicacion: Optional[str]
lat: Optional[float] = None
lng: Optional[float] = None
image_filename: Optional[str] = None # WebP format
visibilidad: float = 0.0 # Puntuación comunitaria 0-100
estado: Literal["en proceso", "no resuelto", "resuelto"] = "en proceso"
fecha_creacion: Optional[datetime] = None
Application Layer (src/application/)
Contiene la lógica de negocio encapsulada en Use Cases y Puertos.
Ports (Interfaces - src/application/ports/)
UserRepository (Interface)
class UserRepository:
def save(self, user: User) -> User
def find_by_id(self, user_id: int) -> Optional[User]
def find_by_email(self, email: str) -> Optional[User]
def update(self, user: User) -> User
def delete(self, user_id: int) -> bool
def increment_reports(self, user_id: int) -> bool
ReportRepository (Interface)
class ReportRepository:
def save(self, report: Report) -> Report
def find_by_id(self, id_reporte: str) -> Optional[Report]
def find_by_user(self, id_usuario: int) -> List[Report]
def update(self, report: Report) -> Report
def delete(self, id_reporte: str) -> bool
def update_visibility(self, id_reporte: str, visibilidad: float) -> bool
Services / Use Cases (src/application/services/)
User Services:
CreateUser- Valida datos y envía evento a RabbitMQGetUser- Recupera usuario de la BDUpdateUser- Actualiza usuarioDeleteUser- Elimina usuario
Report Services:
CreateReport- Valida usuario y reporte, envía evento a RabbitMQGetReport- Recupera reporteUpdateReportVisibility- Actualiza puntuación comunitariaDeleteReport- Elimina reporte
Ejemplo de CreateReport:
class CreateReport:
def __init__(self, repo: ReportRepository, user_repo: UserRepository):
self.repo = repo
self.user_repo = user_repo
def execute(self, id_usuario: int, tipo_reporte: int, descripcion: str, ...):
# 1. Validar que usuario existe
if not self.user_repo.find_by_id(id_usuario):
return {"status": "error", "message": "Usuario no encontrado"}
# 2. Validar descripción
if not descripcion.strip():
return {"status": "error", "message": "Descripción requerida"}
# 3. Crear mensaje de evento
message = ReportMessage(
event_type=ReportEventType.CREATE,
id_usuario=id_usuario,
tipo_reporte=tipo_reporte,
descripcion=descripcion,
...
)
# 4. Enviar a RabbitMQ
send_to_queue(message)
return {"status": "success", "message": "Reporte enviado a procesar"}
Infrastructure Layer (src/infrastructure/)
API Handlers (src/infrastructure/api/)
Users API (puerto 8000)
users.py- Endpoints REST para CRUD de usuariosauth_service.py- Autenticación JWTschemas.py- Esquemas Pydantic para validaciónrouter.py- Rutas de FastAPIapp.py- Aplicación FastAPI
Reports API (puerto 8001)
reports.py- Endpoints REST para CRUD de reportesschemas.py- Esquemas Pydanticrouter.py- Rutasapp.py- Aplicación FastAPI
Persistence Adapters (src/infrastructure/adapters/persistence/)
user_repository_sql.py (Implementación de UserRepository)
- Usa SQLAlchemy ORM
- Implementa todas las operaciones CRUD en MySQL
- Convierte entre modelo de dominio y modelo de BD
report_repository_mongo.py (Implementación de ReportRepository)
- Usa PyMongo para MongoDB
- Implementa todas las operaciones CRUD
- Convierte entre modelo de dominio y documento MongoDB
db.py
- Configuración de conexiones
- Instancias de motor de BD
models.py
- Modelos SQLAlchemy para MySQL
- Esquemas para MongoDB
RabbitMQ Adapters (src/infrastructure/adapters/rabbitmq/)
sender.py
- Función
send_to_queue()para enviar mensajes - Serialización de objetos a JSON
consumer.py (Base)
- Clase
RabbitMQConsumerpara consumir mensajes - Conexión y suscripción a colas
- Manejo de excepciones
messages.py
UserMessage- Schema para eventos de usuarioReportMessage- Schema para eventos de reporteUserEventType- Enumeración de tipos de eventosReportEventType- Enumeración de tipos de eventos
File Storage (src/infrastructure/adapters/file_storage.py)
image_storage- Manejo de almacenamiento de imágenes- Convierte imágenes a formato WebP
- Almacena en
/storage/reports_images/ - Limpieza automática si reporte es eliminado
Consumers (src/consumers/)
Ejecutan como threads separados en paralelo con las APIs.
User Consumer (src/consumers/user_consumer.py)
class UserConsumer:
def __init__(self):
self.repo = UserRepositorySQL()
self.consumer = RabbitMQConsumer(queue_name='users_queue')
self.consumer.set_callback(self.process_message)
def process_message(self, message_dict: dict):
# Reconvertir a UserMessage
message = UserMessage.from_dict(message_dict)
if message.event_type == UserEventType.CREATE:
self._handle_create_user(message)
elif message.event_type == UserEventType.UPDATE:
self._handle_update_user(message)
# ... etc
def start(self):
# Inicia la conexión y escucha
self.consumer.start_consuming()
Report Consumer (src/consumers/report_consumer.py)
Similar a User Consumer, pero:
- Escucha
reports_queue - Procesa
ReportMessageconReportEventType - Interactúa con
ReportRepositoryMongoyUserRepositorySQL - Maneja almacenamiento de imágenes
Core Layer (src/core/)
config.py - Configuración centralizada:
- Variables de entorno
- Configuración de logging
- Configuración de conexiones de BD
- Configuración de RabbitMQ
- Parámetros de JWT
Punto de Entrada (src/main.py)
Orquesta todos los componentes en paralelo:
def run():
"""Inicia los 4 componentes en threads separados"""
users_thread = threading.Thread(target=run_users_api)
reports_thread = threading.Thread(target=run_reports_api)
user_consumer_thread = threading.Thread(target=run_user_consumer)
report_consumer_thread = threading.Thread(target=run_reports_consumer)
users_thread.start()
reports_thread.start()
user_consumer_thread.start()
report_consumer_thread.start()
Esto inicia:
- API de Usuarios en puerto 8000
- API de Reportes en puerto 8001
- User Consumer escuchando
users_queue - Report Consumer escuchando
reports_queue
Diagrama de Despliegue
┌─────────────────────────────────────────────────────────────┐
│ Docker Container (VoxPopuli) │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ main.py (Orquestador) │ │
│ │ - Lanza 4 threads │ │
│ └───┬──────────┬──────────┬──────────────────────────┬─┘ │
│ │ │ │ │ │
│ ┌───▼──┐ ┌───▼──┐ ┌──▼────┐ ┌─────▼──────┐ │ │
│ │Users │ │Report│ │User │ │Report │ │ │
│ │API │ │API │ │Consume│ │Consumer │ │ │
│ │:8000 │ │:8001 │ │r │ │ │ │ │
│ └───┬──┘ └───┬──┘ └──┬────┘ └─────┬──────┘ │ │
│ │ │ │ │ │ │
└─────┼─────────┼─────────┼────────────┼─────────────┘ │
│ │ │ │ │
└────┬────┴────┬────┴────────────┘ │
│ │ │
┌─────▼──┐ ┌───▼──────┐ │
│RabbitMQ│ │MySQL + │ │
│:5672 │ │MongoDB │ │
└────────┘ └──────────┘ │
Testing
La arquitectura facilita el testing en múltiples niveles:
Unit Tests
- Mockear repositorios para testear Use Cases
- Testear lógica de negocio en Domain Layer
class TestCreateReport:
def test_report_creation(self):
mock_repo = ReportRepositoryMock()
service = CreateReport(mock_repo)
result = service.execute(...)
assert result['status'] == 'success'
Integration Tests
- Usar DB reales de prueba
- Testear flujo completo API → RabbitMQ → Consumer → DB
E2E Tests
- Testear desde HTTP request hasta persistencia
- Validar consumidores procesan eventos correctamente
Ventajas de la Arquitectura Actual
- Bajo Acoplamiento - Servicios se comunican por eventos
- Escalabilidad Horizontal - Consumidores pueden replicarse
- Resiliencia - RabbitMQ reintenta entregas fallidas
- Independencia de BD - Abstractos por puertos
- Testabilidad - Inyección de dependencias
- Mantenibilidad - Capas claramente separadas
- Asincronía - APIs responden rápidamente
- Extensibilidad - Nuevos tipos de eventos fácilmente
Stack Tecnológico
| Componente | Tecnología |
|---|---|
| Framework Web | FastAPI |
| Servidor ASGI | Uvicorn |
| ORM SQL | SQLAlchemy |
| Driver MongoDB | PyMongo |
| Message Queue | RabbitMQ |
| Autenticación | JWT |
| Validación | Pydantic |
| Base Datos SQL | MySQL |
| Base Datos NoSQL | MongoDB |
| Almacenamiento | WebP (imágenes) |
| Concurrencia | threading |
| def save(self, user): | |
| self.users[user.user_id] = user | |
| return user |
Usar en tests
def test_create_user(): mock_repo = UserRepositoryMock() service = CreateUser(mock_repo) user = service.execute(...) assert mock_repo.users[1] is not None
## Escalabilidad
Cada componente puede escalar independientemente:
1. **Usuarios API**: Escalar horizontal con load balancer
2. **Reportes API**: Escalar horizontal con load balancer
3. **MySQL**: Replicación master-slave
4. **MongoDB**: Sharding automático
## Despliegue
Cada microservicio puede desplegarse:
- **Docker**: Contenedores independientes
- **Kubernetes**: Pods independientes
- **Serverless**: Funciones Lambda independientes
## Monitoreo
Cada API expone:
- `/health` - Health check
- `/docs` - Swagger UI
- Logging estructurado
- Métricas por endpoint
## Futuras Mejoras
1. **Autenticación**: JWT tokens en API
2. **Autorización**: RBAC (Role-Based Access Control)
3. **Rate Limiting**: Proteger contra abuso
4. **Caché**: Redis para datos frecuentes
5. **Message Queue**: RabbitMQ para comunicación asíncrona
6. **Logging Centralizado**: ELK Stack
7. **Observabilidad**: Prometheus + Grafana
8. **Tracing**: Jaeger para rastreo de solicitudes