diff --git a/src/consumers/user_consumer.py b/src/consumers/user_consumer.py index 18e8aa2..18f2973 100644 --- a/src/consumers/user_consumer.py +++ b/src/consumers/user_consumer.py @@ -22,117 +22,92 @@ logger = logging.getLogger(__name__) class UserConsumer: """Consumer for user events from RabbitMQ""" - + def __init__(self): self.repo = UserRepositorySQL() self.consumer = RabbitMQConsumer(queue_name='users_queue') self.consumer.set_callback(self.process_message) - + def process_message(self, message_dict: dict): """ Processes a user event message from RabbitMQ - + Args: message_dict: Dictionary containing the message data """ - try: - # Reconstruct the UserMessage object - message = UserMessage.from_dict(message_dict) - - if message.event_type == UserEventType.CREATE: - self._handle_create_user(message) - elif message.event_type == UserEventType.UPDATE: - self._handle_update_user(message) - elif message.event_type == UserEventType.DELETE: - self._handle_delete_user(message) - else: - logger.warning(f"Unknown event type: {message.event_type}") - - except Exception as e: - logger.error(f"Error processing user message: {e}", exc_info=True) - # Rollback en caso de error en el procesamiento del mensaje - self.repo.db.rollback() - raise - + # Reconstruct the UserMessage object — let exceptions propagate so the + # consumer's callback_wrapper can decide whether to ack, nack+discard, or requeue. + message = UserMessage.from_dict(message_dict) + + if message.event_type == UserEventType.CREATE: + self._handle_create_user(message) + elif message.event_type == UserEventType.UPDATE: + self._handle_update_user(message) + elif message.event_type == UserEventType.DELETE: + self._handle_delete_user(message) + else: + logger.warning(f"Unknown event type: {message.event_type}") + def _handle_create_user(self, message: UserMessage): """Handle user create event""" - try: - logger.info(f"Creating user: {message.email}") - - # Parse datetime strings - fecha_nacimiento = datetime.fromisoformat(message.fecha_nacimiento) - fecha_creacion = datetime.fromisoformat(message.fecha_creacion) - - # Create User domain object - user = User( - user_id=0, # Will be auto-generated by DB - nombre=message.nombre, - apellido=message.apellido, - email=message.email, - contraseña_hash=message.contraseña_hash, - fecha_nacimiento=fecha_nacimiento, - fecha_creacion=fecha_creacion, - calificacion=message.calificacion, - numero_reportes=message.numero_reportes, - url_foto_perfil=message.url_foto_perfil, - biografia=message.biografia - ) - - # Save to repository with transaction handling - saved_user = self.repo.save(user) - logger.info(f"User created successfully: {saved_user.user_id} - {saved_user.email}") - - except Exception as e: - logger.error(f"Error creating user: {e}", exc_info=True) - self.repo.db.rollback() - raise - + logger.info(f"Creating user: {message.email}") + + if not message.fecha_nacimiento: + raise ValueError(f"Missing fecha_nacimiento in CREATE message for email {message.email}") + if not message.fecha_creacion: + raise ValueError(f"Missing fecha_creacion in CREATE message for email {message.email}") + + fecha_nacimiento = datetime.fromisoformat(message.fecha_nacimiento) + fecha_creacion = datetime.fromisoformat(message.fecha_creacion) + + user = User( + user_id=0, # Will be auto-generated by DB + nombre=message.nombre, + apellido=message.apellido, + email=message.email, + contraseña_hash=message.contraseña_hash, + fecha_nacimiento=fecha_nacimiento, + fecha_creacion=fecha_creacion, + calificacion=message.calificacion, + numero_reportes=message.numero_reportes, + url_foto_perfil=message.url_foto_perfil, + biografia=message.biografia + ) + + saved_user = self.repo.save(user) + logger.info(f"User created successfully: {saved_user.user_id} - {saved_user.email}") + def _handle_update_user(self, message: UserMessage): """Handle user update event""" - try: - logger.info(f"Updating user: {message.user_id}") - - # Find the user - user = self.repo.find_by_id(message.user_id) - if not user: - logger.warning(f"User not found: {message.user_id}") - return - - # Update fields if provided - if message.nombre: - user.nombre = message.nombre - if message.apellido: - user.apellido = message.apellido - if message.url_foto_perfil is not None: - user.url_foto_perfil = message.url_foto_perfil - if message.biografia is not None: - user.biografia = message.biografia - - # Save to repository with transaction handling - updated_user = self.repo.update(user) - logger.info(f"User updated successfully: {message.user_id}") - - except Exception as e: - logger.error(f"Error updating user: {e}", exc_info=True) - self.repo.db.rollback() - raise - + logger.info(f"Updating user: {message.user_id}") + + user = self.repo.find_by_id(message.user_id) + if not user: + logger.warning(f"User not found: {message.user_id}") + return + + if message.nombre: + user.nombre = message.nombre + if message.apellido: + user.apellido = message.apellido + if message.url_foto_perfil is not None: + user.url_foto_perfil = message.url_foto_perfil + if message.biografia is not None: + user.biografia = message.biografia + + self.repo.update(user) + logger.info(f"User updated successfully: {message.user_id}") + def _handle_delete_user(self, message: UserMessage): """Handle user delete event""" - try: - logger.info(f"Deleting user: {message.user_id}") - - success = self.repo.delete(message.user_id) - if success: - logger.info(f"User deleted successfully: {message.user_id}") - else: - logger.warning(f"Failed to delete user: {message.user_id}") - - except Exception as e: - logger.error(f"Error deleting user: {e}", exc_info=True) - self.repo.db.rollback() - raise - + logger.info(f"Deleting user: {message.user_id}") + + success = self.repo.delete(message.user_id) + if success: + logger.info(f"User deleted successfully: {message.user_id}") + else: + logger.warning(f"Failed to delete user: {message.user_id}") + def start(self): """Start consuming messages""" logger.info("Starting User Consumer...") @@ -145,7 +120,6 @@ class UserConsumer: logger.error(f"Consumer error: {e}", exc_info=True) raise finally: - # Asegurar cierre de sesión if self.repo.db: try: self.repo.db.close() @@ -155,4 +129,4 @@ class UserConsumer: if __name__ == '__main__': consumer = UserConsumer() - consumer.start() + consumer.start() \ No newline at end of file diff --git a/src/infrastructure/adapters/rabbitmq/consumer.py b/src/infrastructure/adapters/rabbitmq/consumer.py index 8ad582e..97549cb 100644 --- a/src/infrastructure/adapters/rabbitmq/consumer.py +++ b/src/infrastructure/adapters/rabbitmq/consumer.py @@ -12,10 +12,10 @@ class RabbitMQConsumer: self.host = host self.port = port self.callback = None - + def set_callback(self, callback: Callable[[Dict[str, Any]], None]) -> None: self.callback = callback - + def start_consuming(self) -> None: try: connection = pika.BlockingConnection( @@ -23,36 +23,37 @@ class RabbitMQConsumer: ) channel = connection.channel() channel.queue_declare(queue=self.queue_name, durable=True) - + channel.basic_qos(prefetch_count=1) + def callback_wrapper(ch, method, properties, body): try: message = json.loads(body.decode('utf-8')) logger.info(f"Received message from queue '{self.queue_name}': {message}") - + if self.callback: self.callback(message) - + ch.basic_ack(delivery_tag=method.delivery_tag) - except IntegrityError as e: - # Error de negocio: no tiene sentido reintentar - logger.warning(f"Business error, discarding message: {e}") + except (IntegrityError, ValueError, TypeError, KeyError) as e: + # Errores de negocio/datos: no tiene sentido reintentar + logger.warning(f"Business/data error, discarding message: {e}") ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) except Exception as e: - # Error transitorio (red, DB caída): sí puede resolverse solo + # Error transitorio (red, DB caída): puede resolverse solo logger.error(f"Transient error processing message: {e}") ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) - + channel.basic_consume( queue=self.queue_name, on_message_callback=callback_wrapper, auto_ack=False ) - + logger.info(f"[*] Waiting for messages in queue '{self.queue_name}'. Ctrl+C to exit.") channel.start_consuming() - + except Exception as e: logger.error(f"Error in consumer: {e}") raise \ No newline at end of file