Commit d7494434 authored by PIOLLE

code refactoring; new multisearch

parent 8839ec1e
......@@ -87,7 +87,7 @@ elif args.commands == 'properties':
doctype = "granule"
print("\n{} properties for index ".format(doctype) +
Style.BRIGHT + Fore.BLUE + "{}".format(args.index) + " :\n" +
Style.RESET_ALL
)
print_properties(properties)
......
......@@ -29,51 +29,26 @@ from naiad.queries.tile import Tile, TileEncoder
from naiad.queries.search import SpatioTemporalSearch
from naiad.utils.filelocator import FileLocator
from naiad.utils.executor import Executor
import naiad.utils
def get_options():
parser = argparse.ArgumentParser(
description='Search the granules matching a region and period of '
'interest'
)
naiad.utils.add_verbosity_args(parser)
naiad.utils.add_config_args(parser)
naiad.utils.add_search_args(parser)
naiad.utils.add_connection_args(parser)
parser.add_argument(
'name', type=str,
'index', type=str,
help='name of the product index (must be lowercase)'
)
parser.add_argument(
'-e', '--elasticsearch', dest='url', action='store',
help='url of the elasticsearch instance where to register '
'the granules. By default, use the elasticsearch of the local host'
)
parser.add_argument(
'-l', '--login', action='store',
default=None,
help='login for secured connection to Elasticsearch'
)
parser.add_argument(
'-p', '--password', action='store',
default=None,
help='password for secured connection to Elasticsearch'
)
parser.add_argument(
'--area', type=str,
help='area of interest, defined as "lonmin, latmin, lonmax, latmax"'
)
parser.add_argument(
'--start', type=str,
help='start time of the period, expressed as YYYYMMDDTHHMMSS'
)
parser.add_argument(
'--end', type=str,
help='end time of the period, expressed as YYYYMMDDTHHMMSS'
)
parser.add_argument(
'-v', '--verbose', action='store_true', help='verbose mode'
)
parser.add_argument(
'-d', '--debug', action='store_true', help='debug mode'
)
parser.add_argument(
'--show', action='store_true',
help="display each found granule's foot print on a separate map"
......@@ -83,126 +58,41 @@ def get_options():
help='display all found granules in one map'
)
parser.add_argument(
'--precise', action='store_true',
help='precise search: ensure strict temporal matching within the time '
'window of the search. If not set, the temporal matching is made '
'at granule level. Returns the slices of the granule subset '
'matching the search criteria.'
)
parser.add_argument(
'--subset-constraints', type=str,
help='specify some constraints on properties to subset the '
'granules to their relevant content only. Only the subsets '
'of a granule matching the constraints will be '
'returned.'
)
parser.add_argument(
'--granule-constraints', type=str,
help='specify some constraints on properties at granule '
'level. Only the granules matching the specified '
'constraints will be returned.'
)
parser.add_argument(
'--full-path', action='store_true',
help="return the matching granules with their full local "
"name"
)
parser.add_argument(
'--datastore', type=str,
help='path to the datastore configuration file defining '
'the access rules to the granule files. Optional arg '
'and only applicable if --full-path is set.'
)
parser.add_argument(
'--toolstore', type=str,
help='path to the toolstore configuration file defining '
'how to pipe the naiad search result in an external tool. '
'Only applicable if --exttool is set.'
)
parser.add_argument(
'--exttool', type=str,
help="name of the tool, configured in the toolstore "
"configuration file, into which are piped the returned granules"
)
parser.add_argument(
'--output-format', type=str, default="list",
help="format of the returned result, among list, detailed, json"
)
parser.add_argument(
'--fail-silently', action='store_true',
help=('do not throw an exception when a problem occurs '
'for one cross-over; return the other results '
'normally.')
)
args = parser.parse_args()
# check arguments
if args.login is not None and args.password is None:
raise ValueError("You must provide a password")
return args
return parser.parse_args()
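
For context, a minimal sketch of how the refactored helpers from naiad.utils (defined further down in this commit) are meant to be composed; the parsed argument values here are hypothetical:

import argparse
import naiad.utils

parser = argparse.ArgumentParser(description='minimal example')
naiad.utils.add_verbosity_args(parser)    # -v/-q/-s and --benchmark
naiad.utils.add_config_args(parser)       # -c/--configfile
naiad.utils.add_connection_args(parser)   # -e/-l/-p/-f
args = parser.parse_args(['-v', '-e', 'http://localhost:9200'])
naiad.utils.set_verbosity(args)
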
if '__main__' == __name__:
logger = logging.getLogger()
logger.setLevel(logging.INFO)
tracer = logging.getLogger('elasticsearch.trace')
tracer.setLevel(logging.WARNING)
tracer.addHandler(logging.StreamHandler())
args = get_options()
if args.debug:
logger.setLevel(logging.DEBUG)
# Set up verbosity option.
naiad.utils.set_verbosity(args)
name = args.name.lower()
if args.area:
lonmin, latmin, lonmax, latmax = literal_eval(args.area)
else:
# global selection by default
lonmin, latmin, lonmax, latmax = -180, -90., 180., 90
area = shapely.geometry.box(lonmin, latmin, lonmax, latmax)
# Search arguments
search = naiad.utils.get_search_config(
naiad.utils.get_config_file(args.configfile), args
)
# connection parameters
connection = naiad.utils.get_connection_config(
naiad.utils.get_config_file(args.configfile), args
)
if args.start:
start = dateutil.parser.parse(args.start)
else:
start = datetime(1950, 1, 1)
if args.end:
end = dateutil.parser.parse(args.end)
else:
end = datetime.utcnow()
if args.precise:
precise = True
else:
precise = False
if args.granule_constraints:
granule_constraints = []
fields = args.granule_constraints.split(';')
for item in fields:
prop, oper, val = item.split(' ')
if oper not in ['eq', 'lt', 'le', 'gt', 'ge']:
raise Exception("Invalid constraint operator : %s", oper)
granule_constraints.append((prop, oper, val))
else:
granule_constraints = None
# create search query
index = Index(name)
search = SpatioTemporalSearch([name], area, start, end,
granule_constraints=granule_constraints,
precise=precise)
qsearch = SpatioTemporalSearch(
[args.index.lower()], search['area'], search['start'], search['end'],
granule_constraints=search['granule_constraints'])
# connect to Naiad server and run the query
try:
# create Naiad server object to query
es = Server(args.url, login=args.login, password=args.password)
res = search.run(es, precise=precise)
es = Server(
connection['es_server'],
login=connection['username'],
password=connection['password']
)
res = qsearch.run(es, precise=search['precise'])
except ConnectionError as _:
print("I can not perform the search (Elasticsearch can not be reached)",
file=sys.stderr)
......@@ -212,10 +102,14 @@ if '__main__' == __name__:
exit(-1)
# complete with full path name if required (implicit if exttool option is set)
if args.full_path or args.exttool:
datastore = FileLocator(args.datastore)
if search['full_path'] or search['exttool']:
storage = naiad.utils.get_storage_config_from_file(
args.index, naiad.utils.get_config_file(args.configfile)
)
for granule in res['data']:
granule.granule = datastore.get_full_path(granule.granule, name)
granule.granule = naiad.utils.filelocator.get_full_path(
granule.granule, storage, args.index)
if args.verbose:
print("Got %d results:\n" % search.total)
......@@ -223,7 +117,7 @@ if '__main__' == __name__:
print(res["error"])
# pipe result in external tool
if args.exttool is not None:
if search['exttool'] is not None:
toolstore = Executor(args.toolstore)
max_files = toolstore.get_maximum_inputs(args.exttool)
......@@ -246,15 +140,15 @@ if '__main__' == __name__:
# print result in stdout
else:
if args.output_format == "list":
if search['output_format'] == "list":
for granule in res['data']:
print(granule)
elif args.output_format == "detailed":
elif search['output_format'] == "detailed":
for granule in res['data']:
print(granule)
elif args.output_format == "json":
elif search['output_format'] == "json":
print(json.dumps(res['data'], cls=TileEncoder, sort_keys=True,
indent=4, separators=(',', ': ')))
......@@ -264,4 +158,4 @@ if '__main__' == __name__:
elif args.show:
for granule in res['data']:
granule.show(clip=area)
granule.show(clip=search['area'])
......@@ -84,7 +84,6 @@ def search(
config = get_config_from_file(indices, configfile)
# has to be the same server for each sought index
print(set([_['es_server'] for _ in config]))
if len(set([_['es_server'] for _ in config])) != 1:
raise NotImplementedError
......
......@@ -129,8 +129,8 @@ class SpatialQuery(Query):
if missing != 0:
if not self.fail_silently:
raise NaiadMissingResults(
'some results will be missing as they were ignored by ES '
'aggregation query'
'some results will be missing as they were ignored by '
'ES aggregation query'
)
result["error"]["missing"] = missing
else:
......@@ -519,7 +519,6 @@ class SpatioTemporalSearch(SpatialQuery):
min_intersection=0,
granule_constraints=None,
excluded_granule=None,
precise=False,
fail_silently=False
):
super(SpatioTemporalSearch, self).__init__(products, fail_silently)
......@@ -566,6 +565,7 @@ class SpatioTemporalSearch(SpatialQuery):
server,
precise=False,
return_tiles=False,
geometry=False
):
"""run the search query
......@@ -578,6 +578,9 @@ class SpatioTemporalSearch(SpatialQuery):
return_tiles: return the list of tiles intersecting the search
area and time frame instead of a list of granules. Used mostly
for internal purposes.
geometry: add the geometry of the crossover intersection. Only
applies when ``precise`` is set (otherwise it is already
returned). May require additional queries to the server.
"""
# build query
query = self.build_search_query(
......@@ -621,6 +624,13 @@ class SpatioTemporalSearch(SpatialQuery):
self.coordinates,
min_intersection=self.min_intersection
)
# add cross-over geometry
if precise and geometry:
for tile in result['data']:
search = GranuleInfoSearch(index + '_granule', tile.granule)
res = search.run(server)
tile.shape = self.coordinates.intersection(res['data'].shape)
return result
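
For context, a sketch of how the new geometry flag might be used; the index name, area and server URL below are hypothetical, and res is assumed to be the usual result dict with a 'data' list, as used elsewhere in this commit:

from datetime import datetime
import shapely.geometry
from naiad.queries.server import Server
from naiad.queries.search import SpatioTemporalSearch

area = shapely.geometry.box(-10., 40., 5., 55.)
search = SpatioTemporalSearch(
    ['my_product'], area, datetime(2020, 1, 1), datetime(2020, 2, 1))
server = Server('http://localhost:9200')

# with precise=True, geometry=True attaches to each tile the shape of
# the intersection between its footprint and the search area
res = search.run(server, precise=True, geometry=True)
for tile in res['data']:
    print(tile.granule, tile.shape)
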
......@@ -628,7 +638,8 @@ class SpatioTemporalSearch(SpatialQuery):
def run(self,
server=None,
precise=False,
return_tiles=False
return_tiles=False,
geometry=False
):
"""Execute the query
......@@ -655,7 +666,8 @@ class SpatioTemporalSearch(SpatialQuery):
res = self._search(
server=self.server,
precise=precise,
return_tiles=return_tiles
return_tiles=return_tiles,
geometry=geometry
)
return res
......@@ -669,8 +681,7 @@ class BulkTemporalSearch(SpatialQuery):
perform the searches
searches (list<SpatioTemporalSearch>): list of spatio-temporal searches
"""
def __init__(self, products, searches,
fail_silently=False):
def __init__(self, products, searches, fail_silently=False):
super(BulkTemporalSearch, self).__init__(products, fail_silently)
self.searches = searches
......@@ -678,6 +689,7 @@ class BulkTemporalSearch(SpatialQuery):
server=None,
precise=False,
return_tiles=False,
geometry=False,
max_number_of_requests=1000
):
"""Execute the query
......@@ -699,6 +711,9 @@ class BulkTemporalSearch(SpatialQuery):
area and time frame instead of a list of granules. Used mostly
for internal purposes.
geometry: add the geometry of the crossover intersection, as for
``SpatioTemporalSearch.run``.
"""
if isinstance(server, dict):
raise NotImplementedError
# store server in class attribute to manage pagination
self.server = self._get_server(server)
......@@ -725,7 +740,7 @@ class BulkTemporalSearch(SpatialQuery):
for search in self.searches[s0:s1]:
query_list += json.dumps({})
query_list += "\n"
query_list += search.json()
query_list += search.json(precise=precise)
query_list += "\n"
# query to ES
......@@ -743,22 +758,41 @@ class BulkTemporalSearch(SpatialQuery):
# build results
results = []
for i, resp in enumerate(responses):
if not 'hits' in resp and 'error' in resp:
#pprint.pprint(resp)
if 'hits' not in resp and 'error' in resp:
# deep copy: a shallow copy would mutate the nested 'error' dict
# shared by every EMPTY_SEARCH_RESULT
tmp = copy.deepcopy(EMPTY_SEARCH_RESULT)
tmp['error']['request_processing'] = resp['error']
results.append(tmp)
continue
results.append(
self.decode_result(
resp['hits']['hits'],
doctype,
self.searches[i].coordinates,
min_intersection=self.searches[i].min_intersection
if doctype == GRANULE_INDEX:
resp = resp['hits']['hits']
single_res = self.decode_result(
resp,
doctype,
self.searches[i].coordinates,
min_intersection=self.searches[i].min_intersection
)
# add cross-over geometry
if precise and geometry:
for tile in single_res['data']:
search = GranuleInfoSearch(
self.products[0], tile.granule)
res = search.run(server)
if not isinstance(res['data'], Tile):
logging.error(
'Granule {} not found in granule index'
.format(tile.granule)
)
continue
tile.shape = self.searches[i].coordinates.intersection(
res['data'].shape
)
)
results.append(single_res)
return results
......@@ -803,13 +837,44 @@ class MultiSpatioTemporalQuery(BulkTemporalSearch):
def run(self,
server,
precise: bool = False,
return_tiles: bool = False):
return_tiles: bool = False,
geometry: bool = True,
filter_duplicates: bool = True):
res = super(MultiSpatioTemporalQuery, self).run(
server, precise=precise, return_tiles=return_tiles)
server, precise=precise, return_tiles=return_tiles,
geometry=geometry)
crossovers = OrderedDict([])
if len(res) > 0:
for i, target in enumerate(self.targets):
crossovers[target[0].strftime('%Y%m%dT%H%M%S')] = res[i]['data']
if filter_duplicates:
duplicates = {}
# collect duplicated granules
for t, crossover in crossovers.items():
for g in crossover:
if g.granule not in duplicates:
duplicates[g.granule] = []
duplicates[g.granule].append((g, t))
# remove duplicates
granules = []
for g in duplicates:
if len(duplicates[g]) == 1:
# granule only found once
granules.append(duplicates[g][0])
else:
logging.debug("Removed duplicate : ", g)
# select closest in time
selection = 0
item, t = duplicates[g][selection]
min_delta = abs((item.start - t).total_seconds())
for i, (item, t) in enumerate(duplicates[g][1:]):
delta = abs((item.start - t).total_seconds())
if delta < min_delta:
selection = 1
min_delta = delta
granules.append(duplicates[g][selection])
return crossovers
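
To illustrate the duplicate filtering above: when the same granule is returned by several of the bulk searches, only the occurrence whose granule start time is closest to its crossover date is kept. A standalone sketch of that selection rule, with hypothetical timestamps:

from datetime import datetime

# (granule start time, crossover date key) pairs for one duplicated granule
occurrences = [
    (datetime(2020, 1, 1, 6, 0), '20200101T120000'),
    (datetime(2020, 1, 1, 11, 0), '20200101T120000'),
]

def closest_in_time(occurrences):
    """select the occurrence closest in time to its crossover date"""
    return min(
        occurrences,
        key=lambda o: abs(
            (o[0] - datetime.strptime(o[1], '%Y%m%dT%H%M%S'))
            .total_seconds()))

print(closest_in_time(occurrences))  # the 11:00 occurrence wins
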
......@@ -245,8 +245,8 @@ class Tile(object):
# trick to cope with polygon issues in ES:
# split the polygon at the 0 meridian (which seems to cause failures)
# and re-unite
# if (tile.shape.geom_type in ['Polygon', 'MultiPolygon'] and
# tile.shape.intersects(LineString([(0., -90.), (0, 90.)]))):
#if (tile.shape.geom_type in ['Polygon', 'MultiPolygon'] and
# tile.shape.intersects(shapely.geometry.LineString([(0., -90.), (0, 90.)]))):
# tile.shape = GeoShape.split_over_greenwich(tile.shape)
# check the footprint shape is valid
......@@ -270,7 +270,7 @@ class Tile(object):
logging.debug(str(tile))
# correct clockwise multipolygons
#tile.shape = GeoShape.orient(tile.shape)
tile.shape = GeoShape.orient(tile.shape)
return granules, tiles
......
import logging
import os
from pathlib import Path
import yaml
from .search import *
from .storage import *
from ..queries.server import Server
CONNECTION_PARAMS = {
'es_server': 'http://localhost:9200',
'username': None,
'password': None,
'fail_silently': False
}
logger = logging.getLogger()
logger.setLevel(logging.WARNING)
tracer = logging.getLogger('elasticsearch.trace')
tracer.setLevel(logging.WARNING)
tracer.addHandler(logging.StreamHandler())
def get_config_file(config: Path = None):
"""Get the user Naiad configuration file.
Args:
config (Path, optional): configuration file containing the definition
of indexes in Naiad. If not provided, search for ``indexes.yaml``
in the directory given by the NAIAD_HOME env variable or, by
default, for the ``.naiad/indexes.yaml`` file in the user home
directory.
"""
if config is not None and not config.exists():
raise IOError(
'Naiad configuration file {} not found'.format(config)
)
if config is None:
if 'NAIAD_HOME' in os.environ:
config = Path(os.environ['NAIAD_HOME']) / 'indexes.yaml'
else:
# look in user home
config = Path.home() / '.naiad/indexes.yaml'
if not config.exists():
return
with open(config) as f:
config = yaml.load(f, Loader=yaml.FullLoader)
return config
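
A short usage sketch of the resolution order implemented above (the paths are hypothetical):

import os
from pathlib import Path

# 1. an explicit file always wins (IOError is raised if it is missing):
#        config = get_config_file(Path('/etc/naiad/indexes.yaml'))
# 2. otherwise $NAIAD_HOME/indexes.yaml is used when NAIAD_HOME is set:
os.environ['NAIAD_HOME'] = '/opt/naiad'
config = get_config_file()
# 3. else ~/.naiad/indexes.yaml; None is returned when nothing exists:
del os.environ['NAIAD_HOME']
config = get_config_file()
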
def add_verbosity_args(parser):
# Add verbosity control.
verbosity_group = parser.add_mutually_exclusive_group()
verbosity_group.add_argument(
"-v", "--verbose", action="store_true",
help="Activate debug level logging - for extra feedback."
)
verbosity_group.add_argument(
"-q", "--quiet", action="store_true",
help="Disable information logging - for reduced feedback."
)
verbosity_group.add_argument(
"-s", "--silent", action="store_true",
help="Log ONLY the most critical or fatal errors."
)
parser.add_argument(
'--benchmark', action='store_true',
help='provides the time elapsed between start and end of execution'
)
return verbosity_group
def add_connection_args(parser):
parser.add_argument(
'-e', '--es_server', dest='es_server', action='store',
default="http://localhost:9200",
help=(
'url of the elasticsearch instance where to register '
'the index. By default, the default elasticsearch port '
'on the local host will be used (http://localhost:9200)')
)
parser.add_argument(
'-l', '--login', action='store', dest='username',
help='login for secured connection to Elasticsearch'
)
parser.add_argument(
'-p', '--password', action='store',
help='password for secured connection to Elasticsearch'
)
parser.add_argument(
"-f", "--fail_silently", action="store_true",
help="ignore errors returned by Elasticsearch"
)
def get_connection_config(config, args=None, index=None):
if config is None:
return CONNECTION_PARAMS
connection = config['globals']
logging.info("Connection parameters:")
for param in ['es_server', 'username', 'password', 'fail_silently']:
if args is not None and getattr(args, param) is not None:
connection[param] = getattr(args, param)
elif index is not None and index in config and param in config[index]:
connection[param] = config[index][param]
else:
# keep the value inherited from the globals section, falling
# back to the built-in defaults only when it is missing
connection.setdefault(param, CONNECTION_PARAMS[param])
logging.info(' {}: {}'.format(param, connection[param]))
return connection
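
For illustration, with a configuration shaped like the (hypothetical) one below, command-line arguments take precedence over the per-index section, which in turn takes precedence over the globals and the built-in defaults:

config = {
    'globals': {
        'es_server': 'http://es.example.org:9200',
        'username': None,
        'password': None,
        'fail_silently': False,
    },
    'sst_l3': {
        'es_server': 'http://other-es.example.org:9200',
    },
}
connection = get_connection_config(config, index='sst_l3')
# es_server comes from the 'sst_l3' section, the rest from the globals
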
def get_multiconnection_config(indices):
"""get connection details for multiple indices"""
connections = {}
for index in indices:
connections[index] = get_connection_config(
get_config_file(), index=index
)
if len(set(frozenset(d.items()) for d in connections.values())) == 1:
connection = list(connections.values())[0]
return Server(
connection['es_server'],
login=connection['username'],
password=connection['password']
)
return {
index: Server(connection['es_server'],
login=connection['username'],
password=connection['password'])
for index, connection in connections.items()
}
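
A usage sketch (the index names are hypothetical): a single Server is returned when all indices resolve to the same connection details, otherwise a dict of Server objects keyed by index. Note that BulkTemporalSearch.run currently raises NotImplementedError when handed such a dict (see above).

server = get_multiconnection_config(['sst_l3', 'wind_l2'])
if isinstance(server, dict):
    for index, srv in server.items():
        print(index, srv)
else:
    print('shared server:', server)
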
def set_verbosity(args):
"""read and set up verbosity option"""
if args.verbose:
logger.setLevel(logging.DEBUG)
tracer.setLevel(logging.DEBUG)
elif args.quiet:
logger.setLevel(logging.WARNING)
tracer.setLevel(logging.WARNING)
elif args.silent:
logger.setLevel(logging.FATAL)
tracer.setLevel(logging.FATAL)
else:
logger.setLevel(logging.INFO)
def add_config_args(parser):
parser.add_argument(
'-c', '--configfile', type=Path, default=None, dest='configfile',
help='path to the configuration file if not using the default one'
)
......@@ -10,8 +10,30 @@ Class to locate granules indexed in Naiad
.. sectionauthor:: Jeff Piolle <jfpiolle@ifremer.fr>
.. codeauthor:: Jeff Piolle <jfpiolle@ifremer.fr>
"""
import os
from pathlib import Path
from dateutil import parser
def get_full_path(granule, storage, index):
try:
path = storage["storage_path"]
except KeyError:
raise ValueError(
"No storage location defined for index {}"
.format(index)
)
try:
extractor = storage["time_parser"] \
.replace('$FILE', granule)
except KeyError:
raise ValueError(
"No specified mean of extracting reference time for granule ")
date = eval(extractor)
return Path(date.strftime(path)) / granule
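
To illustrate the new storage-driven path resolution: time_parser is a Python expression evaluated (via eval) after substituting $FILE with the granule name, and the resulting date is applied to storage_path through strftime. A sketch with hypothetical configuration values:

# hypothetical storage section from the index configuration
storage = {
    'storage_path': '/data/sst/%Y/%j',
    'time_parser': "parser.parse('$FILE'.split('_')[1])",
}
# evaluates parser.parse('20200101T120000') and yields
# Path('/data/sst/2020/001/sst_20200101T120000_v1.nc')
print(get_full_path('sst_20200101T120000_v1.nc', storage, 'sst'))
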