another fix
This commit is contained in:
@@ -1,15 +1,12 @@
|
|||||||
"""RabbitMQ message consumer base"""
|
|
||||||
import pika
|
import pika
|
||||||
import json
|
import json
|
||||||
from typing import Callable, Dict, Any
|
from typing import Callable, Dict, Any
|
||||||
import logging
|
import logging
|
||||||
|
from sqlalchemy.exc import IntegrityError
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class RabbitMQConsumer:
|
class RabbitMQConsumer:
|
||||||
"""Generic RabbitMQ consumer for consuming messages from queues"""
|
|
||||||
|
|
||||||
def __init__(self, queue_name: str, host: str = 'localhost', port: int = 5672):
|
def __init__(self, queue_name: str, host: str = 'localhost', port: int = 5672):
|
||||||
self.queue_name = queue_name
|
self.queue_name = queue_name
|
||||||
self.host = host
|
self.host = host
|
||||||
@@ -17,46 +14,36 @@ class RabbitMQConsumer:
|
|||||||
self.callback = None
|
self.callback = None
|
||||||
|
|
||||||
def set_callback(self, callback: Callable[[Dict[str, Any]], None]) -> None:
|
def set_callback(self, callback: Callable[[Dict[str, Any]], None]) -> None:
|
||||||
"""
|
|
||||||
Sets the callback function to be called when a message is received
|
|
||||||
|
|
||||||
Args:
|
|
||||||
callback: Function that takes a message dictionary as argument
|
|
||||||
"""
|
|
||||||
self.callback = callback
|
self.callback = callback
|
||||||
|
|
||||||
def start_consuming(self) -> None:
|
def start_consuming(self) -> None:
|
||||||
"""
|
|
||||||
Starts consuming messages from the queue
|
|
||||||
"""
|
|
||||||
try:
|
try:
|
||||||
connection = pika.BlockingConnection(
|
connection = pika.BlockingConnection(
|
||||||
pika.ConnectionParameters(host=self.host, port=self.port)
|
pika.ConnectionParameters(host=self.host, port=self.port)
|
||||||
)
|
)
|
||||||
channel = connection.channel()
|
channel = connection.channel()
|
||||||
|
|
||||||
# Declare queue to ensure it exists
|
|
||||||
channel.queue_declare(queue=self.queue_name, durable=True)
|
channel.queue_declare(queue=self.queue_name, durable=True)
|
||||||
|
|
||||||
def callback_wrapper(ch, method, properties, body):
|
def callback_wrapper(ch, method, properties, body):
|
||||||
try:
|
try:
|
||||||
# Decode the message
|
|
||||||
message = json.loads(body.decode('utf-8'))
|
message = json.loads(body.decode('utf-8'))
|
||||||
logger.info(f"Received message from queue '{self.queue_name}': {message}")
|
logger.info(f"Received message from queue '{self.queue_name}': {message}")
|
||||||
|
|
||||||
# Call the user's callback function
|
|
||||||
if self.callback:
|
if self.callback:
|
||||||
self.callback(message)
|
self.callback(message)
|
||||||
|
|
||||||
# Acknowledge the message
|
|
||||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||||
|
|
||||||
|
except IntegrityError as e:
|
||||||
|
# Error de negocio: no tiene sentido reintentar
|
||||||
|
logger.warning(f"Business error, discarding message: {e}")
|
||||||
|
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error processing message: {e}")
|
# Error transitorio (red, DB caída): sí puede resolverse solo
|
||||||
# Negative acknowledge to requeue the message
|
logger.error(f"Transient error processing message: {e}")
|
||||||
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
|
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
|
||||||
|
|
||||||
# Set up the consumer with manual acknowledgment
|
|
||||||
channel.basic_consume(
|
channel.basic_consume(
|
||||||
queue=self.queue_name,
|
queue=self.queue_name,
|
||||||
on_message_callback=callback_wrapper,
|
on_message_callback=callback_wrapper,
|
||||||
@@ -68,4 +55,4 @@ class RabbitMQConsumer:
|
|||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error in consumer: {e}")
|
logger.error(f"Error in consumer: {e}")
|
||||||
raise
|
raise
|
||||||
Reference in New Issue
Block a user