Commit 4e65506b authored by BODERE

Merge branch 'hotfix_6.0.8' into 'master'

Hotfix 6.0.8

See merge request downloader/downloader_daemon!35
parents ab5e237f 71822c7b
BODERE <erwan.bodere@ifremer.fr>
Erwan BODERE <erwan.bodere@ifremer.fr>
F. PAUL <fpaul@ifremer.fr>
Fabien CADORET <fcadoret@br152-128.ifremer.fr>
Frederic PAUL <fpaul@hancock.ifremer.fr>
MAISSIAT <pierre.maissiat@partenaire-exterieur.ifremer.fr>
Pierre MAISSIAT <pmaissia@ananda.ifremer.fr>
Sylvain Herlédan <sylvain.herledan@shadowz.sytes.net>
Thibaut CHARLES <thibaut.charles@ifremer.fr>
@@ -359,7 +359,14 @@ class DCheck(object):
if protocolname == "localpath":
obj = localpath.Protocol_localpath()
elif protocolname == "ftp":
obj = ftp.Protocol_ftp()
kwargs = {}
if "FollowLinks" in option:
kwargs["followLinks"] = True \
if option["FollowLinks"].capitalize() == "True" else False
if "SkipPermissions" in option:
kwargs["skipPermissions"] = True \
if option["SkipPermissions"].capitalize() == "True" else False
obj = ftp.Protocol_ftp(**kwargs)
elif protocolname == "http_ostia":
obj = http_ostia.Protocol_http_ostia()
elif protocolname == "http_stdmet":
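Note: the new ftp branch above coerces the string-valued FollowLinks / SkipPermissions options into booleans before constructing Protocol_ftp. A minimal standalone sketch of that conversion (the option values here are invented):

# Hypothetical option dict, as read from a download configuration.
option = {"FollowLinks": "true", "SkipPermissions": "false"}
kwargs = {}
if "FollowLinks" in option:
    kwargs["followLinks"] = option["FollowLinks"].capitalize() == "True"
if "SkipPermissions" in option:
    kwargs["skipPermissions"] = option["SkipPermissions"].capitalize() == "True"
print(kwargs)  # {'followLinks': True, 'skipPermissions': False}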
@@ -368,20 +375,6 @@ class DCheck(object):
obj = http_navy_rainglobal.Protocol_http_navy_rainglobal()
elif protocolname == "http_jma_jra_gribfinal":
obj = http_jma_jra_gribfinal.Protocol_http_jma_jra_gribfinal()
elif protocolname == "https_directorylist":
classFileExtractor = https_directorylist.FileExtractor
if "FileExtractor" in option:
try:
module = file_extractor
except:
# module to load from the plugins directory,
# which is added to sys.path in loadFromModule().
module = __import__("https_FileExtractor_" + option["FileExtractor"])
log.warning("The HTTP protocol uses the FileExtractor module of the plugin directory")
classFileExtractor=getattr(module,
"FileExtractor_" + option["FileExtractor"],
https_directorylist.FileExtractor)
obj = https_directorylist.Protocol_https_directorylist(classFileExtractor)
elif protocolname == "lslr_file":
obj = lslr_file.Protocol_lslr_file()
elif protocolname == "https_opensearch":
@@ -392,6 +385,20 @@ class DCheck(object):
obj = webdav.Protocol_webdav()
elif protocolname[:4] == "ftps":
obj = ftps.Protocol_ftps(option)
elif protocolname == "https_directorylist":
classFileExtractor = https_directorylist.FileExtractor
if "FileExtractor" in option:
fileExtractor = "FileExtractor_" + option["FileExtractor"]
if hasattr(file_extractor, fileExtractor):
module = file_extractor
log.debug(f"FileExtractor : {option['FileExtractor']} found in regular module")
else:
# module to load from the plugins directory,
# which is added to sys.path in loadFromModule().
module = __import__("https_FileExtractor")
log.warning("The HTTP protocol uses the FileExtractor module of the plugin directory")
classFileExtractor=getattr(module, fileExtractor)
obj = https_directorylist.Protocol_https_directorylist(classFileExtractor)
return obj
def getOptions(self, argsList=None):
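Note: the relocated https_directorylist branch resolves the extractor class dynamically: it prefers a FileExtractor_<Name> attribute on the bundled file_extractor module, and otherwise imports the https_FileExtractor plugin module (whose directory is assumed to already be on sys.path). A minimal sketch of that lookup pattern, with an illustrative helper name:

import importlib

def resolve_extractor(base_module, name, default_cls):
    # Prefer the class bundled in the regular module.
    attr = "FileExtractor_" + name
    if hasattr(base_module, attr):
        return getattr(base_module, attr)
    # Fall back to the plugin module; its directory must already be on sys.path.
    plugin = importlib.import_module("https_FileExtractor")
    return getattr(plugin, attr, default_cls)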
@@ -40,7 +40,9 @@ def ftpwalk(
topdown=True,
onerror=None,
db=None,
listing_type='unix'):
listing_type='unix',
followLinks=False,
skipPermissions=False):
"""
Generator that yields tuples of (root, dirs, nondirs).
"""
@@ -100,26 +102,31 @@ def ftpwalk(
continue
# 19/06/2018 PMT : skip directories without permissions
if entry[3][9] != 'x':
if entry[3][9] != 'x' and not skipPermissions:
log.debug(
"directory %s does not have the right permissions [%s]" %
(path, entry[3]))
continue
if entry[-1] is False: # not a link
for x in ftpwalk(
isSmartCrawler,
directoryFilter,
ftp,
path,
topdown,
onerror,
db,
listing_type=listing_type):
if entry[4] is None or followLinks: # not a link
if entry[4] is not None:
path = entry[4]
for x in ftpwalk(isSmartCrawler,
directoryFilter,
ftp,
path,
topdown,
onerror,
db,
listing_type=listing_type,
followLinks=followLinks,
skipPermissions=skipPermissions):
#log.debug("yield x : %s"%(str(x)))
yield x
else: # TODO: handle follow-links
pass
log.debug(f"link {path} -> {entry[4]} is not followed")
if not topdown:
yield top, dirs, nondirs
@@ -204,12 +211,12 @@ def _ftp_parse_unix(listing):
# Get the type and mode.
mode = words[0]
islink = False
link = None
if mode[0] == 'l':
islink = True
link = extra
entry = (filename, size, mtime, mode, islink)
if mode[0] == 'd':
entry = (filename, size, mtime, mode, link)
if mode[0] == 'd' or link is not None:
dirs.append(entry)
else:
nondirs.append(entry)
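Note: after this change each parsed entry carries the symlink target instead of a boolean flag. A sketch of the resulting tuples (values invented):

# (filename, size, mtime, mode, link) -- link is None unless mode starts with 'l'
("report.nc", 10485760, 1528400000.0, "-rw-r--r--", None)   # regular file
("latest", 4096, 1528400000.0, "lrwxrwxrwx", "2018/06")     # symlink, listed with dirs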
@@ -287,12 +294,12 @@ def _ftp_parse_unix2(listing):
# Get the type and mode.
mode = words[0]
islink = False
link = None
if mode[0] == 'l':
islink = True
link = extra
entry = (filename, size, mtime, mode, islink)
entry = (filename, size, mtime, mode, link)
if mode[0] == 'd':
if mode[0] == 'd' or link is not None:
dirs.append(entry)
else:
nondirs.append(entry)
@@ -338,7 +345,7 @@ def _ftp_listdir(ftp, db=None, listing_type='unix'):
class Protocol_ftp(AbstractProtocol):
def __init__(self):
def __init__(self, followLinks=False, skipPermissions=False):
AbstractProtocol.__init__(self)
# default socket timeout
@@ -363,6 +370,8 @@ class Protocol_ftp(AbstractProtocol):
self.__tmpdirBlocs = {}
self.__session = None
self.followLinks = followLinks
self.skipPermissions = skipPermissions
def setConfig(self, config):
AbstractProtocol.setConfig(self, config)
@@ -453,14 +462,15 @@ class Protocol_ftp(AbstractProtocol):
log.debug(
"Start ftpwalk (directoryFilter = %s), basedir : '%s'" %
(self.directoryFilter, basedir))
a = ftpwalk(
self.smart_crawler,
self.directoryFilter,
self.__session,
basedir,
self.directoryFilter,
db=self.fileInfoDatabase,
listing_type=self.listing_type)
a = ftpwalk(self.smart_crawler,
self.directoryFilter,
self.__session,
basedir,
self.directoryFilter,
db=self.fileInfoDatabase,
listing_type=self.listing_type,
followLinks=self.followLinks,
skipPermissions=self.skipPermissions)
files_to_insert = []
try:
@@ -484,7 +494,7 @@ class Protocol_ftp(AbstractProtocol):
id_execution=None,
filename=os.path.join(basedir, dirpath),
isDirectory=True,
isSymLink=dirname[4],
isSymLink=False if dirname[4] is None else True,
size=dirname[1],
mtime=datetime.fromtimestamp(mtime),
sensingtime=None,
@@ -514,7 +524,7 @@ class Protocol_ftp(AbstractProtocol):
id_execution=None,
filename=filePath,
isDirectory=False,
isSymLink=filename[4],
isSymLink= False if filename[4] is None else True,
size=filename[1],
mtime=datetime.fromtimestamp(mtime),
sensingtime=sensingtime,
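Note: both isSymLink call sites derive the flag from the parsed link field; a sketch of the equivalent direct test (entry layout as above):

is_sym_link = filename[4] is not None  # True only when the listing reported a symlink target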
@@ -141,242 +141,9 @@ class FileExtractor(html.parser.HTMLParser):
self.__fileinfo_pattern = re.compile(pattern)
"""
class FileExtractor_UniBremen(html.parser.HTMLParser):
DATE_FORMAT = "%Y-%m-%d %H:%M"
FACTOR = {'': 1,
'K': 1 << 10, # 1024
'M': 1 << 20,
'G': 1 << 30,
'T': 1 << 40,
'P': 1 << 50}
@staticmethod
def get_download_url(dcheck_url):
return dcheck_url
@staticmethod
def get_new_dcheck_url(old_dcheck_url, path):
return old_dcheck_url + '/' + path
def __init__(self):
html.parser.HTMLParser.__init__(self)
self.links = {}
self._to_ignore = False
self._is_directory = False
self._is_file = False
self._current_name = ''
self.__mtime_pattern = re.compile(' *([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}) *')
self.__size_pattern = re.compile(' *([0-9a-zA-Z.-]*) *')
self.__td_count = 0
self.__mtime_str = ""
def handle_data(self, data):
size_str = None
#print(f"handle_data : {self._current_name} :{data} (count = {self.__td_count})")
if not self._to_ignore:
if len(data) > 0 and self._current_name != '':
if self.__td_count == 3:
mg = self.__mtime_pattern.match(data)
if mg is not None:
self.__mtime_str = mg.groups()[0]
elif self.__td_count == 4:
if self._is_file:
isdirectory = 0
mg = self.__size_pattern.match(data)
if mg is not None:
size_str = mg.groups()[0]
self.links[self._current_name] = \
(isdirectory,self.__mtime_str, size_str)
#print("handle_data =====> FILE name : %s, mtime : %s, size : %s" % (self._current_name, self.__mtime_str, size_str))
else:
isdirectory = 1
size_str = "0"
self.links[self._current_name] = (isdirectory, self.__mtime_str, size_str)
#print("handle_data =====> DIR name : %s, mtime : %s"%(self._current_name, self.__mtime_str))
def handle_starttag(self, tag, attrs):
#print(f"handle_starttag =====> {tag} = {attrs} (count = {self.__td_count})")
if tag == "img" and self.__td_count == 1:
if len(attrs) > 0:
for attr in attrs:
if attr[0] == 'alt':
if attr[1] == '[DIR]':
self._to_ignore = False
self._is_directory = True
self._is_file = False
elif not attr[1].startswith('['):
self._to_ignore = True
self._is_directory = False
self._is_file = False
elif attr[1] == '[PARENTDIR]':
self._to_ignore = True
self._is_directory = True
self._is_file = False
else:
self._is_directory = False
self._is_file = True
self._to_ignore = False
elif tag == "a" and self.__td_count == 2:
if not self._to_ignore:
if len(attrs) > 0:
for attr in attrs:
if attr[0] == "href":
if attr[1][0:1] == '/':
continue
self._current_name = attr[1]
self._current_name = self._current_name.rstrip('/')
if self._is_directory:
self.links[self._current_name] = list()
if self._is_file:
self.links[self._current_name] = list()
elif tag == "tr":
self.__td_count = 0
self.__mtime_str = None
elif tag == "td":
self.__td_count += 1
#print(f"handle_starttag =====> ({self._is_directory}, {self._is_file}, {self._to_ignore})")
def handle_endtag(self, tag):
if tag == "tr":
if self._is_directory:
self._is_directory = False
elif tag == "td":
if self.__td_count == 4:
self.__td_count = 0
def get_files(self):
return self.links
def set_mtime_pattern(self, pattern):
self.__mtime_pattern = re.compile(pattern)
def set_size_pattern(self, pattern):
self.__size_pattern = re.compile(pattern)
class FileExtractor_Nomads(FileExtractor_UniBremen):
DATE_FORMAT = "%d-%b-%Y %H:%M"
def __init__(self):
super().__init__()
self.set_mtime_pattern(' *([0-9]{2}-[a-zA-Z]{3}-[0-9]{4} [0-9]{2}:[0-9]{2}) *')
class FileExtractor_UniHamburg(html.parser.HTMLParser): #thredds
DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
FACTOR = {'bytes': 1,
'Kbytes': 10E3, # 1024
'Mbytes': 10E6,
'Gbytes': 10E9,
'Tbytes': 10E12,
'Pbytes': 10E15}
@staticmethod
def get_download_url(dcheck_url):
url = os.path.split(dcheck_url)[0].replace('catalog', 'fileServer')
return url
@staticmethod
def get_new_dcheck_url(old_dcheck_url, path):
url = os.path.split(old_dcheck_url)
return url[0] + '/' + path.rstrip('/') + '/' + url[1]
def __init__(self):
html.parser.HTMLParser.__init__(self)
self.links = {}
self._to_ignore = False
self._is_file = False
self._current_name = ''
self.__mtime_pattern = re.compile(' *([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z) *')
self.__size_pattern = re.compile(' *([0-9.]* [a-zA-z]*) *')
self.__td_count = 0
self.__size_str = None
self.__is_value = False
def handle_data(self, data):
mtime_str = ""
#print(f"handle_data : {self._current_name} :{data} (count = {self.__td_count})")
if not self._to_ignore and self.__is_value and len(data) > 0:
if self._current_name == '' and self.__td_count == 1:
self._current_name = data
if self._current_name != '':
if self.__td_count == 2:
if self._is_file:
mg = self.__size_pattern.match(data)
if mg is not None:
self.__size_str = mg.groups()[0]
else:
self.__size_str = "0"
elif self.__td_count == 3:
mg = self.__mtime_pattern.match(data)
if mg is not None:
mtime_str = mg.groups()[0]
if self._is_file:
isdirectory = 0
self.links[self._current_name] = \
(isdirectory,mtime_str, self.__size_str)
#print("handle_data =====> FILE name : %s, mtime : %s, size : %s" % (
# self._current_name, mtime_str, self.__size_str))
else:
# skip current directory entry
if self._current_name[-1:] == '/':
isdirectory = 1
self.links[self._current_name] = (isdirectory, mtime_str, self.__size_str)
#print("handle_data =====> DIR name : %s, mtime : %s"%(self._current_name, mtime_str))
def handle_starttag(self, tag, attrs):
#print(f"handle_starttag =====> {tag} = {attrs} (count = {self.__td_count})")
if tag == "img" and self.__td_count == 1:
if len(attrs) > 0:
for attr in attrs:
if attr[0] == 'alt':
if attr[1] == '[DIR]' or attr[1] == 'Folder':
self._to_ignore = False
self._is_file = False
elif attr[1] == '[PARENTDIR]' or not attr[1].startswith('['):
self._to_ignore = True
self._is_file = False
else:
self._is_file = True
self._to_ignore = False
elif tag == "tr":
self.__td_count = 0
self._to_ignore = False
self.__mtime_str = None
self._is_file = True
self._current_name = ''
elif tag == "td":
self.__td_count += 1
elif tag == "tt":
self.__is_value = True
#print(f"handle_starttag =====> ({self._is_file}, {self._to_ignore})")
def handle_endtag(self, tag):
if tag == "tr":
self._to_ignore = True
elif tag == "tt":
self.__is_value = False
def get_files(self):
return self.links
def set_mtime_pattern(self, pattern):
self.__mtime_pattern = re.compile(pattern)
def set_size_pattern(self, pattern):
self.__size_pattern = re.compile(pattern)
"""
class Protocol_https_directorylist(AbstractProtocol):
def __init__(self, fileExtractor = FileExtractor):
def __init__(self, fileExtractor=FileExtractor):
AbstractProtocol.__init__(self)
self.path = None
@@ -387,6 +154,7 @@ class Protocol_https_directorylist(AbstractProtocol):
self._parser = None
self._baseurl = None
self.url = None
self._find_interesting_directory_max_depth = 0
@@ -451,13 +219,13 @@ class Protocol_https_directorylist(AbstractProtocol):
urllib.request.install_opener(opener)
self._baseurl = self._protocol_str + self.server
url = self._baseurl + "/" + self.path
url = url.replace("//", "/").replace(":/", "://")
if self.__url_exists(url):
self.url = self._baseurl + "/" + self.path
self.url = self.url.replace("//", "/").replace(":/", "://")
if self.__url_exists(self.url):
current_depth = 0 # incremented as soon as url_walk runs.
self.__url_walk(url, current_depth)
self.__url_walk(self.url, current_depth)
else:
error = DC_LocalpathError("path does not exists : %s" % (url))
error = DC_LocalpathError("path does not exists : %s" % (self.url))
log.error(error)
sys.exit(1)
self.updateValidExecutionStatus(True)
@@ -534,7 +302,7 @@ class Protocol_https_directorylist(AbstractProtocol):
if (fname.startswith("http:") or fname.startswith("https:")):
current_path = fname
else:
current_path = download_url + "/" + fname
current_path = download_url.rstrip('/') + "/" + fname
islink = False
isdirectory = fnames[fname][0]
mtime_str = fnames[fname][1]
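Note: the rstrip('/') guard prevents a double slash when the download URL already ends with one. A quick sketch (URLs invented):

download_url = "https://example.org/data/"
fname = "file.nc"
print(download_url.rstrip('/') + "/" + fname)  # https://example.org/data/file.nc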
@@ -132,13 +132,14 @@ def _webdav_listdir(webdav, baseurl, path, timeout, db=None):
"""
last_error = ""
log.debug('_webdav_listdir => baseurl : {0}, path : {1}, timeout : {2}'.format(baseurl, path, timeout))
log.debug(f'_webdav_listdir => baseurl : {baseurl}, path : {path}, timeout : {timeout}')
dirs, nondirs = [], []
url = baseurl + path
headers = {'Depth': '1'}
nbretry = 0
request_OK = False
error = ""
last_errorcode = None
try:
while not request_OK and nbretry < 3:
@@ -148,6 +149,7 @@ def _webdav_listdir(webdav, baseurl, path, timeout, db=None):
if response.status_code != 207: # MULTI_STATUS
msg = "Error request return status code : " + str(response.status_code)
last_error = str(msg)
last_errorcode = response.status_code
nbretry = nbretry + 1
continue
@@ -155,6 +157,7 @@ def _webdav_listdir(webdav, baseurl, path, timeout, db=None):
except ConnectionError as msg:
last_error = str(msg)
last_errorcode = -1
nbretry = nbretry + 1
continue
@@ -172,7 +175,8 @@ def _webdav_listdir(webdav, baseurl, path, timeout, db=None):
error = DC_UrlError("listdir error : " + str(msg), db)
log.error(error)
if not request_OK:
if not request_OK and last_errorcode != 404:
print(f"Raise error code {last_errorcode}")
raise error
return dirs, nondirs
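Note: the listing issues a WebDAV PROPFIND with a Depth: 1 header and expects a 207 Multi-Status reply; with this fix, a 404 is no longer escalated into a raised error after the retries. A minimal sketch of such a request using the requests library (URL and timeout are hypothetical):

import requests

response = requests.request("PROPFIND", "https://example.org/dav/data/",
                            headers={"Depth": "1"}, timeout=30)
if response.status_code != 207:  # MULTI_STATUS
    print("listing failed with status", response.status_code)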
@@ -319,11 +319,13 @@ class EnhancedFileWriter(IMessageWriter):
raise Exception("Cannot write file without a filename...")
if self.add_uniq_prefix:
timestamp_string = datetime.datetime.utcnow().strftime(MSG_DATE_FORMAT)
dt = datetime.datetime.utcnow()
timestamp_string = dt.strftime(MSG_DATE_FORMAT)
uniq_prefix = "%s-%s-%s-%s_" % (timestamp_string, gethostname(), os.getpid(
), threading.current_thread().name.split('-')[1][-7:].zfill(7))
suffix = "_%.6d" % (dt.microsecond)
f_path, f_name = os.path.split(f)
f = os.path.join(f_path, uniq_prefix + f_name)
f = os.path.join(f_path, uniq_prefix + f_name.rstrip(self.__extension) + suffix)
if not f.endswith(self.__extension):
f = f + self.__extension
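Note: the added microsecond suffix disambiguates messages written within the same second. A simplified sketch of the resulting name (MSG_DATE_FORMAT and the extension are stand-ins; the thread-name component is omitted):

import datetime
import os
from socket import gethostname

dt = datetime.datetime.utcnow()
timestamp_string = dt.strftime("%Y%m%dT%H%M%S")  # stand-in for MSG_DATE_FORMAT
uniq_prefix = "%s-%s-%s_" % (timestamp_string, gethostname(), os.getpid())
suffix = "_%.6d" % dt.microsecond
print(uniq_prefix + "message" + suffix + ".json")  # hypothetical extension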
@@ -42,7 +42,7 @@ class EMMRabbitMQ(IMessageWriter):
"ssl_options": pika.SSLOptions(sslContext) if sslContext is not None else None,
"port": port,
"virtual_host": virtual_host,
"blocked_connection_timeout": 900,
"blocked_connection_timeout": 0,
}
self.__queue_name = queue_name
@@ -2,6 +2,7 @@ import shutil
import sys
import os
from celery import Celery
from lxml import etree, objectify
from downloader.synchronisation.DownloadSync import DownloadSync
from downloader.reloader.ReLoader import ReLoader
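Note: the new lxml import serves the read_configuration_files_summary task added further below, which prunes bulky subtrees from each configuration before returning it. A minimal sketch of that objectify deletion pattern (the XML shape is invented):

from lxml import objectify

xml = b"<download><download_settings/><data_source><date_extraction/></data_source></download>"
root = objectify.fromstring(xml)
del root.download_settings
if root.data_source.find("date_extraction") is not None:
    del root.data_source.date_extraction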
@@ -25,7 +26,7 @@ globalConfig = GlobalConfig(app.conf.downloader_configfile)
# ------------------------------------------------------------------------------------------
# Read the last n (TASK_READ_BUFFER_LENGTH) bytes at the end of the file
# ------------------------------------------------------------------------------------------
def read_last_bytes(filepath):
def read_all_bytes(filepath):
buffer = ""
if os.path.isfile(filepath):
filesize = os.lstat(filepath).st_size
@@ -100,7 +101,7 @@ def get_file(full_filename, **kwargs):
buffer = ""
try:
buffer = read_last_bytes(full_filename)
buffer = read_all_bytes(full_filename)
except:
pass
@@ -115,13 +116,45 @@ def read_configuration_file(filename, **kwargs):
buffer = ""
full_filename = os.path.join(globalConfig.getPath("downloads_config"), filename)
try:
buffer = read_last_bytes(full_filename)
buffer = read_all_bytes(full_filename)
except:
pass
return buffer
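Note: read_all_bytes (renamed from read_last_bytes) now returns the whole file rather than a tail. A minimal equivalent, as inferred from this diff:

import os

def read_all_bytes(filepath):
    # Return the full content of filepath, or "" if it does not exist.
    buffer = ""
    if os.path.isfile(filepath):
        with open(filepath) as fh:
            buffer = fh.read()
    return buffer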
@app.task
def read_configuration_files_summary(**kwargs):
""""""
summaries = {}
buffer = ""
try:
for filename in os.listdir(globalConfig.getPath("downloads_config")):
if filename[0:1] == '.':
continue
full_filename = os.path.join(globalConfig.getPath("downloads_config"), filename)
if os.path.isfile(full_filename):
buffer = read_all_bytes(full_filename)
file_data = objectify.fromstring(buffer)
del file_data.download_settings
if file_data.data_source.find("date_extraction") is not None:
del file_data.data_source.date_extraction