import os
import re
import copy
import importlib.resources
from datetime import datetime, timedelta
import configparser
from dateutil.parser import parse as dateparse
from osgeo import gdal
from s1ard.processors.registry import load_processor
from cesard.config import (keyval_check, validate_options, validate_value,
version_dict as cesard_version_dict)
from typing import Any
def _get_config_metadata(parser, **kwargs):
# METADATA section
allowed_keys = get_keys(section='metadata')
if 'METADATA' not in parser.sections():
parser.add_section('METADATA')
meta_sec = parser['METADATA']
# override config file parameters
for k, v in kwargs.items():
if k in allowed_keys:
meta_sec[k] = v.strip()
# set defaults
if 'format' not in meta_sec.keys():
meta_sec['format'] = 'OGC, STAC'
if 'copy_original' not in meta_sec.keys():
meta_sec['copy_original'] = 'True'
out = {}
for k, v in meta_sec.items():
v = keyval_check(key=k, val=v, allowed_keys=allowed_keys)
if k == 'format':
v = meta_sec.get_list(k)
if k == 'copy_original':
v = meta_sec.getboolean(k)
out[k] = v
for key in allowed_keys:
if key not in out.keys():
out[key] = None
return out
def _get_config_processing(parser, **kwargs):
allowed_keys = get_keys(section='processing')
try:
proc_sec = parser['PROCESSING']
except KeyError:
msg = "Section 'PROCESSING' does not exist in the config file"
raise KeyError(msg)
# override config file parameters with additional keyword arguments
for k, v in kwargs.items():
if k in allowed_keys:
proc_sec[k] = v.strip()
# make all relevant paths absolute
for k in ['work_dir', 'scene_dir', 'scene', 'etad_dir']:
v = proc_sec[k]
proc_sec[k] = 'None' if v in ['', 'None'] else os.path.abspath(v)
# set some defaults
processing_defaults = {
'sar_dir': 'SAR',
'tmp_dir': 'TMP',
'ard_dir': 'ARD',
'wbm_dir': 'WBM',
'gdal_threads': '4',
'dem_type': 'Copernicus 30m Global DEM',
'date_strict': 'True',
'datatake': 'None',
'measurement': 'gamma',
'annotation': 'dm,ei,id,lc,li,np,ratio',
'logfile': 'None',
'parquet': 'None',
'spacing_iw': '10',
'spacing_sm': '10',
'spacing_ew': '30',
}
processing_options = {
'acq_mode': ['IW', 'EW', 'SM'],
'annotation': ['dm', 'ei', 'em', 'id', 'lc',
'ld', 'li', 'np', 'ratio', 'wm'],
'dem_type': ['Copernicus 10m EEA DEM',
'Copernicus 30m Global DEM',
'Copernicus 30m Global DEM II',
'GETASSE30'],
'measurement': ['gamma', 'sigma'],
'mode': ['sar', 'nrb', 'orb'],
'product': ['GRD', 'SLC'],
'sensor': ['S1A', 'S1B', 'S1C', 'S1D']}
if 'etad' not in proc_sec.keys():
proc_sec['etad'] = 'False'
proc_sec['etad_dir'] = 'None'
for k, v in processing_defaults.items():
if k not in proc_sec.keys():
proc_sec[k] = v
# check completeness of configuration parameters
missing = []
exclude = ['aoi_tiles', 'aoi_geometry']
for key in get_keys(section='processing'):
if key not in proc_sec.keys() and key not in exclude:
missing.append(key)
if len(missing) > 0:
missing_str = '\n - ' + '\n - '.join(missing)
raise RuntimeError(f"missing the following parameters:{missing_str}")
out = {}
for k, v in proc_sec.items():
# check if key is allowed and convert 'None|none|' strings to None
v = keyval_check(key=k, val=v, allowed_keys=allowed_keys)
if k in ['annotation', 'aoi_tiles', 'data_take', 'mode', 'stac_collections']:
v = proc_sec.get_list(k)
validate_value(k, v)
if k == 'mindate' and v is not None:
v = proc_sec.get_datetime(k)
if k == 'maxdate' and v is not None:
date_short = re.search('^[0-9-]{10}$', v) is not None
v = proc_sec.get_datetime(k)
if date_short:
v += timedelta(days=1, microseconds=-1)
dir_ignore = ['work_dir']
if proc_sec['etad'] == 'False':
dir_ignore.append('etad_dir')
if k == 'scene_dir' and v is None:
dir_ignore.append(k)
if k.endswith('_dir') and k not in dir_ignore:
if os.path.isabs(v):
msg = f"Parameter '{k}': '{v}' must be an existing directory"
assert v is not None and os.path.isdir(v), msg
else:
v = os.path.join(proc_sec['work_dir'], v)
if k.endswith('_file') and not k.startswith('db'):
msg = f"Parameter '{k}': file {v} could not be found"
if os.path.isabs(v):
assert os.path.isfile(v), msg
else:
v = os.path.join(proc_sec['work_dir'], v)
assert os.path.isfile(v), msg
if k in ['db_file', 'logfile'] and v is not None:
if not os.path.isabs(v):
v = os.path.join(proc_sec['work_dir'], v)
if k in ['gdal_threads', 'spacing_iw', 'spacing_sm', 'spacing_ew']:
v = int(v)
if k in ['etad', 'date_strict']:
v = proc_sec.getboolean(k)
validate_options(k, v, options=processing_options)
out[k] = v
# check that a valid scene search option is set
db_file_set = out['db_file'] is not None
stac_catalog_set = out['stac_catalog'] is not None
stac_collections_set = out['stac_collections'] is not None
parquet_set = out['parquet'] is not None
options_set = sum([db_file_set, stac_catalog_set, parquet_set])
if options_set == 0:
raise RuntimeError("Please define a scene search option.")
elif options_set > 1:
raise RuntimeError("Multiple scene search options have been defined. Please choose only one.")
if stac_catalog_set and not stac_collections_set:
raise RuntimeError("'stac_collections' must be defined if data is to be searched in a STAC.")
return out
def _parse_datetime(s):
"""Custom converter for configparser:
https://docs.python.org/3/library/configparser.html#customizing-parser-behaviour"""
return dateparse(s)
def _parse_list(s):
"""Custom converter for configparser:
https://docs.python.org/3/library/configparser.html#customizing-parser-behaviour"""
if s in ['', 'None']:
return None
else:
return [x.strip() for x in s.split(',')]
[docs]
def gdal_conf(config):
"""
Stores GDAL configuration options for the current process.
Parameters
----------
config: dict
Dictionary of the parsed config parameters for the current process.
Returns
-------
dict
Dictionary containing GDAL configuration options for the current process.
"""
threads = config['processing']['gdal_threads']
threads_before = gdal.GetConfigOption('GDAL_NUM_THREADS')
if not isinstance(threads, int):
raise TypeError("'threads' must be of type int")
if threads == 1:
multithread = False
elif threads > 1:
multithread = True
gdal.SetConfigOption('GDAL_NUM_THREADS', str(threads))
else:
raise ValueError("'threads' must be >= 1")
return {'threads': threads, 'threads_before': threads_before,
'multithread': multithread}
[docs]
def get_config(
config_file: str | None = None,
**kwargs: dict[str, str]
) -> dict[str, Any]:
"""
Returns the content of a `config.ini` file as a dictionary.
Parameters
----------
config_file:
Full path to the config file that should be parsed to a dictionary.
kwargs:
further keyword arguments overriding configuration found in the config file.
Returns
-------
Dictionary of the parsed config parameters.
The keys correspond to the config sections in lowercase letters.
"""
parser = read_config_file(config_file)
kwargs_proc = {k: v for k, v in kwargs.items() if k in get_keys('processing')}
kwargs_meta = {k: v for k, v in kwargs.items() if k in get_keys('metadata')}
out = {'processing': _get_config_processing(parser, **kwargs_proc),
'metadata': _get_config_metadata(parser, **kwargs_meta)}
processor_name = out['processing']['processor']
processor = load_processor(processor_name)
kwargs_sar = {k: v for k, v in kwargs.items() if k in get_keys(processor_name)}
out[processor_name] = processor.get_config_section(parser, **kwargs_sar)
return out
[docs]
def get_keys(section: str) -> list[str]:
"""
get all allowed configuration keys for a section
Parameters
----------
section:
the configuration section to get the allowed keys for.
Either 'processing', 'metadata' or the name of a SAR processor plugin e.g. 'snap'.
Returns
-------
a list of keys
"""
if section == 'processing':
return ['acq_mode', 'annotation', 'aoi_geometry', 'aoi_tiles',
'ard_dir', 'datatake', 'date_strict', 'db_file', 'dem_type',
'etad', 'etad_dir', 'gdal_threads', 'logfile', 'maxdate',
'measurement', 'mindate', 'mode', 'parquet', 'processor',
'product', 'sar_dir', 'scene', 'scene_dir', 'sensor',
'spacing_ew', 'spacing_iw', 'spacing_sm',
'stac_catalog', 'stac_collections', 'tmp_dir', 'wbm_dir', 'work_dir']
elif section == 'metadata':
return ['access_url', 'copy_original', 'doi', 'format', 'licence', 'processing_center']
else:
try:
processor = load_processor(section)
except ModuleNotFoundError:
raise RuntimeError(f"unknown section: {section}.")
try:
return processor.get_config_keys()
except AttributeError:
raise RuntimeError(f"missing function s1ard.{section}.get_config_keys().")
[docs]
def init(
target: str,
source: str | None = None,
overwrite: bool = False,
**kwargs
) -> None:
"""
Initialize a configuration file.
Parameters
----------
target:
Path to the target configuration file.
source:
Path to the source file to read the configuration from. If not provided,
a default configuration file within the package will be used.
overwrite:
Overwrite an existing file?
kwargs:
Additional keyword arguments for overwriting the configuration in `source`.
Examples
--------
Create a file in the current working directory.
`work_dir` and a scene search option (in this case SQLite via `db_file`)
must be defined, other configuration is read from the default configuration file.
>>> from s1ard.config import init
>>> init(target='config.ini', work_dir='.', db_file='scenes.db')
"""
if source is None:
with importlib.resources.path(package='s1ard.resources',
resource='config.ini') as path:
source = str(path)
config = get_config(config_file=source, **kwargs)
write(config=config, target=target, overwrite=overwrite)
[docs]
def read_config_file(config_file: str | None = None) -> configparser.ConfigParser:
"""
Reads a configuration file and returns a ConfigParser object
Parameters
----------
config_file: str or None
the configuration file name. If None, the default configuration file
within the package will be used.
Returns
-------
the configuration object
"""
parser = configparser.ConfigParser(allow_no_value=True,
converters={'_datetime': _parse_datetime,
'_list': _parse_list})
if config_file:
if not os.path.isfile(config_file):
raise FileNotFoundError(f"Config file {config_file} does not exist.")
else:
with importlib.resources.path(package='s1ard.resources',
resource='config.ini') as path:
config_file = str(path)
parser.read(config_file)
return parser
[docs]
def version_dict(processor_name: str) -> dict[str, str]:
"""
Get the versions of used packages
Parameters
----------
processor_name
the name of the used SAR processor (e.g. SNAP)
Returns
-------
a dictionary containing the versions of relevant python packages and
the return of the `version_dict` function of the used SAR processor.
"""
import s1ard
out = cesard_version_dict()
out['s1ard'] = s1ard.__version__
processor = load_processor(processor_name)
out.update(processor.version_dict())
return out
[docs]
def write(config, target, overwrite=False, **kwargs):
"""
Write configuration options to a config file.
Parameters
----------
config: dict
the configuration as returned by :func:`get_config`
target: str
the name of the output file
overwrite: bool
overwrite an existing file if it exists?
kwargs
further keyword arguments overriding configuration found in `config`.
Returns
-------
"""
if os.path.isfile(target) and not overwrite:
raise RuntimeError("target already exists")
def to_string(item):
"""
Parameters
----------
item: dict or List or str
Returns
-------
str or dict
"""
if isinstance(item, dict):
return {k: to_string(v) for k, v in item.items()}
elif isinstance(item, list):
return ', '.join([to_string(x) for x in item])
elif isinstance(item, datetime):
return item.strftime('%Y-%m-%d %H:%M:%S')
else:
return str(item)
processor_name = config['processing']['processor']
processor = load_processor(processor_name)
config = copy.deepcopy(config)
keys_processing = get_keys('processing')
keys_meta = get_keys('metadata')
keys_proc = processor.get_config_keys()
for k, v in kwargs.items():
if k in keys_processing:
config['processing'][k] = v
elif k in keys_meta:
config['metadata'][k] = v
elif k in keys_proc:
config[processor_name][k] = v
else:
raise KeyError("Parameter '{}' is not supported".format(k))
keys_path_relative = ['sar_dir', 'tmp_dir', 'ard_dir', 'wbm_dir', 'db_file']
work_dir = config['processing']['work_dir']
for k in keys_path_relative:
v = config['processing'][k]
if v is not None and work_dir in v:
config['processing'][k] = v.replace(work_dir, '').strip('/\\')
config['metadata'] = to_string(config['metadata'])
config['processing'] = to_string(config['processing'])
config_proc_str = processor.config_to_string(config[processor_name])
config[processor_name] = config_proc_str
parser = configparser.ConfigParser()
parser['METADATA'] = config['metadata']
parser['PROCESSING'] = config['processing']
parser[processor_name.upper()] = config[processor_name]
with open(target, 'w') as configfile:
parser.write(configfile)