Commit 70713e4a authored by BODERE

Merge branch 'hotfix_6.0.4' into 'master'

Hotfix 6.0.4

Closes #50, #55, #52 and #54

See merge request downloader/downloader_daemon!27
parents c41ced94 44f81400
......@@ -56,7 +56,6 @@ function init()
echo "# Deployment date:" `date` >> ${DOWNLOADER_APPD}
echo "#" >> ${DOWNLOADER_APPD}
echo "export DOWNLOADER_APPD=${DOWNLOADER_APPD}" >> ${DOWNLOADER_APPD}
echo "export FELYX_CELERY_CFG=${FELYX_CELERY_CFG}" >> ${DOWNLOADER_APPD}
echo "export PYTHONPATH=${PYTHONPATH}" >> ${DOWNLOADER_APPD}
echo "#" >> ${DOWNLOADER_APPD}
echo "export DOWNLOADER_CONF_VERSION=${DOWNLOADER_VERSION}" >> ${DOWNLOADER_APPD}
......@@ -130,8 +129,6 @@ echo "Downloader appdata = ${DOWNLOADER_APPDATA}"
#Internal constants
export FELYX_CELERY_CFG=${DOWNLOADER_APPDATA}/celery_cfg.json
SCRIPT_DIR=$(dirname $(readlink -f "$0"))
export PYTHONPATH=${SCRIPT_DIR}
......
{
"task_queues": {"UI.ananda.1": { "queue_arguments": {"alias" : "test_downloader_1"}, "vhost": "downloader"}},
"broker_url": "amqp://zenika_training:zenika_training@vrabbitmq1-val.ifremer.fr:5672/downloader",
"broker_heartbeat": 10,
"imports": ["downloader.worker.celery_tasks"],
"result_backend": "amqp",
"result_expires": 900,
"task_serializer": "json",
"result_serializer": "json",
"timezone": "Europe/Paris",
"enable_utc": "True",
"result_compression": "bzip2",
"worker_log_color": "False",
"accept_content": ["json"]
}
\ No newline at end of file
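For context, this is how the celery_cfg.json above could be fed to a Celery application through the FELYX_CELERY_CFG variable exported by the deploy script. A minimal sketch, assuming the daemon loads the file this way (the merge request does not show the loading code):

import json
import os

from celery import Celery

app = Celery('downloader')
# Assumption: FELYX_CELERY_CFG points at the JSON file above.
with open(os.environ['FELYX_CELERY_CFG']) as f:
    app.conf.update(json.load(f))  # broker_url, serializers, timezone, ...
# Caveat: the raw "task_queues" dict would still have to be converted to
# kombu.Queue objects before Celery can consume it; that step is omitted.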
......@@ -7,10 +7,6 @@
# --
# max_activated_files_by_loop: 100
# --
# message_broker: null
# --
# send_notification_messages: false
# --
# ui_worker: false
# --- Folder paths
......@@ -31,12 +27,10 @@
# --- elasticsearch: Push emm messages to an elasticsearch database
# --- rabbitmq: Push emm messages to a RabbitMQ queue
# targets: []
# --- Shared EMM logs
# filesystem:
# # --- Path, either absolute or relative to the workspace
# path: spools/message/
# --- Elasticsearch configuration
# elasticsearch:
# hosts: [ localhost ]
......@@ -55,30 +49,55 @@
# password: null
# queue_name: dl-emm
# --- job files config
# # --- job files config
# jobs:
# # --- List of output targets for writing job files. Available targets:
# # --- elasticsearch: Push emm jobs to an elasticsearch database
# # --- rabbitmq: Push emm jobs to a RabbitMQ queue
# targets: []
# # --- Elasticsearch configuration
# elasticsearch:
# hosts: []
# scheme: http
# user: null
# password: null
# # --- 'index' can contain date information, that is formatted according to
# # --- https://docs.python.org/3/library/datetime.html#strftime-and-strptime-behavior
# index: dl-jobs
# # --- RabbitMQ configuration
# rabbitmq:
# host: localhost
# port: 5672
# ssl: false
# user: null
# password: null
# queue_name: dl-jobs
# --- List of output targets for writing job files. Available targets:
# --- elasticsearch: Push emm jobs to an elasticsearch database
# --- rabbitmq: Push emm jobs to a RabbitMQ queue
# targets: []
# --- Elasticsearch configuration
# elasticsearch:
# hosts: []
# scheme: http
# user: null
# password: null
# --- 'index' can contain date information, that is formatted according to
# --- https://docs.python.org/3/library/datetime.html#strftime-and-strptime-behavior
# index: dl-jobs
# --- RabbitMQ configuration
# rabbitmq:
# host: localhost
# port: 5672
# ssl: false
# user: null
# password: null
# queue_name: dl-jobs
# # --- User interface configuration
# ui_worker:
# queue: #{queue_name: { queue_arguments: {alias : alias_downloader_name}}}
# name: UI.host.0
# alias: alias_downloader_name
# broker_url: #"amqp://user:password@host:5672/vhost"
# host: localhost
# port: 5672
# user: null
# password: null
# virtual_host: "/"
# broker_heartbeat: 30
# imports: [downloader.worker.celery_tasks]
# result_backend: amqp
# result_expires: 300
# task_serializer: json
# result_serializer: json
# timezone: Europe/Paris
# enable_utc: True
# result_compression: bzip2
# worker_log_color: True
# accept_content: [json]
# auto_delete: True
# expires: 15.0
# exclusive: True
# # --- Python logging configuration
# logs:
......
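The commented ui_worker block above maps almost one-to-one onto Celery/kombu settings. A hedged sketch of that mapping, using the default values shown in the template (the assembly code itself is hypothetical):

from celery import Celery
from kombu import Queue

app = Celery('downloader')
app.conf.broker_heartbeat = 30          # broker_heartbeat: 30
app.conf.result_expires = 300           # result_expires: 300
app.conf.task_queues = [
    # queue_name -> {queue_arguments: {alias: ...}}, as in the queue: comment
    Queue('UI.host.0',
          queue_arguments={'alias': 'alias_downloader_name'},
          auto_delete=True)             # auto_delete: True
]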
......@@ -298,15 +298,32 @@ class SqliteBaseFilesInfos(object):
        self.__sql_session.commit()

    def req_PurgeOlderThan(self, executionId, keep_last=True):
        if keep_last and self.req_IsLastExecutionId(executionId):
            executionId -= 1
        if keep_last:
            execs = self.req_PreviousExecutionOKId(executionId)
            if len(execs) > 0:
                executionId = execs[0].id
            else:
                executionId -= 1
        self.__sql_session \
            .query(Execution) \
            .filter(Execution.id < executionId) \
            .delete()
        self.__sql_session \
            .query(File.id_execution) \
            .filter(File.id_execution < executionId) \
            .delete()
        self.__sql_session.commit()

    def req_PreviousExecutionOKId(self, refid, limit=1):
        return self.__sql_session \
            .query(Execution) \
            .filter(and_(Execution.valid_execution == 1, Execution.id < refid)) \
            .order_by(desc(Execution.id)) \
            .limit(limit) \
            .all()

    def req_PurgeOlderThanStartDate(self, startDate, keep_last=True):
        assert isinstance(startDate, datetime)
......
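The new keep_last logic above no longer purges everything before executionId - 1; it moves the purge threshold back to the most recent valid execution, so at least one good run always survives. A pure-Python illustration of that threshold choice (the real class does this with the SQLAlchemy queries shown above):

def purge_threshold(valid_execution_ids, execution_id):
    # ids of valid executions strictly older than the reference one
    previous_ok = [i for i in valid_execution_ids if i < execution_id]
    if previous_ok:
        return max(previous_ok)   # keep the last valid execution
    return execution_id - 1       # fall back to the previous behaviour

assert purge_threshold([3, 7], 10) == 7   # executions with id < 7 get purged
assert purge_threshold([], 10) == 9       # no valid run: purge ids < 9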
......@@ -38,6 +38,8 @@ def spliturl(url, defaultpath='/'):
    # 30/03/2018 PMT#34 added
    >>> spliturl('https_opensearch://foo:pass@www.protected.com:8080/rep1')
    ('https', 'www.protected.com', '/rep1', 8080, 'foo', 'pass')
    >>> spliturl('ftps_xx://foo:pass@jsimpsonftps.pps.eosdis.nasa.gov/rep1')
    ('ftps_xx', 'jsimpsonftps.pps.eosdis.nasa.gov', '/rep1', None, 'foo', 'pass')
    """
    scheme = None
......@@ -101,11 +103,12 @@ def getProtocolName(scheme, path, domain, exitonfail=True):
        return 'localpath'
    elif scheme == 'lslr':
        return 'lslr_file'
    # 30/03/2018 PMT#34 added the opensearch-over-https protocol
    elif scheme == 'https_opensearch':
        return 'https_opensearch'
    elif scheme == 'webdav':
        return 'webdav'
    elif scheme[:4] == 'ftps':
        return scheme
    else:
        if exitonfail:
            error = DC_ConfigError(
......@@ -19,6 +19,7 @@ from dchecktools.protocols import http_stdmet
from dchecktools.protocols import https_opensearch
from dchecktools.protocols import sftp
from dchecktools.protocols import webdav
from dchecktools.protocols import ftps
from dchecktools.filters.StringFilters import FileFilter
from dchecktools.common.errors import DC_Error, DC_ConfigError, DC_DbError
from dchecktools.plugins import DataReaderFactory
......@@ -39,7 +40,7 @@ VERSION = "0.8b"
DEFAULT_FTP_LISTING_TYPE = "unix"
DEFAULT_PURGE_OLDER_THAN = 60
DEFAULT_PURGE_OLDER_THAN = 0
class Config:
......@@ -375,6 +376,8 @@ class DCheck(object):
            obj = sftp.Protocol_sftp()
        elif protocolname == "webdav":
            obj = webdav.Protocol_webdav()
        elif protocolname[:4] == "ftps":
            obj = ftps.Protocol_ftps(protocolname[5:])
        return obj

    def getOptions(self, argsList=None):
......
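Note how the new dispatch slices the scheme: everything after the "ftps_" prefix is handed to Protocol_ftps as a variant tag, and a bare "ftps" yields an empty tag. A two-line demonstration:

for name in ('ftps', 'ftps_xx'):
    print(name[:4] == 'ftps', repr(name[5:]))
# True ''
# True 'xx'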
......@@ -2,6 +2,7 @@
import logging
import re
import os
from datetime import datetime
log = logging.getLogger('StringFilters')
......@@ -67,3 +68,31 @@ class FileFilter(object):
                return True
        return self.__interestingByDefault

    def isInterestingDirectory(self, dirname, mtime=None, dirpath=None):
        """
        Input : directory name (string), plus optional mtime and parent path
        Output : Boolean

        Test whether a directory is interesting or not, according to the
        current object configuration. Evaluation order :
        date related checks > ignoreFiles(Regexp) > forceFiles(Regexp) > interestingByDefault
        """
        if self.__ignoreNewerThan is not None and mtime is not None and \
                datetime.fromtimestamp(mtime) > self.__ignoreNewerThan:
            return False
        if self.__ignoreOlderThan is not None and mtime is not None and \
                datetime.fromtimestamp(mtime) < self.__ignoreOlderThan:
            return False
        # check ignore directory regexp
        for regexp in self.__ignoreRegexp:
            if regexp.search(dirname):
                return False
        # check force directory regexp
        for regexp in self.__forceRegexp:
            if regexp.search(os.path.join(dirpath, dirname)):
                return True
        return self.__interestingByDefault
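A self-contained replay of the date check that isInterestingDirectory applies above (FileFilter's real attributes are private; only the comparison logic is reproduced):

import time
from datetime import datetime, timedelta

ignore_older_than = datetime.now() - timedelta(days=30)   # hypothetical config
mtime = time.time() - 60 * 86400   # directory last modified 60 days ago

# same test as in isInterestingDirectory: too old -> filtered out
print(datetime.fromtimestamp(mtime) < ignore_older_than)  # True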
......@@ -91,8 +91,8 @@ def ftpwalk(
            # Filter directory
            if directoryFilter is not None:
                interestingDirectory = directoryFilter.isInteresting(
                    path, mtime)
                interestingDirectory = directoryFilter.isInterestingDirectory(
                    dirname=dname, mtime=mtime, dirpath=top)
                log.debug(
                    "directory %s is interesting ? %s" %
                    (path, interestingDirectory))
......@@ -407,12 +407,14 @@ class Protocol_ftp(AbstractProtocol):
            # FP 23/02/10 : test to work around Francoise's timeout problems on the podaac_modis_a download
            # self.__session.set_pasv(True)
        except ftplib.all_errors as msg:
            try:
            if isinstance(msg, tuple):
                #try:
                errno, string = msg
                error = DC_ConnectionError(
                    "ftp connection error : %s [errno=%s]" %
                    (string, errno), db=self.fileInfoDatabase)
            except ValueError:  # msg is not always a tuple, in which case the unpack fails
            #except ValueError:  # msg is not always a tuple, in which case the unpack fails
            else:
                error = DC_ConnectionError(
                    "ftp connection error : %s" %
                    (str(msg)), db=self.fileInfoDatabase)
......@@ -473,8 +475,8 @@ class Protocol_ftp(AbstractProtocol):
            for dirname in dirs:
                dirpath = dirname[0]
                mtime = dirname[2]
                interestingDir = self.directoryFilter.isInteresting(
                    dirpath, mtime)  # fcad: add mtime condition
                interestingDir = self.directoryFilter.isInterestingDirectory(
                    dirname=dirpath, mtime=mtime, dirpath=basedir)
                if not interestingDir:
                    continue
......
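The isinstance guard introduced above avoids raising a second exception while handling the first: ftplib errors are not always (errno, message) tuples, so the old blind unpack could itself fail with ValueError. The guard in isolation (message formatting copied from the hunk, function name hypothetical):

def describe_ftp_error(msg):
    # msg may be a tuple (errno, text) or any exception object
    if isinstance(msg, tuple):
        errno, string = msg
        return "ftp connection error : %s [errno=%s]" % (string, errno)
    return "ftp connection error : %s" % (str(msg),)

print(describe_ftp_error((550, 'Permission denied')))
print(describe_ftp_error(OSError('timed out')))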
from dchecktools.protocols.AbstractProtocol import AbstractProtocol
from dchecktools.common.basefileinfos import File
from dchecktools.common.errors import DC_ConfigError, DC_ConnectionError, DC_FtpError
from datetime import datetime
import ftplib
import logging
import os
import re
import socket
import sys
import time
DEFAULT_SERVER_PORT = 21
DEFAULT_USERNAME = "anonymous"
DEFAULT_PASSWORD = "anon@anon.org"
log = logging.getLogger('ftps')
log.setLevel(logging.INFO)
FORCE_CHROOT = True
"""
ftpswalk -- Walk a hierarchy of files using FTPS (adapted from os.walk()).
"""
# TODO: use the onerror flag, or drop it
# a 'return' in this generator actually raises a StopIteration exception
def ftpswalk(
        isSmartCrawler,
        directoryFilter,
        ftp,
        top,
        topdown=True,
        onerror=None,
        db=None,
        listing_type='unix'):
    """
    Generator that yields tuples of (root, dirs, nondirs).
    """
    # Make the FTP object's current directory to the top dir.
    log.debug('ftpswalk : dir=%s' % (top))
    try:
        if top is not None and top != '':
            if FORCE_CHROOT:
                if top == '/':
                    top = ftp.pwd()
                    if not top:
                        top = "/"
            try:
                ftp.cwd(top)
            except ftplib.error_perm as msg:
                if not isSmartCrawler:
                    error = DC_FtpError(
                        "error_perm : " +
                        str(msg) +
                        " [dir=%s]" %
                        (top),
                        db)
                    log.error(error)
                    if onerror is not None:
                        onerror(msg)
                    raise error
                return
        try:
            dirs, nondirs = _ftp_listdir(ftp, db, listing_type)
        except os.error as err:
            error = DC_FtpError("ftp listdir error : %s" % (err))
            log.debug(error)
            if onerror is not None:
                onerror(err)
            # 19/06/2018 PMT : raise error instead of return
            # return
            raise error
            # raise StopIteration, error
        if topdown:
            yield top, dirs, nondirs
        for entry in dirs:
            dname = entry[0]
            mtime = entry[2]  # fcad: add modification time
            path = os.path.join(top, dname)
            # Filter directory
            if directoryFilter is not None:
                interestingDirectory = directoryFilter.isInterestingDirectory(
                    dirname=dname, mtime=mtime, dirpath=top)
                log.debug(
                    "directory %s is interesting ? %s" %
                    (path, interestingDirectory))
                if not interestingDirectory:
                    continue
            # 19/06/2018 PMT : skip directories without permissions
            if entry[3][9] != 'x':
                log.debug(
                    "directory %s does not have the right permissions [%s]" %
                    (path, entry[3]))
                continue
            if entry[-1] is False:  # not a link
                for x in ftpswalk(
                        isSmartCrawler,
                        directoryFilter,
                        ftp,
                        path,
                        topdown,
                        onerror,
                        db,
                        listing_type=listing_type):
                    # log.debug("yield x : %s" % (str(x)))
                    yield x
            else:  # TODO: handle the follow-link case
                pass
        if not topdown:
            yield top, dirs, nondirs
    except ftplib.all_errors as msg:
        error = DC_FtpError("ftp error : %s" % (msg))
        log.debug(error)
        if onerror is not None:
            onerror(msg)
        raise error
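A hypothetical driver for the generator above: connect with ftplib.FTP_TLS (TLS on the control and data channels being the point of this new module) and walk the tree. The session setup actually used by Protocol_ftps is not part of this hunk:

import ftplib

ftp = ftplib.FTP_TLS('ftp.example.org')        # hypothetical host
ftp.login(DEFAULT_USERNAME, DEFAULT_PASSWORD)
ftp.prot_p()                                   # encrypt the data channel
for root, dirs, nondirs in ftpswalk(False, None, ftp, '/', db=None):
    print(root, len(dirs), len(nondirs))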
_calmonths = dict((x, i + 1) for i, x in
                  enumerate(('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
                             'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec')))
def _ftp_parse_unix(listing):
    """
    drwxrwxrwx   4 ftp      ftp      4096 Jan 02 01:47 FOLDER
    """
    dirs, nondirs = [], []
    for line in listing:
        if line.startswith('total'):
            continue
        words = line.split(None, 8)
        if len(words) < 6:
            print('Warning: Error reading short line (unix) : ',
                  line, file=sys.stderr)
            continue
        # Get the filename.
        filename = words[-1].lstrip()
        if filename in ('.', '..'):
            continue
        # Get the link target, if the file is a symlink.
        extra = None
        i = filename.find(" -> ")
        if i >= 0:
            # words[0] had better start with 'l'...
            extra = filename[i + 4:]
            filename = filename[:i]
        try:
            # Get the file size.
            size = int(words[4])
        except ValueError as m:
            log.error(
                "ftp parse [unix] failure. Try to use --ftp-listing-type option. (line=%s)" %
                (line))
            raise
        # Get the date.
        year = datetime.today().year  # defaults to the current year
        month = _calmonths[words[5]]
        day = int(words[6])
        mo = re.match(r'(\d+):(\d+)', words[7])
        if mo:
            hour, min = list(map(int, mo.groups()))
            # correction of the sliding year
            if (
                    month > datetime.today().month) or (
                    month == datetime.today().month and day > datetime.today().day) or (
                    month == datetime.today().month and day == datetime.today().day and hour > datetime.today().hour) or (
                    month == datetime.today().month and day == datetime.today().day and hour == datetime.today().hour and min > datetime.today().minute):
                year = year - 1
        else:
            mo = re.match(r'(\d\d\d\d)', words[7])
            if mo:
                year = int(mo.group(1))
                hour, min = 0, 0
            else:
                raise ValueError(
                    "Could not parse time/year in line: '%s'" %
                    line)
        dt = datetime(year, month, day, hour, min)
        mtime = time.mktime(dt.timetuple())
        # Get the type and mode.
        mode = words[0]
        islink = False
        if mode[0] == 'l':
            islink = True
        entry = (filename, size, mtime, mode, islink)
        if mode[0] == 'd':
            dirs.append(entry)
        else:
            nondirs.append(entry)
    return dirs, nondirs
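A quick check of the parser on the canned listing line from its docstring; each returned entry is (filename, size, mtime, mode, islink):

dirs, nondirs = _ftp_parse_unix(
    ['drwxrwxrwx   4 ftp ftp  4096 Jan 02 01:47 FOLDER'])
print(dirs[0][0], dirs[0][1])  # FOLDER 4096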
def _ftp_parse_unix2(listing):
    """
    drwxrwsrwx   5 10158    4096 Feb 21  2003 quicklook
    drwxrwxr-x   6 10158     512 Jun  8  2005 COLOC
    """
    dirs, nondirs = [], []
    for line in listing:
        if line == '226 Transfer complete.':
            continue
        if line.startswith('total'):
            continue
        words = line.split(None, 7)
        if len(words) < 6:
            print('Warning: Error reading short line (unix2) : ',
                  line, file=sys.stderr)
            continue
        # Get the filename.
        filename = words[-1].lstrip()
        if filename in ('.', '..'):
            continue
        # Get the link target, if the file is a symlink.
        extra = None
        i = filename.find(" -> ")
        if i >= 0:
            # words[0] had better start with 'l'...
            extra = filename[i + 4:]
            filename = filename[:i]
        try:
            # Get the file size.
            size = int(words[3])
        except ValueError as m:
            log.error(
                "ftp parse [unix2] failure. Try to use --ftp-listing-type option. (line=%s)" %
                (line))
            raise
        # Get the date.
        month = _calmonths[words[4]]
        day = int(words[5])
        year = datetime.today().year  # defaults to the current year
        mo = re.match(r'(\d+):(\d+)', words[6])
        if mo:
            hour, min = list(map(int, mo.groups()))
            # correction of the sliding year
            if (
                    month > datetime.today().month) or (
                    month == datetime.today().month and day > datetime.today().day) or (
                    month == datetime.today().month and day == datetime.today().day and hour > datetime.today().hour) or (
                    month == datetime.today().month and day == datetime.today().day and hour == datetime.today().hour and min > datetime.today().minute):
                year = year - 1
        else:
            mo = re.match(r'(\d\d\d\d)', words[6])
            if mo:
                year = int(mo.group(1))
                hour, min = 0, 0
            else:
                raise ValueError(
                    "Could not parse time/year in line: '%s'" %
                    line)
        dt = datetime(year, month, day, hour, min)
        mtime = time.mktime(dt.timetuple())
        # Get the type and mode.
        mode = words[0]
        islink = False
        if mode[0] == 'l':
            islink = True
        entry = (filename, size, mtime, mode, islink)
        if mode[0] == 'd':
            dirs.append(entry)
        else:
            nondirs.append(entry)
    return dirs, nondirs
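The unix2 variant differs only in column positions: there is no group column, so the size sits at index 3 and the year can replace the HH:MM field. Same quick check as above:

dirs, nondirs = _ftp_parse_unix2(
    ['drwxrwsrwx   5 10158  4096 Feb 21  2003 quicklook'])
print(dirs[0][0], dirs[0][1])  # quicklook 4096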
def _ftp_listdir(ftp, db=None, listing_type='unix'):
    """
    List the contents of the FTP object's cwd and return two lists of
    (filename, size, mtime, mode, link)
    tuples: one for subdirectories, and one for non-directories (normal files
    and other stuff). If the path is a symbolic link, 'link' is set to the
    target of the link (note that both files and directories can be symbolic
    links).

    Note: we only parse Linux/UNIX style listings; this could easily be
    extended.