añadido los estados de reporte
This commit is contained in:
@@ -1,202 +1,202 @@
|
|||||||
# RabbitMQ Integration Guide
|
# RabbitMQ Integration Guide
|
||||||
|
|
||||||
## Overview
|
## Overview
|
||||||
|
|
||||||
This project now uses RabbitMQ for asynchronous message queue processing. The architecture follows the **producer-consumer pattern**:
|
This project now uses RabbitMQ for asynchronous message queue processing. The architecture follows the **producer-consumer pattern**:
|
||||||
|
|
||||||
- **Senders (Producers)**: API endpoints send messages to RabbitMQ queues
|
- **Senders (Producers)**: API endpoints send messages to RabbitMQ queues
|
||||||
- **Receivers (Consumers)**: Separate consumer processes listen to queues and save to databases
|
- **Receivers (Consumers)**: Separate consumer processes listen to queues and save to databases
|
||||||
|
|
||||||
## Architecture
|
## Architecture
|
||||||
|
|
||||||
### Message Flow
|
### Message Flow
|
||||||
|
|
||||||
```
|
```
|
||||||
API Endpoint → Service → RabbitMQ Queue → Consumer → Database
|
API Endpoint → Service → RabbitMQ Queue → Consumer → Database
|
||||||
```
|
```
|
||||||
|
|
||||||
### Queues
|
### Queues
|
||||||
|
|
||||||
- **users_queue**: Receives user events (create, update, delete)
|
- **users_queue**: Receives user events (create, update, delete)
|
||||||
- **reports_queue**: Receives report events (create, update_visibility, delete)
|
- **reports_queue**: Receives report events (create, update_visibility, delete)
|
||||||
|
|
||||||
## Setup and Configuration
|
## Setup and Configuration
|
||||||
|
|
||||||
### 1. Install Dependencies
|
### 1. Install Dependencies
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
pip install -r requirements.txt
|
pip install -r requirements.txt
|
||||||
```
|
```
|
||||||
|
|
||||||
This includes the `pika` package for RabbitMQ communication.
|
This includes the `pika` package for RabbitMQ communication.
|
||||||
|
|
||||||
### 2. Start RabbitMQ
|
### 2. Start RabbitMQ
|
||||||
|
|
||||||
Ensure RabbitMQ is running on your system:
|
Ensure RabbitMQ is running on your system:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
# Using Docker
|
# Using Docker
|
||||||
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
|
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
|
||||||
|
|
||||||
# Or using local installation
|
# Or using local installation
|
||||||
rabbitmq-server
|
rabbitmq-server
|
||||||
```
|
```
|
||||||
|
|
||||||
## Running the Application
|
## Running the Application
|
||||||
|
|
||||||
### Start the API Server
|
### Start the API Server
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
cd src
|
cd src
|
||||||
python main.py
|
python main.py
|
||||||
```
|
```
|
||||||
|
|
||||||
The API will be available at `http://localhost:8000`
|
The API will be available at `http://localhost:8000`
|
||||||
|
|
||||||
### Start Consumers
|
### Start Consumers
|
||||||
|
|
||||||
In separate terminal windows, run the consumers:
|
In separate terminal windows, run the consumers:
|
||||||
|
|
||||||
#### User Consumer
|
#### User Consumer
|
||||||
```bash
|
```bash
|
||||||
cd src
|
cd src
|
||||||
python -m consumers.user_consumer
|
python -m consumers.user_consumer
|
||||||
```
|
```
|
||||||
|
|
||||||
#### Report Consumer
|
#### Report Consumer
|
||||||
```bash
|
```bash
|
||||||
cd src
|
cd src
|
||||||
python -m consumers.report_consumer
|
python -m consumers.report_consumer
|
||||||
```
|
```
|
||||||
|
|
||||||
## Usage Examples
|
## Usage Examples
|
||||||
|
|
||||||
### Creating a User
|
### Creating a User
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
curl -X POST http://localhost:8000/users/ \
|
curl -X POST http://localhost:8000/users/ \
|
||||||
-H "Content-Type: application/json" \
|
-H "Content-Type: application/json" \
|
||||||
-d '{
|
-d '{
|
||||||
"nombre": "John",
|
"nombre": "John",
|
||||||
"apellido": "Doe",
|
"apellido": "Doe",
|
||||||
"email": "john@example.com",
|
"email": "john@example.com",
|
||||||
"fecha_nacimiento": "1990-01-01T00:00:00",
|
"fecha_nacimiento": "1990-01-01T00:00:00",
|
||||||
"url_foto_perfil": "http://example.com/photo.jpg",
|
"url_foto_perfil": "http://example.com/photo.jpg",
|
||||||
"biografia": "A test user"
|
"biografia": "A test user"
|
||||||
}'
|
}'
|
||||||
```
|
```
|
||||||
|
|
||||||
**Response** (Immediate):
|
**Response** (Immediate):
|
||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
"status": "queued",
|
"status": "queued",
|
||||||
"message": "Usuario enviado a cola para procesamiento",
|
"message": "Usuario enviado a cola para procesamiento",
|
||||||
"email": "john@example.com"
|
"email": "john@example.com"
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
The user will be saved to the database by the User Consumer.
|
The user will be saved to the database by the User Consumer.
|
||||||
|
|
||||||
### Creating a Report
|
### Creating a Report
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
curl -X POST http://localhost:8000/reports/ \
|
curl -X POST http://localhost:8000/reports/ \
|
||||||
-H "Content-Type: application/json" \
|
-H "Content-Type: application/json" \
|
||||||
-d '{
|
-d '{
|
||||||
"id_usuario": 1,
|
"id_usuario": 1,
|
||||||
"tipo_reporte": 1,
|
"tipo_reporte": 1,
|
||||||
"descripcion": "Issue description",
|
"descripcion": "Issue description",
|
||||||
"ubicacion": "Location info"
|
"ubicacion": "Location info"
|
||||||
}'
|
}'
|
||||||
```
|
```
|
||||||
|
|
||||||
**Response** (Immediate):
|
**Response** (Immediate):
|
||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
"status": "queued",
|
"status": "queued",
|
||||||
"message": "Reporte enviado a cola para procesamiento",
|
"message": "Reporte enviado a cola para procesamiento",
|
||||||
"id_reporte": "uuid-string"
|
"id_reporte": "uuid-string"
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
The report will be saved to the database by the Report Consumer.
|
The report will be saved to the database by the Report Consumer.
|
||||||
|
|
||||||
## Message Formats
|
## Message Formats
|
||||||
|
|
||||||
### User Event Messages
|
### User Event Messages
|
||||||
|
|
||||||
```python
|
```python
|
||||||
{
|
{
|
||||||
"event_type": "user.create|user.update|user.delete",
|
"event_type": "user.create|user.update|user.delete",
|
||||||
"user_id": Optional[int],
|
"user_id": Optional[int],
|
||||||
"nombre": Optional[str],
|
"nombre": Optional[str],
|
||||||
"apellido": Optional[str],
|
"apellido": Optional[str],
|
||||||
"email": Optional[str],
|
"email": Optional[str],
|
||||||
"fecha_nacimiento": Optional[str], # ISO format
|
"fecha_nacimiento": Optional[str], # ISO format
|
||||||
"fecha_creacion": Optional[str], # ISO format
|
"fecha_creacion": Optional[str], # ISO format
|
||||||
"calificacion": Optional[float],
|
"calificacion": Optional[float],
|
||||||
"numero_reportes": Optional[int],
|
"numero_reportes": Optional[int],
|
||||||
"url_foto_perfil": Optional[str],
|
"url_foto_perfil": Optional[str],
|
||||||
"biografia": Optional[str]
|
"biografia": Optional[str]
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
### Report Event Messages
|
### Report Event Messages
|
||||||
|
|
||||||
```python
|
```python
|
||||||
{
|
{
|
||||||
"event_type": "report.create|report.update_visibility|report.delete",
|
"event_type": "report.create|report.update_visibility|report.delete",
|
||||||
"id_reporte": Optional[str],
|
"id_reporte": Optional[str],
|
||||||
"id_usuario": Optional[int],
|
"id_usuario": Optional[int],
|
||||||
"tipo_reporte": Optional[int],
|
"tipo_reporte": Optional[int],
|
||||||
"descripcion": Optional[str],
|
"descripcion": Optional[str],
|
||||||
"ubicacion": Optional[str],
|
"ubicacion": Optional[str],
|
||||||
"visibilidad": Optional[float],
|
"visibilidad": Optional[float],
|
||||||
"fecha_creacion": Optional[str], # ISO format
|
"fecha_creacion": Optional[str], # ISO format
|
||||||
"penalize_author": Optional[bool]
|
"penalize_author": Optional[bool]
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
## Consumer Implementation Details
|
## Consumer Implementation Details
|
||||||
|
|
||||||
### User Consumer (`src/consumers/user_consumer.py`)
|
### User Consumer (`src/consumers/user_consumer.py`)
|
||||||
|
|
||||||
Processes three types of user events:
|
Processes three types of user events:
|
||||||
|
|
||||||
1. **CREATE**: Saves a new user to the database
|
1. **CREATE**: Saves a new user to the database
|
||||||
2. **UPDATE**: Updates existing user fields
|
2. **UPDATE**: Updates existing user fields
|
||||||
3. **DELETE**: Removes a user from the database
|
3. **DELETE**: Removes a user from the database
|
||||||
|
|
||||||
### Report Consumer (`src/consumers/report_consumer.py`)
|
### Report Consumer (`src/consumers/report_consumer.py`)
|
||||||
|
|
||||||
Processes three types of report events:
|
Processes three types of report events:
|
||||||
|
|
||||||
1. **CREATE**: Saves a new report to MongoDB and increments user's report counter
|
1. **CREATE**: Saves a new report to MongoDB and increments user's report counter
|
||||||
2. **UPDATE_VISIBILITY**: Updates report visibility and optionally penalizes the author
|
2. **UPDATE_VISIBILITY**: Updates report visibility and optionally penalizes the author
|
||||||
3. **DELETE**: Removes a report from the database
|
3. **DELETE**: Removes a report from the database
|
||||||
|
|
||||||
## Benefits of This Architecture
|
## Benefits of This Architecture
|
||||||
|
|
||||||
1. **Asynchronous Processing**: API responds immediately without waiting for database operations
|
1. **Asynchronous Processing**: API responds immediately without waiting for database operations
|
||||||
2. **Scalability**: Consumers can be scaled independently
|
2. **Scalability**: Consumers can be scaled independently
|
||||||
3. **Reliability**: Messages are persistent and won't be lost
|
3. **Reliability**: Messages are persistent and won't be lost
|
||||||
4. **Decoupling**: Services are decoupled from database operations
|
4. **Decoupling**: Services are decoupled from database operations
|
||||||
5. **Message Ordering**: FIFO guarantee ensures operations are processed in order
|
5. **Message Ordering**: FIFO guarantee ensures operations are processed in order
|
||||||
|
|
||||||
## Error Handling
|
## Error Handling
|
||||||
|
|
||||||
- Messages are acknowledged only after successful processing
|
- Messages are acknowledged only after successful processing
|
||||||
- Failed messages are automatically requeued for retry
|
- Failed messages are automatically requeued for retry
|
||||||
- All operations are logged for debugging and monitoring
|
- All operations are logged for debugging and monitoring
|
||||||
|
|
||||||
## Database Compatibility
|
## Database Compatibility
|
||||||
|
|
||||||
- **Users**: MySQL (via SQLAlchemy)
|
- **Users**: MySQL (via SQLAlchemy)
|
||||||
- **Reports**: MongoDB
|
- **Reports**: MongoDB
|
||||||
|
|
||||||
## Future Enhancements
|
## Future Enhancements
|
||||||
|
|
||||||
- Add retry policies with exponential backoff
|
- Add retry policies with exponential backoff
|
||||||
- Implement dead-letter queues for failed messages
|
- Implement dead-letter queues for failed messages
|
||||||
- Add message monitoring and analytics
|
- Add message monitoring and analytics
|
||||||
- Implement distributed transaction handling
|
- Implement distributed transaction handling
|
||||||
|
|||||||
@@ -1,39 +1,39 @@
|
|||||||
version: '3.8'
|
version: '3.8'
|
||||||
|
|
||||||
services:
|
services:
|
||||||
mysql:
|
mysql:
|
||||||
image: mysql:8.0
|
image: mysql:8.0
|
||||||
container_name: voxpopuli_mysql
|
container_name: voxpopuli_mysql
|
||||||
environment:
|
environment:
|
||||||
MYSQL_ROOT_PASSWORD: rootpassword
|
MYSQL_ROOT_PASSWORD: rootpassword
|
||||||
MYSQL_DATABASE: voxpopuli_users
|
MYSQL_DATABASE: voxpopuli_users
|
||||||
MYSQL_USER: voxpopuli
|
MYSQL_USER: voxpopuli
|
||||||
MYSQL_PASSWORD: voxpopuli_pass
|
MYSQL_PASSWORD: voxpopuli_pass
|
||||||
ports:
|
ports:
|
||||||
- "3306:3306"
|
- "3306:3306"
|
||||||
volumes:
|
volumes:
|
||||||
- mysql_data:/var/lib/mysql
|
- mysql_data:/var/lib/mysql
|
||||||
healthcheck:
|
healthcheck:
|
||||||
test: ["CMD", "mysqladmin", "ping", "-h", "localhost"]
|
test: ["CMD", "mysqladmin", "ping", "-h", "localhost"]
|
||||||
interval: 10s
|
interval: 10s
|
||||||
timeout: 5s
|
timeout: 5s
|
||||||
retries: 5
|
retries: 5
|
||||||
|
|
||||||
mongodb:
|
mongodb:
|
||||||
image: mongo:7.0
|
image: mongo:7.0
|
||||||
container_name: voxpopuli_mongo
|
container_name: voxpopuli_mongo
|
||||||
environment:
|
environment:
|
||||||
MONGO_INITDB_DATABASE: voxpopuli_reports
|
MONGO_INITDB_DATABASE: voxpopuli_reports
|
||||||
ports:
|
ports:
|
||||||
- "27017:27017"
|
- "27017:27017"
|
||||||
volumes:
|
volumes:
|
||||||
- mongo_data:/data/db
|
- mongo_data:/data/db
|
||||||
healthcheck:
|
healthcheck:
|
||||||
test: ["CMD", "mongosh", "--eval", "db.adminCommand('ping')"]
|
test: ["CMD", "mongosh", "--eval", "db.adminCommand('ping')"]
|
||||||
interval: 10s
|
interval: 10s
|
||||||
timeout: 5s
|
timeout: 5s
|
||||||
retries: 5
|
retries: 5
|
||||||
|
|
||||||
volumes:
|
volumes:
|
||||||
mysql_data:
|
mysql_data:
|
||||||
mongo_data:
|
mongo_data:
|
||||||
104
enable-venv.sh
104
enable-venv.sh
@@ -1,52 +1,52 @@
|
|||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
|
|
||||||
# Script para habilitar el entorno virtual de VoxPopuli Microservices
|
# Script para habilitar el entorno virtual de VoxPopuli Microservices
|
||||||
|
|
||||||
# Colores para output
|
# Colores para output
|
||||||
GREEN='\033[0;32m'
|
GREEN='\033[0;32m'
|
||||||
YELLOW='\033[1;33m'
|
YELLOW='\033[1;33m'
|
||||||
NC='\033[0m' # No Color
|
NC='\033[0m' # No Color
|
||||||
|
|
||||||
echo -e "${YELLOW}================================${NC}"
|
echo -e "${YELLOW}================================${NC}"
|
||||||
echo -e "${YELLOW}VoxPopuli Microservices Setup${NC}"
|
echo -e "${YELLOW}VoxPopuli Microservices Setup${NC}"
|
||||||
echo -e "${YELLOW}================================${NC}"
|
echo -e "${YELLOW}================================${NC}"
|
||||||
|
|
||||||
# Crear entorno virtual si no existe
|
# Crear entorno virtual si no existe
|
||||||
if [ ! -d "venv" ]; then
|
if [ ! -d "venv" ]; then
|
||||||
echo -e "${YELLOW}Creando entorno virtual...${NC}"
|
echo -e "${YELLOW}Creando entorno virtual...${NC}"
|
||||||
python3 -m venv venv
|
python3 -m venv venv
|
||||||
echo -e "${GREEN}✓ Entorno virtual creado${NC}"
|
echo -e "${GREEN}✓ Entorno virtual creado${NC}"
|
||||||
else
|
else
|
||||||
echo -e "${GREEN}✓ Entorno virtual ya existe${NC}"
|
echo -e "${GREEN}✓ Entorno virtual ya existe${NC}"
|
||||||
fi
|
fi
|
||||||
|
|
||||||
# Activar entorno virtual
|
# Activar entorno virtual
|
||||||
echo -e "${YELLOW}Activando entorno virtual...${NC}"
|
echo -e "${YELLOW}Activando entorno virtual...${NC}"
|
||||||
source venv/bin/activate
|
source venv/bin/activate
|
||||||
echo -e "${GREEN}✓ Entorno virtual activado${NC}"
|
echo -e "${GREEN}✓ Entorno virtual activado${NC}"
|
||||||
|
|
||||||
# Instalar dependencias
|
# Instalar dependencias
|
||||||
if [ -f "requirements.txt" ]; then
|
if [ -f "requirements.txt" ]; then
|
||||||
echo -e "${YELLOW}Instalando dependencias...${NC}"
|
echo -e "${YELLOW}Instalando dependencias...${NC}"
|
||||||
pip install -r requirements.txt
|
pip install -r requirements.txt
|
||||||
echo -e "${GREEN}✓ Dependencias instaladas${NC}"
|
echo -e "${GREEN}✓ Dependencias instaladas${NC}"
|
||||||
fi
|
fi
|
||||||
|
|
||||||
# Verificar archivo .env
|
# Verificar archivo .env
|
||||||
if [ ! -f ".env" ]; then
|
if [ ! -f ".env" ]; then
|
||||||
if [ -f ".env.example" ]; then
|
if [ -f ".env.example" ]; then
|
||||||
echo -e "${YELLOW}Creando archivo .env desde .env.example...${NC}"
|
echo -e "${YELLOW}Creando archivo .env desde .env.example...${NC}"
|
||||||
cp .env.example .env
|
cp .env.example .env
|
||||||
echo -e "${YELLOW}⚠ Por favor, actualiza .env con tus credenciales${NC}"
|
echo -e "${YELLOW}⚠ Por favor, actualiza .env con tus credenciales${NC}"
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
|
|
||||||
echo
|
echo
|
||||||
echo -e "${GREEN}================================${NC}"
|
echo -e "${GREEN}================================${NC}"
|
||||||
echo -e "${GREEN}✓ Setup completado${NC}"
|
echo -e "${GREEN}✓ Setup completado${NC}"
|
||||||
echo -e "${GREEN}================================${NC}"
|
echo -e "${GREEN}================================${NC}"
|
||||||
echo
|
echo
|
||||||
echo -e "${YELLOW}Para ejecutar los microservicios:${NC}"
|
echo -e "${YELLOW}Para ejecutar los microservicios:${NC}"
|
||||||
echo "cd src"
|
echo "cd src"
|
||||||
echo "python main.py"
|
echo "python main.py"
|
||||||
echo
|
echo
|
||||||
|
|||||||
@@ -35,6 +35,11 @@ class ReportRepository(ABC):
|
|||||||
"""Actualiza la visibilidad de un reporte"""
|
"""Actualiza la visibilidad de un reporte"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def update_estado(self, report_id: str, new_estado: str) -> None:
|
||||||
|
"""Actualiza el estado de un reporte"""
|
||||||
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def delete(self, report_id: str) -> bool:
|
def delete(self, report_id: str) -> bool:
|
||||||
"""Elimina un reporte"""
|
"""Elimina un reporte"""
|
||||||
|
|||||||
@@ -19,7 +19,8 @@ class CreateReport:
|
|||||||
|
|
||||||
def execute(self, id_usuario: int, tipo_reporte: int, descripcion: str,
|
def execute(self, id_usuario: int, tipo_reporte: int, descripcion: str,
|
||||||
ubicacion: Optional[str] = None, lat: Optional[float] = None,
|
ubicacion: Optional[str] = None, lat: Optional[float] = None,
|
||||||
lng: Optional[float] = None, image_filename: Optional[str] = None) -> Dict[str, Any]:
|
lng: Optional[float] = None, image_filename: Optional[str] = None,
|
||||||
|
estado: str = "en proceso") -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
Sends a create report message to RabbitMQ.
|
Sends a create report message to RabbitMQ.
|
||||||
Valida previamente:
|
Valida previamente:
|
||||||
@@ -44,6 +45,14 @@ class CreateReport:
|
|||||||
"message": "El tipo de reporte debe estar entre 1 y 5"
|
"message": "El tipo de reporte debe estar entre 1 y 5"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Validación: estado válido
|
||||||
|
estados_validos = ["en proceso", "no resuelto", "resuelto"]
|
||||||
|
if estado not in estados_validos:
|
||||||
|
return {
|
||||||
|
"status": "error",
|
||||||
|
"message": f"El estado del reporte debe ser uno de: {', '.join(estados_validos)}"
|
||||||
|
}
|
||||||
|
|
||||||
# Validación: usuario existe (CRÍTICO)
|
# Validación: usuario existe (CRÍTICO)
|
||||||
try:
|
try:
|
||||||
user = self.user_repo.find_by_id(id_usuario)
|
user = self.user_repo.find_by_id(id_usuario)
|
||||||
@@ -73,6 +82,7 @@ class CreateReport:
|
|||||||
lng=lng,
|
lng=lng,
|
||||||
image_filename=image_filename,
|
image_filename=image_filename,
|
||||||
visibilidad=50.0, # Visibilidad inicial neutral
|
visibilidad=50.0, # Visibilidad inicial neutral
|
||||||
|
estado=estado,
|
||||||
fecha_creacion=fecha_creacion.isoformat()
|
fecha_creacion=fecha_creacion.isoformat()
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -244,3 +254,57 @@ class DeleteReport:
|
|||||||
"status": "error",
|
"status": "error",
|
||||||
"message": "Error al enviar eliminación del reporte a la cola de procesamiento"
|
"message": "Error al enviar eliminación del reporte a la cola de procesamiento"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class UpdateReportStatus:
|
||||||
|
"""Use case para actualizar el estado de un reporte"""
|
||||||
|
def __init__(self, repo: ReportRepository):
|
||||||
|
if not isinstance(repo, ReportRepository):
|
||||||
|
raise TypeError("repo must implement ReportRepository")
|
||||||
|
self.repo = repo
|
||||||
|
|
||||||
|
def execute(self, report_id: str, new_estado: str) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Actualiza el estado de un reporte.
|
||||||
|
Valida previamente:
|
||||||
|
- Reporte existe
|
||||||
|
- Estado es válido
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary with status and message
|
||||||
|
"""
|
||||||
|
# Validación: estado válido
|
||||||
|
estados_validos = ["en proceso", "no resuelto", "resuelto"]
|
||||||
|
if new_estado not in estados_validos:
|
||||||
|
return {
|
||||||
|
"status": "error",
|
||||||
|
"message": f"El estado del reporte debe ser uno de: {', '.join(estados_validos)}"
|
||||||
|
}
|
||||||
|
|
||||||
|
# Validación: reporte existe
|
||||||
|
try:
|
||||||
|
report = self.repo.find_by_id(report_id)
|
||||||
|
if not report:
|
||||||
|
return {
|
||||||
|
"status": "error",
|
||||||
|
"message": f"Reporte con ID {report_id} no existe"
|
||||||
|
}
|
||||||
|
except Exception as e:
|
||||||
|
return {
|
||||||
|
"status": "error",
|
||||||
|
"message": f"Error al buscar reporte: {str(e)}"
|
||||||
|
}
|
||||||
|
|
||||||
|
# Actualizar estado
|
||||||
|
try:
|
||||||
|
self.repo.update_estado(report_id, new_estado)
|
||||||
|
return {
|
||||||
|
"status": "success",
|
||||||
|
"message": f"Estado del reporte actualizado a '{new_estado}'",
|
||||||
|
"id_reporte": report_id,
|
||||||
|
"nuevo_estado": new_estado
|
||||||
|
}
|
||||||
|
except Exception as e:
|
||||||
|
return {
|
||||||
|
"status": "error",
|
||||||
|
"message": f"Error al actualizar estado del reporte: {str(e)}"
|
||||||
|
}
|
||||||
|
|||||||
@@ -1 +1 @@
|
|||||||
"""RabbitMQ Consumer implementations"""
|
"""RabbitMQ Consumer implementations"""
|
||||||
|
|||||||
@@ -1,220 +1,220 @@
|
|||||||
"""Report RabbitMQ Consumer - Processes report events and saves to database"""
|
"""Report RabbitMQ Consumer - Processes report events and saves to database"""
|
||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
import logging
|
import logging
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from shutil import move
|
from shutil import move
|
||||||
|
|
||||||
# Add src to path to import modules
|
# Add src to path to import modules
|
||||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
||||||
|
|
||||||
from infrastructure.adapters.rabbitmq.consumer import RabbitMQConsumer
|
from infrastructure.adapters.rabbitmq.consumer import RabbitMQConsumer
|
||||||
from infrastructure.adapters.rabbitmq.messages import ReportMessage, ReportEventType
|
from infrastructure.adapters.rabbitmq.messages import ReportMessage, ReportEventType
|
||||||
from infrastructure.adapters.persistence.report_repository_mongo import ReportRepositoryMongo
|
from infrastructure.adapters.persistence.report_repository_mongo import ReportRepositoryMongo
|
||||||
from infrastructure.adapters.persistence.user_repository_sql import UserRepositorySQL
|
from infrastructure.adapters.persistence.user_repository_sql import UserRepositorySQL
|
||||||
from infrastructure.adapters.file_storage import image_storage
|
from infrastructure.adapters.file_storage import image_storage
|
||||||
from domain.reports import Report
|
from domain.reports import Report
|
||||||
from core.config import ConfSettings
|
from core.config import ConfSettings
|
||||||
|
|
||||||
# Set up logging
|
# Set up logging
|
||||||
logging.basicConfig(
|
logging.basicConfig(
|
||||||
level=logging.INFO,
|
level=logging.INFO,
|
||||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||||
)
|
)
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class ReportConsumer:
|
class ReportConsumer:
|
||||||
"""Consumer for report events from RabbitMQ"""
|
"""Consumer for report events from RabbitMQ"""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.repo = ReportRepositoryMongo()
|
self.repo = ReportRepositoryMongo()
|
||||||
self.user_repo = UserRepositorySQL()
|
self.user_repo = UserRepositorySQL()
|
||||||
self.consumer = RabbitMQConsumer(queue_name='reports_queue')
|
self.consumer = RabbitMQConsumer(queue_name='reports_queue')
|
||||||
self.consumer.set_callback(self.process_message)
|
self.consumer.set_callback(self.process_message)
|
||||||
|
|
||||||
def process_message(self, message_dict: dict):
|
def process_message(self, message_dict: dict):
|
||||||
"""
|
"""
|
||||||
Processes a report event message from RabbitMQ
|
Processes a report event message from RabbitMQ
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
message_dict: Dictionary containing the message data
|
message_dict: Dictionary containing the message data
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
# Reconstruct the ReportMessage object
|
# Reconstruct the ReportMessage object
|
||||||
message = ReportMessage.from_dict(message_dict)
|
message = ReportMessage.from_dict(message_dict)
|
||||||
|
|
||||||
if message.event_type == ReportEventType.CREATE:
|
if message.event_type == ReportEventType.CREATE:
|
||||||
self._handle_create_report(message)
|
self._handle_create_report(message)
|
||||||
elif message.event_type == ReportEventType.UPDATE_VISIBILITY:
|
elif message.event_type == ReportEventType.UPDATE_VISIBILITY:
|
||||||
self._handle_update_visibility(message)
|
self._handle_update_visibility(message)
|
||||||
elif message.event_type == ReportEventType.DELETE:
|
elif message.event_type == ReportEventType.DELETE:
|
||||||
self._handle_delete_report(message)
|
self._handle_delete_report(message)
|
||||||
else:
|
else:
|
||||||
logger.warning(f"Unknown event type: {message.event_type}")
|
logger.warning(f"Unknown event type: {message.event_type}")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error processing report message: {e}", exc_info=True)
|
logger.error(f"Error processing report message: {e}", exc_info=True)
|
||||||
# Rollback explícito en caso de error
|
# Rollback explícito en caso de error
|
||||||
self.user_repo.db.rollback()
|
self.user_repo.db.rollback()
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def _handle_create_report(self, message: ReportMessage):
|
def _handle_create_report(self, message: ReportMessage):
|
||||||
"""Handle report create event con manejo de transacciones cruzadas"""
|
"""Handle report create event con manejo de transacciones cruzadas"""
|
||||||
try:
|
try:
|
||||||
logger.info(f"Creating report: {message.id_reporte} from user {message.id_usuario}")
|
logger.info(f"Creating report: {message.id_reporte} from user {message.id_usuario}")
|
||||||
|
|
||||||
# Parse datetime string
|
# Parse datetime string
|
||||||
fecha_creacion = datetime.fromisoformat(message.fecha_creacion)
|
fecha_creacion = datetime.fromisoformat(message.fecha_creacion)
|
||||||
|
|
||||||
# Renombrar imagen temporal si existe
|
# Renombrar imagen temporal si existe
|
||||||
final_image_filename = None
|
final_image_filename = None
|
||||||
if message.image_filename:
|
if message.image_filename:
|
||||||
try:
|
try:
|
||||||
# Renombrar de temp_userid_type a report_id
|
# Renombrar de temp_userid_type a report_id
|
||||||
temp_path = image_storage.get_image_path(message.image_filename)
|
temp_path = image_storage.get_image_path(message.image_filename)
|
||||||
final_filename = f"{message.id_reporte}.webp"
|
final_filename = f"{message.id_reporte}.webp"
|
||||||
final_path = image_storage.get_image_path(final_filename)
|
final_path = image_storage.get_image_path(final_filename)
|
||||||
|
|
||||||
if temp_path.exists():
|
if temp_path.exists():
|
||||||
move(str(temp_path), str(final_path))
|
move(str(temp_path), str(final_path))
|
||||||
final_image_filename = final_filename
|
final_image_filename = final_filename
|
||||||
logger.info(f"Image renamed from {message.image_filename} to {final_filename}")
|
logger.info(f"Image renamed from {message.image_filename} to {final_filename}")
|
||||||
else:
|
else:
|
||||||
logger.warning(f"Temporary image not found: {message.image_filename}")
|
logger.warning(f"Temporary image not found: {message.image_filename}")
|
||||||
except Exception as img_error:
|
except Exception as img_error:
|
||||||
logger.error(f"Error renaming image: {img_error}", exc_info=True)
|
logger.error(f"Error renaming image: {img_error}", exc_info=True)
|
||||||
# Continuar sin imagen si falla el renombramiento
|
# Continuar sin imagen si falla el renombramiento
|
||||||
|
|
||||||
# Create Report domain object
|
# Create Report domain object
|
||||||
report = Report(
|
report = Report(
|
||||||
id_reporte=message.id_reporte,
|
id_reporte=message.id_reporte,
|
||||||
id_usuario=message.id_usuario,
|
id_usuario=message.id_usuario,
|
||||||
tipo_reporte=message.tipo_reporte,
|
tipo_reporte=message.tipo_reporte,
|
||||||
descripcion=message.descripcion,
|
descripcion=message.descripcion,
|
||||||
ubicacion=message.ubicacion,
|
ubicacion=message.ubicacion,
|
||||||
lat=message.lat,
|
lat=message.lat,
|
||||||
lng=message.lng,
|
lng=message.lng,
|
||||||
image_filename=final_image_filename,
|
image_filename=final_image_filename,
|
||||||
visibilidad=message.visibilidad,
|
visibilidad=message.visibilidad,
|
||||||
fecha_creacion=fecha_creacion
|
fecha_creacion=fecha_creacion
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Save to MongoDB repository
|
# Save to MongoDB repository
|
||||||
saved_report = self.repo.save(report)
|
saved_report = self.repo.save(report)
|
||||||
logger.info(f"Report created successfully in MongoDB: {message.id_reporte}")
|
logger.info(f"Report created successfully in MongoDB: {message.id_reporte}")
|
||||||
except Exception as mongo_error:
|
except Exception as mongo_error:
|
||||||
logger.error(f"Error saving report to MongoDB: {mongo_error}", exc_info=True)
|
logger.error(f"Error saving report to MongoDB: {mongo_error}", exc_info=True)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Increment user's report counter in MySQL
|
# Increment user's report counter in MySQL
|
||||||
self.user_repo.increment_reports(message.id_usuario)
|
self.user_repo.increment_reports(message.id_usuario)
|
||||||
logger.info(f"Incremented report counter for user: {message.id_usuario}")
|
logger.info(f"Incremented report counter for user: {message.id_usuario}")
|
||||||
except Exception as sql_error:
|
except Exception as sql_error:
|
||||||
logger.error(f"Error incrementing report counter: {sql_error}", exc_info=True)
|
logger.error(f"Error incrementing report counter: {sql_error}", exc_info=True)
|
||||||
# Rollback SQL transaction
|
# Rollback SQL transaction
|
||||||
self.user_repo.db.rollback()
|
self.user_repo.db.rollback()
|
||||||
# Note: MongoDB save cannot be rolled back, log for manual cleanup
|
# Note: MongoDB save cannot be rolled back, log for manual cleanup
|
||||||
logger.critical(f"INCONSISTENCY: Report {message.id_reporte} saved to MongoDB but user counter not incremented for user {message.id_usuario}")
|
logger.critical(f"INCONSISTENCY: Report {message.id_reporte} saved to MongoDB but user counter not incremented for user {message.id_usuario}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error creating report: {e}", exc_info=True)
|
logger.error(f"Error creating report: {e}", exc_info=True)
|
||||||
self.user_repo.db.rollback()
|
self.user_repo.db.rollback()
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def _handle_update_visibility(self, message: ReportMessage):
|
def _handle_update_visibility(self, message: ReportMessage):
|
||||||
"""Handle report visibility update event con manejo de transacciones"""
|
"""Handle report visibility update event con manejo de transacciones"""
|
||||||
try:
|
try:
|
||||||
logger.info(f"Updating visibility for report: {message.id_reporte}")
|
logger.info(f"Updating visibility for report: {message.id_reporte}")
|
||||||
|
|
||||||
# Find the report
|
# Find the report
|
||||||
report = self.repo.find_by_id(message.id_reporte)
|
report = self.repo.find_by_id(message.id_reporte)
|
||||||
if not report:
|
if not report:
|
||||||
logger.warning(f"Report not found: {message.id_reporte}")
|
logger.warning(f"Report not found: {message.id_reporte}")
|
||||||
return
|
return
|
||||||
|
|
||||||
# Update visibility in MongoDB
|
# Update visibility in MongoDB
|
||||||
try:
|
try:
|
||||||
self.repo.update_visibility(message.id_reporte, message.visibilidad)
|
self.repo.update_visibility(message.id_reporte, message.visibilidad)
|
||||||
logger.info(f"Report visibility updated: {message.id_reporte} -> {message.visibilidad}")
|
logger.info(f"Report visibility updated: {message.id_reporte} -> {message.visibilidad}")
|
||||||
except Exception as mongo_error:
|
except Exception as mongo_error:
|
||||||
logger.error(f"Error updating report visibility in MongoDB: {mongo_error}", exc_info=True)
|
logger.error(f"Error updating report visibility in MongoDB: {mongo_error}", exc_info=True)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# Penalize author if visibility is very low (shadowban)
|
# Penalize author if visibility is very low (shadowban)
|
||||||
if message.penalize_author and message.visibilidad < 20:
|
if message.penalize_author and message.visibilidad < 20:
|
||||||
try:
|
try:
|
||||||
user = self.user_repo.find_by_id(report.id_usuario)
|
user = self.user_repo.find_by_id(report.id_usuario)
|
||||||
if user:
|
if user:
|
||||||
# Reduce user's rating
|
# Reduce user's rating
|
||||||
new_rating = max(0, user.calificacion - 5)
|
new_rating = max(0, user.calificacion - 5)
|
||||||
self.user_repo.update_rating(report.id_usuario, new_rating)
|
self.user_repo.update_rating(report.id_usuario, new_rating)
|
||||||
logger.info(f"Author penalized: user {report.id_usuario} rating reduced to {new_rating}")
|
logger.info(f"Author penalized: user {report.id_usuario} rating reduced to {new_rating}")
|
||||||
else:
|
else:
|
||||||
logger.warning(f"User not found for penalty: {report.id_usuario}")
|
logger.warning(f"User not found for penalty: {report.id_usuario}")
|
||||||
except Exception as penalty_error:
|
except Exception as penalty_error:
|
||||||
logger.error(f"Error penalizing author: {penalty_error}", exc_info=True)
|
logger.error(f"Error penalizing author: {penalty_error}", exc_info=True)
|
||||||
self.user_repo.db.rollback()
|
self.user_repo.db.rollback()
|
||||||
logger.critical(f"INCONSISTENCY: Report {message.id_reporte} visibility updated but author penalty failed")
|
logger.critical(f"INCONSISTENCY: Report {message.id_reporte} visibility updated but author penalty failed")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error updating report visibility: {e}", exc_info=True)
|
logger.error(f"Error updating report visibility: {e}", exc_info=True)
|
||||||
self.user_repo.db.rollback()
|
self.user_repo.db.rollback()
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def _handle_delete_report(self, message: ReportMessage):
|
def _handle_delete_report(self, message: ReportMessage):
|
||||||
"""Handle report delete event"""
|
"""Handle report delete event"""
|
||||||
try:
|
try:
|
||||||
logger.info(f"Deleting report: {message.id_reporte}")
|
logger.info(f"Deleting report: {message.id_reporte}")
|
||||||
|
|
||||||
# Obtener reportepara acceder a image_filename antes de eliminarlo
|
# Obtener reportepara acceder a image_filename antes de eliminarlo
|
||||||
report = self.repo.find_by_id(message.id_reporte)
|
report = self.repo.find_by_id(message.id_reporte)
|
||||||
|
|
||||||
# Eliminar del MongoDB
|
# Eliminar del MongoDB
|
||||||
success = self.repo.delete(message.id_reporte)
|
success = self.repo.delete(message.id_reporte)
|
||||||
if success:
|
if success:
|
||||||
logger.info(f"Report deleted successfully from MongoDB: {message.id_reporte}")
|
logger.info(f"Report deleted successfully from MongoDB: {message.id_reporte}")
|
||||||
|
|
||||||
# Eliminar imagen del almacenamiento
|
# Eliminar imagen del almacenamiento
|
||||||
if report and report.image_filename:
|
if report and report.image_filename:
|
||||||
deleted = image_storage.delete_image(report.image_filename)
|
deleted = image_storage.delete_image(report.image_filename)
|
||||||
if deleted:
|
if deleted:
|
||||||
logger.info(f"Image deleted: {report.image_filename}")
|
logger.info(f"Image deleted: {report.image_filename}")
|
||||||
else:
|
else:
|
||||||
logger.warning(f"Could not delete image: {report.image_filename}")
|
logger.warning(f"Could not delete image: {report.image_filename}")
|
||||||
else:
|
else:
|
||||||
logger.warning(f"Failed to delete report: {message.id_reporte}")
|
logger.warning(f"Failed to delete report: {message.id_reporte}")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error deleting report: {e}", exc_info=True)
|
logger.error(f"Error deleting report: {e}", exc_info=True)
|
||||||
self.user_repo.db.rollback()
|
self.user_repo.db.rollback()
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
"""Start consuming messages"""
|
"""Start consuming messages"""
|
||||||
logger.info("Starting Report Consumer...")
|
logger.info("Starting Report Consumer...")
|
||||||
logger.info("[*] Waiting for report events. Ctrl+C to exit.")
|
logger.info("[*] Waiting for report events. Ctrl+C to exit.")
|
||||||
try:
|
try:
|
||||||
self.consumer.start_consuming()
|
self.consumer.start_consuming()
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
logger.info("Report Consumer stopped by user")
|
logger.info("Report Consumer stopped by user")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Consumer error: {e}", exc_info=True)
|
logger.error(f"Consumer error: {e}", exc_info=True)
|
||||||
raise
|
raise
|
||||||
finally:
|
finally:
|
||||||
# Asegurar cierre de sesión SQL
|
# Asegurar cierre de sesión SQL
|
||||||
if self.user_repo.db:
|
if self.user_repo.db:
|
||||||
try:
|
try:
|
||||||
self.user_repo.db.close()
|
self.user_repo.db.close()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error closing database session: {e}")
|
logger.error(f"Error closing database session: {e}")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
consumer = ReportConsumer()
|
consumer = ReportConsumer()
|
||||||
consumer.start()
|
consumer.start()
|
||||||
|
|||||||
@@ -1,157 +1,157 @@
|
|||||||
"""User RabbitMQ Consumer - Processes user events and saves to database"""
|
"""User RabbitMQ Consumer - Processes user events and saves to database"""
|
||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
import logging
|
import logging
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
# Add src to path to import modules
|
# Add src to path to import modules
|
||||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
||||||
|
|
||||||
from infrastructure.adapters.rabbitmq.consumer import RabbitMQConsumer
|
from infrastructure.adapters.rabbitmq.consumer import RabbitMQConsumer
|
||||||
from infrastructure.adapters.rabbitmq.messages import UserMessage, UserEventType
|
from infrastructure.adapters.rabbitmq.messages import UserMessage, UserEventType
|
||||||
from infrastructure.adapters.persistence.user_repository_sql import UserRepositorySQL
|
from infrastructure.adapters.persistence.user_repository_sql import UserRepositorySQL
|
||||||
from domain.users import User
|
from domain.users import User
|
||||||
|
|
||||||
# Set up logging
|
# Set up logging
|
||||||
logging.basicConfig(
|
logging.basicConfig(
|
||||||
level=logging.INFO,
|
level=logging.INFO,
|
||||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||||
)
|
)
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class UserConsumer:
|
class UserConsumer:
|
||||||
"""Consumer for user events from RabbitMQ"""
|
"""Consumer for user events from RabbitMQ"""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.repo = UserRepositorySQL()
|
self.repo = UserRepositorySQL()
|
||||||
self.consumer = RabbitMQConsumer(queue_name='users_queue')
|
self.consumer = RabbitMQConsumer(queue_name='users_queue')
|
||||||
self.consumer.set_callback(self.process_message)
|
self.consumer.set_callback(self.process_message)
|
||||||
|
|
||||||
def process_message(self, message_dict: dict):
|
def process_message(self, message_dict: dict):
|
||||||
"""
|
"""
|
||||||
Processes a user event message from RabbitMQ
|
Processes a user event message from RabbitMQ
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
message_dict: Dictionary containing the message data
|
message_dict: Dictionary containing the message data
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
# Reconstruct the UserMessage object
|
# Reconstruct the UserMessage object
|
||||||
message = UserMessage.from_dict(message_dict)
|
message = UserMessage.from_dict(message_dict)
|
||||||
|
|
||||||
if message.event_type == UserEventType.CREATE:
|
if message.event_type == UserEventType.CREATE:
|
||||||
self._handle_create_user(message)
|
self._handle_create_user(message)
|
||||||
elif message.event_type == UserEventType.UPDATE:
|
elif message.event_type == UserEventType.UPDATE:
|
||||||
self._handle_update_user(message)
|
self._handle_update_user(message)
|
||||||
elif message.event_type == UserEventType.DELETE:
|
elif message.event_type == UserEventType.DELETE:
|
||||||
self._handle_delete_user(message)
|
self._handle_delete_user(message)
|
||||||
else:
|
else:
|
||||||
logger.warning(f"Unknown event type: {message.event_type}")
|
logger.warning(f"Unknown event type: {message.event_type}")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error processing user message: {e}", exc_info=True)
|
logger.error(f"Error processing user message: {e}", exc_info=True)
|
||||||
# Rollback en caso de error en el procesamiento del mensaje
|
# Rollback en caso de error en el procesamiento del mensaje
|
||||||
self.repo.db.rollback()
|
self.repo.db.rollback()
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def _handle_create_user(self, message: UserMessage):
|
def _handle_create_user(self, message: UserMessage):
|
||||||
"""Handle user create event"""
|
"""Handle user create event"""
|
||||||
try:
|
try:
|
||||||
logger.info(f"Creating user: {message.email}")
|
logger.info(f"Creating user: {message.email}")
|
||||||
|
|
||||||
# Parse datetime strings
|
# Parse datetime strings
|
||||||
fecha_nacimiento = datetime.fromisoformat(message.fecha_nacimiento)
|
fecha_nacimiento = datetime.fromisoformat(message.fecha_nacimiento)
|
||||||
fecha_creacion = datetime.fromisoformat(message.fecha_creacion)
|
fecha_creacion = datetime.fromisoformat(message.fecha_creacion)
|
||||||
|
|
||||||
# Create User domain object
|
# Create User domain object
|
||||||
user = User(
|
user = User(
|
||||||
user_id=0, # Will be auto-generated by DB
|
user_id=0, # Will be auto-generated by DB
|
||||||
nombre=message.nombre,
|
nombre=message.nombre,
|
||||||
apellido=message.apellido,
|
apellido=message.apellido,
|
||||||
email=message.email,
|
email=message.email,
|
||||||
fecha_nacimiento=fecha_nacimiento,
|
fecha_nacimiento=fecha_nacimiento,
|
||||||
fecha_creacion=fecha_creacion,
|
fecha_creacion=fecha_creacion,
|
||||||
calificacion=message.calificacion,
|
calificacion=message.calificacion,
|
||||||
numero_reportes=message.numero_reportes,
|
numero_reportes=message.numero_reportes,
|
||||||
url_foto_perfil=message.url_foto_perfil,
|
url_foto_perfil=message.url_foto_perfil,
|
||||||
biografia=message.biografia
|
biografia=message.biografia
|
||||||
)
|
)
|
||||||
|
|
||||||
# Save to repository with transaction handling
|
# Save to repository with transaction handling
|
||||||
saved_user = self.repo.save(user)
|
saved_user = self.repo.save(user)
|
||||||
logger.info(f"User created successfully: {saved_user.user_id} - {saved_user.email}")
|
logger.info(f"User created successfully: {saved_user.user_id} - {saved_user.email}")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error creating user: {e}", exc_info=True)
|
logger.error(f"Error creating user: {e}", exc_info=True)
|
||||||
self.repo.db.rollback()
|
self.repo.db.rollback()
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def _handle_update_user(self, message: UserMessage):
|
def _handle_update_user(self, message: UserMessage):
|
||||||
"""Handle user update event"""
|
"""Handle user update event"""
|
||||||
try:
|
try:
|
||||||
logger.info(f"Updating user: {message.user_id}")
|
logger.info(f"Updating user: {message.user_id}")
|
||||||
|
|
||||||
# Find the user
|
# Find the user
|
||||||
user = self.repo.find_by_id(message.user_id)
|
user = self.repo.find_by_id(message.user_id)
|
||||||
if not user:
|
if not user:
|
||||||
logger.warning(f"User not found: {message.user_id}")
|
logger.warning(f"User not found: {message.user_id}")
|
||||||
return
|
return
|
||||||
|
|
||||||
# Update fields if provided
|
# Update fields if provided
|
||||||
if message.nombre:
|
if message.nombre:
|
||||||
user.nombre = message.nombre
|
user.nombre = message.nombre
|
||||||
if message.apellido:
|
if message.apellido:
|
||||||
user.apellido = message.apellido
|
user.apellido = message.apellido
|
||||||
if message.url_foto_perfil is not None:
|
if message.url_foto_perfil is not None:
|
||||||
user.url_foto_perfil = message.url_foto_perfil
|
user.url_foto_perfil = message.url_foto_perfil
|
||||||
if message.biografia is not None:
|
if message.biografia is not None:
|
||||||
user.biografia = message.biografia
|
user.biografia = message.biografia
|
||||||
|
|
||||||
# Save to repository with transaction handling
|
# Save to repository with transaction handling
|
||||||
updated_user = self.repo.update(user)
|
updated_user = self.repo.update(user)
|
||||||
logger.info(f"User updated successfully: {message.user_id}")
|
logger.info(f"User updated successfully: {message.user_id}")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error updating user: {e}", exc_info=True)
|
logger.error(f"Error updating user: {e}", exc_info=True)
|
||||||
self.repo.db.rollback()
|
self.repo.db.rollback()
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def _handle_delete_user(self, message: UserMessage):
|
def _handle_delete_user(self, message: UserMessage):
|
||||||
"""Handle user delete event"""
|
"""Handle user delete event"""
|
||||||
try:
|
try:
|
||||||
logger.info(f"Deleting user: {message.user_id}")
|
logger.info(f"Deleting user: {message.user_id}")
|
||||||
|
|
||||||
success = self.repo.delete(message.user_id)
|
success = self.repo.delete(message.user_id)
|
||||||
if success:
|
if success:
|
||||||
logger.info(f"User deleted successfully: {message.user_id}")
|
logger.info(f"User deleted successfully: {message.user_id}")
|
||||||
else:
|
else:
|
||||||
logger.warning(f"Failed to delete user: {message.user_id}")
|
logger.warning(f"Failed to delete user: {message.user_id}")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error deleting user: {e}", exc_info=True)
|
logger.error(f"Error deleting user: {e}", exc_info=True)
|
||||||
self.repo.db.rollback()
|
self.repo.db.rollback()
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
"""Start consuming messages"""
|
"""Start consuming messages"""
|
||||||
logger.info("Starting User Consumer...")
|
logger.info("Starting User Consumer...")
|
||||||
logger.info("[*] Waiting for user events. Ctrl+C to exit.")
|
logger.info("[*] Waiting for user events. Ctrl+C to exit.")
|
||||||
try:
|
try:
|
||||||
self.consumer.start_consuming()
|
self.consumer.start_consuming()
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
logger.info("User Consumer stopped by user")
|
logger.info("User Consumer stopped by user")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Consumer error: {e}", exc_info=True)
|
logger.error(f"Consumer error: {e}", exc_info=True)
|
||||||
raise
|
raise
|
||||||
finally:
|
finally:
|
||||||
# Asegurar cierre de sesión
|
# Asegurar cierre de sesión
|
||||||
if self.repo.db:
|
if self.repo.db:
|
||||||
try:
|
try:
|
||||||
self.repo.db.close()
|
self.repo.db.close()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error closing database session: {e}")
|
logger.error(f"Error closing database session: {e}")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
consumer = UserConsumer()
|
consumer = UserConsumer()
|
||||||
consumer.start()
|
consumer.start()
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from typing import Optional
|
from typing import Optional, Literal
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Report:
|
class Report:
|
||||||
@@ -14,4 +14,5 @@ class Report:
|
|||||||
lng: Optional[float] = None
|
lng: Optional[float] = None
|
||||||
image_filename: Optional[str] = None # Nombre del archivo de imagen WebP
|
image_filename: Optional[str] = None # Nombre del archivo de imagen WebP
|
||||||
visibilidad: float = 0.0 # 0-100 (puntuación comunitaria)
|
visibilidad: float = 0.0 # 0-100 (puntuación comunitaria)
|
||||||
|
estado: Literal["en proceso", "no resuelto", "resuelto"] = "en proceso" # Estado del reporte
|
||||||
fecha_creacion: Optional[datetime] = None
|
fecha_creacion: Optional[datetime] = None
|
||||||
|
|||||||
@@ -1,154 +1,154 @@
|
|||||||
"""File storage utilities for report images"""
|
"""File storage utilities for report images"""
|
||||||
import os
|
import os
|
||||||
import logging
|
import logging
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from fastapi import UploadFile, HTTPException, status
|
from fastapi import UploadFile, HTTPException, status
|
||||||
from PIL import Image
|
from PIL import Image
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
from core.config import ConfSettings
|
from core.config import ConfSettings
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class ImageStorageManager:
|
class ImageStorageManager:
|
||||||
"""Maneja almacenamiento, compresión y eliminación de imágenes de reportes"""
|
"""Maneja almacenamiento, compresión y eliminación de imágenes de reportes"""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.storage_path = Path(ConfSettings.storage_base_path) / ConfSettings.images_dir
|
self.storage_path = Path(ConfSettings.storage_base_path) / ConfSettings.images_dir
|
||||||
self.max_size_bytes = ConfSettings.images_max_size_mb * 1024 * 1024
|
self.max_size_bytes = ConfSettings.images_max_size_mb * 1024 * 1024
|
||||||
self.allowed_types = ConfSettings.images_allowed_types
|
self.allowed_types = ConfSettings.images_allowed_types
|
||||||
self.compression_quality = ConfSettings.images_compression_quality
|
self.compression_quality = ConfSettings.images_compression_quality
|
||||||
|
|
||||||
# Crear directorio si no existe
|
# Crear directorio si no existe
|
||||||
self.storage_path.mkdir(parents=True, exist_ok=True)
|
self.storage_path.mkdir(parents=True, exist_ok=True)
|
||||||
logger.info(f"ImageStorageManager initialized with path: {self.storage_path}")
|
logger.info(f"ImageStorageManager initialized with path: {self.storage_path}")
|
||||||
|
|
||||||
def validate_and_save_image(self, file: UploadFile, report_id: str) -> str:
|
def validate_and_save_image(self, file: UploadFile, report_id: str) -> str:
|
||||||
"""
|
"""
|
||||||
Valida y guarda una imagen, comprimiendo a WebP.
|
Valida y guarda una imagen, comprimiendo a WebP.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
file: Archivo subido (UploadFile)
|
file: Archivo subido (UploadFile)
|
||||||
report_id: ID del reporte para nombrado del archivo
|
report_id: ID del reporte para nombrado del archivo
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Nombre del archivo guardado (sin ruta)
|
Nombre del archivo guardado (sin ruta)
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
HTTPException: Si hay error en validación o guardado
|
HTTPException: Si hay error en validación o guardado
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
# Validar tipo MIME
|
# Validar tipo MIME
|
||||||
if file.content_type not in self.allowed_types:
|
if file.content_type not in self.allowed_types:
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=status.HTTP_400_BAD_REQUEST,
|
status_code=status.HTTP_400_BAD_REQUEST,
|
||||||
detail=f"Tipo de archivo no permitido. Permitidos: {', '.join(self.allowed_types)}"
|
detail=f"Tipo de archivo no permitido. Permitidos: {', '.join(self.allowed_types)}"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Leer contenido
|
# Leer contenido
|
||||||
content = file.file.read()
|
content = file.file.read()
|
||||||
|
|
||||||
# Validar tamaño
|
# Validar tamaño
|
||||||
if len(content) > self.max_size_bytes:
|
if len(content) > self.max_size_bytes:
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
|
status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
|
||||||
detail=f"Archivo demasiado grande. Máximo: {ConfSettings.images_max_size_mb}MB"
|
detail=f"Archivo demasiado grande. Máximo: {ConfSettings.images_max_size_mb}MB"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Abrir imagen con Pillow
|
# Abrir imagen con Pillow
|
||||||
try:
|
try:
|
||||||
image = Image.open(BytesIO(content))
|
image = Image.open(BytesIO(content))
|
||||||
image.verify() # Verificar que sea una imagen válida
|
image.verify() # Verificar que sea una imagen válida
|
||||||
|
|
||||||
# Reabrir después de verify() que la cierra
|
# Reabrir después de verify() que la cierra
|
||||||
image = Image.open(BytesIO(content))
|
image = Image.open(BytesIO(content))
|
||||||
|
|
||||||
# Convertir a RGB si tiene canal alpha (RGBA)
|
# Convertir a RGB si tiene canal alpha (RGBA)
|
||||||
if image.mode in ('RGBA', 'LA', 'P'):
|
if image.mode in ('RGBA', 'LA', 'P'):
|
||||||
rgb_image = Image.new('RGB', image.size, (255, 255, 255))
|
rgb_image = Image.new('RGB', image.size, (255, 255, 255))
|
||||||
rgb_image.paste(image, mask=image.split()[-1] if image.mode == 'RGBA' else None)
|
rgb_image.paste(image, mask=image.split()[-1] if image.mode == 'RGBA' else None)
|
||||||
image = rgb_image
|
image = rgb_image
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error validating image: {e}")
|
logger.error(f"Error validating image: {e}")
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=status.HTTP_400_BAD_REQUEST,
|
status_code=status.HTTP_400_BAD_REQUEST,
|
||||||
detail="Archivo no es una imagen válida"
|
detail="Archivo no es una imagen válida"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Guardar como WebP comprimido
|
# Guardar como WebP comprimido
|
||||||
filename = f"{report_id}.webp"
|
filename = f"{report_id}.webp"
|
||||||
filepath = self.storage_path / filename
|
filepath = self.storage_path / filename
|
||||||
|
|
||||||
try:
|
try:
|
||||||
image.save(
|
image.save(
|
||||||
filepath,
|
filepath,
|
||||||
"WEBP",
|
"WEBP",
|
||||||
quality=self.compression_quality,
|
quality=self.compression_quality,
|
||||||
method=6
|
method=6
|
||||||
)
|
)
|
||||||
logger.info(f"Image saved: {filename} ({len(open(filepath, 'rb').read())} bytes)")
|
logger.info(f"Image saved: {filename} ({len(open(filepath, 'rb').read())} bytes)")
|
||||||
return filename
|
return filename
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error saving image to disk: {e}")
|
logger.error(f"Error saving image to disk: {e}")
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||||
detail="Error al guardar la imagen"
|
detail="Error al guardar la imagen"
|
||||||
)
|
)
|
||||||
|
|
||||||
except HTTPException:
|
except HTTPException:
|
||||||
raise
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Unexpected error processing image: {e}")
|
logger.error(f"Unexpected error processing image: {e}")
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||||
detail="Error procesando la imagen"
|
detail="Error procesando la imagen"
|
||||||
)
|
)
|
||||||
|
|
||||||
def get_image_path(self, filename: str) -> Path:
|
def get_image_path(self, filename: str) -> Path:
|
||||||
"""Obtiene la ruta completa de una imagen"""
|
"""Obtiene la ruta completa de una imagen"""
|
||||||
return self.storage_path / filename
|
return self.storage_path / filename
|
||||||
|
|
||||||
def image_exists(self, filename: str) -> bool:
|
def image_exists(self, filename: str) -> bool:
|
||||||
"""Verifica si una imagen existe"""
|
"""Verifica si una imagen existe"""
|
||||||
if not filename:
|
if not filename:
|
||||||
return False
|
return False
|
||||||
filepath = self.get_image_path(filename)
|
filepath = self.get_image_path(filename)
|
||||||
return filepath.exists()
|
return filepath.exists()
|
||||||
|
|
||||||
def delete_image(self, filename: str) -> bool:
|
def delete_image(self, filename: str) -> bool:
|
||||||
"""
|
"""
|
||||||
Elimina una imagen del almacenamiento.
|
Elimina una imagen del almacenamiento.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
filename: Nombre del archivo a eliminar
|
filename: Nombre del archivo a eliminar
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
True si se eliminó, False si no existía
|
True si se eliminó, False si no existía
|
||||||
"""
|
"""
|
||||||
if not filename:
|
if not filename:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
filepath = self.get_image_path(filename)
|
filepath = self.get_image_path(filename)
|
||||||
try:
|
try:
|
||||||
if filepath.exists():
|
if filepath.exists():
|
||||||
filepath.unlink() # Eliminar archivo
|
filepath.unlink() # Eliminar archivo
|
||||||
logger.info(f"Image deleted: {filename}")
|
logger.info(f"Image deleted: {filename}")
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
logger.warning(f"Image not found for deletion: {filename}")
|
logger.warning(f"Image not found for deletion: {filename}")
|
||||||
return False
|
return False
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error deleting image {filename}: {e}")
|
logger.error(f"Error deleting image {filename}: {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def get_image_url(self, filename: str) -> str:
|
def get_image_url(self, filename: str) -> str:
|
||||||
"""Genera la URL pública para una imagen"""
|
"""Genera la URL pública para una imagen"""
|
||||||
if not filename:
|
if not filename:
|
||||||
return None
|
return None
|
||||||
return f"/images/{filename}"
|
return f"/images/{filename}"
|
||||||
|
|
||||||
|
|
||||||
# Instancia global
|
# Instancia global
|
||||||
image_storage = ImageStorageManager()
|
image_storage = ImageStorageManager()
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ class ReportRepositoryMongo(ReportRepository):
|
|||||||
"lng": report.lng,
|
"lng": report.lng,
|
||||||
"image_filename": report.image_filename,
|
"image_filename": report.image_filename,
|
||||||
"visibilidad": report.visibilidad,
|
"visibilidad": report.visibilidad,
|
||||||
|
"estado": report.estado,
|
||||||
"fecha_creacion": report.fecha_creacion or datetime.utcnow()
|
"fecha_creacion": report.fecha_creacion or datetime.utcnow()
|
||||||
}
|
}
|
||||||
result = self.collection.insert_one(report_dict)
|
result = self.collection.insert_one(report_dict)
|
||||||
@@ -59,6 +60,13 @@ class ReportRepositoryMongo(ReportRepository):
|
|||||||
{"$set": {"visibilidad": new_visibility}}
|
{"$set": {"visibilidad": new_visibility}}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def update_estado(self, report_id: str, new_estado: str) -> None:
|
||||||
|
"""Actualiza el estado de un reporte"""
|
||||||
|
self.collection.update_one(
|
||||||
|
{"id_reporte": report_id},
|
||||||
|
{"$set": {"estado": new_estado}}
|
||||||
|
)
|
||||||
|
|
||||||
def delete(self, report_id: str) -> bool:
|
def delete(self, report_id: str) -> bool:
|
||||||
"""Elimina un reporte"""
|
"""Elimina un reporte"""
|
||||||
result = self.collection.delete_one({"id_reporte": report_id})
|
result = self.collection.delete_one({"id_reporte": report_id})
|
||||||
@@ -83,5 +91,6 @@ class ReportRepositoryMongo(ReportRepository):
|
|||||||
lng=doc.get("lng"),
|
lng=doc.get("lng"),
|
||||||
image_filename=doc.get("image_filename"),
|
image_filename=doc.get("image_filename"),
|
||||||
visibilidad=doc.get("visibilidad"),
|
visibilidad=doc.get("visibilidad"),
|
||||||
|
estado=doc.get("estado", "en proceso"),
|
||||||
fecha_creacion=doc.get("fecha_creacion")
|
fecha_creacion=doc.get("fecha_creacion")
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -1 +1 @@
|
|||||||
"""RabbitMQ adapters for message publishing and consuming"""
|
"""RabbitMQ adapters for message publishing and consuming"""
|
||||||
|
|||||||
@@ -1,58 +1,58 @@
|
|||||||
import pika
|
import pika
|
||||||
import json
|
import json
|
||||||
from typing import Callable, Dict, Any
|
from typing import Callable, Dict, Any
|
||||||
import logging
|
import logging
|
||||||
from sqlalchemy.exc import IntegrityError
|
from sqlalchemy.exc import IntegrityError
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
class RabbitMQConsumer:
|
class RabbitMQConsumer:
|
||||||
def __init__(self, queue_name: str, host: str = 'localhost', port: int = 5672):
|
def __init__(self, queue_name: str, host: str = 'localhost', port: int = 5672):
|
||||||
self.queue_name = queue_name
|
self.queue_name = queue_name
|
||||||
self.host = host
|
self.host = host
|
||||||
self.port = port
|
self.port = port
|
||||||
self.callback = None
|
self.callback = None
|
||||||
|
|
||||||
def set_callback(self, callback: Callable[[Dict[str, Any]], None]) -> None:
|
def set_callback(self, callback: Callable[[Dict[str, Any]], None]) -> None:
|
||||||
self.callback = callback
|
self.callback = callback
|
||||||
|
|
||||||
def start_consuming(self) -> None:
|
def start_consuming(self) -> None:
|
||||||
try:
|
try:
|
||||||
connection = pika.BlockingConnection(
|
connection = pika.BlockingConnection(
|
||||||
pika.ConnectionParameters(host=self.host, port=self.port)
|
pika.ConnectionParameters(host=self.host, port=self.port)
|
||||||
)
|
)
|
||||||
channel = connection.channel()
|
channel = connection.channel()
|
||||||
channel.queue_declare(queue=self.queue_name, durable=True)
|
channel.queue_declare(queue=self.queue_name, durable=True)
|
||||||
|
|
||||||
def callback_wrapper(ch, method, properties, body):
|
def callback_wrapper(ch, method, properties, body):
|
||||||
try:
|
try:
|
||||||
message = json.loads(body.decode('utf-8'))
|
message = json.loads(body.decode('utf-8'))
|
||||||
logger.info(f"Received message from queue '{self.queue_name}': {message}")
|
logger.info(f"Received message from queue '{self.queue_name}': {message}")
|
||||||
|
|
||||||
if self.callback:
|
if self.callback:
|
||||||
self.callback(message)
|
self.callback(message)
|
||||||
|
|
||||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||||
|
|
||||||
except IntegrityError as e:
|
except IntegrityError as e:
|
||||||
# Error de negocio: no tiene sentido reintentar
|
# Error de negocio: no tiene sentido reintentar
|
||||||
logger.warning(f"Business error, discarding message: {e}")
|
logger.warning(f"Business error, discarding message: {e}")
|
||||||
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
|
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# Error transitorio (red, DB caída): sí puede resolverse solo
|
# Error transitorio (red, DB caída): sí puede resolverse solo
|
||||||
logger.error(f"Transient error processing message: {e}")
|
logger.error(f"Transient error processing message: {e}")
|
||||||
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
|
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
|
||||||
|
|
||||||
channel.basic_consume(
|
channel.basic_consume(
|
||||||
queue=self.queue_name,
|
queue=self.queue_name,
|
||||||
on_message_callback=callback_wrapper,
|
on_message_callback=callback_wrapper,
|
||||||
auto_ack=False
|
auto_ack=False
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(f"[*] Waiting for messages in queue '{self.queue_name}'. Ctrl+C to exit.")
|
logger.info(f"[*] Waiting for messages in queue '{self.queue_name}'. Ctrl+C to exit.")
|
||||||
channel.start_consuming()
|
channel.start_consuming()
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error in consumer: {e}")
|
logger.error(f"Error in consumer: {e}")
|
||||||
raise
|
raise
|
||||||
@@ -1,85 +1,86 @@
|
|||||||
"""Message schemas for RabbitMQ communication"""
|
"""Message schemas for RabbitMQ communication"""
|
||||||
from dataclasses import dataclass, asdict
|
from dataclasses import dataclass, asdict
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
import json
|
import json
|
||||||
|
|
||||||
|
|
||||||
class UserEventType(str, Enum):
|
class UserEventType(str, Enum):
|
||||||
"""Types of user events"""
|
"""Types of user events"""
|
||||||
CREATE = "user.create"
|
CREATE = "user.create"
|
||||||
UPDATE = "user.update"
|
UPDATE = "user.update"
|
||||||
DELETE = "user.delete"
|
DELETE = "user.delete"
|
||||||
|
|
||||||
|
|
||||||
class ReportEventType(str, Enum):
|
class ReportEventType(str, Enum):
|
||||||
"""Types of report events"""
|
"""Types of report events"""
|
||||||
CREATE = "report.create"
|
CREATE = "report.create"
|
||||||
UPDATE_VISIBILITY = "report.update_visibility"
|
UPDATE_VISIBILITY = "report.update_visibility"
|
||||||
DELETE = "report.delete"
|
DELETE = "report.delete"
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class UserMessage:
|
class UserMessage:
|
||||||
"""Message for user events"""
|
"""Message for user events"""
|
||||||
event_type: UserEventType
|
event_type: UserEventType
|
||||||
user_id: Optional[int] = None
|
user_id: Optional[int] = None
|
||||||
nombre: Optional[str] = None
|
nombre: Optional[str] = None
|
||||||
apellido: Optional[str] = None
|
apellido: Optional[str] = None
|
||||||
email: Optional[str] = None
|
email: Optional[str] = None
|
||||||
fecha_nacimiento: Optional[str] = None # ISO format datetime string
|
fecha_nacimiento: Optional[str] = None # ISO format datetime string
|
||||||
fecha_creacion: Optional[str] = None # ISO format datetime string
|
fecha_creacion: Optional[str] = None # ISO format datetime string
|
||||||
calificacion: Optional[float] = None
|
calificacion: Optional[float] = None
|
||||||
numero_reportes: Optional[int] = None
|
numero_reportes: Optional[int] = None
|
||||||
url_foto_perfil: Optional[str] = None
|
url_foto_perfil: Optional[str] = None
|
||||||
biografia: Optional[str] = None
|
biografia: Optional[str] = None
|
||||||
|
|
||||||
def to_dict(self):
|
def to_dict(self):
|
||||||
"""Convert to dictionary"""
|
"""Convert to dictionary"""
|
||||||
data = asdict(self)
|
data = asdict(self)
|
||||||
data['event_type'] = self.event_type.value
|
data['event_type'] = self.event_type.value
|
||||||
return data
|
return data
|
||||||
|
|
||||||
def to_json(self) -> str:
|
def to_json(self) -> str:
|
||||||
"""Convert to JSON string"""
|
"""Convert to JSON string"""
|
||||||
return json.dumps(self.to_dict())
|
return json.dumps(self.to_dict())
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def from_dict(data: dict) -> 'UserMessage':
|
def from_dict(data: dict) -> 'UserMessage':
|
||||||
"""Create from dictionary"""
|
"""Create from dictionary"""
|
||||||
data['event_type'] = UserEventType(data['event_type'])
|
data['event_type'] = UserEventType(data['event_type'])
|
||||||
return UserMessage(**data)
|
return UserMessage(**data)
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class ReportMessage:
|
class ReportMessage:
|
||||||
"""Message for report events"""
|
"""Message for report events"""
|
||||||
event_type: ReportEventType
|
event_type: ReportEventType
|
||||||
id_reporte: Optional[str] = None
|
id_reporte: Optional[str] = None
|
||||||
id_usuario: Optional[int] = None
|
id_usuario: Optional[int] = None
|
||||||
tipo_reporte: Optional[int] = None
|
tipo_reporte: Optional[int] = None
|
||||||
descripcion: Optional[str] = None
|
descripcion: Optional[str] = None
|
||||||
ubicacion: Optional[str] = None
|
ubicacion: Optional[str] = None
|
||||||
lat: Optional[float] = None
|
lat: Optional[float] = None
|
||||||
lng: Optional[float] = None
|
lng: Optional[float] = None
|
||||||
image_filename: Optional[str] = None # Nombre del archivo de imagen guardado
|
image_filename: Optional[str] = None # Nombre del archivo de imagen guardado
|
||||||
visibilidad: Optional[float] = None
|
visibilidad: Optional[float] = None
|
||||||
fecha_creacion: Optional[str] = None # ISO format datetime string
|
estado: Optional[str] = None # Estado del reporte: "en proceso", "no resuelto", "resuelto"
|
||||||
penalize_author: Optional[bool] = None # For update_visibility event
|
fecha_creacion: Optional[str] = None # ISO format datetime string
|
||||||
|
penalize_author: Optional[bool] = None # For update_visibility event
|
||||||
def to_dict(self):
|
|
||||||
"""Convert to dictionary"""
|
def to_dict(self):
|
||||||
data = asdict(self)
|
"""Convert to dictionary"""
|
||||||
data['event_type'] = self.event_type.value
|
data = asdict(self)
|
||||||
return data
|
data['event_type'] = self.event_type.value
|
||||||
|
return data
|
||||||
def to_json(self) -> str:
|
|
||||||
"""Convert to JSON string"""
|
def to_json(self) -> str:
|
||||||
return json.dumps(self.to_dict())
|
"""Convert to JSON string"""
|
||||||
|
return json.dumps(self.to_dict())
|
||||||
@staticmethod
|
|
||||||
def from_dict(data: dict) -> 'ReportMessage':
|
@staticmethod
|
||||||
"""Create from dictionary"""
|
def from_dict(data: dict) -> 'ReportMessage':
|
||||||
data['event_type'] = ReportEventType(data['event_type'])
|
"""Create from dictionary"""
|
||||||
return ReportMessage(**data)
|
data['event_type'] = ReportEventType(data['event_type'])
|
||||||
|
return ReportMessage(**data)
|
||||||
|
|||||||
@@ -1,74 +1,74 @@
|
|||||||
"""RabbitMQ message sender"""
|
"""RabbitMQ message sender"""
|
||||||
import pika
|
import pika
|
||||||
import json
|
import json
|
||||||
from typing import Any, Dict
|
from typing import Any, Dict
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class RabbitMQSender:
|
class RabbitMQSender:
|
||||||
"""Generic RabbitMQ sender for publishing messages to queues"""
|
"""Generic RabbitMQ sender for publishing messages to queues"""
|
||||||
|
|
||||||
def __init__(self, host: str = 'localhost', port: int = 5672):
|
def __init__(self, host: str = 'localhost', port: int = 5672):
|
||||||
self.host = host
|
self.host = host
|
||||||
self.port = port
|
self.port = port
|
||||||
|
|
||||||
def send_message(self, queue_name: str, message: Dict[str, Any]) -> bool:
|
def send_message(self, queue_name: str, message: Dict[str, Any]) -> bool:
|
||||||
"""
|
"""
|
||||||
Sends a message to a RabbitMQ queue
|
Sends a message to a RabbitMQ queue
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
queue_name: Name of the queue to send to
|
queue_name: Name of the queue to send to
|
||||||
message: Dictionary containing the message data
|
message: Dictionary containing the message data
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
True if successful, False otherwise
|
True if successful, False otherwise
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
connection = pika.BlockingConnection(
|
connection = pika.BlockingConnection(
|
||||||
pika.ConnectionParameters(host=self.host, port=self.port)
|
pika.ConnectionParameters(host=self.host, port=self.port)
|
||||||
)
|
)
|
||||||
channel = connection.channel()
|
channel = connection.channel()
|
||||||
|
|
||||||
# Declare queue to ensure it exists
|
# Declare queue to ensure it exists
|
||||||
channel.queue_declare(queue=queue_name, durable=True)
|
channel.queue_declare(queue=queue_name, durable=True)
|
||||||
|
|
||||||
# Convert message to JSON
|
# Convert message to JSON
|
||||||
message_json = json.dumps(message)
|
message_json = json.dumps(message)
|
||||||
|
|
||||||
# Publish the message
|
# Publish the message
|
||||||
channel.basic_publish(
|
channel.basic_publish(
|
||||||
exchange='',
|
exchange='',
|
||||||
routing_key=queue_name,
|
routing_key=queue_name,
|
||||||
body=message_json,
|
body=message_json,
|
||||||
properties=pika.BasicProperties(
|
properties=pika.BasicProperties(
|
||||||
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
|
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
connection.close()
|
connection.close()
|
||||||
logger.info(f"Message sent to queue '{queue_name}': {message_json}")
|
logger.info(f"Message sent to queue '{queue_name}': {message_json}")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error sending message to RabbitMQ: {e}")
|
logger.error(f"Error sending message to RabbitMQ: {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def send_to_queue(queue_name: str, message: Dict[str, Any],
|
def send_to_queue(queue_name: str, message: Dict[str, Any],
|
||||||
host: str = 'localhost', port: int = 5672) -> bool:
|
host: str = 'localhost', port: int = 5672) -> bool:
|
||||||
"""
|
"""
|
||||||
Convenience function to send a message to RabbitMQ
|
Convenience function to send a message to RabbitMQ
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
queue_name: Name of the queue
|
queue_name: Name of the queue
|
||||||
message: Message dictionary
|
message: Message dictionary
|
||||||
host: RabbitMQ host
|
host: RabbitMQ host
|
||||||
port: RabbitMQ port
|
port: RabbitMQ port
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
True if successful, False otherwise
|
True if successful, False otherwise
|
||||||
"""
|
"""
|
||||||
sender = RabbitMQSender(host=host, port=port)
|
sender = RabbitMQSender(host=host, port=port)
|
||||||
return sender.send_message(queue_name, message)
|
return sender.send_message(queue_name, message)
|
||||||
|
|||||||
@@ -1,8 +1,8 @@
|
|||||||
from fastapi import APIRouter, HTTPException, status, UploadFile, File, Form
|
from fastapi import APIRouter, HTTPException, status, UploadFile, File, Form
|
||||||
from infrastructure.api.reports.schemas import ReportCreateRequest, ReportUpdateVisibilityRequest, ReportResponse
|
from infrastructure.api.reports.schemas import ReportCreateRequest, ReportUpdateVisibilityRequest, ReportUpdateStatusRequest, ReportResponse
|
||||||
from application.services.report_services import (
|
from application.services.report_services import (
|
||||||
CreateReport, GetReportById, GetReportsByUser, ListAllReports,
|
CreateReport, GetReportById, GetReportsByUser, ListAllReports,
|
||||||
UpdateReportVisibility, GetShadowbannedReports, DeleteReport
|
UpdateReportVisibility, GetShadowbannedReports, DeleteReport, UpdateReportStatus
|
||||||
)
|
)
|
||||||
from infrastructure.adapters.persistence.report_repository_mongo import ReportRepositoryMongo
|
from infrastructure.adapters.persistence.report_repository_mongo import ReportRepositoryMongo
|
||||||
from infrastructure.adapters.persistence.user_repository_sql import UserRepositorySQL
|
from infrastructure.adapters.persistence.user_repository_sql import UserRepositorySQL
|
||||||
@@ -28,6 +28,7 @@ def _report_to_response(report) -> dict:
|
|||||||
"lng": report.lng,
|
"lng": report.lng,
|
||||||
"image_url": image_storage.get_image_url(report.image_filename) if report.image_filename else None,
|
"image_url": image_storage.get_image_url(report.image_filename) if report.image_filename else None,
|
||||||
"visibilidad": report.visibilidad,
|
"visibilidad": report.visibilidad,
|
||||||
|
"estado": report.estado,
|
||||||
"fecha_creacion": report.fecha_creacion
|
"fecha_creacion": report.fecha_creacion
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -39,6 +40,7 @@ async def create_report(
|
|||||||
ubicacion: Optional[str] = Form(None),
|
ubicacion: Optional[str] = Form(None),
|
||||||
lat: Optional[float] = Form(None),
|
lat: Optional[float] = Form(None),
|
||||||
lng: Optional[float] = Form(None),
|
lng: Optional[float] = Form(None),
|
||||||
|
estado: str = Form("en proceso"),
|
||||||
file: Optional[UploadFile] = File(None)
|
file: Optional[UploadFile] = File(None)
|
||||||
):
|
):
|
||||||
"""Crea un nuevo reporte - envía a cola de procesamiento con validaciones previas"""
|
"""Crea un nuevo reporte - envía a cola de procesamiento con validaciones previas"""
|
||||||
@@ -57,7 +59,8 @@ async def create_report(
|
|||||||
ubicacion=ubicacion,
|
ubicacion=ubicacion,
|
||||||
lat=lat,
|
lat=lat,
|
||||||
lng=lng,
|
lng=lng,
|
||||||
image_filename=image_filename
|
image_filename=image_filename,
|
||||||
|
estado=estado
|
||||||
)
|
)
|
||||||
|
|
||||||
if result["status"] == "error":
|
if result["status"] == "error":
|
||||||
@@ -225,3 +228,40 @@ async def delete_report(report_id: str):
|
|||||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||||
detail="Error interno del servidor"
|
detail="Error interno del servidor"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@router.put("/{report_id}/status", status_code=status.HTTP_200_OK)
|
||||||
|
async def update_report_status(report_id: str, status_data: ReportUpdateStatusRequest):
|
||||||
|
"""Actualiza el estado de un reporte"""
|
||||||
|
try:
|
||||||
|
update_use_case = UpdateReportStatus(report_repo)
|
||||||
|
result = update_use_case.execute(
|
||||||
|
report_id=report_id,
|
||||||
|
new_estado=status_data.estado
|
||||||
|
)
|
||||||
|
|
||||||
|
if result["status"] == "error":
|
||||||
|
message = result["message"]
|
||||||
|
if "no existe" in message:
|
||||||
|
# 404 Not Found: reporte no existe
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_404_NOT_FOUND,
|
||||||
|
detail=message
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# 400 Bad Request: error de validación
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_400_BAD_REQUEST,
|
||||||
|
detail=message
|
||||||
|
)
|
||||||
|
|
||||||
|
# 200 OK: estado actualizado correctamente
|
||||||
|
return result
|
||||||
|
|
||||||
|
except HTTPException:
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error al actualizar estado del reporte {report_id}: {e}", exc_info=True)
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||||
|
detail="Error interno del servidor"
|
||||||
|
)
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from typing import Optional
|
from typing import Optional, Literal
|
||||||
from fastapi import Form, UploadFile, File
|
from fastapi import Form, UploadFile, File
|
||||||
|
|
||||||
class ReportCreateRequest(BaseModel):
|
class ReportCreateRequest(BaseModel):
|
||||||
@@ -11,6 +11,7 @@ class ReportCreateRequest(BaseModel):
|
|||||||
ubicacion: Optional[str] = None
|
ubicacion: Optional[str] = None
|
||||||
lat: Optional[float] = None
|
lat: Optional[float] = None
|
||||||
lng: Optional[float] = None
|
lng: Optional[float] = None
|
||||||
|
estado: Literal["en proceso", "no resuelto", "resuelto"] = "en proceso" # Estado del reporte
|
||||||
# file se recibe como UploadFile en el endpoint, no en el modelo
|
# file se recibe como UploadFile en el endpoint, no en el modelo
|
||||||
|
|
||||||
class ReportUpdateVisibilityRequest(BaseModel):
|
class ReportUpdateVisibilityRequest(BaseModel):
|
||||||
@@ -18,6 +19,10 @@ class ReportUpdateVisibilityRequest(BaseModel):
|
|||||||
new_visibility: float
|
new_visibility: float
|
||||||
penalize_author: bool = False
|
penalize_author: bool = False
|
||||||
|
|
||||||
|
class ReportUpdateStatusRequest(BaseModel):
|
||||||
|
"""Solicitud para actualizar el estado de un reporte"""
|
||||||
|
estado: Literal["en proceso", "no resuelto", "resuelto"]
|
||||||
|
|
||||||
class ReportResponse(BaseModel):
|
class ReportResponse(BaseModel):
|
||||||
"""Respuesta con datos de reporte"""
|
"""Respuesta con datos de reporte"""
|
||||||
id_reporte: str
|
id_reporte: str
|
||||||
@@ -29,6 +34,7 @@ class ReportResponse(BaseModel):
|
|||||||
lng: Optional[float]
|
lng: Optional[float]
|
||||||
image_url: Optional[str] = None # URL pública para acceder a la imagen
|
image_url: Optional[str] = None # URL pública para acceder a la imagen
|
||||||
visibilidad: float
|
visibilidad: float
|
||||||
|
estado: Literal["en proceso", "no resuelto", "resuelto"] = "en proceso" # Estado del reporte
|
||||||
fecha_creacion: Optional[datetime]
|
fecha_creacion: Optional[datetime]
|
||||||
|
|
||||||
class Config:
|
class Config:
|
||||||
|
|||||||
Reference in New Issue
Block a user