Commit c3b7efb6 authored by BODERE

Merge branch 'hotfix_6.0.9' into 'master'

Hotfix 6.0.9

Closes #62 and #60

See merge request downloader/downloader_daemon!37
parents 4e65506b da483dce
......@@ -11,3 +11,4 @@ bcauseur <bcauseur@sii.fr>
jfpiolle <jfpiolle@ifremer.fr>
medspi <medspi@br156-133.ifremer.fr>
pifgold <pifgold@free.fr>
pm22d12 <occulterai purees fonctionnais>
#
# -*- coding: UTF-8 -*-
#
import os
import logging
from datetime import datetime, timedelta
from eccodes import *
from downloader.scheduler.sc.IDataReader import IDataReader
class GribDataReader(IDataReader):
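# stepUnits value (as returned by codes_get_string) -> duration of one step in seconds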
Table4 = {"m": 60,
"h": 3600,
"D": 3600 * 24,
"3h": 3600 * 3,
"6h": 3600 * 6,
"12h": 3600 * 12,
"15m": 60 * 15,
"30m": 60 * 30,
"s": 1
}
def __init__(self, loggerName, dummy1, dummy2, storagePath):
self._log = logging.getLogger(loggerName)
self._log.debug("GribDataReader: storagePath %s'" %
(storagePath))
self.storage_path = storagePath
def getRelativePath(self, filepath):
filename = self.getStorageName(filepath)
dt = self.getDate(filepath)
if self.storage_path is not None and dt is not None:
filedir = dt.strftime(self.storage_path)
return os.path.join(filedir, filename)
else:
filepath = filepath.lstrip('/')
return filepath
def getStorageName(self, filepath):
filename = os.path.split(filepath)[1]
return filename
def getDate(self, filepath):
grip_type = None
first_grib = None
data_date = None
data_time = None
step_units = None
data_datetime = 0
forecastTime = None
dt = None
if os.path.isfile(filepath): # if file is a local file
try:
with open(filepath, 'rb') as f:
first_grib = codes_grib_new_from_file(f)
grip_type = codes_get(first_grib, "editionNumber")
data_date = codes_get_string(first_grib, "dataDate")
data_time = codes_get_string(first_grib, "dataTime")
step_units = codes_get_string(first_grib, "stepUnits")
data_datetime = datetime.strptime(data_date + data_time, "%Y%m%d%H%M")
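# GRIB edition 1 exposes the forecast offset as startStep, edition 2 as forecastTime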
if grip_type == 1:
forecastTime = codes_get(first_grib, "startStep")
else:
forecastTime = codes_get(first_grib, "forecastTime")
forecastTimeSeconds = forecastTime * self.Table4[step_units]
dt = data_datetime + timedelta(seconds=forecastTimeSeconds)
except Exception as e:
self._log.exception(
"GribDataReader::getDate : Unable to get date for file '%s' "
"(type=%s, date=%s time=%s startStep/forecastTime=%s stepUnits=%s) " %
(filepath, str(grip_type), data_date,
data_time, str(forecastTime), str(step_units)))
raise e
return dt
if __name__ == '__main__':
filepath = '/export/home1/pmaissiat/downloader_daemon/unittests/dataset/data/Test_grib/data1' #"C:\\Cersat\\Daemon\\unittests\\dataset\\data\\Test_grib\\data1"
logging.basicConfig()
date_name = "first_meas_time"
date_format = "%Y-%m-%d %H:%M:%S.%f"
storage_path = "%Y/%j"
print("GRIB1")
reader = GribDataReader('root', date_name, date_format, storage_path)
print("getStorageName() => ", reader.getStorageName(filepath))
print("getDate() => ", reader.getDate(filepath))
print("getRelativePath() => ", reader.getRelativePath(filepath))
print("\nGRIB2")
filepath = '/export/home1/pmaissiat/downloader_daemon/unittests/dataset/data/Test_grib/data2'
reader = GribDataReader('root', date_name, date_format, storage_path)
print("getStorageName() => ", reader.getStorageName(filepath))
print("getDate() => ", reader.getDate(filepath))
print("getRelativePath() => ", reader.getRelativePath(filepath))
......@@ -79,4 +79,5 @@ def DataReaderFactory(
loggerName, regexpDate, dateFormat, storagePath)
else:
return know_classes[classname](loggerName)
return know_classes[classname](
loggerName, regexpDate, dateFormat, storagePath)
......@@ -308,6 +308,8 @@ class Protocol_https_directorylist(AbstractProtocol):
mtime_str = fnames[fname][1]
if mtime_str != '' and mtime_str is not None:
mtime_value = time.mktime(time.strptime(mtime_str, self.date_format))
else:
mtime_value = 0
size_str = fnames[fname][2]
size_value = 0
if not isdirectory:
......@@ -6,6 +6,7 @@ from .api.messages import EnhancedFileWriter, IMessageWriter
from .targets.EMMElastic import EMMElastic
from .targets.EMMRabbitMQ import EMMRabbitMQ
from downloader.scheduler.com.ext.PublishRabbitMQ import PublishRabbitMQ
class EMMWriter(IMessageWriter):
......@@ -44,7 +45,7 @@ class EMMWriter(IMessageWriter):
# RabbitMQ
self.__emm_rabbitmq = None
if "rabbitmq" in globalConfig["emm.targets"]:
self.__emm_rabbitmq = EMMRabbitMQ(
self.__emm_rabbitmq = PublishRabbitMQ(
host=globalConfig["emm.rabbitmq.host"],
port=globalConfig["emm.rabbitmq.port"],
ssl=globalConfig["emm.rabbitmq.ssl"],
......@@ -95,7 +96,10 @@ class EMMWriter(IMessageWriter):
# RabbitMQ
if self.__emm_rabbitmq is not None:
self.__emm_rabbitmq.publish(dataset_id, message.toDict(), self.message_type)
"""
self.__emm_rabbitmq.writeMessage(dataset_id, message)
"""
def setProjectName(self, project_name):
self.projet_name = project_name
......@@ -321,8 +321,8 @@ class EnhancedFileWriter(IMessageWriter):
if self.add_uniq_prefix:
dt = datetime.datetime.utcnow()
timestamp_string = dt.strftime(MSG_DATE_FORMAT)
uniq_prefix = "%s-%s-%s-%s_" % (timestamp_string, gethostname(), os.getpid(
), threading.current_thread().name.split('-')[1][-7:].zfill(7))
uniq_prefix = "%s-%s-%s-%s_" % (timestamp_string, gethostname(),
os.getpid(), threading.current_thread().name.split('-')[1][-7:].zfill(7))
suffix = "_%.6d" % (dt.microsecond)
f_path, f_name = os.path.split(f)
f = os.path.join(f_path, uniq_prefix + f_name.rstrip(self.__extension) + suffix)
import pika
from datetime import datetime
from ..api.messages import IMessageWriter
from ssl import create_default_context
import threading
import json
connectionPool = {}
def CloseRabbitMQConnections():
for (con, _, _) in connectionPool.values():
con.close()
class EMMRabbitMQ(IMessageWriter):
def __init__(self,
host="localhost",
port=pika.connection.Parameters.DEFAULT_PORT,
ssl=pika.connection.Parameters.DEFAULT_SSL,
user=None,
password=None,
virtual_host=pika.connection.Parameters.DEFAULT_VIRTUAL_HOST,
queue_name="emm",
routing_key=None,
):
"""
hosts: list of elasticsearch hosts (`hostname:port` format)
"""
IMessageWriter.__init__(self)
import logging
self.__log = logging.getLogger()
sslContext = create_default_context() if ssl is True else None
self.__connectionParams = {
"host": host,
"credentials": pika.credentials.PlainCredentials(user, password) if password is not None else pika.connection.Parameters.DEFAULT_CREDENTIALS,
"ssl_options": pika.SSLOptions(sslContext) if sslContext is not None else None,
"port": port,
"virtual_host": virtual_host,
"blocked_connection_timeout": 0,
}
self.__queue_name = queue_name
self.__routing_key = routing_key
def writeMessage(self, identifier, message):
"""
Write a full EMM message into the database (the message must not exist)
"""
# Handle rabbitMQ connections per thread
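# pika connections/channels are not thread-safe, so one BlockingConnection is kept per thread ID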
threadID = threading.get_ident()
if threadID not in connectionPool:
# Create connection + channel
connection = pika.BlockingConnection(pika.ConnectionParameters(**self.__connectionParams))
channel = connection.channel()
connectionPool[threadID] = (connection, channel, None)
(connection, channel, lastQueueName) = connectionPool[threadID]
def customSerialize(value):
if isinstance(value, datetime):
return value.isoformat()
assert 0, "Unknown type %s for %s" % (type(value), value)
return ""
published = False
retry = False
fail = False
while not published and not fail:
queueName = datetime.now().strftime(self.__queue_name)
if queueName != lastQueueName:
# Declare queue
channel.queue_declare(
queue=queueName,
durable=True,
)
lastQueueName = queueName
# Update stored values
connectionPool[threadID] = (connection, channel, lastQueueName)
try:
channel.basic_publish(exchange="",
routing_key=self.__routing_key if self.__routing_key is not None else queueName,
properties=pika.spec.BasicProperties(
message_id=identifier,
content_type=message.type,
),
body=json.dumps(message.toDict(), default=customSerialize))
published = True
except pika.exceptions.StreamLostError:
if retry:
fail = True
else:
connection = pika.BlockingConnection(pika.ConnectionParameters(**self.__connectionParams))
channel = connection.channel()
lastQueueName = None
connectionPool[threadID] = (connection, channel, lastQueueName)
self.__log.info("Try to write a message again")
retry = True
\ No newline at end of file
......@@ -13,16 +13,16 @@ def CloseRabbitMQConnections():
con.close()
class JobsRabbitMQ():
class PublishRabbitMQ():
def __init__(self,
host="localhost",
port=pika.connection.Parameters.DEFAULT_PORT,
ssl=False,
ssl=pika.connection.Parameters.DEFAULT_SSL,
user=None,
password=None,
virtual_host=pika.connection.Parameters.DEFAULT_VIRTUAL_HOST,
queue_name="jobs",
queue_name="dl-****",
routing_key=None,
):
"""
......@@ -40,16 +40,17 @@ class JobsRabbitMQ():
"ssl_options": pika.SSLOptions(sslContext) if sslContext is not None else None,
"port": port,
"virtual_host": virtual_host,
"blocked_connection_timeout": 0,
}
self.__queue_name = queue_name
self.__routing_key = routing_key
def update(self, identifier, state):
def publish(self, identifier, message, content_type):
"""
Write a full job into the database (the job must not exist)
"""
assert isinstance(state, dict), "state must be a dict"
assert isinstance(message, dict), "message must be a dict"
# Handle rabbitMQ connections per thread
threadID = threading.get_ident()
......@@ -90,9 +91,9 @@ class JobsRabbitMQ():
routing_key=self.__routing_key if self.__routing_key is not None else queueName,
properties=pika.spec.BasicProperties(
message_id=identifier,
content_type="Job",
content_type=content_type,
),
body=json.dumps(state, default=customSerialize))
body=json.dumps(message, default=customSerialize))
published = True
except pika.exceptions.StreamLostError:
......@@ -105,4 +106,4 @@ class JobsRabbitMQ():
connectionPool[threadID] = (connection, channel, lastQueueName)
self.__log.info("Try to write a message again")
retry = True
\ No newline at end of file
retry = True
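The hunks above rename JobsRabbitMQ to the generic PublishRabbitMQ and replace update(identifier, state) with publish(identifier, message, content_type). A minimal usage sketch, assuming placeholder connection values (none of them come from the repository configuration):

from downloader.scheduler.com.ext.PublishRabbitMQ import PublishRabbitMQ

# Placeholder connection values; the diff's own default queue name is "dl-****"
publisher = PublishRabbitMQ(
    host="rabbitmq.example.org",
    port=5672,
    ssl=False,
    user="downloader",
    password="secret",
    virtual_host="/",
    queue_name="dl-jobs",
    routing_key=None,
)
# The message must be a dict; it is serialized with json.dumps(..., default=customSerialize),
# so datetime values inside the payload become ISO strings.
publisher.publish("job-1234", {"state": "DONE"}, "Job")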
#
# -*- coding: UTF-8 -*-
#
import os
import logging
from datetime import datetime, timedelta
from eccodes import *
from downloader.scheduler.sc.IDataReader import IDataReader
class GribDataReader(IDataReader):
ONLY_LOCAL = True
DIRECTORY = False
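# stepUnits value (as returned by codes_get_string) -> duration of one step in seconds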
TABLE4 = {"m": 60,
"h": 3600,
"D": 3600 * 24,
"3h": 3600 * 3,
"6h": 3600 * 6,
"12h": 3600 * 12,
"15m": 60 * 15,
"30m": 60 * 30,
"s": 1
}
def __init__(self, loggerName, dummy1, dummy2, storagePath):
self._log = logging.getLogger(loggerName)
self._log.debug("GribDataReader: storagePath %s'" %
(storagePath))
self.storage_path = storagePath
def getRelativePath(self, filepath):
filename = self.getStorageName(filepath)
dt = self.getDate(filepath)
if self.storage_path is not None and dt is not None:
filedir = dt.strftime(self.storage_path)
return os.path.join(filedir, filename)
else:
filepath = filepath.lstrip('/')
return filepath
def getStorageName(self, filepath):
filename = os.path.split(filepath)[1]
return filename
def getDate(self, filepath):
grip_type = None
first_grib = None
data_date = None
data_time = None
step_units = None
data_datetime = 0
forecastTime = None
dt = None
if os.path.isfile(filepath): # if file is a local file
try:
with open(filepath, 'rb') as f:
first_grib = codes_grib_new_from_file(f)
grip_type = codes_get(first_grib, "editionNumber")
data_date = codes_get_string(first_grib, "dataDate")
data_time = codes_get_string(first_grib, "dataTime")
step_units = codes_get_string(first_grib, "stepUnits")
data_datetime = datetime.strptime(data_date + data_time, "%Y%m%d%H%M")
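# GRIB edition 1 exposes the forecast offset as startStep, edition 2 as forecastTime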
if grip_type == 1:
forecastTime = codes_get(first_grib, "startStep")
else:
forecastTime = codes_get(first_grib, "forecastTime")
forecastTimeSeconds = forecastTime * self.TABLE4[step_units]
dt = data_datetime + timedelta(seconds=forecastTimeSeconds)
except Exception as e:
self._log.exception(
"GribDataReader::getDate : Unable to get date for file '%s' "
"(type=%s, date=%s time=%s startStep/forecastTime=%s stepUnits=%s) " %
(filepath, str(grip_type), data_date,
data_time, str(forecastTime), str(step_units)))
raise e
return dt
if __name__ == '__main__':
filepath = '/export/home1/pmaissiat/downloader_daemon/unittests/dataset/data/Test_grib/data1'
logging.basicConfig()
date_name = "first_meas_time"
date_format = "%Y-%m-%d %H:%M:%S.%f"
storage_path = "%Y/%j"
print("GRIB1")
reader = GribDataReader('root', date_name, date_format, storage_path)
print("getStorageName() => ", reader.getStorageName(filepath))
print("getDate() => ", reader.getDate(filepath))
print("getRelativePath() => ", reader.getRelativePath(filepath))
print("\nGRIB2")
filepath = '/export/home1/pmaissiat/downloader_daemon/unittests/dataset/data/Test_grib/data2'
reader = GribDataReader('root', date_name, date_format, storage_path)
print("getStorageName() => ", reader.getStorageName(filepath))
print("getDate() => ", reader.getDate(filepath))
print("getRelativePath() => ", reader.getRelativePath(filepath))
......@@ -105,7 +105,8 @@ def DataReaderFactory(
return know_classes[classname](
loggerName, regexpDate, dateFormat, storagePath)
else:
return know_classes[classname](loggerName)
return know_classes[classname](
loggerName, regexpDate, dateFormat, storagePath)
except Exception as e:
raise Exception(
"DataReader [%s] cannot be instanciated [%s,%s,%s,%s,%s]: %s" %
......@@ -17,6 +17,7 @@ import time
import pwd
import grp
import datetime
import json
from socket import gethostname
from string import capwords
try:
......@@ -42,8 +43,10 @@ from downloader.scheduler.sc.JobState import JobState
from downloader.scheduler.sc.ProviderManager import ProviderManager
from downloader.scheduler.sc.ConfigurationFileUtil import ConfigurationFileUtil
from downloader.emm.EMMWriter import EMMWriter
from .job_targets.JobsElastic import JobsElastic
from .job_targets.JobsRabbitMQ import JobsRabbitMQ
from downloader.scheduler.sc.job_targets.JobsElastic import JobsElastic
from downloader.scheduler.sc.job_targets.JobsRabbitMQ import JobsRabbitMQ
from downloader.scheduler.com.ext.PublishRabbitMQ import PublishRabbitMQ
LOGGER_DEFAULT_FILE_NAME = "download_"
DOWNLOAD_LOGGER_LEVEL = logging.DEBUG
......@@ -423,7 +426,7 @@ class Download(threading.Thread):
self.__jobsElastic = None
# Setup RabbitMQ job upload
if "rabbitmq" in self.__globalConfig["jobs.targets"]:
self.__jobsRabbitMQ = JobsRabbitMQ(
self.__jobsRabbitMQ = PublishRabbitMQ(
host=self.__globalConfig["jobs.rabbitmq.host"],
port=self.__globalConfig["jobs.rabbitmq.port"],
ssl=self.__globalConfig["jobs.rabbitmq.ssl"],
......@@ -437,6 +440,9 @@ class Download(threading.Thread):
else:
self.__jobsRabbitMQ = None
self.__metricsRabbitMQ = None
self.__metricsFilesystem = None
def initFileLog(self, fileLogConfig, path):
from .LogUtil import setupFileLogger
self.setFileLogName(path)
......@@ -717,6 +723,9 @@ class Download(threading.Thread):
if not self.__configuration.settings.monitoring:
self.__monitoringFilePath = None
self.__monitoring = False
else:
self.__setMonitoringConfiguration()
# Ensure that getlisting_delay > loop_delay
if self.__misc_param_getlisting_delay < self.__misc_param_loop_delay:
......@@ -947,8 +956,8 @@ class Download(threading.Thread):
# Add the path of the json files if the generation of the files is
# requested
if self.__monitoringFilePath is not None: # self.__monitoringFilePath != None
workspace_path_list += [self.__monitoringFilePath] # workspace_path_list += [ self.__monitoringFilePath ]
if self.__monitoringFilePath is not None:
workspace_path_list += [self.__monitoringFilePath]
# Try to build needed directories for download workspace
for workspace_path in workspace_path_list:
......@@ -965,9 +974,36 @@ class Download(threading.Thread):
# End of read()
self._log.debug(" download configuration done.")
# 09/04/2018 PMT#37 : monitoring - global configuration
def setMonitoringFilePath(self, monitoringFilePath):
self.__monitoringFilePath = monitoringFilePath
# 24/03/2020 #37 : monitoring - global configuration
def __setMonitoringConfiguration(self):
# Setup RabbitMQ metrics upload
if "rabbitmq" in self.__globalConfig["metrics.targets"]:
self.__metricsRabbitMQ = PublishRabbitMQ(
host=self.__globalConfig["metrics.rabbitmq.host"],
port=self.__globalConfig["metrics.rabbitmq.port"],
ssl=self.__globalConfig["metrics.rabbitmq.ssl"],
user=self.__globalConfig["metrics.rabbitmq.user"],
password=self.__globalConfig["metrics.rabbitmq.password"],
virtual_host=self.__globalConfig["metrics.rabbitmq.virtual_host"],
queue_name=self.__globalConfig["metrics.rabbitmq.queue_name"],
routing_key=self.__globalConfig["metrics.rabbitmq.routing_key"],
)
self._log.debug("Metrics will be pushed to RabbitMQ")
else:
self.__metricsRabbitMQ = None
# Setup Filesystem metrics upload
if "filesystem" in self.__globalConfig["metrics.targets"]:
self.__monitoringFilePath = os.path.join(self.__globalConfig["paths.workspace"],
self.__globalConfig["metrics.filesystem.path"])
self.__metricsFilesystem = True
else:
self.__monitoringFilePath = None
if self.__monitoringFilePath is None and self.__metricsRabbitMQ is None:
self.__monitoring = False
else:
self.__monitoring = True
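For reference, a sketch of the global-configuration entries read by __setMonitoringConfiguration(); the key names are those looked up above, all values are placeholders:

# Placeholder values only; the real settings live in the daemon's global configuration.
globalConfig = {
    "metrics.targets": "rabbitmq,filesystem",   # checked with `in`, so any container of target names works
    "metrics.rabbitmq.host": "rabbitmq.example.org",
    "metrics.rabbitmq.port": 5672,
    "metrics.rabbitmq.ssl": False,
    "metrics.rabbitmq.user": "downloader",
    "metrics.rabbitmq.password": "secret",
    "metrics.rabbitmq.virtual_host": "/",
    "metrics.rabbitmq.queue_name": "dl-metrics",
    "metrics.rabbitmq.routing_key": None,
    "metrics.filesystem.path": "monitoring",
    "paths.workspace": "/data/downloader/workspace",
}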
# 15/05/2018 PMT : default global limit
def setDefaultMaxActivatedFilesByLoop(self, maxActivatedFilesByLoop):
......@@ -1812,8 +1848,8 @@ class Download(threading.Thread):
else:
self._log.debug("Begin Store downloaded file %s" %
downloaded_file.localFile.getName())
self.state = (STATE_RUNNING, STATE_RUNNING_STORING %
(downloaded_file.remoteFilename))
if use_internal_folder:
self.state = (STATE_RUNNING, STATE_RUNNING_STORING % (downloaded_file.remoteFilename))
assert use_internal_folder != use_final_folder, \
"Download::storeDownloadedFile : You must choose internal_folder OR final_folder"
......@@ -2450,8 +2486,8 @@ class Download(threading.Thread):
self._log.warning(
' >> %s : error when storing - it will be retried later' %
(logical_foldername))
else:
else:
# self._log.info(' Transfert of folder %s done'%(logical_foldername))
self._log.info(' >> %s : transfert done' %
(logical_foldername))
......@@ -2686,46 +2722,47 @@ class Download(threading.Thread):
self._log.exception('Message write error : %s' % (str(e)))
# Adding a new record to the json file
if self.__monitoringFilePath is not None \
if self.__monitoring \
and msg_level in ['INFO', 'WARNING']\
and not self.test:
self.__writeJsonFile(source_file)
def __writeJsonFile(self, source_file):
jsonData = '{'
jsonData += '"filename":"%s",' % (os.path.split(
source_file.filepath.split(FILE_URL_SEPARATOR)[0])[1])
jsonData += '"download":"%s",' % (self.id)
nowDate = datetime.datetime.utcnow()
jsonData += '"download_time": "%s",' % (
nowDate.strftime(JSON_DATE_FORMAT))
jsonData += '"sensing_time": "%s",' % (
source_file.sensingtime.strftime(JSON_DATE_FORMAT))