Files
VoxPopuli/ARCHITECTURE.md

21 KiB

Arquitectura de VoxPopuli Microservices

Resumen General

VoxPopuli es un sistema de microservicios basado en eventos asincronos con comunicación mediante RabbitMQ. El sistema ejecuta múltiples componentes en paralelo:

  • 2 APIs REST independientes (Usuarios y Reportes)
  • 2 Consumidores de mensajes para procesamiento asincrónico
  • Bases de datos separadas por dominio (MySQL para usuarios, MongoDB para reportes)

Patrones Arquitectónicos Usados

1. Clean Architecture (Arquitectura Limpia)

La aplicación está organizada en capas independientes:

┌─────────────────────────────────────────────────────────┐
│                    API REST (FastAPI)                    │
│         Infraestructura API (HTTP Handlers)              │
├─────────────────────────────────────────────────────────┤
│              Application Layer (Servicios)               │
│  - Use Cases / Application Services                      │
│  - Lógica de negocio de la aplicación                   │
├─────────────────────────────────────────────────────────┤
│                  Domain Layer (Dominio)                  │
│  - Entidades de negocio puras                           │
│  - Reglas de negocio independientes                     │
├─────────────────────────────────────────────────────────┤
│            Infrastructure Layer (Infraestructura)        │
│  - Adaptadores (Repositorios, RabbitMQ)                │
│  - Acceso a Datos (MySQL, MongoDB)                      │
│  - Almacenamiento de archivos                           │
└─────────────────────────────────────────────────────────┘

2. Arquitectura Orientada a Eventos (Event-Driven Architecture)

En lugar de procesamiento sincrónico, los servicios se comunican mediante eventos:

┌──────────────────────────────────────────────────────────────┐
│                     API REST (FastAPI)                        │
│                 (Recibe solicitudes HTTP)                     │
└────────┬─────────────────────────────────┬───────────────────┘
         │ Valida y envía evento           │
         ▼                                  ▼
    ┌─────────────────────────────────────────────┐
    │        RabbitMQ Message Queue               │
    │  - users_queue (eventos de usuario)         │
    │  - reports_queue (eventos de reporte)       │
    └────────┬────────────────────┬───────────────┘
             │                    │
             ▼                    ▼
        ┌─────────────┐      ┌──────────────┐
        │ User        │      │ Report       │
        │ Consumer    │      │ Consumer     │
        │ (Thread)    │      │ (Thread)     │
        └─────┬───────┘      └──────┬───────┘
              │ Procesa evento     │ Procesa evento
              ▼                    ▼
         ┌──────────────┐     ┌──────────────┐
         │ MySQL (BD    │     │ MongoDB (BD  │
         │ de Usuarios) │     │ de Reportes) │
         └──────────────┘     └──────────────┘

Ventajas:

  • Bajo acoplamiento entre servicios
  • Mayor escalabilidad
  • Mejor tolerancia a fallos
  • Procesamiento asincrónico

3. Microservicios

El sistema consta de dos microservicios independientes que ejecutan en paralelo:

Usuarios API (Puerto 8000)

  • Gestión de usuarios y autenticación
  • Base de datos: MySQL con SQLAlchemy
  • Endpoints: POST/GET/PUT/DELETE /users/

Reportes API (Puerto 8001)

  • Gestión de reportes comunitarios
  • Base de datos: MongoDB
  • Endpoints: POST/GET/PUT/DELETE /reports/
  • Almacenamiento: Imágenes en WebP (directorio /storage/reports_images/)

Cada API:

  • Valida solicitudes y envía eventos a RabbitMQ
  • Expone documentación automática en /docs
  • Es independientemente escalable

4. Consumidores de Eventos (Message Consumers)

Dos consumidores ejecutan como threads separados:

User Consumer

  • Escucha la cola users_queue en RabbitMQ
  • Eventos procesados:
    • user.create → Guarda usuario en MySQL
    • user.update → Actualiza usuario
    • user.delete → Elimina usuario

Report Consumer

  • Escucha la cola reports_queue en RabbitMQ
  • Eventos procesados:
    • report.create → Guarda reporte en MongoDB, incrementa contador de usuario
    • report.update_visibility → Actualiza puntuación comunitaria
    • report.delete → Elimina reporte

Beneficios:

  • Desacoplamiento de API y procesamiento
  • Reintentos automáticos si falla la BD
  • Procesamiento en background

5. Patrón Repository

Abstracción para acceso a datos:

┌──────────────────┐
│  Service Layer   │
│  (API Handler)   │
└────────┬─────────┘
         │ depends on
         ▼
┌──────────────────────────┐
│   Port (Interface)       │
│  UserRepository          │
│  ReportRepository        │
└────────┬─────────────────┘
         │ implemented by
         ▼
┌──────────────────────────┐
│   Concrete Adapters      │
│  UserRepositorySQL       │
│  ReportRepositoryMongo   │
└──────────────────────────┘

Beneficios:

  • Independencia de la implementación de BD
  • Fácil de testear (usar mocks)
  • Reutilizable en diferentes contextos

6. Inversión de Dependencias (Dependency Inversion)

Los servicios dependen de abstracciones (interfaces), no de implementaciones concretas:

class CreateUser:
    def __init__(self, repo: UserRepository):  # Depende de interfaz
        self.repo = repo

# Puede usar cualquier implementación de UserRepository
service = CreateUser(UserRepositorySQL())      # Implementación SQL
# o
service = CreateUser(UserRepositoryMock())     # Para testing

Flujo de Solicitud

Crear Usuario (Asincrónico):

1. HTTP POST /users/
   └─> Validar datos (FastAPI Handler)
       └─> Crear objeto UserMessage
           └─> Enviar mensaje a RabbitMQ (users_queue)
               └─> Retornar respuesta HTTP inmediatamente
                   
[En paralelo, el User Consumer procesa:]
2. User Consumer recibe evento user.create
   └─> UserRepositorySQL.save()
       └─> SQLAlchemy
           └─> MySQL Database

Crear Reporte (Asincrónico):

1. HTTP POST /reports/
   └─> Validar datos y usuario existe (FastAPI Handler)
       └─> Crear objeto ReportMessage
           └─> Enviar mensaje a RabbitMQ (reports_queue)
               └─> Retornar respuesta HTTP inmediatamente
                   
[En paralelo, el Report Consumer procesa:]
2. Report Consumer recibe evento report.create
   └─> ReportRepositoryMongo.save()
   └─> UserRepositorySQL.increment_reports(id_usuario)
       └─> MongoDB + MySQL Database

Capas Detalladas

Domain Layer (src/domain/)

Entidades puras de negocio (sin dependencias externas):

User

@dataclass
class User:
    user_id: int
    nombre: str
    apellido: str
    email: str
    contraseña_hash: Optional[str] = None
    fecha_nacimiento: datetime = None
    fecha_creacion: datetime = None
    calificacion: float = 50.0  # 0-100
    numero_reportes: int = 0
    url_foto_perfil: Optional[str] = None
    biografia: Optional[str] = None

Report

@dataclass
class Report:
    id_reporte: str
    id_usuario: int
    tipo_reporte: int  # Número que representa el tipo
    descripcion: str
    ubicacion: Optional[str]
    lat: Optional[float] = None
    lng: Optional[float] = None
    image_filename: Optional[str] = None  # WebP format
    visibilidad: float = 0.0  # Puntuación comunitaria 0-100
    estado: Literal["en proceso", "no resuelto", "resuelto"] = "en proceso"
    fecha_creacion: Optional[datetime] = None

Application Layer (src/application/)

Contiene la lógica de negocio encapsulada en Use Cases y Puertos.

Ports (Interfaces - src/application/ports/)

UserRepository (Interface)

class UserRepository:
    def save(self, user: User) -> User
    def find_by_id(self, user_id: int) -> Optional[User]
    def find_by_email(self, email: str) -> Optional[User]
    def update(self, user: User) -> User
    def delete(self, user_id: int) -> bool
    def increment_reports(self, user_id: int) -> bool

ReportRepository (Interface)

class ReportRepository:
    def save(self, report: Report) -> Report
    def find_by_id(self, id_reporte: str) -> Optional[Report]
    def find_by_user(self, id_usuario: int) -> List[Report]
    def update(self, report: Report) -> Report
    def delete(self, id_reporte: str) -> bool
    def update_visibility(self, id_reporte: str, visibilidad: float) -> bool

Services / Use Cases (src/application/services/)

User Services:

  • CreateUser - Valida datos y envía evento a RabbitMQ
  • GetUser - Recupera usuario de la BD
  • UpdateUser - Actualiza usuario
  • DeleteUser - Elimina usuario

Report Services:

  • CreateReport - Valida usuario y reporte, envía evento a RabbitMQ
  • GetReport - Recupera reporte
  • UpdateReportVisibility - Actualiza puntuación comunitaria
  • DeleteReport - Elimina reporte

Ejemplo de CreateReport:

class CreateReport:
    def __init__(self, repo: ReportRepository, user_repo: UserRepository):
        self.repo = repo
        self.user_repo = user_repo
    
    def execute(self, id_usuario: int, tipo_reporte: int, descripcion: str, ...):
        # 1. Validar que usuario existe
        if not self.user_repo.find_by_id(id_usuario):
            return {"status": "error", "message": "Usuario no encontrado"}
        
        # 2. Validar descripción
        if not descripcion.strip():
            return {"status": "error", "message": "Descripción requerida"}
        
        # 3. Crear mensaje de evento
        message = ReportMessage(
            event_type=ReportEventType.CREATE,
            id_usuario=id_usuario,
            tipo_reporte=tipo_reporte,
            descripcion=descripcion,
            ...
        )
        
        # 4. Enviar a RabbitMQ
        send_to_queue(message)
        
        return {"status": "success", "message": "Reporte enviado a procesar"}

Infrastructure Layer (src/infrastructure/)

API Handlers (src/infrastructure/api/)

Users API (puerto 8000)

  • users.py - Endpoints REST para CRUD de usuarios
  • auth_service.py - Autenticación JWT
  • schemas.py - Esquemas Pydantic para validación
  • router.py - Rutas de FastAPI
  • app.py - Aplicación FastAPI

Reports API (puerto 8001)

  • reports.py - Endpoints REST para CRUD de reportes
  • schemas.py - Esquemas Pydantic
  • router.py - Rutas
  • app.py - Aplicación FastAPI

Persistence Adapters (src/infrastructure/adapters/persistence/)

user_repository_sql.py (Implementación de UserRepository)

  • Usa SQLAlchemy ORM
  • Implementa todas las operaciones CRUD en MySQL
  • Convierte entre modelo de dominio y modelo de BD

report_repository_mongo.py (Implementación de ReportRepository)

  • Usa PyMongo para MongoDB
  • Implementa todas las operaciones CRUD
  • Convierte entre modelo de dominio y documento MongoDB

db.py

  • Configuración de conexiones
  • Instancias de motor de BD

models.py

  • Modelos SQLAlchemy para MySQL
  • Esquemas para MongoDB

RabbitMQ Adapters (src/infrastructure/adapters/rabbitmq/)

sender.py

  • Función send_to_queue() para enviar mensajes
  • Serialización de objetos a JSON

consumer.py (Base)

  • Clase RabbitMQConsumer para consumir mensajes
  • Conexión y suscripción a colas
  • Manejo de excepciones

messages.py

  • UserMessage - Schema para eventos de usuario
  • ReportMessage - Schema para eventos de reporte
  • UserEventType - Enumeración de tipos de eventos
  • ReportEventType - Enumeración de tipos de eventos

File Storage (src/infrastructure/adapters/file_storage.py)

  • image_storage - Manejo de almacenamiento de imágenes
  • Convierte imágenes a formato WebP
  • Almacena en /storage/reports_images/
  • Limpieza automática si reporte es eliminado

Consumers (src/consumers/)

Ejecutan como threads separados en paralelo con las APIs.

User Consumer (src/consumers/user_consumer.py)

class UserConsumer:
    def __init__(self):
        self.repo = UserRepositorySQL()
        self.consumer = RabbitMQConsumer(queue_name='users_queue')
        self.consumer.set_callback(self.process_message)
    
    def process_message(self, message_dict: dict):
        # Reconvertir a UserMessage
        message = UserMessage.from_dict(message_dict)
        
        if message.event_type == UserEventType.CREATE:
            self._handle_create_user(message)
        elif message.event_type == UserEventType.UPDATE:
            self._handle_update_user(message)
        # ... etc
    
    def start(self):
        # Inicia la conexión y escucha
        self.consumer.start_consuming()

Report Consumer (src/consumers/report_consumer.py)

Similar a User Consumer, pero:

  • Escucha reports_queue
  • Procesa ReportMessage con ReportEventType
  • Interactúa con ReportRepositoryMongo y UserRepositorySQL
  • Maneja almacenamiento de imágenes

Core Layer (src/core/)

config.py - Configuración centralizada:

  • Variables de entorno
  • Configuración de logging
  • Configuración de conexiones de BD
  • Configuración de RabbitMQ
  • Parámetros de JWT

Punto de Entrada (src/main.py)

Orquesta todos los componentes en paralelo:

def run():
    """Inicia los 4 componentes en threads separados"""
    users_thread = threading.Thread(target=run_users_api)
    reports_thread = threading.Thread(target=run_reports_api)
    user_consumer_thread = threading.Thread(target=run_user_consumer)
    report_consumer_thread = threading.Thread(target=run_reports_consumer)
    
    users_thread.start()
    reports_thread.start()
    user_consumer_thread.start()
    report_consumer_thread.start()

Esto inicia:

  • API de Usuarios en puerto 8000
  • API de Reportes en puerto 8001
  • User Consumer escuchando users_queue
  • Report Consumer escuchando reports_queue

Diagrama de Despliegue

┌─────────────────────────────────────────────────────────────┐
│           Docker Container (VoxPopuli)                      │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│ ┌──────────────────────────────────────────────────────┐   │
│ │  main.py (Orquestador)                               │   │
│ │  - Lanza 4 threads                                   │   │
│ └───┬──────────┬──────────┬──────────────────────────┬─┘   │
│     │          │          │                          │      │
│ ┌───▼──┐  ┌───▼──┐  ┌──▼────┐  ┌─────▼──────┐     │      │
│ │Users │  │Report│  │User   │  │Report      │     │      │
│ │API   │  │API   │  │Consume│  │Consumer    │     │      │
│ │:8000 │  │:8001 │  │r      │  │            │     │      │
│ └───┬──┘  └───┬──┘  └──┬────┘  └─────┬──────┘     │      │
│     │         │         │            │             │      │
└─────┼─────────┼─────────┼────────────┼─────────────┘      │
      │         │         │            │                     │
      └────┬────┴────┬────┴────────────┘                     │
           │         │                                       │
     ┌─────▼──┐  ┌───▼──────┐                               │
     │RabbitMQ│  │MySQL +   │                               │
     │:5672   │  │MongoDB   │                               │
     └────────┘  └──────────┘                               │

Testing

La arquitectura facilita el testing en múltiples niveles:

Unit Tests

  • Mockear repositorios para testear Use Cases
  • Testear lógica de negocio en Domain Layer
class TestCreateReport:
    def test_report_creation(self):
        mock_repo = ReportRepositoryMock()
        service = CreateReport(mock_repo)
        result = service.execute(...)
        assert result['status'] == 'success'

Integration Tests

  • Usar DB reales de prueba
  • Testear flujo completo API → RabbitMQ → Consumer → DB

E2E Tests

  • Testear desde HTTP request hasta persistencia
  • Validar consumidores procesan eventos correctamente

Ventajas de la Arquitectura Actual

  1. Bajo Acoplamiento - Servicios se comunican por eventos
  2. Escalabilidad Horizontal - Consumidores pueden replicarse
  3. Resiliencia - RabbitMQ reintenta entregas fallidas
  4. Independencia de BD - Abstractos por puertos
  5. Testabilidad - Inyección de dependencias
  6. Mantenibilidad - Capas claramente separadas
  7. Asincronía - APIs responden rápidamente
  8. Extensibilidad - Nuevos tipos de eventos fácilmente

Stack Tecnológico

Componente Tecnología
Framework Web FastAPI
Servidor ASGI Uvicorn
ORM SQL SQLAlchemy
Driver MongoDB PyMongo
Message Queue RabbitMQ
Autenticación JWT
Validación Pydantic
Base Datos SQL MySQL
Base Datos NoSQL MongoDB
Almacenamiento WebP (imágenes)
Concurrencia threading
def save(self, user):
self.users[user.user_id] = user
return user

Usar en tests

def test_create_user(): mock_repo = UserRepositoryMock() service = CreateUser(mock_repo) user = service.execute(...) assert mock_repo.users[1] is not None


## Escalabilidad

Cada componente puede escalar independientemente:

1. **Usuarios API**: Escalar horizontal con load balancer
2. **Reportes API**: Escalar horizontal con load balancer
3. **MySQL**: Replicación master-slave
4. **MongoDB**: Sharding automático

## Despliegue

Cada microservicio puede desplegarse:

- **Docker**: Contenedores independientes
- **Kubernetes**: Pods independientes
- **Serverless**: Funciones Lambda independientes

## Monitoreo

Cada API expone:
- `/health` - Health check
- `/docs` - Swagger UI
- Logging estructurado
- Métricas por endpoint

## Futuras Mejoras

1. **Autenticación**: JWT tokens en API
2. **Autorización**: RBAC (Role-Based Access Control)
3. **Rate Limiting**: Proteger contra abuso
4. **Caché**: Redis para datos frecuentes
5. **Message Queue**: RabbitMQ para comunicación asíncrona
6. **Logging Centralizado**: ELK Stack
7. **Observabilidad**: Prometheus + Grafana
8. **Tracing**: Jaeger para rastreo de solicitudes