Fixed metrics api to recieve all metrics
This commit is contained in:
@@ -4,7 +4,6 @@ import os
|
||||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
# Add src to path to import modules
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
||||
|
||||
from infrastructure.adapters.rabbitmq.consumer import RabbitMQConsumer
|
||||
@@ -14,7 +13,6 @@ from infrastructure.adapters.rabbitmq.messages import (
|
||||
from application.services.metrics_services import MetricsService
|
||||
from infrastructure.adapters.persistence.metrics_repository_postgres import MetricsRepositoryPostgres
|
||||
|
||||
# Set up logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||
@@ -25,27 +23,32 @@ logging.basicConfig(
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Colas dedicadas para métricas — no compiten con los consumers de dominio
|
||||
METRICS_QUEUES = {
|
||||
'user': 'metrics_users_queue',
|
||||
'report': 'metrics_reports_queue',
|
||||
'notification': 'metrics_notifications_queue',
|
||||
'moderation': 'metrics_moderations_queue',
|
||||
}
|
||||
|
||||
|
||||
class MetricsConsumer:
|
||||
"""Consumer for all system events from RabbitMQ"""
|
||||
|
||||
|
||||
def __init__(self):
|
||||
self.metrics_service = MetricsService(MetricsRepositoryPostgres())
|
||||
|
||||
# Create separate consumers for each event type
|
||||
self.user_consumer = RabbitMQConsumer(queue_name='users_queue')
|
||||
self.report_consumer = RabbitMQConsumer(queue_name='reports_queue')
|
||||
self.notification_consumer = RabbitMQConsumer(queue_name='notifications_queue')
|
||||
self.moderation_consumer = RabbitMQConsumer(queue_name='moderations_queue')
|
||||
|
||||
# Set callbacks
|
||||
|
||||
self.user_consumer = RabbitMQConsumer(queue_name=METRICS_QUEUES['user'])
|
||||
self.report_consumer = RabbitMQConsumer(queue_name=METRICS_QUEUES['report'])
|
||||
self.notification_consumer = RabbitMQConsumer(queue_name=METRICS_QUEUES['notification'])
|
||||
self.moderation_consumer = RabbitMQConsumer(queue_name=METRICS_QUEUES['moderation'])
|
||||
|
||||
self.user_consumer.set_callback(self.process_user_event)
|
||||
self.report_consumer.set_callback(self.process_report_event)
|
||||
self.notification_consumer.set_callback(self.process_notification_event)
|
||||
self.moderation_consumer.set_callback(self.process_moderation_event)
|
||||
|
||||
|
||||
def process_user_event(self, message_dict: dict):
|
||||
"""Procesa eventos de usuario"""
|
||||
try:
|
||||
message = UserMessage.from_dict(message_dict)
|
||||
self.metrics_service.record_event(
|
||||
@@ -58,107 +61,83 @@ class MetricsConsumer:
|
||||
logger.info(f"Métrica registrada: user_{message.event_type.value}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error procesando evento de usuario: {e}")
|
||||
|
||||
|
||||
def process_report_event(self, message_dict: dict):
|
||||
"""Procesa eventos de reportes"""
|
||||
try:
|
||||
message = ReportMessage.from_dict(message_dict)
|
||||
self.metrics_service.record_event(
|
||||
event_type=f"report_{message.event_type.value}",
|
||||
entity_id=str(message.report_id),
|
||||
entity_id=str(message.id_reporte),
|
||||
entity_type="report",
|
||||
user_id=message.user_id,
|
||||
metadata={"status": message.status}
|
||||
user_id=message.id_usuario,
|
||||
metadata={"estado": message.estado}
|
||||
)
|
||||
logger.info(f"Métrica registrada: report_{message.event_type.value}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error procesando evento de reporte: {e}")
|
||||
|
||||
|
||||
def process_notification_event(self, message_dict: dict):
|
||||
"""Procesa eventos de notificaciones"""
|
||||
try:
|
||||
message = NotificationMessage.from_dict(message_dict)
|
||||
self.metrics_service.record_event(
|
||||
event_type=f"notification_{message.event_type.value}",
|
||||
entity_id=str(message.notification_id),
|
||||
entity_id=str(message.id_notificacion),
|
||||
entity_type="notification",
|
||||
user_id=message.user_id,
|
||||
metadata={"type": message.notification_type}
|
||||
user_id=message.id_usuario,
|
||||
metadata={"type": message.tipo_notificacion}
|
||||
)
|
||||
logger.info(f"Métrica registrada: notification_{message.event_type.value}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error procesando evento de notificación: {e}")
|
||||
|
||||
|
||||
def process_moderation_event(self, message_dict: dict):
|
||||
"""Procesa eventos de moderación"""
|
||||
try:
|
||||
message = ModerationMessage.from_dict(message_dict)
|
||||
self.metrics_service.record_event(
|
||||
event_type=f"moderation_{message.event_type.value}",
|
||||
entity_id=str(message.moderation_id),
|
||||
entity_id=str(message.action_id),
|
||||
entity_type="moderation",
|
||||
user_id=message.moderator_id,
|
||||
metadata={"action": message.action}
|
||||
metadata={"action": message.review_action, "reason": message.reason}
|
||||
)
|
||||
logger.info(f"Métrica registrada: moderation_{message.event_type.value}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error procesando evento de moderación: {e}")
|
||||
|
||||
|
||||
def start(self):
|
||||
"""Inicia todos los consumers"""
|
||||
logger.info("Iniciando Metrics Consumer...")
|
||||
try:
|
||||
# Start consumers in separate threads
|
||||
import threading
|
||||
|
||||
def start_user_consumer():
|
||||
try:
|
||||
self.user_consumer.start_consuming()
|
||||
except Exception as e:
|
||||
logger.error(f"Error en user consumer: {e}")
|
||||
|
||||
def start_report_consumer():
|
||||
try:
|
||||
self.report_consumer.start_consuming()
|
||||
except Exception as e:
|
||||
logger.error(f"Error en report consumer: {e}")
|
||||
|
||||
def start_notification_consumer():
|
||||
try:
|
||||
self.notification_consumer.start_consuming()
|
||||
except Exception as e:
|
||||
logger.error(f"Error en notification consumer: {e}")
|
||||
|
||||
def start_moderation_consumer():
|
||||
try:
|
||||
self.moderation_consumer.start_consuming()
|
||||
except Exception as e:
|
||||
logger.error(f"Error en moderation consumer: {e}")
|
||||
|
||||
|
||||
threads = [
|
||||
threading.Thread(target=start_user_consumer, daemon=True),
|
||||
threading.Thread(target=start_report_consumer, daemon=True),
|
||||
threading.Thread(target=start_notification_consumer, daemon=True),
|
||||
threading.Thread(target=start_moderation_consumer, daemon=True),
|
||||
threading.Thread(target=self._safe_consume, args=(self.user_consumer,), daemon=True),
|
||||
threading.Thread(target=self._safe_consume, args=(self.report_consumer,), daemon=True),
|
||||
threading.Thread(target=self._safe_consume, args=(self.notification_consumer,), daemon=True),
|
||||
threading.Thread(target=self._safe_consume, args=(self.moderation_consumer,), daemon=True),
|
||||
]
|
||||
for t in threads:
|
||||
t.start()
|
||||
|
||||
# Keep main thread alive
|
||||
for t in threads:
|
||||
t.join()
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Metrics Consumer detenido")
|
||||
self.stop()
|
||||
|
||||
|
||||
def _safe_consume(self, consumer: RabbitMQConsumer):
|
||||
try:
|
||||
consumer.start_consuming()
|
||||
except Exception as e:
|
||||
logger.error(f"Error en consumer de {consumer.queue_name}: {e}")
|
||||
|
||||
def stop(self):
|
||||
"""Detiene los consumers"""
|
||||
self.user_consumer.stop()
|
||||
self.report_consumer.stop()
|
||||
self.notification_consumer.stop()
|
||||
self.moderation_consumer.stop()
|
||||
for c in [self.user_consumer, self.report_consumer,
|
||||
self.notification_consumer, self.moderation_consumer]:
|
||||
try:
|
||||
c.stop()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
consumer = MetricsConsumer()
|
||||
consumer.start()
|
||||
consumer.start()
|
||||
Reference in New Issue
Block a user