Compare commits

..

10 Commits

Author SHA1 Message Date
31c567653a docker stuff almost complete 2026-05-06 14:23:28 -06:00
fea63bb553 fixes with SQL Alchemy to avoid precaching 2026-05-06 12:05:12 -06:00
57fa3b7944 Fixes with queuing 2026-05-06 11:13:23 -06:00
7331dcb086 Fixed metrics api to recieve all metrics 2026-05-06 01:30:20 -06:00
543ba8870c Added the isAdmin parameter to schemas and users in that api. 2026-05-05 09:57:52 -06:00
b4fc640c1a stuffies 2026-05-04 22:07:30 -06:00
a8ee92afc8 Added isAdmin tag for users 2026-05-04 21:03:50 -06:00
fc7c821d0f more minor changes 2026-05-04 17:54:02 -06:00
b96c44381a minor changes
Co-authored-by: Copilot <copilot@github.com>
2026-05-04 17:48:05 -06:00
b3788beedd Added two new apis
Co-authored-by: Copilot <copilot@github.com>
2026-05-04 17:37:08 -06:00
47 changed files with 3551 additions and 456 deletions

4
.gitignore vendored
View File

@@ -67,3 +67,7 @@ dmypy.json
node_modules/ node_modules/
*.pem *.pem
credentials.json credentials.json
migrations/
*.tar.gz

53
ADMIN_SETUP.md Normal file
View File

@@ -0,0 +1,53 @@
# Admin User Setup - SQL Commands
## 1. Agregar columna a BD (Auto-ejecutado)
La migración SQL se encuentra en: `migrations/001_add_is_admin_to_usuarios.sql`
Ejecutar manualmente si es necesario:
```sql
ALTER TABLE `voxpopuli_users`.`usuarios`
ADD COLUMN `is_admin` BOOLEAN NOT NULL DEFAULT FALSE AFTER `biografia`,
ADD INDEX `idx_is_admin` (`is_admin`);
```
## 2. Promover usuario a admin
```sql
UPDATE `voxpopuli_users`.`usuarios`
SET `is_admin` = TRUE
WHERE `user_id` = 1;
```
## 3. Listar usuarios admin
```sql
SELECT `user_id`, `nombre`, `email`, `is_admin`
FROM `voxpopuli_users`.`usuarios`
WHERE `is_admin` = TRUE;
```
## 4. Revocar permisos admin
```sql
UPDATE `voxpopuli_users`.`usuarios`
SET `is_admin` = FALSE
WHERE `user_id` = 1;
```
## API Usage
### Endpoints de Moderación (requieren token JWT de admin)
```bash
# Usar token JWT en header Authorization
curl -X POST "http://localhost:8003/moderation/reports/delete" \
-H "Authorization: Bearer YOUR_JWT_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"report_id": 123,
"reason": "Contenido ofensivo",
"description": "Violación de políticas"
}'
```
### Obtener usuario y verificar is_admin
```bash
GET http://localhost:8000/users/{user_id}
# Response incluye "is_admin": true/false
```

86
METRICS_API_EXAMPLES.md Normal file
View File

@@ -0,0 +1,86 @@
# Metrics API Examples
## Record Metric (Manual)
```bash
curl -X POST "http://localhost:8004/metrics/record" \
-H "Content-Type: application/json" \
-d '{
"event_type": "user_created",
"entity_id": "123",
"entity_type": "user",
"user_id": 456,
"metadata": {"email": "user@example.com"}
}'
```
## Get Analytics Report (Last N Days)
```bash
curl "http://localhost:8004/metrics/report?days=7"
```
Response:
```json
{
"start_date": "2026-04-27T00:00:00",
"end_date": "2026-05-04T23:59:59",
"total_events": 245,
"events_by_type": {
"user_created": 12,
"report_created": 45,
"notification_sent": 188
}
}
```
## Get Daily Statistics
```bash
curl "http://localhost:8004/metrics/daily-stats?date=2026-05-04T00:00:00"
```
Response:
```json
[
{
"date": "2026-05-04T00:00:00",
"event_type": "user_created",
"count": 3
},
{
"date": "2026-05-04T00:00:00",
"event_type": "report_created",
"count": 8
}
]
```
## Get Event Summary (Date Range)
```bash
curl "http://localhost:8004/metrics/summary?start_date=2026-04-27T00:00:00&end_date=2026-05-04T23:59:59"
```
Response:
```json
{
"summary": {
"user_created": 12,
"report_created": 45,
"notification_sent": 188,
"moderation_completed": 5
},
"date_range": {
"start_date": "2026-04-27T00:00:00",
"end_date": "2026-05-04T23:59:59"
}
}
```
## Health Check
```bash
curl "http://localhost:8004/metrics/health"
```
## Automatic Events (RabbitMQ)
- `user_created`, `user_updated`, `user_deleted`
- `report_created`, `report_resolved`
- `notification_sent`
- `moderation_completed`

View File

@@ -0,0 +1,58 @@
# API de Métricas y Analítica - Resumen de Implementación
## ✅ Integración Completada
### Estructura Creada
**Dominio:**
- `src/domain/metrics.py` - Modelos: `Metric`, `DailyStats`, `AnalyticsReport`, `EventType`
**Aplicación:**
- `src/application/ports/metrics_repository.py` - Interfaz del repositorio
- `src/application/services/metrics_services.py` - Lógica de negocio
**Infraestructura:**
- `src/infrastructure/adapters/persistence/metrics_repository_postgres.py` - Implementación PostgreSQL
- `src/infrastructure/api/metrics/` - API REST FastAPI
- `app.py` - Factory de la aplicación
- `router.py` - Endpoints
- `schemas.py` - Modelos Pydantic
**Consumidor:**
- `src/consumers/metrics_consumer.py` - Listener RabbitMQ para todos los eventos
### Base de Datos
**PostgreSQL añadido al docker-compose:**
- Host: localhost:5432
- DB: voxpopuli_metrics
- Usuario: voxpopuli / voxpopuli_pass
- Tabla `metrics` creada automáticamente
### Integración sin Cambios Significativos
**main.py** - Añadida API y consumer de métricas sin modificar APIs existentes
**docker-compose.yaml** - PostgreSQL añadido sin cambiar servicios existentes
**requirements.txt** - Añadido psycopg2-binary
✅ RabbitMQ NO añadido al docker-compose (usa instancias previas)
### Endpoints API (Puerto 8004)
```
POST /metrics/record - Registrar métrica
GET /metrics/report - Reporte de últimos N días
GET /metrics/daily-stats - Estadísticas diarias
GET /metrics/summary - Resumen de eventos por rango
GET /metrics/health - Health check
GET /docs - Swagger UI
```
### Eventos Automáticos Capturados
El consumer escucha automáticamente:
- `users_queue` → user_created, user_updated, user_deleted
- `reports_queue` → report_created, report_resolved
- `notifications_queue` → notification_sent
- `moderations_queue` → moderation_completed
Cada evento genera una métrica en PostgreSQL con timestamp, usuario_id, metadata y tipo de evento.

60
METRICS_SETUP.md Normal file
View File

@@ -0,0 +1,60 @@
# Metrics API Setup
## Configuración Base de Datos
PostgreSQL se ejecuta automáticamente en el contenedor Docker.
**Conexión:**
- Host: localhost
- Puerto: 5432
- Base de datos: voxpopuli_metrics
- Usuario: voxpopuli
- Contraseña: voxpopuli_pass
**Tabla creada automáticamente:**
```
metrics:
- id (PK)
- event_type (STRING, INDEXED)
- entity_id (STRING)
- entity_type (STRING)
- timestamp (DATETIME, INDEXED)
- metadata (JSON)
- user_id (INT, NULLABLE)
```
## Arquitectura
```
API Metrics (Puerto 8004)
├── FastAPI Router
│ ├── POST /metrics/record - Registrar métrica
│ ├── GET /metrics/report - Reporte analítica
│ ├── GET /metrics/daily-stats - Estadísticas diarias
│ ├── GET /metrics/summary - Resumen eventos
│ └── GET /metrics/health - Health check
├── Metrics Service (Lógica)
│ └── Metrics Repository (Postgres)
└── Metrics Consumer (RabbitMQ)
├── Escucha: users_queue
├── Escucha: reports_queue
├── Escucha: notifications_queue
└── Escucha: moderations_queue
```
## Inicio
```bash
# Levantar DB (docker)
docker-compose up -d
# Instalar dependencias
pip install -r requirements.txt
# Ejecutar todas las APIs
python -m src.main
```
La API de métricas se ejecutará automáticamente en puerto 8004.

310
MODERATION_API.md Normal file
View File

@@ -0,0 +1,310 @@
# API de Moderación - VoxPopuli
## Descripción General
La API de Moderación es un microservicio complementario integrado al proyecto VoxPopuli que proporciona funcionalidades de moderación de contenido y gestión de usuarios. Utiliza arquitectura hexagonal y se comunica mediante **RabbitMQ** para mantener bajo acoplamiento con el resto del sistema.
## Características Principales
### 1. **Eliminación de Reportes**
- Permite a moderadores eliminar reportes del sistema
- Evento: `moderation.delete_report`
- Endpoint: `POST /moderation/reports/delete`
### 2. **Cierre de Cuentas**
- Cierre permanente o temporal de cuentas de usuario
- Evento: `moderation.close_account`
- Endpoint: `POST /moderation/accounts/close`
### 3. **Baneo de Usuarios**
- Ban permanente o temporal (en días)
- Evento: `moderation.ban_user`
- Endpoint: `POST /moderation/users/ban`
### 4. **Advertencias**
- Registra advertencias contra usuarios
- Evento: `moderation.warn_user`
- Endpoint: `POST /moderation/users/warn`
### 5. **Revisión de Contenido**
- Aprueba, rechaza o solicita más información sobre reportes
- Evento: `moderation.review_content`
- Endpoint: `POST /moderation/content/review`
## Arquitectura
### Componentes
```
┌─────────────────────────────────────────────────────────┐
│ API REST de Moderación (FastAPI) │
│ Puerto 8003 - Endpoints de moderación │
└──────────────┬──────────────────────────────────────────┘
│ Envía eventos
┌─────────────────────┐
│ RabbitMQ Queue │
│ moderation_queue │
└──────────┬──────────┘
│ Consume
┌─────────────────────────┐
│ Moderation Consumer │
│ (Thread - background) │
└──────────┬──────────────┘
│ Ejecuta acciones
┌──────────────────────────┐
│ MongoDB │
│ - moderation_actions │
│ - moderation_logs │
└──────────────────────────┘
```
### Capas de la Arquitectura
```
domain/
└── moderations.py # Entidades de negocio: ModerationAction, ModerationLog
application/
├── ports/
│ └── moderation_repository.py # Interfaz de persistencia
└── services/
└── moderation_services.py # Use cases: DeleteReport, CloseAccount, etc.
infrastructure/
├── adapters/
│ ├── moderation_repository_mongo.py # Implementación MongoDB
│ └── rabbitmq/
│ └── messages.py # Esquemas de eventos RabbitMQ
├── api/
│ └── moderations/
│ ├── app.py # Aplicación FastAPI
│ ├── router.py # Rutas principales
│ ├── moderations.py # Endpoints
│ ├── schemas.py # Validación Pydantic
│ └── root.py # Rutas raíz
└── consumers/
└── moderation_consumer.py # Consumidor de eventos
main.py # Punto de entrada (integración)
```
## Integración con RabbitMQ
### Flujo de Eventos
1. **Cliente envía solicitud HTTP** a API de Moderación
2. **Servicio valida** la solicitud
3. **Servicio envía evento** a cola `moderation_queue` en RabbitMQ
4. **Consumer escucha** la cola en background
5. **Consumer procesa** el evento y ejecuta acciones
6. **Sistema persiste** en MongoDB
### Implementación de ModerationConsumer
El `ModerationConsumer` está integrado en `main.py` ejecutándose como un thread daemon:
```python
# En main.py
moderations_consumer_thread = threading.Thread(
target=run_moderations_consumer,
daemon=True,
name="Moderations-Consumer"
)
```
### Mensajes de Evento
Todos los eventos se envían a través de `RabbitMQSender`:
```json
{
"action_id": "uuid-aqui",
"event_type": "moderation.delete_report",
"moderator_id": 123,
"report_id": "report-uuid",
"reason": "Contenido violento",
"description": "Publicación violenta contra usuarios",
"timestamp": "2026-05-04T10:30:00"
}
```
## Endpoints API
### 1. Eliminación de Reporte
```http
POST /moderation/reports/delete HTTP/1.1
Content-Type: application/json
```
**Respuesta:**
```json
{
"status": "success",
"message": "Reporte marcado para eliminación",
"action_id": "action-uuid"
}
```
### 2. Cierre de Cuenta
```http
POST /moderation/accounts/close HTTP/1.1
Content-Type: application/json
```
### 3. Baneo de Usuario
```http
POST /moderation/users/ban HTTP/1.1
Content-Type: application/json
```
### 4. Advertencia
```http
POST /moderation/users/warn HTTP/1.1
Content-Type: application/json
```
### 5. Revisión de Contenido
```http
POST /moderation/content/review HTTP/1.1
Content-Type: application/json
```
## Ejecución
### Iniciar todo el sistema
```bash
python src/main.py
```
Esto iniciará:
- ✓ API de Usuarios (puerto 8000)
- ✓ API de Reportes (puerto 8001)
- ✓ API de Notificaciones (puerto 8002)
-**API de Moderación (puerto 8003)**
- Consumer de Usuarios
- Consumer de Reportes
- Consumer de Notificaciones
- **Consumer de Moderación**
### Acceder a documentación
- Moderación: http://localhost:8003/docs
## Persistencia
La API de Moderación almacena datos en **MongoDB** con dos colecciones:
### `moderation_actions`
Almacena todas las acciones de moderación:
```javascript
{
"action_id": "uuid",
"moderator_id": 123,
"action_type": "delete_report",
"target_type": "report",
"target_id": "report-uuid",
"reason": "Contenido inapropiado",
"status": "pending",
"fecha_creacion": "2026-05-04T10:30:00",
"fecha_ejecucion": null,
...
}
```
### `moderation_logs`
Registro auditable de todas las operaciones:
```javascript
{
"log_id": "uuid",
"action_id": "action-uuid",
"moderator_id": 123,
"resultado": "success",
"fecha_log": "2026-05-04T10:30:00",
"ip_moderator": "192.168.1.100"
}
```
## Integración Sin Cambios Significativos
Esta API fue diseñada para integrarse sin modificar el código existente:
**No modifica** `docker-compose.yaml` (usa instancias RabbitMQ existentes)
**No modifica** APIs existentes (Usuarios, Reportes, Notificaciones)
**No modifica** consumidores existentes
**Comunicación desacoplada** via RabbitMQ
**Base de datos independiente** (MongoDB)
**Puerto separado** (8003)
## Extensibilidad
Para agregar nuevas acciones de moderación:
1. **Crear Use Case** en `application/services/moderation_services.py`
2. **Crear evento** en `infrastructure/adapters/rabbitmq/messages.py`
3. **Agregar endpoint** en `infrastructure/api/moderations/moderations.py`
4. **Manejar evento** en `consumers/moderation_consumer.py`
## Requisitos
- Python 3.8+
- FastAPI
- Pydantic
- pika (RabbitMQ client)
- pymongo (MongoDB driver)
- MongoDB instancia
- RabbitMQ instancia
## Seguridad
### Recomendaciones Implementadas
- ✓ Validación de entrada con Pydantic
- ✓ Logs auditables de todas las acciones
- ✓ Soporte para IP del moderador
- ✓ Eventos inmutables en RabbitMQ
- ✓ Estados de acción para auditoría
### Recomendaciones Futuras
- Implementar autenticación JWT para moderadores
- Rate limiting por moderador
- Notificaciones a usuarios afectados
- Dashboard de auditoría
## Extensibilidad
Para agregar nuevas acciones de moderación:
1. **Crear Use Case** en `application/services/moderation_services.py`
2. **Crear evento** en `infrastructure/adapters/rabbitmq/messages.py`
3. **Agregar endpoint** en `infrastructure/api/moderations/moderations.py`
4. **Manejar evento** en `consumers/moderation_consumer.py`
## Requisitos
- Python 3.8+
- FastAPI
- Pydantic
- pika (RabbitMQ client)
- pymongo (MongoDB driver)
- MongoDB instancia
- RabbitMQ instancia
## Seguridad
### Recomendaciones Implementadas
- ✓ Validación de entrada con Pydantic
- ✓ Logs auditables de todas las acciones
- ✓ Soporte para IP del moderador
- ✓ Eventos inmutables en RabbitMQ
- ✓ Estados de acción para auditoría
### Recomendaciones Futuras
- Implementar autenticación JWT para moderadores
- Rate limiting por moderador
- Notificaciones a usuarios afectados
- Dashboard de auditoría

View File

@@ -0,0 +1,334 @@
{
"moderation_examples": {
"delete_report": {
"description": "Eliminar un reporte (requiere token JWT de admin)",
"endpoint": "POST /moderation/reports/delete",
"headers": {
"Content-Type": "application/json",
"Authorization": "Bearer YOUR_JWT_TOKEN"
},
"request_body": {
"report_id": "550e8400-e29b-41d4-a716-446655440000",
"reason": "Contenido violento",
"description": "La imagen contiene violencia explícita"
},
"response_success": {
"status": 200,
"body": {
"status": "success",
"message": "Reporte eliminado",
"action_id": "action-uuid-123"
}
},
"response_error_401": {
"status": 401,
"body": {"detail": "Token inválido o expirado"}
},
"response_error_403": {
"status": 403,
"body": {"detail": "Permisos insuficientes"}
}
},
"close_account": {
"description": "Cerrar cuenta de usuario (requiere token JWT de admin)",
"endpoint": "POST /moderation/accounts/close",
"headers": {
"Content-Type": "application/json",
"Authorization": "Bearer YOUR_JWT_TOKEN"
},
"request_body": {
"user_id": 42,
"reason": "Violación grave de términos",
"description": "Comportamiento acosador",
"is_permanent": true
},
"response_success": {
"status": 200,
"body": {
"status": "success",
"message": "Cuenta cerrada",
"action_id": "action-uuid-456"
}
}
},
"ban_user": {
"description": "Banear usuario (requiere token JWT de admin)",
"endpoint": "POST /moderation/users/ban",
"headers": {
"Content-Type": "application/json",
"Authorization": "Bearer YOUR_JWT_TOKEN"
},
"request_body": {
"user_id": 42,
"reason": "Spam sistemático",
"duration_days": 30,
"description": "Múltiples reportes de spam"
},
"response_success": {
"status": 200,
"body": {
"status": "success",
"message": "Usuario baneado por 30 días",
"action_id": "action-uuid-789"
}
}
},
"warn_user": {
"description": "Advertir usuario (requiere token JWT de admin)",
"endpoint": "POST /moderation/users/warn",
"headers": {
"Content-Type": "application/json",
"Authorization": "Bearer YOUR_JWT_TOKEN"
},
"request_body": {
"user_id": 42,
"reason": "Lenguaje inapropiado",
"description": "Primera advertencia"
},
"response_success": {
"status": 200,
"body": {
"status": "success",
"message": "Usuario advertido",
"action_id": "action-uuid-101"
}
}
},
"review_content": {
"description": "Revisar contenido reportado (requiere token JWT de admin)",
"endpoint": "POST /moderation/content/review",
"headers": {
"Content-Type": "application/json",
"Authorization": "Bearer YOUR_JWT_TOKEN"
},
"request_body": {
"report_id": "550e8400-e29b-41d4-a716-446655440000",
"action": "approve",
"reason": "Contenido válido según políticas",
"notes": "Aprobado después de revisión"
},
"response_success": {
"status": 200,
"body": {
"status": "success",
"message": "Decisión registrada",
"action_id": "action-uuid-202"
}
}
}
}
}
}
},
"ban_user": {
"description": "Banear un usuario temporalmente",
"endpoint": "POST /moderation/users/ban",
"headers": {
"Content-Type": "application/json"
},
"request_body_temporal": {
"moderator_id": 1,
"user_id": 42,
"reason": "Lenguaje inapropiado repetido",
"duration_days": 7,
"description": "Ban de 7 días por contenido ofensivo"
},
"request_body_permanent": {
"moderator_id": 1,
"user_id": 99,
"reason": "Servidumbre reiterada de términos",
"duration_days": null,
"description": "Ban permanente"
},
"response_success": {
"status": 200,
"body": {
"status": "success",
"message": "Usuario marcado para ban",
"action_id": "action-uuid-789"
}
}
},
"warn_user": {
"description": "Emitir una advertencia a un usuario",
"endpoint": "POST /moderation/users/warn",
"headers": {
"Content-Type": "application/json"
},
"request_body": {
"moderator_id": 1,
"user_id": 42,
"reason": "Primera violación menor de comunidad",
"description": "Publicación con spam de enlaces"
},
"response_success": {
"status": 200,
"body": {
"status": "success",
"message": "Advertencia registrada",
"action_id": "action-uuid-012"
}
}
},
"review_content": {
"description": "Revisar y tomar decisión sobre un reporte",
"endpoint": "POST /moderation/content/review",
"headers": {
"Content-Type": "application/json"
},
"request_body_approve": {
"moderator_id": 1,
"report_id": "550e8400-e29b-41d4-a716-446655440000",
"action": "approve",
"reason": "Contenido reportado es legítimamente violatorio",
"notes": "Proceder inmediatamente con eliminación"
},
"request_body_reject": {
"moderator_id": 1,
"report_id": "550e8400-e29b-41d4-a716-446655440001",
"action": "reject",
"reason": "Contenido está dentro de políticas",
"notes": "El usuario tiene derecho a expresar esta opinión"
},
"request_body_needs_info": {
"moderator_id": 1,
"report_id": "550e8400-e29b-41d4-a716-446655440002",
"action": "needs_more_info",
"reason": "Necesaria información contextual adicional",
"notes": "Contactar con el reportante para detalles"
},
"response_success": {
"status": 200,
"body": {
"status": "success",
"message": "Revisión registrada: approve",
"action_id": "action-uuid-345"
}
}
},
"health_check": {
"description": "Verificar estado de la API",
"endpoint": "GET /health",
"response": {
"status": 200,
"body": {
"status": "healthy",
"service": "Moderation API"
}
}
},
"root_info": {
"description": "Obtener información sobre la API",
"endpoint": "GET /",
"response": {
"status": 200,
"body": {
"status": "ok",
"service": "VoxPopuli Moderation API",
"version": "1.0.0",
"description": "API para gestión de acciones de moderación",
"endpoints": {
"delete_report": "POST /moderation/reports/delete",
"close_account": "POST /moderation/accounts/close",
"ban_user": "POST /moderation/users/ban",
"warn_user": "POST /moderation/users/warn",
"review_content": "POST /moderation/content/review"
}
}
}
}
},
"curl_examples": {
"delete_report": "curl -X POST http://localhost:8003/moderation/reports/delete -H 'Content-Type: application/json' -d '{\"moderator_id\": 1, \"report_id\": \"550e8400-e29b-41d4-a716-446655440000\", \"reason\": \"Contenido violento\", \"description\": \"Imagen con violencia explícita\"}'",
"ban_user": "curl -X POST http://localhost:8003/moderation/users/ban -H 'Content-Type: application/json' -d '{\"moderator_id\": 1, \"user_id\": 42, \"reason\": \"Lenguaje inapropiado\", \"duration_days\": 7}'",
"warn_user": "curl -X POST http://localhost:8003/moderation/users/warn -H 'Content-Type: application/json' -d '{\"moderator_id\": 1, \"user_id\": 42, \"reason\": \"Primera advertencia\", \"description\": \"Spam de enlaces\"}'",
"close_account": "curl -X POST http://localhost:8003/moderation/accounts/close -H 'Content-Type: application/json' -d '{\"moderator_id\": 1, \"user_id\": 42, \"reason\": \"Violación de términos\", \"is_permanent\": true}'",
"review_approve": "curl -X POST http://localhost:8003/moderation/content/review -H 'Content-Type: application/json' -d '{\"moderator_id\": 1, \"report_id\": \"550e8400-e29b-41d4-a716-446655440000\", \"action\": \"approve\", \"reason\": \"Contenido legítimamente violatorio\"}'"
},
"rabbitmq_events": {
"moderation_delete_report": {
"queue": "moderation_queue",
"event_type": "moderation.delete_report",
"payload": {
"action_id": "uuid",
"event_type": "moderation.delete_report",
"moderator_id": 1,
"report_id": "550e8400-e29b-41d4-a716-446655440000",
"reason": "Contenido violento",
"description": "Imagen con violencia explícita",
"timestamp": "2026-05-04T10:30:00.000Z"
}
},
"moderation_close_account": {
"queue": "moderation_queue",
"event_type": "moderation.close_account",
"payload": {
"action_id": "uuid",
"event_type": "moderation.close_account",
"moderator_id": 1,
"user_id": 42,
"reason": "Violación de términos",
"description": "Comportamiento acosador",
"is_permanent": true,
"timestamp": "2026-05-04T10:30:00.000Z"
}
},
"moderation_ban_user": {
"queue": "moderation_queue",
"event_type": "moderation.ban_user",
"payload": {
"action_id": "uuid",
"event_type": "moderation.ban_user",
"moderator_id": 1,
"user_id": 42,
"reason": "Lenguaje inapropiado",
"duration_days": 7,
"is_permanent": false,
"description": "Ban temporal por contenido ofensivo",
"timestamp": "2026-05-04T10:30:00.000Z"
}
},
"moderation_warn_user": {
"queue": "moderation_queue",
"event_type": "moderation.warn_user",
"payload": {
"action_id": "uuid",
"event_type": "moderation.warn_user",
"moderator_id": 1,
"user_id": 42,
"reason": "Primera advertencia",
"description": "Spam de enlaces",
"timestamp": "2026-05-04T10:30:00.000Z"
}
},
"moderation_review_content": {
"queue": "moderation_queue",
"event_type": "moderation.review_content",
"payload": {
"action_id": "uuid",
"event_type": "moderation.review_content",
"moderator_id": 1,
"report_id": "550e8400-e29b-41d4-a716-446655440000",
"review_action": "approve",
"reason": "Contenido legítimamente violatorio",
"notes": "Proceder inmediatamente",
"timestamp": "2026-05-04T10:30:00.000Z"
}
}
}
}

250
MODERATION_SETUP.md Normal file
View File

@@ -0,0 +1,250 @@
# Guía de Instalación - API de Moderación
## Verificar Requisitos Previos
La API de Moderación requiere que ya tengas instalados y funcionando:
### 1. RabbitMQ
Debe estar ejecutándose con una instancia ya configurada:
```bash
# Verificar conexión
telnet localhost 5672
```
### 2. MongoDB
Debe estar ejecutándose y accesible:
```bash
# Verificar conexión
mongosh --eval "db.version()"
```
### 3. Python 3.8+
```bash
python --version
```
### 4. Dependencias del Proyecto
Las dependencias ya están en `requirements.txt`:
```bash
pip install -r requirements.txt
```
## Instalación de la API de Moderación
### Paso 1: Actualizar dependencias (si es necesario)
La API utiliza las siguientes librerías (ya incluidas en requirements.txt):
- `fastapi` - Framework web
- `uvicorn` - Servidor ASGI
- `pydantic` - Validación de datos
- `pika` - Cliente de RabbitMQ
- `pymongo` - Driver de MongoDB
Si necesitas instalarlas manualmente:
```bash
pip install fastapi uvicorn pydantic pika pymongo
```
### Paso 2: Verificar estructura de directorios
Asegúrate que existan los siguientes directorios:
```
src/
├── domain/
│ └── moderations.py ✓ Creado
├── application/
│ ├── ports/
│ │ └── moderation_repository.py ✓ Creado
│ └── services/
│ └── moderation_services.py ✓ Creado
├── consumers/
│ └── moderation_consumer.py ✓ Creado
└── infrastructure/
├── adapters/
│ └── moderation_repository_mongo.py ✓ Creado
└── api/
└── moderations/ ✓ Creado
├── __init__.py
├── app.py
├── router.py
├── moderations.py
├── schemas.py
└── root.py
```
### Paso 3: Actualizar archivo principal
El archivo `src/main.py` ya fue actualizado para incluir:
- ✓ Importación de la API de Moderación
- ✓ Función `run_moderations_api()`
- ✓ Función `run_moderations_consumer()`
- ✓ Threads para ejecutar ambos componentes
### Paso 4: Configurar MongoDB (si es necesario)
Si MongoDB no está configurado, crea las colecciones:
```javascript
// En mongosh o cliente MongoDB
use voxpopuli
// Las colecciones se crearán automáticamente al insertar datos
// Pero puedes pre-crearlas si lo prefieres:
db.createCollection("moderation_actions")
db.createCollection("moderation_logs")
// Crear índices para mejor rendimiento
db.moderation_actions.createIndex({ "action_id": 1 })
db.moderation_actions.createIndex({ "moderator_id": 1 })
db.moderation_actions.createIndex({ "target_id": 1 })
db.moderation_logs.createIndex({ "action_id": 1 })
```
### Paso 5: Configurar RabbitMQ (si es necesario)
La cola se crea automáticamente, pero si quieres pre-crearla:
```bash
# Acceder a RabbitMQ Management Console
# http://localhost:15672 (usuario: guest, password: guest)
# O via CLI:
rabbitmqctl declare_queue moderation_queue -d true
```
## Ejecutar la API
### Opción 1: Ejecutar todo (Recomendado)
```bash
python src/main.py
```
Esto iniciará automáticamente:
- API de Usuarios (8000)
- API de Reportes (8001)
- API de Notificaciones (8002)
- **API de Moderación (8003)** ← NUEVA
- Todos los consumidores (incluyendo Moderación)
### Opción 2: Ejecutar solo la API de Moderación
```bash
cd src
python -c "from infrastructure.api.moderations.app import create_app; import uvicorn; app = create_app(); uvicorn.run(app, host='0.0.0.0', port=8003)"
```
## Verificar que funciona
### 1. Health Check
```bash
curl http://localhost:8003/health
```
Respuesta esperada:
```json
{
"status": "healthy",
"service": "Moderation API"
}
```
### 2. Información de la API
```bash
curl http://localhost:8003/
```
### 3. Documentación Interactiva
Abre en tu navegador:
- http://localhost:8003/docs (Swagger UI)
- http://localhost:8003/redoc (ReDoc)
### 4. Probar un endpoint
```bash
curl -X POST http://localhost:8003/moderation/reports/delete \
-H "Content-Type: application/json" \
-d '{
"moderator_id": 1,
"report_id": "test-report-123",
"reason": "Contenido inapropiado",
"description": "Test description"
}'
```
## Solucionar Problemas
### Error: "Connection refused" a RabbitMQ
```
Error: [Errno 111] Connection refused
```
**Solución:**
- Verifica que RabbitMQ esté ejecutándose
- Verifica el host y puerto en `core/config.py`
### Error: "ConnectionFailure" a MongoDB
```
Error: ServerSelectionTimeoutError
```
**Solución:**
- Verifica que MongoDB esté ejecutándose
- Verifica la configuración de conexión en `infrastructure/adapters/persistence/mongodb.py`
### Error: "Port 8003 already in use"
```
Error: [Errno 48] Address already in use
```
**Solución:**
Cambia el puerto en `main.py`:
```python
def run_moderations_api():
uvicorn.run(
app_moderations,
host=ConfSettings.host,
port=8004, # Cambiar puerto aquí
...
)
```
### Los eventos no se procesan
**Causas posibles:**
1. Consumer no está ejecutándose
- Verifica que `run_moderations_consumer()` esté ejecutándose
- Revisa los logs del consumer
2. Cola no existe
- Se crea automáticamente al enviar el primer evento
- Verifica en RabbitMQ Management Console
3. MongoDB no está guardando datos
- Verifica que `moderation_actions` colección exista
- Revisa permisos de MongoDB
## Logs
Los logs se guardarán en:
```
src/logs/
```
Para ver logs en tiempo real:
```bash
tail -f src/logs/moderation_consumer.log
```
## Próximos Pasos
1. **Integrar autenticación JWT** para moderadores
2. **Crear dashboard** para visualizar acciones
3. **Agregar notificaciones** a usuarios afectados
4. **Implementar auditoría** más detallada
5. **Agregar rate limiting** por moderador
## Contacto y Soporte
Para problemas con la instalación:
1. Revisa logs en `src/logs/`
2. Verifica configuración en `src/core/config.py`
3. Consulta MODERATION_API.md para arquitectura
4. Revisa MODERATION_API_EXAMPLES.json para ejemplos

203
docker-compose-py.yaml Normal file
View File

@@ -0,0 +1,203 @@
# ============================================================
# VoxPopuli - docker-compose.yaml
# Orquesta la app + toda la infraestructura de datos/mensajería
# - MySQL → API Usuarios (8000)
# - MongoDB → API Reportes (8001)
# - MongoDB → API Notificaciones(8002)
# - MongoDB → API Moderación (8003)
# - PostgreSQL → API Métricas (8004) ← faltaba esto
# - RabbitMQ → cola de mensajes entre microservicios
# ============================================================
# Uso rápido:
# cp .env.example .env # ajusta credenciales
# docker compose up --build
# ============================================================
name: voxpopuli
services:
# ──────────────────────────────────────────
# INFRAESTRUCTURA
# ──────────────────────────────────────────
mysql:
image: mysql:8.0
container_name: voxpopuli_mysql
restart: unless-stopped
environment:
MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD:-rootpassword}
MYSQL_DATABASE: voxpopuli_users
MYSQL_USER: ${MYSQL_USER:-voxpopuli}
MYSQL_PASSWORD: ${MYSQL_PASSWORD:-voxpopuli_pass}
ports:
- "${MYSQL_PORT:-3306}:3306"
volumes:
- mysql_data:/var/lib/mysql
healthcheck:
test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "-u", "root", "-p${MYSQL_ROOT_PASSWORD:-rootpassword}"]
interval: 10s
timeout: 5s
retries: 5
start_period: 30s
networks:
- voxpopuli_net
mongodb-reports:
image: mongo:7.0
container_name: voxpopuli_mongo_reports
restart: unless-stopped
environment:
MONGO_INITDB_ROOT_USERNAME: ${MONGO_USER:-admin}
MONGO_INITDB_ROOT_PASSWORD: ${MONGO_PASSWORD:-admin_password}
MONGO_INITDB_DATABASE: voxpopuli_reports
command: mongod --auth
ports:
- "${MONGO_REPORTS_PORT:-27017}:27017"
volumes:
- mongo_reports_data:/data/db
healthcheck:
test: ["CMD", "mongosh", "--quiet", "--eval", "db.adminCommand('ping').ok", "--username", "${MONGO_USER:-admin}", "--password", "${MONGO_PASSWORD:-admin_password}", "--authenticationDatabase", "admin"]
interval: 10s
timeout: 5s
retries: 5
start_period: 20s
networks:
- voxpopuli_net
mongodb-notifications:
image: mongo:7.0
container_name: voxpopuli_mongo_notifications
restart: unless-stopped
environment:
MONGO_INITDB_ROOT_USERNAME: ${MONGO_USER:-admin}
MONGO_INITDB_ROOT_PASSWORD: ${MONGO_PASSWORD:-admin_password}
MONGO_INITDB_DATABASE: voxpopuli_notifications
command: mongod --auth
ports:
- "${MONGO_NOTIFICATIONS_PORT:-27018}:27017"
volumes:
- mongo_notifications_data:/data/db
healthcheck:
test: ["CMD", "mongosh", "--quiet", "--eval", "db.adminCommand('ping').ok", "--username", "${MONGO_USER:-admin}", "--password", "${MONGO_PASSWORD:-admin_password}", "--authenticationDatabase", "admin"]
interval: 10s
timeout: 5s
retries: 5
start_period: 20s
networks:
- voxpopuli_net
# PostgreSQL para la API de Métricas
postgres:
image: postgres:16-alpine
container_name: voxpopuli_postgres
restart: unless-stopped
environment:
POSTGRES_USER: ${POSTGRES_USER:-voxpopuli}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-voxpopuli_pass}
POSTGRES_DB: voxpopuli_metrics
ports:
- "${POSTGRES_PORT:-5432}:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-voxpopuli} -d voxpopuli_metrics"]
interval: 10s
timeout: 5s
retries: 5
start_period: 20s
networks:
- voxpopuli_net
rabbitmq:
image: rabbitmq:3.13-management
container_name: voxpopuli_rabbitmq
restart: unless-stopped
environment:
RABBITMQ_DEFAULT_USER: ${RABBITMQ_USER:-voxpopuli}
RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASS:-voxpopuli_pass}
ports:
- "${RABBITMQ_AMQP_PORT:-5672}:5672"
- "${RABBITMQ_MGMT_PORT:-15672}:15672"
volumes:
- rabbitmq_data:/var/lib/rabbitmq
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "-q", "ping"]
interval: 10s
timeout: 5s
retries: 5
start_period: 30s
networks:
- voxpopuli_net
# ──────────────────────────────────────────
# APLICACIÓN
# ──────────────────────────────────────────
app:
build:
context: .
dockerfile: Dockerfile
image: voxpopuli-app:latest
container_name: voxpopuli_app
restart: unless-stopped
env_file:
- .env
environment:
# MySQL - Usuarios
MYSQL_URL: mysql+pymysql://${MYSQL_USER:-voxpopuli}:${MYSQL_PASSWORD:-voxpopuli_pass}@mysql/voxpopuli_users
# MongoDB - Reportes
MONGODB_URL: mongodb://${MONGO_USER:-admin}:${MONGO_PASSWORD:-admin_password}@mongodb-reports:27017
MONGODB_DB: voxpopuli_reports
# MongoDB - Notificaciones
MONGODB_NOTIFICATIONS_URL: mongodb://${MONGO_USER:-admin}:${MONGO_PASSWORD:-admin_password}@mongodb-notifications:27017
MONGODB_NOTIFICATIONS_DB: voxpopuli_notifications
# PostgreSQL - Métricas
POSTGRES_URL: postgresql://${POSTGRES_USER:-voxpopuli}:${POSTGRES_PASSWORD:-voxpopuli_pass}@postgres:5432/voxpopuli_metrics
# RabbitMQ
RABBITMQ_URL: amqp://${RABBITMQ_USER:-voxpopuli}:${RABBITMQ_PASS:-voxpopuli_pass}@rabbitmq:5672/
# App
HOST: 0.0.0.0
LOG_LEVEL: ${LOG_LEVEL:-info}
ports:
- "8000:8000" # API Usuarios
- "8001:8001" # API Reportes
- "8002:8002" # API Notificaciones
- "8003:8003" # API Moderación
- "8004:8004" # API Métricas
depends_on:
mysql:
condition: service_healthy
mongodb-reports:
condition: service_healthy
mongodb-notifications:
condition: service_healthy
postgres:
condition: service_healthy
rabbitmq:
condition: service_healthy
networks:
- voxpopuli_net
healthcheck:
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/')"]
interval: 30s
timeout: 10s
retries: 3
start_period: 20s
# ──────────────────────────────────────────
# VOLÚMENES
# ──────────────────────────────────────────
volumes:
mysql_data:
mongo_reports_data:
mongo_notifications_data:
postgres_data:
rabbitmq_data:
# ──────────────────────────────────────────
# RED INTERNA
# ──────────────────────────────────────────
networks:
voxpopuli_net:
driver: bridge

View File

@@ -49,7 +49,25 @@ services:
timeout: 5s timeout: 5s
retries: 5 retries: 5
postgres:
image: postgres:15-alpine
container_name: voxpopuli_postgres
environment:
POSTGRES_USER: voxpopuli
POSTGRES_PASSWORD: voxpopuli_pass
POSTGRES_DB: voxpopuli_metrics
ports:
- "5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U voxpopuli -d voxpopuli_metrics"]
interval: 10s
timeout: 5s
retries: 5
volumes: volumes:
mysql_data: mysql_data:
mongo_data: mongo_data:
mongo_data_notifications: mongo_data_notifications:
postgres_data:

30
dockerdotenv.env Normal file
View File

@@ -0,0 +1,30 @@
# ============================================================
# VoxPopuli - Variables de entorno para Docker
# Copia este archivo como .env y ajusta los valores
# ============================================================
# ── MySQL (API Usuarios) ─────────────────────────────────────
MYSQL_ROOT_PASSWORD=rootpassword
MYSQL_USER=voxpopuli
MYSQL_PASSWORD=voxpopuli_pass
MYSQL_PORT=3306
# ── MongoDB (Reportes + Notificaciones) ──────────────────────
MONGO_USER=admin
MONGO_PASSWORD=admin_password
MONGO_REPORTS_PORT=27017
MONGO_NOTIFICATIONS_PORT=27018
# ── PostgreSQL (API Métricas) ────────────────────────────────
POSTGRES_USER=voxpopuli
POSTGRES_PASSWORD=voxpopuli_pass
POSTGRES_PORT=5432
# ── RabbitMQ ─────────────────────────────────────────────────
RABBITMQ_USER=voxpopuli
RABBITMQ_PASS=voxpopuli_pass
RABBITMQ_AMQP_PORT=5672
RABBITMQ_MGMT_PORT=15672
# ── App ──────────────────────────────────────────────────────
LOG_LEVEL=info

63
dockerfile Normal file
View File

@@ -0,0 +1,63 @@
# ============================================================
# VoxPopuli - Dockerfile
# Microservicios FastAPI: Usuarios (8000), Reportes (8001),
# Notificaciones (8002), Moderación (8003), Métricas (8004)
# ============================================================
# --- Stage 1: Builder ---
FROM python:3.11-slim AS builder
WORKDIR /build
# Instalar dependencias del sistema necesarias para compilación
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc \
libffi-dev \
libssl-dev \
default-libmysqlclient-dev \
pkg-config \
&& rm -rf /var/lib/apt/lists/*
# Copiar e instalar dependencias Python en un directorio aislado
COPY requirements.txt .
RUN pip install --upgrade pip && \
pip install --prefix=/install --no-cache-dir -r requirements.txt
# --- Stage 2: Runtime ---
FROM python:3.11-slim
LABEL maintainer="Hokzaap S. de R.L. de C.V."
LABEL description="VoxPopuli - Infraestructura de Voz Ciudadana"
WORKDIR /app
# Instalar únicamente librerías de runtime
RUN apt-get update && apt-get install -y --no-install-recommends \
default-libmysqlclient-dev \
&& rm -rf /var/lib/apt/lists/*
RUN mkdir -p /app/logs/
# Copiar paquetes instalados desde el builder
COPY --from=builder /install /usr/local
# Copiar código fuente
COPY src/ ./src/
# Variables de entorno con valores por defecto (sobreescribibles via .env o compose)
ENV HOST=0.0.0.0 \
LOG_LEVEL=info \
PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
PYTHONPATH=/app/src
# Puertos expuestos por los cinco microservicios
EXPOSE 8000 8001 8002 8003 8004
# Healthcheck básico contra la API de Usuarios
HEALTHCHECK --interval=30s --timeout=10s --start-period=20s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/')" || exit 1
# Punto de entrada
CMD ["python", "src/main.py"]

View File

@@ -11,3 +11,5 @@ Pillow
PyJWT PyJWT
passlib[argon2] passlib[argon2]
python-multipart python-multipart
psycopg2-binary
cryptography

View File

@@ -0,0 +1,33 @@
from abc import ABC, abstractmethod
from datetime import datetime
from typing import List, Dict
from domain.metrics import Metric, DailyStats, AnalyticsReport
class MetricsRepository(ABC):
"""Puerto para persistencia de métricas"""
@abstractmethod
def save_metric(self, metric: Metric) -> Metric:
"""Guarda una métrica"""
pass
@abstractmethod
def get_metrics_by_date_range(self, start_date: datetime, end_date: datetime) -> List[Metric]:
"""Obtiene métricas en un rango de fechas"""
pass
@abstractmethod
def get_daily_stats(self, date: datetime) -> List[DailyStats]:
"""Obtiene estadísticas diarias"""
pass
@abstractmethod
def get_event_count_by_type(self, start_date: datetime, end_date: datetime) -> Dict[str, int]:
"""Obtiene conteo de eventos por tipo"""
pass
@abstractmethod
def generate_report(self, start_date: datetime, end_date: datetime) -> AnalyticsReport:
"""Genera reporte de analítica"""
pass

View File

@@ -0,0 +1,43 @@
"""Puerto de abstracción para repositorio de Moderaciones"""
from abc import ABC, abstractmethod
from domain.moderations import ModerationAction, ModerationLog
from typing import List, Optional
class ModerationRepository(ABC):
"""Interfaz (puerto) para operaciones de persistencia de moderaciones"""
@abstractmethod
def save_moderation_action(self, action: ModerationAction) -> bool:
"""Guarda una acción de moderación"""
pass
@abstractmethod
def get_moderation_action(self, action_id: str) -> Optional[ModerationAction]:
"""Obtiene una acción de moderación por ID"""
pass
@abstractmethod
def get_actions_by_moderator(self, moderator_id: int) -> List[ModerationAction]:
"""Obtiene todas las acciones de un moderador"""
pass
@abstractmethod
def get_actions_by_target(self, target_id: str, target_type: str) -> List[ModerationAction]:
"""Obtiene todas las acciones sobre un target específico"""
pass
@abstractmethod
def update_action_status(self, action_id: str, status: str) -> bool:
"""Actualiza el estado de una acción"""
pass
@abstractmethod
def save_moderation_log(self, log: ModerationLog) -> bool:
"""Guarda un log de moderación"""
pass
@abstractmethod
def get_moderation_logs(self, action_id: str) -> List[ModerationLog]:
"""Obtiene todos los logs de una acción"""
pass

View File

@@ -0,0 +1,39 @@
from datetime import datetime, timedelta
from typing import List, Dict
from domain.metrics import Metric, DailyStats, AnalyticsReport, EventType
from application.ports.metrics_repository import MetricsRepository
class MetricsService:
"""Servicio de lógica de negocio para métricas"""
def __init__(self, repository: MetricsRepository):
self.repository = repository
def record_event(self, event_type: str, entity_id: str, entity_type: str,
user_id: int = None, metadata: dict = None) -> Metric:
"""Registra un evento de métrica"""
metric = Metric(
metric_id=None,
event_type=event_type,
entity_id=entity_id,
entity_type=entity_type,
timestamp=datetime.now(),
user_id=user_id,
metadata=metadata or {}
)
return self.repository.save_metric(metric)
def get_metrics_report(self, days_back: int = 7) -> AnalyticsReport:
"""Obtiene reporte de últimos N días"""
end_date = datetime.now()
start_date = end_date - timedelta(days=days_back)
return self.repository.generate_report(start_date, end_date)
def get_daily_statistics(self, date: datetime) -> List[DailyStats]:
"""Obtiene estadísticas de un día específico"""
return self.repository.get_daily_stats(date)
def get_event_summary(self, start_date: datetime, end_date: datetime) -> Dict[str, int]:
"""Obtiene resumen de eventos en rango"""
return self.repository.get_event_count_by_type(start_date, end_date)

View File

@@ -0,0 +1,347 @@
"""Servicios de aplicación para Moderación"""
from domain.moderations import ModerationAction, ModerationLog
from application.ports.moderation_repository import ModerationRepository
from infrastructure.adapters.rabbitmq.sender import send_to_queue
from infrastructure.adapters.rabbitmq.messages import ModerationMessage, ModerationEventType
from datetime import datetime
from typing import Dict, Any, Optional
import uuid
import logging
logger = logging.getLogger(__name__)
class DeleteReportUseCase:
"""Use case para eliminar un reporte como moderador"""
def __init__(self, repo: ModerationRepository):
if not isinstance(repo, ModerationRepository):
raise TypeError("repo must implement ModerationRepository")
self.repo = repo
def execute(self, moderator_id: int, report_id: str, reason: str,
description: Optional[str] = None) -> Dict[str, Any]:
"""
Envía un evento a RabbitMQ para eliminar un reporte
Args:
moderator_id: ID del moderador
report_id: ID del reporte a eliminar
reason: Razón de la eliminación
description: Descripción adicional
Returns:
Dictionary con status y detalles
"""
try:
action_id = str(uuid.uuid4())
# Crear acción de moderación
action = ModerationAction(
action_id=action_id,
moderator_id=moderator_id,
action_type="delete_report",
target_type="report",
target_id=report_id,
reason=reason,
description=description,
status="pending",
fecha_creacion=datetime.now()
)
# Guardar acción
self.repo.save_moderation_action(action)
# Enviar evento a RabbitMQ
message = {
"action_id": action_id,
"event_type": "moderation.delete_report",
"moderator_id": moderator_id,
"report_id": report_id,
"reason": reason,
"description": description or "",
"timestamp": datetime.now().isoformat()
}
send_to_queue('moderation_queue', message)
return {
"status": "success",
"message": "Reporte marcado para eliminación",
"action_id": action_id
}
except Exception as e:
logger.error(f"Error deleting report: {e}")
return {
"status": "error",
"message": f"Error al eliminar reporte: {str(e)}"
}
class CloseAccountUseCase:
"""Use case para cerrar una cuenta de usuario"""
def __init__(self, repo: ModerationRepository):
if not isinstance(repo, ModerationRepository):
raise TypeError("repo must implement ModerationRepository")
self.repo = repo
def execute(self, moderator_id: int, user_id: int, reason: str,
description: Optional[str] = None, is_permanent: bool = True) -> Dict[str, Any]:
"""
Envía un evento para cerrar una cuenta de usuario
Args:
moderator_id: ID del moderador
user_id: ID del usuario cuya cuenta cerrar
reason: Razón del cierre
description: Descripción adicional
is_permanent: Si el cierre es permanente o temporal
Returns:
Dictionary con status y detalles
"""
try:
action_id = str(uuid.uuid4())
action = ModerationAction(
action_id=action_id,
moderator_id=moderator_id,
action_type="close_account",
target_type="user",
target_id=str(user_id),
reason=reason,
description=description,
is_permanent=is_permanent,
status="pending",
fecha_creacion=datetime.now()
)
self.repo.save_moderation_action(action)
message = {
"action_id": action_id,
"event_type": "moderation.close_account",
"moderator_id": moderator_id,
"user_id": user_id,
"reason": reason,
"description": description or "",
"is_permanent": is_permanent,
"timestamp": datetime.now().isoformat()
}
send_to_queue('moderation_queue', message)
return {
"status": "success",
"message": "Cuenta marcada para cierre",
"action_id": action_id
}
except Exception as e:
logger.error(f"Error closing account: {e}")
return {
"status": "error",
"message": f"Error al cerrar cuenta: {str(e)}"
}
class BanUserUseCase:
"""Use case para banear un usuario"""
def __init__(self, repo: ModerationRepository):
if not isinstance(repo, ModerationRepository):
raise TypeError("repo must implement ModerationRepository")
self.repo = repo
def execute(self, moderator_id: int, user_id: int, reason: str,
duration_days: Optional[int] = None, description: Optional[str] = None) -> Dict[str, Any]:
"""
Envía un evento para banear un usuario
Args:
moderator_id: ID del moderador
user_id: ID del usuario a banear
reason: Razón del ban
duration_days: Duración en días (None para ban permanente)
description: Descripción adicional
Returns:
Dictionary con status y detalles
"""
try:
action_id = str(uuid.uuid4())
is_permanent = duration_days is None
action = ModerationAction(
action_id=action_id,
moderator_id=moderator_id,
action_type="ban_user",
target_type="user",
target_id=str(user_id),
reason=reason,
description=description,
duration_days=duration_days,
is_permanent=is_permanent,
status="pending",
fecha_creacion=datetime.now()
)
self.repo.save_moderation_action(action)
message = {
"action_id": action_id,
"event_type": "moderation.ban_user",
"moderator_id": moderator_id,
"user_id": user_id,
"reason": reason,
"duration_days": duration_days,
"is_permanent": is_permanent,
"description": description or "",
"timestamp": datetime.now().isoformat()
}
send_to_queue('moderation_queue', message)
return {
"status": "success",
"message": "Usuario marcado para ban",
"action_id": action_id
}
except Exception as e:
logger.error(f"Error banning user: {e}")
return {
"status": "error",
"message": f"Error al banear usuario: {str(e)}"
}
class WarnUserUseCase:
"""Use case para advertir a un usuario"""
def __init__(self, repo: ModerationRepository):
if not isinstance(repo, ModerationRepository):
raise TypeError("repo must implement ModerationRepository")
self.repo = repo
def execute(self, moderator_id: int, user_id: int, reason: str,
description: Optional[str] = None) -> Dict[str, Any]:
"""
Envía un evento para advertir a un usuario
Args:
moderator_id: ID del moderador
user_id: ID del usuario a advertir
reason: Razón de la advertencia
description: Descripción adicional
Returns:
Dictionary con status y detalles
"""
try:
action_id = str(uuid.uuid4())
action = ModerationAction(
action_id=action_id,
moderator_id=moderator_id,
action_type="warn_user",
target_type="user",
target_id=str(user_id),
reason=reason,
description=description,
status="pending",
fecha_creacion=datetime.now()
)
self.repo.save_moderation_action(action)
message = {
"action_id": action_id,
"event_type": "moderation.warn_user",
"moderator_id": moderator_id,
"user_id": user_id,
"reason": reason,
"description": description or "",
"timestamp": datetime.now().isoformat()
}
send_to_queue('moderation_queue', message)
return {
"status": "success",
"message": "Advertencia registrada",
"action_id": action_id
}
except Exception as e:
logger.error(f"Error warning user: {e}")
return {
"status": "error",
"message": f"Error al advertir usuario: {str(e)}"
}
class ReviewContentUseCase:
"""Use case para revisar contenido reportado"""
def __init__(self, repo: ModerationRepository):
if not isinstance(repo, ModerationRepository):
raise TypeError("repo must implement ModerationRepository")
self.repo = repo
def execute(self, moderator_id: int, report_id: str, action: str,
reason: Optional[str] = None, notes: Optional[str] = None) -> Dict[str, Any]:
"""
Envía un evento para revisar y actuar sobre contenido reportado
Args:
moderator_id: ID del moderador
report_id: ID del reporte
action: Acción a tomar (approve, reject, needs_more_info)
reason: Razón de la decisión
notes: Notas adicionales
Returns:
Dictionary con status y detalles
"""
try:
if action not in ["approve", "reject", "needs_more_info"]:
return {
"status": "error",
"message": "Acción inválida"
}
action_id = str(uuid.uuid4())
moderation_action = ModerationAction(
action_id=action_id,
moderator_id=moderator_id,
action_type="review_content",
target_type="report",
target_id=report_id,
reason=reason or action,
description=notes,
status="approved",
fecha_creacion=datetime.now(),
fecha_ejecucion=datetime.now()
)
self.repo.save_moderation_action(moderation_action)
message = {
"action_id": action_id,
"event_type": "moderation.review_content",
"moderator_id": moderator_id,
"report_id": report_id,
"review_action": action,
"reason": reason or "",
"notes": notes or "",
"timestamp": datetime.now().isoformat()
}
send_to_queue('moderation_queue', message)
return {
"status": "success",
"message": f"Revisión registrada: {action}",
"action_id": action_id
}
except Exception as e:
logger.error(f"Error reviewing content: {e}")
return {
"status": "error",
"message": f"Error al revisar contenido: {str(e)}"
}

View File

@@ -0,0 +1,143 @@
"""Metrics RabbitMQ Consumer - Processes system events and saves metrics"""
import sys
import os
import logging
from datetime import datetime
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from infrastructure.adapters.rabbitmq.consumer import RabbitMQConsumer
from infrastructure.adapters.rabbitmq.messages import (
UserMessage, ReportMessage, NotificationMessage, ModerationMessage
)
from application.services.metrics_services import MetricsService
from infrastructure.adapters.persistence.metrics_repository_postgres import MetricsRepositoryPostgres
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('logs/metrics_consumer.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Colas dedicadas para métricas — no compiten con los consumers de dominio
METRICS_QUEUES = {
'user': 'metrics_users_queue',
'report': 'metrics_reports_queue',
'notification': 'metrics_notifications_queue',
'moderation': 'metrics_moderations_queue',
}
class MetricsConsumer:
"""Consumer for all system events from RabbitMQ"""
def __init__(self):
self.metrics_service = MetricsService(MetricsRepositoryPostgres())
self.user_consumer = RabbitMQConsumer(queue_name=METRICS_QUEUES['user'])
self.report_consumer = RabbitMQConsumer(queue_name=METRICS_QUEUES['report'])
self.notification_consumer = RabbitMQConsumer(queue_name=METRICS_QUEUES['notification'])
self.moderation_consumer = RabbitMQConsumer(queue_name=METRICS_QUEUES['moderation'])
self.user_consumer.set_callback(self.process_user_event)
self.report_consumer.set_callback(self.process_report_event)
self.notification_consumer.set_callback(self.process_notification_event)
self.moderation_consumer.set_callback(self.process_moderation_event)
def process_user_event(self, message_dict: dict):
try:
message = UserMessage.from_dict(message_dict)
self.metrics_service.record_event(
event_type=f"user_{message.event_type.value}",
entity_id=str(message.user_id),
entity_type="user",
user_id=message.user_id,
metadata={"email": message.email}
)
logger.info(f"Métrica registrada: user_{message.event_type.value}")
except Exception as e:
logger.error(f"Error procesando evento de usuario: {e}")
def process_report_event(self, message_dict: dict):
try:
message = ReportMessage.from_dict(message_dict)
self.metrics_service.record_event(
event_type=f"report_{message.event_type.value}",
entity_id=str(message.id_reporte),
entity_type="report",
user_id=message.id_usuario,
metadata={"estado": message.estado}
)
logger.info(f"Métrica registrada: report_{message.event_type.value}")
except Exception as e:
logger.error(f"Error procesando evento de reporte: {e}")
def process_notification_event(self, message_dict: dict):
try:
message = NotificationMessage.from_dict(message_dict)
self.metrics_service.record_event(
event_type=f"notification_{message.event_type.value}",
entity_id=str(message.id_notificacion),
entity_type="notification",
user_id=message.id_usuario,
metadata={"type": message.tipo_notificacion}
)
logger.info(f"Métrica registrada: notification_{message.event_type.value}")
except Exception as e:
logger.error(f"Error procesando evento de notificación: {e}")
def process_moderation_event(self, message_dict: dict):
try:
message = ModerationMessage.from_dict(message_dict)
self.metrics_service.record_event(
event_type=f"moderation_{message.event_type.value}",
entity_id=str(message.action_id),
entity_type="moderation",
user_id=message.moderator_id,
metadata={"action": message.review_action, "reason": message.reason}
)
logger.info(f"Métrica registrada: moderation_{message.event_type.value}")
except Exception as e:
logger.error(f"Error procesando evento de moderación: {e}")
def start(self):
logger.info("Iniciando Metrics Consumer...")
try:
import threading
threads = [
threading.Thread(target=self._safe_consume, args=(self.user_consumer,), daemon=True),
threading.Thread(target=self._safe_consume, args=(self.report_consumer,), daemon=True),
threading.Thread(target=self._safe_consume, args=(self.notification_consumer,), daemon=True),
threading.Thread(target=self._safe_consume, args=(self.moderation_consumer,), daemon=True),
]
for t in threads:
t.start()
for t in threads:
t.join()
except KeyboardInterrupt:
logger.info("Metrics Consumer detenido")
self.stop()
def _safe_consume(self, consumer: RabbitMQConsumer):
try:
consumer.start_consuming()
except Exception as e:
logger.error(f"Error en consumer de {consumer.queue_name}: {e}")
def stop(self):
for c in [self.user_consumer, self.report_consumer,
self.notification_consumer, self.moderation_consumer]:
try:
c.stop()
except Exception:
pass
if __name__ == "__main__":
consumer = MetricsConsumer()
consumer.start()

View File

@@ -0,0 +1,174 @@
"""Consumidor de eventos de Moderación desde RabbitMQ"""
import pika
import json
import logging
from datetime import datetime
from threading import Thread
from typing import Dict, Any
logger = logging.getLogger(__name__)
class ModerationConsumer:
"""Consumidor de eventos de moderación desde RabbitMQ"""
def __init__(self, host: str = 'localhost', port: int = 5672):
self.host = host
self.port = port
self.queue_name = 'moderation_queue'
self.running = False
def start(self):
"""Inicia el consumidor en un thread separado"""
thread = Thread(target=self._consume, daemon=True)
thread.start()
logger.info("Moderation Consumer started")
def _consume(self):
"""Consume mensajes de RabbitMQ"""
try:
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self.host, port=self.port)
)
channel = connection.channel()
# Declarar cola
channel.queue_declare(queue=self.queue_name, durable=True)
# Configurar QoS
channel.basic_qos(prefetch_count=1)
# Configurar callback
channel.basic_consume(
queue=self.queue_name,
on_message_callback=self._process_message
)
logger.info(f"Moderation Consumer listening on {self.queue_name}")
self.running = True
channel.start_consuming()
except Exception as e:
logger.error(f"Error in moderation consumer: {e}")
self.running = False
def _process_message(self, ch, method, properties, body):
"""Procesa un mensaje de moderación"""
try:
message = json.loads(body)
logger.info(f"Processing moderation event: {message.get('event_type')}")
event_type = message.get('event_type')
if event_type == 'moderation.delete_report':
self._handle_delete_report(message)
elif event_type == 'moderation.close_account':
self._handle_close_account(message)
elif event_type == 'moderation.ban_user':
self._handle_ban_user(message)
elif event_type == 'moderation.warn_user':
self._handle_warn_user(message)
elif event_type == 'moderation.review_content':
self._handle_review_content(message)
else:
logger.warning(f"Unknown moderation event type: {event_type}")
# Confirmar mensaje
ch.basic_ack(delivery_tag=method.delivery_tag)
except json.JSONDecodeError:
logger.error("Error decoding JSON message")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
except Exception as e:
logger.error(f"Error processing moderation message: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
def _handle_delete_report(self, message: Dict[str, Any]):
"""Maneja evento de eliminación de reporte"""
try:
action_id = message.get('action_id')
report_id = message.get('report_id')
reason = message.get('reason')
logger.info(f"Deleting report {report_id} (Action: {action_id})")
# Aquí implementarías la lógica para eliminar el reporte en la BD
# Por ahora solo registramos el evento
logger.info(f"Report {report_id} marked as deleted by moderator {message.get('moderator_id')}")
except Exception as e:
logger.error(f"Error handling delete report: {e}")
raise
def _handle_close_account(self, message: Dict[str, Any]):
"""Maneja evento de cierre de cuenta"""
try:
action_id = message.get('action_id')
user_id = message.get('user_id')
is_permanent = message.get('is_permanent', True)
logger.info(f"Closing account for user {user_id} (Action: {action_id}, Permanent: {is_permanent})")
# Aquí implementarías la lógica para cerrar la cuenta en la BD
logger.info(f"Account for user {user_id} marked as closed")
except Exception as e:
logger.error(f"Error handling close account: {e}")
raise
def _handle_ban_user(self, message: Dict[str, Any]):
"""Maneja evento de ban de usuario"""
try:
action_id = message.get('action_id')
user_id = message.get('user_id')
duration_days = message.get('duration_days')
is_permanent = message.get('is_permanent', False)
reason = message.get('reason')
logger.info(f"Banning user {user_id} (Action: {action_id}, Duration: {duration_days} days, Permanent: {is_permanent})")
# Aquí implementarías la lógica para banear al usuario
if is_permanent:
logger.info(f"User {user_id} permanently banned")
else:
logger.info(f"User {user_id} banned for {duration_days} days")
except Exception as e:
logger.error(f"Error handling ban user: {e}")
raise
def _handle_warn_user(self, message: Dict[str, Any]):
"""Maneja evento de advertencia a usuario"""
try:
action_id = message.get('action_id')
user_id = message.get('user_id')
reason = message.get('reason')
logger.info(f"Warning user {user_id} (Action: {action_id}, Reason: {reason})")
# Aquí implementarías la lógica para registrar la advertencia
logger.info(f"Warning recorded for user {user_id}")
except Exception as e:
logger.error(f"Error handling warn user: {e}")
raise
def _handle_review_content(self, message: Dict[str, Any]):
"""Maneja evento de revisión de contenido"""
try:
action_id = message.get('action_id')
report_id = message.get('report_id')
review_action = message.get('review_action') # approve, reject, needs_more_info
logger.info(f"Reviewing report {report_id} (Action: {action_id}, Decision: {review_action})")
# Aquí implementarías la lógica para procesar la revisión
logger.info(f"Report {report_id} review: {review_action}")
except Exception as e:
logger.error(f"Error handling review content: {e}")
raise
# Crear instancia global
moderation_consumer = ModerationConsumer()

View File

@@ -129,7 +129,7 @@ class NotificationConsumer:
"""Start consuming messages from RabbitMQ""" """Start consuming messages from RabbitMQ"""
try: try:
logger.info("Starting Notification Consumer...") logger.info("Starting Notification Consumer...")
self.consumer.start() self.consumer.start_consuming()
except Exception as e: except Exception as e:
logger.error(f"Error starting notification consumer: {e}", exc_info=True) logger.error(f"Error starting notification consumer: {e}", exc_info=True)
raise raise

View File

@@ -35,103 +35,78 @@ class UserConsumer:
Args: Args:
message_dict: Dictionary containing the message data message_dict: Dictionary containing the message data
""" """
try: # Reconstruct the UserMessage object — let exceptions propagate so the
# Reconstruct the UserMessage object # consumer's callback_wrapper can decide whether to ack, nack+discard, or requeue.
message = UserMessage.from_dict(message_dict) message = UserMessage.from_dict(message_dict)
if message.event_type == UserEventType.CREATE: if message.event_type == UserEventType.CREATE:
self._handle_create_user(message) self._handle_create_user(message)
elif message.event_type == UserEventType.UPDATE: elif message.event_type == UserEventType.UPDATE:
self._handle_update_user(message) self._handle_update_user(message)
elif message.event_type == UserEventType.DELETE: elif message.event_type == UserEventType.DELETE:
self._handle_delete_user(message) self._handle_delete_user(message)
else: else:
logger.warning(f"Unknown event type: {message.event_type}") logger.warning(f"Unknown event type: {message.event_type}")
except Exception as e:
logger.error(f"Error processing user message: {e}", exc_info=True)
# Rollback en caso de error en el procesamiento del mensaje
self.repo.db.rollback()
raise
def _handle_create_user(self, message: UserMessage): def _handle_create_user(self, message: UserMessage):
"""Handle user create event""" """Handle user create event"""
try: logger.info(f"Creating user: {message.email}")
logger.info(f"Creating user: {message.email}")
# Parse datetime strings if not message.fecha_nacimiento:
fecha_nacimiento = datetime.fromisoformat(message.fecha_nacimiento) raise ValueError(f"Missing fecha_nacimiento in CREATE message for email {message.email}")
fecha_creacion = datetime.fromisoformat(message.fecha_creacion) if not message.fecha_creacion:
raise ValueError(f"Missing fecha_creacion in CREATE message for email {message.email}")
# Create User domain object fecha_nacimiento = datetime.fromisoformat(message.fecha_nacimiento)
user = User( fecha_creacion = datetime.fromisoformat(message.fecha_creacion)
user_id=0, # Will be auto-generated by DB
nombre=message.nombre,
apellido=message.apellido,
email=message.email,
contraseña_hash=message.contraseña_hash,
fecha_nacimiento=fecha_nacimiento,
fecha_creacion=fecha_creacion,
calificacion=message.calificacion,
numero_reportes=message.numero_reportes,
url_foto_perfil=message.url_foto_perfil,
biografia=message.biografia
)
# Save to repository with transaction handling user = User(
saved_user = self.repo.save(user) user_id=0, # Will be auto-generated by DB
logger.info(f"User created successfully: {saved_user.user_id} - {saved_user.email}") nombre=message.nombre,
apellido=message.apellido,
email=message.email,
contraseña_hash=message.contraseña_hash,
fecha_nacimiento=fecha_nacimiento,
fecha_creacion=fecha_creacion,
calificacion=message.calificacion,
numero_reportes=message.numero_reportes,
url_foto_perfil=message.url_foto_perfil,
biografia=message.biografia
)
except Exception as e: saved_user = self.repo.save(user)
logger.error(f"Error creating user: {e}", exc_info=True) logger.info(f"User created successfully: {saved_user.user_id} - {saved_user.email}")
self.repo.db.rollback()
raise
def _handle_update_user(self, message: UserMessage): def _handle_update_user(self, message: UserMessage):
"""Handle user update event""" """Handle user update event"""
try: logger.info(f"Updating user: {message.user_id}")
logger.info(f"Updating user: {message.user_id}")
# Find the user user = self.repo.find_by_id(message.user_id)
user = self.repo.find_by_id(message.user_id) if not user:
if not user: logger.warning(f"User not found: {message.user_id}")
logger.warning(f"User not found: {message.user_id}") return
return
# Update fields if provided if message.nombre:
if message.nombre: user.nombre = message.nombre
user.nombre = message.nombre if message.apellido:
if message.apellido: user.apellido = message.apellido
user.apellido = message.apellido if message.url_foto_perfil is not None:
if message.url_foto_perfil is not None: user.url_foto_perfil = message.url_foto_perfil
user.url_foto_perfil = message.url_foto_perfil if message.biografia is not None:
if message.biografia is not None: user.biografia = message.biografia
user.biografia = message.biografia
# Save to repository with transaction handling self.repo.update(user)
updated_user = self.repo.update(user) logger.info(f"User updated successfully: {message.user_id}")
logger.info(f"User updated successfully: {message.user_id}")
except Exception as e:
logger.error(f"Error updating user: {e}", exc_info=True)
self.repo.db.rollback()
raise
def _handle_delete_user(self, message: UserMessage): def _handle_delete_user(self, message: UserMessage):
"""Handle user delete event""" """Handle user delete event"""
try: logger.info(f"Deleting user: {message.user_id}")
logger.info(f"Deleting user: {message.user_id}")
success = self.repo.delete(message.user_id) success = self.repo.delete(message.user_id)
if success: if success:
logger.info(f"User deleted successfully: {message.user_id}") logger.info(f"User deleted successfully: {message.user_id}")
else: else:
logger.warning(f"Failed to delete user: {message.user_id}") logger.warning(f"Failed to delete user: {message.user_id}")
except Exception as e:
logger.error(f"Error deleting user: {e}", exc_info=True)
self.repo.db.rollback()
raise
def start(self): def start(self):
"""Start consuming messages""" """Start consuming messages"""
@@ -145,7 +120,6 @@ class UserConsumer:
logger.error(f"Consumer error: {e}", exc_info=True) logger.error(f"Consumer error: {e}", exc_info=True)
raise raise
finally: finally:
# Asegurar cierre de sesión
if self.repo.db: if self.repo.db:
try: try:
self.repo.db.close() self.repo.db.close()

View File

@@ -36,6 +36,11 @@ class Settings(BaseSettings):
) )
postgres_url: str = Field(
default=os.getenv("POSTGRES_URL", "postgresql://voxpopuli:voxpopuli_pass@localhost:5432/voxpopuli_metrics"),
description="Base de datos PostgreSQL para métricas"
)
# JWT Configuration # JWT Configuration
jwt_secret_key: str = Field( jwt_secret_key: str = Field(
default=os.getenv("JWT_SECRET_KEY", "your-secret-key-change-in-production"), default=os.getenv("JWT_SECRET_KEY", "your-secret-key-change-in-production"),
@@ -81,6 +86,12 @@ class Settings(BaseSettings):
description="Calidad de compresión WebP (0-100)" description="Calidad de compresión WebP (0-100)"
) )
# CORS Configuration
cors_origins: list = Field(
default=["http://localhost:3000", "http://localhost:8000", "http://localhost:8001", "http://localhost:8002", "http://localhost:8003", "http://localhost:8004"],
description="Orígenes permitidos para CORS"
)
class Config: class Config:
env_file = ".env" env_file = ".env"
case_sensitive = False case_sensitive = False

46
src/domain/metrics.py Normal file
View File

@@ -0,0 +1,46 @@
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from enum import Enum
class EventType(str, Enum):
"""Tipos de eventos que se rastrean"""
USER_CREATED = "user_created"
USER_UPDATED = "user_updated"
USER_DELETED = "user_deleted"
REPORT_CREATED = "report_created"
REPORT_RESOLVED = "report_resolved"
NOTIFICATION_SENT = "notification_sent"
MODERATION_COMPLETED = "moderation_completed"
@dataclass
class Metric:
"""Modelo de dominio para Métrica"""
metric_id: int
event_type: str
entity_id: str
entity_type: str
timestamp: datetime
metadata: dict = field(default_factory=dict)
user_id: Optional[int] = None
@dataclass
class DailyStats:
"""Estadísticas diarias"""
date: datetime
event_type: str
count: int
@dataclass
class AnalyticsReport:
"""Reporte de analítica"""
report_id: int
start_date: datetime
end_date: datetime
total_events: int
events_by_type: dict
creation_date: datetime = field(default_factory=datetime.now)

35
src/domain/moderations.py Normal file
View File

@@ -0,0 +1,35 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Literal
@dataclass
class ModerationAction:
"""Modelo de dominio para Acciones de Moderación"""
action_id: str
moderator_id: int
action_type: Literal["delete_report", "close_account", "ban_user", "warn_user", "review_content"]
target_type: Literal["report", "user"]
target_id: str # ID del reporte o usuario afectado
reason: str
description: Optional[str] = None
duration_days: Optional[int] = None # Para bans temporales
is_permanent: bool = False
status: Literal["pending", "approved", "rejected", "executed"] = "pending"
fecha_creacion: Optional[datetime] = None
fecha_ejecucion: Optional[datetime] = None
observaciones: Optional[str] = None
@dataclass
class ModerationLog:
"""Log de todas las acciones de moderación realizadas"""
log_id: str
action_id: str
moderator_id: int
action_type: str
target_type: str
target_id: str
resultado: Literal["success", "failed", "pending"]
mensaje: Optional[str] = None
fecha_log: Optional[datetime] = None
ip_moderator: Optional[str] = None

View File

@@ -16,3 +16,4 @@ class User:
numero_reportes: int = 0 numero_reportes: int = 0
url_foto_perfil: Optional[str] = None url_foto_perfil: Optional[str] = None
biografia: Optional[str] = None biografia: Optional[str] = None
is_admin: bool = False # Indica si el usuario tiene permisos de administrador

View File

@@ -0,0 +1,176 @@
"""Adaptador de repositorio de Moderación con MongoDB"""
from application.ports.moderation_repository import ModerationRepository
from domain.moderations import ModerationAction, ModerationLog
from typing import List, Optional
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
class ModerationRepositoryMongo(ModerationRepository):
"""Implementación de ModerationRepository usando MongoDB"""
def __init__(self):
self.db = None
self.moderation_actions_collection = None
self.moderation_logs_collection = None
self._initialize_db()
def _initialize_db(self):
"""Inicializa conexión a MongoDB"""
try:
from infrastructure.adapters.persistence.mongodb import mongodb
self.db = mongodb
# Obtener o crear colecciones
self.moderation_actions_collection = self.db['moderation_actions']
self.moderation_logs_collection = self.db['moderation_logs']
# Crear índices
self.moderation_actions_collection.create_index('action_id')
self.moderation_actions_collection.create_index('moderator_id')
self.moderation_actions_collection.create_index('target_id')
self.moderation_logs_collection.create_index('action_id')
logger.info("Moderation MongoDB adapter initialized")
except Exception as e:
logger.error(f"Error initializing MongoDB for moderations: {e}")
raise
def save_moderation_action(self, action: ModerationAction) -> bool:
"""Guarda una acción de moderación en MongoDB"""
try:
action_dict = {
'action_id': action.action_id,
'moderator_id': action.moderator_id,
'action_type': action.action_type,
'target_type': action.target_type,
'target_id': action.target_id,
'reason': action.reason,
'description': action.description,
'duration_days': action.duration_days,
'is_permanent': action.is_permanent,
'status': action.status,
'fecha_creacion': action.fecha_creacion or datetime.now(),
'fecha_ejecucion': action.fecha_ejecucion,
'observaciones': action.observaciones
}
self.moderation_actions_collection.insert_one(action_dict)
logger.info(f"Moderation action saved: {action.action_id}")
return True
except Exception as e:
logger.error(f"Error saving moderation action: {e}")
return False
def get_moderation_action(self, action_id: str) -> Optional[ModerationAction]:
"""Obtiene una acción de moderación por ID"""
try:
doc = self.moderation_actions_collection.find_one({'action_id': action_id})
if doc:
return self._doc_to_moderation_action(doc)
return None
except Exception as e:
logger.error(f"Error getting moderation action: {e}")
return None
def get_actions_by_moderator(self, moderator_id: int) -> List[ModerationAction]:
"""Obtiene todas las acciones de un moderador"""
try:
docs = self.moderation_actions_collection.find({'moderator_id': moderator_id})
return [self._doc_to_moderation_action(doc) for doc in docs]
except Exception as e:
logger.error(f"Error getting actions by moderator: {e}")
return []
def get_actions_by_target(self, target_id: str, target_type: str) -> List[ModerationAction]:
"""Obtiene todas las acciones sobre un target específico"""
try:
docs = self.moderation_actions_collection.find({
'target_id': target_id,
'target_type': target_type
})
return [self._doc_to_moderation_action(doc) for doc in docs]
except Exception as e:
logger.error(f"Error getting actions by target: {e}")
return []
def update_action_status(self, action_id: str, status: str) -> bool:
"""Actualiza el estado de una acción"""
try:
result = self.moderation_actions_collection.update_one(
{'action_id': action_id},
{'$set': {'status': status}}
)
return result.modified_count > 0
except Exception as e:
logger.error(f"Error updating action status: {e}")
return False
def save_moderation_log(self, log: ModerationLog) -> bool:
"""Guarda un log de moderación"""
try:
log_dict = {
'log_id': log.log_id,
'action_id': log.action_id,
'moderator_id': log.moderator_id,
'action_type': log.action_type,
'target_type': log.target_type,
'target_id': log.target_id,
'resultado': log.resultado,
'mensaje': log.mensaje,
'fecha_log': log.fecha_log or datetime.now(),
'ip_moderator': log.ip_moderator
}
self.moderation_logs_collection.insert_one(log_dict)
logger.info(f"Moderation log saved: {log.log_id}")
return True
except Exception as e:
logger.error(f"Error saving moderation log: {e}")
return False
def get_moderation_logs(self, action_id: str) -> List[ModerationLog]:
"""Obtiene todos los logs de una acción"""
try:
docs = self.moderation_logs_collection.find({'action_id': action_id})
return [self._doc_to_moderation_log(doc) for doc in docs]
except Exception as e:
logger.error(f"Error getting moderation logs: {e}")
return []
@staticmethod
def _doc_to_moderation_action(doc: dict) -> ModerationAction:
"""Convierte un documento de MongoDB a ModerationAction"""
return ModerationAction(
action_id=doc.get('action_id'),
moderator_id=doc.get('moderator_id'),
action_type=doc.get('action_type'),
target_type=doc.get('target_type'),
target_id=doc.get('target_id'),
reason=doc.get('reason'),
description=doc.get('description'),
duration_days=doc.get('duration_days'),
is_permanent=doc.get('is_permanent', False),
status=doc.get('status', 'pending'),
fecha_creacion=doc.get('fecha_creacion'),
fecha_ejecucion=doc.get('fecha_ejecucion'),
observaciones=doc.get('observaciones')
)
@staticmethod
def _doc_to_moderation_log(doc: dict) -> ModerationLog:
"""Convierte un documento de MongoDB a ModerationLog"""
return ModerationLog(
log_id=doc.get('log_id'),
action_id=doc.get('action_id'),
moderator_id=doc.get('moderator_id'),
action_type=doc.get('action_type'),
target_type=doc.get('target_type'),
target_id=doc.get('target_id'),
resultado=doc.get('resultado'),
mensaje=doc.get('mensaje'),
fecha_log=doc.get('fecha_log'),
ip_moderator=doc.get('ip_moderator')
)

View File

@@ -0,0 +1,137 @@
from datetime import datetime
from typing import List, Dict, Optional
from sqlalchemy import Column, Integer, String, DateTime, JSON, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.sql import func
from domain.metrics import Metric, DailyStats, AnalyticsReport, EventType
from application.ports.metrics_repository import MetricsRepository
from core.config import ConfSettings
Base = declarative_base()
#settings = ConfSettings()
class MetricModel(Base):
"""Modelo SQLAlchemy para métricas en Postgres"""
__tablename__ = "metrics"
id = Column(Integer, primary_key=True, autoincrement=True)
event_type = Column(String, nullable=False, index=True)
entity_id = Column(String, nullable=False)
entity_type = Column(String, nullable=False)
timestamp = Column(DateTime, nullable=False, default=datetime.now, index=True)
event_metadata = Column(JSON, default={})
user_id = Column(Integer, nullable=True)
class MetricsRepositoryPostgres(MetricsRepository):
"""Implementación de repositorio de métricas con PostgreSQL"""
def __init__(self):
db_url = f"postgresql://voxpopuli:voxpopuli_pass@postgres:5432/voxpopuli_metrics"
self.engine = create_engine(db_url, echo=False)
Base.metadata.create_all(self.engine)
self.SessionLocal = sessionmaker(bind=self.engine)
def save_metric(self, metric: Metric) -> Metric:
"""Guarda una métrica en la base de datos"""
session = self.SessionLocal()
try:
db_metric = MetricModel(
event_type=metric.event_type,
entity_id=metric.entity_id,
entity_type=metric.entity_type,
timestamp=metric.timestamp,
event_metadata=metric.metadata,
user_id=metric.user_id
)
session.add(db_metric)
session.commit()
metric.metric_id = db_metric.id
return metric
finally:
session.close()
def get_metrics_by_date_range(self, start_date: datetime, end_date: datetime) -> List[Metric]:
"""Obtiene métricas en un rango de fechas"""
session = self.SessionLocal()
try:
db_metrics = session.query(MetricModel).filter(
MetricModel.timestamp >= start_date,
MetricModel.timestamp <= end_date
).all()
return [
Metric(
metric_id=m.id,
event_type=m.event_type,
entity_id=m.entity_id,
entity_type=m.entity_type,
timestamp=m.timestamp,
metadata=m.event_metadata or {},
user_id=m.user_id
)
for m in db_metrics
]
finally:
session.close()
def get_daily_stats(self, date: datetime) -> List[DailyStats]:
"""Obtiene estadísticas diarias"""
session = self.SessionLocal()
try:
start = datetime(date.year, date.month, date.day)
end = datetime(date.year, date.month, date.day, 23, 59, 59)
results = session.query(
MetricModel.event_type,
func.count(MetricModel.id).label('count')
).filter(
MetricModel.timestamp >= start,
MetricModel.timestamp <= end
).group_by(MetricModel.event_type).all()
return [
DailyStats(date=date, event_type=r[0], count=r[1])
for r in results
]
finally:
session.close()
def get_event_count_by_type(self, start_date: datetime, end_date: datetime) -> Dict[str, int]:
"""Obtiene conteo de eventos por tipo"""
session = self.SessionLocal()
try:
results = session.query(
MetricModel.event_type,
func.count(MetricModel.id).label('count')
).filter(
MetricModel.timestamp >= start_date,
MetricModel.timestamp <= end_date
).group_by(MetricModel.event_type).all()
return {r[0]: r[1] for r in results}
finally:
session.close()
def generate_report(self, start_date: datetime, end_date: datetime) -> AnalyticsReport:
"""Genera reporte de analítica"""
session = self.SessionLocal()
try:
total_events = session.query(func.count(MetricModel.id)).filter(
MetricModel.timestamp >= start_date,
MetricModel.timestamp <= end_date
).scalar() or 0
events_by_type = self.get_event_count_by_type(start_date, end_date)
return AnalyticsReport(
report_id=None,
start_date=start_date,
end_date=end_date,
total_events=total_events,
events_by_type=events_by_type
)
finally:
session.close()

View File

@@ -1,4 +1,4 @@
from sqlalchemy import Column, Integer, String, Float, DateTime from sqlalchemy import Column, Integer, String, Float, DateTime, Boolean
from infrastructure.adapters.persistence.db import Base from infrastructure.adapters.persistence.db import Base
from datetime import datetime from datetime import datetime
@@ -17,3 +17,4 @@ class UserModel(Base):
numero_reportes = Column(Integer, default=0, nullable=False) numero_reportes = Column(Integer, default=0, nullable=False)
url_foto_perfil = Column(String(500), nullable=True) url_foto_perfil = Column(String(500), nullable=True)
biografia = Column(String(1000), nullable=True) biografia = Column(String(1000), nullable=True)
is_admin = Column(Boolean, default=False, nullable=False, index=True) # Permisos de administrador

View File

@@ -2,6 +2,7 @@ from application.ports.user_repository import UserRepository
from domain.users import User from domain.users import User
from infrastructure.adapters.persistence.models import UserModel from infrastructure.adapters.persistence.models import UserModel
from infrastructure.adapters.persistence.db import SessionLocal from infrastructure.adapters.persistence.db import SessionLocal
from sqlalchemy.orm import Session
from typing import List, Optional from typing import List, Optional
import logging import logging
@@ -10,11 +11,24 @@ logger = logging.getLogger(__name__)
class UserRepositorySQL(UserRepository): class UserRepositorySQL(UserRepository):
"""Implementación del repositorio de Usuarios usando SQLAlchemy (MySQL)""" """Implementación del repositorio de Usuarios usando SQLAlchemy (MySQL)"""
def __init__(self, db_session=None): def __init__(self, db_session: Session = None):
self.db = db_session or SessionLocal() # FIXED: la sesión se guarda solo si se inyecta explícitamente.
# Cuando db_session es None, cada método abre y cierra su propia
# sesión, eliminando la caché de primer nivel entre requests.
self._injected_session = db_session
def _get_session(self):
"""
Devuelve la sesión inyectada si existe, o crea una nueva.
El caller es responsable de cerrarla cuando no fue inyectada.
"""
if self._injected_session is not None:
return self._injected_session, False # (sesión, es_propia)
return SessionLocal(), True
def save(self, user: User) -> User: def save(self, user: User) -> User:
"""Guarda un nuevo usuario con manejo de transacciones""" """Guarda un nuevo usuario con manejo de transacciones"""
db, is_own = self._get_session()
try: try:
db_user = UserModel( db_user = UserModel(
nombre=user.nombre, nombre=user.nombre,
@@ -26,62 +40,87 @@ class UserRepositorySQL(UserRepository):
calificacion=user.calificacion, calificacion=user.calificacion,
numero_reportes=user.numero_reportes, numero_reportes=user.numero_reportes,
url_foto_perfil=user.url_foto_perfil, url_foto_perfil=user.url_foto_perfil,
biografia=user.biografia biografia=user.biografia,
is_admin=user.is_admin
) )
self.db.add(db_user) db.add(db_user)
self.db.commit() db.commit()
self.db.refresh(db_user) db.refresh(db_user)
logger.info(f"Usuario guardado exitosamente: {db_user.user_id}") logger.info(f"Usuario guardado exitosamente: {db_user.user_id}")
return self._to_domain(db_user) return self._to_domain(db_user)
except Exception as e: except Exception as e:
self.db.rollback() db.rollback()
logger.error(f"Error al guardar usuario: {e}", exc_info=True) logger.error(f"Error al guardar usuario: {e}", exc_info=True)
raise raise
finally:
if is_own:
db.close()
def find_by_id(self, user_id: int) -> Optional[User]: def find_by_id(self, user_id: int) -> Optional[User]:
"""Obtiene un usuario por ID""" """Obtiene un usuario por ID"""
db, is_own = self._get_session()
try: try:
db_user = self.db.query(UserModel).filter(UserModel.user_id == user_id).first() db_user = db.query(UserModel).filter(UserModel.user_id == user_id).first()
if db_user: if db_user:
return self._to_domain(db_user) return self._to_domain(db_user)
return None return None
except Exception as e: except Exception as e:
logger.error(f"Error al buscar usuario por ID {user_id}: {e}", exc_info=True) logger.error(f"Error al buscar usuario por ID {user_id}: {e}", exc_info=True)
raise raise
finally:
if is_own:
db.close()
def find_by_email(self, email: str) -> Optional[User]: def find_by_email(self, email: str) -> Optional[User]:
"""Obtiene un usuario por email""" """Obtiene un usuario por email"""
db, is_own = self._get_session()
try: try:
db_user = self.db.query(UserModel).filter(UserModel.email == email).first() db_user = db.query(UserModel).filter(UserModel.email == email).first()
if db_user: if db_user:
return self._to_domain(db_user) return self._to_domain(db_user)
return None return None
except Exception as e: except Exception as e:
logger.error(f"Error al buscar usuario por email {email}: {e}", exc_info=True) logger.error(f"Error al buscar usuario por email {email}: {e}", exc_info=True)
raise raise
finally:
if is_own:
db.close()
def find_by_email_with_password(self, email: str) -> Optional[UserModel]: def find_by_email_with_password(self, email: str) -> Optional[UserModel]:
"""Obtiene un usuario por email incluyendo el hash de contraseña (para autenticación)""" """Obtiene un usuario por email incluyendo el hash de contraseña (para autenticación)"""
db, is_own = self._get_session()
try: try:
db_user = self.db.query(UserModel).filter(UserModel.email == email).first() db_user = db.query(UserModel).filter(UserModel.email == email).first()
# FIXED: si la sesión es propia, expunge para poder usar el objeto
# fuera de la sesión sin que SQLAlchemy intente lazy-load nada.
if db_user and is_own:
db.expunge(db_user)
return db_user return db_user
except Exception as e: except Exception as e:
logger.error(f"Error al buscar usuario por email {email}: {e}", exc_info=True) logger.error(f"Error al buscar usuario por email {email}: {e}", exc_info=True)
raise raise
finally:
if is_own:
db.close()
def find_all(self) -> List[User]: def find_all(self) -> List[User]:
"""Obtiene todos los usuarios""" """Obtiene todos los usuarios"""
db, is_own = self._get_session()
try: try:
db_users = self.db.query(UserModel).all() db_users = db.query(UserModel).all()
return [self._to_domain(user) for user in db_users] return [self._to_domain(user) for user in db_users]
except Exception as e: except Exception as e:
logger.error(f"Error al obtener todos los usuarios: {e}", exc_info=True) logger.error(f"Error al obtener todos los usuarios: {e}", exc_info=True)
raise raise
finally:
if is_own:
db.close()
def update(self, user: User) -> User: def update(self, user: User) -> User:
"""Actualiza un usuario con manejo de transacciones""" """Actualiza un usuario con manejo de transacciones"""
db, is_own = self._get_session()
try: try:
db_user = self.db.query(UserModel).filter(UserModel.user_id == user.user_id).first() db_user = db.query(UserModel).filter(UserModel.user_id == user.user_id).first()
if not db_user: if not db_user:
logger.warning(f"Usuario no encontrado para actualizar: {user.user_id}") logger.warning(f"Usuario no encontrado para actualizar: {user.user_id}")
return user return user
@@ -92,61 +131,75 @@ class UserRepositorySQL(UserRepository):
db_user.numero_reportes = user.numero_reportes db_user.numero_reportes = user.numero_reportes
db_user.url_foto_perfil = user.url_foto_perfil db_user.url_foto_perfil = user.url_foto_perfil
db_user.biografia = user.biografia db_user.biografia = user.biografia
self.db.commit() db.commit()
self.db.refresh(db_user) db.refresh(db_user)
logger.info(f"Usuario actualizado exitosamente: {user.user_id}") logger.info(f"Usuario actualizado exitosamente: {user.user_id}")
return self._to_domain(db_user) return self._to_domain(db_user)
except Exception as e: except Exception as e:
self.db.rollback() db.rollback()
logger.error(f"Error al actualizar usuario {user.user_id}: {e}", exc_info=True) logger.error(f"Error al actualizar usuario {user.user_id}: {e}", exc_info=True)
raise raise
finally:
if is_own:
db.close()
def delete(self, user_id: int) -> bool: def delete(self, user_id: int) -> bool:
"""Elimina un usuario con manejo de transacciones""" """Elimina un usuario con manejo de transacciones"""
db, is_own = self._get_session()
try: try:
db_user = self.db.query(UserModel).filter(UserModel.user_id == user_id).first() db_user = db.query(UserModel).filter(UserModel.user_id == user_id).first()
if db_user: if db_user:
self.db.delete(db_user) db.delete(db_user)
self.db.commit() db.commit()
logger.info(f"Usuario eliminado exitosamente: {user_id}") logger.info(f"Usuario eliminado exitosamente: {user_id}")
return True return True
logger.warning(f"Usuario no encontrado para eliminar: {user_id}") logger.warning(f"Usuario no encontrado para eliminar: {user_id}")
return False return False
except Exception as e: except Exception as e:
self.db.rollback() db.rollback()
logger.error(f"Error al eliminar usuario {user_id}: {e}", exc_info=True) logger.error(f"Error al eliminar usuario {user_id}: {e}", exc_info=True)
raise raise
finally:
if is_own:
db.close()
def increment_reports(self, user_id: int) -> None: def increment_reports(self, user_id: int) -> None:
"""Incrementa el contador de reportes con manejo de transacciones""" """Incrementa el contador de reportes con manejo de transacciones"""
db, is_own = self._get_session()
try: try:
db_user = self.db.query(UserModel).filter(UserModel.user_id == user_id).first() db_user = db.query(UserModel).filter(UserModel.user_id == user_id).first()
if db_user: if db_user:
db_user.numero_reportes += 1 db_user.numero_reportes += 1
self.db.commit() db.commit()
logger.info(f"Contador de reportes incrementado para usuario: {user_id}") logger.info(f"Contador de reportes incrementado para usuario: {user_id}")
else: else:
logger.warning(f"Usuario no encontrado para incrementar reportes: {user_id}") logger.warning(f"Usuario no encontrado para incrementar reportes: {user_id}")
except Exception as e: except Exception as e:
self.db.rollback() db.rollback()
logger.error(f"Error al incrementar reportes del usuario {user_id}: {e}", exc_info=True) logger.error(f"Error al incrementar reportes del usuario {user_id}: {e}", exc_info=True)
raise raise
finally:
if is_own:
db.close()
def update_rating(self, user_id: int, new_rating: float) -> None: def update_rating(self, user_id: int, new_rating: float) -> None:
"""Actualiza la calificación de un usuario con manejo de transacciones""" """Actualiza la calificación de un usuario con manejo de transacciones"""
db, is_own = self._get_session()
try: try:
db_user = self.db.query(UserModel).filter(UserModel.user_id == user_id).first() db_user = db.query(UserModel).filter(UserModel.user_id == user_id).first()
if db_user: if db_user:
# Asegurar que la calificación esté en el rango 0-100
db_user.calificacion = max(0, min(100, new_rating)) db_user.calificacion = max(0, min(100, new_rating))
self.db.commit() db.commit()
logger.info(f"Calificación actualizada para usuario {user_id}: {db_user.calificacion}") logger.info(f"Calificación actualizada para usuario {user_id}: {db_user.calificacion}")
else: else:
logger.warning(f"Usuario no encontrado para actualizar calificación: {user_id}") logger.warning(f"Usuario no encontrado para actualizar calificación: {user_id}")
except Exception as e: except Exception as e:
self.db.rollback() db.rollback()
logger.error(f"Error al actualizar calificación del usuario {user_id}: {e}", exc_info=True) logger.error(f"Error al actualizar calificación del usuario {user_id}: {e}", exc_info=True)
raise raise
finally:
if is_own:
db.close()
def _to_domain(self, db_user: UserModel) -> User: def _to_domain(self, db_user: UserModel) -> User:
"""Convierte un modelo SQLAlchemy a un objeto de dominio""" """Convierte un modelo SQLAlchemy a un objeto de dominio"""
@@ -160,5 +213,6 @@ class UserRepositorySQL(UserRepository):
calificacion=db_user.calificacion, calificacion=db_user.calificacion,
numero_reportes=db_user.numero_reportes, numero_reportes=db_user.numero_reportes,
url_foto_perfil=db_user.url_foto_perfil, url_foto_perfil=db_user.url_foto_perfil,
biografia=db_user.biografia biografia=db_user.biografia,
is_admin=db_user.is_admin
) )

View File

@@ -23,6 +23,7 @@ class RabbitMQConsumer:
) )
channel = connection.channel() channel = connection.channel()
channel.queue_declare(queue=self.queue_name, durable=True) channel.queue_declare(queue=self.queue_name, durable=True)
channel.basic_qos(prefetch_count=1)
def callback_wrapper(ch, method, properties, body): def callback_wrapper(ch, method, properties, body):
try: try:
@@ -34,13 +35,13 @@ class RabbitMQConsumer:
ch.basic_ack(delivery_tag=method.delivery_tag) ch.basic_ack(delivery_tag=method.delivery_tag)
except IntegrityError as e: except (IntegrityError, ValueError, TypeError, KeyError) as e:
# Error de negocio: no tiene sentido reintentar # Errores de negocio/datos: no tiene sentido reintentar
logger.warning(f"Business error, discarding message: {e}") logger.warning(f"Business/data error, discarding message: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
except Exception as e: except Exception as e:
# Error transitorio (red, DB caída): puede resolverse solo # Error transitorio (red, DB caída): puede resolverse solo
logger.error(f"Transient error processing message: {e}") logger.error(f"Transient error processing message: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

View File

@@ -27,6 +27,15 @@ class NotificationEventType(str, Enum):
REPORT_STATUS_CHANGE = "notification.report_status_change" REPORT_STATUS_CHANGE = "notification.report_status_change"
class ModerationEventType(str, Enum):
"""Types of moderation events"""
DELETE_REPORT = "moderation.delete_report"
CLOSE_ACCOUNT = "moderation.close_account"
BAN_USER = "moderation.ban_user"
WARN_USER = "moderation.warn_user"
REVIEW_CONTENT = "moderation.review_content"
@dataclass @dataclass
class UserMessage: class UserMessage:
"""Message for user events""" """Message for user events"""
@@ -122,3 +131,36 @@ class NotificationMessage:
"""Create from dictionary""" """Create from dictionary"""
data['event_type'] = NotificationEventType(data['event_type']) data['event_type'] = NotificationEventType(data['event_type'])
return NotificationMessage(**data) return NotificationMessage(**data)
@dataclass
class ModerationMessage:
"""Message for moderation events"""
event_type: ModerationEventType
action_id: Optional[str] = None
moderator_id: Optional[int] = None
report_id: Optional[str] = None
user_id: Optional[int] = None
reason: Optional[str] = None
description: Optional[str] = None
duration_days: Optional[int] = None # Para bans temporales
is_permanent: Optional[bool] = None
review_action: Optional[str] = None # approve, reject, needs_more_info
notes: Optional[str] = None
fecha_creacion: Optional[str] = None # ISO format datetime string
def to_dict(self):
"""Convert to dictionary"""
data = asdict(self)
data['event_type'] = self.event_type.value
return data
def to_json(self) -> str:
"""Convert to JSON string"""
return json.dumps(self.to_dict())
@staticmethod
def from_dict(data: dict) -> 'ModerationMessage':
"""Create from dictionary"""
data['event_type'] = ModerationEventType(data['event_type'])
return ModerationMessage(**data)

View File

@@ -3,41 +3,34 @@ import pika
import json import json
from typing import Any, Dict from typing import Any, Dict
import logging import logging
import os
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def _get_rabbitmq_host() -> str:
try:
from core.config import ConfSettings
return ConfSettings.rabbitmq
except Exception:
return os.getenv("RABBITMQ_URI", "localhost")
class RabbitMQSender: class RabbitMQSender:
"""Generic RabbitMQ sender for publishing messages to queues""" """Generic RabbitMQ sender for publishing messages to queues"""
def __init__(self, host: str = 'localhost', port: int = 5672): def __init__(self, host: str = None, port: int = 5672):
self.host = host self.host = host or _get_rabbitmq_host()
self.port = port self.port = port
def send_message(self, queue_name: str, message: Dict[str, Any]) -> bool: def send_message(self, queue_name: str, message: Dict[str, Any]) -> bool:
"""
Sends a message to a RabbitMQ queue
Args:
queue_name: Name of the queue to send to
message: Dictionary containing the message data
Returns:
True if successful, False otherwise
"""
try: try:
connection = pika.BlockingConnection( connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self.host, port=self.port) pika.ConnectionParameters(host=self.host, port=self.port)
) )
channel = connection.channel() channel = connection.channel()
# Declare queue to ensure it exists
channel.queue_declare(queue=queue_name, durable=True) channel.queue_declare(queue=queue_name, durable=True)
# Convert message to JSON
message_json = json.dumps(message) message_json = json.dumps(message)
# Publish the message
channel.basic_publish( channel.basic_publish(
exchange='', exchange='',
routing_key=queue_name, routing_key=queue_name,
@@ -46,29 +39,15 @@ class RabbitMQSender:
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
) )
) )
connection.close() connection.close()
logger.info(f"Message sent to queue '{queue_name}': {message_json}") logger.info(f"Message sent to queue '{queue_name}': {message_json}")
return True return True
except Exception as e: except Exception as e:
logger.error(f"Error sending message to RabbitMQ: {e}") logger.error(f"Error sending message to RabbitMQ: {e}")
return False return False
def send_to_queue(queue_name: str, message: Dict[str, Any], def send_to_queue(queue_name: str, message: Dict[str, Any],
host: str = 'localhost', port: int = 5672) -> bool: host: str = None, port: int = 5672) -> bool:
"""
Convenience function to send a message to RabbitMQ
Args:
queue_name: Name of the queue
message: Message dictionary
host: RabbitMQ host
port: RabbitMQ port
Returns:
True if successful, False otherwise
"""
sender = RabbitMQSender(host=host, port=port) sender = RabbitMQSender(host=host, port=port)
return sender.send_message(queue_name, message) return sender.send_message(queue_name, message)

View File

@@ -0,0 +1,49 @@
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer
import jwt
from core.config import ConfSettings
from infrastructure.adapters.persistence.user_repository_sql import UserRepositorySQL
import logging
logger = logging.getLogger(__name__)
security = HTTPBearer()
async def get_current_admin_user(credentials = Depends(security)):
try:
payload = jwt.decode(
credentials.credentials,
ConfSettings.jwt_secret_key,
algorithms=[ConfSettings.jwt_algorithm]
)
user_id: int = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token inválido",
headers={"WWW-Authenticate": "Bearer"},
)
except jwt.InvalidTokenError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token inválido o expirado",
headers={"WWW-Authenticate": "Bearer"},
)
user_repo = UserRepositorySQL()
user = user_repo.find_by_id(user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Usuario no encontrado",
)
if not user.is_admin:
logger.warning(f"Intento de acceso no autorizado a moderación por usuario {user_id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Permisos insuficientes",
)
return user

View File

@@ -0,0 +1 @@
# Metrics API

View File

@@ -0,0 +1,13 @@
from fastapi import FastAPI
from infrastructure.api.metrics.router import router
def create_app() -> FastAPI:
"""Factory para crear la aplicación de Métricas"""
app = FastAPI(
title="Métricas Microservice",
version="1.0.0",
description="Microservicio de métricas y analítica"
)
app.include_router(router)
return app

View File

@@ -0,0 +1,80 @@
from fastapi import APIRouter, Query
from datetime import datetime, timedelta
from infrastructure.api.metrics.schemas import (
MetricRequest, MetricResponse, AnalyticsReportResponse,
EventSummaryResponse, DailyStatsResponse
)
from application.services.metrics_services import MetricsService
from infrastructure.adapters.persistence.metrics_repository_postgres import MetricsRepositoryPostgres
from domain.metrics import Metric
router = APIRouter(prefix="/metrics", tags=["metrics"])
metrics_service = MetricsService(MetricsRepositoryPostgres())
@router.post("/record", response_model=MetricResponse)
async def record_metric(metric: MetricRequest):
"""Registra un nuevo evento de métrica"""
saved_metric = metrics_service.record_event(
event_type=metric.event_type,
entity_id=metric.entity_id,
entity_type=metric.entity_type,
user_id=metric.user_id,
metadata=metric.metadata
)
return {
"metric_id": saved_metric.metric_id,
"event_type": saved_metric.event_type,
"entity_id": saved_metric.entity_id,
"entity_type": saved_metric.entity_type,
"timestamp": saved_metric.timestamp,
"user_id": saved_metric.user_id
}
@router.get("/report", response_model=AnalyticsReportResponse)
async def get_analytics_report(days: int = Query(7, ge=1, le=365)):
"""Obtiene reporte de analítica de los últimos N días"""
report = metrics_service.get_metrics_report(days_back=days)
return {
"start_date": report.start_date,
"end_date": report.end_date,
"total_events": report.total_events,
"events_by_type": report.events_by_type
}
@router.get("/daily-stats", response_model=list[DailyStatsResponse])
async def get_daily_stats(date: datetime = Query(default_factory=datetime.now)):
"""Obtiene estadísticas de un día específico"""
stats = metrics_service.get_daily_statistics(date)
return [
{
"date": s.date,
"event_type": s.event_type,
"count": s.count
}
for s in stats
]
@router.get("/summary", response_model=EventSummaryResponse)
async def get_event_summary(
start_date: datetime = Query(default_factory=lambda: datetime.now() - timedelta(days=7)),
end_date: datetime = Query(default_factory=datetime.now)
):
"""Obtiene resumen de eventos en un rango de fechas"""
summary = metrics_service.get_event_summary(start_date, end_date)
return {
"summary": summary,
"date_range": {
"start_date": start_date,
"end_date": end_date
}
}
@router.get("/health")
async def health_check():
"""Health check endpoint"""
return {"status": "healthy"}

View File

@@ -0,0 +1,43 @@
from pydantic import BaseModel
from datetime import datetime
from typing import Dict, List, Optional
class MetricRequest(BaseModel):
"""Esquema para crear una métrica"""
event_type: str
entity_id: str
entity_type: str
user_id: Optional[int] = None
metadata: Optional[Dict] = None
class MetricResponse(BaseModel):
"""Esquema de respuesta para métrica"""
metric_id: int
event_type: str
entity_id: str
entity_type: str
timestamp: datetime
user_id: Optional[int]
class DailyStatsResponse(BaseModel):
"""Esquema de estadísticas diarias"""
date: datetime
event_type: str
count: int
class AnalyticsReportResponse(BaseModel):
"""Esquema de reporte de analítica"""
start_date: datetime
end_date: datetime
total_events: int
events_by_type: Dict[str, int]
class EventSummaryResponse(BaseModel):
"""Esquema de resumen de eventos"""
summary: Dict[str, int]
date_range: Dict[str, datetime]

View File

@@ -0,0 +1,46 @@
"""Aplicación FastAPI para API de Moderación"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from infrastructure.api.moderations.router import router
from core.config import ConfSettings
import logging
# Configurar logging
logging.basicConfig(
level=ConfSettings.log_level.upper(),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def create_app() -> FastAPI:
"""
Factory function para crear la aplicación FastAPI de Moderación
Returns:
FastAPI application instance
"""
app = FastAPI(
title="VoxPopuli Moderation API",
description="API para gestión de acciones de moderación en VoxPopuli",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc",
openapi_url="/openapi.json"
)
# Agregar CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=ConfSettings.cors_origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Incluir rutas
app.include_router(router)
logger.info("Moderation API initialized")
return app

View File

@@ -0,0 +1,153 @@
"""Endpoints de moderación para gestión de reportes, cuentas y usuarios"""
from fastapi import APIRouter, HTTPException, Depends
from fastapi.responses import JSONResponse
from infrastructure.api.moderations.schemas import (
DeleteReportRequest, CloseAccountRequest, BanUserRequest,
WarnUserRequest, ReviewContentRequest, ModerationActionResponse
)
from infrastructure.api.auth import get_current_admin_user
from application.services.moderation_services import (
DeleteReportUseCase, CloseAccountUseCase, BanUserUseCase,
WarnUserUseCase, ReviewContentUseCase
)
from infrastructure.adapters.persistence.mongodb import mongodb
from infrastructure.adapters.persistence.db import get_db
from infrastructure.adapters.moderation_repository_mongo import ModerationRepositoryMongo
from infrastructure.adapters.rabbitmq.sender import send_to_queue
from infrastructure.adapters.rabbitmq.messages import ModerationMessage, ModerationEventType
from domain.users import User
from sqlalchemy.orm import Session
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
router = APIRouter()
moderation_repo = ModerationRepositoryMongo()
def _publish_moderation_event(event_type: ModerationEventType, **kwargs):
try:
msg = ModerationMessage(event_type=event_type, fecha_creacion=datetime.utcnow().isoformat(), **kwargs)
payload = msg.to_dict()
send_to_queue('moderations_queue', payload)
send_to_queue('metrics_moderations_queue', payload)
except Exception as e:
logger.warning(f"Error publicando evento de moderación a RabbitMQ: {e}")
@router.post("/reports/delete", response_model=ModerationActionResponse)
async def delete_report(request: DeleteReportRequest, current_admin: User = Depends(get_current_admin_user)):
"""
Eliminar un reporte como moderador (requiere permisos de admin)
- **report_id**: ID del reporte a eliminar
- **reason**: Razón de la eliminación (mínimo 5 caracteres)
- **description**: Descripción adicional (opcional)
"""
try:
use_case = DeleteReportUseCase(moderation_repo)
result = use_case.execute(moderator_id=current_admin.user_id, report_id=request.report_id, reason=request.reason, description=request.description)
if result["status"] == "error":
raise HTTPException(status_code=400, detail=result["message"])
_publish_moderation_event(ModerationEventType.DELETE_REPORT, moderator_id=current_admin.user_id, report_id=request.report_id, reason=request.reason, description=request.description, action_id=result.get("action_id"))
return ModerationActionResponse(**result)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error in delete_report: {e}")
raise HTTPException(status_code=500, detail="Error interno del servidor")
@router.post("/accounts/close", response_model=ModerationActionResponse)
async def close_account(request: CloseAccountRequest, current_admin: User = Depends(get_current_admin_user)):
"""
Cerrar una cuenta de usuario (requiere permisos de admin)
- **user_id**: ID del usuario cuya cuenta cerrar
- **reason**: Razón del cierre (mínimo 5 caracteres)
- **description**: Descripción adicional (opcional)
- **is_permanent**: Si el cierre es permanente (por defecto True)
"""
try:
use_case = CloseAccountUseCase(moderation_repo)
result = use_case.execute(moderator_id=current_admin.user_id, user_id=request.user_id, reason=request.reason, description=request.description, is_permanent=request.is_permanent)
if result["status"] == "error":
raise HTTPException(status_code=400, detail=result["message"])
_publish_moderation_event(ModerationEventType.CLOSE_ACCOUNT, moderator_id=current_admin.user_id, user_id=request.user_id, reason=request.reason, description=request.description, is_permanent=request.is_permanent, action_id=result.get("action_id"))
return ModerationActionResponse(**result)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error in close_account: {e}")
raise HTTPException(status_code=500, detail="Error interno del servidor")
@router.post("/users/ban", response_model=ModerationActionResponse)
async def ban_user(request: BanUserRequest, current_admin: User = Depends(get_current_admin_user)):
"""
Banear a un usuario (requiere permisos de admin)
- **user_id**: ID del usuario a banear
- **reason**: Razón del ban (mínimo 5 caracteres)
- **duration_days**: Duración del ban en días (None para permanente)
- **description**: Descripción adicional (opcional)
"""
try:
use_case = BanUserUseCase(moderation_repo)
result = use_case.execute(moderator_id=current_admin.user_id, user_id=request.user_id, reason=request.reason, duration_days=request.duration_days, description=request.description)
if result["status"] == "error":
raise HTTPException(status_code=400, detail=result["message"])
_publish_moderation_event(ModerationEventType.BAN_USER, moderator_id=current_admin.user_id, user_id=request.user_id, reason=request.reason, duration_days=request.duration_days, description=request.description, is_permanent=request.duration_days is None, action_id=result.get("action_id"))
return ModerationActionResponse(**result)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error in ban_user: {e}")
raise HTTPException(status_code=500, detail="Error interno del servidor")
@router.post("/users/warn", response_model=ModerationActionResponse)
async def warn_user(request: WarnUserRequest, current_admin: User = Depends(get_current_admin_user)):
"""
Advertir a un usuario (requiere permisos de admin)
- **user_id**: ID del usuario a advertir
- **reason**: Razón de la advertencia (mínimo 5 caracteres)
- **description**: Descripción adicional (opcional)
"""
try:
use_case = WarnUserUseCase(moderation_repo)
result = use_case.execute(moderator_id=current_admin.user_id, user_id=request.user_id, reason=request.reason, description=request.description)
if result["status"] == "error":
raise HTTPException(status_code=400, detail=result["message"])
_publish_moderation_event(ModerationEventType.WARN_USER, moderator_id=current_admin.user_id, user_id=request.user_id, reason=request.reason, description=request.description, action_id=result.get("action_id"))
return ModerationActionResponse(**result)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error in warn_user: {e}")
raise HTTPException(status_code=500, detail="Error interno del servidor")
@router.post("/content/review", response_model=ModerationActionResponse)
async def review_content(request: ReviewContentRequest, current_admin: User = Depends(get_current_admin_user)):
"""
Revisar y actuar sobre contenido reportado (requiere permisos de admin)
- **report_id**: ID del reporte a revisar
- **action**: Acción a tomar (approve, reject, needs_more_info)
- **reason**: Razón de la decisión (opcional)
- **notes**: Notas adicionales (opcional)
"""
try:
use_case = ReviewContentUseCase(moderation_repo)
result = use_case.execute(moderator_id=current_admin.user_id, report_id=request.report_id, action=request.action, reason=request.reason, notes=request.notes)
if result["status"] == "error":
raise HTTPException(status_code=400, detail=result["message"])
_publish_moderation_event(ModerationEventType.REVIEW_CONTENT, moderator_id=current_admin.user_id, report_id=request.report_id, review_action=request.action, reason=request.reason, notes=request.notes, action_id=result.get("action_id"))
return ModerationActionResponse(**result)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error in review_content: {e}")
raise HTTPException(status_code=500, detail="Error interno del servidor")

View File

@@ -0,0 +1,32 @@
"""Rutas raíz para API de Moderación"""
from fastapi import APIRouter, Response
from fastapi.responses import JSONResponse
router = APIRouter()
@router.get("/")
async def root():
"""Root endpoint - información de la API"""
return JSONResponse({
"status": "ok",
"service": "VoxPopuli Moderation API",
"version": "1.0.0",
"description": "API para gestión de acciones de moderación",
"endpoints": {
"delete_report": "POST /moderation/reports/delete",
"close_account": "POST /moderation/accounts/close",
"ban_user": "POST /moderation/users/ban",
"warn_user": "POST /moderation/users/warn",
"review_content": "POST /moderation/content/review"
}
})
@router.get("/health")
async def health_check():
"""Health check endpoint"""
return JSONResponse({
"status": "healthy",
"service": "Moderation API"
})

View File

@@ -0,0 +1,18 @@
"""Router principal para API de Moderación"""
from fastapi import APIRouter
from infrastructure.api.moderations.moderations import router as moderations_router
from infrastructure.api.moderations.root import router as root_router
router = APIRouter()
router.include_router(
moderations_router,
prefix="/moderation",
tags=["moderation"]
)
router.include_router(
root_router,
prefix='',
tags=["root"]
)

View File

@@ -0,0 +1,60 @@
"""Pydantic schemas para validación de datos en API de Moderación"""
from pydantic import BaseModel, Field
from typing import Optional, Literal
from datetime import datetime
class DeleteReportRequest(BaseModel):
"""Esquema para solicitud de eliminación de reporte"""
moderator_id: int = Field(..., description="ID del moderador")
report_id: str = Field(..., description="ID del reporte a eliminar")
reason: str = Field(..., min_length=5, description="Razón de la eliminación")
description: Optional[str] = Field(None, description="Descripción adicional")
class CloseAccountRequest(BaseModel):
"""Esquema para solicitud de cierre de cuenta"""
moderator_id: int = Field(..., description="ID del moderador")
user_id: int = Field(..., description="ID del usuario")
reason: str = Field(..., min_length=5, description="Razón del cierre")
description: Optional[str] = Field(None, description="Descripción adicional")
is_permanent: bool = Field(True, description="Si el cierre es permanente")
class BanUserRequest(BaseModel):
"""Esquema para solicitud de ban de usuario"""
moderator_id: int = Field(..., description="ID del moderador")
user_id: int = Field(..., description="ID del usuario a banear")
reason: str = Field(..., min_length=5, description="Razón del ban")
duration_days: Optional[int] = Field(None, description="Duración en días (None para permanente)")
description: Optional[str] = Field(None, description="Descripción adicional")
class WarnUserRequest(BaseModel):
"""Esquema para solicitud de advertencia"""
moderator_id: int = Field(..., description="ID del moderador")
user_id: int = Field(..., description="ID del usuario a advertir")
reason: str = Field(..., min_length=5, description="Razón de la advertencia")
description: Optional[str] = Field(None, description="Descripción adicional")
class ReviewContentRequest(BaseModel):
"""Esquema para solicitud de revisión de contenido"""
moderator_id: int = Field(..., description="ID del moderador")
report_id: str = Field(..., description="ID del reporte a revisar")
action: Literal["approve", "reject", "needs_more_info"] = Field(..., description="Acción de revisión")
reason: Optional[str] = Field(None, description="Razón de la decisión")
notes: Optional[str] = Field(None, description="Notas adicionales")
class ModerationActionResponse(BaseModel):
"""Esquema de respuesta para acciones de moderación"""
status: str = Field(..., description="Estado de la acción (success/error)")
message: str = Field(..., description="Mensaje descriptivo")
action_id: Optional[str] = Field(None, description="ID de la acción creada")
class ActionStatusRequest(BaseModel):
"""Esquema para actualizar estado de acción"""
status: Literal["pending", "approved", "rejected", "executed"] = Field(..., description="Nuevo estado")
notes: Optional[str] = Field(None, description="Notas sobre el cambio de estado")

View File

@@ -8,7 +8,10 @@ from infrastructure.adapters.persistence.report_repository_mongo import ReportRe
from infrastructure.adapters.persistence.user_repository_sql import UserRepositorySQL from infrastructure.adapters.persistence.user_repository_sql import UserRepositorySQL
from infrastructure.adapters.file_storage import image_storage from infrastructure.adapters.file_storage import image_storage
from infrastructure.adapters.rabbitmq.sender import send_to_queue from infrastructure.adapters.rabbitmq.sender import send_to_queue
from infrastructure.adapters.rabbitmq.messages import NotificationMessage, NotificationEventType from infrastructure.adapters.rabbitmq.messages import (
NotificationMessage, NotificationEventType,
ReportMessage, ReportEventType
)
import logging import logging
from typing import Optional from typing import Optional
from datetime import datetime from datetime import datetime
@@ -21,7 +24,6 @@ logger = logging.getLogger(__name__)
def _report_to_response(report) -> dict: def _report_to_response(report) -> dict:
"""Convierte un objeto Report a dict con image_url"""
return { return {
"id_reporte": report.id_reporte, "id_reporte": report.id_reporte,
"id_usuario": report.id_usuario, "id_usuario": report.id_usuario,
@@ -36,6 +38,17 @@ def _report_to_response(report) -> dict:
"fecha_creacion": report.fecha_creacion "fecha_creacion": report.fecha_creacion
} }
def _publish_report_event(event_type: ReportEventType, **kwargs):
try:
msg = ReportMessage(event_type=event_type, **kwargs)
payload = msg.to_dict()
send_to_queue('reports_queue', payload)
send_to_queue('metrics_reports_queue', payload)
except Exception as e:
logger.warning(f"Error publicando evento de reporte a RabbitMQ: {e}")
@router.post("/", status_code=status.HTTP_202_ACCEPTED) @router.post("/", status_code=status.HTTP_202_ACCEPTED)
async def create_report( async def create_report(
id_usuario: int = Form(...), id_usuario: int = Form(...),
@@ -49,7 +62,6 @@ async def create_report(
): ):
"""Crea un nuevo reporte - envía a cola de procesamiento con validaciones previas""" """Crea un nuevo reporte - envía a cola de procesamiento con validaciones previas"""
try: try:
# Procesar imagen si fue proporcionada
image_filename = None image_filename = None
if file: if file:
logger.info(f"Processing image file: {file.filename} ({file.content_type})") logger.info(f"Processing image file: {file.filename} ({file.content_type})")
@@ -57,46 +69,31 @@ async def create_report(
create_use_case = CreateReport(report_repo, user_repo) create_use_case = CreateReport(report_repo, user_repo)
result = create_use_case.execute( result = create_use_case.execute(
id_usuario=id_usuario, id_usuario=id_usuario, tipo_reporte=tipo_reporte, descripcion=descripcion,
tipo_reporte=tipo_reporte, ubicacion=ubicacion, lat=lat, lng=lng, image_filename=image_filename, estado=estado
descripcion=descripcion,
ubicacion=ubicacion,
lat=lat,
lng=lng,
image_filename=image_filename,
estado=estado
) )
if result["status"] == "error": if result["status"] == "error":
# Si hay error, eliminar imagen si fue guardada
if image_filename: if image_filename:
image_storage.delete_image(image_filename) image_storage.delete_image(image_filename)
message = result["message"] message = result["message"]
if "no existe" in message: if "no existe" in message:
# 404 Not Found: usuario no existe raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message)
raise HTTPException( raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
status_code=status.HTTP_404_NOT_FOUND,
detail=message
)
else:
# 400 Bad Request: error de validación
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=message
)
# 202 Accepted: enviado a la cola correctamente _publish_report_event(
ReportEventType.CREATE,
id_reporte=result.get("id_reporte"), id_usuario=id_usuario, tipo_reporte=tipo_reporte,
descripcion=descripcion, ubicacion=ubicacion, lat=lat, lng=lng, estado=estado,
fecha_creacion=datetime.utcnow().isoformat()
)
return result return result
except HTTPException: except HTTPException:
raise raise
except Exception as e: except Exception as e:
logger.error(f"Error inesperado en create_report: {e}", exc_info=True) logger.error(f"Error inesperado en create_report: {e}", exc_info=True)
raise HTTPException( raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
@router.get("/{report_id}", response_model=ReportResponse) @router.get("/{report_id}", response_model=ReportResponse)
async def get_report(report_id: str): async def get_report(report_id: str):
@@ -105,19 +102,13 @@ async def get_report(report_id: str):
get_use_case = GetReportById(report_repo) get_use_case = GetReportById(report_repo)
report = get_use_case.execute(report_id) report = get_use_case.execute(report_id)
if not report: if not report:
raise HTTPException( raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Reporte con ID {report_id} no encontrado")
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Reporte con ID {report_id} no encontrado"
)
return _report_to_response(report) return _report_to_response(report)
except HTTPException: except HTTPException:
raise raise
except Exception as e: except Exception as e:
logger.error(f"Error al obtener reporte {report_id}: {e}", exc_info=True) logger.error(f"Error al obtener reporte {report_id}: {e}", exc_info=True)
raise HTTPException( raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
@router.get("/user/{user_id}") @router.get("/user/{user_id}")
async def get_user_reports(user_id: int): async def get_user_reports(user_id: int):
@@ -128,10 +119,7 @@ async def get_user_reports(user_id: int):
return [_report_to_response(report) for report in reports] return [_report_to_response(report) for report in reports]
except Exception as e: except Exception as e:
logger.error(f"Error al obtener reportes del usuario {user_id}: {e}", exc_info=True) logger.error(f"Error al obtener reportes del usuario {user_id}: {e}", exc_info=True)
raise HTTPException( raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
@router.get("/") @router.get("/")
async def list_reports(): async def list_reports():
@@ -142,10 +130,7 @@ async def list_reports():
return [_report_to_response(report) for report in reports] return [_report_to_response(report) for report in reports]
except Exception as e: except Exception as e:
logger.error(f"Error al listar reportes: {e}", exc_info=True) logger.error(f"Error al listar reportes: {e}", exc_info=True)
raise HTTPException( raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
@router.get("/shadowbanned/list") @router.get("/shadowbanned/list")
async def get_shadowbanned_reports(threshold: float = 20): async def get_shadowbanned_reports(threshold: float = 20):
@@ -156,48 +141,26 @@ async def get_shadowbanned_reports(threshold: float = 20):
return [_report_to_response(report) for report in reports] return [_report_to_response(report) for report in reports]
except Exception as e: except Exception as e:
logger.error(f"Error al obtener reportes shadowbaneados: {e}", exc_info=True) logger.error(f"Error al obtener reportes shadowbaneados: {e}", exc_info=True)
raise HTTPException( raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
@router.put("/{report_id}/visibility", status_code=status.HTTP_202_ACCEPTED) @router.put("/{report_id}/visibility", status_code=status.HTTP_202_ACCEPTED)
async def update_report_visibility(report_id: str, visibility_data: ReportUpdateVisibilityRequest): async def update_report_visibility(report_id: str, visibility_data: ReportUpdateVisibilityRequest):
"""Actualiza la visibilidad de un reporte - envía a cola de procesamiento con validaciones previas""" """Actualiza la visibilidad de un reporte - envía a cola de procesamiento con validaciones previas"""
try: try:
update_use_case = UpdateReportVisibility(report_repo, user_repo) update_use_case = UpdateReportVisibility(report_repo, user_repo)
result = update_use_case.execute( result = update_use_case.execute(report_id=report_id, new_visibility=visibility_data.new_visibility, penalize_author=visibility_data.penalize_author)
report_id=report_id,
new_visibility=visibility_data.new_visibility,
penalize_author=visibility_data.penalize_author
)
if result["status"] == "error": if result["status"] == "error":
message = result["message"] message = result["message"]
if "no existe" in message: if "no existe" in message:
# 404 Not Found: reporte no existe raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message)
raise HTTPException( raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
status_code=status.HTTP_404_NOT_FOUND, _publish_report_event(ReportEventType.UPDATE_VISIBILITY, id_reporte=report_id, visibilidad=visibility_data.new_visibility, penalize_author=visibility_data.penalize_author)
detail=message
)
else:
# 400 Bad Request: error de validación
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=message
)
# 202 Accepted: enviado a la cola correctamente
return result return result
except HTTPException: except HTTPException:
raise raise
except Exception as e: except Exception as e:
logger.error(f"Error al actualizar visibilidad del reporte {report_id}: {e}", exc_info=True) logger.error(f"Error al actualizar visibilidad del reporte {report_id}: {e}", exc_info=True)
raise HTTPException( raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
@router.delete("/{report_id}", status_code=status.HTTP_202_ACCEPTED) @router.delete("/{report_id}", status_code=status.HTTP_202_ACCEPTED)
async def delete_report(report_id: str): async def delete_report(report_id: str):
@@ -205,69 +168,38 @@ async def delete_report(report_id: str):
try: try:
delete_use_case = DeleteReport(report_repo) delete_use_case = DeleteReport(report_repo)
result = delete_use_case.execute(report_id) result = delete_use_case.execute(report_id)
if result["status"] == "error": if result["status"] == "error":
message = result["message"] message = result["message"]
if "no existe" in message: if "no existe" in message:
# 404 Not Found: reporte no existe raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message)
raise HTTPException( raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
status_code=status.HTTP_404_NOT_FOUND, _publish_report_event(ReportEventType.DELETE, id_reporte=report_id)
detail=message
)
else:
# 400 Bad Request: error de validación
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=message
)
# 202 Accepted: enviado a la cola correctamente
return result return result
except HTTPException: except HTTPException:
raise raise
except Exception as e: except Exception as e:
logger.error(f"Error al eliminar reporte {report_id}: {e}", exc_info=True) logger.error(f"Error al eliminar reporte {report_id}: {e}", exc_info=True)
raise HTTPException( raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
@router.put("/{report_id}/status", status_code=status.HTTP_200_OK) @router.put("/{report_id}/status", status_code=status.HTTP_200_OK)
async def update_report_status(report_id: str, status_data: ReportUpdateStatusRequest): async def update_report_status(report_id: str, status_data: ReportUpdateStatusRequest):
"""Actualiza el estado de un reporte y envía notificación al usuario""" """Actualiza el estado de un reporte y envía notificación al usuario"""
try: try:
# Obtener el reporte actual para saber el usuario creador
report = report_repo.find_by_id(report_id) report = report_repo.find_by_id(report_id)
if not report: if not report:
raise HTTPException( raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Reporte con ID {report_id} no encontrado")
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Reporte con ID {report_id} no encontrado"
)
# Actualizar el estado
update_use_case = UpdateReportStatus(report_repo) update_use_case = UpdateReportStatus(report_repo)
result = update_use_case.execute( result = update_use_case.execute(report_id=report_id, new_estado=status_data.estado)
report_id=report_id,
new_estado=status_data.estado
)
if result["status"] == "error": if result["status"] == "error":
message = result["message"] message = result["message"]
if "no existe" in message: if "no existe" in message:
# 404 Not Found: reporte no existe raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message)
raise HTTPException( raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
status_code=status.HTTP_404_NOT_FOUND,
detail=message _publish_report_event(ReportEventType.UPDATE_STATUS, id_reporte=report_id, id_usuario=report.id_usuario, estado=status_data.estado)
)
else:
# 400 Bad Request: error de validación
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=message
)
# Enviar notificación al usuario creador del reporte
try: try:
notification_message = NotificationMessage( notification_message = NotificationMessage(
event_type=NotificationEventType.REPORT_STATUS_CHANGE, event_type=NotificationEventType.REPORT_STATUS_CHANGE,
@@ -280,25 +212,16 @@ async def update_report_status(report_id: str, status_data: ReportUpdateStatusRe
estado_reporte=status_data.estado, estado_reporte=status_data.estado,
fecha_creacion=datetime.utcnow().isoformat() fecha_creacion=datetime.utcnow().isoformat()
) )
send_to_queue('notifications_queue', notification_message.to_dict())
# Enviar a la cola de notificaciones send_to_queue('metrics_notifications_queue', notification_message.to_dict())
send_to_queue(
queue_name='notifications_queue',
message=notification_message.to_dict()
)
logger.info(f"Notification sent to user {report.id_usuario} for report {report_id}") logger.info(f"Notification sent to user {report.id_usuario} for report {report_id}")
except Exception as notification_error: except Exception as notification_error:
logger.warning(f"Error sending notification for report {report_id}: {notification_error}") logger.warning(f"Error sending notification for report {report_id}: {notification_error}")
# No fallar la actualización si hay error en notificación
# 200 OK: estado actualizado correctamente
return result return result
except HTTPException: except HTTPException:
raise raise
except Exception as e: except Exception as e:
logger.error(f"Error al actualizar estado del reporte {report_id}: {e}", exc_info=True) logger.error(f"Error al actualizar estado del reporte {report_id}: {e}", exc_info=True)
raise HTTPException( raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)

View File

@@ -33,6 +33,7 @@ class UserLoginResponse(BaseModel):
token_type: str = Field(default="bearer", description="Tipo de token") token_type: str = Field(default="bearer", description="Tipo de token")
user_id: int = Field(..., description="ID del usuario") user_id: int = Field(..., description="ID del usuario")
email: str = Field(..., description="Email del usuario") email: str = Field(..., description="Email del usuario")
is_admin: bool = Field(..., description="Indica si el usuario es administrador")
class UserResponse(BaseModel): class UserResponse(BaseModel):
"""Respuesta con datos de usuario""" """Respuesta con datos de usuario"""
@@ -46,6 +47,7 @@ class UserResponse(BaseModel):
numero_reportes: int numero_reportes: int
url_foto_perfil: Optional[str] url_foto_perfil: Optional[str]
biografia: Optional[str] biografia: Optional[str]
is_admin: bool
class Config: class Config:
from_attributes = True from_attributes = True

View File

@@ -1,4 +1,4 @@
from fastapi import APIRouter, HTTPException, status, Depends from fastapi import APIRouter, HTTPException, status
from infrastructure.api.users.schemas import ( from infrastructure.api.users.schemas import (
UserCreateRequest, UserUpdateRequest, UserResponse, UserCreateRequest, UserUpdateRequest, UserResponse,
UserLoginRequest, UserLoginResponse UserLoginRequest, UserLoginResponse
@@ -8,12 +8,25 @@ from application.services.user_services import (
) )
from infrastructure.adapters.persistence.user_repository_sql import UserRepositorySQL from infrastructure.adapters.persistence.user_repository_sql import UserRepositorySQL
from infrastructure.api.users.auth_service import auth_service from infrastructure.api.users.auth_service import auth_service
from infrastructure.adapters.rabbitmq.sender import send_to_queue
from infrastructure.adapters.rabbitmq.messages import UserMessage, UserEventType
import logging import logging
from datetime import datetime
router = APIRouter() router = APIRouter()
user_repo = UserRepositorySQL()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def _publish_user_event(event_type: UserEventType, **kwargs):
try:
msg = UserMessage(event_type=event_type, fecha_creacion=datetime.utcnow().isoformat(), **kwargs)
payload = msg.to_dict()
send_to_queue('users_queue', payload)
send_to_queue('metrics_users_queue', payload)
except Exception as e:
logger.warning(f"Error publicando evento de usuario a RabbitMQ: {e}")
@router.post("/login", response_model=UserLoginResponse, status_code=status.HTTP_200_OK) @router.post("/login", response_model=UserLoginResponse, status_code=status.HTTP_200_OK)
async def login_user(credentials: UserLoginRequest): async def login_user(credentials: UserLoginRequest):
""" """
@@ -30,48 +43,21 @@ async def login_user(credentials: UserLoginRequest):
- email: Email confirmado - email: Email confirmado
""" """
try: try:
# Obtener usuario por email user_repo = UserRepositorySQL() # sesión nueva por request
get_use_case = GetUserByEmail(user_repo) get_use_case = GetUserByEmail(user_repo)
user = get_use_case.execute(credentials.email) user = get_use_case.execute(credentials.email)
if not user: if not user:
raise HTTPException( raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Email o contraseña incorrectos", headers={"WWW-Authenticate": "Bearer"})
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Email o contraseña incorrectos",
headers={"WWW-Authenticate": "Bearer"}
)
# Verificar contraseña
# Necesitamos obtener el hash de contraseña del modelo
user_model = user_repo.find_by_email_with_password(credentials.email) user_model = user_repo.find_by_email_with_password(credentials.email)
if not user_model or not auth_service.verify_password(credentials.contraseña, user_model.contraseña_hash): if not user_model or not auth_service.verify_password(credentials.contraseña, user_model.contraseña_hash):
raise HTTPException( raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Email o contraseña incorrectos", headers={"WWW-Authenticate": "Bearer"})
status_code=status.HTTP_401_UNAUTHORIZED, access_token = auth_service.create_access_token(user_id=user.user_id, email=user.email)
detail="Email o contraseña incorrectos", return {"access_token": access_token, "token_type": "bearer", "user_id": user.user_id, "email": user.email, "is_admin": user.is_admin}
headers={"WWW-Authenticate": "Bearer"}
)
# Crear token JWT
access_token = auth_service.create_access_token(
user_id=user.user_id,
email=user.email
)
return {
"access_token": access_token,
"token_type": "bearer",
"user_id": user.user_id,
"email": user.email
}
except HTTPException: except HTTPException:
raise raise
except Exception as e: except Exception as e:
logger.error(f"Error en login_user: {e}", exc_info=True) logger.error(f"Error en login_user: {e}", exc_info=True)
raise HTTPException( raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
@router.post("/", status_code=status.HTTP_202_ACCEPTED) @router.post("/", status_code=status.HTTP_202_ACCEPTED)
async def create_user(user_data: UserCreateRequest): async def create_user(user_data: UserCreateRequest):
@@ -88,170 +74,105 @@ async def create_user(user_data: UserCreateRequest):
- biografia: Biografía del usuario (opcional) - biografia: Biografía del usuario (opcional)
""" """
try: try:
user_repo = UserRepositorySQL() # sesión nueva por request
create_use_case = CreateUser(user_repo) create_use_case = CreateUser(user_repo)
result = create_use_case.execute( result = create_use_case.execute(
nombre=user_data.nombre, nombre=user_data.nombre, apellido=user_data.apellido, email=user_data.email,
apellido=user_data.apellido, contraseña=user_data.contraseña, fecha_nacimiento=user_data.fecha_nacimiento,
email=user_data.email, url_foto_perfil=user_data.url_foto_perfil, biografia=user_data.biografia
contraseña=user_data.contraseña,
fecha_nacimiento=user_data.fecha_nacimiento,
url_foto_perfil=user_data.url_foto_perfil,
biografia=user_data.biografia
) )
if result["status"] == "error": if result["status"] == "error":
# Detectar tipo de error para código HTTP apropiado
message = result["message"] message = result["message"]
if "ya está registrado" in message: if "ya está registrado" in message:
# 409 Conflict: email duplicado raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=message)
raise HTTPException( raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
status_code=status.HTTP_409_CONFLICT, _publish_user_event(UserEventType.CREATE, user_id=result.get("user_id"), email=user_data.email, nombre=user_data.nombre, apellido=user_data.apellido)
detail=message
)
else:
# 400 Bad Request: error de validación
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=message
)
# 202 Accepted: enviado a la cola correctamente
return result return result
except HTTPException: except HTTPException:
raise raise
except Exception as e: except Exception as e:
logger.error(f"Error inesperado en create_user: {e}", exc_info=True) logger.error(f"Error inesperado en create_user: {e}", exc_info=True)
raise HTTPException( raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
@router.get("/{user_id}", response_model=UserResponse) @router.get("/{user_id}", response_model=UserResponse)
async def get_user(user_id: int): async def get_user(user_id: int):
"""Obtiene un usuario por ID""" """Obtiene un usuario por ID"""
try: try:
user_repo = UserRepositorySQL() # sesión nueva por request
get_use_case = GetUserById(user_repo) get_use_case = GetUserById(user_repo)
user = get_use_case.execute(user_id) user = get_use_case.execute(user_id)
if not user: if not user:
raise HTTPException( raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Usuario con ID {user_id} no encontrado")
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Usuario con ID {user_id} no encontrado"
)
return user return user
except HTTPException: except HTTPException:
raise raise
except Exception as e: except Exception as e:
logger.error(f"Error al obtener usuario {user_id}: {e}", exc_info=True) logger.error(f"Error al obtener usuario {user_id}: {e}", exc_info=True)
raise HTTPException( raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
@router.get("/email/{email}", response_model=UserResponse) @router.get("/email/{email}", response_model=UserResponse)
async def get_user_by_email(email: str): async def get_user_by_email(email: str):
"""Obtiene un usuario por email""" """Obtiene un usuario por email"""
try: try:
user_repo = UserRepositorySQL() # sesión nueva por request
get_use_case = GetUserByEmail(user_repo) get_use_case = GetUserByEmail(user_repo)
user = get_use_case.execute(email) user = get_use_case.execute(email)
if not user: if not user:
raise HTTPException( raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Usuario con email {email} no encontrado")
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Usuario con email {email} no encontrado"
)
return user return user
except HTTPException: except HTTPException:
raise raise
except Exception as e: except Exception as e:
logger.error(f"Error al obtener usuario por email {email}: {e}", exc_info=True) logger.error(f"Error al obtener usuario por email {email}: {e}", exc_info=True)
raise HTTPException( raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
@router.get("/") @router.get("/")
async def list_users(): async def list_users():
"""Obtiene todos los usuarios - retorna lista vacía si no hay registros""" """Obtiene todos los usuarios - retorna lista vacía si no hay registros"""
try: try:
user_repo = UserRepositorySQL() # sesión nueva por request
list_use_case = ListAllUsers(user_repo) list_use_case = ListAllUsers(user_repo)
return list_use_case.execute() return list_use_case.execute()
except Exception as e: except Exception as e:
logger.error(f"Error al listar usuarios: {e}", exc_info=True) logger.error(f"Error al listar usuarios: {e}", exc_info=True)
raise HTTPException( raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
@router.put("/{user_id}", status_code=status.HTTP_202_ACCEPTED) @router.put("/{user_id}", status_code=status.HTTP_202_ACCEPTED)
async def update_user(user_id: int, user_data: UserUpdateRequest): async def update_user(user_id: int, user_data: UserUpdateRequest):
"""Actualiza un usuario - envía a cola de procesamiento con validaciones previas""" """Actualiza un usuario - envía a cola de procesamiento con validaciones previas"""
try: try:
user_repo = UserRepositorySQL() # sesión nueva por request
update_use_case = UpdateUser(user_repo) update_use_case = UpdateUser(user_repo)
result = update_use_case.execute( result = update_use_case.execute(user_id=user_id, nombre=user_data.nombre, apellido=user_data.apellido, url_foto_perfil=user_data.url_foto_perfil, biografia=user_data.biografia)
user_id=user_id,
nombre=user_data.nombre,
apellido=user_data.apellido,
url_foto_perfil=user_data.url_foto_perfil,
biografia=user_data.biografia
)
if result["status"] == "error": if result["status"] == "error":
message = result["message"] message = result["message"]
if "no existe" in message: if "no existe" in message:
# 404 Not Found: usuario no existe raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message)
raise HTTPException( raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
status_code=status.HTTP_404_NOT_FOUND, _publish_user_event(UserEventType.UPDATE, user_id=user_id, nombre=user_data.nombre, apellido=user_data.apellido)
detail=message
)
else:
# 400 Bad Request: error de validación
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=message
)
# 202 Accepted: enviado a la cola correctamente
return result return result
except HTTPException: except HTTPException:
raise raise
except Exception as e: except Exception as e:
logger.error(f"Error al actualizar usuario {user_id}: {e}", exc_info=True) logger.error(f"Error al actualizar usuario {user_id}: {e}", exc_info=True)
raise HTTPException( raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
@router.delete("/{user_id}", status_code=status.HTTP_202_ACCEPTED) @router.delete("/{user_id}", status_code=status.HTTP_202_ACCEPTED)
async def delete_user(user_id: int): async def delete_user(user_id: int):
"""Elimina un usuario - envía a cola de procesamiento con validaciones previas""" """Elimina un usuario - envía a cola de procesamiento con validaciones previas"""
try: try:
user_repo = UserRepositorySQL() # sesión nueva por request
delete_use_case = DeleteUser(user_repo) delete_use_case = DeleteUser(user_repo)
result = delete_use_case.execute(user_id) result = delete_use_case.execute(user_id)
if result["status"] == "error": if result["status"] == "error":
message = result["message"] message = result["message"]
if "no existe" in message: if "no existe" in message:
# 404 Not Found: usuario no existe raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message)
raise HTTPException( raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
status_code=status.HTTP_404_NOT_FOUND, _publish_user_event(UserEventType.DELETE, user_id=user_id)
detail=message
)
else:
# 400 Bad Request: error de validación
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=message
)
# 202 Accepted: enviado a la cola correctamente
return result return result
except HTTPException: except HTTPException:
raise raise
except Exception as e: except Exception as e:
logger.error(f"Error al eliminar usuario {user_id}: {e}", exc_info=True) logger.error(f"Error al eliminar usuario {user_id}: {e}", exc_info=True)
raise HTTPException( raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)

View File

@@ -1,13 +1,17 @@
""" """
Punto de entrada principal para VoxPopuli Microservices Punto de entrada principal para VoxPopuli Microservices
Ejecuta dos APIs en paralelo: Usuarios (puerto 8000) y Reportes (puerto 8001), Notificaciones (puerto 8002) Ejecuta cinco APIs en paralelo: Usuarios (8000), Reportes (8001), Notificaciones (8002), Moderación (8003), Métricas (8004)
""" """
from infrastructure.api.users.app import create_app as create_users_app from infrastructure.api.users.app import create_app as create_users_app
from infrastructure.api.reports.app import create_app as create_reports_app from infrastructure.api.reports.app import create_app as create_reports_app
from infrastructure.api.notifications.app import create_app as create_notifications_app from infrastructure.api.notifications.app import create_app as create_notifications_app
from infrastructure.api.moderations.app import create_app as create_moderations_app
from infrastructure.api.metrics.app import create_app as create_metrics_app
from consumers.report_consumer import ReportConsumer from consumers.report_consumer import ReportConsumer
from consumers.user_consumer import UserConsumer from consumers.user_consumer import UserConsumer
from consumers.notification_consumer import NotificationConsumer from consumers.notification_consumer import NotificationConsumer
from consumers.moderation_consumer import ModerationConsumer
from consumers.metrics_consumer import MetricsConsumer
from core.config import ConfSettings from core.config import ConfSettings
import threading import threading
import uvicorn import uvicorn
@@ -45,6 +49,28 @@ def run_notifications_api():
log_level=ConfSettings.log_level, log_level=ConfSettings.log_level,
) )
def run_moderations_api():
"""Ejecuta la API de Moderación en puerto 8003"""
app_moderations = create_moderations_app()
uvicorn.run(
app_moderations,
host=ConfSettings.host,
port=8003,
reload=False,
log_level=ConfSettings.log_level,
)
def run_metrics_api():
"""Ejecuta la API de Métricas en puerto 8004"""
app_metrics = create_metrics_app()
uvicorn.run(
app_metrics,
host=ConfSettings.host,
port=8004,
reload=False,
log_level=ConfSettings.log_level,
)
def run_user_consumer(): def run_user_consumer():
consumer = UserConsumer() consumer = UserConsumer()
consumer.start() consumer.start()
@@ -57,6 +83,14 @@ def run_notifications_consumer():
consumer = NotificationConsumer() consumer = NotificationConsumer()
consumer.start() consumer.start()
def run_moderations_consumer():
consumer = ModerationConsumer()
consumer.start()
def run_metrics_consumer():
consumer = MetricsConsumer()
consumer.start()
def run(): def run():
"""Inicia todas las APIs en threads separados""" """Inicia todas las APIs en threads separados"""
@@ -67,31 +101,44 @@ def run():
users_thread = threading.Thread(target=run_users_api, daemon=True, name="Users-API") users_thread = threading.Thread(target=run_users_api, daemon=True, name="Users-API")
reports_thread = threading.Thread(target=run_reports_api, daemon=True, name="Reports-API") reports_thread = threading.Thread(target=run_reports_api, daemon=True, name="Reports-API")
notifications_thread = threading.Thread(target=run_notifications_api, daemon=True, name="Notifications-API") notifications_thread = threading.Thread(target=run_notifications_api, daemon=True, name="Notifications-API")
moderations_thread = threading.Thread(target=run_moderations_api, daemon=True, name="Moderations-API")
metrics_thread = threading.Thread(target=run_metrics_api, daemon=True, name="Metrics-API")
user_consumer_thread = threading.Thread(target=run_user_consumer, daemon=True, name="Users-Consumer") user_consumer_thread = threading.Thread(target=run_user_consumer, daemon=True, name="Users-Consumer")
report_consumer_thread = threading.Thread(target=run_reports_consumer, daemon=True, name="Reports-Consumer") report_consumer_thread = threading.Thread(target=run_reports_consumer, daemon=True, name="Reports-Consumer")
notifications_consumer_thread = threading.Thread(target=run_notifications_consumer, daemon=True, name="Notifications-Consumer") notifications_consumer_thread = threading.Thread(target=run_notifications_consumer, daemon=True, name="Notifications-Consumer")
moderations_consumer_thread = threading.Thread(target=run_moderations_consumer, daemon=True, name="Moderations-Consumer")
metrics_consumer_thread = threading.Thread(target=run_metrics_consumer, daemon=True, name="Metrics-Consumer")
users_thread.start() users_thread.start()
reports_thread.start() reports_thread.start()
notifications_thread.start() notifications_thread.start()
moderations_thread.start()
metrics_thread.start()
user_consumer_thread.start() user_consumer_thread.start()
report_consumer_thread.start() report_consumer_thread.start()
notifications_consumer_thread.start() notifications_consumer_thread.start()
moderations_consumer_thread.start()
metrics_consumer_thread.start()
print("\n✓ API de Usuarios ejecutándose en http://0.0.0.0:8000") print("\n✓ API de Usuarios ejecutándose en http://0.0.0.0:8000")
print("✓ API de Reportes ejecutándose en http://0.0.0.0:8001") print("✓ API de Reportes ejecutándose en http://0.0.0.0:8001")
print("✓ API de Notificaciones ejecutándose en http://0.0.0.0:8002") print("✓ API de Notificaciones ejecutándose en http://0.0.0.0:8002")
print("✓ API de Moderación ejecutándose en http://0.0.0.0:8003")
print("✓ API de Métricas ejecutándose en http://0.0.0.0:8004")
print("\nDocumentación disponible en:") print("\nDocumentación disponible en:")
print(" - Usuarios: http://localhost:8000/docs") print(" - Usuarios: http://localhost:8000/docs")
print(" - Reportes: http://localhost:8001/docs") print(" - Reportes: http://localhost:8001/docs")
print(" - Notificaciones: http://localhost:8002/docs") print(" - Notificaciones: http://localhost:8002/docs")
print(" - Moderación: http://localhost:8003/docs")
print(" - Métricas: http://localhost:8004/docs")
print("\n" + "=" * 60 + "\n") print("\n" + "=" * 60 + "\n")
try: try:
users_thread.join() users_thread.join()
reports_thread.join() reports_thread.join()
notifications_thread.join() notifications_thread.join()
moderations_thread.join()
metrics_thread.join()
except KeyboardInterrupt: except KeyboardInterrupt:
print("\n\nRecibiendo señal de salida...") print("\n\nRecibiendo señal de salida...")
print("Cerrando APIs...") print("Cerrando APIs...")