Files
VoxPopuli/ARCHITECTURE.md

564 lines
21 KiB
Markdown

# Arquitectura de VoxPopuli Microservices
## Resumen General
VoxPopuli es un sistema de microservicios **basado en eventos asincronos** con comunicación mediante RabbitMQ. El sistema ejecuta múltiples componentes en paralelo:
- 2 APIs REST independientes (Usuarios y Reportes)
- 2 Consumidores de mensajes para procesamiento asincrónico
- Bases de datos separadas por dominio (MySQL para usuarios, MongoDB para reportes)
## Patrones Arquitectónicos Usados
### 1. Clean Architecture (Arquitectura Limpia)
La aplicación está organizada en capas independientes:
```
┌─────────────────────────────────────────────────────────┐
│ API REST (FastAPI) │
│ Infraestructura API (HTTP Handlers) │
├─────────────────────────────────────────────────────────┤
│ Application Layer (Servicios) │
│ - Use Cases / Application Services │
│ - Lógica de negocio de la aplicación │
├─────────────────────────────────────────────────────────┤
│ Domain Layer (Dominio) │
│ - Entidades de negocio puras │
│ - Reglas de negocio independientes │
├─────────────────────────────────────────────────────────┤
│ Infrastructure Layer (Infraestructura) │
│ - Adaptadores (Repositorios, RabbitMQ) │
│ - Acceso a Datos (MySQL, MongoDB) │
│ - Almacenamiento de archivos │
└─────────────────────────────────────────────────────────┘
```
### 2. Arquitectura Orientada a Eventos (Event-Driven Architecture)
En lugar de procesamiento sincrónico, los servicios se comunican mediante eventos:
```
┌──────────────────────────────────────────────────────────────┐
│ API REST (FastAPI) │
│ (Recibe solicitudes HTTP) │
└────────┬─────────────────────────────────┬───────────────────┘
│ Valida y envía evento │
▼ ▼
┌─────────────────────────────────────────────┐
│ RabbitMQ Message Queue │
│ - users_queue (eventos de usuario) │
│ - reports_queue (eventos de reporte) │
└────────┬────────────────────┬───────────────┘
│ │
▼ ▼
┌─────────────┐ ┌──────────────┐
│ User │ │ Report │
│ Consumer │ │ Consumer │
│ (Thread) │ │ (Thread) │
└─────┬───────┘ └──────┬───────┘
│ Procesa evento │ Procesa evento
▼ ▼
┌──────────────┐ ┌──────────────┐
│ MySQL (BD │ │ MongoDB (BD │
│ de Usuarios) │ │ de Reportes) │
└──────────────┘ └──────────────┘
```
**Ventajas:**
- Bajo acoplamiento entre servicios
- Mayor escalabilidad
- Mejor tolerancia a fallos
- Procesamiento asincrónico
### 3. Microservicios
El sistema consta de dos microservicios independientes que ejecutan en paralelo:
#### **Usuarios API** (Puerto 8000)
- Gestión de usuarios y autenticación
- Base de datos: MySQL con SQLAlchemy
- Endpoints: POST/GET/PUT/DELETE /users/
#### **Reportes API** (Puerto 8001)
- Gestión de reportes comunitarios
- Base de datos: MongoDB
- Endpoints: POST/GET/PUT/DELETE /reports/
- Almacenamiento: Imágenes en WebP (directorio `/storage/reports_images/`)
Cada API:
- Valida solicitudes y envía eventos a RabbitMQ
- Expone documentación automática en `/docs`
- Es independientemente escalable
### 4. Consumidores de Eventos (Message Consumers)
Dos consumidores ejecutan como threads separados:
#### **User Consumer**
- Escucha la cola `users_queue` en RabbitMQ
- Eventos procesados:
- `user.create` → Guarda usuario en MySQL
- `user.update` → Actualiza usuario
- `user.delete` → Elimina usuario
#### **Report Consumer**
- Escucha la cola `reports_queue` en RabbitMQ
- Eventos procesados:
- `report.create` → Guarda reporte en MongoDB, incrementa contador de usuario
- `report.update_visibility` → Actualiza puntuación comunitaria
- `report.delete` → Elimina reporte
**Beneficios:**
- Desacoplamiento de API y procesamiento
- Reintentos automáticos si falla la BD
- Procesamiento en background
### 5. Patrón Repository
Abstracción para acceso a datos:
```
┌──────────────────┐
│ Service Layer │
│ (API Handler) │
└────────┬─────────┘
│ depends on
┌──────────────────────────┐
│ Port (Interface) │
│ UserRepository │
│ ReportRepository │
└────────┬─────────────────┘
│ implemented by
┌──────────────────────────┐
│ Concrete Adapters │
│ UserRepositorySQL │
│ ReportRepositoryMongo │
└──────────────────────────┘
```
**Beneficios:**
- Independencia de la implementación de BD
- Fácil de testear (usar mocks)
- Reutilizable en diferentes contextos
### 6. Inversión de Dependencias (Dependency Inversion)
Los servicios dependen de **abstracciones (interfaces)**, no de implementaciones concretas:
```python
class CreateUser:
def __init__(self, repo: UserRepository): # Depende de interfaz
self.repo = repo
# Puede usar cualquier implementación de UserRepository
service = CreateUser(UserRepositorySQL()) # Implementación SQL
# o
service = CreateUser(UserRepositoryMock()) # Para testing
```
## Flujo de Solicitud
### Crear Usuario (Asincrónico):
```
1. HTTP POST /users/
└─> Validar datos (FastAPI Handler)
└─> Crear objeto UserMessage
└─> Enviar mensaje a RabbitMQ (users_queue)
└─> Retornar respuesta HTTP inmediatamente
[En paralelo, el User Consumer procesa:]
2. User Consumer recibe evento user.create
└─> UserRepositorySQL.save()
└─> SQLAlchemy
└─> MySQL Database
```
### Crear Reporte (Asincrónico):
```
1. HTTP POST /reports/
└─> Validar datos y usuario existe (FastAPI Handler)
└─> Crear objeto ReportMessage
└─> Enviar mensaje a RabbitMQ (reports_queue)
└─> Retornar respuesta HTTP inmediatamente
[En paralelo, el Report Consumer procesa:]
2. Report Consumer recibe evento report.create
└─> ReportRepositoryMongo.save()
└─> UserRepositorySQL.increment_reports(id_usuario)
└─> MongoDB + MySQL Database
```
## Capas Detalladas
### Domain Layer (src/domain/)
**Entidades puras de negocio** (sin dependencias externas):
#### User
```python
@dataclass
class User:
user_id: int
nombre: str
apellido: str
email: str
contraseña_hash: Optional[str] = None
fecha_nacimiento: datetime = None
fecha_creacion: datetime = None
calificacion: float = 50.0 # 0-100
numero_reportes: int = 0
url_foto_perfil: Optional[str] = None
biografia: Optional[str] = None
```
#### Report
```python
@dataclass
class Report:
id_reporte: str
id_usuario: int
tipo_reporte: int # Número que representa el tipo
descripcion: str
ubicacion: Optional[str]
lat: Optional[float] = None
lng: Optional[float] = None
image_filename: Optional[str] = None # WebP format
visibilidad: float = 0.0 # Puntuación comunitaria 0-100
estado: Literal["en proceso", "no resuelto", "resuelto"] = "en proceso"
fecha_creacion: Optional[datetime] = None
```
### Application Layer (src/application/)
Contiene la lógica de negocio encapsulada en **Use Cases** y **Puertos**.
#### Ports (Interfaces - src/application/ports/)
**UserRepository (Interface)**
```python
class UserRepository:
def save(self, user: User) -> User
def find_by_id(self, user_id: int) -> Optional[User]
def find_by_email(self, email: str) -> Optional[User]
def update(self, user: User) -> User
def delete(self, user_id: int) -> bool
def increment_reports(self, user_id: int) -> bool
```
**ReportRepository (Interface)**
```python
class ReportRepository:
def save(self, report: Report) -> Report
def find_by_id(self, id_reporte: str) -> Optional[Report]
def find_by_user(self, id_usuario: int) -> List[Report]
def update(self, report: Report) -> Report
def delete(self, id_reporte: str) -> bool
def update_visibility(self, id_reporte: str, visibilidad: float) -> bool
```
#### Services / Use Cases (src/application/services/)
**User Services:**
- `CreateUser` - Valida datos y envía evento a RabbitMQ
- `GetUser` - Recupera usuario de la BD
- `UpdateUser` - Actualiza usuario
- `DeleteUser` - Elimina usuario
**Report Services:**
- `CreateReport` - Valida usuario y reporte, envía evento a RabbitMQ
- `GetReport` - Recupera reporte
- `UpdateReportVisibility` - Actualiza puntuación comunitaria
- `DeleteReport` - Elimina reporte
**Ejemplo de CreateReport:**
```python
class CreateReport:
def __init__(self, repo: ReportRepository, user_repo: UserRepository):
self.repo = repo
self.user_repo = user_repo
def execute(self, id_usuario: int, tipo_reporte: int, descripcion: str, ...):
# 1. Validar que usuario existe
if not self.user_repo.find_by_id(id_usuario):
return {"status": "error", "message": "Usuario no encontrado"}
# 2. Validar descripción
if not descripcion.strip():
return {"status": "error", "message": "Descripción requerida"}
# 3. Crear mensaje de evento
message = ReportMessage(
event_type=ReportEventType.CREATE,
id_usuario=id_usuario,
tipo_reporte=tipo_reporte,
descripcion=descripcion,
...
)
# 4. Enviar a RabbitMQ
send_to_queue(message)
return {"status": "success", "message": "Reporte enviado a procesar"}
```
### Infrastructure Layer (src/infrastructure/)
#### API Handlers (src/infrastructure/api/)
**Users API (puerto 8000)**
- `users.py` - Endpoints REST para CRUD de usuarios
- `auth_service.py` - Autenticación JWT
- `schemas.py` - Esquemas Pydantic para validación
- `router.py` - Rutas de FastAPI
- `app.py` - Aplicación FastAPI
**Reports API (puerto 8001)**
- `reports.py` - Endpoints REST para CRUD de reportes
- `schemas.py` - Esquemas Pydantic
- `router.py` - Rutas
- `app.py` - Aplicación FastAPI
#### Persistence Adapters (src/infrastructure/adapters/persistence/)
**user_repository_sql.py** (Implementación de UserRepository)
- Usa SQLAlchemy ORM
- Implementa todas las operaciones CRUD en MySQL
- Convierte entre modelo de dominio y modelo de BD
**report_repository_mongo.py** (Implementación de ReportRepository)
- Usa PyMongo para MongoDB
- Implementa todas las operaciones CRUD
- Convierte entre modelo de dominio y documento MongoDB
**db.py**
- Configuración de conexiones
- Instancias de motor de BD
**models.py**
- Modelos SQLAlchemy para MySQL
- Esquemas para MongoDB
#### RabbitMQ Adapters (src/infrastructure/adapters/rabbitmq/)
**sender.py**
- Función `send_to_queue()` para enviar mensajes
- Serialización de objetos a JSON
**consumer.py** (Base)
- Clase `RabbitMQConsumer` para consumir mensajes
- Conexión y suscripción a colas
- Manejo de excepciones
**messages.py**
- `UserMessage` - Schema para eventos de usuario
- `ReportMessage` - Schema para eventos de reporte
- `UserEventType` - Enumeración de tipos de eventos
- `ReportEventType` - Enumeración de tipos de eventos
#### File Storage (src/infrastructure/adapters/file_storage.py)
- `image_storage` - Manejo de almacenamiento de imágenes
- Convierte imágenes a formato WebP
- Almacena en `/storage/reports_images/`
- Limpieza automática si reporte es eliminado
### Consumers (src/consumers/)
Ejecutan como threads separados en paralelo con las APIs.
#### User Consumer (src/consumers/user_consumer.py)
```python
class UserConsumer:
def __init__(self):
self.repo = UserRepositorySQL()
self.consumer = RabbitMQConsumer(queue_name='users_queue')
self.consumer.set_callback(self.process_message)
def process_message(self, message_dict: dict):
# Reconvertir a UserMessage
message = UserMessage.from_dict(message_dict)
if message.event_type == UserEventType.CREATE:
self._handle_create_user(message)
elif message.event_type == UserEventType.UPDATE:
self._handle_update_user(message)
# ... etc
def start(self):
# Inicia la conexión y escucha
self.consumer.start_consuming()
```
#### Report Consumer (src/consumers/report_consumer.py)
Similar a User Consumer, pero:
- Escucha `reports_queue`
- Procesa `ReportMessage` con `ReportEventType`
- Interactúa con `ReportRepositoryMongo` y `UserRepositorySQL`
- Maneja almacenamiento de imágenes
### Core Layer (src/core/)
**config.py** - Configuración centralizada:
- Variables de entorno
- Configuración de logging
- Configuración de conexiones de BD
- Configuración de RabbitMQ
- Parámetros de JWT
## Punto de Entrada (src/main.py)
Orquesta todos los componentes en paralelo:
```python
def run():
"""Inicia los 4 componentes en threads separados"""
users_thread = threading.Thread(target=run_users_api)
reports_thread = threading.Thread(target=run_reports_api)
user_consumer_thread = threading.Thread(target=run_user_consumer)
report_consumer_thread = threading.Thread(target=run_reports_consumer)
users_thread.start()
reports_thread.start()
user_consumer_thread.start()
report_consumer_thread.start()
```
Esto inicia:
- API de Usuarios en puerto 8000
- API de Reportes en puerto 8001
- User Consumer escuchando `users_queue`
- Report Consumer escuchando `reports_queue`
## Diagrama de Despliegue
```
┌─────────────────────────────────────────────────────────────┐
│ Docker Container (VoxPopuli) │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ main.py (Orquestador) │ │
│ │ - Lanza 4 threads │ │
│ └───┬──────────┬──────────┬──────────────────────────┬─┘ │
│ │ │ │ │ │
│ ┌───▼──┐ ┌───▼──┐ ┌──▼────┐ ┌─────▼──────┐ │ │
│ │Users │ │Report│ │User │ │Report │ │ │
│ │API │ │API │ │Consume│ │Consumer │ │ │
│ │:8000 │ │:8001 │ │r │ │ │ │ │
│ └───┬──┘ └───┬──┘ └──┬────┘ └─────┬──────┘ │ │
│ │ │ │ │ │ │
└─────┼─────────┼─────────┼────────────┼─────────────┘ │
│ │ │ │ │
└────┬────┴────┬────┴────────────┘ │
│ │ │
┌─────▼──┐ ┌───▼──────┐ │
│RabbitMQ│ │MySQL + │ │
│:5672 │ │MongoDB │ │
└────────┘ └──────────┘ │
```
## Testing
La arquitectura facilita el testing en múltiples niveles:
### Unit Tests
- Mockear repositorios para testear Use Cases
- Testear lógica de negocio en Domain Layer
```python
class TestCreateReport:
def test_report_creation(self):
mock_repo = ReportRepositoryMock()
service = CreateReport(mock_repo)
result = service.execute(...)
assert result['status'] == 'success'
```
### Integration Tests
- Usar DB reales de prueba
- Testear flujo completo API → RabbitMQ → Consumer → DB
### E2E Tests
- Testear desde HTTP request hasta persistencia
- Validar consumidores procesan eventos correctamente
## Ventajas de la Arquitectura Actual
1. **Bajo Acoplamiento** - Servicios se comunican por eventos
2. **Escalabilidad Horizontal** - Consumidores pueden replicarse
3. **Resiliencia** - RabbitMQ reintenta entregas fallidas
4. **Independencia de BD** - Abstractos por puertos
5. **Testabilidad** - Inyección de dependencias
6. **Mantenibilidad** - Capas claramente separadas
7. **Asincronía** - APIs responden rápidamente
8. **Extensibilidad** - Nuevos tipos de eventos fácilmente
## Stack Tecnológico
| Componente | Tecnología |
|-----------|-----------|
| Framework Web | FastAPI |
| Servidor ASGI | Uvicorn |
| ORM SQL | SQLAlchemy |
| Driver MongoDB | PyMongo |
| Message Queue | RabbitMQ |
| Autenticación | JWT |
| Validación | Pydantic |
| Base Datos SQL | MySQL |
| Base Datos NoSQL | MongoDB |
| Almacenamiento | WebP (imágenes) |
| Concurrencia | threading |
def save(self, user):
self.users[user.user_id] = user
return user
# Usar en tests
def test_create_user():
mock_repo = UserRepositoryMock()
service = CreateUser(mock_repo)
user = service.execute(...)
assert mock_repo.users[1] is not None
```
## Escalabilidad
Cada componente puede escalar independientemente:
1. **Usuarios API**: Escalar horizontal con load balancer
2. **Reportes API**: Escalar horizontal con load balancer
3. **MySQL**: Replicación master-slave
4. **MongoDB**: Sharding automático
## Despliegue
Cada microservicio puede desplegarse:
- **Docker**: Contenedores independientes
- **Kubernetes**: Pods independientes
- **Serverless**: Funciones Lambda independientes
## Monitoreo
Cada API expone:
- `/health` - Health check
- `/docs` - Swagger UI
- Logging estructurado
- Métricas por endpoint
## Futuras Mejoras
1. **Autenticación**: JWT tokens en API
2. **Autorización**: RBAC (Role-Based Access Control)
3. **Rate Limiting**: Proteger contra abuso
4. **Caché**: Redis para datos frecuentes
5. **Message Queue**: RabbitMQ para comunicación asíncrona
6. **Logging Centralizado**: ELK Stack
7. **Observabilidad**: Prometheus + Grafana
8. **Tracing**: Jaeger para rastreo de solicitudes