Source code for s1ard.config

import os
import re
import copy
import importlib.resources
from datetime import datetime, timedelta
import configparser
from dateutil.parser import parse as dateparse
from osgeo import gdal
from s1ard.processors.registry import load_processor
from typing import Any


def get_keys(section: str) -> list[str]:
    """
    List the configuration keys that are permitted in a section.

    Parameters
    ----------
    section:
        the configuration section to get the allowed keys for.
        Either 'processing', 'metadata' or the name of a SAR processor
        plugin e.g. 'snap'.

    Returns
    -------
        a list of keys

    Raises
    ------
    RuntimeError
        if `section` is not built-in and `load_processor` raises a
        ModuleNotFoundError for it, or if the loaded processor has no
        `get_config_keys` function.
    """
    # keys of the two sections that are defined by this module itself
    builtin_sections = {
        'processing': ['acq_mode', 'annotation', 'aoi_geometry', 'aoi_tiles',
                       'ard_dir', 'datatake', 'date_strict', 'db_file',
                       'dem_type', 'etad', 'etad_dir', 'gdal_threads',
                       'logfile', 'maxdate', 'measurement', 'mindate', 'mode',
                       'parquet', 'processor', 'product', 'sar_dir', 'scene',
                       'scene_dir', 'sensor', 'stac_catalog',
                       'stac_collections', 'tmp_dir', 'wbm_dir', 'work_dir'],
        'metadata': ['access_url', 'copy_original', 'doi', 'format',
                     'licence', 'processing_center'],
    }
    if section in builtin_sections:
        return builtin_sections[section]
    
    # any other section name is interpreted as a SAR processor plugin
    try:
        plugin = load_processor(section)
    except ModuleNotFoundError:
        raise RuntimeError(f"unknown section: {section}.")
    try:
        return plugin.get_config_keys()
    except AttributeError:
        raise RuntimeError(f"missing function s1ard.{section}.get_config_keys().")
def read_config_file(config_file: str | None = None) -> configparser.ConfigParser:
    """
    Reads a configuration file and returns a ConfigParser object

    Parameters
    ----------
    config_file:
        the configuration file name. If None (or empty), the default
        configuration file within the package will be used.

    Returns
    -------
        the configuration object. It is created with `allow_no_value=True`
        and the custom converters `_datetime` and `_list`, which configparser
        exposes as the methods `get_datetime` and `get_list`.

    Raises
    ------
    FileNotFoundError
        if `config_file` is given but is not an existing file.
    """
    parser = configparser.ConfigParser(
        allow_no_value=True,
        converters={'_datetime': _parse_datetime,
                    '_list': _parse_list})
    if config_file:
        if not os.path.isfile(config_file):
            raise FileNotFoundError(f"Config file {config_file} does not exist.")
        parser.read(config_file)
    else:
        # Read the packaged default while the resource context is still open:
        # for resources that are not plain files on disk, the path handed out
        # by `as_file` is only guaranteed to exist inside the `with` block and
        # `ConfigParser.read` silently ignores files it cannot find.
        # `files`/`as_file` replace `importlib.resources.path`, which is
        # deprecated since Python 3.11.
        resource = importlib.resources.files('s1ard.resources').joinpath('config.ini')
        with importlib.resources.as_file(resource) as path:
            parser.read(str(path))
    return parser
def get_config(config_file: str | None = None, **kwargs: str) -> dict[str, Any]:
    """
    Returns the content of a `config.ini` file as a dictionary.

    Parameters
    ----------
    config_file:
        Full path to the config file that should be parsed to a dictionary.
        If None, the default configuration file within the package is read.
    kwargs:
        further keyword arguments overriding configuration found in the
        config file. Each argument is routed to the section (processing,
        metadata or SAR processor) whose allowed keys contain its name;
        names matching no section are ignored.

    Returns
    -------
        Dictionary of the parsed config parameters.
        The keys correspond to the config sections in lowercase letters.
    """
    parser = read_config_file(config_file)
    
    # look up the allowed keys once instead of once per keyword argument
    keys_processing = get_keys('processing')
    keys_metadata = get_keys('metadata')
    kwargs_proc = {k: v for k, v in kwargs.items() if k in keys_processing}
    kwargs_meta = {k: v for k, v in kwargs.items() if k in keys_metadata}
    
    out = {'processing': _get_config_processing(parser, **kwargs_proc),
           'metadata': _get_config_metadata(parser, **kwargs_meta)}
    
    # the SAR processor section is named after the configured processor
    processor_name = out['processing']['processor']
    processor = load_processor(processor_name)
    kwargs_sar = {}
    if kwargs:
        # only query the processor's keys if there is something to filter
        keys_sar = get_keys(processor_name)
        kwargs_sar = {k: v for k, v in kwargs.items() if k in keys_sar}
    out[processor_name] = processor.get_config_section(parser, **kwargs_sar)
    return out
def _get_config_processing(parser, **kwargs):
    """
    Parse and validate the 'PROCESSING' section of a configuration.

    Parameters
    ----------
    parser: configparser.ConfigParser
        the parser holding the configuration. Its 'PROCESSING' section is
        modified in place (overrides, defaults, absolute paths).
    kwargs
        keyword arguments overriding parameters of the section.
        Values must be strings; names that are not allowed keys are ignored.

    Returns
    -------
    dict
        the parsed parameters with converted values (lists, datetime,
        bool, int, None).

    Raises
    ------
    KeyError
        if the parser has no 'PROCESSING' section.
    RuntimeError
        if mandatory parameters are missing or if not exactly one scene
        search option (`db_file`, `stac_catalog`, `parquet`) is defined.
    ValueError
        if a key is not allowed or a value does not pass validation.
    AssertionError
        if a directory/file does not exist or a value is not among the
        allowed options.
    """
    allowed_keys = get_keys(section='processing')
    try:
        proc_sec = parser['PROCESSING']
    except KeyError:
        msg = "Section 'PROCESSING' does not exist in the config file"
        raise KeyError(msg)
    
    # override config file parameters with additional keyword arguments
    for k, v in kwargs.items():
        if k in allowed_keys:
            proc_sec[k] = v.strip()
    
    # ETAD is optional. The defaults are set before any path handling so
    # that a configuration without ETAD parameters does not fail on
    # accessing 'etad_dir' below.
    if 'etad' not in proc_sec.keys():
        proc_sec['etad'] = 'False'
        proc_sec['etad_dir'] = 'None'
    
    # make all relevant paths absolute
    for k in ['work_dir', 'scene_dir', 'scene', 'etad_dir']:
        if k not in proc_sec.keys():
            # reported by the completeness check below
            continue
        v = proc_sec[k]
        # options without a value are read as None (allow_no_value)
        unset = v is None or v in ['', 'None']
        proc_sec[k] = 'None' if unset else os.path.abspath(v)
    
    # set some defaults
    processing_defaults = {
        'sar_dir': 'SAR',
        'tmp_dir': 'TMP',
        'ard_dir': 'ARD',
        'wbm_dir': 'WBM',
        'gdal_threads': '4',
        'dem_type': 'Copernicus 30m Global DEM',
        'date_strict': 'True',
        'datatake': 'None',
        'measurement': 'gamma',
        'annotation': 'dm,ei,id,lc,li,np,ratio',
        'logfile': 'None',
        'parquet': 'None'
    }
    processing_options = {
        'acq_mode': ['IW', 'EW', 'SM'],
        'annotation': ['dm', 'ei', 'em', 'id', 'lc',
                       'ld', 'li', 'np', 'ratio', 'wm'],
        'dem_type': ['Copernicus 10m EEA DEM',
                     'Copernicus 30m Global DEM',
                     'Copernicus 30m Global DEM II',
                     'GETASSE30'],
        'measurement': ['gamma', 'sigma'],
        'mode': ['sar', 'nrb', 'orb'],
        'product': ['GRD', 'SLC'],
        'sensor': ['S1A', 'S1B', 'S1C', 'S1D']}
    
    for k, v in processing_defaults.items():
        if k not in proc_sec.keys():
            proc_sec[k] = v
    
    # check completeness of configuration parameters
    exclude = ['aoi_tiles', 'aoi_geometry']
    missing = [key for key in allowed_keys
               if key not in proc_sec.keys() and key not in exclude]
    if len(missing) > 0:
        missing_str = '\n - ' + '\n - '.join(missing)
        raise RuntimeError(f"missing the following parameters:{missing_str}")
    
    # Interpret 'etad' as a boolean (so that e.g. 'false' or 'no' also
    # disable ETAD) instead of comparing against the literal 'False'.
    etad_enabled = proc_sec.getboolean('etad')
    
    out = {}
    for k, v in proc_sec.items():
        # check if key is allowed and convert 'None|none|' strings to None
        v = keyval_check(key=k, val=v, allowed_keys=allowed_keys)
        
        # NOTE(review): 'data_take' is not an allowed key ('datatake' is),
        # so this entry never matches; confirm whether 'datatake' was meant
        # to be parsed as a list.
        if k in ['annotation', 'aoi_tiles', 'data_take',
                 'mode', 'stac_collections']:
            v = proc_sec.get_list(k)
        
        validate_value(k, v)
        
        if k == 'mindate' and v is not None:
            v = proc_sec.get_datetime(k)
        if k == 'maxdate' and v is not None:
            # a date without time (10 characters) is extended to the last
            # microsecond of that day
            date_short = re.search('^[0-9-]{10}$', v) is not None
            v = proc_sec.get_datetime(k)
            if date_short:
                v += timedelta(days=1, microseconds=-1)
        
        # directories that are exempt from the checks below
        dir_ignore = ['work_dir']
        if not etad_enabled:
            dir_ignore.append('etad_dir')
        if k == 'scene_dir' and v is None:
            dir_ignore.append(k)
        
        if k.endswith('_dir') and k not in dir_ignore:
            if v is None or os.path.isabs(v):
                # Absolute directories must exist. Raised explicitly so the
                # check is not stripped when Python runs with -O; the
                # exception type is kept as AssertionError.
                if v is None or not os.path.isdir(v):
                    msg = f"Parameter '{k}': '{v}' must be an existing directory"
                    raise AssertionError(msg)
            else:
                # relative directories are located in the work directory
                v = os.path.join(proc_sec['work_dir'], v)
        
        # none of the currently allowed keys matches this branch
        # ('db_file' is the only key ending with '_file')
        if k.endswith('_file') and not k.startswith('db'):
            msg = f"Parameter '{k}': file {v} could not be found"
            if not os.path.isabs(v):
                v = os.path.join(proc_sec['work_dir'], v)
            if not os.path.isfile(v):
                raise AssertionError(msg)
        
        if k in ['db_file', 'logfile'] and v is not None:
            if not os.path.isabs(v):
                v = os.path.join(proc_sec['work_dir'], v)
        if k == 'gdal_threads':
            v = int(v)
        if k in ['etad', 'date_strict']:
            v = proc_sec.getboolean(k)
        
        validate_options(k, v, options=processing_options)
        out[k] = v
    
    # check that a valid scene search option is set
    db_file_set = out['db_file'] is not None
    stac_catalog_set = out['stac_catalog'] is not None
    stac_collections_set = out['stac_collections'] is not None
    parquet_set = out['parquet'] is not None
    
    options_set = sum([db_file_set, stac_catalog_set, parquet_set])
    if options_set == 0:
        raise RuntimeError("Please define a scene search option.")
    elif options_set > 1:
        raise RuntimeError("Multiple scene search options have been defined. Please choose only one.")
    if stac_catalog_set and not stac_collections_set:
        raise RuntimeError("'stac_collections' must be defined if data is to be searched in a STAC.")
    return out


def _get_config_metadata(parser, **kwargs):
    """
    Parse the 'METADATA' section of a configuration.

    Parameters
    ----------
    parser: configparser.ConfigParser
        the parser holding the configuration. The 'METADATA' section is
        created if it does not exist and is modified in place.
    kwargs
        keyword arguments overriding parameters of the section.
        Values must be strings; names that are not allowed keys are ignored.

    Returns
    -------
    dict
        the parsed parameters. Every allowed key is present; keys that are
        not configured have the value None.

    Raises
    ------
    ValueError
        if the section contains a key that is not allowed.
    """
    allowed_keys = get_keys(section='metadata')
    
    # the METADATA section is optional
    if 'METADATA' not in parser.sections():
        parser.add_section('METADATA')
    meta_sec = parser['METADATA']
    
    # override config file parameters
    for k, v in kwargs.items():
        if k in allowed_keys:
            meta_sec[k] = v.strip()
    
    # set defaults
    if 'format' not in meta_sec.keys():
        meta_sec['format'] = 'OGC, STAC'
    if 'copy_original' not in meta_sec.keys():
        meta_sec['copy_original'] = 'True'
    
    out = {}
    for k, v in meta_sec.items():
        # check if key is allowed and convert 'None|none|' strings to None
        v = keyval_check(key=k, val=v, allowed_keys=allowed_keys)
        if k == 'format':
            v = meta_sec.get_list(k)
        if k == 'copy_original':
            v = meta_sec.getboolean(k)
        out[k] = v
    
    # parameters that are not configured are returned as None
    for key in allowed_keys:
        out.setdefault(key, None)
    return out
def init(target: str, source: str | None = None, overwrite: bool = False,
         **kwargs: str) -> None:
    """
    Initialize a configuration file.

    Parameters
    ----------
    target:
        Path to the target configuration file.
    source:
        Path to the source file to read the configuration from.
        If not provided, a default configuration file within the
        package will be used.
    overwrite:
        Overwrite an existing file?
    kwargs:
        Additional keyword arguments for overwriting the configuration
        in `source`.

    Returns
    -------
        None; the configuration is written to `target`.

    Examples
    --------
    Create a file in the current working directory.
    `work_dir` and a scene search option (in this case SQLite via `db_file`)
    must be defined, other configuration is read from the default
    configuration file.

    >>> from s1ard.config import init
    >>> init(target='config.ini', work_dir='.', db_file='scenes.db')
    """
    # `get_config` passes `config_file` on to `read_config_file`, which
    # falls back to the packaged default configuration if no file is given.
    # Resolving the default here as well is thus not necessary and would
    # depend on `importlib.resources.path` (deprecated since Python 3.11).
    config = get_config(config_file=source, **kwargs)
    write(config=config, target=target, overwrite=overwrite)
def _parse_datetime(s): """Custom converter for configparser: https://docs.python.org/3/library/configparser.html#customizing-parser-behaviour""" return dateparse(s) def _parse_list(s): """Custom converter for configparser: https://docs.python.org/3/library/configparser.html#customizing-parser-behaviour""" if s in ['', 'None']: return None else: return [x.strip() for x in s.split(',')]
[docs] def keyval_check(key: str, val: str, allowed_keys: list[str]) -> str | None: """ Check and clean up key,value pairs while parsing a config file. Parameters ---------- key: the parameter key val: the parameter value allowed_keys: a list of allowed keys """ if key not in allowed_keys: msg = f"Parameter '{key}' is not allowed; should be one of {allowed_keys}" raise ValueError(msg) val = val.replace('"', '').replace("'", "") if val in ['None', 'none', '']: val = None return val
def validate_options(k, v, options):
    """
    Validate a configuration option against a set of allowed options.

    Parameters
    ----------
    k: str
        the configuration key
    v: str or list[str]
        the configuration value. Lists are validated item by item.
    options: dict[str, list[str]]
        the configuration options. Keys that are not contained are
        not validated.

    Returns
    -------
        None

    Raises
    ------
    AssertionError
        if the value (or one item of a list value) is not among the
        options defined for `k`.
    """
    if k not in options:
        return
    if isinstance(v, list):
        for item in v:
            validate_options(k, item, options)
        return
    # Raised explicitly instead of using an `assert` statement so that the
    # validation is not stripped when Python runs with -O. The exception
    # type is kept as AssertionError for backward compatibility.
    if v not in options[k]:
        msg = "Parameter '{}': expected value(s) to be one of {}; got '{}' instead"
        raise AssertionError(msg.format(k, options[k], v))
def validate_value(k, v):
    """
    Validate the value of a configuration option.

    Validators exist for the keys 'aoi_geometry', 'aoi_tiles' and
    'work_dir'; values of all other keys are accepted unchecked.

    Parameters
    ----------
    k: str
        the configuration key
    v: Any
        the configuration value. Lists are validated item by item.

    Returns
    -------
        None

    Raises
    ------
    ValueError
        if the value (or one item of a list value) does not pass the
        validation defined for `k`.
    """
    
    def val_aoi_geometry(x):
        return x is None or os.path.isfile(x)
    
    def val_aoi_tiles(x):
        return x is None or (isinstance(x, str) and len(x) == 5)
    
    def val_work_dir(x):
        # check the validator's own argument rather than the enclosing `v`
        return x is not None and os.path.isdir(x) and os.access(x, os.W_OK)
    
    validators = {
        'aoi_geometry': (val_aoi_geometry,
                         'must be None or an existing file'),
        'aoi_tiles': (val_aoi_tiles,
                      'must be None or a string of length 5'),
        'work_dir': (val_work_dir,
                     'must be an existing, writable directory')}
    
    if k not in validators:
        return
    if isinstance(v, list):
        for item in v:
            validate_value(k, item)
        return
    validator, condition = validators[k]
    if not validator(v):
        msg = "Parameter '{}': value '{}' did not pass validation ({})."
        raise ValueError(msg.format(k, v, condition))
def gdal_conf(config):
    """
    Apply the configured number of GDAL threads and report the settings.

    The GDAL configuration option `GDAL_NUM_THREADS` is only modified if
    more than one thread is configured.

    Parameters
    ----------
    config: dict
        Dictionary of the parsed config parameters for the current process.
        The value of `config['processing']['gdal_threads']` is read.

    Returns
    -------
    dict
        Dictionary containing GDAL configuration options for the current
        process: the configured number of threads (`threads`), the value of
        `GDAL_NUM_THREADS` found before any modification (`threads_before`)
        and whether more than one thread is used (`multithread`).

    Raises
    ------
    TypeError
        if the configured number of threads is not an integer.
    ValueError
        if the configured number of threads is smaller than 1.
    """
    n_threads = config['processing']['gdal_threads']
    previous = gdal.GetConfigOption('GDAL_NUM_THREADS')
    
    # reject invalid values before touching the GDAL configuration
    if not isinstance(n_threads, int):
        raise TypeError("'threads' must be of type int")
    if n_threads < 1:
        raise ValueError("'threads' must be >= 1")
    
    use_multithreading = n_threads != 1
    if use_multithreading:
        gdal.SetConfigOption('GDAL_NUM_THREADS', str(n_threads))
    
    return {'threads': n_threads,
            'threads_before': previous,
            'multithread': use_multithreading}
def write(config, target, overwrite=False, **kwargs):
    """
    Write configuration options to a config file.

    Parameters
    ----------
    config: dict
        the configuration as returned by :func:`get_config`.
        The dictionary is not modified; a deep copy is used internally.
    target: str
        the name of the output file
    overwrite: bool
        overwrite an existing file if it exists?
    kwargs
        further keyword arguments overriding configuration found in `config`.

    Returns
    -------
        None

    Raises
    ------
    RuntimeError
        if `target` exists and `overwrite` is False.
    KeyError
        if a keyword argument is not an allowed key of the processing,
        metadata or SAR processor section.
    """
    if os.path.isfile(target) and not overwrite:
        raise RuntimeError("target already exists")
    
    def to_string(item):
        """
        Convert configuration values to their string representation.

        Parameters
        ----------
        item: dict or list or datetime or Any
            dictionaries are converted value by value, lists are joined
            with ', ', datetime objects are formatted as
            '%Y-%m-%d %H:%M:%S' and everything else is passed to `str`.

        Returns
        -------
        str or dict
        """
        if isinstance(item, dict):
            return {k: to_string(v) for k, v in item.items()}
        elif isinstance(item, list):
            return ', '.join([to_string(x) for x in item])
        elif isinstance(item, datetime):
            return item.strftime('%Y-%m-%d %H:%M:%S')
        else:
            return str(item)
    
    processor_name = config['processing']['processor']
    processor = load_processor(processor_name)
    
    # do not modify the caller's dictionary
    config = copy.deepcopy(config)
    
    # route the keyword arguments to the section they belong to
    keys_processing = get_keys('processing')
    keys_meta = get_keys('metadata')
    keys_proc = processor.get_config_keys()
    for k, v in kwargs.items():
        if k in keys_processing:
            config['processing'][k] = v
        elif k in keys_meta:
            config['metadata'][k] = v
        elif k in keys_proc:
            config[processor_name][k] = v
        else:
            raise KeyError("Parameter '{}' is not supported".format(k))
    
    # Write paths located inside the work directory relative to it.
    # Only a leading '<work_dir><separator>' is removed: replacing every
    # occurrence of `work_dir` in the path would corrupt paths that merely
    # contain it as a substring (e.g. all dots if `work_dir` is '.').
    keys_path_relative = ['sar_dir', 'tmp_dir', 'ard_dir', 'wbm_dir', 'db_file']
    work_dir = config['processing']['work_dir']
    if work_dir:
        # joining with '' appends a trailing separator if there is none
        prefix = os.path.join(work_dir, '')
        for k in keys_path_relative:
            v = config['processing'][k]
            if v is not None and v.startswith(prefix):
                config['processing'][k] = v[len(prefix):].strip('/\\')
    
    config['metadata'] = to_string(config['metadata'])
    config['processing'] = to_string(config['processing'])
    # the string conversion of the SAR processor section is delegated
    # to the processor module
    config_proc_str = processor.config_to_string(config[processor_name])
    config[processor_name] = config_proc_str
    
    parser = configparser.ConfigParser()
    parser['METADATA'] = config['metadata']
    parser['PROCESSING'] = config['processing']
    parser[processor_name.upper()] = config[processor_name]
    
    with open(target, 'w') as configfile:
        parser.write(configfile)