Some updates for backend thingies

This commit is contained in:
2026-03-20 15:59:07 -06:00
parent 84e9d3001a
commit 76156955b9
16 changed files with 1027 additions and 100 deletions

202
RABBITMQ_SETUP.md Normal file
View File

@@ -0,0 +1,202 @@
# RabbitMQ Integration Guide
## Overview
This project now uses RabbitMQ for asynchronous message queue processing. The architecture follows the **producer-consumer pattern**:
- **Senders (Producers)**: API endpoints send messages to RabbitMQ queues
- **Receivers (Consumers)**: Separate consumer processes listen to queues and save to databases
## Architecture
### Message Flow
```
API Endpoint → Service → RabbitMQ Queue → Consumer → Database
```
### Queues
- **users_queue**: Receives user events (create, update, delete)
- **reports_queue**: Receives report events (create, update_visibility, delete)
## Setup and Configuration
### 1. Install Dependencies
```bash
pip install -r requirements.txt
```
This includes the `pika` package for RabbitMQ communication.
### 2. Start RabbitMQ
Ensure RabbitMQ is running on your system:
```bash
# Using Docker
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
# Or using local installation
rabbitmq-server
```
## Running the Application
### Start the API Server
```bash
cd src
python main.py
```
The API will be available at `http://localhost:8000`
### Start Consumers
In separate terminal windows, run the consumers:
#### User Consumer
```bash
cd src
python -m consumers.user_consumer
```
#### Report Consumer
```bash
cd src
python -m consumers.report_consumer
```
## Usage Examples
### Creating a User
```bash
curl -X POST http://localhost:8000/users/ \
-H "Content-Type: application/json" \
-d '{
"nombre": "John",
"apellido": "Doe",
"email": "john@example.com",
"fecha_nacimiento": "1990-01-01T00:00:00",
"url_foto_perfil": "http://example.com/photo.jpg",
"biografia": "A test user"
}'
```
**Response** (Immediate):
```json
{
"status": "queued",
"message": "Usuario enviado a cola para procesamiento",
"email": "john@example.com"
}
```
The user will be saved to the database by the User Consumer.
### Creating a Report
```bash
curl -X POST http://localhost:8000/reports/ \
-H "Content-Type: application/json" \
-d '{
"id_usuario": 1,
"tipo_reporte": 1,
"descripcion": "Issue description",
"ubicacion": "Location info"
}'
```
**Response** (Immediate):
```json
{
"status": "queued",
"message": "Reporte enviado a cola para procesamiento",
"id_reporte": "uuid-string"
}
```
The report will be saved to the database by the Report Consumer.
## Message Formats
### User Event Messages
```python
{
"event_type": "user.create|user.update|user.delete",
"user_id": Optional[int],
"nombre": Optional[str],
"apellido": Optional[str],
"email": Optional[str],
"fecha_nacimiento": Optional[str], # ISO format
"fecha_creacion": Optional[str], # ISO format
"calificacion": Optional[float],
"numero_reportes": Optional[int],
"url_foto_perfil": Optional[str],
"biografia": Optional[str]
}
```
### Report Event Messages
```python
{
"event_type": "report.create|report.update_visibility|report.delete",
"id_reporte": Optional[str],
"id_usuario": Optional[int],
"tipo_reporte": Optional[int],
"descripcion": Optional[str],
"ubicacion": Optional[str],
"visibilidad": Optional[float],
"fecha_creacion": Optional[str], # ISO format
"penalize_author": Optional[bool]
}
```
## Consumer Implementation Details
### User Consumer (`src/consumers/user_consumer.py`)
Processes three types of user events:
1. **CREATE**: Saves a new user to the database
2. **UPDATE**: Updates existing user fields
3. **DELETE**: Removes a user from the database
### Report Consumer (`src/consumers/report_consumer.py`)
Processes three types of report events:
1. **CREATE**: Saves a new report to MongoDB and increments user's report counter
2. **UPDATE_VISIBILITY**: Updates report visibility and optionally penalizes the author
3. **DELETE**: Removes a report from the database
## Benefits of This Architecture
1. **Asynchronous Processing**: API responds immediately without waiting for database operations
2. **Scalability**: Consumers can be scaled independently
3. **Reliability**: Messages are persistent and won't be lost
4. **Decoupling**: Services are decoupled from database operations
5. **Message Ordering**: FIFO guarantee ensures operations are processed in order
## Error Handling
- Messages are acknowledged only after successful processing
- Failed messages are automatically requeued for retry
- All operations are logged for debugging and monitoring
## Database Compatibility
- **Users**: MySQL (via SQLAlchemy)
- **Reports**: MongoDB
## Future Enhancements
- Add retry policies with exponential backoff
- Implement dead-letter queues for failed messages
- Add message monitoring and analytics
- Implement distributed transaction handling

39
docker-compose.yaml Normal file
View File

@@ -0,0 +1,39 @@
version: '3.8'
services:
mysql:
image: mysql:8.0
container_name: voxpopuli_mysql
environment:
MYSQL_ROOT_PASSWORD: rootpassword
MYSQL_DATABASE: voxpopuli_users
MYSQL_USER: voxpopuli
MYSQL_PASSWORD: voxpopuli_pass
ports:
- "3306:3306"
volumes:
- mysql_data:/var/lib/mysql
healthcheck:
test: ["CMD", "mysqladmin", "ping", "-h", "localhost"]
interval: 10s
timeout: 5s
retries: 5
mongodb:
image: mongo:7.0
container_name: voxpopuli_mongo
environment:
MONGO_INITDB_DATABASE: voxpopuli_reports
ports:
- "27017:27017"
volumes:
- mongo_data:/data/db
healthcheck:
test: ["CMD", "mongosh", "--eval", "db.adminCommand('ping')"]
interval: 10s
timeout: 5s
retries: 5
volumes:
mysql_data:
mongo_data:

0
enable-venv.sh Normal file → Executable file
View File

View File

@@ -6,3 +6,4 @@ pydantic
pydantic-settings
pymongo
python-dotenv
pika

View File

@@ -1,12 +1,14 @@
from domain.reports import Report
from application.ports.report_repository import ReportRepository
from application.ports.user_repository import UserRepository
from infrastructure.adapters.rabbitmq.sender import send_to_queue
from infrastructure.adapters.rabbitmq.messages import ReportMessage, ReportEventType
from datetime import datetime
from typing import List, Optional
from typing import List, Optional, Dict, Any
from uuid import uuid4
class CreateReport:
"""Use case para crear un nuevo reporte"""
"""Use case para crear un nuevo reporte - envía mensaje a RabbitMQ"""
def __init__(self, repo: ReportRepository, user_repo: UserRepository):
if not isinstance(repo, ReportRepository):
raise TypeError("repo must implement ReportRepository")
@@ -16,26 +18,51 @@ class CreateReport:
self.user_repo = user_repo
def execute(self, id_usuario: int, tipo_reporte: int, descripcion: str,
ubicacion: Optional[str] = None) -> Report:
# Verificar que el usuario existe
ubicacion: Optional[str] = None) -> Dict[str, Any]:
"""
Sends a create report message to RabbitMQ.
The actual database save will be done by the consumer.
Returns:
Dictionary with status and message
"""
# Verify user exists (we still need to check this before queuing)
user = self.user_repo.find_by_id(id_usuario)
if not user:
raise ValueError(f"Usuario con ID {id_usuario} no existe")
return {
"status": "error",
"message": f"Usuario con ID {id_usuario} no existe"
}
report = Report(
id_reporte=str(uuid4()),
id_reporte = str(uuid4())
fecha_creacion = datetime.now()
# Create message object
message = ReportMessage(
event_type=ReportEventType.CREATE,
id_reporte=id_reporte,
id_usuario=id_usuario,
tipo_reporte=tipo_reporte,
descripcion=descripcion,
ubicacion=ubicacion,
visibilidad=50.0, # Visibilidad inicial neutral
fecha_creacion=datetime.now()
fecha_creacion=fecha_creacion.isoformat()
)
# Incrementar contador de reportes del usuario
self.user_repo.increment_reports(id_usuario)
# Send to RabbitMQ
success = send_to_queue("reports_queue", message.to_dict())
return self.repo.save(report)
if success:
return {
"status": "queued",
"message": "Reporte enviado a cola para procesamiento",
"id_reporte": id_reporte
}
else:
return {
"status": "error",
"message": "Error al enviar reporte a la cola de procesamiento"
}
class GetReportById:
"""Use case para obtener un reporte por ID"""
@@ -68,31 +95,51 @@ class ListAllReports:
return self.repo.find_all()
class UpdateReportVisibility:
"""Use case para actualizar la visibilidad de un reporte basado en votación comunitaria"""
"""Use case para actualizar la visibilidad de un reporte basado en votación comunitaria - envía mensaje a RabbitMQ"""
def __init__(self, repo: ReportRepository, user_repo: UserRepository):
if not isinstance(repo, ReportRepository):
raise TypeError("repo must implement ReportRepository")
self.repo = repo
self.user_repo = user_repo
def execute(self, report_id: str, new_visibility: float, penalize_author: bool = False) -> None:
def execute(self, report_id: str, new_visibility: float, penalize_author: bool = False) -> Dict[str, Any]:
"""
Sends an update report visibility message to RabbitMQ.
The actual database update will be done by the consumer.
Returns:
Dictionary with status and message
"""
# Validar rango de visibilidad
if new_visibility < 0 or new_visibility > 100:
raise ValueError("La visibilidad debe estar entre 0 y 100")
return {
"status": "error",
"message": "La visibilidad debe estar entre 0 y 100"
}
report = self.repo.find_by_id(report_id)
if not report:
raise ValueError(f"Reporte con ID {report_id} no existe")
# Create message object
message = ReportMessage(
event_type=ReportEventType.UPDATE_VISIBILITY,
id_reporte=report_id,
visibilidad=new_visibility,
penalize_author=penalize_author
)
self.repo.update_visibility(report_id, new_visibility)
# Send to RabbitMQ
success = send_to_queue("reports_queue", message.to_dict())
# Si la visibilidad es muy baja (shadowban), penalizar al autor
if penalize_author and new_visibility < 20:
user = self.user_repo.find_by_id(report.id_usuario)
if user:
# Reducir calificación del usuario
new_rating = max(0, user.calificacion - 5)
self.user_repo.update_rating(report.id_usuario, new_rating)
if success:
return {
"status": "queued",
"message": "Actualización de visibilidad enviada a cola para procesamiento",
"report_id": report_id,
"new_visibility": new_visibility
}
else:
return {
"status": "error",
"message": "Error al enviar actualización de visibilidad a la cola de procesamiento"
}
class GetShadowbannedReports:
"""Use case para obtener reportes shadowbaneados (baja visibilidad)"""
@@ -105,11 +152,37 @@ class GetShadowbannedReports:
return self.repo.find_shadowbanned(visibility_threshold)
class DeleteReport:
"""Use case para eliminar un reporte"""
"""Use case para eliminar un reporte - envía mensaje a RabbitMQ"""
def __init__(self, repo: ReportRepository):
if not isinstance(repo, ReportRepository):
raise TypeError("repo must implement ReportRepository")
self.repo = repo
def execute(self, report_id: str) -> bool:
return self.repo.delete(report_id)
def execute(self, report_id: str) -> Dict[str, Any]:
"""
Sends a delete report message to RabbitMQ.
The actual database deletion will be done by the consumer.
Returns:
Dictionary with status and message
"""
# Create message object
message = ReportMessage(
event_type=ReportEventType.DELETE,
id_reporte=report_id
)
# Send to RabbitMQ
success = send_to_queue("reports_queue", message.to_dict())
if success:
return {
"status": "queued",
"message": f"Reporte {report_id} enviado a cola para eliminación",
"id_reporte": report_id
}
else:
return {
"status": "error",
"message": "Error al enviar eliminación del reporte a la cola de procesamiento"
}

View File

@@ -1,10 +1,12 @@
from domain.users import User
from application.ports.user_repository import UserRepository
from infrastructure.adapters.rabbitmq.sender import send_to_queue
from infrastructure.adapters.rabbitmq.messages import UserMessage, UserEventType
from datetime import datetime
from typing import List, Optional
from typing import List, Optional, Dict, Any
class CreateUser:
"""Use case para crear un nuevo usuario"""
"""Use case para crear un nuevo usuario - envía mensaje a RabbitMQ"""
def __init__(self, repo: UserRepository):
if not isinstance(repo, UserRepository):
raise TypeError("repo must implement UserRepository")
@@ -12,20 +14,44 @@ class CreateUser:
def execute(self, nombre: str, apellido: str, email: str,
fecha_nacimiento: datetime, url_foto_perfil: Optional[str] = None,
biografia: Optional[str] = None) -> User:
user = User(
user_id=0,
biografia: Optional[str] = None) -> Dict[str, Any]:
"""
Sends a create user message to RabbitMQ.
The actual database save will be done by the consumer.
Returns:
Dictionary with status and message
"""
fecha_creacion = datetime.now()
# Create message object
message = UserMessage(
event_type=UserEventType.CREATE,
nombre=nombre,
apellido=apellido,
email=email,
fecha_nacimiento=fecha_nacimiento,
fecha_creacion=datetime.now(),
calificacion=50.0, # Puntuación inicial
fecha_nacimiento=fecha_nacimiento.isoformat(),
fecha_creacion=fecha_creacion.isoformat(),
calificacion=50.0,
numero_reportes=0,
url_foto_perfil=url_foto_perfil,
biografia=biografia
)
return self.repo.save(user)
# Send to RabbitMQ
success = send_to_queue("users_queue", message.to_dict())
if success:
return {
"status": "queued",
"message": "Usuario enviado a cola para procesamiento",
"email": email
}
else:
return {
"status": "error",
"message": "Error al enviar usuario a la cola de procesamiento"
}
class GetUserById:
"""Use case para obtener un usuario por ID"""
@@ -58,33 +84,78 @@ class ListAllUsers:
return self.repo.find_all()
class UpdateUser:
"""Use case para actualizar un usuario"""
"""Use case para actualizar un usuario - envía mensaje a RabbitMQ"""
def __init__(self, repo: UserRepository):
if not isinstance(repo, UserRepository):
raise TypeError("repo must implement UserRepository")
self.repo = repo
def execute(self, user_id: int, nombre: str = None, apellido: str = None,
url_foto_perfil: str = None, biografia: str = None) -> Optional[User]:
user = self.repo.find_by_id(user_id)
if user:
if nombre:
user.nombre = nombre
if apellido:
user.apellido = apellido
if url_foto_perfil is not None:
user.url_foto_perfil = url_foto_perfil
if biografia is not None:
user.biografia = biografia
return self.repo.update(user)
return None
url_foto_perfil: str = None, biografia: str = None) -> Dict[str, Any]:
"""
Sends an update user message to RabbitMQ.
The actual database update will be done by the consumer.
Returns:
Dictionary with status and message
"""
# Create message object with only the fields to update
message = UserMessage(
event_type=UserEventType.UPDATE,
user_id=user_id,
nombre=nombre,
apellido=apellido,
url_foto_perfil=url_foto_perfil,
biografia=biografia
)
# Send to RabbitMQ
success = send_to_queue("users_queue", message.to_dict())
if success:
return {
"status": "queued",
"message": f"Usuario {user_id} enviado a cola para actualización",
"user_id": user_id
}
else:
return {
"status": "error",
"message": "Error al enviar actualización del usuario a la cola de procesamiento"
}
class DeleteUser:
"""Use case para eliminar un usuario"""
"""Use case para eliminar un usuario - envía mensaje a RabbitMQ"""
def __init__(self, repo: UserRepository):
if not isinstance(repo, UserRepository):
raise TypeError("repo must implement UserRepository")
self.repo = repo
def execute(self, user_id: int) -> bool:
return self.repo.delete(user_id)
def execute(self, user_id: int) -> Dict[str, Any]:
"""
Sends a delete user message to RabbitMQ.
The actual database deletion will be done by the consumer.
Returns:
Dictionary with status and message
"""
# Create message object
message = UserMessage(
event_type=UserEventType.DELETE,
user_id=user_id
)
# Send to RabbitMQ
success = send_to_queue("users_queue", message.to_dict())
if success:
return {
"status": "queued",
"message": f"Usuario {user_id} enviado a cola para eliminación",
"user_id": user_id
}
else:
return {
"status": "error",
"message": "Error al enviar eliminación del usuario a la cola de procesamiento"
}

View File

@@ -0,0 +1 @@
"""RabbitMQ Consumer implementations"""

View File

@@ -0,0 +1,149 @@
"""Report RabbitMQ Consumer - Processes report events and saves to database"""
import sys
import os
import logging
from datetime import datetime
# Add src to path to import modules
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from infrastructure.adapters.rabbitmq.consumer import RabbitMQConsumer
from infrastructure.adapters.rabbitmq.messages import ReportMessage, ReportEventType
from infrastructure.adapters.persistence.report_repository_mongo import ReportRepositoryMongo
from infrastructure.adapters.persistence.user_repository_sql import UserRepositorySQL
from domain.reports import Report
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class ReportConsumer:
"""Consumer for report events from RabbitMQ"""
def __init__(self):
self.repo = ReportRepositoryMongo()
self.user_repo = UserRepositorySQL()
self.consumer = RabbitMQConsumer(queue_name='reports_queue')
self.consumer.set_callback(self.process_message)
def process_message(self, message_dict: dict):
"""
Processes a report event message from RabbitMQ
Args:
message_dict: Dictionary containing the message data
"""
try:
# Reconstruct the ReportMessage object
message = ReportMessage.from_dict(message_dict)
if message.event_type == ReportEventType.CREATE:
self._handle_create_report(message)
elif message.event_type == ReportEventType.UPDATE_VISIBILITY:
self._handle_update_visibility(message)
elif message.event_type == ReportEventType.DELETE:
self._handle_delete_report(message)
else:
logger.warning(f"Unknown event type: {message.event_type}")
except Exception as e:
logger.error(f"Error processing report message: {e}", exc_info=True)
raise
def _handle_create_report(self, message: ReportMessage):
"""Handle report create event"""
try:
logger.info(f"Creating report: {message.id_reporte} from user {message.id_usuario}")
# Parse datetime string
fecha_creacion = datetime.fromisoformat(message.fecha_creacion)
# Create Report domain object
report = Report(
id_reporte=message.id_reporte,
id_usuario=message.id_usuario,
tipo_reporte=message.tipo_reporte,
descripcion=message.descripcion,
ubicacion=message.ubicacion,
visibilidad=message.visibilidad,
fecha_creacion=fecha_creacion
)
# Save to repository
saved_report = self.repo.save(report)
logger.info(f"Report created successfully: {message.id_reporte}")
# Increment user's report counter
self.user_repo.increment_reports(message.id_usuario)
logger.info(f"Incremented report counter for user: {message.id_usuario}")
except Exception as e:
logger.error(f"Error creating report: {e}", exc_info=True)
raise
def _handle_update_visibility(self, message: ReportMessage):
"""Handle report visibility update event"""
try:
logger.info(f"Updating visibility for report: {message.id_reporte}")
# Find the report
report = self.repo.find_by_id(message.id_reporte)
if not report:
logger.warning(f"Report not found: {message.id_reporte}")
return
# Update visibility
self.repo.update_visibility(message.id_reporte, message.visibilidad)
logger.info(f"Report visibility updated: {message.id_reporte} -> {message.visibilidad}")
# Penalize author if visibility is very low (shadowban)
if message.penalize_author and message.visibilidad < 20:
try:
user = self.user_repo.find_by_id(report.id_usuario)
if user:
# Reduce user's rating
new_rating = max(0, user.calificacion - 5)
self.user_repo.update_rating(report.id_usuario, new_rating)
logger.info(f"Author penalized: user {report.id_usuario} rating reduced to {new_rating}")
except Exception as e:
logger.error(f"Error penalizing author: {e}")
except Exception as e:
logger.error(f"Error updating report visibility: {e}", exc_info=True)
raise
def _handle_delete_report(self, message: ReportMessage):
"""Handle report delete event"""
try:
logger.info(f"Deleting report: {message.id_reporte}")
success = self.repo.delete(message.id_reporte)
if success:
logger.info(f"Report deleted successfully: {message.id_reporte}")
else:
logger.warning(f"Failed to delete report: {message.id_reporte}")
except Exception as e:
logger.error(f"Error deleting report: {e}", exc_info=True)
raise
def start(self):
"""Start consuming messages"""
logger.info("Starting Report Consumer...")
logger.info("[*] Waiting for report events. Ctrl+C to exit.")
try:
self.consumer.start_consuming()
except KeyboardInterrupt:
logger.info("Report Consumer stopped by user")
except Exception as e:
logger.error(f"Consumer error: {e}", exc_info=True)
raise
if __name__ == '__main__':
consumer = ReportConsumer()
consumer.start()

View File

@@ -0,0 +1,145 @@
"""User RabbitMQ Consumer - Processes user events and saves to database"""
import sys
import os
import logging
from datetime import datetime
# Add src to path to import modules
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from infrastructure.adapters.rabbitmq.consumer import RabbitMQConsumer
from infrastructure.adapters.rabbitmq.messages import UserMessage, UserEventType
from infrastructure.adapters.persistence.user_repository_sql import UserRepositorySQL
from domain.users import User
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class UserConsumer:
"""Consumer for user events from RabbitMQ"""
def __init__(self):
self.repo = UserRepositorySQL()
self.consumer = RabbitMQConsumer(queue_name='users_queue')
self.consumer.set_callback(self.process_message)
def process_message(self, message_dict: dict):
"""
Processes a user event message from RabbitMQ
Args:
message_dict: Dictionary containing the message data
"""
try:
# Reconstruct the UserMessage object
message = UserMessage.from_dict(message_dict)
if message.event_type == UserEventType.CREATE:
self._handle_create_user(message)
elif message.event_type == UserEventType.UPDATE:
self._handle_update_user(message)
elif message.event_type == UserEventType.DELETE:
self._handle_delete_user(message)
else:
logger.warning(f"Unknown event type: {message.event_type}")
except Exception as e:
logger.error(f"Error processing user message: {e}", exc_info=True)
raise
def _handle_create_user(self, message: UserMessage):
"""Handle user create event"""
try:
logger.info(f"Creating user: {message.email}")
# Parse datetime strings
fecha_nacimiento = datetime.fromisoformat(message.fecha_nacimiento)
fecha_creacion = datetime.fromisoformat(message.fecha_creacion)
# Create User domain object
user = User(
user_id=0, # Will be auto-generated by DB
nombre=message.nombre,
apellido=message.apellido,
email=message.email,
fecha_nacimiento=fecha_nacimiento,
fecha_creacion=fecha_creacion,
calificacion=message.calificacion,
numero_reportes=message.numero_reportes,
url_foto_perfil=message.url_foto_perfil,
biografia=message.biografia
)
# Save to repository
saved_user = self.repo.save(user)
logger.info(f"User created successfully: {saved_user.user_id} - {saved_user.email}")
except Exception as e:
logger.error(f"Error creating user: {e}", exc_info=True)
raise
def _handle_update_user(self, message: UserMessage):
"""Handle user update event"""
try:
logger.info(f"Updating user: {message.user_id}")
# Find the user
user = self.repo.find_by_id(message.user_id)
if not user:
logger.warning(f"User not found: {message.user_id}")
return
# Update fields if provided
if message.nombre:
user.nombre = message.nombre
if message.apellido:
user.apellido = message.apellido
if message.url_foto_perfil is not None:
user.url_foto_perfil = message.url_foto_perfil
if message.biografia is not None:
user.biografia = message.biografia
# Save to repository
updated_user = self.repo.update(user)
logger.info(f"User updated successfully: {message.user_id}")
except Exception as e:
logger.error(f"Error updating user: {e}", exc_info=True)
raise
def _handle_delete_user(self, message: UserMessage):
"""Handle user delete event"""
try:
logger.info(f"Deleting user: {message.user_id}")
success = self.repo.delete(message.user_id)
if success:
logger.info(f"User deleted successfully: {message.user_id}")
else:
logger.warning(f"Failed to delete user: {message.user_id}")
except Exception as e:
logger.error(f"Error deleting user: {e}", exc_info=True)
raise
def start(self):
"""Start consuming messages"""
logger.info("Starting User Consumer...")
logger.info("[*] Waiting for user events. Ctrl+C to exit.")
try:
self.consumer.start_consuming()
except KeyboardInterrupt:
logger.info("User Consumer stopped by user")
except Exception as e:
logger.error(f"Consumer error: {e}", exc_info=True)
raise
if __name__ == '__main__':
consumer = UserConsumer()
consumer.start()

View File

@@ -1,3 +1,4 @@
import os
from pydantic_settings import BaseSettings
from pydantic import Field
@@ -6,13 +7,13 @@ class Settings(BaseSettings):
# Base de datos MySQL
mysql_url: str = Field(
default="mysql+pymysql://user:password@localhost/voxpopuli_users",
default=os.getenv("MYSQL_URL", "mysql+pymysql://voxpopuli:voxpopuli_pass@localhost:3306/voxpopuli_users"),
description="URL de conexión a MySQL para API de Usuarios"
)
# Base de datos MongoDB
mongodb_url: str = Field(
default="mongodb://localhost:27017",
default=os.getenv("MONGODB_URL", "mongodb://localhost:27017"),
description="URL de conexión a MongoDB para API de Reportes"
)
mongodb_db: str = Field(
@@ -20,6 +21,11 @@ class Settings(BaseSettings):
description="Base de datos MongoDB"
)
rabbitmq: str = Field (
default=os.getenv("RABBITMQ_URI", "localhost")
)
# API
api_title: str = "VoxPopuli Microservices"
api_version: str = "1.0.0"

View File

@@ -0,0 +1 @@
"""RabbitMQ adapters for message publishing and consuming"""

View File

@@ -0,0 +1,71 @@
"""RabbitMQ message consumer base"""
import pika
import json
from typing import Callable, Dict, Any
import logging
logger = logging.getLogger(__name__)
class RabbitMQConsumer:
"""Generic RabbitMQ consumer for consuming messages from queues"""
def __init__(self, queue_name: str, host: str = 'localhost', port: int = 5672):
self.queue_name = queue_name
self.host = host
self.port = port
self.callback = None
def set_callback(self, callback: Callable[[Dict[str, Any]], None]) -> None:
"""
Sets the callback function to be called when a message is received
Args:
callback: Function that takes a message dictionary as argument
"""
self.callback = callback
def start_consuming(self) -> None:
"""
Starts consuming messages from the queue
"""
try:
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self.host, port=self.port)
)
channel = connection.channel()
# Declare queue to ensure it exists
channel.queue_declare(queue=self.queue_name, durable=True)
def callback_wrapper(ch, method, properties, body):
try:
# Decode the message
message = json.loads(body.decode('utf-8'))
logger.info(f"Received message from queue '{self.queue_name}': {message}")
# Call the user's callback function
if self.callback:
self.callback(message)
# Acknowledge the message
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
logger.error(f"Error processing message: {e}")
# Negative acknowledge to requeue the message
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
# Set up the consumer with manual acknowledgment
channel.basic_consume(
queue=self.queue_name,
on_message_callback=callback_wrapper,
auto_ack=False
)
logger.info(f"[*] Waiting for messages in queue '{self.queue_name}'. Ctrl+C to exit.")
channel.start_consuming()
except Exception as e:
logger.error(f"Error in consumer: {e}")
raise

View File

@@ -0,0 +1,82 @@
"""Message schemas for RabbitMQ communication"""
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Optional
from enum import Enum
import json
class UserEventType(str, Enum):
"""Types of user events"""
CREATE = "user.create"
UPDATE = "user.update"
DELETE = "user.delete"
class ReportEventType(str, Enum):
"""Types of report events"""
CREATE = "report.create"
UPDATE_VISIBILITY = "report.update_visibility"
DELETE = "report.delete"
@dataclass
class UserMessage:
"""Message for user events"""
event_type: UserEventType
user_id: Optional[int] = None
nombre: Optional[str] = None
apellido: Optional[str] = None
email: Optional[str] = None
fecha_nacimiento: Optional[str] = None # ISO format datetime string
fecha_creacion: Optional[str] = None # ISO format datetime string
calificacion: Optional[float] = None
numero_reportes: Optional[int] = None
url_foto_perfil: Optional[str] = None
biografia: Optional[str] = None
def to_dict(self):
"""Convert to dictionary"""
data = asdict(self)
data['event_type'] = self.event_type.value
return data
def to_json(self) -> str:
"""Convert to JSON string"""
return json.dumps(self.to_dict())
@staticmethod
def from_dict(data: dict) -> 'UserMessage':
"""Create from dictionary"""
data['event_type'] = UserEventType(data['event_type'])
return UserMessage(**data)
@dataclass
class ReportMessage:
"""Message for report events"""
event_type: ReportEventType
id_reporte: Optional[str] = None
id_usuario: Optional[int] = None
tipo_reporte: Optional[int] = None
descripcion: Optional[str] = None
ubicacion: Optional[str] = None
visibilidad: Optional[float] = None
fecha_creacion: Optional[str] = None # ISO format datetime string
penalize_author: Optional[bool] = None # For update_visibility event
def to_dict(self):
"""Convert to dictionary"""
data = asdict(self)
data['event_type'] = self.event_type.value
return data
def to_json(self) -> str:
"""Convert to JSON string"""
return json.dumps(self.to_dict())
@staticmethod
def from_dict(data: dict) -> 'ReportMessage':
"""Create from dictionary"""
data['event_type'] = ReportEventType(data['event_type'])
return ReportMessage(**data)

View File

@@ -0,0 +1,74 @@
"""RabbitMQ message sender"""
import pika
import json
from typing import Any, Dict
import logging
logger = logging.getLogger(__name__)
class RabbitMQSender:
"""Generic RabbitMQ sender for publishing messages to queues"""
def __init__(self, host: str = 'localhost', port: int = 5672):
self.host = host
self.port = port
def send_message(self, queue_name: str, message: Dict[str, Any]) -> bool:
"""
Sends a message to a RabbitMQ queue
Args:
queue_name: Name of the queue to send to
message: Dictionary containing the message data
Returns:
True if successful, False otherwise
"""
try:
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self.host, port=self.port)
)
channel = connection.channel()
# Declare queue to ensure it exists
channel.queue_declare(queue=queue_name, durable=True)
# Convert message to JSON
message_json = json.dumps(message)
# Publish the message
channel.basic_publish(
exchange='',
routing_key=queue_name,
body=message_json,
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
)
)
connection.close()
logger.info(f"Message sent to queue '{queue_name}': {message_json}")
return True
except Exception as e:
logger.error(f"Error sending message to RabbitMQ: {e}")
return False
def send_to_queue(queue_name: str, message: Dict[str, Any],
host: str = 'localhost', port: int = 5672) -> bool:
"""
Convenience function to send a message to RabbitMQ
Args:
queue_name: Name of the queue
message: Message dictionary
host: RabbitMQ host
port: RabbitMQ port
Returns:
True if successful, False otherwise
"""
sender = RabbitMQSender(host=host, port=port)
return sender.send_message(queue_name, message)

View File

@@ -11,23 +11,25 @@ router = APIRouter()
report_repo = ReportRepositoryMongo()
user_repo = UserRepositorySQL()
@router.post("/", response_model=ReportResponse, status_code=status.HTTP_201_CREATED)
@router.post("/", status_code=status.HTTP_202_ACCEPTED)
async def create_report(report_data: ReportCreateRequest):
"""Crea un nuevo reporte"""
"""Crea un nuevo reporte - envía a cola de procesamiento"""
try:
create_use_case = CreateReport(report_repo, user_repo)
report = create_use_case.execute(
result = create_use_case.execute(
id_usuario=report_data.id_usuario,
tipo_reporte=report_data.tipo_reporte,
descripcion=report_data.descripcion,
ubicacion=report_data.ubicacion
)
return report
except ValueError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
if result["status"] == "error":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=result["message"]
)
return result
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
@@ -65,35 +67,38 @@ async def get_shadowbanned_reports(threshold: float = 20):
get_use_case = GetShadowbannedReports(report_repo)
return get_use_case.execute(threshold)
@router.put("/{report_id}/visibility", status_code=status.HTTP_200_OK)
@router.put("/{report_id}/visibility", status_code=status.HTTP_202_ACCEPTED)
async def update_report_visibility(report_id: str, visibility_data: ReportUpdateVisibilityRequest):
"""Actualiza la visibilidad de un reporte (basado en votación comunitaria)"""
"""Actualiza la visibilidad de un reporte - envía a cola de procesamiento"""
try:
update_use_case = UpdateReportVisibility(report_repo, user_repo)
update_use_case.execute(
result = update_use_case.execute(
report_id=report_id,
new_visibility=visibility_data.new_visibility,
penalize_author=visibility_data.penalize_author
)
return {
"message": "Visibilidad actualizada exitosamente",
"report_id": report_id,
"new_visibility": visibility_data.new_visibility
}
except ValueError as e:
if result["status"] == "error":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=result["message"]
)
return result
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
detail=f"Error al actualizar visibilidad: {str(e)}"
)
@router.delete("/{report_id}", status_code=status.HTTP_204_NO_CONTENT)
@router.delete("/{report_id}", status_code=status.HTTP_202_ACCEPTED)
async def delete_report(report_id: str):
"""Elimina un reporte"""
"""Elimina un reporte - envía a cola de procesamiento"""
delete_use_case = DeleteReport(report_repo)
success = delete_use_case.execute(report_id)
if not success:
result = delete_use_case.execute(report_id)
if result["status"] == "error":
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Reporte con ID {report_id} no encontrado"
status_code=status.HTTP_400_BAD_REQUEST,
detail=result["message"]
)
return None
return result

View File

@@ -8,12 +8,12 @@ from infrastructure.adapters.persistence.user_repository_sql import UserReposito
router = APIRouter()
user_repo = UserRepositorySQL()
@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
@router.post("/", status_code=status.HTTP_202_ACCEPTED)
async def create_user(user_data: UserCreateRequest):
"""Crea un nuevo usuario"""
"""Crea un nuevo usuario - envía a cola de procesamiento"""
try:
create_use_case = CreateUser(user_repo)
user = create_use_case.execute(
result = create_use_case.execute(
nombre=user_data.nombre,
apellido=user_data.apellido,
email=user_data.email,
@@ -21,7 +21,14 @@ async def create_user(user_data: UserCreateRequest):
url_foto_perfil=user_data.url_foto_perfil,
biografia=user_data.biografia
)
return user
if result["status"] == "error":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=result["message"]
)
return result
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
@@ -58,32 +65,32 @@ async def list_users():
list_use_case = ListAllUsers(user_repo)
return list_use_case.execute()
@router.put("/{user_id}", response_model=UserResponse)
@router.put("/{user_id}", status_code=status.HTTP_202_ACCEPTED)
async def update_user(user_id: int, user_data: UserUpdateRequest):
"""Actualiza un usuario"""
"""Actualiza un usuario - envía a cola de procesamiento"""
update_use_case = UpdateUser(user_repo)
user = update_use_case.execute(
result = update_use_case.execute(
user_id=user_id,
nombre=user_data.nombre,
apellido=user_data.apellido,
url_foto_perfil=user_data.url_foto_perfil,
biografia=user_data.biografia
)
if not user:
if result["status"] == "error":
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Usuario con ID {user_id} no encontrado"
status_code=status.HTTP_400_BAD_REQUEST,
detail=result["message"]
)
return user
return result
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
@router.delete("/{user_id}", status_code=status.HTTP_202_ACCEPTED)
async def delete_user(user_id: int):
"""Elimina un usuario"""
"""Elimina un usuario - envía a cola de procesamiento"""
delete_use_case = DeleteUser(user_repo)
success = delete_use_case.execute(user_id)
if not success:
result = delete_use_case.execute(user_id)
if result["status"] == "error":
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Usuario con ID {user_id} no encontrado"
status_code=status.HTTP_400_BAD_REQUEST,
detail=result["message"]
)
return None
return result