Commit 9bdfc0be authored by BODERE's avatar BODERE
Browse files

Merge branch 'hotfix_7.0.4' into 'master'

Fix #93: additional correction

Closes #93

See merge request !54
parents 3f732528 395aa339
......@@ -46,12 +46,6 @@ class PublishRabbitMQ():
self.__queue_name = queue_name
self.__routing_key = routing_key
def connect(self, threadID, connection=None, channel=None):
if not connection or connection.is_closed or channel.is_closed:
new_connection = pika.BlockingConnection(pika.ConnectionParameters(**self.__connectionParams))
new_channel = new_connection.channel()
connectionPool[threadID] = (new_connection, new_channel, None)
def publish(self, identifier, message, content_type):
"""
......@@ -65,34 +59,39 @@ class PublishRabbitMQ():
assert 0, "Unknown type %s for %s" % (type(value), value)
return ""
queueName = self.__queue_name
connection = None
channel = None
lastQueueName = None
# Handle rabbitMQ connections per thread
threadID = threading.get_ident()
if threadID not in connectionPool:
# Create connection + channel
self.connect(threadID)
else:
if threadID in connectionPool:
(connection, channel, lastQueueName) = connectionPool[threadID]
self.connect(threadID, connection, channel)
(connection, channel, lastQueueName) = connectionPool[threadID]
published = False
retry = False
fail = False
while not published and not fail:
queueName = self.__queue_name
try:
if not connection or connection.is_closed or channel.is_closed:
# connection is not valid
connection = pika.BlockingConnection(
pika.ConnectionParameters(**self.__connectionParams)
)
channel = connection.channel()
lastQueueName = None
connectionPool[threadID] = (connection, channel, lastQueueName)
if queueName != lastQueueName:
# Declare queue
channel.queue_declare(
queue=queueName,
durable=True,
)
lastQueueName = queueName
# Update stored values
connectionPool[threadID] = (connection, channel, lastQueueName)
# Update stored values
connectionPool[threadID] = (connection, channel, lastQueueName)
channel.basic_publish(
exchange="",
......@@ -110,9 +109,11 @@ class PublishRabbitMQ():
pika.exceptions.ChannelWrongStateError) as e:
if retry:
fail = True
self.__log.error(f"The message cannot be written (error {e}), message :\n{message}")
self.__log.error(f"The message type '{content_type}' "
f"cannot be written (error {e}), "
f"message :\n{message}")
else:
self.connect(threadID)
self.__log.info(f"Try to write a message again (error {e})")
# reset connection
connection = None
retry = True
self.__log.warning(f"Try to write the message type '{content_type} 'again (error {e})")
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment