Commit 2af5622c authored by CHARLES

Py3 & library upgrade fixes

parent 21b6cce3
# Downloader daemon
Checks online services for changes in data files and downloads them when needed.
IMPORTANT
=========
This branch is a merge of the master and feature/no_json_emm branches: RabbitMQ messaging can be turned on or off with the boolean USE_RABBIT in Download.py.
Additionally, it provides new features:
- a default global session timeout (Download.py: SESSION_TIMEOUT_SEC) to avoid an infinite wait when establishing a connection to a remote provider (see the sketch after this list)
- a new data reader for remote storage whose subfolders are not organized by period
- a new option, IGNORE_FILES_OLDER_THAN (used in xxx_download.xml), to scan only the newest directories
- automatic database vacuuming
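
A minimal sketch of what the settings named above might look like in Download.py; the option names come from this commit, while the values and the socket-level timeout mechanism are illustrative assumptions:

```python
# Download.py (sketch) -- names from this README; the values and the
# timeout mechanism are assumptions, not the actual implementation.
import socket

# Toggle RabbitMQ messaging on or off (feature/no_json_emm merge).
USE_RABBIT = True

# Default global session timeout, in seconds, so that establishing a
# connection to a remote provider can never block the daemon forever.
SESSION_TIMEOUT_SEC = 120

# One way to enforce such a timeout globally: make it the default for
# every socket the process opens afterwards.
socket.setdefaulttimeout(SESSION_TIMEOUT_SEC)
```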
@@ -2,12 +2,13 @@
 from dchecktools.common.errors import DC_ConfigError, DC_DbError
+from sqlalchemy.exc import ProgrammingError, IntegrityError, DBAPIError
+from sqlalchemy.interfaces import PoolListener
 import sqlite3
 from sqlalchemy import *
 from sqlalchemy.orm import *
-from sqlalchemy.exc import ProgrammingError, IntegrityError, DBAPIError
-from sqlalchemy.interfaces import PoolListener
+from datetime import datetime
 import logging
 import time
 import os
@@ -138,11 +139,9 @@ class SqliteBaseFilesInfos(object):
             sys.exit(1)
         self.db = create_engine(
-            'sqlite:///' +
-            self.database_path,
+            'sqlite:///' + self.database_path,
             echo=False,
-            listeners=[
-                MyPoolListener()])
+            listeners=[MyPoolListener()])
         try:
             self.db.connect()
         except DBAPIError as msg:
@@ -152,7 +151,9 @@ class SqliteBaseFilesInfos(object):
         self.db.echo = False
         self.metadata = MetaData(self.db)
-        self.session = Session()
+        SessionClass = sessionmaker(bind=self.db)
+        self.session = SessionClass()

     def createTables(self, column_key_list=[], autoload=True):
         class Execution(object):
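
A note for readers tracking the upgrade: the bare `Session()` call relied on older, implicitly configured SQLAlchemy sessions; under SQLAlchemy 1.x the idiom is a session class produced by `sessionmaker` bound to the engine. A self-contained sketch of the pattern adopted here:

```python
# Standalone sketch of the sessionmaker pattern adopted above (SQLAlchemy 1.x).
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine('sqlite:///:memory:', echo=False)

# sessionmaker() returns a configured Session class bound to the engine;
# each call to it produces an independent session.
SessionClass = sessionmaker(bind=engine)
session = SessionClass()
```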
@@ -166,80 +167,91 @@ class SqliteBaseFilesInfos(object):
         class File(object):
             pass

-        self.constants = Table(
-            'constants_infos',
-            self.metadata,
-            autoload=autoload)
+        constants_columns = []
         if not autoload:
-            self.constants.append_column(
+            constants_columns.append(
                 Column(
                     'id',
                     Integer,
                     primary_key=True,
                     autoincrement=True))
-            self.constants.append_column(Column("id_execution", Integer))
-            self.constants.append_column(Column("title", String(40)))
-            self.constants.append_column(Column("data", TEXT))
+            constants_columns.append(Column("id_execution", Integer))
+            constants_columns.append(Column("title", String(40)))
+            constants_columns.append(Column("data", TEXT))
+        self.constants = Table(
+            'constants_infos',
+            self.metadata,
+            *constants_columns,
+            autoload=autoload)
         if not autoload:
             try:
                 self.constants.create(checkfirst=True)
             except ProgrammingError:
                 self.constants.create(checkfirst=False)
         mapper(Constant, self.constants)

-        self.exec_infos = Table('exec_infos', self.metadata, autoload=autoload)
+        exec_infos_columns = []
         if not autoload:
-            self.exec_infos.append_column(
+            exec_infos_columns.append(
                 Column(
                     'id',
                     Integer,
                     primary_key=True,
                     autoincrement=True))
-            self.exec_infos.append_column(Column("id_execution", Integer))
-            self.exec_infos.append_column(Column("date", DateTime))
-            self.exec_infos.append_column(Column("title", String(40)))
-            self.exec_infos.append_column(Column("data", TEXT))
+            exec_infos_columns.append(Column("id_execution", Integer))
+            exec_infos_columns.append(Column("date", DateTime()))
+            exec_infos_columns.append(Column("title", String(40)))
+            exec_infos_columns.append(Column("data", TEXT))
+        self.exec_infos = Table('exec_infos', self.metadata, *exec_infos_columns, autoload=autoload)
         if not autoload:
             try:
                 self.exec_infos.create(checkfirst=True)
             except ProgrammingError:
                 self.exec_infos.create(checkfirst=False)
         mapper(ExecInfo, self.exec_infos)

-        self.executions = Table('executions', self.metadata, autoload=autoload)
+        executions_columns = []
         if not autoload:
-            self.executions.append_column(
+            executions_columns.append(
                 Column(
                     'id',
                     Integer,
                     primary_key=True,
                     autoincrement=True))
-            self.executions.append_column(Column("date_start", DateTime))
-            self.executions.append_column(Column("date_stop", DateTime))
-            self.executions.append_column(Column("valid_execution", Boolean))
+            executions_columns.append(Column("date_start", DateTime()))
+            executions_columns.append(Column("date_stop", DateTime(), nullable=True))
+            executions_columns.append(Column("valid_execution", Boolean, nullable=True))
+        self.executions = Table('executions', self.metadata, *executions_columns, autoload=autoload)
         if not autoload:
             try:
                 self.executions.create(checkfirst=True)
             except ProgrammingError:
                 self.executions.create(checkfirst=False)
         mapper(Execution, self.executions)

-        self.files = Table('files', self.metadata, autoload=autoload)
+        files_columns = []
         if not autoload:
-            self.files.append_column(
+            files_columns.append(
                 Column(
                     "id_execution",
                     Integer,
                     primary_key=True))
-            self.files.append_column(
+            files_columns.append(
                 Column(
                     "filename",
                     String(1024),
                     primary_key=True))
-            self.files.append_column(Column("isDirectory", Boolean))
-            self.files.append_column(Column("isSymLink", Boolean))
+            files_columns.append(Column("isDirectory", Boolean))
+            files_columns.append(Column("isSymLink", Boolean))
             for column_key in column_key_list:
                 column = self.__getColumn(column_key)
-                self.files.append_column(column)
+                files_columns.append(column)
                 self.infosKeyList += [column.name]
+        self.files = Table('files', self.metadata, *files_columns, autoload=autoload)
         if not autoload:
             try:
                 self.files.create(checkfirst=True)
             except ProgrammingError:
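
All four table definitions above follow the same rewrite: collect the `Column` objects in a plain list, then pass them positionally when the `Table` is constructed, instead of appending columns to an already-created table. A standalone sketch of the pattern, with illustrative table and column names:

```python
# Sketch: build the column list first, then create the Table in one call
# (SQLAlchemy 1.x). Table and column names here are illustrative only.
from sqlalchemy import Table, Column, Integer, String, MetaData, create_engine

engine = create_engine('sqlite:///:memory:')
metadata = MetaData(engine)

columns = [
    Column('id', Integer, primary_key=True, autoincrement=True),
    Column('title', String(40)),
]
demo = Table('demo_infos', metadata, *columns)
demo.create(checkfirst=True)
```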
@@ -261,13 +273,11 @@ class SqliteBaseFilesInfos(object):
                 pass
         mapper(Execution, self.executions)

-        start_time_str = time.strftime(
-            "%Y-%m-%d %H:%M:%S",
-            time.localtime(
-                self.config._start_time))
         self.current_execution = Execution()
-        self.current_execution.date_start = start_time_str
-        self.session.bulk_save_objects(self.current_execution)
+        self.current_execution.date_start = datetime.fromtimestamp(self.config._start_time)
+        self.current_execution.date_stop = None
+        self.current_execution.valid_execution = None
+        self.session.bulk_save_objects([self.current_execution])
         # need a flush to get the current_execution.id != None
         self.flushEngine()
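
Two fixes meet in the hunk above: `date_start` now receives a real `datetime` object (the SQLite `DateTime` type in SQLAlchemy 1.x rejects preformatted strings), and `bulk_save_objects()` gets the list it expects instead of a bare object. A self-contained illustration in the same classic-mapping style:

```python
# Standalone illustration: DateTime columns want datetime objects, and
# bulk_save_objects() wants an iterable of mapped objects (SQLAlchemy 1.x).
from datetime import datetime
from sqlalchemy import Table, Column, Integer, DateTime, MetaData, create_engine
from sqlalchemy.orm import mapper, sessionmaker

engine = create_engine('sqlite:///:memory:')
metadata = MetaData(engine)
executions = Table('executions', metadata,
                   Column('id', Integer, primary_key=True, autoincrement=True),
                   Column('date_start', DateTime()))
metadata.create_all()

class Execution(object):
    pass

mapper(Execution, executions)  # classic mapping, as in the code above

session = sessionmaker(bind=engine)()
e = Execution()
e.date_start = datetime.fromtimestamp(0)  # a datetime, not a formatted string
session.bulk_save_objects([e])            # a list, even for a single object
session.commit()
```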
@@ -311,9 +321,9 @@ class SqliteBaseFilesInfos(object):
         if column_key == 'size':
             return Column('size', Integer)
         elif column_key == 'mtime':
-            return Column('mtime', DateTime)
+            return Column('mtime', DateTime())
         elif column_key == 'sensingtime':
-            return Column('sensingtime', DateTime)
+            return Column('sensingtime', DateTime())
         elif column_key == 'uid':
             return Column('uid', Integer)
         elif column_key == 'gid':
@@ -355,7 +365,7 @@ class SqliteBaseFilesInfos(object):
         #if self.config.debug: log.debug("addFile : %s", str(f))
         f.id_execution = self.__id_execution
         try:
-            self.session.bulk_save_objects(f)
+            self.session.bulk_save_objects([f])
             self.added_files_nbr += 1
             if FLUSH_FILE_BUFFER_ACTIVATED:
                 if self.flush_file_buffer >= FLUSH_FILE_BUFFER:
@@ -379,7 +389,7 @@ class SqliteBaseFilesInfos(object):
         constant.data = data
         constant.id_execution = self.__id_execution
         log.debug("addConstant : %s => %s" % (constant.title, constant.data))
-        self.session.bulk_save_objects(constant)
+        self.session.bulk_save_objects([constant])

     def addExecInfo(self, title, data):
         assert self.__id_execution is not None
@@ -394,7 +404,7 @@ class SqliteBaseFilesInfos(object):
         exec_info.data = data
         exec_info.id_execution = self.__id_execution
         log.debug("addExecInfo : %s => %s" % (exec_info.title, exec_info.data))
-        self.session.bulk_save_objects(exec_info)
+        self.session.bulk_save_objects([exec_info])

     ########## Requetes ##########
@@ -428,8 +438,9 @@
             table.delete(table.c.id_execution < executionId).execute()

     def req_PurgeOlderThanStartDate(self, startDate, keep_last=True):
-        executionId = self.req_NextExecutionsIds(startDate)[0].id
-        self.req_PurgeOlderThan(executionId, keep_last)
+        execs = self.req_NextExecutionsIds(startDate)
+        if len(execs) > 0:
+            self.req_PurgeOlderThan(execs[0].id, keep_last)

     def req_LastTerminatedExecutions(self, limit=10):
         # select id from executions order by date_start desc
@@ -471,9 +482,9 @@ class SqliteBaseFilesInfos(object):
     def req_PreviousTerminatedExecutionsIds(
             self, date, only_valid_exec=True, limit=10):
         # using and_ to have a statement
-        valid_exec_stmt = and_(1, self.executions.c.valid_execution == 1)
+        valid_exec_stmt = and_(True, self.executions.c.valid_execution == 1)
         if not only_valid_exec:
-            valid_exec_stmt = and_(1, 1)
+            valid_exec_stmt = and_(True, True)
         s = select([self.executions],
                    and_(and_(self.executions.c.date_start <= date,
                              self.executions.c.date_stop >= self.executions.c.date_start),
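
The `1` → `True` change matters because SQLAlchemy coerces Python booleans into proper boolean clause elements, while bare integers inside `and_()` are not accepted as clauses by newer releases; `sqlalchemy.true()` is the fully explicit spelling. A tiny sketch:

```python
# Sketch: booleans (or sqlalchemy.true()) instead of bare integers in and_().
from sqlalchemy import and_, true, column

always = and_(True, True)        # coerced to a boolean clause
explicit = and_(true(), true())  # the fully explicit equivalent
filtered = and_(True, column('valid_execution') == 1)
print(always, explicit, filtered)
```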
@@ -241,7 +241,8 @@ class Protocol_http_directorylist(AbstractProtocol):
         # la base ne se nettoie pas...
         self._parser = FileExtractor()
         try:
-            self._parser.feed(urllib.request.urlopen(url).read())
+            data = urllib.request.urlopen(url).read().decode("utf-8")
+            self._parser.feed(data)
         except urllib.error.HTTPError as e:
             if e.code == 404:
                 raise
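
This is the standard Python 3 pitfall: `urlopen(...).read()` returns `bytes`, while an `HTMLParser`-based extractor such as `FileExtractor` wants `str`, hence the explicit decode. A standalone sketch; the hard-coded UTF-8 charset is the commit's assumption, and a more defensive variant reads the charset from the response headers:

```python
# Sketch: urlopen() yields bytes under Python 3; decode before feeding a parser.
from html.parser import HTMLParser
import urllib.request

class LinkExtractor(HTMLParser):
    """Toy stand-in for FileExtractor: collects href attributes."""
    def __init__(self):
        super().__init__()
        self.links = []

    def handle_starttag(self, tag, attrs):
        if tag == 'a':
            self.links.extend(v for k, v in attrs if k == 'href')

response = urllib.request.urlopen('http://example.com/')
# Prefer the charset the server declares, falling back to UTF-8.
charset = response.headers.get_content_charset() or 'utf-8'
parser = LinkExtractor()
parser.feed(response.read().decode(charset))
print(parser.links)
```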
@@ -403,7 +403,7 @@ class DCheckReportListingBuilder(IListingBuilder):
         #dc = dcheck.DCheck()
         #options, args = dc.getOptions(argsList)
         #dc.run(options, args)
-        self.__process = Popen(cmd, stdout=PIPE, stderr=PIPE, close_fds=True)
+        self.__process = Popen(cmd, stdout=PIPE, stderr=PIPE, close_fds=True, text=True)
         stdout_lines, stderr_lines = self.__process.communicate()
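
`text=True` (Python 3.7+; `universal_newlines=True` is the older spelling) makes `communicate()` return decoded `str` instead of `bytes`, so downstream line handling keeps working unchanged. A minimal sketch:

```python
# Sketch: with text=True, communicate() returns decoded strings, not bytes.
from subprocess import Popen, PIPE

process = Popen(['echo', 'hello'], stdout=PIPE, stderr=PIPE,
                close_fds=True, text=True)
stdout_lines, stderr_lines = process.communicate()
assert isinstance(stdout_lines, str)  # would be bytes without text=True
print(stdout_lines.splitlines())
```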
@@ -417,7 +417,7 @@ class SynchronisationLocalRemote(object):
         cmd = ['dcheck'] + argsList
         self.__log.info("Cmd %s" % cmd)
-        process = Popen(cmd, stdout=PIPE, stderr=PIPE, close_fds=True)
+        process = Popen(cmd, stdout=PIPE, stderr=PIPE, close_fds=True, text=True)
         stdout_lines, stderr_lines = process.communicate()
-django
-netCDF4
-amqp==2.2.2
+django<2
+netCDF4~=1.4.0
+kombu~=4.3.0
+amqp~=2.4.0
 billiard==3.5.0.3
 celery==4.1.0
 certifi==2017.11.5
 downloader==3.2.0
-kombu==4.1.0
 pytz==2017.3
 simplejson==3.13.2
-SQLAlchemy==0.4.8
-vine==1.1.4
+SQLAlchemy~=1.2.0
+vine~=1.2.0
+ifr_lib~=1.2.0
\ No newline at end of file
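
The pins move from exact `==` versions to PEP 440's `~=` compatible-release operator: `SQLAlchemy~=1.2.0` means `>=1.2.0, ==1.2.*`, so bugfix releases are picked up while the API series stays frozen. A small sketch using the third-party `packaging` library to show what the specifier accepts:

```python
# Sketch: what a ~= "compatible release" pin accepts (PEP 440).
# Requires the 'packaging' library (pip install packaging).
from packaging.specifiers import SpecifierSet

compatible = SpecifierSet('~=1.2.0')  # equivalent to >=1.2.0, ==1.2.*
print('1.2.19' in compatible)         # True  -- bugfix releases allowed
print('1.3.0' in compatible)          # False -- minor bump excluded
```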
@@ -66,7 +66,7 @@ setup(
     url='http://cerpypi.ifremer.fr/packages/downloader',
     license='LICENSE.txt',
     description='Downloader for the Felyx distributed system.',
-    long_description=open('README.txt').read(),
+    long_description=open('README.md').read(),
     install_requires=[
         'simplejson>=3.3.0',
         'argparse',
@@ -85,5 +85,6 @@ setup(
         'cffi>=1.12',
     ],
     cffi_modules=[
-        i + ':ffi' for i in glob('downloader/scheduler/com/api/*/_*_build.py')],
+        i + ':ffi' for i in glob('downloader/scheduler/com/api/*/_*_build.py')
+    ],
 )