Commit 0dd9b626 authored by pm22d12's avatar pm22d12
Browse files

Fix a metric files write latency issue (#63)

parent b62e1688
......@@ -55,18 +55,6 @@ class EMMWriter(IMessageWriter):
queue_name=globalConfig["emm.rabbitmq.queue_name"],
routing_key=globalConfig["emm.rabbitmq.routing_key"],
)
"""
self.__emm_rabbitmq = EMMRabbitMQ(
host=globalConfig["emm.rabbitmq.host"],
port=globalConfig["emm.rabbitmq.port"],
ssl=globalConfig["emm.rabbitmq.ssl"],
user=globalConfig["emm.rabbitmq.user"],
password=globalConfig["emm.rabbitmq.password"],
virtual_host=globalConfig["emm.rabbitmq.virtual_host"],
queue_name=globalConfig["emm.rabbitmq.queue_name"],
routing_key=globalConfig["emm.rabbitmq.routing_key"],
)
"""
fileWriter = property(lambda self: self.__file_writer, None, None, None)
......
......@@ -321,8 +321,8 @@ class EnhancedFileWriter(IMessageWriter):
if self.add_uniq_prefix:
dt = datetime.datetime.utcnow()
timestamp_string = dt.strftime(MSG_DATE_FORMAT)
uniq_prefix = "%s-%s-%s-%s_" % (timestamp_string, gethostname(), os.getpid(
), threading.current_thread().name.split('-')[1][-7:].zfill(7))
uniq_prefix = "%s-%s-%s-%s_" % (timestamp_string, gethostname(),
os.getpid(), threading.current_thread().name.split('-')[1][-7:].zfill(7))
suffix = "_%.6d" % (dt.microsecond)
f_path, f_name = os.path.split(f)
f = os.path.join(f_path, uniq_prefix + f_name.rstrip(self.__extension) + suffix)
......
......@@ -2728,30 +2728,30 @@ class Download(threading.Thread):
self.__writeJsonFile(source_file)
def __writeJsonFile(self, source_file):
metrics = {}
nowDate = datetime.datetime.utcnow()
deltaTimePLD = source_file.mtime - source_file.sensingtime
deltaTimeLUD = nowDate - source_file.mtime
metrics["filename"] = os.path.split(
source_file.filepath.split(FILE_URL_SEPARATOR)[0])[1]
metrics["download"] = self.id
metrics["download_time"] = nowDate.strftime(JSON_DATE_FORMAT)
metrics["sensing_time"] = source_file.sensingtime.strftime(JSON_DATE_FORMAT)
metrics["modification_time"] = source_file.mtime.strftime(JSON_DATE_FORMAT)
metrics["production_latency_delay"] = deltaTimePLD.days * 86400 + deltaTimePLD.seconds
metrics["local_update_delay"] = deltaTimeLUD.days * 86400 + deltaTimeLUD.seconds
metrics["file_volume"] = source_file.size
metrics = {
"filename": os.path.split(
source_file.filepath.split(FILE_URL_SEPARATOR)[0])[1],
"download": self.id,
"download_time": nowDate.strftime(JSON_DATE_FORMAT) ,
"sensing_time": source_file.sensingtime.strftime(JSON_DATE_FORMAT),
"modification_time": source_file.mtime.strftime(JSON_DATE_FORMAT),
"production_latency_delay": deltaTimePLD.days * 86400 + deltaTimePLD.seconds,
"local_update_delay": deltaTimeLUD.days * 86400 + deltaTimeLUD.seconds,
"file_volume": source_file.size,
}
tookedSemaphore = False
jsonFile = None
if self.__metricsFilesystem:
try:
metrics_text = json.dumps(metrics) + '\n'
self._log.debug(f"[json]: {metrics_text}")
self.__semaState.acquire()
tookedSemaphore = True
jsonFile = open(self.__jsonFileName, 'a')
#jsonFile.write(jsonData)
self._log.debug(f"[json]: {json.dumps(metrics)}")
jsonFile.write(json.dumps(metrics))
jsonFile.write('\n')
jsonFile.write(metrics_text)
jsonFile.close()
jsonFile = None
self.__semaState.release()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment