Commit 5ab55ffa authored by BODERE's avatar BODERE

feat: enhance endpoint and engines

parent 0af67255
Pipeline #6120 passed
# ---------------------------------------------------------------
# Global
# ---------------------------------------------------------------
# to build docker in docker
services:
- docker:dind
variables:
  DOCKER_DRIVER: overlay2
  PACKAGE_NAME: opensearx_ws
# default image
image: python:latest
# stages (main steps of pipeline)
stages:
- Quality
- Sonarqube
- Documentation
- Deploy
# ---------------------------------------------------------------
# Jobs templates
# ---------------------------------------------------------------
.code-changes-template: &code-changes
  only:
    changes:
      - ${PACKAGE_NAME}/**/*.py
      - tests/**/*.py
      - .gitlab-ci.yml
      - pyproject.toml

.install-deps-template: &install-deps
  before_script:
    - pip install poetry poetry-dynamic-versioning poetry2conda
    - poetry config virtualenvs.in-project false
    - poetry install -vv
  tags: [opensearx-runner]

.quality-template: &quality
  <<: *install-deps
  <<: *code-changes
  image: python:3.8
  stage: Quality
  allow_failure: true
  except:
    - tags
# ---------------------------------------------------------------
# Quality jobs
# ---------------------------------------------------------------
flake8:
  <<: *quality
  script:
    - poetry run flake8 --max-line-length=120 --docstring-convention google ${PACKAGE_NAME}

pylint:
  <<: *quality
  script:
    - "poetry run pylint --exit-zero ${PACKAGE_NAME} tests -r n --max-line-length=120 --msg-template='{path}:{line}: [{msg_id}({symbol}), {obj}] {msg}' | tee pylint.txt"
  artifacts:
    expire_in: 1 week
    paths:
      - pylint.txt
# ---------------------------------------------------------------
# SonarQube
# ---------------------------------------------------------------
#sonarqube:
# stage: Sonarqube
# tags: [opensearx-runner]
# image:
# name: sonarsource/sonar-scanner-cli:latest
# entrypoint: [""]
# allow_failure: true
# script:
# - sonar-scanner
# -Dsonar.projectKey=${CI_PROJECT_NAME}
# -Dsonar.language=py
# -Dsonar.host.url=http://visi-common-sonar:9000
# -Dsonar.login=e6f816eee72d3d5c03319ec74b468157b9164d12
# -Dsonar.sourceEncoding=UTF-8
# -Dsonar.python.coverage.reportPaths=coverage.xml
# -Dsonar.coverage.exclusions=**__init__**,tests/**
# -Dsonar.python.pylint.reportPath=pylint.txt
# <<: *code-changes
# except:
# - tags
# ---------------------------------------------------------------
# Produce docker image
# ---------------------------------------------------------------
build_and_push_docker:
  # pipeline stage
  stage: Deploy
  # gitlab runner
  tags: [opensearx-runner]
  # docker image used to run build_and_push_docker
  image: docker:latest
  # commands to execute
  script:
    - test -n "${CI_BUILD_TOKEN}" && docker login -u gitlab-ci-token -p ${CI_BUILD_TOKEN} ${CI_REGISTRY} || true
    - docker build -t ${CI_REGISTRY}/${CI_PROJECT_PATH}/${CI_PROJECT_NAME}:${CI_COMMIT_TAG} .
    - docker push ${CI_REGISTRY}/${CI_PROJECT_PATH}/${CI_PROJECT_NAME}:${CI_COMMIT_TAG}
  # trigger: only on tag creation
  only:
    - tags
# Python 3.8 base image with Uvicorn, Gunicorn and FastAPI
FROM tiangolo/uvicorn-gunicorn-fastapi:python3.8
# Install Poetry
RUN curl -sSL https://raw.githubusercontent.com/python-poetry/poetry/master/get-poetry.py | POETRY_HOME=/opt/poetry python && \
cd /usr/local/bin && \
ln -s /opt/poetry/bin/poetry && \
poetry config virtualenvs.create false
# Copy the dependency definitions (pyproject.toml and poetry.lock)
COPY ./pyproject.toml ./poetry.lock /app/
RUN poetry install --no-root --no-dev && \
rm /app/pyproject.toml && \
rm /app/poetry.lock
COPY opensearx_ws /app/opensearx_ws
# Set envvar
ENV PYTHONPATH=/app/opensearx_ws
ENV MODULE_NAME="opensearx_ws.main"
@@ -9,4 +9,3 @@ Dependencies :
- fastapi : MIT License
- httpx : BSD License
- uvicorn : BSD License
- atoma: MIT License
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Opensearx configuration"""
from starlette.templating import Jinja2Templates

from opensearx_ws.opensearch.engine import (IfremerOpensearchEngine,
                                            JPLOpensearchEngine)
# opensearch engines to be queried
opensearch_engines = {
    "Opensearch Ifremer":
        IfremerOpensearchEngine(root_path='https://opensearch.ifremer.fr/granules.atom', timeout=20.0),
    "Opensearch Ifremer - timeout 1s":
        IfremerOpensearchEngine(root_path='https://opensearch.ifremer.fr/granules.atom', timeout=1.0),
    "Non-existent URL":
        IfremerOpensearchEngine(root_path='https://toto.ifremer.fr/granules.atom', timeout=2.0),
    "JPL engine":
        JPLOpensearchEngine(root_path='https://cmr.earthdata.nasa.gov/opensearch/granules.atom', timeout=20.0)
}
# jinja2 templates utility
j2_templates = Jinja2Templates(directory="templates")
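For illustration only (not part of this commit): a minimal sketch that queries one of the configured engines directly, assuming the opensearx_ws package is importable and that each engine exposes the async request() method used by the webservice below. The query parameters reuse the example values advertised by the home endpoint.

import asyncio

from opensearx_ws.config import opensearch_engines
from opensearx_ws.opensearch.model import OpensearchQueryParameters

# hypothetical query, reusing the parameters shown in the home endpoint examples
params = OpensearchQueryParameters(
    datasetId="sca_l2a___", startPage=0, count=10,
    timeStart="2014-01-01T00:00:00Z", timeEnd="2020-05-18T23:59:59Z",
    geoBox="134,-64,135,-63")
# run the coroutine of a single engine and inspect the parsed response
response = asyncio.run(opensearch_engines["Opensearch Ifremer"].request(params))
print(len(response.entries), response.errors)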
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Opensearx webservice
Execute requests on opensearch engines and merge the results.
"""
import asyncio
import uvicorn
from fastapi import Depends, FastAPI
from fastapi.responses import ORJSONResponse
from starlette.requests import Request
from starlette.responses import JSONResponse, Response

from opensearx_ws.config import j2_templates, opensearch_engines
from opensearx_ws.opensearch import merge_opensearch_responses
from opensearx_ws.opensearch.model import (OpensearchQueryParameters,
                                           OpensearchResponseFormat)
app = FastAPI()
@app.get("/conf")
def conf():
@app.get("/engines")
def conf() -> JSONResponse:
"""Display the configuration
Will be removed for production
"""
return opensearch_engines
@app.get("/granules_raw")
async def granule(params: OpensearchQueryParameters = Depends()):
@app.get("/granules")
@app.get("/granules{response_fmt}")
async def granules(
request: Request,
response_fmt: OpensearchResponseFormat = OpensearchResponseFormat.atom,
params: OpensearchQueryParameters = Depends()) -> Response:
"""granules endpoint
Execute a request on each opensearch engine if the dataset is available.
If the output format is `.raw`, then return a list for the raw responses.
Otherwise, the responses are merged into a single response and the response id formatted
with the specified format (`.atom`, `.json`)
:param request:
:param response_fmt: output format (atom, json, raw)
:param params:
:return:
"""
# prepare task (i.e. coroutines)
tasks = [asyncio.create_task(engine.request(params)) for engine in opensearch_engines.values()]
return await asyncio.gather(*tasks)
# execute taks in parallel and retrieve a list of results
responses = await asyncio.gather(*tasks)
@app.get("/granules{format}")
async def granule(
request: Request,
format: OpensearchResponseFormat = OpensearchResponseFormat.atom,
params: OpensearchQueryParameters = Depends()):
tasks = [asyncio.create_task(engine.request(params)) for engine in opensearch_engines.values()]
return merge_response(request.url, await asyncio.gather(*tasks), params)
# retrieve engine names as a list (to map with response)
engine_names = list(opensearch_engines.keys())
# if response is `raw` -> return a dict<engine_name, response> as a JSON document
if response_fmt == OpensearchResponseFormat.raw:
return ORJSONResponse({engine_names[idx]: response.dict() for idx, response in enumerate(responses)})
# else, we merge response into a single one
merged_response = merge_opensearch_responses(request.url, responses, params, engine_names)
# if format is `json` -> return the response as a JSON document
if response_fmt == OpensearchResponseFormat.json:
return ORJSONResponse(merged_response.dict())
# if format is `atom` -> return the response as an ATOM document using a jinja2 template
if response_fmt == OpensearchResponseFormat.atom:
return j2_templates.TemplateResponse(
name="granules.atom",
context={"request": request, "response": merged_response},
media_type="application/xml"
)
@app.get("/")
def home(request: Request):
"""Home endpoint
"""
return {
"Test granule Ifremer": "http://127.0.0.1:8000/granules?datasetId=sca_l2a___&startPage=0&count=1000"
"&timeStart=2014-01-01T00:00:00Z&timeEnd=2020-05-18T23:59:59Z&geoBox=134,-64,135,-63",
"Test granule Nasa": "http://127.0.0.1:8000/granules?datasetId=C1693233348-PODAAC&startPage=0&count=1000"
"&timeStart=2014-01-01T00:00:00Z&timeEnd=2020-05-18T23:59:59Z&geoBox=134,-64,135,-63",
"Raw response": "http://127.0.0.1:8000/granules_raw?datasetId=sca_l2a___&startPage=0&count=1000"
"&timeStart=2014-01-01T00:00:00Z&timeEnd=2020-05-18T23:59:59Z&geoBox=134,-64,135,-63",
"Configuration": "http://127.0.0.1:8000/conf"
"Test granule Ifremer (atom, default)":
f"{request.url}granules?datasetId=sca_l2a___&startPage=0&count=1000"
"&timeStart=2014-01-01T00:00:00Z&timeEnd=2020-05-18T23:59:59Z&geoBox=134,-64,135,-63",
"Test granule Nasa (atom)":
f"{request.url}granules.atom?datasetId=C1693233348-PODAAC&startPage=0&count=1000"
"&timeStart=2014-01-01T00:00:00Z&timeEnd=2020-05-18T23:59:59Z&geoBox=134,-64,135,-63",
"Test granule Ifremer (raw)":
f"{request.url}granules.raw?datasetId=sca_l2a___&startPage=0&count=1000"
"&timeStart=2014-01-01T00:00:00Z&timeEnd=2020-05-18T23:59:59Z&geoBox=134,-64,135,-63",
"Test granule Ifremer (json)":
f"{request.url}granules.json?datasetId=sca_l2a___&startPage=0&count=1000"
"&timeStart=2014-01-01T00:00:00Z&timeEnd=2020-05-18T23:59:59Z&geoBox=134,-64,135,-63",
"Configuration": f"{request.url}engines"
}
@app.get("/version")
def read_version():
return {"version ": 0.1}
if __name__ == "__main__":
uvicorn.run("opensearx_ws.main:app", host="127.0.0.1", port=8000, reload=True)
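For illustration only (not part of this commit): a minimal client-side sketch of calling the granules endpoint in its three output formats, assuming the service is running locally on port 8000 as started by the __main__ block above. The URLs and query parameters come from the examples listed by the home endpoint.

import httpx

BASE_URL = "http://127.0.0.1:8000"  # assumption: service started with the __main__ block above
params = {
    "datasetId": "sca_l2a___",
    "startPage": 0,
    "count": 1000,
    "timeStart": "2014-01-01T00:00:00Z",
    "timeEnd": "2020-05-18T23:59:59Z",
    "geoBox": "134,-64,135,-63",
}

# merged ATOM feed (default format)
atom_doc = httpx.get(f"{BASE_URL}/granules", params=params, timeout=60.0).text
# merged response as a JSON document
json_doc = httpx.get(f"{BASE_URL}/granules.json", params=params, timeout=60.0).json()
# raw response of each engine, keyed by engine name
raw_docs = httpx.get(f"{BASE_URL}/granules.raw", params=params, timeout=60.0).json()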
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from typing import List
from starlette.datastructures import URL
from opensearx_ws.opensearch.model import (OpensearchQuery,
                                           OpensearchQueryParameters,
                                           OpensearchResponse,
                                           OpensearchResponseEntry,
                                           OpensearchResponseHeader)
def merge_opensearch_responses(
        query_url: URL,
        responses: List[OpensearchResponse],
        params: OpensearchQueryParameters,
        engine_names: List) -> OpensearchResponse:
    computed_total_results = 0
    merged_entries: List[OpensearchResponseEntry] = list()
    errors = list()
    for idx, response in enumerate(responses):
        if response.errors:
            errors.extend([f"{engine_names[idx]} : {error}" for error in response.errors])
        if response.header is None:
            continue
        computed_total_results += response.header.total_results
        merged_entries.extend(response.entries)
    return OpensearchResponse(
        query=OpensearchQuery(url=str(query_url), params=params),
        errors=errors,
        header=OpensearchResponseHeader(
            id=str(query_url),
            title="opensearx results",
            total_results=computed_total_results,
            start_index=params.startPage,
            items_per_page=params.count
        ),
        entries=merged_entries
    )
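For illustration only (not part of this commit): a small sketch of how merge_opensearch_responses combines one successful and one failed engine response, using hypothetical data and assuming the package is importable.

from opensearx_ws.opensearch import merge_opensearch_responses
from opensearx_ws.opensearch.model import (OpensearchQueryParameters, OpensearchResponse,
                                           OpensearchResponseEntry, OpensearchResponseHeader)

# hypothetical query parameters
params = OpensearchQueryParameters(
    datasetId="sca_l2a___", startPage=0, count=10,
    timeStart="2020-01-01T00:00:00Z", timeEnd="2020-01-02T00:00:00Z",
    geoBox="134,-64,135,-63")

# one engine answered with two entries, the other timed out
ok = OpensearchResponse(
    header=OpensearchResponseHeader(id="https://opensearch.ifremer.fr/granules.atom",
                                    title="engine A", total_results=2),
    entries=[OpensearchResponseEntry(id="granule-1"), OpensearchResponseEntry(id="granule-2")])
failed = OpensearchResponse(errors=["Timeout occurred"])

merged = merge_opensearch_responses("http://127.0.0.1:8000/granules", [ok, failed],
                                    params, ["Engine A", "Engine B"])
assert merged.header.total_results == 2
assert len(merged.entries) == 2
assert merged.errors == ["Engine B : Timeout occurred"]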
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Opensearch engines"""
import xml.etree.ElementTree as ElementTree
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional, Tuple
from urllib.parse import parse_qs, quote, urlencode, urlsplit, urlunsplit
from xml.etree.ElementTree import Element
import httpx
from httpx import Response
from pydantic import HttpUrl
from pydantic.main import BaseModel
from starlette.datastructures import URL
class OpensearchResponseFormat(str, Enum):
    atom = '.atom'
    json = '.json'
    html = '.html'
    raw = '.raw'


class OpensearchQueryParameters(BaseModel):
    datasetId: str
    startPage: int
    count: int
    timeStart: datetime
    timeEnd: datetime = datetime.now()
    geoBox: str


class OpensearchResponseEntryLink(BaseModel):
    title: str = None
    rel: str
    type: str = None
    href: str


class OpensearchResponseEntry(BaseModel):
    id: str = None
    title: str = None
    summary: str = None
    updated: datetime = None
    dc_date: str = None
    geobox: str = None
    geobox_where: str = None
    links: List[OpensearchResponseEntryLink] = list()
class OpensearchResponseHeader(BaseModel):
    id: str
    title: str
    total_results: int = 0
    start_index: int = 0
    items_per_page: int = 0


class OpensearchQuery(BaseModel):
    url: str
    params: OpensearchQueryParameters


class OpensearchResponse(BaseModel):
    query: OpensearchQuery = None
    errors: List[str] = list()
    header: OpensearchResponseHeader = None
    entries: List[OpensearchResponseEntry] = list()
from opensearx_ws.opensearch.model import (OpensearchQuery,
                                           OpensearchQueryParameters,
                                           OpensearchResponse,
                                           OpensearchResponseEntry,
                                           OpensearchResponseEntryLink,
                                           OpensearchResponseHeader)
class OpensearchEngine:
@@ -92,11 +41,13 @@ class OpensearchEngine:
            raw_response = await client.get(url=url, timeout=self.timeout)
            opensearch_response.header, opensearch_response.entries = self._process_response(raw_response)
        except httpx.ReadTimeout:
            opensearch_response.errors = ["Timeout occurred"]
        # except Exception as e:
        #     print(str(e.with_traceback()))
        #     opensearch_response.errors = str(e).split('\n')
        finally:
            if opensearch_response.header is None and not opensearch_response.errors:
                opensearch_response.errors.append("Not a valid response")
        return opensearch_response

    def _build_search_url(self, params: OpensearchQueryParameters) -> str:
@@ -136,42 +87,42 @@ class OpensearchAtomEngine(OpensearchEngine):
}
    def _process_response(self, http_response: Response) -> Tuple[OpensearchResponseHeader, List[OpensearchResponseEntry]]:
        feed_elt = ElementTree.fromstring(http_response.content.decode("utf8"))
        return (
            self._parse_header(feed_elt),
            [self._parse_entry(entry) for entry in self.get_children(feed_elt, "feed:entry")]
        )

    def _parse_header(self, feed_elt: Element) -> OpensearchResponseHeader:
        return OpensearchResponseHeader(
            title=self.get_text(feed_elt, "feed:title"),
            id=self.get_text(feed_elt, "feed:id"),
            updated=self.get_text(feed_elt, "feed:updated"),
            total_results=self.get_text(feed_elt, "opensearch:totalResults"),
            start_index=self.get_text(feed_elt, "opensearch:startIndex"),
            items_per_page=self.get_text(feed_elt, "opensearch:itemsPerPage")
        )

    def _parse_entry(self, entry_elt: Element) -> OpensearchResponseEntry:
        opensearch_entry = self._parse_entry_metadata(entry_elt)
        for link in self.get_children(entry_elt, "feed:link"):
            opensearch_entry.links.append(self._parse_link(link))
        return opensearch_entry

    def _parse_entry_metadata(self, entry_elt: Element) -> OpensearchResponseEntry:
        return OpensearchResponseEntry(
            id=self.get_text(entry_elt, "feed:id"),
            title=self.get_text(entry_elt, "feed:title"),
            updated=self.get_text(entry_elt, "feed:updated"),
            summary=self.get_text(entry_elt, "feed:summary")
        )

    def _parse_link(self, link_elt: Element) -> OpensearchResponseEntryLink:
        return OpensearchResponseEntryLink(
            title=link_elt.get("title"),
            href=link_elt.get("href"),
            rel=link_elt.get("rel"),
            type=link_elt.get("type")
        )

    def get_text(self, elt: Element, name: str) -> Optional[str]:
@@ -185,63 +136,29 @@ class OpensearchAtomEngine(OpensearchEngine):
class IfremerOpensearchEngine(OpensearchAtomEngine):
    """Ifremer opensearch engine"""

    def _parse_entry_metadata(self, entry_elt: Element) -> OpensearchResponseEntry:
        entry = super()._parse_entry_metadata(entry_elt)
        entry.dc_date = self.get_text(entry_elt, "dc:date")
        entry.geobox = self.get_text(entry_elt, "georss:box")
        # entry.geobox_where = self.get_text(entry_elt, "georss:geobox_where")
        return entry
class JPLOpensearchEngine(OpensearchAtomEngine):
    """JPL opensearch engine"""

    def _prepare_query_parameters(self, params: OpensearchQueryParameters) -> Dict:
        params_as_dict = super()._prepare_query_parameters(params)
        # use the bbox spatial format
        params_as_dict["spatial_type"] = "bbox"
        # the JPL cursor starts at 1, instead of 0
        params_as_dict["cursor"] = params.startPage + 1
        return params_as_dict

    def _parse_entry_metadata(self, entry_elt: Element) -> OpensearchResponseEntry:
        entry = super()._parse_entry_metadata(entry_elt)
        entry.dc_date = self.get_text(entry_elt, "dc:date")
        entry.geobox = self.get_text(entry_elt, "georss:box")
        return entry
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Opensearch model"""
import xml.etree.ElementTree as Element
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional, Tuple
from urllib.parse import parse_qs, quote, urlencode, urlsplit, urlunsplit
import httpx
from httpx import Response
from pydantic.main import BaseModel
from starlette.datastructures import URL
# --------------------------------------------------------------------------
# MODEL
# --------------------------------------------------------------------------