Compare commits

..

10 Commits

Author SHA1 Message Date
31c567653a docker stuff almost complete 2026-05-06 14:23:28 -06:00
fea63bb553 fixes with SQL Alchemy to avoid precaching 2026-05-06 12:05:12 -06:00
57fa3b7944 Fixes with queuing 2026-05-06 11:13:23 -06:00
7331dcb086 Fixed metrics api to recieve all metrics 2026-05-06 01:30:20 -06:00
543ba8870c Added the isAdmin parameter to schemas and users in that api. 2026-05-05 09:57:52 -06:00
b4fc640c1a stuffies 2026-05-04 22:07:30 -06:00
a8ee92afc8 Added isAdmin tag for users 2026-05-04 21:03:50 -06:00
fc7c821d0f more minor changes 2026-05-04 17:54:02 -06:00
b96c44381a minor changes
Co-authored-by: Copilot <copilot@github.com>
2026-05-04 17:48:05 -06:00
b3788beedd Added two new apis
Co-authored-by: Copilot <copilot@github.com>
2026-05-04 17:37:08 -06:00
47 changed files with 3551 additions and 456 deletions

4
.gitignore vendored
View File

@@ -67,3 +67,7 @@ dmypy.json
node_modules/
*.pem
credentials.json
migrations/
*.tar.gz

53
ADMIN_SETUP.md Normal file
View File

@@ -0,0 +1,53 @@
# Admin User Setup - SQL Commands
## 1. Agregar columna a BD (Auto-ejecutado)
La migración SQL se encuentra en: `migrations/001_add_is_admin_to_usuarios.sql`
Ejecutar manualmente si es necesario:
```sql
ALTER TABLE `voxpopuli_users`.`usuarios`
ADD COLUMN `is_admin` BOOLEAN NOT NULL DEFAULT FALSE AFTER `biografia`,
ADD INDEX `idx_is_admin` (`is_admin`);
```
## 2. Promover usuario a admin
```sql
UPDATE `voxpopuli_users`.`usuarios`
SET `is_admin` = TRUE
WHERE `user_id` = 1;
```
## 3. Listar usuarios admin
```sql
SELECT `user_id`, `nombre`, `email`, `is_admin`
FROM `voxpopuli_users`.`usuarios`
WHERE `is_admin` = TRUE;
```
## 4. Revocar permisos admin
```sql
UPDATE `voxpopuli_users`.`usuarios`
SET `is_admin` = FALSE
WHERE `user_id` = 1;
```
## API Usage
### Endpoints de Moderación (requieren token JWT de admin)
```bash
# Usar token JWT en header Authorization
curl -X POST "http://localhost:8003/moderation/reports/delete" \
-H "Authorization: Bearer YOUR_JWT_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"report_id": 123,
"reason": "Contenido ofensivo",
"description": "Violación de políticas"
}'
```
### Obtener usuario y verificar is_admin
```bash
GET http://localhost:8000/users/{user_id}
# Response incluye "is_admin": true/false
```

86
METRICS_API_EXAMPLES.md Normal file
View File

@@ -0,0 +1,86 @@
# Metrics API Examples
## Record Metric (Manual)
```bash
curl -X POST "http://localhost:8004/metrics/record" \
-H "Content-Type: application/json" \
-d '{
"event_type": "user_created",
"entity_id": "123",
"entity_type": "user",
"user_id": 456,
"metadata": {"email": "user@example.com"}
}'
```
## Get Analytics Report (Last N Days)
```bash
curl "http://localhost:8004/metrics/report?days=7"
```
Response:
```json
{
"start_date": "2026-04-27T00:00:00",
"end_date": "2026-05-04T23:59:59",
"total_events": 245,
"events_by_type": {
"user_created": 12,
"report_created": 45,
"notification_sent": 188
}
}
```
## Get Daily Statistics
```bash
curl "http://localhost:8004/metrics/daily-stats?date=2026-05-04T00:00:00"
```
Response:
```json
[
{
"date": "2026-05-04T00:00:00",
"event_type": "user_created",
"count": 3
},
{
"date": "2026-05-04T00:00:00",
"event_type": "report_created",
"count": 8
}
]
```
## Get Event Summary (Date Range)
```bash
curl "http://localhost:8004/metrics/summary?start_date=2026-04-27T00:00:00&end_date=2026-05-04T23:59:59"
```
Response:
```json
{
"summary": {
"user_created": 12,
"report_created": 45,
"notification_sent": 188,
"moderation_completed": 5
},
"date_range": {
"start_date": "2026-04-27T00:00:00",
"end_date": "2026-05-04T23:59:59"
}
}
```
## Health Check
```bash
curl "http://localhost:8004/metrics/health"
```
## Automatic Events (RabbitMQ)
- `user_created`, `user_updated`, `user_deleted`
- `report_created`, `report_resolved`
- `notification_sent`
- `moderation_completed`

View File

@@ -0,0 +1,58 @@
# API de Métricas y Analítica - Resumen de Implementación
## ✅ Integración Completada
### Estructura Creada
**Dominio:**
- `src/domain/metrics.py` - Modelos: `Metric`, `DailyStats`, `AnalyticsReport`, `EventType`
**Aplicación:**
- `src/application/ports/metrics_repository.py` - Interfaz del repositorio
- `src/application/services/metrics_services.py` - Lógica de negocio
**Infraestructura:**
- `src/infrastructure/adapters/persistence/metrics_repository_postgres.py` - Implementación PostgreSQL
- `src/infrastructure/api/metrics/` - API REST FastAPI
- `app.py` - Factory de la aplicación
- `router.py` - Endpoints
- `schemas.py` - Modelos Pydantic
**Consumidor:**
- `src/consumers/metrics_consumer.py` - Listener RabbitMQ para todos los eventos
### Base de Datos
**PostgreSQL añadido al docker-compose:**
- Host: localhost:5432
- DB: voxpopuli_metrics
- Usuario: voxpopuli / voxpopuli_pass
- Tabla `metrics` creada automáticamente
### Integración sin Cambios Significativos
**main.py** - Añadida API y consumer de métricas sin modificar APIs existentes
**docker-compose.yaml** - PostgreSQL añadido sin cambiar servicios existentes
**requirements.txt** - Añadido psycopg2-binary
✅ RabbitMQ NO añadido al docker-compose (usa instancias previas)
### Endpoints API (Puerto 8004)
```
POST /metrics/record - Registrar métrica
GET /metrics/report - Reporte de últimos N días
GET /metrics/daily-stats - Estadísticas diarias
GET /metrics/summary - Resumen de eventos por rango
GET /metrics/health - Health check
GET /docs - Swagger UI
```
### Eventos Automáticos Capturados
El consumer escucha automáticamente:
- `users_queue` → user_created, user_updated, user_deleted
- `reports_queue` → report_created, report_resolved
- `notifications_queue` → notification_sent
- `moderations_queue` → moderation_completed
Cada evento genera una métrica en PostgreSQL con timestamp, usuario_id, metadata y tipo de evento.

60
METRICS_SETUP.md Normal file
View File

@@ -0,0 +1,60 @@
# Metrics API Setup
## Configuración Base de Datos
PostgreSQL se ejecuta automáticamente en el contenedor Docker.
**Conexión:**
- Host: localhost
- Puerto: 5432
- Base de datos: voxpopuli_metrics
- Usuario: voxpopuli
- Contraseña: voxpopuli_pass
**Tabla creada automáticamente:**
```
metrics:
- id (PK)
- event_type (STRING, INDEXED)
- entity_id (STRING)
- entity_type (STRING)
- timestamp (DATETIME, INDEXED)
- metadata (JSON)
- user_id (INT, NULLABLE)
```
## Arquitectura
```
API Metrics (Puerto 8004)
├── FastAPI Router
│ ├── POST /metrics/record - Registrar métrica
│ ├── GET /metrics/report - Reporte analítica
│ ├── GET /metrics/daily-stats - Estadísticas diarias
│ ├── GET /metrics/summary - Resumen eventos
│ └── GET /metrics/health - Health check
├── Metrics Service (Lógica)
│ └── Metrics Repository (Postgres)
└── Metrics Consumer (RabbitMQ)
├── Escucha: users_queue
├── Escucha: reports_queue
├── Escucha: notifications_queue
└── Escucha: moderations_queue
```
## Inicio
```bash
# Levantar DB (docker)
docker-compose up -d
# Instalar dependencias
pip install -r requirements.txt
# Ejecutar todas las APIs
python -m src.main
```
La API de métricas se ejecutará automáticamente en puerto 8004.

310
MODERATION_API.md Normal file
View File

@@ -0,0 +1,310 @@
# API de Moderación - VoxPopuli
## Descripción General
La API de Moderación es un microservicio complementario integrado al proyecto VoxPopuli que proporciona funcionalidades de moderación de contenido y gestión de usuarios. Utiliza arquitectura hexagonal y se comunica mediante **RabbitMQ** para mantener bajo acoplamiento con el resto del sistema.
## Características Principales
### 1. **Eliminación de Reportes**
- Permite a moderadores eliminar reportes del sistema
- Evento: `moderation.delete_report`
- Endpoint: `POST /moderation/reports/delete`
### 2. **Cierre de Cuentas**
- Cierre permanente o temporal de cuentas de usuario
- Evento: `moderation.close_account`
- Endpoint: `POST /moderation/accounts/close`
### 3. **Baneo de Usuarios**
- Ban permanente o temporal (en días)
- Evento: `moderation.ban_user`
- Endpoint: `POST /moderation/users/ban`
### 4. **Advertencias**
- Registra advertencias contra usuarios
- Evento: `moderation.warn_user`
- Endpoint: `POST /moderation/users/warn`
### 5. **Revisión de Contenido**
- Aprueba, rechaza o solicita más información sobre reportes
- Evento: `moderation.review_content`
- Endpoint: `POST /moderation/content/review`
## Arquitectura
### Componentes
```
┌─────────────────────────────────────────────────────────┐
│ API REST de Moderación (FastAPI) │
│ Puerto 8003 - Endpoints de moderación │
└──────────────┬──────────────────────────────────────────┘
│ Envía eventos
┌─────────────────────┐
│ RabbitMQ Queue │
│ moderation_queue │
└──────────┬──────────┘
│ Consume
┌─────────────────────────┐
│ Moderation Consumer │
│ (Thread - background) │
└──────────┬──────────────┘
│ Ejecuta acciones
┌──────────────────────────┐
│ MongoDB │
│ - moderation_actions │
│ - moderation_logs │
└──────────────────────────┘
```
### Capas de la Arquitectura
```
domain/
└── moderations.py # Entidades de negocio: ModerationAction, ModerationLog
application/
├── ports/
│ └── moderation_repository.py # Interfaz de persistencia
└── services/
└── moderation_services.py # Use cases: DeleteReport, CloseAccount, etc.
infrastructure/
├── adapters/
│ ├── moderation_repository_mongo.py # Implementación MongoDB
│ └── rabbitmq/
│ └── messages.py # Esquemas de eventos RabbitMQ
├── api/
│ └── moderations/
│ ├── app.py # Aplicación FastAPI
│ ├── router.py # Rutas principales
│ ├── moderations.py # Endpoints
│ ├── schemas.py # Validación Pydantic
│ └── root.py # Rutas raíz
└── consumers/
└── moderation_consumer.py # Consumidor de eventos
main.py # Punto de entrada (integración)
```
## Integración con RabbitMQ
### Flujo de Eventos
1. **Cliente envía solicitud HTTP** a API de Moderación
2. **Servicio valida** la solicitud
3. **Servicio envía evento** a cola `moderation_queue` en RabbitMQ
4. **Consumer escucha** la cola en background
5. **Consumer procesa** el evento y ejecuta acciones
6. **Sistema persiste** en MongoDB
### Implementación de ModerationConsumer
El `ModerationConsumer` está integrado en `main.py` ejecutándose como un thread daemon:
```python
# En main.py
moderations_consumer_thread = threading.Thread(
target=run_moderations_consumer,
daemon=True,
name="Moderations-Consumer"
)
```
### Mensajes de Evento
Todos los eventos se envían a través de `RabbitMQSender`:
```json
{
"action_id": "uuid-aqui",
"event_type": "moderation.delete_report",
"moderator_id": 123,
"report_id": "report-uuid",
"reason": "Contenido violento",
"description": "Publicación violenta contra usuarios",
"timestamp": "2026-05-04T10:30:00"
}
```
## Endpoints API
### 1. Eliminación de Reporte
```http
POST /moderation/reports/delete HTTP/1.1
Content-Type: application/json
```
**Respuesta:**
```json
{
"status": "success",
"message": "Reporte marcado para eliminación",
"action_id": "action-uuid"
}
```
### 2. Cierre de Cuenta
```http
POST /moderation/accounts/close HTTP/1.1
Content-Type: application/json
```
### 3. Baneo de Usuario
```http
POST /moderation/users/ban HTTP/1.1
Content-Type: application/json
```
### 4. Advertencia
```http
POST /moderation/users/warn HTTP/1.1
Content-Type: application/json
```
### 5. Revisión de Contenido
```http
POST /moderation/content/review HTTP/1.1
Content-Type: application/json
```
## Ejecución
### Iniciar todo el sistema
```bash
python src/main.py
```
Esto iniciará:
- ✓ API de Usuarios (puerto 8000)
- ✓ API de Reportes (puerto 8001)
- ✓ API de Notificaciones (puerto 8002)
-**API de Moderación (puerto 8003)**
- Consumer de Usuarios
- Consumer de Reportes
- Consumer de Notificaciones
- **Consumer de Moderación**
### Acceder a documentación
- Moderación: http://localhost:8003/docs
## Persistencia
La API de Moderación almacena datos en **MongoDB** con dos colecciones:
### `moderation_actions`
Almacena todas las acciones de moderación:
```javascript
{
"action_id": "uuid",
"moderator_id": 123,
"action_type": "delete_report",
"target_type": "report",
"target_id": "report-uuid",
"reason": "Contenido inapropiado",
"status": "pending",
"fecha_creacion": "2026-05-04T10:30:00",
"fecha_ejecucion": null,
...
}
```
### `moderation_logs`
Registro auditable de todas las operaciones:
```javascript
{
"log_id": "uuid",
"action_id": "action-uuid",
"moderator_id": 123,
"resultado": "success",
"fecha_log": "2026-05-04T10:30:00",
"ip_moderator": "192.168.1.100"
}
```
## Integración Sin Cambios Significativos
Esta API fue diseñada para integrarse sin modificar el código existente:
**No modifica** `docker-compose.yaml` (usa instancias RabbitMQ existentes)
**No modifica** APIs existentes (Usuarios, Reportes, Notificaciones)
**No modifica** consumidores existentes
**Comunicación desacoplada** via RabbitMQ
**Base de datos independiente** (MongoDB)
**Puerto separado** (8003)
## Extensibilidad
Para agregar nuevas acciones de moderación:
1. **Crear Use Case** en `application/services/moderation_services.py`
2. **Crear evento** en `infrastructure/adapters/rabbitmq/messages.py`
3. **Agregar endpoint** en `infrastructure/api/moderations/moderations.py`
4. **Manejar evento** en `consumers/moderation_consumer.py`
## Requisitos
- Python 3.8+
- FastAPI
- Pydantic
- pika (RabbitMQ client)
- pymongo (MongoDB driver)
- MongoDB instancia
- RabbitMQ instancia
## Seguridad
### Recomendaciones Implementadas
- ✓ Validación de entrada con Pydantic
- ✓ Logs auditables de todas las acciones
- ✓ Soporte para IP del moderador
- ✓ Eventos inmutables en RabbitMQ
- ✓ Estados de acción para auditoría
### Recomendaciones Futuras
- Implementar autenticación JWT para moderadores
- Rate limiting por moderador
- Notificaciones a usuarios afectados
- Dashboard de auditoría
## Extensibilidad
Para agregar nuevas acciones de moderación:
1. **Crear Use Case** en `application/services/moderation_services.py`
2. **Crear evento** en `infrastructure/adapters/rabbitmq/messages.py`
3. **Agregar endpoint** en `infrastructure/api/moderations/moderations.py`
4. **Manejar evento** en `consumers/moderation_consumer.py`
## Requisitos
- Python 3.8+
- FastAPI
- Pydantic
- pika (RabbitMQ client)
- pymongo (MongoDB driver)
- MongoDB instancia
- RabbitMQ instancia
## Seguridad
### Recomendaciones Implementadas
- ✓ Validación de entrada con Pydantic
- ✓ Logs auditables de todas las acciones
- ✓ Soporte para IP del moderador
- ✓ Eventos inmutables en RabbitMQ
- ✓ Estados de acción para auditoría
### Recomendaciones Futuras
- Implementar autenticación JWT para moderadores
- Rate limiting por moderador
- Notificaciones a usuarios afectados
- Dashboard de auditoría

View File

@@ -0,0 +1,334 @@
{
"moderation_examples": {
"delete_report": {
"description": "Eliminar un reporte (requiere token JWT de admin)",
"endpoint": "POST /moderation/reports/delete",
"headers": {
"Content-Type": "application/json",
"Authorization": "Bearer YOUR_JWT_TOKEN"
},
"request_body": {
"report_id": "550e8400-e29b-41d4-a716-446655440000",
"reason": "Contenido violento",
"description": "La imagen contiene violencia explícita"
},
"response_success": {
"status": 200,
"body": {
"status": "success",
"message": "Reporte eliminado",
"action_id": "action-uuid-123"
}
},
"response_error_401": {
"status": 401,
"body": {"detail": "Token inválido o expirado"}
},
"response_error_403": {
"status": 403,
"body": {"detail": "Permisos insuficientes"}
}
},
"close_account": {
"description": "Cerrar cuenta de usuario (requiere token JWT de admin)",
"endpoint": "POST /moderation/accounts/close",
"headers": {
"Content-Type": "application/json",
"Authorization": "Bearer YOUR_JWT_TOKEN"
},
"request_body": {
"user_id": 42,
"reason": "Violación grave de términos",
"description": "Comportamiento acosador",
"is_permanent": true
},
"response_success": {
"status": 200,
"body": {
"status": "success",
"message": "Cuenta cerrada",
"action_id": "action-uuid-456"
}
}
},
"ban_user": {
"description": "Banear usuario (requiere token JWT de admin)",
"endpoint": "POST /moderation/users/ban",
"headers": {
"Content-Type": "application/json",
"Authorization": "Bearer YOUR_JWT_TOKEN"
},
"request_body": {
"user_id": 42,
"reason": "Spam sistemático",
"duration_days": 30,
"description": "Múltiples reportes de spam"
},
"response_success": {
"status": 200,
"body": {
"status": "success",
"message": "Usuario baneado por 30 días",
"action_id": "action-uuid-789"
}
}
},
"warn_user": {
"description": "Advertir usuario (requiere token JWT de admin)",
"endpoint": "POST /moderation/users/warn",
"headers": {
"Content-Type": "application/json",
"Authorization": "Bearer YOUR_JWT_TOKEN"
},
"request_body": {
"user_id": 42,
"reason": "Lenguaje inapropiado",
"description": "Primera advertencia"
},
"response_success": {
"status": 200,
"body": {
"status": "success",
"message": "Usuario advertido",
"action_id": "action-uuid-101"
}
}
},
"review_content": {
"description": "Revisar contenido reportado (requiere token JWT de admin)",
"endpoint": "POST /moderation/content/review",
"headers": {
"Content-Type": "application/json",
"Authorization": "Bearer YOUR_JWT_TOKEN"
},
"request_body": {
"report_id": "550e8400-e29b-41d4-a716-446655440000",
"action": "approve",
"reason": "Contenido válido según políticas",
"notes": "Aprobado después de revisión"
},
"response_success": {
"status": 200,
"body": {
"status": "success",
"message": "Decisión registrada",
"action_id": "action-uuid-202"
}
}
}
}
}
}
},
"ban_user": {
"description": "Banear un usuario temporalmente",
"endpoint": "POST /moderation/users/ban",
"headers": {
"Content-Type": "application/json"
},
"request_body_temporal": {
"moderator_id": 1,
"user_id": 42,
"reason": "Lenguaje inapropiado repetido",
"duration_days": 7,
"description": "Ban de 7 días por contenido ofensivo"
},
"request_body_permanent": {
"moderator_id": 1,
"user_id": 99,
"reason": "Servidumbre reiterada de términos",
"duration_days": null,
"description": "Ban permanente"
},
"response_success": {
"status": 200,
"body": {
"status": "success",
"message": "Usuario marcado para ban",
"action_id": "action-uuid-789"
}
}
},
"warn_user": {
"description": "Emitir una advertencia a un usuario",
"endpoint": "POST /moderation/users/warn",
"headers": {
"Content-Type": "application/json"
},
"request_body": {
"moderator_id": 1,
"user_id": 42,
"reason": "Primera violación menor de comunidad",
"description": "Publicación con spam de enlaces"
},
"response_success": {
"status": 200,
"body": {
"status": "success",
"message": "Advertencia registrada",
"action_id": "action-uuid-012"
}
}
},
"review_content": {
"description": "Revisar y tomar decisión sobre un reporte",
"endpoint": "POST /moderation/content/review",
"headers": {
"Content-Type": "application/json"
},
"request_body_approve": {
"moderator_id": 1,
"report_id": "550e8400-e29b-41d4-a716-446655440000",
"action": "approve",
"reason": "Contenido reportado es legítimamente violatorio",
"notes": "Proceder inmediatamente con eliminación"
},
"request_body_reject": {
"moderator_id": 1,
"report_id": "550e8400-e29b-41d4-a716-446655440001",
"action": "reject",
"reason": "Contenido está dentro de políticas",
"notes": "El usuario tiene derecho a expresar esta opinión"
},
"request_body_needs_info": {
"moderator_id": 1,
"report_id": "550e8400-e29b-41d4-a716-446655440002",
"action": "needs_more_info",
"reason": "Necesaria información contextual adicional",
"notes": "Contactar con el reportante para detalles"
},
"response_success": {
"status": 200,
"body": {
"status": "success",
"message": "Revisión registrada: approve",
"action_id": "action-uuid-345"
}
}
},
"health_check": {
"description": "Verificar estado de la API",
"endpoint": "GET /health",
"response": {
"status": 200,
"body": {
"status": "healthy",
"service": "Moderation API"
}
}
},
"root_info": {
"description": "Obtener información sobre la API",
"endpoint": "GET /",
"response": {
"status": 200,
"body": {
"status": "ok",
"service": "VoxPopuli Moderation API",
"version": "1.0.0",
"description": "API para gestión de acciones de moderación",
"endpoints": {
"delete_report": "POST /moderation/reports/delete",
"close_account": "POST /moderation/accounts/close",
"ban_user": "POST /moderation/users/ban",
"warn_user": "POST /moderation/users/warn",
"review_content": "POST /moderation/content/review"
}
}
}
}
},
"curl_examples": {
"delete_report": "curl -X POST http://localhost:8003/moderation/reports/delete -H 'Content-Type: application/json' -d '{\"moderator_id\": 1, \"report_id\": \"550e8400-e29b-41d4-a716-446655440000\", \"reason\": \"Contenido violento\", \"description\": \"Imagen con violencia explícita\"}'",
"ban_user": "curl -X POST http://localhost:8003/moderation/users/ban -H 'Content-Type: application/json' -d '{\"moderator_id\": 1, \"user_id\": 42, \"reason\": \"Lenguaje inapropiado\", \"duration_days\": 7}'",
"warn_user": "curl -X POST http://localhost:8003/moderation/users/warn -H 'Content-Type: application/json' -d '{\"moderator_id\": 1, \"user_id\": 42, \"reason\": \"Primera advertencia\", \"description\": \"Spam de enlaces\"}'",
"close_account": "curl -X POST http://localhost:8003/moderation/accounts/close -H 'Content-Type: application/json' -d '{\"moderator_id\": 1, \"user_id\": 42, \"reason\": \"Violación de términos\", \"is_permanent\": true}'",
"review_approve": "curl -X POST http://localhost:8003/moderation/content/review -H 'Content-Type: application/json' -d '{\"moderator_id\": 1, \"report_id\": \"550e8400-e29b-41d4-a716-446655440000\", \"action\": \"approve\", \"reason\": \"Contenido legítimamente violatorio\"}'"
},
"rabbitmq_events": {
"moderation_delete_report": {
"queue": "moderation_queue",
"event_type": "moderation.delete_report",
"payload": {
"action_id": "uuid",
"event_type": "moderation.delete_report",
"moderator_id": 1,
"report_id": "550e8400-e29b-41d4-a716-446655440000",
"reason": "Contenido violento",
"description": "Imagen con violencia explícita",
"timestamp": "2026-05-04T10:30:00.000Z"
}
},
"moderation_close_account": {
"queue": "moderation_queue",
"event_type": "moderation.close_account",
"payload": {
"action_id": "uuid",
"event_type": "moderation.close_account",
"moderator_id": 1,
"user_id": 42,
"reason": "Violación de términos",
"description": "Comportamiento acosador",
"is_permanent": true,
"timestamp": "2026-05-04T10:30:00.000Z"
}
},
"moderation_ban_user": {
"queue": "moderation_queue",
"event_type": "moderation.ban_user",
"payload": {
"action_id": "uuid",
"event_type": "moderation.ban_user",
"moderator_id": 1,
"user_id": 42,
"reason": "Lenguaje inapropiado",
"duration_days": 7,
"is_permanent": false,
"description": "Ban temporal por contenido ofensivo",
"timestamp": "2026-05-04T10:30:00.000Z"
}
},
"moderation_warn_user": {
"queue": "moderation_queue",
"event_type": "moderation.warn_user",
"payload": {
"action_id": "uuid",
"event_type": "moderation.warn_user",
"moderator_id": 1,
"user_id": 42,
"reason": "Primera advertencia",
"description": "Spam de enlaces",
"timestamp": "2026-05-04T10:30:00.000Z"
}
},
"moderation_review_content": {
"queue": "moderation_queue",
"event_type": "moderation.review_content",
"payload": {
"action_id": "uuid",
"event_type": "moderation.review_content",
"moderator_id": 1,
"report_id": "550e8400-e29b-41d4-a716-446655440000",
"review_action": "approve",
"reason": "Contenido legítimamente violatorio",
"notes": "Proceder inmediatamente",
"timestamp": "2026-05-04T10:30:00.000Z"
}
}
}
}

250
MODERATION_SETUP.md Normal file
View File

@@ -0,0 +1,250 @@
# Guía de Instalación - API de Moderación
## Verificar Requisitos Previos
La API de Moderación requiere que ya tengas instalados y funcionando:
### 1. RabbitMQ
Debe estar ejecutándose con una instancia ya configurada:
```bash
# Verificar conexión
telnet localhost 5672
```
### 2. MongoDB
Debe estar ejecutándose y accesible:
```bash
# Verificar conexión
mongosh --eval "db.version()"
```
### 3. Python 3.8+
```bash
python --version
```
### 4. Dependencias del Proyecto
Las dependencias ya están en `requirements.txt`:
```bash
pip install -r requirements.txt
```
## Instalación de la API de Moderación
### Paso 1: Actualizar dependencias (si es necesario)
La API utiliza las siguientes librerías (ya incluidas en requirements.txt):
- `fastapi` - Framework web
- `uvicorn` - Servidor ASGI
- `pydantic` - Validación de datos
- `pika` - Cliente de RabbitMQ
- `pymongo` - Driver de MongoDB
Si necesitas instalarlas manualmente:
```bash
pip install fastapi uvicorn pydantic pika pymongo
```
### Paso 2: Verificar estructura de directorios
Asegúrate que existan los siguientes directorios:
```
src/
├── domain/
│ └── moderations.py ✓ Creado
├── application/
│ ├── ports/
│ │ └── moderation_repository.py ✓ Creado
│ └── services/
│ └── moderation_services.py ✓ Creado
├── consumers/
│ └── moderation_consumer.py ✓ Creado
└── infrastructure/
├── adapters/
│ └── moderation_repository_mongo.py ✓ Creado
└── api/
└── moderations/ ✓ Creado
├── __init__.py
├── app.py
├── router.py
├── moderations.py
├── schemas.py
└── root.py
```
### Paso 3: Actualizar archivo principal
El archivo `src/main.py` ya fue actualizado para incluir:
- ✓ Importación de la API de Moderación
- ✓ Función `run_moderations_api()`
- ✓ Función `run_moderations_consumer()`
- ✓ Threads para ejecutar ambos componentes
### Paso 4: Configurar MongoDB (si es necesario)
Si MongoDB no está configurado, crea las colecciones:
```javascript
// En mongosh o cliente MongoDB
use voxpopuli
// Las colecciones se crearán automáticamente al insertar datos
// Pero puedes pre-crearlas si lo prefieres:
db.createCollection("moderation_actions")
db.createCollection("moderation_logs")
// Crear índices para mejor rendimiento
db.moderation_actions.createIndex({ "action_id": 1 })
db.moderation_actions.createIndex({ "moderator_id": 1 })
db.moderation_actions.createIndex({ "target_id": 1 })
db.moderation_logs.createIndex({ "action_id": 1 })
```
### Paso 5: Configurar RabbitMQ (si es necesario)
La cola se crea automáticamente, pero si quieres pre-crearla:
```bash
# Acceder a RabbitMQ Management Console
# http://localhost:15672 (usuario: guest, password: guest)
# O via CLI:
rabbitmqctl declare_queue moderation_queue -d true
```
## Ejecutar la API
### Opción 1: Ejecutar todo (Recomendado)
```bash
python src/main.py
```
Esto iniciará automáticamente:
- API de Usuarios (8000)
- API de Reportes (8001)
- API de Notificaciones (8002)
- **API de Moderación (8003)** ← NUEVA
- Todos los consumidores (incluyendo Moderación)
### Opción 2: Ejecutar solo la API de Moderación
```bash
cd src
python -c "from infrastructure.api.moderations.app import create_app; import uvicorn; app = create_app(); uvicorn.run(app, host='0.0.0.0', port=8003)"
```
## Verificar que funciona
### 1. Health Check
```bash
curl http://localhost:8003/health
```
Respuesta esperada:
```json
{
"status": "healthy",
"service": "Moderation API"
}
```
### 2. Información de la API
```bash
curl http://localhost:8003/
```
### 3. Documentación Interactiva
Abre en tu navegador:
- http://localhost:8003/docs (Swagger UI)
- http://localhost:8003/redoc (ReDoc)
### 4. Probar un endpoint
```bash
curl -X POST http://localhost:8003/moderation/reports/delete \
-H "Content-Type: application/json" \
-d '{
"moderator_id": 1,
"report_id": "test-report-123",
"reason": "Contenido inapropiado",
"description": "Test description"
}'
```
## Solucionar Problemas
### Error: "Connection refused" a RabbitMQ
```
Error: [Errno 111] Connection refused
```
**Solución:**
- Verifica que RabbitMQ esté ejecutándose
- Verifica el host y puerto en `core/config.py`
### Error: "ConnectionFailure" a MongoDB
```
Error: ServerSelectionTimeoutError
```
**Solución:**
- Verifica que MongoDB esté ejecutándose
- Verifica la configuración de conexión en `infrastructure/adapters/persistence/mongodb.py`
### Error: "Port 8003 already in use"
```
Error: [Errno 48] Address already in use
```
**Solución:**
Cambia el puerto en `main.py`:
```python
def run_moderations_api():
uvicorn.run(
app_moderations,
host=ConfSettings.host,
port=8004, # Cambiar puerto aquí
...
)
```
### Los eventos no se procesan
**Causas posibles:**
1. Consumer no está ejecutándose
- Verifica que `run_moderations_consumer()` esté ejecutándose
- Revisa los logs del consumer
2. Cola no existe
- Se crea automáticamente al enviar el primer evento
- Verifica en RabbitMQ Management Console
3. MongoDB no está guardando datos
- Verifica que `moderation_actions` colección exista
- Revisa permisos de MongoDB
## Logs
Los logs se guardarán en:
```
src/logs/
```
Para ver logs en tiempo real:
```bash
tail -f src/logs/moderation_consumer.log
```
## Próximos Pasos
1. **Integrar autenticación JWT** para moderadores
2. **Crear dashboard** para visualizar acciones
3. **Agregar notificaciones** a usuarios afectados
4. **Implementar auditoría** más detallada
5. **Agregar rate limiting** por moderador
## Contacto y Soporte
Para problemas con la instalación:
1. Revisa logs en `src/logs/`
2. Verifica configuración en `src/core/config.py`
3. Consulta MODERATION_API.md para arquitectura
4. Revisa MODERATION_API_EXAMPLES.json para ejemplos

203
docker-compose-py.yaml Normal file
View File

@@ -0,0 +1,203 @@
# ============================================================
# VoxPopuli - docker-compose.yaml
# Orquesta la app + toda la infraestructura de datos/mensajería
# - MySQL → API Usuarios (8000)
# - MongoDB → API Reportes (8001)
# - MongoDB → API Notificaciones(8002)
# - MongoDB → API Moderación (8003)
# - PostgreSQL → API Métricas (8004) ← faltaba esto
# - RabbitMQ → cola de mensajes entre microservicios
# ============================================================
# Uso rápido:
# cp .env.example .env # ajusta credenciales
# docker compose up --build
# ============================================================
name: voxpopuli
services:
# ──────────────────────────────────────────
# INFRAESTRUCTURA
# ──────────────────────────────────────────
mysql:
image: mysql:8.0
container_name: voxpopuli_mysql
restart: unless-stopped
environment:
MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD:-rootpassword}
MYSQL_DATABASE: voxpopuli_users
MYSQL_USER: ${MYSQL_USER:-voxpopuli}
MYSQL_PASSWORD: ${MYSQL_PASSWORD:-voxpopuli_pass}
ports:
- "${MYSQL_PORT:-3306}:3306"
volumes:
- mysql_data:/var/lib/mysql
healthcheck:
test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "-u", "root", "-p${MYSQL_ROOT_PASSWORD:-rootpassword}"]
interval: 10s
timeout: 5s
retries: 5
start_period: 30s
networks:
- voxpopuli_net
mongodb-reports:
image: mongo:7.0
container_name: voxpopuli_mongo_reports
restart: unless-stopped
environment:
MONGO_INITDB_ROOT_USERNAME: ${MONGO_USER:-admin}
MONGO_INITDB_ROOT_PASSWORD: ${MONGO_PASSWORD:-admin_password}
MONGO_INITDB_DATABASE: voxpopuli_reports
command: mongod --auth
ports:
- "${MONGO_REPORTS_PORT:-27017}:27017"
volumes:
- mongo_reports_data:/data/db
healthcheck:
test: ["CMD", "mongosh", "--quiet", "--eval", "db.adminCommand('ping').ok", "--username", "${MONGO_USER:-admin}", "--password", "${MONGO_PASSWORD:-admin_password}", "--authenticationDatabase", "admin"]
interval: 10s
timeout: 5s
retries: 5
start_period: 20s
networks:
- voxpopuli_net
mongodb-notifications:
image: mongo:7.0
container_name: voxpopuli_mongo_notifications
restart: unless-stopped
environment:
MONGO_INITDB_ROOT_USERNAME: ${MONGO_USER:-admin}
MONGO_INITDB_ROOT_PASSWORD: ${MONGO_PASSWORD:-admin_password}
MONGO_INITDB_DATABASE: voxpopuli_notifications
command: mongod --auth
ports:
- "${MONGO_NOTIFICATIONS_PORT:-27018}:27017"
volumes:
- mongo_notifications_data:/data/db
healthcheck:
test: ["CMD", "mongosh", "--quiet", "--eval", "db.adminCommand('ping').ok", "--username", "${MONGO_USER:-admin}", "--password", "${MONGO_PASSWORD:-admin_password}", "--authenticationDatabase", "admin"]
interval: 10s
timeout: 5s
retries: 5
start_period: 20s
networks:
- voxpopuli_net
# PostgreSQL para la API de Métricas
postgres:
image: postgres:16-alpine
container_name: voxpopuli_postgres
restart: unless-stopped
environment:
POSTGRES_USER: ${POSTGRES_USER:-voxpopuli}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-voxpopuli_pass}
POSTGRES_DB: voxpopuli_metrics
ports:
- "${POSTGRES_PORT:-5432}:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-voxpopuli} -d voxpopuli_metrics"]
interval: 10s
timeout: 5s
retries: 5
start_period: 20s
networks:
- voxpopuli_net
rabbitmq:
image: rabbitmq:3.13-management
container_name: voxpopuli_rabbitmq
restart: unless-stopped
environment:
RABBITMQ_DEFAULT_USER: ${RABBITMQ_USER:-voxpopuli}
RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASS:-voxpopuli_pass}
ports:
- "${RABBITMQ_AMQP_PORT:-5672}:5672"
- "${RABBITMQ_MGMT_PORT:-15672}:15672"
volumes:
- rabbitmq_data:/var/lib/rabbitmq
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "-q", "ping"]
interval: 10s
timeout: 5s
retries: 5
start_period: 30s
networks:
- voxpopuli_net
# ──────────────────────────────────────────
# APLICACIÓN
# ──────────────────────────────────────────
app:
build:
context: .
dockerfile: Dockerfile
image: voxpopuli-app:latest
container_name: voxpopuli_app
restart: unless-stopped
env_file:
- .env
environment:
# MySQL - Usuarios
MYSQL_URL: mysql+pymysql://${MYSQL_USER:-voxpopuli}:${MYSQL_PASSWORD:-voxpopuli_pass}@mysql/voxpopuli_users
# MongoDB - Reportes
MONGODB_URL: mongodb://${MONGO_USER:-admin}:${MONGO_PASSWORD:-admin_password}@mongodb-reports:27017
MONGODB_DB: voxpopuli_reports
# MongoDB - Notificaciones
MONGODB_NOTIFICATIONS_URL: mongodb://${MONGO_USER:-admin}:${MONGO_PASSWORD:-admin_password}@mongodb-notifications:27017
MONGODB_NOTIFICATIONS_DB: voxpopuli_notifications
# PostgreSQL - Métricas
POSTGRES_URL: postgresql://${POSTGRES_USER:-voxpopuli}:${POSTGRES_PASSWORD:-voxpopuli_pass}@postgres:5432/voxpopuli_metrics
# RabbitMQ
RABBITMQ_URL: amqp://${RABBITMQ_USER:-voxpopuli}:${RABBITMQ_PASS:-voxpopuli_pass}@rabbitmq:5672/
# App
HOST: 0.0.0.0
LOG_LEVEL: ${LOG_LEVEL:-info}
ports:
- "8000:8000" # API Usuarios
- "8001:8001" # API Reportes
- "8002:8002" # API Notificaciones
- "8003:8003" # API Moderación
- "8004:8004" # API Métricas
depends_on:
mysql:
condition: service_healthy
mongodb-reports:
condition: service_healthy
mongodb-notifications:
condition: service_healthy
postgres:
condition: service_healthy
rabbitmq:
condition: service_healthy
networks:
- voxpopuli_net
healthcheck:
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/')"]
interval: 30s
timeout: 10s
retries: 3
start_period: 20s
# ──────────────────────────────────────────
# VOLÚMENES
# ──────────────────────────────────────────
volumes:
mysql_data:
mongo_reports_data:
mongo_notifications_data:
postgres_data:
rabbitmq_data:
# ──────────────────────────────────────────
# RED INTERNA
# ──────────────────────────────────────────
networks:
voxpopuli_net:
driver: bridge

View File

@@ -49,7 +49,25 @@ services:
timeout: 5s
retries: 5
postgres:
image: postgres:15-alpine
container_name: voxpopuli_postgres
environment:
POSTGRES_USER: voxpopuli
POSTGRES_PASSWORD: voxpopuli_pass
POSTGRES_DB: voxpopuli_metrics
ports:
- "5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U voxpopuli -d voxpopuli_metrics"]
interval: 10s
timeout: 5s
retries: 5
volumes:
mysql_data:
mongo_data:
mongo_data_notifications:
postgres_data:

30
dockerdotenv.env Normal file
View File

@@ -0,0 +1,30 @@
# ============================================================
# VoxPopuli - Variables de entorno para Docker
# Copia este archivo como .env y ajusta los valores
# ============================================================
# ── MySQL (API Usuarios) ─────────────────────────────────────
MYSQL_ROOT_PASSWORD=rootpassword
MYSQL_USER=voxpopuli
MYSQL_PASSWORD=voxpopuli_pass
MYSQL_PORT=3306
# ── MongoDB (Reportes + Notificaciones) ──────────────────────
MONGO_USER=admin
MONGO_PASSWORD=admin_password
MONGO_REPORTS_PORT=27017
MONGO_NOTIFICATIONS_PORT=27018
# ── PostgreSQL (API Métricas) ────────────────────────────────
POSTGRES_USER=voxpopuli
POSTGRES_PASSWORD=voxpopuli_pass
POSTGRES_PORT=5432
# ── RabbitMQ ─────────────────────────────────────────────────
RABBITMQ_USER=voxpopuli
RABBITMQ_PASS=voxpopuli_pass
RABBITMQ_AMQP_PORT=5672
RABBITMQ_MGMT_PORT=15672
# ── App ──────────────────────────────────────────────────────
LOG_LEVEL=info

63
dockerfile Normal file
View File

@@ -0,0 +1,63 @@
# ============================================================
# VoxPopuli - Dockerfile
# Microservicios FastAPI: Usuarios (8000), Reportes (8001),
# Notificaciones (8002), Moderación (8003), Métricas (8004)
# ============================================================
# --- Stage 1: Builder ---
FROM python:3.11-slim AS builder
WORKDIR /build
# Instalar dependencias del sistema necesarias para compilación
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc \
libffi-dev \
libssl-dev \
default-libmysqlclient-dev \
pkg-config \
&& rm -rf /var/lib/apt/lists/*
# Copiar e instalar dependencias Python en un directorio aislado
COPY requirements.txt .
RUN pip install --upgrade pip && \
pip install --prefix=/install --no-cache-dir -r requirements.txt
# --- Stage 2: Runtime ---
FROM python:3.11-slim
LABEL maintainer="Hokzaap S. de R.L. de C.V."
LABEL description="VoxPopuli - Infraestructura de Voz Ciudadana"
WORKDIR /app
# Instalar únicamente librerías de runtime
RUN apt-get update && apt-get install -y --no-install-recommends \
default-libmysqlclient-dev \
&& rm -rf /var/lib/apt/lists/*
RUN mkdir -p /app/logs/
# Copiar paquetes instalados desde el builder
COPY --from=builder /install /usr/local
# Copiar código fuente
COPY src/ ./src/
# Variables de entorno con valores por defecto (sobreescribibles via .env o compose)
ENV HOST=0.0.0.0 \
LOG_LEVEL=info \
PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
PYTHONPATH=/app/src
# Puertos expuestos por los cinco microservicios
EXPOSE 8000 8001 8002 8003 8004
# Healthcheck básico contra la API de Usuarios
HEALTHCHECK --interval=30s --timeout=10s --start-period=20s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/')" || exit 1
# Punto de entrada
CMD ["python", "src/main.py"]

View File

@@ -11,3 +11,5 @@ Pillow
PyJWT
passlib[argon2]
python-multipart
psycopg2-binary
cryptography

View File

@@ -0,0 +1,33 @@
from abc import ABC, abstractmethod
from datetime import datetime
from typing import List, Dict
from domain.metrics import Metric, DailyStats, AnalyticsReport
class MetricsRepository(ABC):
"""Puerto para persistencia de métricas"""
@abstractmethod
def save_metric(self, metric: Metric) -> Metric:
"""Guarda una métrica"""
pass
@abstractmethod
def get_metrics_by_date_range(self, start_date: datetime, end_date: datetime) -> List[Metric]:
"""Obtiene métricas en un rango de fechas"""
pass
@abstractmethod
def get_daily_stats(self, date: datetime) -> List[DailyStats]:
"""Obtiene estadísticas diarias"""
pass
@abstractmethod
def get_event_count_by_type(self, start_date: datetime, end_date: datetime) -> Dict[str, int]:
"""Obtiene conteo de eventos por tipo"""
pass
@abstractmethod
def generate_report(self, start_date: datetime, end_date: datetime) -> AnalyticsReport:
"""Genera reporte de analítica"""
pass

View File

@@ -0,0 +1,43 @@
"""Puerto de abstracción para repositorio de Moderaciones"""
from abc import ABC, abstractmethod
from domain.moderations import ModerationAction, ModerationLog
from typing import List, Optional
class ModerationRepository(ABC):
"""Interfaz (puerto) para operaciones de persistencia de moderaciones"""
@abstractmethod
def save_moderation_action(self, action: ModerationAction) -> bool:
"""Guarda una acción de moderación"""
pass
@abstractmethod
def get_moderation_action(self, action_id: str) -> Optional[ModerationAction]:
"""Obtiene una acción de moderación por ID"""
pass
@abstractmethod
def get_actions_by_moderator(self, moderator_id: int) -> List[ModerationAction]:
"""Obtiene todas las acciones de un moderador"""
pass
@abstractmethod
def get_actions_by_target(self, target_id: str, target_type: str) -> List[ModerationAction]:
"""Obtiene todas las acciones sobre un target específico"""
pass
@abstractmethod
def update_action_status(self, action_id: str, status: str) -> bool:
"""Actualiza el estado de una acción"""
pass
@abstractmethod
def save_moderation_log(self, log: ModerationLog) -> bool:
"""Guarda un log de moderación"""
pass
@abstractmethod
def get_moderation_logs(self, action_id: str) -> List[ModerationLog]:
"""Obtiene todos los logs de una acción"""
pass

View File

@@ -0,0 +1,39 @@
from datetime import datetime, timedelta
from typing import List, Dict
from domain.metrics import Metric, DailyStats, AnalyticsReport, EventType
from application.ports.metrics_repository import MetricsRepository
class MetricsService:
"""Servicio de lógica de negocio para métricas"""
def __init__(self, repository: MetricsRepository):
self.repository = repository
def record_event(self, event_type: str, entity_id: str, entity_type: str,
user_id: int = None, metadata: dict = None) -> Metric:
"""Registra un evento de métrica"""
metric = Metric(
metric_id=None,
event_type=event_type,
entity_id=entity_id,
entity_type=entity_type,
timestamp=datetime.now(),
user_id=user_id,
metadata=metadata or {}
)
return self.repository.save_metric(metric)
def get_metrics_report(self, days_back: int = 7) -> AnalyticsReport:
"""Obtiene reporte de últimos N días"""
end_date = datetime.now()
start_date = end_date - timedelta(days=days_back)
return self.repository.generate_report(start_date, end_date)
def get_daily_statistics(self, date: datetime) -> List[DailyStats]:
"""Obtiene estadísticas de un día específico"""
return self.repository.get_daily_stats(date)
def get_event_summary(self, start_date: datetime, end_date: datetime) -> Dict[str, int]:
"""Obtiene resumen de eventos en rango"""
return self.repository.get_event_count_by_type(start_date, end_date)

View File

@@ -0,0 +1,347 @@
"""Servicios de aplicación para Moderación"""
from domain.moderations import ModerationAction, ModerationLog
from application.ports.moderation_repository import ModerationRepository
from infrastructure.adapters.rabbitmq.sender import send_to_queue
from infrastructure.adapters.rabbitmq.messages import ModerationMessage, ModerationEventType
from datetime import datetime
from typing import Dict, Any, Optional
import uuid
import logging
logger = logging.getLogger(__name__)
class DeleteReportUseCase:
"""Use case para eliminar un reporte como moderador"""
def __init__(self, repo: ModerationRepository):
if not isinstance(repo, ModerationRepository):
raise TypeError("repo must implement ModerationRepository")
self.repo = repo
def execute(self, moderator_id: int, report_id: str, reason: str,
description: Optional[str] = None) -> Dict[str, Any]:
"""
Envía un evento a RabbitMQ para eliminar un reporte
Args:
moderator_id: ID del moderador
report_id: ID del reporte a eliminar
reason: Razón de la eliminación
description: Descripción adicional
Returns:
Dictionary con status y detalles
"""
try:
action_id = str(uuid.uuid4())
# Crear acción de moderación
action = ModerationAction(
action_id=action_id,
moderator_id=moderator_id,
action_type="delete_report",
target_type="report",
target_id=report_id,
reason=reason,
description=description,
status="pending",
fecha_creacion=datetime.now()
)
# Guardar acción
self.repo.save_moderation_action(action)
# Enviar evento a RabbitMQ
message = {
"action_id": action_id,
"event_type": "moderation.delete_report",
"moderator_id": moderator_id,
"report_id": report_id,
"reason": reason,
"description": description or "",
"timestamp": datetime.now().isoformat()
}
send_to_queue('moderation_queue', message)
return {
"status": "success",
"message": "Reporte marcado para eliminación",
"action_id": action_id
}
except Exception as e:
logger.error(f"Error deleting report: {e}")
return {
"status": "error",
"message": f"Error al eliminar reporte: {str(e)}"
}
class CloseAccountUseCase:
"""Use case para cerrar una cuenta de usuario"""
def __init__(self, repo: ModerationRepository):
if not isinstance(repo, ModerationRepository):
raise TypeError("repo must implement ModerationRepository")
self.repo = repo
def execute(self, moderator_id: int, user_id: int, reason: str,
description: Optional[str] = None, is_permanent: bool = True) -> Dict[str, Any]:
"""
Envía un evento para cerrar una cuenta de usuario
Args:
moderator_id: ID del moderador
user_id: ID del usuario cuya cuenta cerrar
reason: Razón del cierre
description: Descripción adicional
is_permanent: Si el cierre es permanente o temporal
Returns:
Dictionary con status y detalles
"""
try:
action_id = str(uuid.uuid4())
action = ModerationAction(
action_id=action_id,
moderator_id=moderator_id,
action_type="close_account",
target_type="user",
target_id=str(user_id),
reason=reason,
description=description,
is_permanent=is_permanent,
status="pending",
fecha_creacion=datetime.now()
)
self.repo.save_moderation_action(action)
message = {
"action_id": action_id,
"event_type": "moderation.close_account",
"moderator_id": moderator_id,
"user_id": user_id,
"reason": reason,
"description": description or "",
"is_permanent": is_permanent,
"timestamp": datetime.now().isoformat()
}
send_to_queue('moderation_queue', message)
return {
"status": "success",
"message": "Cuenta marcada para cierre",
"action_id": action_id
}
except Exception as e:
logger.error(f"Error closing account: {e}")
return {
"status": "error",
"message": f"Error al cerrar cuenta: {str(e)}"
}
class BanUserUseCase:
"""Use case para banear un usuario"""
def __init__(self, repo: ModerationRepository):
if not isinstance(repo, ModerationRepository):
raise TypeError("repo must implement ModerationRepository")
self.repo = repo
def execute(self, moderator_id: int, user_id: int, reason: str,
duration_days: Optional[int] = None, description: Optional[str] = None) -> Dict[str, Any]:
"""
Envía un evento para banear un usuario
Args:
moderator_id: ID del moderador
user_id: ID del usuario a banear
reason: Razón del ban
duration_days: Duración en días (None para ban permanente)
description: Descripción adicional
Returns:
Dictionary con status y detalles
"""
try:
action_id = str(uuid.uuid4())
is_permanent = duration_days is None
action = ModerationAction(
action_id=action_id,
moderator_id=moderator_id,
action_type="ban_user",
target_type="user",
target_id=str(user_id),
reason=reason,
description=description,
duration_days=duration_days,
is_permanent=is_permanent,
status="pending",
fecha_creacion=datetime.now()
)
self.repo.save_moderation_action(action)
message = {
"action_id": action_id,
"event_type": "moderation.ban_user",
"moderator_id": moderator_id,
"user_id": user_id,
"reason": reason,
"duration_days": duration_days,
"is_permanent": is_permanent,
"description": description or "",
"timestamp": datetime.now().isoformat()
}
send_to_queue('moderation_queue', message)
return {
"status": "success",
"message": "Usuario marcado para ban",
"action_id": action_id
}
except Exception as e:
logger.error(f"Error banning user: {e}")
return {
"status": "error",
"message": f"Error al banear usuario: {str(e)}"
}
class WarnUserUseCase:
"""Use case para advertir a un usuario"""
def __init__(self, repo: ModerationRepository):
if not isinstance(repo, ModerationRepository):
raise TypeError("repo must implement ModerationRepository")
self.repo = repo
def execute(self, moderator_id: int, user_id: int, reason: str,
description: Optional[str] = None) -> Dict[str, Any]:
"""
Envía un evento para advertir a un usuario
Args:
moderator_id: ID del moderador
user_id: ID del usuario a advertir
reason: Razón de la advertencia
description: Descripción adicional
Returns:
Dictionary con status y detalles
"""
try:
action_id = str(uuid.uuid4())
action = ModerationAction(
action_id=action_id,
moderator_id=moderator_id,
action_type="warn_user",
target_type="user",
target_id=str(user_id),
reason=reason,
description=description,
status="pending",
fecha_creacion=datetime.now()
)
self.repo.save_moderation_action(action)
message = {
"action_id": action_id,
"event_type": "moderation.warn_user",
"moderator_id": moderator_id,
"user_id": user_id,
"reason": reason,
"description": description or "",
"timestamp": datetime.now().isoformat()
}
send_to_queue('moderation_queue', message)
return {
"status": "success",
"message": "Advertencia registrada",
"action_id": action_id
}
except Exception as e:
logger.error(f"Error warning user: {e}")
return {
"status": "error",
"message": f"Error al advertir usuario: {str(e)}"
}
class ReviewContentUseCase:
"""Use case para revisar contenido reportado"""
def __init__(self, repo: ModerationRepository):
if not isinstance(repo, ModerationRepository):
raise TypeError("repo must implement ModerationRepository")
self.repo = repo
def execute(self, moderator_id: int, report_id: str, action: str,
reason: Optional[str] = None, notes: Optional[str] = None) -> Dict[str, Any]:
"""
Envía un evento para revisar y actuar sobre contenido reportado
Args:
moderator_id: ID del moderador
report_id: ID del reporte
action: Acción a tomar (approve, reject, needs_more_info)
reason: Razón de la decisión
notes: Notas adicionales
Returns:
Dictionary con status y detalles
"""
try:
if action not in ["approve", "reject", "needs_more_info"]:
return {
"status": "error",
"message": "Acción inválida"
}
action_id = str(uuid.uuid4())
moderation_action = ModerationAction(
action_id=action_id,
moderator_id=moderator_id,
action_type="review_content",
target_type="report",
target_id=report_id,
reason=reason or action,
description=notes,
status="approved",
fecha_creacion=datetime.now(),
fecha_ejecucion=datetime.now()
)
self.repo.save_moderation_action(moderation_action)
message = {
"action_id": action_id,
"event_type": "moderation.review_content",
"moderator_id": moderator_id,
"report_id": report_id,
"review_action": action,
"reason": reason or "",
"notes": notes or "",
"timestamp": datetime.now().isoformat()
}
send_to_queue('moderation_queue', message)
return {
"status": "success",
"message": f"Revisión registrada: {action}",
"action_id": action_id
}
except Exception as e:
logger.error(f"Error reviewing content: {e}")
return {
"status": "error",
"message": f"Error al revisar contenido: {str(e)}"
}

View File

@@ -0,0 +1,143 @@
"""Metrics RabbitMQ Consumer - Processes system events and saves metrics"""
import sys
import os
import logging
from datetime import datetime
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from infrastructure.adapters.rabbitmq.consumer import RabbitMQConsumer
from infrastructure.adapters.rabbitmq.messages import (
UserMessage, ReportMessage, NotificationMessage, ModerationMessage
)
from application.services.metrics_services import MetricsService
from infrastructure.adapters.persistence.metrics_repository_postgres import MetricsRepositoryPostgres
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('logs/metrics_consumer.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Colas dedicadas para métricas — no compiten con los consumers de dominio
METRICS_QUEUES = {
'user': 'metrics_users_queue',
'report': 'metrics_reports_queue',
'notification': 'metrics_notifications_queue',
'moderation': 'metrics_moderations_queue',
}
class MetricsConsumer:
"""Consumer for all system events from RabbitMQ"""
def __init__(self):
self.metrics_service = MetricsService(MetricsRepositoryPostgres())
self.user_consumer = RabbitMQConsumer(queue_name=METRICS_QUEUES['user'])
self.report_consumer = RabbitMQConsumer(queue_name=METRICS_QUEUES['report'])
self.notification_consumer = RabbitMQConsumer(queue_name=METRICS_QUEUES['notification'])
self.moderation_consumer = RabbitMQConsumer(queue_name=METRICS_QUEUES['moderation'])
self.user_consumer.set_callback(self.process_user_event)
self.report_consumer.set_callback(self.process_report_event)
self.notification_consumer.set_callback(self.process_notification_event)
self.moderation_consumer.set_callback(self.process_moderation_event)
def process_user_event(self, message_dict: dict):
try:
message = UserMessage.from_dict(message_dict)
self.metrics_service.record_event(
event_type=f"user_{message.event_type.value}",
entity_id=str(message.user_id),
entity_type="user",
user_id=message.user_id,
metadata={"email": message.email}
)
logger.info(f"Métrica registrada: user_{message.event_type.value}")
except Exception as e:
logger.error(f"Error procesando evento de usuario: {e}")
def process_report_event(self, message_dict: dict):
try:
message = ReportMessage.from_dict(message_dict)
self.metrics_service.record_event(
event_type=f"report_{message.event_type.value}",
entity_id=str(message.id_reporte),
entity_type="report",
user_id=message.id_usuario,
metadata={"estado": message.estado}
)
logger.info(f"Métrica registrada: report_{message.event_type.value}")
except Exception as e:
logger.error(f"Error procesando evento de reporte: {e}")
def process_notification_event(self, message_dict: dict):
try:
message = NotificationMessage.from_dict(message_dict)
self.metrics_service.record_event(
event_type=f"notification_{message.event_type.value}",
entity_id=str(message.id_notificacion),
entity_type="notification",
user_id=message.id_usuario,
metadata={"type": message.tipo_notificacion}
)
logger.info(f"Métrica registrada: notification_{message.event_type.value}")
except Exception as e:
logger.error(f"Error procesando evento de notificación: {e}")
def process_moderation_event(self, message_dict: dict):
try:
message = ModerationMessage.from_dict(message_dict)
self.metrics_service.record_event(
event_type=f"moderation_{message.event_type.value}",
entity_id=str(message.action_id),
entity_type="moderation",
user_id=message.moderator_id,
metadata={"action": message.review_action, "reason": message.reason}
)
logger.info(f"Métrica registrada: moderation_{message.event_type.value}")
except Exception as e:
logger.error(f"Error procesando evento de moderación: {e}")
def start(self):
logger.info("Iniciando Metrics Consumer...")
try:
import threading
threads = [
threading.Thread(target=self._safe_consume, args=(self.user_consumer,), daemon=True),
threading.Thread(target=self._safe_consume, args=(self.report_consumer,), daemon=True),
threading.Thread(target=self._safe_consume, args=(self.notification_consumer,), daemon=True),
threading.Thread(target=self._safe_consume, args=(self.moderation_consumer,), daemon=True),
]
for t in threads:
t.start()
for t in threads:
t.join()
except KeyboardInterrupt:
logger.info("Metrics Consumer detenido")
self.stop()
def _safe_consume(self, consumer: RabbitMQConsumer):
try:
consumer.start_consuming()
except Exception as e:
logger.error(f"Error en consumer de {consumer.queue_name}: {e}")
def stop(self):
for c in [self.user_consumer, self.report_consumer,
self.notification_consumer, self.moderation_consumer]:
try:
c.stop()
except Exception:
pass
if __name__ == "__main__":
consumer = MetricsConsumer()
consumer.start()

View File

@@ -0,0 +1,174 @@
"""Consumidor de eventos de Moderación desde RabbitMQ"""
import pika
import json
import logging
from datetime import datetime
from threading import Thread
from typing import Dict, Any
logger = logging.getLogger(__name__)
class ModerationConsumer:
"""Consumidor de eventos de moderación desde RabbitMQ"""
def __init__(self, host: str = 'localhost', port: int = 5672):
self.host = host
self.port = port
self.queue_name = 'moderation_queue'
self.running = False
def start(self):
"""Inicia el consumidor en un thread separado"""
thread = Thread(target=self._consume, daemon=True)
thread.start()
logger.info("Moderation Consumer started")
def _consume(self):
"""Consume mensajes de RabbitMQ"""
try:
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self.host, port=self.port)
)
channel = connection.channel()
# Declarar cola
channel.queue_declare(queue=self.queue_name, durable=True)
# Configurar QoS
channel.basic_qos(prefetch_count=1)
# Configurar callback
channel.basic_consume(
queue=self.queue_name,
on_message_callback=self._process_message
)
logger.info(f"Moderation Consumer listening on {self.queue_name}")
self.running = True
channel.start_consuming()
except Exception as e:
logger.error(f"Error in moderation consumer: {e}")
self.running = False
def _process_message(self, ch, method, properties, body):
"""Procesa un mensaje de moderación"""
try:
message = json.loads(body)
logger.info(f"Processing moderation event: {message.get('event_type')}")
event_type = message.get('event_type')
if event_type == 'moderation.delete_report':
self._handle_delete_report(message)
elif event_type == 'moderation.close_account':
self._handle_close_account(message)
elif event_type == 'moderation.ban_user':
self._handle_ban_user(message)
elif event_type == 'moderation.warn_user':
self._handle_warn_user(message)
elif event_type == 'moderation.review_content':
self._handle_review_content(message)
else:
logger.warning(f"Unknown moderation event type: {event_type}")
# Confirmar mensaje
ch.basic_ack(delivery_tag=method.delivery_tag)
except json.JSONDecodeError:
logger.error("Error decoding JSON message")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
except Exception as e:
logger.error(f"Error processing moderation message: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
def _handle_delete_report(self, message: Dict[str, Any]):
"""Maneja evento de eliminación de reporte"""
try:
action_id = message.get('action_id')
report_id = message.get('report_id')
reason = message.get('reason')
logger.info(f"Deleting report {report_id} (Action: {action_id})")
# Aquí implementarías la lógica para eliminar el reporte en la BD
# Por ahora solo registramos el evento
logger.info(f"Report {report_id} marked as deleted by moderator {message.get('moderator_id')}")
except Exception as e:
logger.error(f"Error handling delete report: {e}")
raise
def _handle_close_account(self, message: Dict[str, Any]):
"""Maneja evento de cierre de cuenta"""
try:
action_id = message.get('action_id')
user_id = message.get('user_id')
is_permanent = message.get('is_permanent', True)
logger.info(f"Closing account for user {user_id} (Action: {action_id}, Permanent: {is_permanent})")
# Aquí implementarías la lógica para cerrar la cuenta en la BD
logger.info(f"Account for user {user_id} marked as closed")
except Exception as e:
logger.error(f"Error handling close account: {e}")
raise
def _handle_ban_user(self, message: Dict[str, Any]):
"""Maneja evento de ban de usuario"""
try:
action_id = message.get('action_id')
user_id = message.get('user_id')
duration_days = message.get('duration_days')
is_permanent = message.get('is_permanent', False)
reason = message.get('reason')
logger.info(f"Banning user {user_id} (Action: {action_id}, Duration: {duration_days} days, Permanent: {is_permanent})")
# Aquí implementarías la lógica para banear al usuario
if is_permanent:
logger.info(f"User {user_id} permanently banned")
else:
logger.info(f"User {user_id} banned for {duration_days} days")
except Exception as e:
logger.error(f"Error handling ban user: {e}")
raise
def _handle_warn_user(self, message: Dict[str, Any]):
"""Maneja evento de advertencia a usuario"""
try:
action_id = message.get('action_id')
user_id = message.get('user_id')
reason = message.get('reason')
logger.info(f"Warning user {user_id} (Action: {action_id}, Reason: {reason})")
# Aquí implementarías la lógica para registrar la advertencia
logger.info(f"Warning recorded for user {user_id}")
except Exception as e:
logger.error(f"Error handling warn user: {e}")
raise
def _handle_review_content(self, message: Dict[str, Any]):
"""Maneja evento de revisión de contenido"""
try:
action_id = message.get('action_id')
report_id = message.get('report_id')
review_action = message.get('review_action') # approve, reject, needs_more_info
logger.info(f"Reviewing report {report_id} (Action: {action_id}, Decision: {review_action})")
# Aquí implementarías la lógica para procesar la revisión
logger.info(f"Report {report_id} review: {review_action}")
except Exception as e:
logger.error(f"Error handling review content: {e}")
raise
# Crear instancia global
moderation_consumer = ModerationConsumer()

View File

@@ -129,7 +129,7 @@ class NotificationConsumer:
"""Start consuming messages from RabbitMQ"""
try:
logger.info("Starting Notification Consumer...")
self.consumer.start()
self.consumer.start_consuming()
except Exception as e:
logger.error(f"Error starting notification consumer: {e}", exc_info=True)
raise

View File

@@ -35,103 +35,78 @@ class UserConsumer:
Args:
message_dict: Dictionary containing the message data
"""
try:
# Reconstruct the UserMessage object
message = UserMessage.from_dict(message_dict)
# Reconstruct the UserMessage object — let exceptions propagate so the
# consumer's callback_wrapper can decide whether to ack, nack+discard, or requeue.
message = UserMessage.from_dict(message_dict)
if message.event_type == UserEventType.CREATE:
self._handle_create_user(message)
elif message.event_type == UserEventType.UPDATE:
self._handle_update_user(message)
elif message.event_type == UserEventType.DELETE:
self._handle_delete_user(message)
else:
logger.warning(f"Unknown event type: {message.event_type}")
except Exception as e:
logger.error(f"Error processing user message: {e}", exc_info=True)
# Rollback en caso de error en el procesamiento del mensaje
self.repo.db.rollback()
raise
if message.event_type == UserEventType.CREATE:
self._handle_create_user(message)
elif message.event_type == UserEventType.UPDATE:
self._handle_update_user(message)
elif message.event_type == UserEventType.DELETE:
self._handle_delete_user(message)
else:
logger.warning(f"Unknown event type: {message.event_type}")
def _handle_create_user(self, message: UserMessage):
"""Handle user create event"""
try:
logger.info(f"Creating user: {message.email}")
logger.info(f"Creating user: {message.email}")
# Parse datetime strings
fecha_nacimiento = datetime.fromisoformat(message.fecha_nacimiento)
fecha_creacion = datetime.fromisoformat(message.fecha_creacion)
if not message.fecha_nacimiento:
raise ValueError(f"Missing fecha_nacimiento in CREATE message for email {message.email}")
if not message.fecha_creacion:
raise ValueError(f"Missing fecha_creacion in CREATE message for email {message.email}")
# Create User domain object
user = User(
user_id=0, # Will be auto-generated by DB
nombre=message.nombre,
apellido=message.apellido,
email=message.email,
contraseña_hash=message.contraseña_hash,
fecha_nacimiento=fecha_nacimiento,
fecha_creacion=fecha_creacion,
calificacion=message.calificacion,
numero_reportes=message.numero_reportes,
url_foto_perfil=message.url_foto_perfil,
biografia=message.biografia
)
fecha_nacimiento = datetime.fromisoformat(message.fecha_nacimiento)
fecha_creacion = datetime.fromisoformat(message.fecha_creacion)
# Save to repository with transaction handling
saved_user = self.repo.save(user)
logger.info(f"User created successfully: {saved_user.user_id} - {saved_user.email}")
user = User(
user_id=0, # Will be auto-generated by DB
nombre=message.nombre,
apellido=message.apellido,
email=message.email,
contraseña_hash=message.contraseña_hash,
fecha_nacimiento=fecha_nacimiento,
fecha_creacion=fecha_creacion,
calificacion=message.calificacion,
numero_reportes=message.numero_reportes,
url_foto_perfil=message.url_foto_perfil,
biografia=message.biografia
)
except Exception as e:
logger.error(f"Error creating user: {e}", exc_info=True)
self.repo.db.rollback()
raise
saved_user = self.repo.save(user)
logger.info(f"User created successfully: {saved_user.user_id} - {saved_user.email}")
def _handle_update_user(self, message: UserMessage):
"""Handle user update event"""
try:
logger.info(f"Updating user: {message.user_id}")
logger.info(f"Updating user: {message.user_id}")
# Find the user
user = self.repo.find_by_id(message.user_id)
if not user:
logger.warning(f"User not found: {message.user_id}")
return
user = self.repo.find_by_id(message.user_id)
if not user:
logger.warning(f"User not found: {message.user_id}")
return
# Update fields if provided
if message.nombre:
user.nombre = message.nombre
if message.apellido:
user.apellido = message.apellido
if message.url_foto_perfil is not None:
user.url_foto_perfil = message.url_foto_perfil
if message.biografia is not None:
user.biografia = message.biografia
if message.nombre:
user.nombre = message.nombre
if message.apellido:
user.apellido = message.apellido
if message.url_foto_perfil is not None:
user.url_foto_perfil = message.url_foto_perfil
if message.biografia is not None:
user.biografia = message.biografia
# Save to repository with transaction handling
updated_user = self.repo.update(user)
logger.info(f"User updated successfully: {message.user_id}")
except Exception as e:
logger.error(f"Error updating user: {e}", exc_info=True)
self.repo.db.rollback()
raise
self.repo.update(user)
logger.info(f"User updated successfully: {message.user_id}")
def _handle_delete_user(self, message: UserMessage):
"""Handle user delete event"""
try:
logger.info(f"Deleting user: {message.user_id}")
logger.info(f"Deleting user: {message.user_id}")
success = self.repo.delete(message.user_id)
if success:
logger.info(f"User deleted successfully: {message.user_id}")
else:
logger.warning(f"Failed to delete user: {message.user_id}")
except Exception as e:
logger.error(f"Error deleting user: {e}", exc_info=True)
self.repo.db.rollback()
raise
success = self.repo.delete(message.user_id)
if success:
logger.info(f"User deleted successfully: {message.user_id}")
else:
logger.warning(f"Failed to delete user: {message.user_id}")
def start(self):
"""Start consuming messages"""
@@ -145,7 +120,6 @@ class UserConsumer:
logger.error(f"Consumer error: {e}", exc_info=True)
raise
finally:
# Asegurar cierre de sesión
if self.repo.db:
try:
self.repo.db.close()

View File

@@ -36,6 +36,11 @@ class Settings(BaseSettings):
)
postgres_url: str = Field(
default=os.getenv("POSTGRES_URL", "postgresql://voxpopuli:voxpopuli_pass@localhost:5432/voxpopuli_metrics"),
description="Base de datos PostgreSQL para métricas"
)
# JWT Configuration
jwt_secret_key: str = Field(
default=os.getenv("JWT_SECRET_KEY", "your-secret-key-change-in-production"),
@@ -81,6 +86,12 @@ class Settings(BaseSettings):
description="Calidad de compresión WebP (0-100)"
)
# CORS Configuration
cors_origins: list = Field(
default=["http://localhost:3000", "http://localhost:8000", "http://localhost:8001", "http://localhost:8002", "http://localhost:8003", "http://localhost:8004"],
description="Orígenes permitidos para CORS"
)
class Config:
env_file = ".env"
case_sensitive = False

46
src/domain/metrics.py Normal file
View File

@@ -0,0 +1,46 @@
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from enum import Enum
class EventType(str, Enum):
"""Tipos de eventos que se rastrean"""
USER_CREATED = "user_created"
USER_UPDATED = "user_updated"
USER_DELETED = "user_deleted"
REPORT_CREATED = "report_created"
REPORT_RESOLVED = "report_resolved"
NOTIFICATION_SENT = "notification_sent"
MODERATION_COMPLETED = "moderation_completed"
@dataclass
class Metric:
"""Modelo de dominio para Métrica"""
metric_id: int
event_type: str
entity_id: str
entity_type: str
timestamp: datetime
metadata: dict = field(default_factory=dict)
user_id: Optional[int] = None
@dataclass
class DailyStats:
"""Estadísticas diarias"""
date: datetime
event_type: str
count: int
@dataclass
class AnalyticsReport:
"""Reporte de analítica"""
report_id: int
start_date: datetime
end_date: datetime
total_events: int
events_by_type: dict
creation_date: datetime = field(default_factory=datetime.now)

35
src/domain/moderations.py Normal file
View File

@@ -0,0 +1,35 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Literal
@dataclass
class ModerationAction:
"""Modelo de dominio para Acciones de Moderación"""
action_id: str
moderator_id: int
action_type: Literal["delete_report", "close_account", "ban_user", "warn_user", "review_content"]
target_type: Literal["report", "user"]
target_id: str # ID del reporte o usuario afectado
reason: str
description: Optional[str] = None
duration_days: Optional[int] = None # Para bans temporales
is_permanent: bool = False
status: Literal["pending", "approved", "rejected", "executed"] = "pending"
fecha_creacion: Optional[datetime] = None
fecha_ejecucion: Optional[datetime] = None
observaciones: Optional[str] = None
@dataclass
class ModerationLog:
"""Log de todas las acciones de moderación realizadas"""
log_id: str
action_id: str
moderator_id: int
action_type: str
target_type: str
target_id: str
resultado: Literal["success", "failed", "pending"]
mensaje: Optional[str] = None
fecha_log: Optional[datetime] = None
ip_moderator: Optional[str] = None

View File

@@ -16,3 +16,4 @@ class User:
numero_reportes: int = 0
url_foto_perfil: Optional[str] = None
biografia: Optional[str] = None
is_admin: bool = False # Indica si el usuario tiene permisos de administrador

View File

@@ -0,0 +1,176 @@
"""Adaptador de repositorio de Moderación con MongoDB"""
from application.ports.moderation_repository import ModerationRepository
from domain.moderations import ModerationAction, ModerationLog
from typing import List, Optional
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
class ModerationRepositoryMongo(ModerationRepository):
"""Implementación de ModerationRepository usando MongoDB"""
def __init__(self):
self.db = None
self.moderation_actions_collection = None
self.moderation_logs_collection = None
self._initialize_db()
def _initialize_db(self):
"""Inicializa conexión a MongoDB"""
try:
from infrastructure.adapters.persistence.mongodb import mongodb
self.db = mongodb
# Obtener o crear colecciones
self.moderation_actions_collection = self.db['moderation_actions']
self.moderation_logs_collection = self.db['moderation_logs']
# Crear índices
self.moderation_actions_collection.create_index('action_id')
self.moderation_actions_collection.create_index('moderator_id')
self.moderation_actions_collection.create_index('target_id')
self.moderation_logs_collection.create_index('action_id')
logger.info("Moderation MongoDB adapter initialized")
except Exception as e:
logger.error(f"Error initializing MongoDB for moderations: {e}")
raise
def save_moderation_action(self, action: ModerationAction) -> bool:
"""Guarda una acción de moderación en MongoDB"""
try:
action_dict = {
'action_id': action.action_id,
'moderator_id': action.moderator_id,
'action_type': action.action_type,
'target_type': action.target_type,
'target_id': action.target_id,
'reason': action.reason,
'description': action.description,
'duration_days': action.duration_days,
'is_permanent': action.is_permanent,
'status': action.status,
'fecha_creacion': action.fecha_creacion or datetime.now(),
'fecha_ejecucion': action.fecha_ejecucion,
'observaciones': action.observaciones
}
self.moderation_actions_collection.insert_one(action_dict)
logger.info(f"Moderation action saved: {action.action_id}")
return True
except Exception as e:
logger.error(f"Error saving moderation action: {e}")
return False
def get_moderation_action(self, action_id: str) -> Optional[ModerationAction]:
"""Obtiene una acción de moderación por ID"""
try:
doc = self.moderation_actions_collection.find_one({'action_id': action_id})
if doc:
return self._doc_to_moderation_action(doc)
return None
except Exception as e:
logger.error(f"Error getting moderation action: {e}")
return None
def get_actions_by_moderator(self, moderator_id: int) -> List[ModerationAction]:
"""Obtiene todas las acciones de un moderador"""
try:
docs = self.moderation_actions_collection.find({'moderator_id': moderator_id})
return [self._doc_to_moderation_action(doc) for doc in docs]
except Exception as e:
logger.error(f"Error getting actions by moderator: {e}")
return []
def get_actions_by_target(self, target_id: str, target_type: str) -> List[ModerationAction]:
"""Obtiene todas las acciones sobre un target específico"""
try:
docs = self.moderation_actions_collection.find({
'target_id': target_id,
'target_type': target_type
})
return [self._doc_to_moderation_action(doc) for doc in docs]
except Exception as e:
logger.error(f"Error getting actions by target: {e}")
return []
def update_action_status(self, action_id: str, status: str) -> bool:
"""Actualiza el estado de una acción"""
try:
result = self.moderation_actions_collection.update_one(
{'action_id': action_id},
{'$set': {'status': status}}
)
return result.modified_count > 0
except Exception as e:
logger.error(f"Error updating action status: {e}")
return False
def save_moderation_log(self, log: ModerationLog) -> bool:
"""Guarda un log de moderación"""
try:
log_dict = {
'log_id': log.log_id,
'action_id': log.action_id,
'moderator_id': log.moderator_id,
'action_type': log.action_type,
'target_type': log.target_type,
'target_id': log.target_id,
'resultado': log.resultado,
'mensaje': log.mensaje,
'fecha_log': log.fecha_log or datetime.now(),
'ip_moderator': log.ip_moderator
}
self.moderation_logs_collection.insert_one(log_dict)
logger.info(f"Moderation log saved: {log.log_id}")
return True
except Exception as e:
logger.error(f"Error saving moderation log: {e}")
return False
def get_moderation_logs(self, action_id: str) -> List[ModerationLog]:
"""Obtiene todos los logs de una acción"""
try:
docs = self.moderation_logs_collection.find({'action_id': action_id})
return [self._doc_to_moderation_log(doc) for doc in docs]
except Exception as e:
logger.error(f"Error getting moderation logs: {e}")
return []
@staticmethod
def _doc_to_moderation_action(doc: dict) -> ModerationAction:
"""Convierte un documento de MongoDB a ModerationAction"""
return ModerationAction(
action_id=doc.get('action_id'),
moderator_id=doc.get('moderator_id'),
action_type=doc.get('action_type'),
target_type=doc.get('target_type'),
target_id=doc.get('target_id'),
reason=doc.get('reason'),
description=doc.get('description'),
duration_days=doc.get('duration_days'),
is_permanent=doc.get('is_permanent', False),
status=doc.get('status', 'pending'),
fecha_creacion=doc.get('fecha_creacion'),
fecha_ejecucion=doc.get('fecha_ejecucion'),
observaciones=doc.get('observaciones')
)
@staticmethod
def _doc_to_moderation_log(doc: dict) -> ModerationLog:
"""Convierte un documento de MongoDB a ModerationLog"""
return ModerationLog(
log_id=doc.get('log_id'),
action_id=doc.get('action_id'),
moderator_id=doc.get('moderator_id'),
action_type=doc.get('action_type'),
target_type=doc.get('target_type'),
target_id=doc.get('target_id'),
resultado=doc.get('resultado'),
mensaje=doc.get('mensaje'),
fecha_log=doc.get('fecha_log'),
ip_moderator=doc.get('ip_moderator')
)

View File

@@ -0,0 +1,137 @@
from datetime import datetime
from typing import List, Dict, Optional
from sqlalchemy import Column, Integer, String, DateTime, JSON, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.sql import func
from domain.metrics import Metric, DailyStats, AnalyticsReport, EventType
from application.ports.metrics_repository import MetricsRepository
from core.config import ConfSettings
Base = declarative_base()
#settings = ConfSettings()
class MetricModel(Base):
"""Modelo SQLAlchemy para métricas en Postgres"""
__tablename__ = "metrics"
id = Column(Integer, primary_key=True, autoincrement=True)
event_type = Column(String, nullable=False, index=True)
entity_id = Column(String, nullable=False)
entity_type = Column(String, nullable=False)
timestamp = Column(DateTime, nullable=False, default=datetime.now, index=True)
event_metadata = Column(JSON, default={})
user_id = Column(Integer, nullable=True)
class MetricsRepositoryPostgres(MetricsRepository):
"""Implementación de repositorio de métricas con PostgreSQL"""
def __init__(self):
db_url = f"postgresql://voxpopuli:voxpopuli_pass@postgres:5432/voxpopuli_metrics"
self.engine = create_engine(db_url, echo=False)
Base.metadata.create_all(self.engine)
self.SessionLocal = sessionmaker(bind=self.engine)
def save_metric(self, metric: Metric) -> Metric:
"""Guarda una métrica en la base de datos"""
session = self.SessionLocal()
try:
db_metric = MetricModel(
event_type=metric.event_type,
entity_id=metric.entity_id,
entity_type=metric.entity_type,
timestamp=metric.timestamp,
event_metadata=metric.metadata,
user_id=metric.user_id
)
session.add(db_metric)
session.commit()
metric.metric_id = db_metric.id
return metric
finally:
session.close()
def get_metrics_by_date_range(self, start_date: datetime, end_date: datetime) -> List[Metric]:
"""Obtiene métricas en un rango de fechas"""
session = self.SessionLocal()
try:
db_metrics = session.query(MetricModel).filter(
MetricModel.timestamp >= start_date,
MetricModel.timestamp <= end_date
).all()
return [
Metric(
metric_id=m.id,
event_type=m.event_type,
entity_id=m.entity_id,
entity_type=m.entity_type,
timestamp=m.timestamp,
metadata=m.event_metadata or {},
user_id=m.user_id
)
for m in db_metrics
]
finally:
session.close()
def get_daily_stats(self, date: datetime) -> List[DailyStats]:
"""Obtiene estadísticas diarias"""
session = self.SessionLocal()
try:
start = datetime(date.year, date.month, date.day)
end = datetime(date.year, date.month, date.day, 23, 59, 59)
results = session.query(
MetricModel.event_type,
func.count(MetricModel.id).label('count')
).filter(
MetricModel.timestamp >= start,
MetricModel.timestamp <= end
).group_by(MetricModel.event_type).all()
return [
DailyStats(date=date, event_type=r[0], count=r[1])
for r in results
]
finally:
session.close()
def get_event_count_by_type(self, start_date: datetime, end_date: datetime) -> Dict[str, int]:
"""Obtiene conteo de eventos por tipo"""
session = self.SessionLocal()
try:
results = session.query(
MetricModel.event_type,
func.count(MetricModel.id).label('count')
).filter(
MetricModel.timestamp >= start_date,
MetricModel.timestamp <= end_date
).group_by(MetricModel.event_type).all()
return {r[0]: r[1] for r in results}
finally:
session.close()
def generate_report(self, start_date: datetime, end_date: datetime) -> AnalyticsReport:
"""Genera reporte de analítica"""
session = self.SessionLocal()
try:
total_events = session.query(func.count(MetricModel.id)).filter(
MetricModel.timestamp >= start_date,
MetricModel.timestamp <= end_date
).scalar() or 0
events_by_type = self.get_event_count_by_type(start_date, end_date)
return AnalyticsReport(
report_id=None,
start_date=start_date,
end_date=end_date,
total_events=total_events,
events_by_type=events_by_type
)
finally:
session.close()

View File

@@ -1,4 +1,4 @@
from sqlalchemy import Column, Integer, String, Float, DateTime
from sqlalchemy import Column, Integer, String, Float, DateTime, Boolean
from infrastructure.adapters.persistence.db import Base
from datetime import datetime
@@ -17,3 +17,4 @@ class UserModel(Base):
numero_reportes = Column(Integer, default=0, nullable=False)
url_foto_perfil = Column(String(500), nullable=True)
biografia = Column(String(1000), nullable=True)
is_admin = Column(Boolean, default=False, nullable=False, index=True) # Permisos de administrador

View File

@@ -2,6 +2,7 @@ from application.ports.user_repository import UserRepository
from domain.users import User
from infrastructure.adapters.persistence.models import UserModel
from infrastructure.adapters.persistence.db import SessionLocal
from sqlalchemy.orm import Session
from typing import List, Optional
import logging
@@ -10,11 +11,24 @@ logger = logging.getLogger(__name__)
class UserRepositorySQL(UserRepository):
"""Implementación del repositorio de Usuarios usando SQLAlchemy (MySQL)"""
def __init__(self, db_session=None):
self.db = db_session or SessionLocal()
def __init__(self, db_session: Session = None):
# FIXED: la sesión se guarda solo si se inyecta explícitamente.
# Cuando db_session es None, cada método abre y cierra su propia
# sesión, eliminando la caché de primer nivel entre requests.
self._injected_session = db_session
def _get_session(self):
"""
Devuelve la sesión inyectada si existe, o crea una nueva.
El caller es responsable de cerrarla cuando no fue inyectada.
"""
if self._injected_session is not None:
return self._injected_session, False # (sesión, es_propia)
return SessionLocal(), True
def save(self, user: User) -> User:
"""Guarda un nuevo usuario con manejo de transacciones"""
db, is_own = self._get_session()
try:
db_user = UserModel(
nombre=user.nombre,
@@ -26,62 +40,87 @@ class UserRepositorySQL(UserRepository):
calificacion=user.calificacion,
numero_reportes=user.numero_reportes,
url_foto_perfil=user.url_foto_perfil,
biografia=user.biografia
biografia=user.biografia,
is_admin=user.is_admin
)
self.db.add(db_user)
self.db.commit()
self.db.refresh(db_user)
db.add(db_user)
db.commit()
db.refresh(db_user)
logger.info(f"Usuario guardado exitosamente: {db_user.user_id}")
return self._to_domain(db_user)
except Exception as e:
self.db.rollback()
db.rollback()
logger.error(f"Error al guardar usuario: {e}", exc_info=True)
raise
finally:
if is_own:
db.close()
def find_by_id(self, user_id: int) -> Optional[User]:
"""Obtiene un usuario por ID"""
db, is_own = self._get_session()
try:
db_user = self.db.query(UserModel).filter(UserModel.user_id == user_id).first()
db_user = db.query(UserModel).filter(UserModel.user_id == user_id).first()
if db_user:
return self._to_domain(db_user)
return None
except Exception as e:
logger.error(f"Error al buscar usuario por ID {user_id}: {e}", exc_info=True)
raise
finally:
if is_own:
db.close()
def find_by_email(self, email: str) -> Optional[User]:
"""Obtiene un usuario por email"""
db, is_own = self._get_session()
try:
db_user = self.db.query(UserModel).filter(UserModel.email == email).first()
db_user = db.query(UserModel).filter(UserModel.email == email).first()
if db_user:
return self._to_domain(db_user)
return None
except Exception as e:
logger.error(f"Error al buscar usuario por email {email}: {e}", exc_info=True)
raise
finally:
if is_own:
db.close()
def find_by_email_with_password(self, email: str) -> Optional[UserModel]:
"""Obtiene un usuario por email incluyendo el hash de contraseña (para autenticación)"""
db, is_own = self._get_session()
try:
db_user = self.db.query(UserModel).filter(UserModel.email == email).first()
db_user = db.query(UserModel).filter(UserModel.email == email).first()
# FIXED: si la sesión es propia, expunge para poder usar el objeto
# fuera de la sesión sin que SQLAlchemy intente lazy-load nada.
if db_user and is_own:
db.expunge(db_user)
return db_user
except Exception as e:
logger.error(f"Error al buscar usuario por email {email}: {e}", exc_info=True)
raise
finally:
if is_own:
db.close()
def find_all(self) -> List[User]:
"""Obtiene todos los usuarios"""
db, is_own = self._get_session()
try:
db_users = self.db.query(UserModel).all()
db_users = db.query(UserModel).all()
return [self._to_domain(user) for user in db_users]
except Exception as e:
logger.error(f"Error al obtener todos los usuarios: {e}", exc_info=True)
raise
finally:
if is_own:
db.close()
def update(self, user: User) -> User:
"""Actualiza un usuario con manejo de transacciones"""
db, is_own = self._get_session()
try:
db_user = self.db.query(UserModel).filter(UserModel.user_id == user.user_id).first()
db_user = db.query(UserModel).filter(UserModel.user_id == user.user_id).first()
if not db_user:
logger.warning(f"Usuario no encontrado para actualizar: {user.user_id}")
return user
@@ -92,61 +131,75 @@ class UserRepositorySQL(UserRepository):
db_user.numero_reportes = user.numero_reportes
db_user.url_foto_perfil = user.url_foto_perfil
db_user.biografia = user.biografia
self.db.commit()
self.db.refresh(db_user)
db.commit()
db.refresh(db_user)
logger.info(f"Usuario actualizado exitosamente: {user.user_id}")
return self._to_domain(db_user)
except Exception as e:
self.db.rollback()
db.rollback()
logger.error(f"Error al actualizar usuario {user.user_id}: {e}", exc_info=True)
raise
finally:
if is_own:
db.close()
def delete(self, user_id: int) -> bool:
"""Elimina un usuario con manejo de transacciones"""
db, is_own = self._get_session()
try:
db_user = self.db.query(UserModel).filter(UserModel.user_id == user_id).first()
db_user = db.query(UserModel).filter(UserModel.user_id == user_id).first()
if db_user:
self.db.delete(db_user)
self.db.commit()
db.delete(db_user)
db.commit()
logger.info(f"Usuario eliminado exitosamente: {user_id}")
return True
logger.warning(f"Usuario no encontrado para eliminar: {user_id}")
return False
except Exception as e:
self.db.rollback()
db.rollback()
logger.error(f"Error al eliminar usuario {user_id}: {e}", exc_info=True)
raise
finally:
if is_own:
db.close()
def increment_reports(self, user_id: int) -> None:
"""Incrementa el contador de reportes con manejo de transacciones"""
db, is_own = self._get_session()
try:
db_user = self.db.query(UserModel).filter(UserModel.user_id == user_id).first()
db_user = db.query(UserModel).filter(UserModel.user_id == user_id).first()
if db_user:
db_user.numero_reportes += 1
self.db.commit()
db.commit()
logger.info(f"Contador de reportes incrementado para usuario: {user_id}")
else:
logger.warning(f"Usuario no encontrado para incrementar reportes: {user_id}")
except Exception as e:
self.db.rollback()
db.rollback()
logger.error(f"Error al incrementar reportes del usuario {user_id}: {e}", exc_info=True)
raise
finally:
if is_own:
db.close()
def update_rating(self, user_id: int, new_rating: float) -> None:
"""Actualiza la calificación de un usuario con manejo de transacciones"""
db, is_own = self._get_session()
try:
db_user = self.db.query(UserModel).filter(UserModel.user_id == user_id).first()
db_user = db.query(UserModel).filter(UserModel.user_id == user_id).first()
if db_user:
# Asegurar que la calificación esté en el rango 0-100
db_user.calificacion = max(0, min(100, new_rating))
self.db.commit()
db.commit()
logger.info(f"Calificación actualizada para usuario {user_id}: {db_user.calificacion}")
else:
logger.warning(f"Usuario no encontrado para actualizar calificación: {user_id}")
except Exception as e:
self.db.rollback()
db.rollback()
logger.error(f"Error al actualizar calificación del usuario {user_id}: {e}", exc_info=True)
raise
finally:
if is_own:
db.close()
def _to_domain(self, db_user: UserModel) -> User:
"""Convierte un modelo SQLAlchemy a un objeto de dominio"""
@@ -160,5 +213,6 @@ class UserRepositorySQL(UserRepository):
calificacion=db_user.calificacion,
numero_reportes=db_user.numero_reportes,
url_foto_perfil=db_user.url_foto_perfil,
biografia=db_user.biografia
biografia=db_user.biografia,
is_admin=db_user.is_admin
)

View File

@@ -23,6 +23,7 @@ class RabbitMQConsumer:
)
channel = connection.channel()
channel.queue_declare(queue=self.queue_name, durable=True)
channel.basic_qos(prefetch_count=1)
def callback_wrapper(ch, method, properties, body):
try:
@@ -34,13 +35,13 @@ class RabbitMQConsumer:
ch.basic_ack(delivery_tag=method.delivery_tag)
except IntegrityError as e:
# Error de negocio: no tiene sentido reintentar
logger.warning(f"Business error, discarding message: {e}")
except (IntegrityError, ValueError, TypeError, KeyError) as e:
# Errores de negocio/datos: no tiene sentido reintentar
logger.warning(f"Business/data error, discarding message: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
except Exception as e:
# Error transitorio (red, DB caída): puede resolverse solo
# Error transitorio (red, DB caída): puede resolverse solo
logger.error(f"Transient error processing message: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

View File

@@ -27,6 +27,15 @@ class NotificationEventType(str, Enum):
REPORT_STATUS_CHANGE = "notification.report_status_change"
class ModerationEventType(str, Enum):
"""Types of moderation events"""
DELETE_REPORT = "moderation.delete_report"
CLOSE_ACCOUNT = "moderation.close_account"
BAN_USER = "moderation.ban_user"
WARN_USER = "moderation.warn_user"
REVIEW_CONTENT = "moderation.review_content"
@dataclass
class UserMessage:
"""Message for user events"""
@@ -122,3 +131,36 @@ class NotificationMessage:
"""Create from dictionary"""
data['event_type'] = NotificationEventType(data['event_type'])
return NotificationMessage(**data)
@dataclass
class ModerationMessage:
"""Message for moderation events"""
event_type: ModerationEventType
action_id: Optional[str] = None
moderator_id: Optional[int] = None
report_id: Optional[str] = None
user_id: Optional[int] = None
reason: Optional[str] = None
description: Optional[str] = None
duration_days: Optional[int] = None # Para bans temporales
is_permanent: Optional[bool] = None
review_action: Optional[str] = None # approve, reject, needs_more_info
notes: Optional[str] = None
fecha_creacion: Optional[str] = None # ISO format datetime string
def to_dict(self):
"""Convert to dictionary"""
data = asdict(self)
data['event_type'] = self.event_type.value
return data
def to_json(self) -> str:
"""Convert to JSON string"""
return json.dumps(self.to_dict())
@staticmethod
def from_dict(data: dict) -> 'ModerationMessage':
"""Create from dictionary"""
data['event_type'] = ModerationEventType(data['event_type'])
return ModerationMessage(**data)

View File

@@ -3,41 +3,34 @@ import pika
import json
from typing import Any, Dict
import logging
import os
logger = logging.getLogger(__name__)
def _get_rabbitmq_host() -> str:
try:
from core.config import ConfSettings
return ConfSettings.rabbitmq
except Exception:
return os.getenv("RABBITMQ_URI", "localhost")
class RabbitMQSender:
"""Generic RabbitMQ sender for publishing messages to queues"""
def __init__(self, host: str = 'localhost', port: int = 5672):
self.host = host
def __init__(self, host: str = None, port: int = 5672):
self.host = host or _get_rabbitmq_host()
self.port = port
def send_message(self, queue_name: str, message: Dict[str, Any]) -> bool:
"""
Sends a message to a RabbitMQ queue
Args:
queue_name: Name of the queue to send to
message: Dictionary containing the message data
Returns:
True if successful, False otherwise
"""
try:
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self.host, port=self.port)
)
channel = connection.channel()
# Declare queue to ensure it exists
channel.queue_declare(queue=queue_name, durable=True)
# Convert message to JSON
message_json = json.dumps(message)
# Publish the message
channel.basic_publish(
exchange='',
routing_key=queue_name,
@@ -46,29 +39,15 @@ class RabbitMQSender:
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
)
)
connection.close()
logger.info(f"Message sent to queue '{queue_name}': {message_json}")
return True
except Exception as e:
logger.error(f"Error sending message to RabbitMQ: {e}")
return False
def send_to_queue(queue_name: str, message: Dict[str, Any],
host: str = 'localhost', port: int = 5672) -> bool:
"""
Convenience function to send a message to RabbitMQ
Args:
queue_name: Name of the queue
message: Message dictionary
host: RabbitMQ host
port: RabbitMQ port
Returns:
True if successful, False otherwise
"""
host: str = None, port: int = 5672) -> bool:
sender = RabbitMQSender(host=host, port=port)
return sender.send_message(queue_name, message)

View File

@@ -0,0 +1,49 @@
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer
import jwt
from core.config import ConfSettings
from infrastructure.adapters.persistence.user_repository_sql import UserRepositorySQL
import logging
logger = logging.getLogger(__name__)
security = HTTPBearer()
async def get_current_admin_user(credentials = Depends(security)):
try:
payload = jwt.decode(
credentials.credentials,
ConfSettings.jwt_secret_key,
algorithms=[ConfSettings.jwt_algorithm]
)
user_id: int = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token inválido",
headers={"WWW-Authenticate": "Bearer"},
)
except jwt.InvalidTokenError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token inválido o expirado",
headers={"WWW-Authenticate": "Bearer"},
)
user_repo = UserRepositorySQL()
user = user_repo.find_by_id(user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Usuario no encontrado",
)
if not user.is_admin:
logger.warning(f"Intento de acceso no autorizado a moderación por usuario {user_id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Permisos insuficientes",
)
return user

View File

@@ -0,0 +1 @@
# Metrics API

View File

@@ -0,0 +1,13 @@
from fastapi import FastAPI
from infrastructure.api.metrics.router import router
def create_app() -> FastAPI:
"""Factory para crear la aplicación de Métricas"""
app = FastAPI(
title="Métricas Microservice",
version="1.0.0",
description="Microservicio de métricas y analítica"
)
app.include_router(router)
return app

View File

@@ -0,0 +1,80 @@
from fastapi import APIRouter, Query
from datetime import datetime, timedelta
from infrastructure.api.metrics.schemas import (
MetricRequest, MetricResponse, AnalyticsReportResponse,
EventSummaryResponse, DailyStatsResponse
)
from application.services.metrics_services import MetricsService
from infrastructure.adapters.persistence.metrics_repository_postgres import MetricsRepositoryPostgres
from domain.metrics import Metric
router = APIRouter(prefix="/metrics", tags=["metrics"])
metrics_service = MetricsService(MetricsRepositoryPostgres())
@router.post("/record", response_model=MetricResponse)
async def record_metric(metric: MetricRequest):
"""Registra un nuevo evento de métrica"""
saved_metric = metrics_service.record_event(
event_type=metric.event_type,
entity_id=metric.entity_id,
entity_type=metric.entity_type,
user_id=metric.user_id,
metadata=metric.metadata
)
return {
"metric_id": saved_metric.metric_id,
"event_type": saved_metric.event_type,
"entity_id": saved_metric.entity_id,
"entity_type": saved_metric.entity_type,
"timestamp": saved_metric.timestamp,
"user_id": saved_metric.user_id
}
@router.get("/report", response_model=AnalyticsReportResponse)
async def get_analytics_report(days: int = Query(7, ge=1, le=365)):
"""Obtiene reporte de analítica de los últimos N días"""
report = metrics_service.get_metrics_report(days_back=days)
return {
"start_date": report.start_date,
"end_date": report.end_date,
"total_events": report.total_events,
"events_by_type": report.events_by_type
}
@router.get("/daily-stats", response_model=list[DailyStatsResponse])
async def get_daily_stats(date: datetime = Query(default_factory=datetime.now)):
"""Obtiene estadísticas de un día específico"""
stats = metrics_service.get_daily_statistics(date)
return [
{
"date": s.date,
"event_type": s.event_type,
"count": s.count
}
for s in stats
]
@router.get("/summary", response_model=EventSummaryResponse)
async def get_event_summary(
start_date: datetime = Query(default_factory=lambda: datetime.now() - timedelta(days=7)),
end_date: datetime = Query(default_factory=datetime.now)
):
"""Obtiene resumen de eventos en un rango de fechas"""
summary = metrics_service.get_event_summary(start_date, end_date)
return {
"summary": summary,
"date_range": {
"start_date": start_date,
"end_date": end_date
}
}
@router.get("/health")
async def health_check():
"""Health check endpoint"""
return {"status": "healthy"}

View File

@@ -0,0 +1,43 @@
from pydantic import BaseModel
from datetime import datetime
from typing import Dict, List, Optional
class MetricRequest(BaseModel):
"""Esquema para crear una métrica"""
event_type: str
entity_id: str
entity_type: str
user_id: Optional[int] = None
metadata: Optional[Dict] = None
class MetricResponse(BaseModel):
"""Esquema de respuesta para métrica"""
metric_id: int
event_type: str
entity_id: str
entity_type: str
timestamp: datetime
user_id: Optional[int]
class DailyStatsResponse(BaseModel):
"""Esquema de estadísticas diarias"""
date: datetime
event_type: str
count: int
class AnalyticsReportResponse(BaseModel):
"""Esquema de reporte de analítica"""
start_date: datetime
end_date: datetime
total_events: int
events_by_type: Dict[str, int]
class EventSummaryResponse(BaseModel):
"""Esquema de resumen de eventos"""
summary: Dict[str, int]
date_range: Dict[str, datetime]

View File

@@ -0,0 +1,46 @@
"""Aplicación FastAPI para API de Moderación"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from infrastructure.api.moderations.router import router
from core.config import ConfSettings
import logging
# Configurar logging
logging.basicConfig(
level=ConfSettings.log_level.upper(),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def create_app() -> FastAPI:
"""
Factory function para crear la aplicación FastAPI de Moderación
Returns:
FastAPI application instance
"""
app = FastAPI(
title="VoxPopuli Moderation API",
description="API para gestión de acciones de moderación en VoxPopuli",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc",
openapi_url="/openapi.json"
)
# Agregar CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=ConfSettings.cors_origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Incluir rutas
app.include_router(router)
logger.info("Moderation API initialized")
return app

View File

@@ -0,0 +1,153 @@
"""Endpoints de moderación para gestión de reportes, cuentas y usuarios"""
from fastapi import APIRouter, HTTPException, Depends
from fastapi.responses import JSONResponse
from infrastructure.api.moderations.schemas import (
DeleteReportRequest, CloseAccountRequest, BanUserRequest,
WarnUserRequest, ReviewContentRequest, ModerationActionResponse
)
from infrastructure.api.auth import get_current_admin_user
from application.services.moderation_services import (
DeleteReportUseCase, CloseAccountUseCase, BanUserUseCase,
WarnUserUseCase, ReviewContentUseCase
)
from infrastructure.adapters.persistence.mongodb import mongodb
from infrastructure.adapters.persistence.db import get_db
from infrastructure.adapters.moderation_repository_mongo import ModerationRepositoryMongo
from infrastructure.adapters.rabbitmq.sender import send_to_queue
from infrastructure.adapters.rabbitmq.messages import ModerationMessage, ModerationEventType
from domain.users import User
from sqlalchemy.orm import Session
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
router = APIRouter()
moderation_repo = ModerationRepositoryMongo()
def _publish_moderation_event(event_type: ModerationEventType, **kwargs):
try:
msg = ModerationMessage(event_type=event_type, fecha_creacion=datetime.utcnow().isoformat(), **kwargs)
payload = msg.to_dict()
send_to_queue('moderations_queue', payload)
send_to_queue('metrics_moderations_queue', payload)
except Exception as e:
logger.warning(f"Error publicando evento de moderación a RabbitMQ: {e}")
@router.post("/reports/delete", response_model=ModerationActionResponse)
async def delete_report(request: DeleteReportRequest, current_admin: User = Depends(get_current_admin_user)):
"""
Eliminar un reporte como moderador (requiere permisos de admin)
- **report_id**: ID del reporte a eliminar
- **reason**: Razón de la eliminación (mínimo 5 caracteres)
- **description**: Descripción adicional (opcional)
"""
try:
use_case = DeleteReportUseCase(moderation_repo)
result = use_case.execute(moderator_id=current_admin.user_id, report_id=request.report_id, reason=request.reason, description=request.description)
if result["status"] == "error":
raise HTTPException(status_code=400, detail=result["message"])
_publish_moderation_event(ModerationEventType.DELETE_REPORT, moderator_id=current_admin.user_id, report_id=request.report_id, reason=request.reason, description=request.description, action_id=result.get("action_id"))
return ModerationActionResponse(**result)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error in delete_report: {e}")
raise HTTPException(status_code=500, detail="Error interno del servidor")
@router.post("/accounts/close", response_model=ModerationActionResponse)
async def close_account(request: CloseAccountRequest, current_admin: User = Depends(get_current_admin_user)):
"""
Cerrar una cuenta de usuario (requiere permisos de admin)
- **user_id**: ID del usuario cuya cuenta cerrar
- **reason**: Razón del cierre (mínimo 5 caracteres)
- **description**: Descripción adicional (opcional)
- **is_permanent**: Si el cierre es permanente (por defecto True)
"""
try:
use_case = CloseAccountUseCase(moderation_repo)
result = use_case.execute(moderator_id=current_admin.user_id, user_id=request.user_id, reason=request.reason, description=request.description, is_permanent=request.is_permanent)
if result["status"] == "error":
raise HTTPException(status_code=400, detail=result["message"])
_publish_moderation_event(ModerationEventType.CLOSE_ACCOUNT, moderator_id=current_admin.user_id, user_id=request.user_id, reason=request.reason, description=request.description, is_permanent=request.is_permanent, action_id=result.get("action_id"))
return ModerationActionResponse(**result)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error in close_account: {e}")
raise HTTPException(status_code=500, detail="Error interno del servidor")
@router.post("/users/ban", response_model=ModerationActionResponse)
async def ban_user(request: BanUserRequest, current_admin: User = Depends(get_current_admin_user)):
"""
Banear a un usuario (requiere permisos de admin)
- **user_id**: ID del usuario a banear
- **reason**: Razón del ban (mínimo 5 caracteres)
- **duration_days**: Duración del ban en días (None para permanente)
- **description**: Descripción adicional (opcional)
"""
try:
use_case = BanUserUseCase(moderation_repo)
result = use_case.execute(moderator_id=current_admin.user_id, user_id=request.user_id, reason=request.reason, duration_days=request.duration_days, description=request.description)
if result["status"] == "error":
raise HTTPException(status_code=400, detail=result["message"])
_publish_moderation_event(ModerationEventType.BAN_USER, moderator_id=current_admin.user_id, user_id=request.user_id, reason=request.reason, duration_days=request.duration_days, description=request.description, is_permanent=request.duration_days is None, action_id=result.get("action_id"))
return ModerationActionResponse(**result)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error in ban_user: {e}")
raise HTTPException(status_code=500, detail="Error interno del servidor")
@router.post("/users/warn", response_model=ModerationActionResponse)
async def warn_user(request: WarnUserRequest, current_admin: User = Depends(get_current_admin_user)):
"""
Advertir a un usuario (requiere permisos de admin)
- **user_id**: ID del usuario a advertir
- **reason**: Razón de la advertencia (mínimo 5 caracteres)
- **description**: Descripción adicional (opcional)
"""
try:
use_case = WarnUserUseCase(moderation_repo)
result = use_case.execute(moderator_id=current_admin.user_id, user_id=request.user_id, reason=request.reason, description=request.description)
if result["status"] == "error":
raise HTTPException(status_code=400, detail=result["message"])
_publish_moderation_event(ModerationEventType.WARN_USER, moderator_id=current_admin.user_id, user_id=request.user_id, reason=request.reason, description=request.description, action_id=result.get("action_id"))
return ModerationActionResponse(**result)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error in warn_user: {e}")
raise HTTPException(status_code=500, detail="Error interno del servidor")
@router.post("/content/review", response_model=ModerationActionResponse)
async def review_content(request: ReviewContentRequest, current_admin: User = Depends(get_current_admin_user)):
"""
Revisar y actuar sobre contenido reportado (requiere permisos de admin)
- **report_id**: ID del reporte a revisar
- **action**: Acción a tomar (approve, reject, needs_more_info)
- **reason**: Razón de la decisión (opcional)
- **notes**: Notas adicionales (opcional)
"""
try:
use_case = ReviewContentUseCase(moderation_repo)
result = use_case.execute(moderator_id=current_admin.user_id, report_id=request.report_id, action=request.action, reason=request.reason, notes=request.notes)
if result["status"] == "error":
raise HTTPException(status_code=400, detail=result["message"])
_publish_moderation_event(ModerationEventType.REVIEW_CONTENT, moderator_id=current_admin.user_id, report_id=request.report_id, review_action=request.action, reason=request.reason, notes=request.notes, action_id=result.get("action_id"))
return ModerationActionResponse(**result)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error in review_content: {e}")
raise HTTPException(status_code=500, detail="Error interno del servidor")

View File

@@ -0,0 +1,32 @@
"""Rutas raíz para API de Moderación"""
from fastapi import APIRouter, Response
from fastapi.responses import JSONResponse
router = APIRouter()
@router.get("/")
async def root():
"""Root endpoint - información de la API"""
return JSONResponse({
"status": "ok",
"service": "VoxPopuli Moderation API",
"version": "1.0.0",
"description": "API para gestión de acciones de moderación",
"endpoints": {
"delete_report": "POST /moderation/reports/delete",
"close_account": "POST /moderation/accounts/close",
"ban_user": "POST /moderation/users/ban",
"warn_user": "POST /moderation/users/warn",
"review_content": "POST /moderation/content/review"
}
})
@router.get("/health")
async def health_check():
"""Health check endpoint"""
return JSONResponse({
"status": "healthy",
"service": "Moderation API"
})

View File

@@ -0,0 +1,18 @@
"""Router principal para API de Moderación"""
from fastapi import APIRouter
from infrastructure.api.moderations.moderations import router as moderations_router
from infrastructure.api.moderations.root import router as root_router
router = APIRouter()
router.include_router(
moderations_router,
prefix="/moderation",
tags=["moderation"]
)
router.include_router(
root_router,
prefix='',
tags=["root"]
)

View File

@@ -0,0 +1,60 @@
"""Pydantic schemas para validación de datos en API de Moderación"""
from pydantic import BaseModel, Field
from typing import Optional, Literal
from datetime import datetime
class DeleteReportRequest(BaseModel):
"""Esquema para solicitud de eliminación de reporte"""
moderator_id: int = Field(..., description="ID del moderador")
report_id: str = Field(..., description="ID del reporte a eliminar")
reason: str = Field(..., min_length=5, description="Razón de la eliminación")
description: Optional[str] = Field(None, description="Descripción adicional")
class CloseAccountRequest(BaseModel):
"""Esquema para solicitud de cierre de cuenta"""
moderator_id: int = Field(..., description="ID del moderador")
user_id: int = Field(..., description="ID del usuario")
reason: str = Field(..., min_length=5, description="Razón del cierre")
description: Optional[str] = Field(None, description="Descripción adicional")
is_permanent: bool = Field(True, description="Si el cierre es permanente")
class BanUserRequest(BaseModel):
"""Esquema para solicitud de ban de usuario"""
moderator_id: int = Field(..., description="ID del moderador")
user_id: int = Field(..., description="ID del usuario a banear")
reason: str = Field(..., min_length=5, description="Razón del ban")
duration_days: Optional[int] = Field(None, description="Duración en días (None para permanente)")
description: Optional[str] = Field(None, description="Descripción adicional")
class WarnUserRequest(BaseModel):
"""Esquema para solicitud de advertencia"""
moderator_id: int = Field(..., description="ID del moderador")
user_id: int = Field(..., description="ID del usuario a advertir")
reason: str = Field(..., min_length=5, description="Razón de la advertencia")
description: Optional[str] = Field(None, description="Descripción adicional")
class ReviewContentRequest(BaseModel):
"""Esquema para solicitud de revisión de contenido"""
moderator_id: int = Field(..., description="ID del moderador")
report_id: str = Field(..., description="ID del reporte a revisar")
action: Literal["approve", "reject", "needs_more_info"] = Field(..., description="Acción de revisión")
reason: Optional[str] = Field(None, description="Razón de la decisión")
notes: Optional[str] = Field(None, description="Notas adicionales")
class ModerationActionResponse(BaseModel):
"""Esquema de respuesta para acciones de moderación"""
status: str = Field(..., description="Estado de la acción (success/error)")
message: str = Field(..., description="Mensaje descriptivo")
action_id: Optional[str] = Field(None, description="ID de la acción creada")
class ActionStatusRequest(BaseModel):
"""Esquema para actualizar estado de acción"""
status: Literal["pending", "approved", "rejected", "executed"] = Field(..., description="Nuevo estado")
notes: Optional[str] = Field(None, description="Notas sobre el cambio de estado")

View File

@@ -8,7 +8,10 @@ from infrastructure.adapters.persistence.report_repository_mongo import ReportRe
from infrastructure.adapters.persistence.user_repository_sql import UserRepositorySQL
from infrastructure.adapters.file_storage import image_storage
from infrastructure.adapters.rabbitmq.sender import send_to_queue
from infrastructure.adapters.rabbitmq.messages import NotificationMessage, NotificationEventType
from infrastructure.adapters.rabbitmq.messages import (
NotificationMessage, NotificationEventType,
ReportMessage, ReportEventType
)
import logging
from typing import Optional
from datetime import datetime
@@ -21,7 +24,6 @@ logger = logging.getLogger(__name__)
def _report_to_response(report) -> dict:
"""Convierte un objeto Report a dict con image_url"""
return {
"id_reporte": report.id_reporte,
"id_usuario": report.id_usuario,
@@ -36,6 +38,17 @@ def _report_to_response(report) -> dict:
"fecha_creacion": report.fecha_creacion
}
def _publish_report_event(event_type: ReportEventType, **kwargs):
try:
msg = ReportMessage(event_type=event_type, **kwargs)
payload = msg.to_dict()
send_to_queue('reports_queue', payload)
send_to_queue('metrics_reports_queue', payload)
except Exception as e:
logger.warning(f"Error publicando evento de reporte a RabbitMQ: {e}")
@router.post("/", status_code=status.HTTP_202_ACCEPTED)
async def create_report(
id_usuario: int = Form(...),
@@ -49,7 +62,6 @@ async def create_report(
):
"""Crea un nuevo reporte - envía a cola de procesamiento con validaciones previas"""
try:
# Procesar imagen si fue proporcionada
image_filename = None
if file:
logger.info(f"Processing image file: {file.filename} ({file.content_type})")
@@ -57,46 +69,31 @@ async def create_report(
create_use_case = CreateReport(report_repo, user_repo)
result = create_use_case.execute(
id_usuario=id_usuario,
tipo_reporte=tipo_reporte,
descripcion=descripcion,
ubicacion=ubicacion,
lat=lat,
lng=lng,
image_filename=image_filename,
estado=estado
id_usuario=id_usuario, tipo_reporte=tipo_reporte, descripcion=descripcion,
ubicacion=ubicacion, lat=lat, lng=lng, image_filename=image_filename, estado=estado
)
if result["status"] == "error":
# Si hay error, eliminar imagen si fue guardada
if image_filename:
image_storage.delete_image(image_filename)
message = result["message"]
if "no existe" in message:
# 404 Not Found: usuario no existe
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=message
)
else:
# 400 Bad Request: error de validación
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=message
)
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
# 202 Accepted: enviado a la cola correctamente
_publish_report_event(
ReportEventType.CREATE,
id_reporte=result.get("id_reporte"), id_usuario=id_usuario, tipo_reporte=tipo_reporte,
descripcion=descripcion, ubicacion=ubicacion, lat=lat, lng=lng, estado=estado,
fecha_creacion=datetime.utcnow().isoformat()
)
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Error inesperado en create_report: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
@router.get("/{report_id}", response_model=ReportResponse)
async def get_report(report_id: str):
@@ -105,19 +102,13 @@ async def get_report(report_id: str):
get_use_case = GetReportById(report_repo)
report = get_use_case.execute(report_id)
if not report:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Reporte con ID {report_id} no encontrado"
)
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Reporte con ID {report_id} no encontrado")
return _report_to_response(report)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error al obtener reporte {report_id}: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
@router.get("/user/{user_id}")
async def get_user_reports(user_id: int):
@@ -128,10 +119,7 @@ async def get_user_reports(user_id: int):
return [_report_to_response(report) for report in reports]
except Exception as e:
logger.error(f"Error al obtener reportes del usuario {user_id}: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
@router.get("/")
async def list_reports():
@@ -142,10 +130,7 @@ async def list_reports():
return [_report_to_response(report) for report in reports]
except Exception as e:
logger.error(f"Error al listar reportes: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
@router.get("/shadowbanned/list")
async def get_shadowbanned_reports(threshold: float = 20):
@@ -156,48 +141,26 @@ async def get_shadowbanned_reports(threshold: float = 20):
return [_report_to_response(report) for report in reports]
except Exception as e:
logger.error(f"Error al obtener reportes shadowbaneados: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
@router.put("/{report_id}/visibility", status_code=status.HTTP_202_ACCEPTED)
async def update_report_visibility(report_id: str, visibility_data: ReportUpdateVisibilityRequest):
"""Actualiza la visibilidad de un reporte - envía a cola de procesamiento con validaciones previas"""
try:
update_use_case = UpdateReportVisibility(report_repo, user_repo)
result = update_use_case.execute(
report_id=report_id,
new_visibility=visibility_data.new_visibility,
penalize_author=visibility_data.penalize_author
)
result = update_use_case.execute(report_id=report_id, new_visibility=visibility_data.new_visibility, penalize_author=visibility_data.penalize_author)
if result["status"] == "error":
message = result["message"]
if "no existe" in message:
# 404 Not Found: reporte no existe
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=message
)
else:
# 400 Bad Request: error de validación
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=message
)
# 202 Accepted: enviado a la cola correctamente
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
_publish_report_event(ReportEventType.UPDATE_VISIBILITY, id_reporte=report_id, visibilidad=visibility_data.new_visibility, penalize_author=visibility_data.penalize_author)
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Error al actualizar visibilidad del reporte {report_id}: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
@router.delete("/{report_id}", status_code=status.HTTP_202_ACCEPTED)
async def delete_report(report_id: str):
@@ -205,69 +168,38 @@ async def delete_report(report_id: str):
try:
delete_use_case = DeleteReport(report_repo)
result = delete_use_case.execute(report_id)
if result["status"] == "error":
message = result["message"]
if "no existe" in message:
# 404 Not Found: reporte no existe
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=message
)
else:
# 400 Bad Request: error de validación
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=message
)
# 202 Accepted: enviado a la cola correctamente
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
_publish_report_event(ReportEventType.DELETE, id_reporte=report_id)
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Error al eliminar reporte {report_id}: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
@router.put("/{report_id}/status", status_code=status.HTTP_200_OK)
async def update_report_status(report_id: str, status_data: ReportUpdateStatusRequest):
"""Actualiza el estado de un reporte y envía notificación al usuario"""
try:
# Obtener el reporte actual para saber el usuario creador
report = report_repo.find_by_id(report_id)
if not report:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Reporte con ID {report_id} no encontrado"
)
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Reporte con ID {report_id} no encontrado")
# Actualizar el estado
update_use_case = UpdateReportStatus(report_repo)
result = update_use_case.execute(
report_id=report_id,
new_estado=status_data.estado
)
result = update_use_case.execute(report_id=report_id, new_estado=status_data.estado)
if result["status"] == "error":
message = result["message"]
if "no existe" in message:
# 404 Not Found: reporte no existe
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=message
)
else:
# 400 Bad Request: error de validación
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=message
)
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
_publish_report_event(ReportEventType.UPDATE_STATUS, id_reporte=report_id, id_usuario=report.id_usuario, estado=status_data.estado)
# Enviar notificación al usuario creador del reporte
try:
notification_message = NotificationMessage(
event_type=NotificationEventType.REPORT_STATUS_CHANGE,
@@ -280,25 +212,16 @@ async def update_report_status(report_id: str, status_data: ReportUpdateStatusRe
estado_reporte=status_data.estado,
fecha_creacion=datetime.utcnow().isoformat()
)
# Enviar a la cola de notificaciones
send_to_queue(
queue_name='notifications_queue',
message=notification_message.to_dict()
)
send_to_queue('notifications_queue', notification_message.to_dict())
send_to_queue('metrics_notifications_queue', notification_message.to_dict())
logger.info(f"Notification sent to user {report.id_usuario} for report {report_id}")
except Exception as notification_error:
logger.warning(f"Error sending notification for report {report_id}: {notification_error}")
# No fallar la actualización si hay error en notificación
# 200 OK: estado actualizado correctamente
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Error al actualizar estado del reporte {report_id}: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")

View File

@@ -33,6 +33,7 @@ class UserLoginResponse(BaseModel):
token_type: str = Field(default="bearer", description="Tipo de token")
user_id: int = Field(..., description="ID del usuario")
email: str = Field(..., description="Email del usuario")
is_admin: bool = Field(..., description="Indica si el usuario es administrador")
class UserResponse(BaseModel):
"""Respuesta con datos de usuario"""
@@ -46,6 +47,7 @@ class UserResponse(BaseModel):
numero_reportes: int
url_foto_perfil: Optional[str]
biografia: Optional[str]
is_admin: bool
class Config:
from_attributes = True

View File

@@ -1,4 +1,4 @@
from fastapi import APIRouter, HTTPException, status, Depends
from fastapi import APIRouter, HTTPException, status
from infrastructure.api.users.schemas import (
UserCreateRequest, UserUpdateRequest, UserResponse,
UserLoginRequest, UserLoginResponse
@@ -8,12 +8,25 @@ from application.services.user_services import (
)
from infrastructure.adapters.persistence.user_repository_sql import UserRepositorySQL
from infrastructure.api.users.auth_service import auth_service
from infrastructure.adapters.rabbitmq.sender import send_to_queue
from infrastructure.adapters.rabbitmq.messages import UserMessage, UserEventType
import logging
from datetime import datetime
router = APIRouter()
user_repo = UserRepositorySQL()
logger = logging.getLogger(__name__)
def _publish_user_event(event_type: UserEventType, **kwargs):
try:
msg = UserMessage(event_type=event_type, fecha_creacion=datetime.utcnow().isoformat(), **kwargs)
payload = msg.to_dict()
send_to_queue('users_queue', payload)
send_to_queue('metrics_users_queue', payload)
except Exception as e:
logger.warning(f"Error publicando evento de usuario a RabbitMQ: {e}")
@router.post("/login", response_model=UserLoginResponse, status_code=status.HTTP_200_OK)
async def login_user(credentials: UserLoginRequest):
"""
@@ -30,48 +43,21 @@ async def login_user(credentials: UserLoginRequest):
- email: Email confirmado
"""
try:
# Obtener usuario por email
user_repo = UserRepositorySQL() # sesión nueva por request
get_use_case = GetUserByEmail(user_repo)
user = get_use_case.execute(credentials.email)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Email o contraseña incorrectos",
headers={"WWW-Authenticate": "Bearer"}
)
# Verificar contraseña
# Necesitamos obtener el hash de contraseña del modelo
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Email o contraseña incorrectos", headers={"WWW-Authenticate": "Bearer"})
user_model = user_repo.find_by_email_with_password(credentials.email)
if not user_model or not auth_service.verify_password(credentials.contraseña, user_model.contraseña_hash):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Email o contraseña incorrectos",
headers={"WWW-Authenticate": "Bearer"}
)
# Crear token JWT
access_token = auth_service.create_access_token(
user_id=user.user_id,
email=user.email
)
return {
"access_token": access_token,
"token_type": "bearer",
"user_id": user.user_id,
"email": user.email
}
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Email o contraseña incorrectos", headers={"WWW-Authenticate": "Bearer"})
access_token = auth_service.create_access_token(user_id=user.user_id, email=user.email)
return {"access_token": access_token, "token_type": "bearer", "user_id": user.user_id, "email": user.email, "is_admin": user.is_admin}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error en login_user: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
@router.post("/", status_code=status.HTTP_202_ACCEPTED)
async def create_user(user_data: UserCreateRequest):
@@ -88,170 +74,105 @@ async def create_user(user_data: UserCreateRequest):
- biografia: Biografía del usuario (opcional)
"""
try:
user_repo = UserRepositorySQL() # sesión nueva por request
create_use_case = CreateUser(user_repo)
result = create_use_case.execute(
nombre=user_data.nombre,
apellido=user_data.apellido,
email=user_data.email,
contraseña=user_data.contraseña,
fecha_nacimiento=user_data.fecha_nacimiento,
url_foto_perfil=user_data.url_foto_perfil,
biografia=user_data.biografia
nombre=user_data.nombre, apellido=user_data.apellido, email=user_data.email,
contraseña=user_data.contraseña, fecha_nacimiento=user_data.fecha_nacimiento,
url_foto_perfil=user_data.url_foto_perfil, biografia=user_data.biografia
)
if result["status"] == "error":
# Detectar tipo de error para código HTTP apropiado
message = result["message"]
if "ya está registrado" in message:
# 409 Conflict: email duplicado
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=message
)
else:
# 400 Bad Request: error de validación
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=message
)
# 202 Accepted: enviado a la cola correctamente
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=message)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
_publish_user_event(UserEventType.CREATE, user_id=result.get("user_id"), email=user_data.email, nombre=user_data.nombre, apellido=user_data.apellido)
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Error inesperado en create_user: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
"""Obtiene un usuario por ID"""
try:
user_repo = UserRepositorySQL() # sesión nueva por request
get_use_case = GetUserById(user_repo)
user = get_use_case.execute(user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Usuario con ID {user_id} no encontrado"
)
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Usuario con ID {user_id} no encontrado")
return user
except HTTPException:
raise
except Exception as e:
logger.error(f"Error al obtener usuario {user_id}: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
@router.get("/email/{email}", response_model=UserResponse)
async def get_user_by_email(email: str):
"""Obtiene un usuario por email"""
try:
user_repo = UserRepositorySQL() # sesión nueva por request
get_use_case = GetUserByEmail(user_repo)
user = get_use_case.execute(email)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Usuario con email {email} no encontrado"
)
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Usuario con email {email} no encontrado")
return user
except HTTPException:
raise
except Exception as e:
logger.error(f"Error al obtener usuario por email {email}: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
@router.get("/")
async def list_users():
"""Obtiene todos los usuarios - retorna lista vacía si no hay registros"""
try:
user_repo = UserRepositorySQL() # sesión nueva por request
list_use_case = ListAllUsers(user_repo)
return list_use_case.execute()
except Exception as e:
logger.error(f"Error al listar usuarios: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
@router.put("/{user_id}", status_code=status.HTTP_202_ACCEPTED)
async def update_user(user_id: int, user_data: UserUpdateRequest):
"""Actualiza un usuario - envía a cola de procesamiento con validaciones previas"""
try:
user_repo = UserRepositorySQL() # sesión nueva por request
update_use_case = UpdateUser(user_repo)
result = update_use_case.execute(
user_id=user_id,
nombre=user_data.nombre,
apellido=user_data.apellido,
url_foto_perfil=user_data.url_foto_perfil,
biografia=user_data.biografia
)
result = update_use_case.execute(user_id=user_id, nombre=user_data.nombre, apellido=user_data.apellido, url_foto_perfil=user_data.url_foto_perfil, biografia=user_data.biografia)
if result["status"] == "error":
message = result["message"]
if "no existe" in message:
# 404 Not Found: usuario no existe
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=message
)
else:
# 400 Bad Request: error de validación
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=message
)
# 202 Accepted: enviado a la cola correctamente
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
_publish_user_event(UserEventType.UPDATE, user_id=user_id, nombre=user_data.nombre, apellido=user_data.apellido)
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Error al actualizar usuario {user_id}: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")
@router.delete("/{user_id}", status_code=status.HTTP_202_ACCEPTED)
async def delete_user(user_id: int):
"""Elimina un usuario - envía a cola de procesamiento con validaciones previas"""
try:
user_repo = UserRepositorySQL() # sesión nueva por request
delete_use_case = DeleteUser(user_repo)
result = delete_use_case.execute(user_id)
if result["status"] == "error":
message = result["message"]
if "no existe" in message:
# 404 Not Found: usuario no existe
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=message
)
else:
# 400 Bad Request: error de validación
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=message
)
# 202 Accepted: enviado a la cola correctamente
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
_publish_user_event(UserEventType.DELETE, user_id=user_id)
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Error al eliminar usuario {user_id}: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error interno del servidor"
)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor")

View File

@@ -1,13 +1,17 @@
"""
Punto de entrada principal para VoxPopuli Microservices
Ejecuta dos APIs en paralelo: Usuarios (puerto 8000) y Reportes (puerto 8001), Notificaciones (puerto 8002)
Ejecuta cinco APIs en paralelo: Usuarios (8000), Reportes (8001), Notificaciones (8002), Moderación (8003), Métricas (8004)
"""
from infrastructure.api.users.app import create_app as create_users_app
from infrastructure.api.reports.app import create_app as create_reports_app
from infrastructure.api.notifications.app import create_app as create_notifications_app
from infrastructure.api.moderations.app import create_app as create_moderations_app
from infrastructure.api.metrics.app import create_app as create_metrics_app
from consumers.report_consumer import ReportConsumer
from consumers.user_consumer import UserConsumer
from consumers.notification_consumer import NotificationConsumer
from consumers.moderation_consumer import ModerationConsumer
from consumers.metrics_consumer import MetricsConsumer
from core.config import ConfSettings
import threading
import uvicorn
@@ -45,6 +49,28 @@ def run_notifications_api():
log_level=ConfSettings.log_level,
)
def run_moderations_api():
"""Ejecuta la API de Moderación en puerto 8003"""
app_moderations = create_moderations_app()
uvicorn.run(
app_moderations,
host=ConfSettings.host,
port=8003,
reload=False,
log_level=ConfSettings.log_level,
)
def run_metrics_api():
"""Ejecuta la API de Métricas en puerto 8004"""
app_metrics = create_metrics_app()
uvicorn.run(
app_metrics,
host=ConfSettings.host,
port=8004,
reload=False,
log_level=ConfSettings.log_level,
)
def run_user_consumer():
consumer = UserConsumer()
consumer.start()
@@ -57,6 +83,14 @@ def run_notifications_consumer():
consumer = NotificationConsumer()
consumer.start()
def run_moderations_consumer():
consumer = ModerationConsumer()
consumer.start()
def run_metrics_consumer():
consumer = MetricsConsumer()
consumer.start()
def run():
"""Inicia todas las APIs en threads separados"""
@@ -67,31 +101,44 @@ def run():
users_thread = threading.Thread(target=run_users_api, daemon=True, name="Users-API")
reports_thread = threading.Thread(target=run_reports_api, daemon=True, name="Reports-API")
notifications_thread = threading.Thread(target=run_notifications_api, daemon=True, name="Notifications-API")
moderations_thread = threading.Thread(target=run_moderations_api, daemon=True, name="Moderations-API")
metrics_thread = threading.Thread(target=run_metrics_api, daemon=True, name="Metrics-API")
user_consumer_thread = threading.Thread(target=run_user_consumer, daemon=True, name="Users-Consumer")
report_consumer_thread = threading.Thread(target=run_reports_consumer, daemon=True, name="Reports-Consumer")
notifications_consumer_thread = threading.Thread(target=run_notifications_consumer, daemon=True, name="Notifications-Consumer")
moderations_consumer_thread = threading.Thread(target=run_moderations_consumer, daemon=True, name="Moderations-Consumer")
metrics_consumer_thread = threading.Thread(target=run_metrics_consumer, daemon=True, name="Metrics-Consumer")
users_thread.start()
reports_thread.start()
notifications_thread.start()
moderations_thread.start()
metrics_thread.start()
user_consumer_thread.start()
report_consumer_thread.start()
notifications_consumer_thread.start()
moderations_consumer_thread.start()
metrics_consumer_thread.start()
print("\n✓ API de Usuarios ejecutándose en http://0.0.0.0:8000")
print("✓ API de Reportes ejecutándose en http://0.0.0.0:8001")
print("✓ API de Notificaciones ejecutándose en http://0.0.0.0:8002")
print("✓ API de Moderación ejecutándose en http://0.0.0.0:8003")
print("✓ API de Métricas ejecutándose en http://0.0.0.0:8004")
print("\nDocumentación disponible en:")
print(" - Usuarios: http://localhost:8000/docs")
print(" - Reportes: http://localhost:8001/docs")
print(" - Notificaciones: http://localhost:8002/docs")
print(" - Moderación: http://localhost:8003/docs")
print(" - Métricas: http://localhost:8004/docs")
print("\n" + "=" * 60 + "\n")
try:
users_thread.join()
reports_thread.join()
notifications_thread.join()
moderations_thread.join()
metrics_thread.join()
except KeyboardInterrupt:
print("\n\nRecibiendo señal de salida...")
print("Cerrando APIs...")