Commit d9de0cbc authored by BODERE's avatar BODERE
Browse files

Merge branch 'develop' into 'master'

V6.0.2

Closes #53 and #48

See merge request downloader/downloader_daemon!24
parents 74a658b3 e88372db
......@@ -10,6 +10,8 @@
# message_broker: null
# --
# send_notification_messages: false
# --
# ui_worker: false
# --- Folder paths
# paths:
......@@ -21,18 +23,62 @@
# --- Administrator contact list
# admins: []
# # -- UNUSED?
# # --- EMM Messages
# emm:
# # --
# database:
# address: 127.0.0.1
# user: root
# password: pa55w0rd
# database: downloader_emm
# # --
# enable_urgency_logs: true
# # --
# enable_archive_logs: true
# --- List of output targets for writing EMM logs. Available targets:
# --- filesystem: Store emm log files to a directory
# --- archive: Store emm log files inside workspace/log/messages_archive/<YEAR>/<DAY>/
# --- elasticsearch: Push emm messages to an elasticsearch database
# --- rabbitmq: Push emm messages to a RabbitMQ queue
# targets: []
# --- Shared EMM logs
# filesystem:
# # --- Path, either absolute or relative to the workspace
# path: spools/message/
# --- Elasticsearch configuration
# elasticsearch:
# hosts: [ localhost ]
# scheme: http
# user: null
# password: null
# # --- 'index' can contain date information, that is formatted according to
# # --- https://docs.python.org/3/library/datetime.html#strftime-and-strptime-behavior
# index: dl-emm
# --- RabbitMQ configuration
# rabbitmq:
# host: localhost
# port: 5672
# ssl: false
# user: null
# password: null
# queue_name: dl-emm
# --- job files config
# jobs:
# # --- List of output targets for writing job files. Available targets:
# # --- elasticsearch: Push emm jobs to an elasticsearch database
# # --- rabbitmq: Push emm jobs to a RabbitMQ queue
# targets: []
# # --- Elasticsearch configuration
# elasticsearch:
# hosts: []
# scheme: http
# user: null
# password: null
# # --- 'index' can contain date information, that is formatted according to
# # --- https://docs.python.org/3/library/datetime.html#strftime-and-strptime-behavior
# index: dl-jobs
# # --- RabbitMQ configuration
# rabbitmq:
# host: localhost
# port: 5672
# ssl: false
# user: null
# password: null
# queue_name: dl-jobs
# # --- Python logging configuration
# logs:
......
import os
import logging
from .api.messages import EnhancedFileWriter, IMessageWriter
from .targets.EMMElastic import EMMElastic
from .targets.EMMRabbitMQ import EMMRabbitMQ
class EMMWriter(IMessageWriter):
    """Fan an EMM message out to every target enabled in ``emm.targets``.

    Supported targets: ``filesystem`` (XML files via EnhancedFileWriter),
    ``elasticsearch`` (EMMElastic) and ``rabbitmq`` (EMMRabbitMQ). A target
    not listed in the configuration keeps its writer attribute as None.
    """

    def __init__(self, globalConfig):
        """Build one writer per target enabled in *globalConfig*."""
        IMessageWriter.__init__(self)
        self.__log = logging.getLogger()

        # Filesystem
        self.__file_writer = None
        if "filesystem" in globalConfig["emm.targets"]:
            path = globalConfig["emm.filesystem.path"]
            # The configured path is "either absolute or relative to the
            # workspace" (see sample configuration). BUGFIX: the original
            # condition was inverted (``isabs`` instead of ``not isabs``),
            # so relative paths were resolved against the current working
            # directory instead of the workspace.
            if not os.path.isabs(path):
                path = os.path.join(globalConfig["paths.workspace"], path)
            self.__file_writer = EnhancedFileWriter(
                filename='downloader',
                directory=path,
                extension='.xml',
                celery_config_path=globalConfig.getPath("celery_cfg")
            )

        # Elasticsearch
        self.__emm_elastic = None
        if "elasticsearch" in globalConfig["emm.targets"]:
            self.__emm_elastic = EMMElastic(
                hosts=globalConfig["emm.elasticsearch.hosts"],
                scheme=globalConfig["emm.elasticsearch.scheme"],
                user=globalConfig["emm.elasticsearch.user"],
                password=globalConfig["emm.elasticsearch.password"],
                indexNameTemplate=globalConfig["emm.elasticsearch.index"],
                sniffCluster=globalConfig["emm.elasticsearch.sniff_cluster"],
            )

        # RabbitMQ
        self.__emm_rabbitmq = None
        if "rabbitmq" in globalConfig["emm.targets"]:
            self.__emm_rabbitmq = EMMRabbitMQ(
                host=globalConfig["emm.rabbitmq.host"],
                port=globalConfig["emm.rabbitmq.port"],
                ssl=globalConfig["emm.rabbitmq.ssl"],
                user=globalConfig["emm.rabbitmq.user"],
                password=globalConfig["emm.rabbitmq.password"],
                virtual_host=globalConfig["emm.rabbitmq.virtual_host"],
                queue_name=globalConfig["emm.rabbitmq.queue_name"],
                routing_key=globalConfig["emm.rabbitmq.routing_key"],
            )

    # Filesystem writer instance, or None when the target is disabled.
    fileWriter = property(lambda self: self.__file_writer, None, None, None)

    def getFilesystemDir(self):
        """Return the directory used by the filesystem writer.

        NOTE(review): raises AttributeError when the ``filesystem`` target
        is disabled — callers appear expected to check ``fileWriter`` first;
        confirm before adding a guard.
        """
        return self.__file_writer.directory

    def writeMessage(
            self,
            message,
            dataset_id,
            filename=None,
            create_missing_dirs=None,
            pretty_print=True,
            encoding='UTF-8',
            xml_declaration=True,
            is_send_toRabbit_mq=False):
        """Write *message* to every enabled target.

        A failure on one target is logged and does not prevent the other
        targets from being written. (The original only protected the
        filesystem target; Elasticsearch and RabbitMQ are now handled
        consistently.)
        """
        # Filesystem
        if self.__file_writer is not None:
            try:
                self.__file_writer.writeMessage(
                    message,
                    dataset_id,
                    filename,
                    create_missing_dirs,
                    pretty_print,
                    encoding,
                    xml_declaration,
                    is_send_toRabbit_mq,  # TODO: find out if this is a duplicate of self.__emm_rabbitmq
                )
            except Exception as e:
                self.__log.exception('EMMWriter::writeMessage: error while writing EMM Message: %s', e)
        # Elastic search
        if self.__emm_elastic is not None:
            try:
                self.__emm_elastic.writeMessage(dataset_id, message)
            except Exception as e:
                self.__log.exception('EMMWriter::writeMessage: error while writing EMM Message to Elasticsearch: %s', e)
        # RabbitMQ
        if self.__emm_rabbitmq is not None:
            try:
                self.__emm_rabbitmq.writeMessage(dataset_id, message)
            except Exception as e:
                self.__log.exception('EMMWriter::writeMessage: error while writing EMM Message to RabbitMQ: %s', e)
......@@ -271,14 +271,14 @@ class EnhancedFileWriter(IMessageWriter):
def __init__(
self,
filename=None,
directories=None,
directory=None,
create_missing_dirs=True,
add_uniq_prefix=True,
extension=".xml",
enable=False,
celery_config_path=None):
self.setFilename(filename)
self.setDirectories(directories)
self.setDirectory(directory)
self.setCreateMissingDirs(create_missing_dirs)
self.setAddUniqPrefix(add_uniq_prefix)
self.setExtension(extension)
......@@ -294,6 +294,8 @@ class EnhancedFileWriter(IMessageWriter):
self.celery.conf.update(**conf)
def send_to_rabbitmq(self, file_path, dataset_id):
# TODO: the file writer shouldn't be responsible of sending things to RabbitMQ
# TODO: it's not clear what is sent to RabbitMQ?
processor = 'felyx-miniprod'
self.celery.send_task(
......@@ -306,8 +308,8 @@ class EnhancedFileWriter(IMessageWriter):
def setFilename(self, value):
self.__filename = value
def setDirectories(self, value):
self.__directories = value
def setDirectory(self, value):
self.__directory = value
def setCreateMissingDirs(self, value):
self.__create_missing_dirs = value
......@@ -315,7 +317,7 @@ class EnhancedFileWriter(IMessageWriter):
def setAddUniqPrefix(self, value):
self.__add_uniq_prefix = value
def write(
def writeMessage(
self,
message,
dataset_id,
......@@ -346,71 +348,58 @@ class EnhancedFileWriter(IMessageWriter):
if not f.endswith(self.__extension):
f = f + self.__extension
if len(self.directories) == 0:
raise Exception(
"Cannot write file without at least 1 base directory...")
# building dst_dirs, to retry to message write in the same directory in
# case of failure
dst_dirs = []
for d in self.directories:
dst_dirs.extend([d, d])
write_ok = False
for d in dst_dirs:
try:
rel_filedir = os.path.dirname(f)
abs_filedir = os.path.join(d, rel_filedir)
if not os.path.exists(
abs_filedir) and self.create_missing_dirs:
os.makedirs(abs_filedir)
abs_filepath = os.path.join(d, f)
# check if file already exists, rename it if needed
i = 0
while os.path.exists(abs_filepath):
i += 1
new_fname = '%s_%.5d%s' % (
f.rstrip(self.__extension), i, self.__extension)
abs_filepath = os.path.join(d, new_fname)
tmp_abs_filepath = abs_filepath + '.tmp'
if self.__extension == ".xml":
elementtree = etree.ElementTree(message.getElement())
if LXML_VERSION == 'lxml':
elementtree.write(
tmp_abs_filepath,
pretty_print=pretty_print,
encoding=encoding,
xml_declaration=xml_declaration)
else: # only tested with elementtree.ElementTree
elementtree.write(tmp_abs_filepath, encoding=encoding)
#shutil.move(tmp_abs_filepath, abs_filepath)
else:
file = open(tmp_abs_filepath, "wb")
dict_data = message.parseToJson(message.getElement())
file.write(json.dumps(dict_data, sort_keys=True, indent=4))
file.close()
shutil.move(tmp_abs_filepath, abs_filepath)
except Exception as e:
sys.stderr.write(
"WARNING : EnhancedFileWriter : cannot write file '%s' in directory '%s'. Error = %s. (will try to store message in backup directories : %s)\n" %
(f, d, str(e), str(
self.directories)))
try:
rel_filedir = os.path.dirname(f)
abs_filedir = os.path.join(self.directory, rel_filedir)
if not os.path.exists(
abs_filedir) and self.create_missing_dirs:
os.makedirs(abs_filedir)
abs_filepath = os.path.join(self.directory, f)
# check if file already exists, rename it if needed
i = 0
while os.path.exists(abs_filepath):
i += 1
new_fname = '%s_%.5d%s' % (
f.rstrip(self.__extension), i, self.__extension)
abs_filepath = os.path.join(self.directory, new_fname)
tmp_abs_filepath = abs_filepath + '.tmp'
if self.__extension == ".xml":
elementtree = etree.ElementTree(message.getElement())
if LXML_VERSION == 'lxml':
elementtree.write(
tmp_abs_filepath,
pretty_print=pretty_print,
encoding=encoding,
xml_declaration=xml_declaration)
else: # only tested with elementtree.ElementTree
elementtree.write(tmp_abs_filepath, encoding=encoding)
#shutil.move(tmp_abs_filepath, abs_filepath)
else:
write_ok = True
if is_send_toRabbit_mq:
try:
self.send_to_rabbitmq(abs_filepath, dataset_id)
except Exception as e:
sys.stderr.write(
"WARNING : Send to rabbitmq filepath = '%s' Error = %s %s.)\n" %
(abs_filepath, str(e), traceback.print_exc()))
break # OK, the file is written
file = open(tmp_abs_filepath, "wb")
dict_data = message.parseToJson(message.getElement())
file.write(json.dumps(dict_data, sort_keys=True, indent=4))
file.close()
shutil.move(tmp_abs_filepath, abs_filepath)
except Exception as e:
sys.stderr.write(
"WARNING : EnhancedFileWriter : cannot write file '%s' in directory '%s'. Error = %s\n" %
(f, self.directory, e))
else:
write_ok = True
if is_send_toRabbit_mq:
try:
self.send_to_rabbitmq(abs_filepath, dataset_id)
except Exception as e:
sys.stderr.write(
"WARNING : Send to rabbitmq filepath = '%s' Error = %s %s.)\n" %
(abs_filepath, e, traceback.print_exc()))
if not write_ok:
sys.stderr.write(
......@@ -423,9 +412,9 @@ class EnhancedFileWriter(IMessageWriter):
setFilename,
None,
"Description...")
directories = property(
lambda self: self.__directories,
setDirectories,
directory = property(
lambda self: self.__directory,
setDirectory,
None,
"Description...")
create_missing_dirs = property(
......@@ -574,6 +563,42 @@ class Message(BaseElement):
Message.setTemplate(self, template)
def toDict(self, identifier=None):
    """Serialize the message into a plain, JSON-friendly dictionary.

    identifier: accepted for interface compatibility; not used here.
    """
    def convToString(obj):
        # Recursively replace project objects that are not JSON-serializable
        # (File -> its path, Reference -> its dict form) inside nested
        # containers; every other value is returned unchanged.
        from downloader.scheduler.com.sys.File import File
        if isinstance(obj, dict):
            for key in obj:
                obj[key] = convToString(obj[key])
        elif isinstance(obj, list):
            for index in range(len(obj)):
                obj[index] = convToString(obj[index])
        elif isinstance(obj, File):
            obj = obj.path
        elif isinstance(obj, Reference):
            obj = obj.toDict()
        return obj

    from copy import deepcopy
    # BUGFIX: the original dict literal listed the "date" key twice with the
    # same value; the duplicate has been removed (behavior is unchanged).
    return convToString(deepcopy({
        "schemaVersion": self.schemaVersion,
        "date": self.date,
        "level": self.level,
        "project": self.project,
        "type": self.type,
        "sender": {"name": self.sender.name, "type": self.sender.type},
        "recipient": {"name": self.recipient.name, "type": self.recipient.type},
        "summary": self.summary,
        "reference": self.reference,
        "details": self.details,
        "misc_infos": self.misc_infos.pairs if self.misc_infos is not None else None,
        "uid": self.uid,
        "parent_uid": self.parent_uid,
    }))
def toString(
self,
encoding='UTF-8',
......@@ -636,8 +661,8 @@ class Message(BaseElement):
project.text = self.project
if self.type is not None:
type = etree.SubElement(message, "type")
type.text = self.type
msgtype = etree.SubElement(message, "type")
msgtype.text = self.type
if self.sender is not None:
sender = self.sender.getElement()
......@@ -672,13 +697,14 @@ class Message(BaseElement):
parent_uid.text = self.parent_uid
return message
def parseToJson(self, elt):
@staticmethod
def parseToJson(elt):
dict_json = {elt.tag: {} if elt.attrib else None}
children = list(elt)
if children:
ddict = defaultdict(list)
for dictchild in map(self.parseToJson, children):
for dictchild in map(Message.parseToJson, children):
for k, v in list(dictchild.items()):
ddict[k].append(v)
dict_json = {
......@@ -1251,6 +1277,14 @@ class ChainContext(BaseElement):
if self.node_process is None:
self.node_process = template.node_process
def toDict(self):
    """Return the chain context fields as a plain dictionary."""
    fields = ("chain_controller", "chain_name", "node_name", "node_process")
    return {name: getattr(self, name) for name in fields}
chain_controller = property(
lambda self: self.__chain_controller,
setChainController,
......@@ -1503,6 +1537,12 @@ class Reference(BaseElement):
def setType(self, type):
self.__type = type
def toDict(self):
    """Return this reference serialized as a {name, type} dictionary."""
    return dict(name=self.name, type=self.type)
name = property(lambda self: self.__name, setName, None, "Description...")
type = property(lambda self: self.__type, setType, None, "Description...")
......@@ -1568,6 +1608,23 @@ class ProcessingMessage(Message):
self.chain_context = ChainContext()
self.chain_context.setTemplate(template.chain_context)
def toDict(self):
    """Serialize this message, flattening the processing bloc fields and
    appending the chain context (when those blocs are present)."""
    res = Message.toDict(self)
    if "processing" in self.blocList:
        proc = self.blocList["processing"]
        for attr in ("process_id", "start_date", "stop_date", "exit_code",
                     "hostname", "username", "cmd_line", "pid", "logfile"):
            res["processing_" + attr] = getattr(proc, attr)
        # NOTE(review): misc_infos is read from the *logfile* object here,
        # not from the processing bloc itself — confirm this is intended.
        res["processing_logfile_misc_infos"] = proc.logfile.misc_infos
    if "chain_context" in self.blocList:
        res["chain_context"] = self.blocList["chain_context"].toDict()
    return res
processing = property(
lambda self: self.blocList['processing'],
setProcessing,
......@@ -1636,6 +1693,12 @@ class DownloadMessage(Message):
else:
self.blocList['download'] = Download()
def toDict(self):
    """Serialize this message, adding the download name and provider."""
    result = Message.toDict(self)
    download = self.download
    result["download_name"] = download.name
    result["download_provider"] = download.provider
    return result
processing = property(
lambda self: self.blocList['processing'],
setProcessing)
......
import elasticsearch
from datetime import datetime
from ..api.messages import IMessageWriter
class EMMElastic(IMessageWriter):
    """Write EMM messages into an Elasticsearch database."""

    def __init__(self, hosts, scheme="http", user=None, password=None, indexNameTemplate="emm", sniffCluster=False):
        """
        hosts: list of elasticsearch hosts (`hostname:port` format)
        indexNameTemplate: strftime-formatted template for the index name
        sniffCluster: when True, detect additional nodes from the cluster
        """
        import logging
        self.__log = logging.getLogger("cs")
        IMessageWriter.__init__(self)
        self._es = elasticsearch.Elasticsearch(
            scheme=scheme,
            hosts=hosts,
            http_auth=None if user is None else (user, password),
            # Sniffing detects additional nodes from the same cluster
            sniff_on_start=sniffCluster,
            sniff_on_connection_fail=sniffCluster,
            verify_certs=True,
        )
        info = self._es.info(ignore=403)  # pylint: disable=unexpected-keyword-arg
        health = self._es.cluster.health(ignore=403)  # pylint: disable=unexpected-keyword-arg
        if "status" not in info:
            # "status" only appears in error payloads (e.g. an ignored 403),
            # so reaching this branch means the cluster answered normally.
            # BUGFIX: cluster health "status" is a string ("green"/"yellow"/
            # "red"); the original compared it to the integer 200, which
            # never matched and always logged "Unknown".
            health_status = health.get("status")
            health_ok = isinstance(health_status, str)
            self.__log.info("Elasticsearch cluster: %s, version %s, health=%s nodes=%s",
                            info["cluster_name"],
                            info["version"]["number"],
                            health_status if health_ok else "Unknown",
                            [n["host"] for n in self._es.nodes.stats()["nodes"].values()])
            self.__log.debug(" full info: %s", info)
            if health_ok:
                self.__log.debug(" full health: %s", health)
        else:
            self.__log.debug("Elasticsearch cluster hosts: %s", hosts)
        self.__log.debug(" Client version: %s", ".".join([str(i) for i in elasticsearch.VERSION]))
        self.__indexNameTemplate = indexNameTemplate

    def writeMessage(self, identifier, message):
        """
        Write a full EMM message into the database (the message must not exist)
        """
        self._es.index(
            index=self.__getIndexName(),
            doc_type=message.type,
            id=identifier,
            body=message.toDict(),
        )

    def updateMessagePart(self, identifier, messagePart):
        """
        Updates a message stored in elasticsearch database (with the document ID and content)
        """
        assert isinstance(messagePart, dict)
        self._es.update(
            index=self.__getIndexName(),
            # BUGFIX: messagePart is a plain dict (see assert above); the
            # original read ``messagePart.type``, which raised AttributeError
            # on every call. Read the "type" key instead (None when absent).
            doc_type=messagePart.get("type"),
            id=identifier,
            body={"doc": messagePart},
        )

    def __getIndexName(self):
        # The template may contain strftime directives (e.g. "dl-emm-%Y.%m.%d")
        return datetime.now().strftime(self.__indexNameTemplate)
import pika
from datetime import datetime
from ..api.messages import IMessageWriter
from ssl import create_default_context
import threading
import json
connectionPool = {}
def CloseRabbitMQConnections():
    # Close every pooled RabbitMQ connection.
    # NOTE(review): connectionPool is a dict, so this iterates its KEYS and
    # unpacks each as a 3-tuple whose first element is assumed to be the
    # connection object — confirm against the code that populates the pool
    # (possibly ``connectionPool.values()`` was intended).
    for (con, _, _) in connectionPool:
        con.close()
class EMMRabbitMQ(IMessageWriter):
def __init__(self,
host="localhost",
port=pika.connection.Parameters.DEFAULT_PORT,
ssl=pika.connection.Parameters.DEFAULT_SSL,
user=None,
password=None,
virtual_host=pika.connection.Parameters.DEFAULT_VIRTUAL_HOST,
queue_name="emm",
routing_key=None,
):
"""
RabbitMQ connection parameters (host, port, credentials, virtual host, queue)
"""
IMessageWriter.__init__(self)
import logging
self.__log = logging.getLogger()
sslContext = create_default_context() if ssl is True else None
self.__connectionParams = {
"host": host,
"credentials": pika.credentials.PlainCredentials(user, password) if password is not None else pika.connection.Parameters.DEFAULT_CREDENTIALS,
"ssl_options": pika.SSLOptions(sslContext) if sslContext is not None else None,
"port": port,
"virtual_host": virtual_host,
"blocked_connection_timeout": 900,