Fixes with queuing

This commit is contained in:
2026-05-06 11:13:23 -06:00
parent 7331dcb086
commit 57fa3b7944
2 changed files with 84 additions and 109 deletions

View File

@@ -22,117 +22,92 @@ logger = logging.getLogger(__name__)
class UserConsumer: class UserConsumer:
"""Consumer for user events from RabbitMQ""" """Consumer for user events from RabbitMQ"""
def __init__(self): def __init__(self):
self.repo = UserRepositorySQL() self.repo = UserRepositorySQL()
self.consumer = RabbitMQConsumer(queue_name='users_queue') self.consumer = RabbitMQConsumer(queue_name='users_queue')
self.consumer.set_callback(self.process_message) self.consumer.set_callback(self.process_message)
def process_message(self, message_dict: dict): def process_message(self, message_dict: dict):
""" """
Processes a user event message from RabbitMQ Processes a user event message from RabbitMQ
Args: Args:
message_dict: Dictionary containing the message data message_dict: Dictionary containing the message data
""" """
try: # Reconstruct the UserMessage object — let exceptions propagate so the
# Reconstruct the UserMessage object # consumer's callback_wrapper can decide whether to ack, nack+discard, or requeue.
message = UserMessage.from_dict(message_dict) message = UserMessage.from_dict(message_dict)
if message.event_type == UserEventType.CREATE: if message.event_type == UserEventType.CREATE:
self._handle_create_user(message) self._handle_create_user(message)
elif message.event_type == UserEventType.UPDATE: elif message.event_type == UserEventType.UPDATE:
self._handle_update_user(message) self._handle_update_user(message)
elif message.event_type == UserEventType.DELETE: elif message.event_type == UserEventType.DELETE:
self._handle_delete_user(message) self._handle_delete_user(message)
else: else:
logger.warning(f"Unknown event type: {message.event_type}") logger.warning(f"Unknown event type: {message.event_type}")
except Exception as e:
logger.error(f"Error processing user message: {e}", exc_info=True)
# Rollback en caso de error en el procesamiento del mensaje
self.repo.db.rollback()
raise
def _handle_create_user(self, message: UserMessage): def _handle_create_user(self, message: UserMessage):
"""Handle user create event""" """Handle user create event"""
try: logger.info(f"Creating user: {message.email}")
logger.info(f"Creating user: {message.email}")
if not message.fecha_nacimiento:
# Parse datetime strings raise ValueError(f"Missing fecha_nacimiento in CREATE message for email {message.email}")
fecha_nacimiento = datetime.fromisoformat(message.fecha_nacimiento) if not message.fecha_creacion:
fecha_creacion = datetime.fromisoformat(message.fecha_creacion) raise ValueError(f"Missing fecha_creacion in CREATE message for email {message.email}")
# Create User domain object fecha_nacimiento = datetime.fromisoformat(message.fecha_nacimiento)
user = User( fecha_creacion = datetime.fromisoformat(message.fecha_creacion)
user_id=0, # Will be auto-generated by DB
nombre=message.nombre, user = User(
apellido=message.apellido, user_id=0, # Will be auto-generated by DB
email=message.email, nombre=message.nombre,
contraseña_hash=message.contraseña_hash, apellido=message.apellido,
fecha_nacimiento=fecha_nacimiento, email=message.email,
fecha_creacion=fecha_creacion, contraseña_hash=message.contraseña_hash,
calificacion=message.calificacion, fecha_nacimiento=fecha_nacimiento,
numero_reportes=message.numero_reportes, fecha_creacion=fecha_creacion,
url_foto_perfil=message.url_foto_perfil, calificacion=message.calificacion,
biografia=message.biografia numero_reportes=message.numero_reportes,
) url_foto_perfil=message.url_foto_perfil,
biografia=message.biografia
# Save to repository with transaction handling )
saved_user = self.repo.save(user)
logger.info(f"User created successfully: {saved_user.user_id} - {saved_user.email}") saved_user = self.repo.save(user)
logger.info(f"User created successfully: {saved_user.user_id} - {saved_user.email}")
except Exception as e:
logger.error(f"Error creating user: {e}", exc_info=True)
self.repo.db.rollback()
raise
def _handle_update_user(self, message: UserMessage): def _handle_update_user(self, message: UserMessage):
"""Handle user update event""" """Handle user update event"""
try: logger.info(f"Updating user: {message.user_id}")
logger.info(f"Updating user: {message.user_id}")
user = self.repo.find_by_id(message.user_id)
# Find the user if not user:
user = self.repo.find_by_id(message.user_id) logger.warning(f"User not found: {message.user_id}")
if not user: return
logger.warning(f"User not found: {message.user_id}")
return if message.nombre:
user.nombre = message.nombre
# Update fields if provided if message.apellido:
if message.nombre: user.apellido = message.apellido
user.nombre = message.nombre if message.url_foto_perfil is not None:
if message.apellido: user.url_foto_perfil = message.url_foto_perfil
user.apellido = message.apellido if message.biografia is not None:
if message.url_foto_perfil is not None: user.biografia = message.biografia
user.url_foto_perfil = message.url_foto_perfil
if message.biografia is not None: self.repo.update(user)
user.biografia = message.biografia logger.info(f"User updated successfully: {message.user_id}")
# Save to repository with transaction handling
updated_user = self.repo.update(user)
logger.info(f"User updated successfully: {message.user_id}")
except Exception as e:
logger.error(f"Error updating user: {e}", exc_info=True)
self.repo.db.rollback()
raise
def _handle_delete_user(self, message: UserMessage): def _handle_delete_user(self, message: UserMessage):
"""Handle user delete event""" """Handle user delete event"""
try: logger.info(f"Deleting user: {message.user_id}")
logger.info(f"Deleting user: {message.user_id}")
success = self.repo.delete(message.user_id)
success = self.repo.delete(message.user_id) if success:
if success: logger.info(f"User deleted successfully: {message.user_id}")
logger.info(f"User deleted successfully: {message.user_id}") else:
else: logger.warning(f"Failed to delete user: {message.user_id}")
logger.warning(f"Failed to delete user: {message.user_id}")
except Exception as e:
logger.error(f"Error deleting user: {e}", exc_info=True)
self.repo.db.rollback()
raise
def start(self): def start(self):
"""Start consuming messages""" """Start consuming messages"""
logger.info("Starting User Consumer...") logger.info("Starting User Consumer...")
@@ -145,7 +120,6 @@ class UserConsumer:
logger.error(f"Consumer error: {e}", exc_info=True) logger.error(f"Consumer error: {e}", exc_info=True)
raise raise
finally: finally:
# Asegurar cierre de sesión
if self.repo.db: if self.repo.db:
try: try:
self.repo.db.close() self.repo.db.close()
@@ -155,4 +129,4 @@ class UserConsumer:
if __name__ == '__main__': if __name__ == '__main__':
consumer = UserConsumer() consumer = UserConsumer()
consumer.start() consumer.start()

View File

@@ -12,10 +12,10 @@ class RabbitMQConsumer:
self.host = host self.host = host
self.port = port self.port = port
self.callback = None self.callback = None
def set_callback(self, callback: Callable[[Dict[str, Any]], None]) -> None: def set_callback(self, callback: Callable[[Dict[str, Any]], None]) -> None:
self.callback = callback self.callback = callback
def start_consuming(self) -> None: def start_consuming(self) -> None:
try: try:
connection = pika.BlockingConnection( connection = pika.BlockingConnection(
@@ -23,36 +23,37 @@ class RabbitMQConsumer:
) )
channel = connection.channel() channel = connection.channel()
channel.queue_declare(queue=self.queue_name, durable=True) channel.queue_declare(queue=self.queue_name, durable=True)
channel.basic_qos(prefetch_count=1)
def callback_wrapper(ch, method, properties, body): def callback_wrapper(ch, method, properties, body):
try: try:
message = json.loads(body.decode('utf-8')) message = json.loads(body.decode('utf-8'))
logger.info(f"Received message from queue '{self.queue_name}': {message}") logger.info(f"Received message from queue '{self.queue_name}': {message}")
if self.callback: if self.callback:
self.callback(message) self.callback(message)
ch.basic_ack(delivery_tag=method.delivery_tag) ch.basic_ack(delivery_tag=method.delivery_tag)
except IntegrityError as e: except (IntegrityError, ValueError, TypeError, KeyError) as e:
# Error de negocio: no tiene sentido reintentar # Errores de negocio/datos: no tiene sentido reintentar
logger.warning(f"Business error, discarding message: {e}") logger.warning(f"Business/data error, discarding message: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
except Exception as e: except Exception as e:
# Error transitorio (red, DB caída): puede resolverse solo # Error transitorio (red, DB caída): puede resolverse solo
logger.error(f"Transient error processing message: {e}") logger.error(f"Transient error processing message: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
channel.basic_consume( channel.basic_consume(
queue=self.queue_name, queue=self.queue_name,
on_message_callback=callback_wrapper, on_message_callback=callback_wrapper,
auto_ack=False auto_ack=False
) )
logger.info(f"[*] Waiting for messages in queue '{self.queue_name}'. Ctrl+C to exit.") logger.info(f"[*] Waiting for messages in queue '{self.queue_name}'. Ctrl+C to exit.")
channel.start_consuming() channel.start_consuming()
except Exception as e: except Exception as e:
logger.error(f"Error in consumer: {e}") logger.error(f"Error in consumer: {e}")
raise raise