Commit e72d65c2 authored by BODERE's avatar BODERE
Browse files

Merge branch 'hotfix_6.0.6' into 'master'

Hotfix 6.0.6

Closes #56

See merge request downloader/downloader_daemon!31
parents dafeb5cb b0c694f9
......@@ -14,18 +14,18 @@ import shutil
import random
import threading
import time
import traceback
import pwd
import grp
import datetime
from socket import gethostname
from xml.parsers.expat import ExpatError
from xml.etree.ElementTree import ParseError
from string import capwords
try:
from queue import Queue
except:
from Queue import Queue # Python2 compatibility
from xml.parsers.expat import ExpatError
from xml.etree.ElementTree import ParseError
from ifr_lib.ifr_os import is_io_locked
from dchecktools.common.basefileinfos import FILE_URL_SEPARATOR
from downloader.emm.api import messages
......@@ -384,6 +384,9 @@ class Download(threading.Thread):
self._stopevent = threading.Event()
self.__endingMessage = ""
self.__endingMStatus = "INFO"
emm_dir = [self.__globalConfig.getPath('messages_to_log')]
emm_dir.append(Controller.EMM_STORAGES)
......@@ -3107,18 +3110,68 @@ class Download(threading.Thread):
If it is modified, the current thread is stopped.
"""
self._log.debug("Detect if a new configuration is available")
try:
mtime = File(self.__download_conf_file).getTime()
is_new_configuration = (
mtime != self.__current_mtime_download_conf_file)
if is_new_configuration:
self._log.debug(
"New configuration detected %s, new mtime : %s, old mtime : %s " %
(self.__download_conf_file, mtime, self.__current_mtime_download_conf_file))
self.__current_mtime_download_conf_file = mtime
self._stopevent.set()
mtime = File(self.__download_conf_file).getTime()
is_new_configuration = (
mtime != self.__current_mtime_download_conf_file)
if is_new_configuration:
self._log.debug(
"New configuration detected %s, new mtime : %s, old mtime : %s " %
(self.__download_conf_file, mtime, self.__current_mtime_download_conf_file))
self.__current_mtime_download_conf_file = mtime
self.__endingMessage = "A new configuration detected"
self.__endingMStatus = "WARNING"
self._stopevent.set()
except OSError:
self._stopevent.set()
self.__endingMessage = "The configuration file is not found"
self._log.exception("The configuration file is not found")
self.__endingMStatus = "WARNING"
self._stopevent.set()
"""
try:
if not is_io_locked( os.path.split(self.__download_conf_file)[0],
timeout=2.0):
try:
mtime = File(self.__download_conf_file).getTime()
is_new_configuration = (
mtime != self.__current_mtime_download_conf_file)
if is_new_configuration:
self._log.debug(
"New configuration detected %s, new mtime : %s, old mtime : %s " %
(self.__download_conf_file, mtime, self.__current_mtime_download_conf_file))
self.__current_mtime_download_conf_file = mtime
self.__endingMessage = "A new configuration detected"
self._stopevent.set()
except OSError:
self.__endingMessage = "The configuration file is not found"
self._stopevent.set()
except:
self._log.error(
f"[download_id={self.id}] : folder containing the configuration files not found.")
m = messages.MessageFactory().create({
'type': 'downloadMessage',
'level': 'ERROR',
'summary': '[%s] configuration file not found' % (self.id),
'details': "The folder '%s' containing the configuration files is not found." %
(os.path.split(self.__download_conf_file)[0]),
'reference_type': 'data',
'reference_name': self.__download_conf_file,
'processing_start_date': self.__emm_start_date,
'processing_hostname': gethostname(),
'processing_pid': os.getpid(),
'download_name': self.id,
'download_provider': self.__remote_storage_provider_ref,
}, template=Controller.TEMPLATE_DOWNLOAD_MESSAGE)
try:
self.__emm_writer.writeMessage(m, self.__id)
except Exception as e:
self._log.exception('Message write error : %s' % (str(e)))
"""
# Thread entry point, called by YourDownload.start()
def run(self):
......@@ -3282,14 +3335,41 @@ class Download(threading.Thread):
self.isNewConfiguration()
except BaseException:
traceback.print_exc()
self._log.info("Download thread %s is dead !" % (self.__id))
self._log.exception("Download thread %s is dead !" % (self.__id))
self.__endingMessage = "A not catchable error has occurred"
self.__endingMStatus = "ERROR"
if not self.__isWaitSema:
self._log.debug(" [download_id=%s] Release sema ." % (self.id))
self.__sema.release()
self.__isWaitSema = True
self._log.debug('-- isWaitSema : %s' % (self.__isWaitSema))
if self.__endingMStatus == "ERROR":
self._log.error(f'[{self.id}] the download is stopped ({self.__endingMessage})')
elif self.__endingMStatus == "WARNING":
self._log.warning(f'[{self.id}] the download is stopped ({self.__endingMessage})')
else:
self._log.info(f'[{self.id}] the download is stopped ({self.__endingMessage})')
m = messages.MessageFactory().create({
'type': 'downloadMessage',
'level': self.__endingMStatus,
'summary': '[%s] the download is stopped' % (self.id),
'details': '%s' % (self.__endingMessage),
'reference_type': 'data',
'reference_name': self.__download_conf_file,
'processing_start_date': self.__emm_start_date,
'processing_hostname': gethostname(),
'processing_pid': os.getpid(),
'download_name': self.id,
'download_provider': self.__remote_storage_provider_ref,
}, template=Controller.TEMPLATE_DOWNLOAD_MESSAGE)
try:
self.__emm_writer.writeMessage(m, self.__id)
except Exception as e:
self._log.exception('Message write error : %s' % (str(e)))
# Thread termination
self._log.info("Download thread '%s' terminated !" % (self.__id))
self.__closeSession(self.__session)
......@@ -3308,6 +3388,7 @@ class Download(threading.Thread):
Stop the thread
"""
self._log.info(" Join %s..." % (self.__id))
self.__endingMessage = "The stop of download is requested"
self._stopevent.set()
while self.isAlive():
self._log.info(" waiting end of download '%s' (state = %s)" % (
......
......@@ -68,6 +68,7 @@ class SynchronisationLocalRemote(object):
self.__dp_pattern_filter = None
self.__state = None
self.__localState = None
self.__jobFile = None
self.__compression_action = None
......@@ -135,28 +136,35 @@ class SynchronisationLocalRemote(object):
self.__state = None
self.__jobFile = None
else:
self.__jobFile.seek(0)
self.__state['Download'] = self.__id
self.__state['Host'] = gethostname()
self.__state['Pid'] = os.getpid()
self.__state['State'] = state[0]
self.__state['Details'] = state[1]
self.__state['Date'] = datetime.datetime.utcnow().strftime(
self.__localState = {}
self.__localState['Download'] = self.__id
self.__localState['Host'] = gethostname()
self.__localState['Pid'] = os.getpid()
self.__localState['State'] = state[0]
self.__localState['Details'] = state[1]
self.__localState['Date'] = datetime.datetime.utcnow().strftime(
messages.MSG_DATE_FORMAT)
self.__state['redownload_mode'] = self.redownload_mode
self.__localState['redownload_mode'] = self.redownload_mode
#self.__state['Output'] = self.__output_filepath
self.syncJobState()
self.__state.sync()
except Exception:
self.__log.exception("Cannot update jobstate file (state=%s). Skipping update", state)
def updateJobState(self, key, value):
try:
if self.__state is not None:
self.__state[key] = value
if self.__localState is not None:
self.__localState[key] = value
self.syncJobState()
self.__state.sync()
except Exception:
self.__log.exception("Cannot update jobstate file (key/value=%s/%s). Skipping update", key, value)
def syncJobState(self):
self.__log.info(f"====================> sync localState = {self.__localState}")
for k in self.__localState:
self.__state[k] = self.__localState[k]
def setLocalUrl(self, local_url):
self.__local_storage_repository = local_url
......@@ -172,6 +180,7 @@ class SynchronisationLocalRemote(object):
def unLock(self):
if self.__jobFile is not None:
self.__jobFile.unLock()
self.syncJobState()
self.__state.close(False)
if self.__history_path is not None:
history_file = self.__jobFile.copy(self.__history_path)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment