139
src/consumers/metrics_consumer.py
Normal file
139
src/consumers/metrics_consumer.py
Normal file
@@ -0,0 +1,139 @@
|
||||
"""Metrics RabbitMQ Consumer - Processes system events and saves metrics"""
|
||||
import sys
|
||||
import os
|
||||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
# Add src to path to import modules
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
||||
|
||||
from infrastructure.adapters.rabbitmq.consumer import RabbitMQConsumer
|
||||
from infrastructure.adapters.rabbitmq.messages import (
|
||||
UserMessage, ReportMessage, NotificationMessage, ModerationMessage
|
||||
)
|
||||
from application.services.metrics_services import MetricsService
|
||||
from infrastructure.adapters.persistence.metrics_repository_postgres import MetricsRepositoryPostgres
|
||||
|
||||
# Set up logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||
handlers=[
|
||||
logging.FileHandler('logs/metrics_consumer.log'),
|
||||
logging.StreamHandler()
|
||||
]
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MetricsConsumer:
|
||||
"""Consumer for all system events from RabbitMQ"""
|
||||
|
||||
def __init__(self):
|
||||
self.metrics_service = MetricsService(MetricsRepositoryPostgres())
|
||||
|
||||
# Create separate consumers for each event type
|
||||
self.user_consumer = RabbitMQConsumer(queue_name='users_queue')
|
||||
self.report_consumer = RabbitMQConsumer(queue_name='reports_queue')
|
||||
self.notification_consumer = RabbitMQConsumer(queue_name='notifications_queue')
|
||||
self.moderation_consumer = RabbitMQConsumer(queue_name='moderations_queue')
|
||||
|
||||
# Set callbacks
|
||||
self.user_consumer.set_callback(self.process_user_event)
|
||||
self.report_consumer.set_callback(self.process_report_event)
|
||||
self.notification_consumer.set_callback(self.process_notification_event)
|
||||
self.moderation_consumer.set_callback(self.process_moderation_event)
|
||||
|
||||
def process_user_event(self, message_dict: dict):
|
||||
"""Procesa eventos de usuario"""
|
||||
try:
|
||||
message = UserMessage.from_dict(message_dict)
|
||||
self.metrics_service.record_event(
|
||||
event_type=f"user_{message.event_type.value}",
|
||||
entity_id=str(message.user_id),
|
||||
entity_type="user",
|
||||
user_id=message.user_id,
|
||||
metadata={"email": message.email}
|
||||
)
|
||||
logger.info(f"Métrica registrada: user_{message.event_type.value}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error procesando evento de usuario: {e}")
|
||||
|
||||
def process_report_event(self, message_dict: dict):
|
||||
"""Procesa eventos de reportes"""
|
||||
try:
|
||||
message = ReportMessage.from_dict(message_dict)
|
||||
self.metrics_service.record_event(
|
||||
event_type=f"report_{message.event_type.value}",
|
||||
entity_id=str(message.report_id),
|
||||
entity_type="report",
|
||||
user_id=message.user_id,
|
||||
metadata={"status": message.status}
|
||||
)
|
||||
logger.info(f"Métrica registrada: report_{message.event_type.value}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error procesando evento de reporte: {e}")
|
||||
|
||||
def process_notification_event(self, message_dict: dict):
|
||||
"""Procesa eventos de notificaciones"""
|
||||
try:
|
||||
message = NotificationMessage.from_dict(message_dict)
|
||||
self.metrics_service.record_event(
|
||||
event_type=f"notification_{message.event_type.value}",
|
||||
entity_id=str(message.notification_id),
|
||||
entity_type="notification",
|
||||
user_id=message.user_id,
|
||||
metadata={"type": message.notification_type}
|
||||
)
|
||||
logger.info(f"Métrica registrada: notification_{message.event_type.value}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error procesando evento de notificación: {e}")
|
||||
|
||||
def process_moderation_event(self, message_dict: dict):
|
||||
"""Procesa eventos de moderación"""
|
||||
try:
|
||||
message = ModerationMessage.from_dict(message_dict)
|
||||
self.metrics_service.record_event(
|
||||
event_type=f"moderation_{message.event_type.value}",
|
||||
entity_id=str(message.moderation_id),
|
||||
entity_type="moderation",
|
||||
user_id=message.moderator_id,
|
||||
metadata={"action": message.action}
|
||||
)
|
||||
logger.info(f"Métrica registrada: moderation_{message.event_type.value}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error procesando evento de moderación: {e}")
|
||||
|
||||
def start(self):
|
||||
"""Inicia todos los consumers"""
|
||||
logger.info("Iniciando Metrics Consumer...")
|
||||
try:
|
||||
# Start consumers in separate threads
|
||||
import threading
|
||||
threads = [
|
||||
threading.Thread(target=self.user_consumer.start, daemon=True),
|
||||
threading.Thread(target=self.report_consumer.start, daemon=True),
|
||||
threading.Thread(target=self.notification_consumer.start, daemon=True),
|
||||
threading.Thread(target=self.moderation_consumer.start, daemon=True),
|
||||
]
|
||||
for t in threads:
|
||||
t.start()
|
||||
|
||||
# Keep main thread alive
|
||||
for t in threads:
|
||||
t.join()
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Metrics Consumer detenido")
|
||||
self.stop()
|
||||
|
||||
def stop(self):
|
||||
"""Detiene los consumers"""
|
||||
self.user_consumer.stop()
|
||||
self.report_consumer.stop()
|
||||
self.notification_consumer.stop()
|
||||
self.moderation_consumer.stop()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
consumer = MetricsConsumer()
|
||||
consumer.start()
|
||||
174
src/consumers/moderation_consumer.py
Normal file
174
src/consumers/moderation_consumer.py
Normal file
@@ -0,0 +1,174 @@
|
||||
"""Consumidor de eventos de Moderación desde RabbitMQ"""
|
||||
import pika
|
||||
import json
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from threading import Thread
|
||||
from typing import Dict, Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ModerationConsumer:
|
||||
"""Consumidor de eventos de moderación desde RabbitMQ"""
|
||||
|
||||
def __init__(self, host: str = 'localhost', port: int = 5672):
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.queue_name = 'moderation_queue'
|
||||
self.running = False
|
||||
|
||||
def start(self):
|
||||
"""Inicia el consumidor en un thread separado"""
|
||||
thread = Thread(target=self._consume, daemon=True)
|
||||
thread.start()
|
||||
logger.info("Moderation Consumer started")
|
||||
|
||||
def _consume(self):
|
||||
"""Consume mensajes de RabbitMQ"""
|
||||
try:
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host=self.host, port=self.port)
|
||||
)
|
||||
channel = connection.channel()
|
||||
|
||||
# Declarar cola
|
||||
channel.queue_declare(queue=self.queue_name, durable=True)
|
||||
|
||||
# Configurar QoS
|
||||
channel.basic_qos(prefetch_count=1)
|
||||
|
||||
# Configurar callback
|
||||
channel.basic_consume(
|
||||
queue=self.queue_name,
|
||||
on_message_callback=self._process_message
|
||||
)
|
||||
|
||||
logger.info(f"Moderation Consumer listening on {self.queue_name}")
|
||||
self.running = True
|
||||
channel.start_consuming()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in moderation consumer: {e}")
|
||||
self.running = False
|
||||
|
||||
def _process_message(self, ch, method, properties, body):
|
||||
"""Procesa un mensaje de moderación"""
|
||||
try:
|
||||
message = json.loads(body)
|
||||
logger.info(f"Processing moderation event: {message.get('event_type')}")
|
||||
|
||||
event_type = message.get('event_type')
|
||||
|
||||
if event_type == 'moderation.delete_report':
|
||||
self._handle_delete_report(message)
|
||||
elif event_type == 'moderation.close_account':
|
||||
self._handle_close_account(message)
|
||||
elif event_type == 'moderation.ban_user':
|
||||
self._handle_ban_user(message)
|
||||
elif event_type == 'moderation.warn_user':
|
||||
self._handle_warn_user(message)
|
||||
elif event_type == 'moderation.review_content':
|
||||
self._handle_review_content(message)
|
||||
else:
|
||||
logger.warning(f"Unknown moderation event type: {event_type}")
|
||||
|
||||
# Confirmar mensaje
|
||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||
|
||||
except json.JSONDecodeError:
|
||||
logger.error("Error decoding JSON message")
|
||||
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing moderation message: {e}")
|
||||
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
|
||||
|
||||
def _handle_delete_report(self, message: Dict[str, Any]):
|
||||
"""Maneja evento de eliminación de reporte"""
|
||||
try:
|
||||
action_id = message.get('action_id')
|
||||
report_id = message.get('report_id')
|
||||
reason = message.get('reason')
|
||||
|
||||
logger.info(f"Deleting report {report_id} (Action: {action_id})")
|
||||
|
||||
# Aquí implementarías la lógica para eliminar el reporte en la BD
|
||||
# Por ahora solo registramos el evento
|
||||
logger.info(f"Report {report_id} marked as deleted by moderator {message.get('moderator_id')}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling delete report: {e}")
|
||||
raise
|
||||
|
||||
def _handle_close_account(self, message: Dict[str, Any]):
|
||||
"""Maneja evento de cierre de cuenta"""
|
||||
try:
|
||||
action_id = message.get('action_id')
|
||||
user_id = message.get('user_id')
|
||||
is_permanent = message.get('is_permanent', True)
|
||||
|
||||
logger.info(f"Closing account for user {user_id} (Action: {action_id}, Permanent: {is_permanent})")
|
||||
|
||||
# Aquí implementarías la lógica para cerrar la cuenta en la BD
|
||||
logger.info(f"Account for user {user_id} marked as closed")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling close account: {e}")
|
||||
raise
|
||||
|
||||
def _handle_ban_user(self, message: Dict[str, Any]):
|
||||
"""Maneja evento de ban de usuario"""
|
||||
try:
|
||||
action_id = message.get('action_id')
|
||||
user_id = message.get('user_id')
|
||||
duration_days = message.get('duration_days')
|
||||
is_permanent = message.get('is_permanent', False)
|
||||
reason = message.get('reason')
|
||||
|
||||
logger.info(f"Banning user {user_id} (Action: {action_id}, Duration: {duration_days} days, Permanent: {is_permanent})")
|
||||
|
||||
# Aquí implementarías la lógica para banear al usuario
|
||||
if is_permanent:
|
||||
logger.info(f"User {user_id} permanently banned")
|
||||
else:
|
||||
logger.info(f"User {user_id} banned for {duration_days} days")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling ban user: {e}")
|
||||
raise
|
||||
|
||||
def _handle_warn_user(self, message: Dict[str, Any]):
|
||||
"""Maneja evento de advertencia a usuario"""
|
||||
try:
|
||||
action_id = message.get('action_id')
|
||||
user_id = message.get('user_id')
|
||||
reason = message.get('reason')
|
||||
|
||||
logger.info(f"Warning user {user_id} (Action: {action_id}, Reason: {reason})")
|
||||
|
||||
# Aquí implementarías la lógica para registrar la advertencia
|
||||
logger.info(f"Warning recorded for user {user_id}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling warn user: {e}")
|
||||
raise
|
||||
|
||||
def _handle_review_content(self, message: Dict[str, Any]):
|
||||
"""Maneja evento de revisión de contenido"""
|
||||
try:
|
||||
action_id = message.get('action_id')
|
||||
report_id = message.get('report_id')
|
||||
review_action = message.get('review_action') # approve, reject, needs_more_info
|
||||
|
||||
logger.info(f"Reviewing report {report_id} (Action: {action_id}, Decision: {review_action})")
|
||||
|
||||
# Aquí implementarías la lógica para procesar la revisión
|
||||
logger.info(f"Report {report_id} review: {review_action}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling review content: {e}")
|
||||
raise
|
||||
|
||||
|
||||
# Crear instancia global
|
||||
moderation_consumer = ModerationConsumer()
|
||||
Reference in New Issue
Block a user