Added Notifications Microservice API - Integrated notification system with MongoDB, RabbitMQ consumer, and automatic status change notifications for reports
This commit is contained in:
135
src/consumers/notification_consumer.py
Normal file
135
src/consumers/notification_consumer.py
Normal file
@@ -0,0 +1,135 @@
|
||||
"""Notification RabbitMQ Consumer - Processes notification events"""
|
||||
import sys
|
||||
import os
|
||||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
# Add src to path to import modules
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
||||
|
||||
from infrastructure.adapters.rabbitmq.consumer import RabbitMQConsumer
|
||||
from infrastructure.adapters.rabbitmq.messages import NotificationMessage, NotificationEventType
|
||||
from infrastructure.adapters.persistence.notification_repository_mongo import NotificationRepositoryMongo
|
||||
|
||||
# Set up logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NotificationConsumer:
|
||||
"""Consumer for notification events from RabbitMQ"""
|
||||
|
||||
def __init__(self):
|
||||
self.repo = NotificationRepositoryMongo()
|
||||
self.consumer = RabbitMQConsumer(queue_name='notifications_queue')
|
||||
self.consumer.set_callback(self.process_message)
|
||||
|
||||
def process_message(self, message_dict: dict):
|
||||
"""
|
||||
Processes a notification event message from RabbitMQ
|
||||
|
||||
Args:
|
||||
message_dict: Dictionary containing the message data
|
||||
"""
|
||||
try:
|
||||
# Reconstruct the NotificationMessage object
|
||||
message = NotificationMessage.from_dict(message_dict)
|
||||
|
||||
if message.event_type == NotificationEventType.CREATE:
|
||||
self._handle_create_notification(message)
|
||||
elif message.event_type == NotificationEventType.REPORT_STATUS_CHANGE:
|
||||
self._handle_report_status_change(message)
|
||||
else:
|
||||
logger.warning(f"Unknown event type: {message.event_type}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing notification message: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
def _handle_create_notification(self, message: NotificationMessage):
|
||||
"""Handle notification create event"""
|
||||
try:
|
||||
logger.info(f"Creating notification for user: {message.id_usuario}")
|
||||
|
||||
# Parse datetime string if provided
|
||||
fecha_creacion = None
|
||||
if message.fecha_creacion:
|
||||
try:
|
||||
fecha_creacion = datetime.fromisoformat(message.fecha_creacion)
|
||||
except (ValueError, TypeError):
|
||||
fecha_creacion = datetime.utcnow()
|
||||
else:
|
||||
fecha_creacion = datetime.utcnow()
|
||||
|
||||
from domain.notifications import Notification
|
||||
|
||||
# Create Notification domain object
|
||||
notification = Notification(
|
||||
id_notificacion=message.id_notificacion,
|
||||
id_usuario=message.id_usuario,
|
||||
tipo_notificacion=message.tipo_notificacion,
|
||||
titulo=message.titulo,
|
||||
mensaje=message.mensaje,
|
||||
id_reporte=message.id_reporte,
|
||||
estado_reporte=message.estado_reporte,
|
||||
leida=False,
|
||||
fecha_creacion=fecha_creacion
|
||||
)
|
||||
|
||||
# Save to MongoDB repository
|
||||
saved_notification = self.repo.save(notification)
|
||||
logger.info(f"Notification created successfully: {message.id_notificacion}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error creating notification: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
def _handle_report_status_change(self, message: NotificationMessage):
|
||||
"""Handle report status change notification"""
|
||||
try:
|
||||
logger.info(f"Processing report status change notification for user: {message.id_usuario}")
|
||||
|
||||
# Parse datetime string if provided
|
||||
fecha_creacion = None
|
||||
if message.fecha_creacion:
|
||||
try:
|
||||
fecha_creacion = datetime.fromisoformat(message.fecha_creacion)
|
||||
except (ValueError, TypeError):
|
||||
fecha_creacion = datetime.utcnow()
|
||||
else:
|
||||
fecha_creacion = datetime.utcnow()
|
||||
|
||||
from domain.notifications import Notification
|
||||
|
||||
# Create Notification domain object for status change
|
||||
notification = Notification(
|
||||
id_notificacion=message.id_notificacion,
|
||||
id_usuario=message.id_usuario,
|
||||
tipo_notificacion=message.tipo_notificacion,
|
||||
titulo=message.titulo,
|
||||
mensaje=message.mensaje,
|
||||
id_reporte=message.id_reporte,
|
||||
estado_reporte=message.estado_reporte,
|
||||
leida=False,
|
||||
fecha_creacion=fecha_creacion
|
||||
)
|
||||
|
||||
# Save to MongoDB repository
|
||||
saved_notification = self.repo.save(notification)
|
||||
logger.info(f"Report status change notification created: {message.id_notificacion}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing report status change: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
def start(self):
|
||||
"""Start consuming messages from RabbitMQ"""
|
||||
try:
|
||||
logger.info("Starting Notification Consumer...")
|
||||
self.consumer.start()
|
||||
except Exception as e:
|
||||
logger.error(f"Error starting notification consumer: {e}", exc_info=True)
|
||||
raise
|
||||
Reference in New Issue
Block a user