From 64727e17bbfa11aaf48506455b388ae7ff3f8591 Mon Sep 17 00:00:00 2001 From: "Juan M. Ley" Date: Sun, 22 Mar 2026 13:01:33 -0600 Subject: [PATCH] another fix --- .../adapters/rabbitmq/consumer.py | 33 ++++++------------- 1 file changed, 10 insertions(+), 23 deletions(-) diff --git a/src/infrastructure/adapters/rabbitmq/consumer.py b/src/infrastructure/adapters/rabbitmq/consumer.py index 36b1c33..7449eda 100644 --- a/src/infrastructure/adapters/rabbitmq/consumer.py +++ b/src/infrastructure/adapters/rabbitmq/consumer.py @@ -1,15 +1,12 @@ -"""RabbitMQ message consumer base""" import pika import json from typing import Callable, Dict, Any import logging +from sqlalchemy.exc import IntegrityError logger = logging.getLogger(__name__) - class RabbitMQConsumer: - """Generic RabbitMQ consumer for consuming messages from queues""" - def __init__(self, queue_name: str, host: str = 'localhost', port: int = 5672): self.queue_name = queue_name self.host = host @@ -17,46 +14,36 @@ class RabbitMQConsumer: self.callback = None def set_callback(self, callback: Callable[[Dict[str, Any]], None]) -> None: - """ - Sets the callback function to be called when a message is received - - Args: - callback: Function that takes a message dictionary as argument - """ self.callback = callback def start_consuming(self) -> None: - """ - Starts consuming messages from the queue - """ try: connection = pika.BlockingConnection( pika.ConnectionParameters(host=self.host, port=self.port) ) channel = connection.channel() - - # Declare queue to ensure it exists channel.queue_declare(queue=self.queue_name, durable=True) def callback_wrapper(ch, method, properties, body): try: - # Decode the message message = json.loads(body.decode('utf-8')) logger.info(f"Received message from queue '{self.queue_name}': {message}") - # Call the user's callback function if self.callback: self.callback(message) - # Acknowledge the message ch.basic_ack(delivery_tag=method.delivery_tag) - + + except IntegrityError as e: + # Error de negocio: no tiene sentido reintentar + logger.warning(f"Business error, discarding message: {e}") + ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) + except Exception as e: - logger.error(f"Error processing message: {e}") - # Negative acknowledge to requeue the message + # Error transitorio (red, DB caída): sí puede resolverse solo + logger.error(f"Transient error processing message: {e}") ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) - # Set up the consumer with manual acknowledgment channel.basic_consume( queue=self.queue_name, on_message_callback=callback_wrapper, @@ -68,4 +55,4 @@ class RabbitMQConsumer: except Exception as e: logger.error(f"Error in consumer: {e}") - raise + raise \ No newline at end of file