564 lines
21 KiB
Markdown
564 lines
21 KiB
Markdown
# Arquitectura de VoxPopuli Microservices
|
|
|
|
## Resumen General
|
|
|
|
VoxPopuli es un sistema de microservicios **basado en eventos asincronos** con comunicación mediante RabbitMQ. El sistema ejecuta múltiples componentes en paralelo:
|
|
- 2 APIs REST independientes (Usuarios y Reportes)
|
|
- 2 Consumidores de mensajes para procesamiento asincrónico
|
|
- Bases de datos separadas por dominio (MySQL para usuarios, MongoDB para reportes)
|
|
|
|
## Patrones Arquitectónicos Usados
|
|
|
|
### 1. Clean Architecture (Arquitectura Limpia)
|
|
|
|
La aplicación está organizada en capas independientes:
|
|
|
|
```
|
|
┌─────────────────────────────────────────────────────────┐
|
|
│ API REST (FastAPI) │
|
|
│ Infraestructura API (HTTP Handlers) │
|
|
├─────────────────────────────────────────────────────────┤
|
|
│ Application Layer (Servicios) │
|
|
│ - Use Cases / Application Services │
|
|
│ - Lógica de negocio de la aplicación │
|
|
├─────────────────────────────────────────────────────────┤
|
|
│ Domain Layer (Dominio) │
|
|
│ - Entidades de negocio puras │
|
|
│ - Reglas de negocio independientes │
|
|
├─────────────────────────────────────────────────────────┤
|
|
│ Infrastructure Layer (Infraestructura) │
|
|
│ - Adaptadores (Repositorios, RabbitMQ) │
|
|
│ - Acceso a Datos (MySQL, MongoDB) │
|
|
│ - Almacenamiento de archivos │
|
|
└─────────────────────────────────────────────────────────┘
|
|
```
|
|
|
|
### 2. Arquitectura Orientada a Eventos (Event-Driven Architecture)
|
|
|
|
En lugar de procesamiento sincrónico, los servicios se comunican mediante eventos:
|
|
|
|
```
|
|
┌──────────────────────────────────────────────────────────────┐
|
|
│ API REST (FastAPI) │
|
|
│ (Recibe solicitudes HTTP) │
|
|
└────────┬─────────────────────────────────┬───────────────────┘
|
|
│ Valida y envía evento │
|
|
▼ ▼
|
|
┌─────────────────────────────────────────────┐
|
|
│ RabbitMQ Message Queue │
|
|
│ - users_queue (eventos de usuario) │
|
|
│ - reports_queue (eventos de reporte) │
|
|
└────────┬────────────────────┬───────────────┘
|
|
│ │
|
|
▼ ▼
|
|
┌─────────────┐ ┌──────────────┐
|
|
│ User │ │ Report │
|
|
│ Consumer │ │ Consumer │
|
|
│ (Thread) │ │ (Thread) │
|
|
└─────┬───────┘ └──────┬───────┘
|
|
│ Procesa evento │ Procesa evento
|
|
▼ ▼
|
|
┌──────────────┐ ┌──────────────┐
|
|
│ MySQL (BD │ │ MongoDB (BD │
|
|
│ de Usuarios) │ │ de Reportes) │
|
|
└──────────────┘ └──────────────┘
|
|
```
|
|
|
|
**Ventajas:**
|
|
- Bajo acoplamiento entre servicios
|
|
- Mayor escalabilidad
|
|
- Mejor tolerancia a fallos
|
|
- Procesamiento asincrónico
|
|
|
|
### 3. Microservicios
|
|
|
|
El sistema consta de dos microservicios independientes que ejecutan en paralelo:
|
|
|
|
#### **Usuarios API** (Puerto 8000)
|
|
- Gestión de usuarios y autenticación
|
|
- Base de datos: MySQL con SQLAlchemy
|
|
- Endpoints: POST/GET/PUT/DELETE /users/
|
|
|
|
#### **Reportes API** (Puerto 8001)
|
|
- Gestión de reportes comunitarios
|
|
- Base de datos: MongoDB
|
|
- Endpoints: POST/GET/PUT/DELETE /reports/
|
|
- Almacenamiento: Imágenes en WebP (directorio `/storage/reports_images/`)
|
|
|
|
Cada API:
|
|
- Valida solicitudes y envía eventos a RabbitMQ
|
|
- Expone documentación automática en `/docs`
|
|
- Es independientemente escalable
|
|
|
|
### 4. Consumidores de Eventos (Message Consumers)
|
|
|
|
Dos consumidores ejecutan como threads separados:
|
|
|
|
#### **User Consumer**
|
|
- Escucha la cola `users_queue` en RabbitMQ
|
|
- Eventos procesados:
|
|
- `user.create` → Guarda usuario en MySQL
|
|
- `user.update` → Actualiza usuario
|
|
- `user.delete` → Elimina usuario
|
|
|
|
#### **Report Consumer**
|
|
- Escucha la cola `reports_queue` en RabbitMQ
|
|
- Eventos procesados:
|
|
- `report.create` → Guarda reporte en MongoDB, incrementa contador de usuario
|
|
- `report.update_visibility` → Actualiza puntuación comunitaria
|
|
- `report.delete` → Elimina reporte
|
|
|
|
**Beneficios:**
|
|
- Desacoplamiento de API y procesamiento
|
|
- Reintentos automáticos si falla la BD
|
|
- Procesamiento en background
|
|
|
|
### 5. Patrón Repository
|
|
|
|
Abstracción para acceso a datos:
|
|
|
|
```
|
|
┌──────────────────┐
|
|
│ Service Layer │
|
|
│ (API Handler) │
|
|
└────────┬─────────┘
|
|
│ depends on
|
|
▼
|
|
┌──────────────────────────┐
|
|
│ Port (Interface) │
|
|
│ UserRepository │
|
|
│ ReportRepository │
|
|
└────────┬─────────────────┘
|
|
│ implemented by
|
|
▼
|
|
┌──────────────────────────┐
|
|
│ Concrete Adapters │
|
|
│ UserRepositorySQL │
|
|
│ ReportRepositoryMongo │
|
|
└──────────────────────────┘
|
|
```
|
|
|
|
**Beneficios:**
|
|
- Independencia de la implementación de BD
|
|
- Fácil de testear (usar mocks)
|
|
- Reutilizable en diferentes contextos
|
|
|
|
### 6. Inversión de Dependencias (Dependency Inversion)
|
|
|
|
Los servicios dependen de **abstracciones (interfaces)**, no de implementaciones concretas:
|
|
|
|
```python
|
|
class CreateUser:
|
|
def __init__(self, repo: UserRepository): # Depende de interfaz
|
|
self.repo = repo
|
|
|
|
# Puede usar cualquier implementación de UserRepository
|
|
service = CreateUser(UserRepositorySQL()) # Implementación SQL
|
|
# o
|
|
service = CreateUser(UserRepositoryMock()) # Para testing
|
|
```
|
|
|
|
## Flujo de Solicitud
|
|
|
|
### Crear Usuario (Asincrónico):
|
|
|
|
```
|
|
1. HTTP POST /users/
|
|
└─> Validar datos (FastAPI Handler)
|
|
└─> Crear objeto UserMessage
|
|
└─> Enviar mensaje a RabbitMQ (users_queue)
|
|
└─> Retornar respuesta HTTP inmediatamente
|
|
|
|
[En paralelo, el User Consumer procesa:]
|
|
2. User Consumer recibe evento user.create
|
|
└─> UserRepositorySQL.save()
|
|
└─> SQLAlchemy
|
|
└─> MySQL Database
|
|
```
|
|
|
|
### Crear Reporte (Asincrónico):
|
|
|
|
```
|
|
1. HTTP POST /reports/
|
|
└─> Validar datos y usuario existe (FastAPI Handler)
|
|
└─> Crear objeto ReportMessage
|
|
└─> Enviar mensaje a RabbitMQ (reports_queue)
|
|
└─> Retornar respuesta HTTP inmediatamente
|
|
|
|
[En paralelo, el Report Consumer procesa:]
|
|
2. Report Consumer recibe evento report.create
|
|
└─> ReportRepositoryMongo.save()
|
|
└─> UserRepositorySQL.increment_reports(id_usuario)
|
|
└─> MongoDB + MySQL Database
|
|
```
|
|
|
|
## Capas Detalladas
|
|
|
|
### Domain Layer (src/domain/)
|
|
|
|
**Entidades puras de negocio** (sin dependencias externas):
|
|
|
|
#### User
|
|
```python
|
|
@dataclass
|
|
class User:
|
|
user_id: int
|
|
nombre: str
|
|
apellido: str
|
|
email: str
|
|
contraseña_hash: Optional[str] = None
|
|
fecha_nacimiento: datetime = None
|
|
fecha_creacion: datetime = None
|
|
calificacion: float = 50.0 # 0-100
|
|
numero_reportes: int = 0
|
|
url_foto_perfil: Optional[str] = None
|
|
biografia: Optional[str] = None
|
|
```
|
|
|
|
#### Report
|
|
```python
|
|
@dataclass
|
|
class Report:
|
|
id_reporte: str
|
|
id_usuario: int
|
|
tipo_reporte: int # Número que representa el tipo
|
|
descripcion: str
|
|
ubicacion: Optional[str]
|
|
lat: Optional[float] = None
|
|
lng: Optional[float] = None
|
|
image_filename: Optional[str] = None # WebP format
|
|
visibilidad: float = 0.0 # Puntuación comunitaria 0-100
|
|
estado: Literal["en proceso", "no resuelto", "resuelto"] = "en proceso"
|
|
fecha_creacion: Optional[datetime] = None
|
|
```
|
|
|
|
### Application Layer (src/application/)
|
|
|
|
Contiene la lógica de negocio encapsulada en **Use Cases** y **Puertos**.
|
|
|
|
#### Ports (Interfaces - src/application/ports/)
|
|
|
|
**UserRepository (Interface)**
|
|
```python
|
|
class UserRepository:
|
|
def save(self, user: User) -> User
|
|
def find_by_id(self, user_id: int) -> Optional[User]
|
|
def find_by_email(self, email: str) -> Optional[User]
|
|
def update(self, user: User) -> User
|
|
def delete(self, user_id: int) -> bool
|
|
def increment_reports(self, user_id: int) -> bool
|
|
```
|
|
|
|
**ReportRepository (Interface)**
|
|
```python
|
|
class ReportRepository:
|
|
def save(self, report: Report) -> Report
|
|
def find_by_id(self, id_reporte: str) -> Optional[Report]
|
|
def find_by_user(self, id_usuario: int) -> List[Report]
|
|
def update(self, report: Report) -> Report
|
|
def delete(self, id_reporte: str) -> bool
|
|
def update_visibility(self, id_reporte: str, visibilidad: float) -> bool
|
|
```
|
|
|
|
#### Services / Use Cases (src/application/services/)
|
|
|
|
**User Services:**
|
|
- `CreateUser` - Valida datos y envía evento a RabbitMQ
|
|
- `GetUser` - Recupera usuario de la BD
|
|
- `UpdateUser` - Actualiza usuario
|
|
- `DeleteUser` - Elimina usuario
|
|
|
|
**Report Services:**
|
|
- `CreateReport` - Valida usuario y reporte, envía evento a RabbitMQ
|
|
- `GetReport` - Recupera reporte
|
|
- `UpdateReportVisibility` - Actualiza puntuación comunitaria
|
|
- `DeleteReport` - Elimina reporte
|
|
|
|
**Ejemplo de CreateReport:**
|
|
```python
|
|
class CreateReport:
|
|
def __init__(self, repo: ReportRepository, user_repo: UserRepository):
|
|
self.repo = repo
|
|
self.user_repo = user_repo
|
|
|
|
def execute(self, id_usuario: int, tipo_reporte: int, descripcion: str, ...):
|
|
# 1. Validar que usuario existe
|
|
if not self.user_repo.find_by_id(id_usuario):
|
|
return {"status": "error", "message": "Usuario no encontrado"}
|
|
|
|
# 2. Validar descripción
|
|
if not descripcion.strip():
|
|
return {"status": "error", "message": "Descripción requerida"}
|
|
|
|
# 3. Crear mensaje de evento
|
|
message = ReportMessage(
|
|
event_type=ReportEventType.CREATE,
|
|
id_usuario=id_usuario,
|
|
tipo_reporte=tipo_reporte,
|
|
descripcion=descripcion,
|
|
...
|
|
)
|
|
|
|
# 4. Enviar a RabbitMQ
|
|
send_to_queue(message)
|
|
|
|
return {"status": "success", "message": "Reporte enviado a procesar"}
|
|
```
|
|
|
|
### Infrastructure Layer (src/infrastructure/)
|
|
|
|
#### API Handlers (src/infrastructure/api/)
|
|
|
|
**Users API (puerto 8000)**
|
|
- `users.py` - Endpoints REST para CRUD de usuarios
|
|
- `auth_service.py` - Autenticación JWT
|
|
- `schemas.py` - Esquemas Pydantic para validación
|
|
- `router.py` - Rutas de FastAPI
|
|
- `app.py` - Aplicación FastAPI
|
|
|
|
**Reports API (puerto 8001)**
|
|
- `reports.py` - Endpoints REST para CRUD de reportes
|
|
- `schemas.py` - Esquemas Pydantic
|
|
- `router.py` - Rutas
|
|
- `app.py` - Aplicación FastAPI
|
|
|
|
#### Persistence Adapters (src/infrastructure/adapters/persistence/)
|
|
|
|
**user_repository_sql.py** (Implementación de UserRepository)
|
|
- Usa SQLAlchemy ORM
|
|
- Implementa todas las operaciones CRUD en MySQL
|
|
- Convierte entre modelo de dominio y modelo de BD
|
|
|
|
**report_repository_mongo.py** (Implementación de ReportRepository)
|
|
- Usa PyMongo para MongoDB
|
|
- Implementa todas las operaciones CRUD
|
|
- Convierte entre modelo de dominio y documento MongoDB
|
|
|
|
**db.py**
|
|
- Configuración de conexiones
|
|
- Instancias de motor de BD
|
|
|
|
**models.py**
|
|
- Modelos SQLAlchemy para MySQL
|
|
- Esquemas para MongoDB
|
|
|
|
#### RabbitMQ Adapters (src/infrastructure/adapters/rabbitmq/)
|
|
|
|
**sender.py**
|
|
- Función `send_to_queue()` para enviar mensajes
|
|
- Serialización de objetos a JSON
|
|
|
|
**consumer.py** (Base)
|
|
- Clase `RabbitMQConsumer` para consumir mensajes
|
|
- Conexión y suscripción a colas
|
|
- Manejo de excepciones
|
|
|
|
**messages.py**
|
|
- `UserMessage` - Schema para eventos de usuario
|
|
- `ReportMessage` - Schema para eventos de reporte
|
|
- `UserEventType` - Enumeración de tipos de eventos
|
|
- `ReportEventType` - Enumeración de tipos de eventos
|
|
|
|
#### File Storage (src/infrastructure/adapters/file_storage.py)
|
|
|
|
- `image_storage` - Manejo de almacenamiento de imágenes
|
|
- Convierte imágenes a formato WebP
|
|
- Almacena en `/storage/reports_images/`
|
|
- Limpieza automática si reporte es eliminado
|
|
|
|
### Consumers (src/consumers/)
|
|
|
|
Ejecutan como threads separados en paralelo con las APIs.
|
|
|
|
#### User Consumer (src/consumers/user_consumer.py)
|
|
|
|
```python
|
|
class UserConsumer:
|
|
def __init__(self):
|
|
self.repo = UserRepositorySQL()
|
|
self.consumer = RabbitMQConsumer(queue_name='users_queue')
|
|
self.consumer.set_callback(self.process_message)
|
|
|
|
def process_message(self, message_dict: dict):
|
|
# Reconvertir a UserMessage
|
|
message = UserMessage.from_dict(message_dict)
|
|
|
|
if message.event_type == UserEventType.CREATE:
|
|
self._handle_create_user(message)
|
|
elif message.event_type == UserEventType.UPDATE:
|
|
self._handle_update_user(message)
|
|
# ... etc
|
|
|
|
def start(self):
|
|
# Inicia la conexión y escucha
|
|
self.consumer.start_consuming()
|
|
```
|
|
|
|
#### Report Consumer (src/consumers/report_consumer.py)
|
|
|
|
Similar a User Consumer, pero:
|
|
- Escucha `reports_queue`
|
|
- Procesa `ReportMessage` con `ReportEventType`
|
|
- Interactúa con `ReportRepositoryMongo` y `UserRepositorySQL`
|
|
- Maneja almacenamiento de imágenes
|
|
|
|
### Core Layer (src/core/)
|
|
|
|
**config.py** - Configuración centralizada:
|
|
- Variables de entorno
|
|
- Configuración de logging
|
|
- Configuración de conexiones de BD
|
|
- Configuración de RabbitMQ
|
|
- Parámetros de JWT
|
|
|
|
## Punto de Entrada (src/main.py)
|
|
|
|
Orquesta todos los componentes en paralelo:
|
|
|
|
```python
|
|
def run():
|
|
"""Inicia los 4 componentes en threads separados"""
|
|
users_thread = threading.Thread(target=run_users_api)
|
|
reports_thread = threading.Thread(target=run_reports_api)
|
|
user_consumer_thread = threading.Thread(target=run_user_consumer)
|
|
report_consumer_thread = threading.Thread(target=run_reports_consumer)
|
|
|
|
users_thread.start()
|
|
reports_thread.start()
|
|
user_consumer_thread.start()
|
|
report_consumer_thread.start()
|
|
```
|
|
|
|
Esto inicia:
|
|
- API de Usuarios en puerto 8000
|
|
- API de Reportes en puerto 8001
|
|
- User Consumer escuchando `users_queue`
|
|
- Report Consumer escuchando `reports_queue`
|
|
|
|
## Diagrama de Despliegue
|
|
|
|
```
|
|
┌─────────────────────────────────────────────────────────────┐
|
|
│ Docker Container (VoxPopuli) │
|
|
├─────────────────────────────────────────────────────────────┤
|
|
│ │
|
|
│ ┌──────────────────────────────────────────────────────┐ │
|
|
│ │ main.py (Orquestador) │ │
|
|
│ │ - Lanza 4 threads │ │
|
|
│ └───┬──────────┬──────────┬──────────────────────────┬─┘ │
|
|
│ │ │ │ │ │
|
|
│ ┌───▼──┐ ┌───▼──┐ ┌──▼────┐ ┌─────▼──────┐ │ │
|
|
│ │Users │ │Report│ │User │ │Report │ │ │
|
|
│ │API │ │API │ │Consume│ │Consumer │ │ │
|
|
│ │:8000 │ │:8001 │ │r │ │ │ │ │
|
|
│ └───┬──┘ └───┬──┘ └──┬────┘ └─────┬──────┘ │ │
|
|
│ │ │ │ │ │ │
|
|
└─────┼─────────┼─────────┼────────────┼─────────────┘ │
|
|
│ │ │ │ │
|
|
└────┬────┴────┬────┴────────────┘ │
|
|
│ │ │
|
|
┌─────▼──┐ ┌───▼──────┐ │
|
|
│RabbitMQ│ │MySQL + │ │
|
|
│:5672 │ │MongoDB │ │
|
|
└────────┘ └──────────┘ │
|
|
```
|
|
|
|
## Testing
|
|
|
|
La arquitectura facilita el testing en múltiples niveles:
|
|
|
|
### Unit Tests
|
|
- Mockear repositorios para testear Use Cases
|
|
- Testear lógica de negocio en Domain Layer
|
|
|
|
```python
|
|
class TestCreateReport:
|
|
def test_report_creation(self):
|
|
mock_repo = ReportRepositoryMock()
|
|
service = CreateReport(mock_repo)
|
|
result = service.execute(...)
|
|
assert result['status'] == 'success'
|
|
```
|
|
|
|
### Integration Tests
|
|
- Usar DB reales de prueba
|
|
- Testear flujo completo API → RabbitMQ → Consumer → DB
|
|
|
|
### E2E Tests
|
|
- Testear desde HTTP request hasta persistencia
|
|
- Validar consumidores procesan eventos correctamente
|
|
|
|
## Ventajas de la Arquitectura Actual
|
|
|
|
1. **Bajo Acoplamiento** - Servicios se comunican por eventos
|
|
2. **Escalabilidad Horizontal** - Consumidores pueden replicarse
|
|
3. **Resiliencia** - RabbitMQ reintenta entregas fallidas
|
|
4. **Independencia de BD** - Abstractos por puertos
|
|
5. **Testabilidad** - Inyección de dependencias
|
|
6. **Mantenibilidad** - Capas claramente separadas
|
|
7. **Asincronía** - APIs responden rápidamente
|
|
8. **Extensibilidad** - Nuevos tipos de eventos fácilmente
|
|
|
|
## Stack Tecnológico
|
|
|
|
| Componente | Tecnología |
|
|
|-----------|-----------|
|
|
| Framework Web | FastAPI |
|
|
| Servidor ASGI | Uvicorn |
|
|
| ORM SQL | SQLAlchemy |
|
|
| Driver MongoDB | PyMongo |
|
|
| Message Queue | RabbitMQ |
|
|
| Autenticación | JWT |
|
|
| Validación | Pydantic |
|
|
| Base Datos SQL | MySQL |
|
|
| Base Datos NoSQL | MongoDB |
|
|
| Almacenamiento | WebP (imágenes) |
|
|
| Concurrencia | threading |
|
|
def save(self, user):
|
|
self.users[user.user_id] = user
|
|
return user
|
|
|
|
# Usar en tests
|
|
def test_create_user():
|
|
mock_repo = UserRepositoryMock()
|
|
service = CreateUser(mock_repo)
|
|
user = service.execute(...)
|
|
assert mock_repo.users[1] is not None
|
|
```
|
|
|
|
## Escalabilidad
|
|
|
|
Cada componente puede escalar independientemente:
|
|
|
|
1. **Usuarios API**: Escalar horizontal con load balancer
|
|
2. **Reportes API**: Escalar horizontal con load balancer
|
|
3. **MySQL**: Replicación master-slave
|
|
4. **MongoDB**: Sharding automático
|
|
|
|
## Despliegue
|
|
|
|
Cada microservicio puede desplegarse:
|
|
|
|
- **Docker**: Contenedores independientes
|
|
- **Kubernetes**: Pods independientes
|
|
- **Serverless**: Funciones Lambda independientes
|
|
|
|
## Monitoreo
|
|
|
|
Cada API expone:
|
|
- `/health` - Health check
|
|
- `/docs` - Swagger UI
|
|
- Logging estructurado
|
|
- Métricas por endpoint
|
|
|
|
## Futuras Mejoras
|
|
|
|
1. **Autenticación**: JWT tokens en API
|
|
2. **Autorización**: RBAC (Role-Based Access Control)
|
|
3. **Rate Limiting**: Proteger contra abuso
|
|
4. **Caché**: Redis para datos frecuentes
|
|
5. **Message Queue**: RabbitMQ para comunicación asíncrona
|
|
6. **Logging Centralizado**: ELK Stack
|
|
7. **Observabilidad**: Prometheus + Grafana
|
|
8. **Tracing**: Jaeger para rastreo de solicitudes
|