From 7331dcb086bdcaafa85c32d2151ed6ff4a4bfe5b Mon Sep 17 00:00:00 2001 From: "Juan M. Ley" Date: Wed, 6 May 2026 01:30:20 -0600 Subject: [PATCH] Fixed metrics api to recieve all metrics --- src/consumers/metrics_consumer.py | 115 +++++------ .../adapters/rabbitmq/sender.py | 53 ++--- .../api/moderations/moderations.py | 137 +++++-------- src/infrastructure/api/reports/reports.py | 193 ++++++------------ src/infrastructure/api/users/users.py | 174 ++++------------ 5 files changed, 213 insertions(+), 459 deletions(-) diff --git a/src/consumers/metrics_consumer.py b/src/consumers/metrics_consumer.py index 13e42cb..cd687e1 100644 --- a/src/consumers/metrics_consumer.py +++ b/src/consumers/metrics_consumer.py @@ -4,7 +4,6 @@ import os import logging from datetime import datetime -# Add src to path to import modules sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) from infrastructure.adapters.rabbitmq.consumer import RabbitMQConsumer @@ -14,7 +13,6 @@ from infrastructure.adapters.rabbitmq.messages import ( from application.services.metrics_services import MetricsService from infrastructure.adapters.persistence.metrics_repository_postgres import MetricsRepositoryPostgres -# Set up logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', @@ -25,27 +23,32 @@ logging.basicConfig( ) logger = logging.getLogger(__name__) +# Colas dedicadas para métricas — no compiten con los consumers de dominio +METRICS_QUEUES = { + 'user': 'metrics_users_queue', + 'report': 'metrics_reports_queue', + 'notification': 'metrics_notifications_queue', + 'moderation': 'metrics_moderations_queue', +} + class MetricsConsumer: """Consumer for all system events from RabbitMQ""" - + def __init__(self): self.metrics_service = MetricsService(MetricsRepositoryPostgres()) - - # Create separate consumers for each event type - self.user_consumer = RabbitMQConsumer(queue_name='users_queue') - self.report_consumer = RabbitMQConsumer(queue_name='reports_queue') - self.notification_consumer = RabbitMQConsumer(queue_name='notifications_queue') - self.moderation_consumer = RabbitMQConsumer(queue_name='moderations_queue') - - # Set callbacks + + self.user_consumer = RabbitMQConsumer(queue_name=METRICS_QUEUES['user']) + self.report_consumer = RabbitMQConsumer(queue_name=METRICS_QUEUES['report']) + self.notification_consumer = RabbitMQConsumer(queue_name=METRICS_QUEUES['notification']) + self.moderation_consumer = RabbitMQConsumer(queue_name=METRICS_QUEUES['moderation']) + self.user_consumer.set_callback(self.process_user_event) self.report_consumer.set_callback(self.process_report_event) self.notification_consumer.set_callback(self.process_notification_event) self.moderation_consumer.set_callback(self.process_moderation_event) - + def process_user_event(self, message_dict: dict): - """Procesa eventos de usuario""" try: message = UserMessage.from_dict(message_dict) self.metrics_service.record_event( @@ -58,107 +61,83 @@ class MetricsConsumer: logger.info(f"Métrica registrada: user_{message.event_type.value}") except Exception as e: logger.error(f"Error procesando evento de usuario: {e}") - + def process_report_event(self, message_dict: dict): - """Procesa eventos de reportes""" try: message = ReportMessage.from_dict(message_dict) self.metrics_service.record_event( event_type=f"report_{message.event_type.value}", - entity_id=str(message.report_id), + entity_id=str(message.id_reporte), entity_type="report", - user_id=message.user_id, - metadata={"status": message.status} + user_id=message.id_usuario, + metadata={"estado": message.estado} ) logger.info(f"Métrica registrada: report_{message.event_type.value}") except Exception as e: logger.error(f"Error procesando evento de reporte: {e}") - + def process_notification_event(self, message_dict: dict): - """Procesa eventos de notificaciones""" try: message = NotificationMessage.from_dict(message_dict) self.metrics_service.record_event( event_type=f"notification_{message.event_type.value}", - entity_id=str(message.notification_id), + entity_id=str(message.id_notificacion), entity_type="notification", - user_id=message.user_id, - metadata={"type": message.notification_type} + user_id=message.id_usuario, + metadata={"type": message.tipo_notificacion} ) logger.info(f"Métrica registrada: notification_{message.event_type.value}") except Exception as e: logger.error(f"Error procesando evento de notificación: {e}") - + def process_moderation_event(self, message_dict: dict): - """Procesa eventos de moderación""" try: message = ModerationMessage.from_dict(message_dict) self.metrics_service.record_event( event_type=f"moderation_{message.event_type.value}", - entity_id=str(message.moderation_id), + entity_id=str(message.action_id), entity_type="moderation", user_id=message.moderator_id, - metadata={"action": message.action} + metadata={"action": message.review_action, "reason": message.reason} ) logger.info(f"Métrica registrada: moderation_{message.event_type.value}") except Exception as e: logger.error(f"Error procesando evento de moderación: {e}") - + def start(self): - """Inicia todos los consumers""" logger.info("Iniciando Metrics Consumer...") try: - # Start consumers in separate threads import threading - - def start_user_consumer(): - try: - self.user_consumer.start_consuming() - except Exception as e: - logger.error(f"Error en user consumer: {e}") - - def start_report_consumer(): - try: - self.report_consumer.start_consuming() - except Exception as e: - logger.error(f"Error en report consumer: {e}") - - def start_notification_consumer(): - try: - self.notification_consumer.start_consuming() - except Exception as e: - logger.error(f"Error en notification consumer: {e}") - - def start_moderation_consumer(): - try: - self.moderation_consumer.start_consuming() - except Exception as e: - logger.error(f"Error en moderation consumer: {e}") - + threads = [ - threading.Thread(target=start_user_consumer, daemon=True), - threading.Thread(target=start_report_consumer, daemon=True), - threading.Thread(target=start_notification_consumer, daemon=True), - threading.Thread(target=start_moderation_consumer, daemon=True), + threading.Thread(target=self._safe_consume, args=(self.user_consumer,), daemon=True), + threading.Thread(target=self._safe_consume, args=(self.report_consumer,), daemon=True), + threading.Thread(target=self._safe_consume, args=(self.notification_consumer,), daemon=True), + threading.Thread(target=self._safe_consume, args=(self.moderation_consumer,), daemon=True), ] for t in threads: t.start() - - # Keep main thread alive for t in threads: t.join() except KeyboardInterrupt: logger.info("Metrics Consumer detenido") self.stop() - + + def _safe_consume(self, consumer: RabbitMQConsumer): + try: + consumer.start_consuming() + except Exception as e: + logger.error(f"Error en consumer de {consumer.queue_name}: {e}") + def stop(self): - """Detiene los consumers""" - self.user_consumer.stop() - self.report_consumer.stop() - self.notification_consumer.stop() - self.moderation_consumer.stop() + for c in [self.user_consumer, self.report_consumer, + self.notification_consumer, self.moderation_consumer]: + try: + c.stop() + except Exception: + pass if __name__ == "__main__": consumer = MetricsConsumer() - consumer.start() + consumer.start() \ No newline at end of file diff --git a/src/infrastructure/adapters/rabbitmq/sender.py b/src/infrastructure/adapters/rabbitmq/sender.py index f412277..618da33 100644 --- a/src/infrastructure/adapters/rabbitmq/sender.py +++ b/src/infrastructure/adapters/rabbitmq/sender.py @@ -3,41 +3,34 @@ import pika import json from typing import Any, Dict import logging +import os logger = logging.getLogger(__name__) +def _get_rabbitmq_host() -> str: + try: + from core.config import ConfSettings + return ConfSettings.rabbitmq + except Exception: + return os.getenv("RABBITMQ_URI", "localhost") + + class RabbitMQSender: """Generic RabbitMQ sender for publishing messages to queues""" - - def __init__(self, host: str = 'localhost', port: int = 5672): - self.host = host + + def __init__(self, host: str = None, port: int = 5672): + self.host = host or _get_rabbitmq_host() self.port = port - + def send_message(self, queue_name: str, message: Dict[str, Any]) -> bool: - """ - Sends a message to a RabbitMQ queue - - Args: - queue_name: Name of the queue to send to - message: Dictionary containing the message data - - Returns: - True if successful, False otherwise - """ try: connection = pika.BlockingConnection( pika.ConnectionParameters(host=self.host, port=self.port) ) channel = connection.channel() - - # Declare queue to ensure it exists channel.queue_declare(queue=queue_name, durable=True) - - # Convert message to JSON message_json = json.dumps(message) - - # Publish the message channel.basic_publish( exchange='', routing_key=queue_name, @@ -46,29 +39,15 @@ class RabbitMQSender: delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE ) ) - connection.close() logger.info(f"Message sent to queue '{queue_name}': {message_json}") return True - except Exception as e: logger.error(f"Error sending message to RabbitMQ: {e}") return False -def send_to_queue(queue_name: str, message: Dict[str, Any], - host: str = 'localhost', port: int = 5672) -> bool: - """ - Convenience function to send a message to RabbitMQ - - Args: - queue_name: Name of the queue - message: Message dictionary - host: RabbitMQ host - port: RabbitMQ port - - Returns: - True if successful, False otherwise - """ +def send_to_queue(queue_name: str, message: Dict[str, Any], + host: str = None, port: int = 5672) -> bool: sender = RabbitMQSender(host=host, port=port) - return sender.send_message(queue_name, message) + return sender.send_message(queue_name, message) \ No newline at end of file diff --git a/src/infrastructure/api/moderations/moderations.py b/src/infrastructure/api/moderations/moderations.py index 53ee083..2eec3a7 100644 --- a/src/infrastructure/api/moderations/moderations.py +++ b/src/infrastructure/api/moderations/moderations.py @@ -2,76 +2,67 @@ from fastapi import APIRouter, HTTPException, Depends from fastapi.responses import JSONResponse from infrastructure.api.moderations.schemas import ( - DeleteReportRequest, - CloseAccountRequest, - BanUserRequest, - WarnUserRequest, - ReviewContentRequest, - ModerationActionResponse + DeleteReportRequest, CloseAccountRequest, BanUserRequest, + WarnUserRequest, ReviewContentRequest, ModerationActionResponse ) from infrastructure.api.auth import get_current_admin_user from application.services.moderation_services import ( - DeleteReportUseCase, - CloseAccountUseCase, - BanUserUseCase, - WarnUserUseCase, - ReviewContentUseCase + DeleteReportUseCase, CloseAccountUseCase, BanUserUseCase, + WarnUserUseCase, ReviewContentUseCase ) from infrastructure.adapters.persistence.mongodb import mongodb from infrastructure.adapters.persistence.db import get_db from infrastructure.adapters.moderation_repository_mongo import ModerationRepositoryMongo +from infrastructure.adapters.rabbitmq.sender import send_to_queue +from infrastructure.adapters.rabbitmq.messages import ModerationMessage, ModerationEventType from domain.users import User from sqlalchemy.orm import Session import logging +from datetime import datetime logger = logging.getLogger(__name__) - router = APIRouter() - - -# Instanciar repositorio de moderación moderation_repo = ModerationRepositoryMongo() +def _publish_moderation_event(event_type: ModerationEventType, **kwargs): + try: + msg = ModerationMessage(event_type=event_type, fecha_creacion=datetime.utcnow().isoformat(), **kwargs) + payload = msg.to_dict() + send_to_queue('moderations_queue', payload) + send_to_queue('metrics_moderations_queue', payload) + except Exception as e: + logger.warning(f"Error publicando evento de moderación a RabbitMQ: {e}") + + @router.post("/reports/delete", response_model=ModerationActionResponse) -async def delete_report( - request: DeleteReportRequest, - current_admin: User = Depends(get_current_admin_user) -): +async def delete_report(request: DeleteReportRequest, current_admin: User = Depends(get_current_admin_user)): """ Eliminar un reporte como moderador (requiere permisos de admin) - + - **report_id**: ID del reporte a eliminar - **reason**: Razón de la eliminación (mínimo 5 caracteres) - **description**: Descripción adicional (opcional) """ try: use_case = DeleteReportUseCase(moderation_repo) - result = use_case.execute( - moderator_id=current_admin.user_id, - report_id=request.report_id, - reason=request.reason, - description=request.description - ) - + result = use_case.execute(moderator_id=current_admin.user_id, report_id=request.report_id, reason=request.reason, description=request.description) if result["status"] == "error": raise HTTPException(status_code=400, detail=result["message"]) - + _publish_moderation_event(ModerationEventType.DELETE_REPORT, moderator_id=current_admin.user_id, report_id=request.report_id, reason=request.reason, description=request.description, action_id=result.get("action_id")) return ModerationActionResponse(**result) - + except HTTPException: + raise except Exception as e: logger.error(f"Error in delete_report: {e}") raise HTTPException(status_code=500, detail="Error interno del servidor") @router.post("/accounts/close", response_model=ModerationActionResponse) -async def close_account( - request: CloseAccountRequest, - current_admin: User = Depends(get_current_admin_user) -): +async def close_account(request: CloseAccountRequest, current_admin: User = Depends(get_current_admin_user)): """ Cerrar una cuenta de usuario (requiere permisos de admin) - + - **user_id**: ID del usuario cuya cuenta cerrar - **reason**: Razón del cierre (mínimo 5 caracteres) - **description**: Descripción adicional (opcional) @@ -79,32 +70,23 @@ async def close_account( """ try: use_case = CloseAccountUseCase(moderation_repo) - result = use_case.execute( - moderator_id=current_admin.user_id, - user_id=request.user_id, - reason=request.reason, - description=request.description, - is_permanent=request.is_permanent - ) - + result = use_case.execute(moderator_id=current_admin.user_id, user_id=request.user_id, reason=request.reason, description=request.description, is_permanent=request.is_permanent) if result["status"] == "error": raise HTTPException(status_code=400, detail=result["message"]) - + _publish_moderation_event(ModerationEventType.CLOSE_ACCOUNT, moderator_id=current_admin.user_id, user_id=request.user_id, reason=request.reason, description=request.description, is_permanent=request.is_permanent, action_id=result.get("action_id")) return ModerationActionResponse(**result) - + except HTTPException: + raise except Exception as e: logger.error(f"Error in close_account: {e}") raise HTTPException(status_code=500, detail="Error interno del servidor") @router.post("/users/ban", response_model=ModerationActionResponse) -async def ban_user( - request: BanUserRequest, - current_admin: User = Depends(get_current_admin_user) -): +async def ban_user(request: BanUserRequest, current_admin: User = Depends(get_current_admin_user)): """ Banear a un usuario (requiere permisos de admin) - + - **user_id**: ID del usuario a banear - **reason**: Razón del ban (mínimo 5 caracteres) - **duration_days**: Duración del ban en días (None para permanente) @@ -112,63 +94,46 @@ async def ban_user( """ try: use_case = BanUserUseCase(moderation_repo) - result = use_case.execute( - moderator_id=current_admin.user_id, - user_id=request.user_id, - reason=request.reason, - duration_days=request.duration_days, - description=request.description - ) - + result = use_case.execute(moderator_id=current_admin.user_id, user_id=request.user_id, reason=request.reason, duration_days=request.duration_days, description=request.description) if result["status"] == "error": raise HTTPException(status_code=400, detail=result["message"]) - + _publish_moderation_event(ModerationEventType.BAN_USER, moderator_id=current_admin.user_id, user_id=request.user_id, reason=request.reason, duration_days=request.duration_days, description=request.description, is_permanent=request.duration_days is None, action_id=result.get("action_id")) return ModerationActionResponse(**result) - + except HTTPException: + raise except Exception as e: logger.error(f"Error in ban_user: {e}") raise HTTPException(status_code=500, detail="Error interno del servidor") @router.post("/users/warn", response_model=ModerationActionResponse) -async def warn_user( - request: WarnUserRequest, - current_admin: User = Depends(get_current_admin_user) -): +async def warn_user(request: WarnUserRequest, current_admin: User = Depends(get_current_admin_user)): """ Advertir a un usuario (requiere permisos de admin) - + - **user_id**: ID del usuario a advertir - **reason**: Razón de la advertencia (mínimo 5 caracteres) - **description**: Descripción adicional (opcional) """ try: use_case = WarnUserUseCase(moderation_repo) - result = use_case.execute( - moderator_id=current_admin.user_id, - user_id=request.user_id, - reason=request.reason, - description=request.description - ) - + result = use_case.execute(moderator_id=current_admin.user_id, user_id=request.user_id, reason=request.reason, description=request.description) if result["status"] == "error": raise HTTPException(status_code=400, detail=result["message"]) - + _publish_moderation_event(ModerationEventType.WARN_USER, moderator_id=current_admin.user_id, user_id=request.user_id, reason=request.reason, description=request.description, action_id=result.get("action_id")) return ModerationActionResponse(**result) - + except HTTPException: + raise except Exception as e: logger.error(f"Error in warn_user: {e}") raise HTTPException(status_code=500, detail="Error interno del servidor") @router.post("/content/review", response_model=ModerationActionResponse) -async def review_content( - request: ReviewContentRequest, - current_admin: User = Depends(get_current_admin_user) -): +async def review_content(request: ReviewContentRequest, current_admin: User = Depends(get_current_admin_user)): """ Revisar y actuar sobre contenido reportado (requiere permisos de admin) - + - **report_id**: ID del reporte a revisar - **action**: Acción a tomar (approve, reject, needs_more_info) - **reason**: Razón de la decisión (opcional) @@ -176,19 +141,13 @@ async def review_content( """ try: use_case = ReviewContentUseCase(moderation_repo) - result = use_case.execute( - moderator_id=current_admin.user_id, - report_id=request.report_id, - action=request.action, - reason=request.reason, - notes=request.notes - ) - + result = use_case.execute(moderator_id=current_admin.user_id, report_id=request.report_id, action=request.action, reason=request.reason, notes=request.notes) if result["status"] == "error": raise HTTPException(status_code=400, detail=result["message"]) - + _publish_moderation_event(ModerationEventType.REVIEW_CONTENT, moderator_id=current_admin.user_id, report_id=request.report_id, review_action=request.action, reason=request.reason, notes=request.notes, action_id=result.get("action_id")) return ModerationActionResponse(**result) - + except HTTPException: + raise except Exception as e: logger.error(f"Error in review_content: {e}") - raise HTTPException(status_code=500, detail="Error interno del servidor") + raise HTTPException(status_code=500, detail="Error interno del servidor") \ No newline at end of file diff --git a/src/infrastructure/api/reports/reports.py b/src/infrastructure/api/reports/reports.py index 9f16d8a..2665cb6 100644 --- a/src/infrastructure/api/reports/reports.py +++ b/src/infrastructure/api/reports/reports.py @@ -8,7 +8,10 @@ from infrastructure.adapters.persistence.report_repository_mongo import ReportRe from infrastructure.adapters.persistence.user_repository_sql import UserRepositorySQL from infrastructure.adapters.file_storage import image_storage from infrastructure.adapters.rabbitmq.sender import send_to_queue -from infrastructure.adapters.rabbitmq.messages import NotificationMessage, NotificationEventType +from infrastructure.adapters.rabbitmq.messages import ( + NotificationMessage, NotificationEventType, + ReportMessage, ReportEventType +) import logging from typing import Optional from datetime import datetime @@ -21,7 +24,6 @@ logger = logging.getLogger(__name__) def _report_to_response(report) -> dict: - """Convierte un objeto Report a dict con image_url""" return { "id_reporte": report.id_reporte, "id_usuario": report.id_usuario, @@ -36,6 +38,17 @@ def _report_to_response(report) -> dict: "fecha_creacion": report.fecha_creacion } + +def _publish_report_event(event_type: ReportEventType, **kwargs): + try: + msg = ReportMessage(event_type=event_type, **kwargs) + payload = msg.to_dict() + send_to_queue('reports_queue', payload) + send_to_queue('metrics_reports_queue', payload) + except Exception as e: + logger.warning(f"Error publicando evento de reporte a RabbitMQ: {e}") + + @router.post("/", status_code=status.HTTP_202_ACCEPTED) async def create_report( id_usuario: int = Form(...), @@ -49,54 +62,38 @@ async def create_report( ): """Crea un nuevo reporte - envía a cola de procesamiento con validaciones previas""" try: - # Procesar imagen si fue proporcionada image_filename = None if file: logger.info(f"Processing image file: {file.filename} ({file.content_type})") image_filename = image_storage.validate_and_save_image(file, f"temp_{id_usuario}_{tipo_reporte}") - + create_use_case = CreateReport(report_repo, user_repo) result = create_use_case.execute( - id_usuario=id_usuario, - tipo_reporte=tipo_reporte, - descripcion=descripcion, - ubicacion=ubicacion, - lat=lat, - lng=lng, - image_filename=image_filename, - estado=estado + id_usuario=id_usuario, tipo_reporte=tipo_reporte, descripcion=descripcion, + ubicacion=ubicacion, lat=lat, lng=lng, image_filename=image_filename, estado=estado ) - + if result["status"] == "error": - # Si hay error, eliminar imagen si fue guardada if image_filename: image_storage.delete_image(image_filename) - message = result["message"] if "no existe" in message: - # 404 Not Found: usuario no existe - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=message - ) - else: - # 400 Bad Request: error de validación - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=message - ) - - # 202 Accepted: enviado a la cola correctamente + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message) + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message) + + _publish_report_event( + ReportEventType.CREATE, + id_reporte=result.get("id_reporte"), id_usuario=id_usuario, tipo_reporte=tipo_reporte, + descripcion=descripcion, ubicacion=ubicacion, lat=lat, lng=lng, estado=estado, + fecha_creacion=datetime.utcnow().isoformat() + ) return result - + except HTTPException: raise except Exception as e: logger.error(f"Error inesperado en create_report: {e}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Error interno del servidor" - ) + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor") @router.get("/{report_id}", response_model=ReportResponse) async def get_report(report_id: str): @@ -105,19 +102,13 @@ async def get_report(report_id: str): get_use_case = GetReportById(report_repo) report = get_use_case.execute(report_id) if not report: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Reporte con ID {report_id} no encontrado" - ) + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Reporte con ID {report_id} no encontrado") return _report_to_response(report) except HTTPException: raise except Exception as e: logger.error(f"Error al obtener reporte {report_id}: {e}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Error interno del servidor" - ) + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor") @router.get("/user/{user_id}") async def get_user_reports(user_id: int): @@ -128,10 +119,7 @@ async def get_user_reports(user_id: int): return [_report_to_response(report) for report in reports] except Exception as e: logger.error(f"Error al obtener reportes del usuario {user_id}: {e}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Error interno del servidor" - ) + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor") @router.get("/") async def list_reports(): @@ -142,10 +130,7 @@ async def list_reports(): return [_report_to_response(report) for report in reports] except Exception as e: logger.error(f"Error al listar reportes: {e}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Error interno del servidor" - ) + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor") @router.get("/shadowbanned/list") async def get_shadowbanned_reports(threshold: float = 20): @@ -156,48 +141,26 @@ async def get_shadowbanned_reports(threshold: float = 20): return [_report_to_response(report) for report in reports] except Exception as e: logger.error(f"Error al obtener reportes shadowbaneados: {e}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Error interno del servidor" - ) + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor") @router.put("/{report_id}/visibility", status_code=status.HTTP_202_ACCEPTED) async def update_report_visibility(report_id: str, visibility_data: ReportUpdateVisibilityRequest): """Actualiza la visibilidad de un reporte - envía a cola de procesamiento con validaciones previas""" try: update_use_case = UpdateReportVisibility(report_repo, user_repo) - result = update_use_case.execute( - report_id=report_id, - new_visibility=visibility_data.new_visibility, - penalize_author=visibility_data.penalize_author - ) - + result = update_use_case.execute(report_id=report_id, new_visibility=visibility_data.new_visibility, penalize_author=visibility_data.penalize_author) if result["status"] == "error": message = result["message"] if "no existe" in message: - # 404 Not Found: reporte no existe - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=message - ) - else: - # 400 Bad Request: error de validación - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=message - ) - - # 202 Accepted: enviado a la cola correctamente + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message) + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message) + _publish_report_event(ReportEventType.UPDATE_VISIBILITY, id_reporte=report_id, visibilidad=visibility_data.new_visibility, penalize_author=visibility_data.penalize_author) return result - except HTTPException: raise except Exception as e: logger.error(f"Error al actualizar visibilidad del reporte {report_id}: {e}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Error interno del servidor" - ) + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor") @router.delete("/{report_id}", status_code=status.HTTP_202_ACCEPTED) async def delete_report(report_id: str): @@ -205,69 +168,38 @@ async def delete_report(report_id: str): try: delete_use_case = DeleteReport(report_repo) result = delete_use_case.execute(report_id) - if result["status"] == "error": message = result["message"] if "no existe" in message: - # 404 Not Found: reporte no existe - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=message - ) - else: - # 400 Bad Request: error de validación - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=message - ) - - # 202 Accepted: enviado a la cola correctamente + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message) + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message) + _publish_report_event(ReportEventType.DELETE, id_reporte=report_id) return result - except HTTPException: raise except Exception as e: logger.error(f"Error al eliminar reporte {report_id}: {e}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Error interno del servidor" - ) + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor") @router.put("/{report_id}/status", status_code=status.HTTP_200_OK) async def update_report_status(report_id: str, status_data: ReportUpdateStatusRequest): """Actualiza el estado de un reporte y envía notificación al usuario""" try: - # Obtener el reporte actual para saber el usuario creador report = report_repo.find_by_id(report_id) if not report: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Reporte con ID {report_id} no encontrado" - ) - - # Actualizar el estado + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Reporte con ID {report_id} no encontrado") + update_use_case = UpdateReportStatus(report_repo) - result = update_use_case.execute( - report_id=report_id, - new_estado=status_data.estado - ) - + result = update_use_case.execute(report_id=report_id, new_estado=status_data.estado) + if result["status"] == "error": message = result["message"] if "no existe" in message: - # 404 Not Found: reporte no existe - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=message - ) - else: - # 400 Bad Request: error de validación - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=message - ) - - # Enviar notificación al usuario creador del reporte + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message) + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message) + + _publish_report_event(ReportEventType.UPDATE_STATUS, id_reporte=report_id, id_usuario=report.id_usuario, estado=status_data.estado) + try: notification_message = NotificationMessage( event_type=NotificationEventType.REPORT_STATUS_CHANGE, @@ -280,25 +212,16 @@ async def update_report_status(report_id: str, status_data: ReportUpdateStatusRe estado_reporte=status_data.estado, fecha_creacion=datetime.utcnow().isoformat() ) - - # Enviar a la cola de notificaciones - send_to_queue( - queue_name='notifications_queue', - message=notification_message.to_dict() - ) + send_to_queue('notifications_queue', notification_message.to_dict()) + send_to_queue('metrics_notifications_queue', notification_message.to_dict()) logger.info(f"Notification sent to user {report.id_usuario} for report {report_id}") except Exception as notification_error: logger.warning(f"Error sending notification for report {report_id}: {notification_error}") - # No fallar la actualización si hay error en notificación - - # 200 OK: estado actualizado correctamente + return result - + except HTTPException: raise except Exception as e: logger.error(f"Error al actualizar estado del reporte {report_id}: {e}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Error interno del servidor" - ) + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor") \ No newline at end of file diff --git a/src/infrastructure/api/users/users.py b/src/infrastructure/api/users/users.py index b1a7ed0..8521cf9 100644 --- a/src/infrastructure/api/users/users.py +++ b/src/infrastructure/api/users/users.py @@ -1,6 +1,6 @@ from fastapi import APIRouter, HTTPException, status, Depends from infrastructure.api.users.schemas import ( - UserCreateRequest, UserUpdateRequest, UserResponse, + UserCreateRequest, UserUpdateRequest, UserResponse, UserLoginRequest, UserLoginResponse ) from application.services.user_services import ( @@ -8,21 +8,35 @@ from application.services.user_services import ( ) from infrastructure.adapters.persistence.user_repository_sql import UserRepositorySQL from infrastructure.api.users.auth_service import auth_service +from infrastructure.adapters.rabbitmq.sender import send_to_queue +from infrastructure.adapters.rabbitmq.messages import UserMessage, UserEventType import logging +from datetime import datetime router = APIRouter() user_repo = UserRepositorySQL() logger = logging.getLogger(__name__) + +def _publish_user_event(event_type: UserEventType, **kwargs): + try: + msg = UserMessage(event_type=event_type, fecha_creacion=datetime.utcnow().isoformat(), **kwargs) + payload = msg.to_dict() + send_to_queue('users_queue', payload) + send_to_queue('metrics_users_queue', payload) + except Exception as e: + logger.warning(f"Error publicando evento de usuario a RabbitMQ: {e}") + + @router.post("/login", response_model=UserLoginResponse, status_code=status.HTTP_200_OK) async def login_user(credentials: UserLoginRequest): """ Autentica un usuario y retorna un token JWT - + **Parámetros:** - email: Email del usuario - contraseña: Contraseña del usuario - + **Retorna:** - access_token: Token JWT para usar en requests autenticados - token_type: Tipo de token (bearer) @@ -30,55 +44,26 @@ async def login_user(credentials: UserLoginRequest): - email: Email confirmado """ try: - # Obtener usuario por email get_use_case = GetUserByEmail(user_repo) user = get_use_case.execute(credentials.email) - if not user: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Email o contraseña incorrectos", - headers={"WWW-Authenticate": "Bearer"} - ) - - # Verificar contraseña - # Necesitamos obtener el hash de contraseña del modelo + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Email o contraseña incorrectos", headers={"WWW-Authenticate": "Bearer"}) user_model = user_repo.find_by_email_with_password(credentials.email) if not user_model or not auth_service.verify_password(credentials.contraseña, user_model.contraseña_hash): - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Email o contraseña incorrectos", - headers={"WWW-Authenticate": "Bearer"} - ) - - # Crear token JWT - access_token = auth_service.create_access_token( - user_id=user.user_id, - email=user.email - ) - - return { - "access_token": access_token, - "token_type": "bearer", - "user_id": user.user_id, - "email": user.email, - "is_admin": user.is_admin - } - + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Email o contraseña incorrectos", headers={"WWW-Authenticate": "Bearer"}) + access_token = auth_service.create_access_token(user_id=user.user_id, email=user.email) + return {"access_token": access_token, "token_type": "bearer", "user_id": user.user_id, "email": user.email, "is_admin": user.is_admin} except HTTPException: raise except Exception as e: logger.error(f"Error en login_user: {e}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Error interno del servidor" - ) + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor") @router.post("/", status_code=status.HTTP_202_ACCEPTED) async def create_user(user_data: UserCreateRequest): """ Crea un nuevo usuario - envía a cola de procesamiento con validaciones previas - + **Parámetros:** - nombre: Nombre del usuario (requerido) - apellido: Apellido del usuario (requerido) @@ -91,42 +76,22 @@ async def create_user(user_data: UserCreateRequest): try: create_use_case = CreateUser(user_repo) result = create_use_case.execute( - nombre=user_data.nombre, - apellido=user_data.apellido, - email=user_data.email, - contraseña=user_data.contraseña, - fecha_nacimiento=user_data.fecha_nacimiento, - url_foto_perfil=user_data.url_foto_perfil, - biografia=user_data.biografia + nombre=user_data.nombre, apellido=user_data.apellido, email=user_data.email, + contraseña=user_data.contraseña, fecha_nacimiento=user_data.fecha_nacimiento, + url_foto_perfil=user_data.url_foto_perfil, biografia=user_data.biografia ) - if result["status"] == "error": - # Detectar tipo de error para código HTTP apropiado message = result["message"] if "ya está registrado" in message: - # 409 Conflict: email duplicado - raise HTTPException( - status_code=status.HTTP_409_CONFLICT, - detail=message - ) - else: - # 400 Bad Request: error de validación - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=message - ) - - # 202 Accepted: enviado a la cola correctamente + raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=message) + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message) + _publish_user_event(UserEventType.CREATE, user_id=result.get("user_id"), email=user_data.email, nombre=user_data.nombre, apellido=user_data.apellido) return result - except HTTPException: raise except Exception as e: logger.error(f"Error inesperado en create_user: {e}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Error interno del servidor" - ) + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor") @router.get("/{user_id}", response_model=UserResponse) async def get_user(user_id: int): @@ -135,19 +100,13 @@ async def get_user(user_id: int): get_use_case = GetUserById(user_repo) user = get_use_case.execute(user_id) if not user: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Usuario con ID {user_id} no encontrado" - ) + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Usuario con ID {user_id} no encontrado") return user except HTTPException: raise except Exception as e: logger.error(f"Error al obtener usuario {user_id}: {e}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Error interno del servidor" - ) + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor") @router.get("/email/{email}", response_model=UserResponse) async def get_user_by_email(email: str): @@ -156,19 +115,13 @@ async def get_user_by_email(email: str): get_use_case = GetUserByEmail(user_repo) user = get_use_case.execute(email) if not user: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Usuario con email {email} no encontrado" - ) + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Usuario con email {email} no encontrado") return user except HTTPException: raise except Exception as e: logger.error(f"Error al obtener usuario por email {email}: {e}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Error interno del servidor" - ) + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor") @router.get("/") async def list_users(): @@ -178,50 +131,26 @@ async def list_users(): return list_use_case.execute() except Exception as e: logger.error(f"Error al listar usuarios: {e}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Error interno del servidor" - ) + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor") @router.put("/{user_id}", status_code=status.HTTP_202_ACCEPTED) async def update_user(user_id: int, user_data: UserUpdateRequest): """Actualiza un usuario - envía a cola de procesamiento con validaciones previas""" try: update_use_case = UpdateUser(user_repo) - result = update_use_case.execute( - user_id=user_id, - nombre=user_data.nombre, - apellido=user_data.apellido, - url_foto_perfil=user_data.url_foto_perfil, - biografia=user_data.biografia - ) - + result = update_use_case.execute(user_id=user_id, nombre=user_data.nombre, apellido=user_data.apellido, url_foto_perfil=user_data.url_foto_perfil, biografia=user_data.biografia) if result["status"] == "error": message = result["message"] if "no existe" in message: - # 404 Not Found: usuario no existe - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=message - ) - else: - # 400 Bad Request: error de validación - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=message - ) - - # 202 Accepted: enviado a la cola correctamente + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message) + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message) + _publish_user_event(UserEventType.UPDATE, user_id=user_id, nombre=user_data.nombre, apellido=user_data.apellido) return result - except HTTPException: raise except Exception as e: logger.error(f"Error al actualizar usuario {user_id}: {e}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Error interno del servidor" - ) + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor") @router.delete("/{user_id}", status_code=status.HTTP_202_ACCEPTED) async def delete_user(user_id: int): @@ -229,30 +158,15 @@ async def delete_user(user_id: int): try: delete_use_case = DeleteUser(user_repo) result = delete_use_case.execute(user_id) - if result["status"] == "error": message = result["message"] if "no existe" in message: - # 404 Not Found: usuario no existe - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=message - ) - else: - # 400 Bad Request: error de validación - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=message - ) - - # 202 Accepted: enviado a la cola correctamente + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message) + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message) + _publish_user_event(UserEventType.DELETE, user_id=user_id) return result - except HTTPException: raise except Exception as e: logger.error(f"Error al eliminar usuario {user_id}: {e}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Error interno del servidor" - ) + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error interno del servidor") \ No newline at end of file