Commit 1768e056 authored by pm22d12's avatar pm22d12
Browse files

Fixed # 72 and added call to is_io_locked when copying files

parent 9596b98f
......@@ -4,7 +4,7 @@ F. PAUL <fpaul@ifremer.fr>
Fabien CADORET <fcadoret@br152-128.ifremer.fr>
Frederic PAUL <fpaul@hancock.ifremer.fr>
MAISSIAT <pierre.maissiat@partenaire-exterieur.ifremer.fr>
PMAISSIA <Pierre.Maissiat@partenaire-exterieur.ifremer.fr>
Pierre MAISSIAT <pmaissia@ananda.ifremer.fr>
Sylvain Herlédan <sylvain.herledan@shadowz.sytes.net>
Thibaut CHARLES <thibaut.charles@ifremer.fr>
bcauseur <bcauseur@sii.fr>
......
......@@ -10,7 +10,7 @@ SHELL ["/bin/bash", "--login", "-c"]
ARG ROOTPATH=/tmp
ARG WORKSPACE=$ROOTPATH/workspace
ARG APPDATA=$ROOTPATH/appdata
v
RUN cd $ROOTPATH && \
git clone https://gitlab.ifremer.fr/downloader/downloader_daemon.git && \
conda env create -f downloader_daemon/environment.yaml && \
......@@ -28,7 +28,7 @@ RUN conda clean -a && \
--extra-index-url https://nexus-test.ifremer.fr/repository/pypi-public-snapshot/simple \
--extra-index-url https://nexus-test.ifremer.fr/repository/pypi-private-release/simple \
--extra-index-url https://nexus-test.ifremer.fr/repository/pypi-private-snapshot/simple && \
pip install .
./setup.py install
# ------------------------------------------------------------------
......
......@@ -3,7 +3,7 @@ channels:
- default
- conda-forge
dependencies:
- python>=2.7
- python <3.8
- eccodes
# - gcc
......@@ -448,7 +448,8 @@ class XMLSchemaValidator(IMessageValidator):
else:
res = self.xml_schema.validate(message)
if res is False and error_details is True:
print((self.xml_schema.error_log))
#print((self.xml_schema.error_log))
pass
return res
xml_schema = property(
......@@ -478,8 +479,8 @@ class MessageReader(object):
if validator is not None:
res = validator.validate(element)
if not res:
print(("Error : MessageReader::fromElement : invalid message : %s" % (
validator.xml_schema.error_log)))
#print(("Error : MessageReader::fromElement : invalid message : %s" % (
# validator.xml_schema.error_log)))
return None
message = None
......
......@@ -15,6 +15,9 @@ __docformat__ = 'epytext'
import os.path
import shutil
import sys
from ifr_lib.ifr_os import is_io_locked
if sys.version_info[0] == 2:
# Python2 import fix
import File # pylint: disable=import-error
......@@ -25,6 +28,14 @@ else:
# Gestion getTar + option compress=None || gzip || bz2
def nfs_check(folderpath, timeout=2.0):
    """Raise ``IOError`` when ``folderpath`` is reported as I/O-locked.

    The check is delegated to ``is_io_locked`` (imported from
    ``ifr_lib.ifr_os``); presumably it probes the path for at most
    ``timeout`` seconds -- confirm against the ifr_lib documentation.

    :param folderpath: path handed to ``is_io_locked``
    :param timeout: timeout value handed to ``is_io_locked``
    :raises IOError: if the path is reported as locked, or if the probe
        itself fails (any exception is converted to ``IOError`` carrying
        the original message)
    """
    try:
        locked = bool(is_io_locked(folderpath, timeout))
    except Exception as probe_error:
        # Normalise every probe failure to IOError so callers only have
        # one exception type to handle.
        raise IOError(str(probe_error))
    if locked:
        raise IOError('File::nfs_check: server not responding')
class Folder(object):
"""
Folder is a facility layer for folder management
......@@ -330,7 +341,7 @@ class Folder(object):
# Check that the directory is available, e.g. to avoid NFS freezes. The probeDir program
# always exits quickly (it does not wait for the folder to become available again when NFS problems occur).
if self.__probing:
self.__nfsCheck(2.0)
nfs_check(self.path)
for key, isFile in self._walkInFolder():
if isFile:
......
......@@ -509,6 +509,7 @@ class ConfigurationDestination(object):
self.__spoolLink = None
self.__keepParentFolder = None
self.__fileGroupName = None
self.__optionalSpool = None
def Read(self):
......@@ -542,6 +543,10 @@ class ConfigurationDestination(object):
if keepParentFolder and keepParentFolder.lower() == 'true':
self.__keepParentFolder = True
if self.__xr.haveSubNode(self.__node, 'optional_spool_location'):
self.__optionalSpool = self.__xr.getSubNodeValue(
logging.ERROR, self.__node, 'optional_spool_location')
if self.__xr.haveSubNode(self.__node, 'post-processing'):
postPrecessingNode = self.__xr.getSubNode(
logging.ERROR, self.__node, 'post-processing')
......@@ -630,6 +635,11 @@ class ConfigurationDestination(object):
None,
None,
"read only fileGroupName variable")
optionalSpool = property(
lambda self: self.__optionalSpool,
None,
None,
"read only optionalSpool variable")
class ConfigurationSettings(object):
......
......@@ -26,6 +26,7 @@ except:
from Queue import Queue # Python2 compatibility
from xml.parsers.expat import ExpatError
from xml.etree.ElementTree import ParseError
from ifr_lib.ifr_os import is_io_locked
from dchecktools.common.basefileinfos import FILE_URL_SEPARATOR
......@@ -34,7 +35,7 @@ from downloader.scheduler.com.ext.PatternFilter import PatternFilter
from downloader.scheduler.com.ext.XMLReader import XMLReader
from downloader.scheduler.com.sys import DiskSpool
from downloader.scheduler.com.sys.File import File
from downloader.scheduler.com.sys.Folder import Folder
from downloader.scheduler.com.sys.Folder import Folder, nfs_check
from downloader.scheduler.plugins import Factory
from downloader.scheduler.sc import Controller
from downloader.scheduler.sc.DownloaderDatabase import DownloaderDatabase, \
......@@ -83,6 +84,7 @@ FOLDER_COMMANDS = 'commands'
FOLDER_INTERNAL = 'internal'
FOLDER_WAITING_TO_COMPLETE = 'safe'
FOLDER_WAITING_LINK = 'link'
FOLDER_TO_SPOOL = 'spool'
# Options
# fcad: add session timeout parameter-> TODO : add in configuration file
......@@ -145,6 +147,8 @@ class DownloadedFile(File):
self.mtime = None
self.downloadEndTime = None
self.localFilename = None
self.__storageName = None
self.__relativePath = None
self.__validRelativePath = False
......@@ -288,7 +292,8 @@ class Download(threading.Thread):
FOLDER_COMMANDS: 'orders/commands',
FOLDER_INTERNAL: 'internal',
FOLDER_WAITING_TO_COMPLETE: 'data/safe',
FOLDER_WAITING_LINK: 'data/link'
FOLDER_WAITING_LINK: 'data/link',
FOLDER_TO_SPOOL: 'data/to_spool'
}
self.__downloadFolders = {
FOLDER_TEMPORARY: None,
......@@ -301,7 +306,8 @@ class Download(threading.Thread):
FOLDER_COMMANDS: None,
FOLDER_INTERNAL: None,
FOLDER_WAITING_TO_COMPLETE: None,
FOLDER_WAITING_LINK: None
FOLDER_WAITING_LINK: None,
FOLDER_TO_SPOOL: None
}
# Store locally the instance for log (in file and in db)
......@@ -340,6 +346,7 @@ class Download(threading.Thread):
self.__local_storage_repository = None
self.__local_storage_type = None
self.__local_added_repositories = None
self.__local_optional_spool = None
self.__misc_param_check_provider_before_download = CHECK_SOURCE_AVAILABILITY_DEFAULT
self.__misc_param_nb_retry = NB_RETRIES_DEFAULT
self.__misc_param_loop_delay = CYCLE_LENGTH_DEFAULT
......@@ -818,6 +825,9 @@ class Download(threading.Thread):
if self.__configuration.destination.keepParentFolder:
self.__misc_param_integrity_check = 'SAFE'
if self.__configuration.destination.optionalSpool:
self.__local_optional_spool = self.__configuration.destination.optionalSpool
# ==============================================================================================================
self._log.debug(" --> apply downloader configuration : source")
......@@ -1751,6 +1761,7 @@ class Download(threading.Thread):
try:
self._log.debug(" link '%s' in folder '%s'"
% (list_links[index], list_directories[index]))
nfs_check(list_links[index])
linkName = os.path.join(
list_directories[index], os.path.split(
list_links[index])[1])
......@@ -1778,8 +1789,11 @@ class Download(threading.Thread):
"storeFolder : store folder '%s' in final folder '%s'" %
(download_folder.getPath(), os.path.split(final_storage_path)[0]))
nfs_check( os.path.split(final_storage_path)[0])
try:
# Copie d'abord dans le repertoire temporaire cible dossier.tmp
# Copie d'abord dans le repertoire temporaire cible
tmp_final_storage_path = final_storage_path + '.NOT_SAFE'
shutil.rmtree(tmp_final_storage_path, ignore_errors=True)
shutil.copytree(download_folder.getPath(), tmp_final_storage_path)
......@@ -1812,6 +1826,33 @@ class Download(threading.Thread):
shutil.rmtree(final_storage_path, ignore_errors=True)
raise e
# #71
def storeFileInSpool(self, src_path, filename, final_store=False):
    """Stage a file for, or deliver a file to, the optional spool location.

    Two modes, selected by ``final_store``:

    * ``final_store=False``: copy ``src_path/filename`` into the
      ``FOLDER_TO_SPOOL`` folder. The copy is written under a ``.tmp``
      name and then moved to its final name.
    * ``final_store=True``: move ``filename`` from the ``FOLDER_TO_SPOOL``
      folder into the configured optional spool directory, again through
      a ``.tmp`` name. ``src_path`` is not used in this mode. Access
      rights (and the group, when one is configured) are applied to
      non-link files before the final rename.

    :param src_path: directory holding the file to stage
        (only used when ``final_store`` is false)
    :param filename: short name of the file
    :param final_store: True to deliver from the staging folder to the
        optional spool, False to stage a copy
    :raises Exception: any error is logged as a warning and re-raised
        unchanged
    """
    # Bound before the try block so the error handler below can always
    # build its message, even when the failure happens before either
    # value has been computed (e.g. no optional spool configured).
    spoolFile = None
    dst_filepath = None
    try:
        if final_store:
            spoolFile = File(filename, self.__getFolder(FOLDER_TO_SPOOL))
            dst_filepath = os.path.join(self.__local_optional_spool, filename)
            # Fail fast if the spool location is not responding.
            nfs_check(self.__local_optional_spool)
            dst_tmp_filepath = dst_filepath + ".tmp"
            spoolFile.move(dst_tmp_filepath)
            # NOTE(review): the nesting of chmod/chown under the link test
            # was reconstructed from a flattened source -- confirm.
            if spoolFile.isLink() is False:
                os.chmod(dst_tmp_filepath, FILE_ACCESS_RIGHT)
                if self.__groupid != -1:
                    os.chown(dst_tmp_filepath, -1, self.__groupid)
            spoolFile.move(dst_filepath)
        else:
            spoolFile = File(os.path.join(src_path, filename))
            dst_filepath = os.path.join(
                self.__getFolder(FOLDER_TO_SPOOL).getPath(), filename)
            dst_tmp_filepath = dst_filepath + ".tmp"
            spoolFile.copy(dst_tmp_filepath)
            shutil.move(dst_tmp_filepath, dst_filepath)
    except Exception:
        logged_name = spoolFile.getName() if spoolFile is not None else filename
        logged_dst = dst_filepath if dst_filepath is not None else '<unknown>'
        self._log.warning(
            "[%s] storeFileInSpool : error while storing optional spool file '%s' into '%s'. It will be retried later" %
            (self.id, logged_name, logged_dst))
        # Bare raise keeps the original exception and traceback intact.
        raise
def storeFile(self, downloaded_file_to_store, final_storage_base_path,
final_storage_relative_path, final_storage_filename):
# 2018/05/14 PMT: dans le cas des downloads en //, le repertoire peut être créé par un autre thread entre
......@@ -1833,21 +1874,43 @@ class Download(threading.Thread):
final_storage_relative_path),
create=False)
# #71
if self.__local_optional_spool and downloaded_file_to_store.localFilename is not None:
try:
self.storeFileInSpool(
downloaded_file_to_store.localFile.dirPath,
downloaded_file_to_store.localFilename,
final_store=downloaded_file_to_store.isStoredInternal)
except Exception as e:
if not downloaded_file_to_store.isStoredInternal:
raise e
dst_filepath = os.path.join(
dst_storage_folder.getPath(), final_storage_filename)
dst_tmp_filepath = dst_filepath + '.tmp'
downloaded_file_to_store.localFile.move(dst_tmp_filepath)
# 22/05/2018 PMT : Add chmod 644
downloaded_file_to_store.localFile.move(dst_filepath)
if downloaded_file_to_store.localFile.isLink() is False:
os.chmod(dst_filepath, FILE_ACCESS_RIGHT)
if self.__groupid != -1:
os.chown(dst_filepath, -1, self.__groupid)
try:
nfs_check(final_storage_base_path)
dst_tmp_filepath = dst_filepath + '.tmp'
downloaded_file_to_store.localFile.move(dst_tmp_filepath)
# 22/05/2018 PMT : Add chmod 644
downloaded_file_to_store.localFile.move(dst_filepath)
if downloaded_file_to_store.localFile.isLink() is False:
os.chmod(dst_filepath, FILE_ACCESS_RIGHT)
if self.__groupid != -1:
os.chown(dst_filepath, -1, self.__groupid)
except Exception as e:
if self.__local_optional_spool is not None:
spool_filename = os.path.join(self.__getFolder(FOLDER_TO_SPOOL).getPath(),
downloaded_file_to_store.localFilename)
if os.path.exists(spool_filename):
os.remove(spool_filename)
raise e
def storeDownloadedFile(self, downloaded_file, use_internal_folder=False,
use_final_folder=False, integrity_check=None):
if downloaded_file.localFile.getName(short=True) == '*':
if downloaded_file.localFile.getName(short=True).startswith('$'):
self._log.debug("Begin Store tree folder %s" %
downloaded_file.localFile.getDirPath())
else:
......@@ -1860,7 +1923,6 @@ class Download(threading.Thread):
"Download::storeDownloadedFile : You must choose internal_folder OR final_folder"
base_storage_folder = None
# TODO : creer ces dossiers au demarrage de l'application !
# 31/01/2018 PMT#33 : ajout de integrity_check SAFE : La cohérence s'effectue sur l'ensemble des fichiers d'un répertoire
# les fichiers sont placés dans un répertoire SAFE et ne seront
# transférés dans le local_storage_repository que lorsqu'ils seront
......@@ -1915,8 +1977,11 @@ class Download(threading.Thread):
relative_path = ''
try:
self.storeFile(downloaded_file, base_storage_folder.getPath(
), relative_path, relative_filename)
self.storeFile(
downloaded_file,
base_storage_folder.getPath(),
relative_path,
relative_filename)
except Exception as e:
dst_storage_folder = Folder(
os.path.join(
......@@ -2038,11 +2103,12 @@ class Download(threading.Thread):
raise e
# l'extraction s'est bien passee, on vire le fichier tar
original_filename = downloaded_file.localFile.getName(short=True)
downloaded_file.localFile.remove()
# On pointe sur le Folder du dé-tar ?!?! Existe-il une autre
# solution ?
downloaded_file.localFile = File('*', extract_folder)
downloaded_file.localFile = File('$' + original_filename, extract_folder)
else:
self._log.warning(
......@@ -2108,10 +2174,11 @@ class Download(threading.Thread):
raise e
# l'extraction s'est bien passee, on vire le fichier zip
original_filename = downloaded_file.localFile.getName(short=True)
downloaded_file.localFile.remove()
# On pointe sur le Folder du unzip ?!?! Existe-il une autre
# solution ?
downloaded_file.localFile = File('*', extract_folder)
downloaded_file.localFile = File('$' + original_filename, extract_folder)
else:
self._log.warning(
......@@ -2544,7 +2611,7 @@ class Download(threading.Thread):
try:
# we have to do a DownloadedFile to get the storage relative path (based on data_reader).
# we do not care about remote path here
#downloaded_file = DownloadedFile(file.getName(), None, self.__data_reader)
# downloaded_file = DownloadedFile(file.getName(), None, self.__data_reader)
if not file.getDirPath().startswith(store_folder.getPath()):
raise Exception(
......@@ -2552,7 +2619,7 @@ class Download(threading.Thread):
(file.getDirPath()))
final_storage_relative_path = file.getDirPath()[
len(store_folder.getPath()):]
len(store_folder.getPath()):]
final_storage_relative_path = final_storage_relative_path.lstrip(
'/')
final_storage_filename = file.getName(short=True)
......@@ -2601,6 +2668,53 @@ class Download(threading.Thread):
It will be retried later" %
(self.id, final_filepath, self.__local_added_repositories))
def storeRemainingSpoolFiles(self):
    """Retry delivery of files still waiting in the FOLDER_TO_SPOOL folder.

    Scans the ``FOLDER_TO_SPOOL`` folder and, for every file found there,
    calls ``storeFileInSpool(..., final_store=True)`` to move it to the
    optional spool location. A failure on one file is logged and does not
    stop the processing of the others; the file stays in place and is
    picked up again on the next call.

    If the folder cannot be scanned (``OSError``), a warning is logged
    and nothing else is done.
    """
    store_folder = self.__getFolder(FOLDER_TO_SPOOL)
    try:
        store_folder.scan()
    except OSError:
        self._log.warning(f"Path {store_folder.getPath()} is temporarily inaccessible !")
        return

    files = sorted(store_folder.values(restr=File), key=lambda x: x.getName())
    self._log.debug(
        'storeRemainingSpoolFiles : number of remaining files %d in %s' %
        (len(files), store_folder.getPath()))
    if not files:
        return

    self._log.info(
        'Remaining files in folder "to_spool", try to store them : ')
    for spool_file in files:
        self._log.info(' -> %s' % (spool_file.getName()))
        try:
            # Sanity check: only files located under the staging folder
            # are eligible for delivery.
            if not spool_file.getDirPath().startswith(store_folder.getPath()):
                raise Exception(
                    "file '%s' not in the internal folder ??" %
                    (spool_file.getDirPath()))

            self.storeFileInSpool(
                spool_file.getDirPath(),
                spool_file.getName(short=True),
                final_store=True)
        except Exception as error:
            # Best effort: keep going with the next file, but leave a
            # trace of why this one was not delivered.
            self._log.warning(
                "[%s] storeRemainingSpoolFiles : file '%s' not stored (%s). It will be retried later" %
                (self.id, spool_file.getName(), error))
        else:
            self._log.info(
                '[%s] storeRemainingSpoolFiles OK : %s -> %s' %
                (self.id, spool_file.getName(short=False),
                 os.path.join(self.__local_optional_spool, spool_file.getName(short=True))))
def validateFileDownload(
self,
source_file,
......@@ -2634,11 +2748,9 @@ class Download(threading.Thread):
if downloaded_file.localFile.exist():
mtime = time.strftime(DATE_FORMAT, time.gmtime(downloaded_file.localFile.getTime()))
size = str(downloaded_file.localFile.getSize())
print(f"mtime fichier : {mtime}")
else:
mtime = source_file.mtime.strftime(DATE_FORMAT)
size = str(source_file.size)
print(f"mtime source : {mtime}")
msg_downloaded_file = messages.File(
name=downloaded_file.localFile.getName(short=True),
......@@ -3114,6 +3226,7 @@ class Download(threading.Thread):
# fichier peut donc avoir sa validation, sans forcement etre depose
# dans le storage folder
try:
downloaded_file.localFilename = downloaded_file.localFile.getName(short=True)
self.storeDownloadedFile(
downloaded_file,
use_internal_folder=True,
......@@ -3125,8 +3238,8 @@ class Download(threading.Thread):
downloaded_file.remoteFilepath, str(e))
downloaded_file = None
else:
if self.__misc_param_integrity_check != "SAFE" and downloaded_file.localFile.getName(
short=True) != '*':
if self.__misc_param_integrity_check != "SAFE" \
and not downloaded_file.localFile.getName(short=True).startswith('$'):
self._log.debug(
"[%s] Internal storage success for file '%s' (stored in %s)" %
(self.id, downloaded_file.remoteFilepath, downloaded_file.localFile.getName()))
......@@ -3152,7 +3265,7 @@ class Download(threading.Thread):
(wait until all files in the secure directory are downloaded)"
% (self.id, downloaded_file.remoteFilepath))
msg_level = 'INFO' # tout s'est bien passe, c'est une info qui est envoyee a l'operateur
elif downloaded_file.localFile.getName(short=True) == '*':
elif downloaded_file.localFile.getName(short=True).startswith('$'):
self._log.debug(
"[%s] no final storage for tar file '%s' (using the secure storage procedure)" %
(self.id, downloaded_file.remoteFilepath))
......@@ -3292,6 +3405,8 @@ class Download(threading.Thread):
# if some files are still in the internal storage folder
self.storeRemainingFiles()
self.storeRemainingSpoolFiles()
# 15/03/2018 PMT : ajout des liens complémentaires sur les téléchargements finaux
# si des liens temporaires existent, ils sont copies dans les
# répertoires finaux.
......@@ -3527,6 +3642,8 @@ class Download(threading.Thread):
self.__configuration.destination.subpath)
self._log.log(level, " keep parent folder : '%s'" %
self.__configuration.destination.keepParentFolder)
self._log.log(level, " optional spool location : '%s'" %
self.__configuration.destination.optionalSpool)
self._log.log(level, " post processing : ")
self._log.log(level, " checksum : '%s'" %
self.__configuration.destination.checksum)
......@@ -3542,7 +3659,7 @@ class Download(threading.Thread):
self.__configuration.destination.archive_subdir)
self._log.log(level, " file group name : '%s'" %
self.__configuration.destination.fileGroupName)
self._log.log(level, " spoolLink : '%s'" %
self._log.log(level, " optional link spool : '%s'" %
self.__configuration.destination.spoolLink)
self._log.log(level, " settings : ")
self._log.log(level, " database :")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment