Commit c41ced94 authored by BODERE's avatar BODERE
Browse files

Merge branch 'hotfix_6.0.3' into 'master'

Hotfix 6.0.3

Closes #49

See merge request downloader/downloader_daemon!25
parents d9de0cbc bd1712a4
......@@ -12,12 +12,15 @@ class EMMWriter(IMessageWriter):
def __init__(self, globalConfig):
IMessageWriter.__init__(self)
self.projet_name = globalConfig["emm.project"]
self.message_type = globalConfig["emm.message_type"]
self.__log = logging.getLogger()
self.__file_writer = None
if "filesystem" in globalConfig["emm.targets"]:
path = globalConfig["emm.filesystem.path"]
if os.path.isabs(path):
if not os.path.isabs(path):
path = os.path.join(globalConfig["paths.workspace"], path)
self.__file_writer = EnhancedFileWriter(
......@@ -69,6 +72,10 @@ class EMMWriter(IMessageWriter):
xml_declaration=True,
is_send_toRabbit_mq=False):
# overwrite project and type
message.project = self.projet_name
message.type = self.message_type
# Filesystem
if self.__file_writer is not None:
try:
......@@ -92,3 +99,6 @@ class EMMWriter(IMessageWriter):
# RabbitMQ
if self.__emm_rabbitmq is not None:
self.__emm_rabbitmq.writeMessage(dataset_id, message)
def setProjectName(self, project_name):
self.projet_name = project_name
\ No newline at end of file
......@@ -357,15 +357,15 @@ class File(object):
else:
# create a explicite new file name for the uncompressed file
unzipFile = File(self.getName() + ".un" + ct + "ed")
# Open the current compressed file
self.open("r")
# Open the new uncompressed file
if comptype == File.ZIP:
# Open the current compressed file
self.open("r")
filenames = self.__fd.namelist()
if len(filenames) > 1:
raise Exception("ZipFile must only contains one file")
# Open the new uncompressed file
_filepath, name = os.path.split(filenames[0])
unzipFile = File(os.path.join(self.__path, name))
unzipFile.open("wb")
......@@ -377,8 +377,12 @@ class File(object):
self.close()
unzipFile.close()
raise
elif comptype == File.LZW:
else:
# Open the current compressed file
self.open("rb")
if comptype == File.LZW:
data_uncompress = True
# Open the new uncompressed file
unzipFile.open("wb")
try:
data = self.__fd.read()
......@@ -389,6 +393,7 @@ class File(object):
unzipFile.close()
raise
else:
# Open the new uncompressed file
unzipFile.open("wb")
# do the uncompression copy (with bufferization)
buff = True
......
......@@ -623,6 +623,7 @@ class ConfigurationSettings(object):
self.__maxNbOfConcurrentStreams = None
self.__maxNbOfLinesInAutoListing = None
self.__monitoring = None
self.__projectName = None
def Read(self):
......@@ -704,10 +705,15 @@ class ConfigurationSettings(object):
self.__monitoring = True
if self.__xr.haveSubNode(self.__node, 'monitoring'):
monitoring = self.__xr.getSubNodeValue(logging.WARNING, self.__node, 'monitoring', "")
monitoring = self.__xr.getSubNodeValue(logging.WARNING,
self.__node, 'monitoring', "")
if monitoring is not None and monitoring.lower() == 'false':
self.__monitoring = False
if self.__xr.haveSubNode(self.__node, 'projet_name'):
self.__projectName = self.__xr.getSubNodeValue(logging.WARNING,
self.__node, 'projet_name')
return result
database_path = property(
......@@ -780,3 +786,9 @@ class ConfigurationSettings(object):
None,
None,
"read only monitoring variable")
projectName = property(
lambda self: self.__projectName,
None,
None,
"read only projectName variable")
......@@ -640,6 +640,9 @@ class Download(threading.Thread):
# ==============================================================================================================
self._log.debug(" --> apply downloader configuration : settings")
if self.__configuration.settings.projectName is not None:
self.__emm_writer.setProjectName(self.__configuration.settings.projectName)
if self.__configuration.settings.nbRetries is not None:
self.__misc_param_nb_retry = self.__configuration.settings.nbRetries
......@@ -2575,7 +2578,7 @@ class Download(threading.Thread):
else:
dl_state = STATE_DOWNLOAD_ERROR
self._log.debug("updateStateDownload of %s state : %d" % (filepath, dl_state))
self._log.debug("updateStateDownload of %s state IN : %d" % (filepath, dl_state))
# NetCDF Datareader Case: Searching for Sensingtime in Global NetCDF Attributes
if source_file.sensingtime is None:
source_file.sensingtime = self.__data_reader.getDate(downloaded_file.localFile.getName())
......@@ -2586,7 +2589,7 @@ class Download(threading.Thread):
self._log.debug("updateStateDownload of %s state None ?????????" % (filepath))
msg_level = 'WARNING'
else:
self._log.debug("updateStateDownload of %s state : %d" % (filepath, db_state))
self._log.debug("updateStateDownload of %s state OUT: %d" % (filepath, db_state))
if msg_level == 'INFO':
m.level = messages.INFO
......
......@@ -30,6 +30,8 @@ class GlobalConfig(object):
admins: !!list []
emm:
project: CERSAT
message_type: download
targets: []
filesystem:
path: spools/message/
......@@ -66,9 +68,34 @@ class GlobalConfig(object):
user: null
password: null
virtual_host: "/"
queue_name: dl-jobs
queue_name: dl-jobsif
routing_key: null
ui_worker:
queue: #{queue_name: { queue_arguments: {alias : alias_downloader_name}}}
name: UI.host.0
alias: alias_downloader_name
broker_url: #"amqp://user:password@host:5672/vhost"
host: localhost
port: 5672
user: null
password: null
virtual_host: "/"
broker_heartbeat: 30
imports: [downloader.worker.celery_tasks]
result_backend: amqp
result_expires: 300
task_serializer: json
result_serializer: json
timezone: Europe/Paris
enable_utc: True
result_compression: bzip2
worker_log_color: True
accept_content: [json]
auto_delete: True
expires: 15.0
exclusive: True
logs:
default:
root:
......
......@@ -232,11 +232,16 @@ class Scheduler(object):
os.makedirs(path)
# Setup file logging target
logFile = File(os.path.join(self.__globalConfig.getPath("process_log_archive"), "scheduler.log"))
if not logFile.acquireLock("%s%d" % (gethostname(), os.getpid())):
if options.force:
lockfile = logFile.acquireLock("%s-%d %s" % (gethostname(), os.getpid(), self.__version))
if not lockfile and options.force:
self._log.info("Forcing the deletion of the lock file (%s)", logFile.getName() + ".lock")
else:
os.remove(logFile.getName() + ".lock")
lockfile = logFile.acquireLock("%s-%d %s" % (gethostname(), os.getpid(), self.__version))
if not lockfile:
self._log.error("!" * 100)
self._log.error("A session of Scheduler for this DOWNLOADER_WORKSPACE, may be already run !")
self._log.error("Check the lock file %s before restart the scheduler",
......
......@@ -41,8 +41,6 @@ class WorkerUI(threading.Thread):
self._stopevent = threading.Event()
configFile = self.__globalConfig.getPath("celery_cfg")
pluginFile = os.path.join(self.__globalConfig.getPath("dynamic_plugins"), CONFIG_WORKER + ".py")
plugin = open(pluginFile, "w")
......@@ -50,6 +48,37 @@ class WorkerUI(threading.Thread):
line = "downloader_configfile='" + os.path.abspath(self.__globalConfig.filePath) + "'\n"
plugin.write(line + "\n")
#Add queue
for key in self.__globalConfig["ui_worker"].keys():
if key == 'queue':
line = "task_queues={" + \
"'" + self.__globalConfig["ui_worker.queue.name"] + "': " +\
"{'queue_arguments': " +\
"{'alias': '" + self.__globalConfig["ui_worker.queue.alias"] + "'}," + \
"'vhost': '" + self.__globalConfig["ui_worker.broker_url.virtual_host"] + "'}" +\
"}"
self.__queueName = self.__globalConfig["ui_worker.queue.name"]
elif key == 'broker_url':
line = "broker_url="
line += "'amqp://"
line += str(self.__globalConfig["ui_worker.broker_url.user"]) + ":"
line += str(self.__globalConfig["ui_worker.broker_url.password"]) + "@"
line += str(self.__globalConfig["ui_worker.broker_url.host"]) + ":"
line += str(self.__globalConfig["ui_worker.broker_url.port"]) + "/"
line += str(self.__globalConfig["ui_worker.broker_url.virtual_host"]) + "'"
else:
entry = "ui_worker." + key
if isinstance(self.__globalConfig[entry], (str, bool)):
value = "'" + str(self.__globalConfig[entry]) + "'"
else:
value = str(self.__globalConfig[entry])
line = key + "=" + value
plugin.write(line + "\n")
"""
configFile = self.__globalConfig.getPath("celery_cfg")
# search (and replace default queue name) UI queue name in configuration file
with open(configFile, 'r') as f:
config = simplejson.loads(f.read())
......@@ -68,7 +97,7 @@ class WorkerUI(threading.Thread):
self.__queueName = queue
self._log.debug("config queue OK")
break
"""
plugin.close()
def run(self):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment