Commit 23d019cf authored by MAISSIAT's avatar MAISSIAT
Browse files

Fix # 93: catch rabbitmq ChannelWrongStateError exceptions

parent c5df2079
......@@ -46,64 +46,73 @@ class PublishRabbitMQ():
self.__queue_name = queue_name
self.__routing_key = routing_key
def connect(self, threadID, connection=None, channel=None):
if not connection or connection.is_closed or channel.is_closed:
new_connection = pika.BlockingConnection(pika.ConnectionParameters(**self.__connectionParams))
new_channel = new_connection.channel()
connectionPool[threadID] = (new_connection, new_channel, None)
def publish(self, identifier, message, content_type):
"""
Write a full job into the database (the job must not exist)
"""
assert isinstance(message, dict), "message must be a dict"
def customSerialize(value):
if isinstance(value, datetime):
return value.isoformat()
assert 0, "Unknown type %s for %s" % (type(value), value)
return ""
# Handle rabbitMQ connections per thread
threadID = threading.get_ident()
if threadID not in connectionPool:
# Create connection + channel
connection = pika.BlockingConnection(pika.ConnectionParameters(**self.__connectionParams))
channel = connection.channel()
connectionPool[threadID] = (connection, channel, None)
self.connect(threadID)
else:
(connection, channel, lastQueueName) = connectionPool[threadID]
self.connect(threadID, connection, channel)
(connection, channel, lastQueueName) = connectionPool[threadID]
def customSerialize(value):
if isinstance(value, datetime):
return value.isoformat()
assert 0, "Unknown type %s for %s" % (type(value), value)
return ""
published = False
retry = False
fail = False
while not published and not fail:
queueName = self.__queue_name
try:
if queueName != lastQueueName:
# Declare queue
channel.queue_declare(
queue=queueName,
durable=True,
)
queueName = datetime.now().strftime(self.__queue_name)
if queueName != lastQueueName:
# Declare queue
channel.queue_declare(
queue=queueName,
durable=True,
)
lastQueueName = queueName
lastQueueName = queueName
# Update stored values
connectionPool[threadID] = (connection, channel, lastQueueName)
try:
channel.basic_publish(exchange="",
routing_key=self.__routing_key if self.__routing_key is not None else queueName,
properties=pika.spec.BasicProperties(
message_id=identifier,
content_type=content_type,
),
body=json.dumps(message, default=customSerialize))
channel.basic_publish(
exchange="",
routing_key=self.__routing_key if self.__routing_key is not None else queueName,
properties=pika.spec.BasicProperties(
message_id=identifier,
content_type=content_type,
),
body=json.dumps(message, default=customSerialize)
)
published = True
except (pika.exceptions.StreamLostError, pika.exceptions.ChannelWrongStateError) as e:
except (pika.exceptions.StreamLostError,
pika.exceptions.AMQPConnectionError,
pika.exceptions.ChannelWrongStateError) as e:
if retry:
fail = True;
fail = True
self.__log.error(f"The message cannot be written (error {e}), message :\n{message}")
else:
connection = pika.BlockingConnection(pika.ConnectionParameters(**self.__connectionParams))
channel = connection.channel()
lastQueueName = None
self.connect(threadID)
connectionPool[threadID] = (connection, channel, lastQueueName)
self.__log.info(f"Try to write a message again (error {e})")
retry = True
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment