Commit 5af447e0 authored by ARCHER's avatar ARCHER 💬
Browse files

Initial commit

parents
#!/usr/bin/env python
import logging
import argparse
import datetime
import sys
import socket
import os
import sarnomencl
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
#logger.info("Script executed at %s" % datetime.datetime.now())
#logger.info("Called with the following command : %s" % " ".join(sys.argv))
#logger.info("The host machine is : %s" % socket.gethostname())
if __name__ == "__main__":
description = "Generate L1 path from L1 name (RS2 or sentinel)"
parser = argparse.ArgumentParser(description = description)
parser.add_argument("--debug", action="store_true", default=False, help="start the script in debug mode")
parser.add_argument('--prefix', action="append", type=str, help="base path prefix")
parser.add_argument('--skip', action="store_true", default=False, help="skip if output path doesn't exist. (print basename on stderr)")
parser.add_argument('names', action="store", nargs='*', type=str, help="sar image basename")
args = parser.parse_args()
if args.debug:
try:
import debug
except:
pass
lines = []
if len(args.names) == 0:
lines = sys.stdin.readlines()
else:
lines = args.names
for name in lines:
path=sarnomencl.sar2path(name,skip=args.skip,prefix=args.prefix)
if not path and args.skip:
sys.stderr.write("%s\n" % name.rstrip())
else:
print path
#Old script, peut-etre que ca peut t'etre utile
# parse args
#wargs={}
#or arg in sys.argv[1:]:
# (key,value) = arg.split('=')
# kwargs[key.lstrip('-')]=value
#
#
#ount = 0
#kCount = 0
#ndColor = '\033[0m'
#or fname in sys.stdin:
# fname=fname.rstrip()
# count = count+1
# newFname = sentinelnomencl.FullPathNomencl(fname).setTags(**kwargs)
# print "FILES %s" % newFname.getFiles()
# if newFname.getFiles():
# okCount = okCount+1
# color = '\033[92m' # green
# else:
# color = '\033[91m' # red
# print "%s %s%s%s" % ( fname , color, newFname , endColor )
#
#ys.stderr.write("%d/%d converted (%2.0f%%)\n" % (okCount , count, float(okCount)/count*100))
#!/usr/bin/env python
import os.path
import glob
import datetime
import re
from string import Template
import itertools
import sys
import logging
import pdb
logging.basicConfig(level=logging.INFO)
logger=logging.getLogger(__name__)
#base_path="/home/cercache/project/mpc-sentinel1/data/esa"
class Nomencl(object):
def __init__(self , regex, template, defaultsValues=None):
"""
regex : regex of the nomenclature, with grouping
template: template of the nomenclature. must use ${var} notation. ${var} and regex groups must match.
"""
self._logger = logging.getLogger(self.__class__.__name__)
self._regex = regex
self._template = template
self._defaultsValues= defaultsValues
self._tags = re.findall(r"\$\{([\w]+)\}", self._template.template)
self._parsed = False
self._dict = {}
for tag in self._tags:
self._dict[tag] = "*"
#self._string = str(self)
def setTags(self, **kwargs):
if self._defaultsValues:
# some tags implies other tags
for superTag in set.intersection( set(kwargs.keys()) , set(self._defaultsValues.keys())):
userSuperTagValue = kwargs[superTag]
if userSuperTagValue in self._defaultsValues[superTag]:
for defaultKey in self._defaultsValues[superTag][userSuperTagValue]:
if defaultKey not in kwargs:
# set default only if not specified by user
self._logger.debug("SuperTag %s %s: implie setting %s to %s" % (superTag , userSuperTagValue , defaultKey , self._defaultsValues[superTag][userSuperTagValue][defaultKey]))
kwargs[defaultKey] = self._defaultsValues[superTag][userSuperTagValue][defaultKey]
for key in kwargs:
if key in self._dict:
self._dict[key] = kwargs[key]
else:
self._logger.warning("Ignoring unknown key %s" % key)
return self
def getTag(self, tag):
if tag not in self._dict:
raise Exception("Unable to get tag %s. Known tags : %s" % ( tag , self._dict.keys() ))
return self._dict[tag]
def getTags(self):
return self._dict
def isParsed(self):
return self._parsed
def setString(self, string):
regex = self._regex.search(string)
if regex:
for tag in self._tags:
self._dict[tag] = regex.group(self._tags.index(tag)+1)
self._parsed = True
else:
raise Exception("Unable to find nomencl %s in %s" % (str(self._regex.pattern),string))
return self
def __str__(self):
if self._parsed:
return self._template.substitute(self._dict)
else:
return ""
class SafeNomencl(Nomencl):
regex = re.compile("(...)_(..)_(...)(.)_(.)(.)(..)_(........T......)_(........T......)_(......)_(......)_(....).SAFE")
template= Template("${MISSIONID}_${BEAM}_${PRODUCT}${RESOLUTION}_${LEVEL}${CLASS}${POL}_${STARTDATE}_${STOPDATE}_${ORBIT}_${TAKEID}_${PRODID}.SAFE")
defaultsValues= {
"PRODUCT" : {
"OCN" : { "RESOLUTION" : "_", "LEVEL" : "2", "PRODID" : "*" },
"GRD" : { "RESOLUTION" : "H", "LEVEL" : "1", "PRODID" : "*" }
}
}
def __init__(self,safe):
super(SafeNomencl,self).__init__(SafeNomencl.regex , SafeNomencl.template , defaultsValues=SafeNomencl.defaultsValues)
super(SafeNomencl,self).setString(safe)
#self.nomencl = Nomencl(regex=regex , tags=tags , template=template)
# RS2_OK97458_PK855010_DK786985_SCWA_20140717_225349_VV_VH_SGF
# RS2_OK99999_PK999999_DK999999_XXXX_20130314_055132_VV_SGF
# RS2_OK99999_PK999999_DK999999_XXXX_20101014_090242_HH_HV_SGF
# RS2_OK101963_PK882792_DK815838_SCWA_20180913_105943_VV_VH_SGF
class RS2Nomencl(Nomencl):
regex = re.compile("(RS2)_OK([0-9]+)_PK([0-9]+)_DK([0-9]+)_(....)_(........)_(......)_(..(_..)?)_SGF")
template = Template("${MISSIONID}_OK${DATA1}_PK${DATA2}_DK${DATA3}_${DATA4}_${DATE}_${TIME}_${POLARIZATION}_SGF")
def __init__(self, safe):
super(RS2Nomencl,self).__init__(RS2Nomencl.regex, RS2Nomencl.template)
super(RS2Nomencl,self).setString(safe)
class MeasurementNomencl(Nomencl):
regex = re.compile("(...)-(..)-(...)-(..)-(........t......)-(........t......)-(......)-(......)-(...).(.*)")
template= Template("${missionid}-${beam}-${product}-${pol}-${startdate}-${stopdate}-${orbit}-${takeid}-${num}.${ext}")
defaultsValues= {
"product" : {
"ocn" : { "ext" : "nc" },
"grd" : { "ext" : "tiff" }
}
}
def __init__(self,measurement):
super(MeasurementNomencl,self).__init__(MeasurementNomencl.regex , MeasurementNomencl.template , defaultsValues=MeasurementNomencl.defaultsValues)
super(MeasurementNomencl,self).setString(measurement)
class SafePathNomencl(SafeNomencl):
template = Template("sentinel-${MissionId}/L${LEVEL}/${BEAM}/${MISSIONID}_${BEAM}_${PRODUCT}${RESOLUTION}_${LEVEL}${CLASS}/${Year}/${Doy}")
def __init__(self,safe):
super(SafePathNomencl,self).__init__(safe)
#self._tags = super(SafePathNomencl,self).getTags()
self._dict["StartDate"] = None
self._dict["MissionId"] = "*"
self._dict["Year"] = "*"
self._dict["Doy"] = "*"
try:
self._dict["StartDate"] = datetime.datetime.strptime(self._dict["STARTDATE"],'%Y%m%dT%H%M%S')
self._dict["MissionId"] = self._dict["MISSIONID"][1:3].lower()
self._dict["Year"] = self._dict["StartDate"].strftime("%Y")
self._dict["Doy"] = self._dict["StartDate"].strftime("%j")
except:
pass
def __str__(self):
safePathNomenclStr = super(SafePathNomencl,self).__str__()
if safePathNomenclStr:
return "%s/%s" % (SafePathNomencl.template.substitute(self._dict) , super(SafePathNomencl,self).__str__())
else:
return ""
class RS2PathNomencl(RS2Nomencl):
template = Template("${MISSIONID}/L${LEVEL}/${POLARIZATION}/${Year}/${Doy}")
def __init__(self,safe):
super(RS2PathNomencl,self).__init__(safe)
self._dict["StartDate"] = None
self._dict["Year"] = "*"
self._dict["Doy"] = "*"
try:
self._dict["StartDate"] = datetime.datetime.strptime("%s %s" % (self._dict["DATE"], self._dict["TIME"]) ,'%Y%m%d %H%M%S')
self._dict["Year"] = self._dict["StartDate"].strftime("%Y")
self._dict["Doy"] = self._dict["StartDate"].strftime("%j")
self._dict["LEVEL"] = "1"## Maybe TOFIX
except:
pass
def __str__(self):
safePathNomenclStr = super(RS2PathNomencl,self).__str__()
if safePathNomenclStr:
return "%s/%s" % (RS2PathNomencl.template.substitute(self._dict), super(RS2PathNomencl,self).__str__())
else:
return ""
class FullPathNomencl(object):
# changing one of those tags will change both safe and measurement
linked = ["product" , "missionid" , "beam" , "orbit" , "takeid" ] # TODO: pol
# tags always uppercase
# upperCase = ["takeid"]
def __init__(self, fullPath, prefix=None):
self._safePath = SafePathNomencl(fullPath)
self._measurement = MeasurementNomencl(fullPath)
self._prefix=prefix
if not self._prefix:
try:
self._prefix = re.sub("(.*)%s.*" % str(self._safePath) , r"\1" , fullPath)
except:
self._prefix = ""
def setTags(self,**kwargs):
measurementTags={}
safePathTags ={}
for tag in kwargs:
if tag.lower() in FullPathNomencl.linked:
measurementTags[tag.lower()]=kwargs[tag].lower()
safePathTags[tag.upper()]=kwargs[tag].upper()
else:
if tag.lower() == tag:
measurementTags[tag]=kwargs[tag]
if tag.upper() == tag:
safePathTags[tag]=kwargs[tag]
#for tag in measurementTags:
# if tag in FullPathNomencl.upperCase:
# self._logger("to uppercase : %s" % tag)
# measurementTags[tag]=measurementTags[tag].upper()
#
#for tag in safePathTags:
# if tag in FullPathNomencl.upperCase:
# safePathTags[tag]=safePathTags[tag].upper()
if measurementTags:
self._measurement.setTags(**measurementTags)
if safePathTags:
self._safePath.setTags(**safePathTags)
return self
def getTags(self):
tags=self._safePath.getTags().copy()
tags.update(self._measurement.getTags())
return tags
def getTag(self,tag):
tags=self.getTags()
return tags[tag]
def __str__(self):
strSafePath = str(self._safePath)
strMeasurement = str(self._measurement)
if strSafePath and strMeasurement:
return "%s%s/measurement/%s" % (self._prefix , strSafePath , strMeasurement )
if strSafePath:
return "%s%s" % ( self._prefix , strSafePath )
if strMeasurement:
return strMeasurement
def getFiles(self):
return glob.glob(str(self))
default_prefix=[
'/home/cercache/project/mpc-sentinel1/data/esa',
'/home/cercache/project/sarwing/data'
]
def sar2path(name,skip=False,prefix=default_prefix):
if isinstance(prefix, basestring):
prefix=[prefix]
if 'RS2' in name:
post_path=str(RS2PathNomencl(name))
elif 'SAFE' in name:
post_path=str(SafePathNomencl(name))
else:
logger.warning("sar2path : no sar image reconized for %s " % name)
return None
paths=[os.path.join(pref,post_path) for pref in prefix]
if skip:
found=False
for path in paths:
if os.path.exists(path):
found=True
break
if found:
return path
else:
return None
else:
return paths[0]
from setuptools import setup
setup(name='safenomencl',
description='Sar Nomenclature',
url='https://git.cersat.fr/oarcher/sarnomencl.git',
version = "0.0.1",
author = "Theo Cevaer",
author_email = "Theo.Cevaer@ifremer.fr",
license='GPL',
packages=['sarnomencl'],
zip_safe=False,
scripts=['bin/sar2path.py'],
)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment