Source code for s1ard.config

import os
import re
import copy
import importlib.resources
from datetime import datetime, timedelta
import configparser
import dateutil.parser
from osgeo import gdal


[docs] def get_keys(section): """ get all allowed configuration keys Parameters ---------- section: {'processing', 'metadata'} the configuration section to get the allowed keys for. Returns ------- list[str] a list of keys """ if section == 'processing': return ['mode', 'aoi_tiles', 'aoi_geometry', 'mindate', 'maxdate', 'acq_mode', 'datatake', 'work_dir', 'scene_dir', 'sar_dir', 'tmp_dir', 'wbm_dir', 'measurement', 'db_file', 'dem_type', 'gdal_threads', 'logfile', 'ard_dir', 'etad', 'etad_dir', 'product', 'annotation', 'stac_catalog', 'stac_collections', 'sensor', 'date_strict', 'snap_gpt_args', 'scene', 'parquet'] elif section == 'metadata': return ['format', 'copy_original', 'access_url', 'licence', 'doi', 'processing_center'] else: raise RuntimeError(f"unknown section: {section}. Options: 'processing', 'metadata'.")
[docs] def get_config(config_file=None, **kwargs): """ Returns the content of a `config.ini` file as a dictionary. Parameters ---------- config_file: str or None Full path to the config file that should be parsed to a dictionary. kwargs further keyword arguments overriding configuration found in the config file. Returns ------- dict Dictionary of the parsed config parameters. The keys correspond to the config sections in lowercase letters. """ parser = configparser.ConfigParser(allow_no_value=True, converters={'_annotation': _parse_annotation, '_datetime': _parse_datetime, '_modes': _parse_modes, '_stac_collections': _parse_list, '_tile_list': _parse_tile_list, '_list': _parse_list}) if isinstance(config_file, str): if not os.path.isfile(config_file): raise FileNotFoundError("Config file {} does not exist.".format(config_file)) parser.read(config_file) elif config_file is None: with importlib.resources.path('s1ard.resources', 'config.ini') as path: config_file = str(path) parser.read(config_file) else: raise TypeError(f"'config_file' must be of type str or None, was {type(config_file)}") out_dict = {'processing': {}, 'metadata': {}} # PROCESSING section allowed_keys = get_keys(section='processing') try: proc_sec = parser['PROCESSING'] except KeyError: msg = "Section '{}' does not exist in config file {}" raise KeyError(msg.format('PROCESSING', config_file)) # override config file parameters with additional keyword arguments for k, v in kwargs.items(): if k in allowed_keys: proc_sec[k] = v.strip() # make all relevant paths absolute for k in ['work_dir', 'scene_dir', 'scene', 'logfile', 'etad_dir']: v = proc_sec[k] proc_sec[k] = 'None' if v in ['', 'None'] else os.path.abspath(v) # set some defaults if 'etad' not in proc_sec.keys(): proc_sec['etad'] = 'False' proc_sec['etad_dir'] = 'None' for item in ['sar_dir', 'tmp_dir', 'ard_dir', 'wbm_dir']: if item not in proc_sec.keys(): proc_sec[item] = item[:3].upper() if 'gdal_threads' not in proc_sec.keys(): proc_sec['gdal_threads'] = '4' if 'dem_type' not in proc_sec.keys(): proc_sec['dem_type'] = 'Copernicus 30m Global DEM' if 'date_strict' not in proc_sec.keys(): proc_sec['date_strict'] = 'True' if 'snap_gpt_args' not in proc_sec.keys(): proc_sec['snap_gpt_args'] = 'None' if 'datatake' not in proc_sec.keys(): proc_sec['datatake'] = 'None' # use previous defaults for measurement and annotation if they have not been defined if 'measurement' not in proc_sec.keys(): proc_sec['measurement'] = 'gamma' if 'annotation' not in proc_sec.keys(): proc_sec['annotation'] = 'dm,ei,id,lc,li,np,ratio' if 'logfile' not in proc_sec.keys(): proc_sec['logfile'] = 'None' if 'parquet' not in proc_sec.keys(): proc_sec['parquet'] = 'None' # check completeness of configuration parameters missing = [] exclude = ['aoi_tiles', 'aoi_geometry'] for key in get_keys(section='processing'): if key not in proc_sec.keys() and key not in exclude: missing.append(key) if len(missing) > 0: missing_str = '\n - ' + '\n - '.join(missing) raise RuntimeError(f"missing the following parameters:{missing_str}") # convert values to Python objects and validate them for k, v in proc_sec.items(): # check if key is allowed and convert 'None|none|' strings to None v = _keyval_check(key=k, val=v, allowed_keys=allowed_keys) if k == 'mode': v = proc_sec.get_modes(k) if k == 'aoi_tiles': if v is not None: v = proc_sec.get_tile_list(k) if k == 'aoi_geometry': if v is not None: msg = f"Parameter '{k}': File {v} could not be found" assert os.path.isfile(v), msg if k == 'mindate': v = proc_sec.get_datetime(k) if k == 'maxdate': date_short = re.search('^[0-9-]{10}$', v) is not None v = proc_sec.get_datetime(k) if date_short: v += timedelta(days=1, microseconds=-1) if k == 'sensor': assert v in ['S1A', 'S1B', 'S1C', 'S1D'] if k == 'acq_mode': assert v in ['IW', 'EW', 'SM'] if k == 'work_dir': msg = f"Parameter '{k}': '{v}' must be an existing writable directory" assert v is not None and os.path.isdir(v) and os.access(v, os.W_OK), msg dir_ignore = ['work_dir'] if proc_sec['etad'] == 'False': dir_ignore.append('etad_dir') if k == 'scene_dir' and v is None: dir_ignore.append(k) if k.endswith('_dir') and k not in dir_ignore: if any(x in v for x in ['/', '\\']): msg = f"Parameter '{k}': '{v}' must be an existing directory" assert v is not None and os.path.isdir(v), msg else: v = os.path.join(proc_sec['work_dir'], v) if k.endswith('_file') and not k.startswith('db'): if any(x in v for x in ['/', '\\']): msg = f"Parameter '{k}': file {v} could not be found" assert os.path.isfile(v), msg else: v = os.path.join(proc_sec['work_dir'], v) msg = f"Parameter '{k}': file {v} could not be found" assert os.path.isfile(v), msg if k in ['db_file', 'logfile'] and v is not None: if not any(x in v for x in ['/', '\\']): v = os.path.join(proc_sec['work_dir'], v) if k == 'stac_collections': v = proc_sec.get_stac_collections(k) if k == 'gdal_threads': v = int(v) if k == 'dem_type': allowed = ['Copernicus 10m EEA DEM', 'Copernicus 30m Global DEM II', 'Copernicus 30m Global DEM', 'GETASSE30'] msg = "Parameter '{}': expected to be one of {}; got '{}' instead" assert v in allowed, msg.format(k, allowed, v) if k in ['etad', 'date_strict']: v = proc_sec.getboolean(k) if k == 'product': allowed = ['GRD', 'SLC'] msg = "Parameter '{}': expected to be one of {}; got '{}' instead" assert v in allowed, msg.format(k, allowed, v) if k == 'measurement': allowed = ['gamma', 'sigma'] msg = "Parameter '{}': expected to be one of {}; got '{}' instead" assert v in allowed, msg.format(k, allowed, v) if k == 'annotation': v = proc_sec.get_annotation(k) if k == 'snap_gpt_args': v = proc_sec.get_list(k) if k == 'datatake': v = proc_sec.get_list(k) out_dict['processing'][k] = v # check that a valid scene search option is set db_file_set = out_dict['processing']['db_file'] is not None stac_catalog_set = out_dict['processing']['stac_catalog'] is not None stac_collections_set = out_dict['processing']['stac_collections'] is not None parquet_set = out_dict['processing']['parquet'] is not None options_set = sum([db_file_set, stac_catalog_set, parquet_set]) if options_set == 0: raise RuntimeError("Please define a scene search option.") elif options_set > 1: raise RuntimeError("Multiple scene search options have been defined. Please choose only one.") if stac_catalog_set and not stac_collections_set: raise RuntimeError("'stac_collections' must be defined if data is to be searched in a STAC.") # METADATA section allowed_keys = get_keys(section='metadata') if 'METADATA' not in parser.keys(): parser.add_section('METADATA') meta_sec = parser['METADATA'] # override config file parameters for k, v in kwargs.items(): if k in allowed_keys: meta_sec[k] = v.strip() # set defaults if 'format' not in meta_sec.keys(): meta_sec['format'] = 'OGC, STAC' if 'copy_original' not in meta_sec.keys(): meta_sec['copy_original'] = 'True' for k, v in meta_sec.items(): v = _keyval_check(key=k, val=v, allowed_keys=allowed_keys) if k == 'format': v = meta_sec.get_list(k) if k == 'copy_original': v = meta_sec.getboolean(k) out_dict['metadata'][k] = v for key in allowed_keys: if key not in out_dict['metadata'].keys(): out_dict['metadata'][key] = None return out_dict
[docs] def init(target, source=None, overwrite=False, **kwargs): """ Initialize a configuration file. Parameters ---------- target : str Path to the target configuration file. source : str, optional Path to the source file to read the configuration from. If not provided, a default configuration file within the package will be used. overwrite : bool, default=False Overwrite an existing file? kwargs : Any Additional keyword arguments for overwriting the configuration in `source`. Returns ------- Examples -------- Create a file in the current working directory. `work_dir` and a scene search option (in this case SQLite via `db_file`) must be defined, other configuration is read from the default configuration file. >>> from s1ard.config import init >>> init(target='config.ini', work_dir='.', db_file='scenes.db') """ if source is None: with importlib.resources.path(package='s1ard.resources', resource='config.ini') as path: source = str(path) config = get_config(config_file=source, **kwargs) write(config=config, target=target, overwrite=overwrite)
def _parse_annotation(s): """Custom converter for configparser: https://docs.python.org/3/library/configparser.html#customizing-parser-behaviour""" annotation_list = _parse_list(s) if annotation_list is not None: allowed = ['dm', 'ei', 'em', 'id', 'lc', 'ld', 'li', 'np', 'ratio', 'wm'] for layer in annotation_list: if layer not in allowed: msg = "Parameter 'annotation': Error while parsing to list; " \ "layer '{}' is not supported. Allowed keys:\n{}" raise ValueError(msg.format(layer, allowed)) return annotation_list def _parse_datetime(s): """Custom converter for configparser: https://docs.python.org/3/library/configparser.html#customizing-parser-behaviour""" return dateutil.parser.parse(s) def _parse_modes(s): """Custom converter for configparser: https://docs.python.org/3/library/configparser.html#customizing-parser-behaviour""" mode_list = _parse_list(s) allowed = ['sar', 'nrb', 'orb'] for mode in mode_list: if mode not in allowed: msg = "Parameter 'annotation': Error while parsing to list; " \ "mode '{}' is not supported. Allowed keys:\n{}" raise ValueError(msg.format(mode, allowed)) return mode_list def _parse_tile_list(s): """Custom converter for configparser: https://docs.python.org/3/library/configparser.html#customizing-parser-behaviour""" tile_list = _parse_list(s) if tile_list is not None: for tile in tile_list: if len(tile) != 5: raise ValueError("Parameter 'aoi_tiles': Error while parsing " "MGRS tile IDs to list; tile '{}' is not 5 " "digits long.".format(tile)) else: continue return tile_list def _parse_list(s): """Custom converter for configparser: https://docs.python.org/3/library/configparser.html#customizing-parser-behaviour""" if s in ['', 'None']: return None else: return [x.strip() for x in s.split(',')] def _keyval_check(key, val, allowed_keys): """Helper function to check and clean up key,value pairs while parsing a config file.""" if key not in allowed_keys: raise ValueError("Parameter '{}' is not allowed; should be one of {}".format(key, allowed_keys)) val = val.replace('"', '').replace("'", "") if val in ['None', 'none', '']: val = None return val
[docs] def snap_conf(config): """ Returns a dictionary of additional parameters for :func:`s1ard.snap.process` based on processing configurations provided by the config file. Parameters ---------- config: dict Dictionary of the parsed config parameters for the current process. Returns ------- dict Dictionary of parameters that can be passed to :func:`s1ard.snap.process` """ return {'spacing': {'IW': 10, 'SM': 10, 'EW': 40}[config['processing']['acq_mode']], 'allow_res_osv': True, 'dem_resampling_method': 'BILINEAR_INTERPOLATION', 'img_resampling_method': 'BILINEAR_INTERPOLATION', 'clean_edges': True, 'clean_edges_pixels': 4, 'cleanup': True }
[docs] def gdal_conf(config): """ Stores GDAL configuration options for the current process. Parameters ---------- config: dict Dictionary of the parsed config parameters for the current process. Returns ------- dict Dictionary containing GDAL configuration options for the current process. """ threads = config['processing']['gdal_threads'] threads_before = gdal.GetConfigOption('GDAL_NUM_THREADS') if not isinstance(threads, int): raise TypeError("'threads' must be of type int") if threads == 1: multithread = False elif threads > 1: multithread = True gdal.SetConfigOption('GDAL_NUM_THREADS', str(threads)) else: raise ValueError("'threads' must be >= 1") return {'threads': threads, 'threads_before': threads_before, 'multithread': multithread}
[docs] def write(config, target, overwrite=False, **kwargs): """ Write configuration options to a config file. Parameters ---------- config: dict the configuration as returned by :func:`get_config` target: str the name of the output file overwrite: bool overwrite existing file if it exists? kwargs further keyword arguments overriding configuration found in the config file. Returns ------- """ if os.path.isfile(target) and not overwrite: raise RuntimeError("target already exists") def to_string(item): """ Parameters ---------- item: dict or List or str Returns ------- str or dict """ if isinstance(item, dict): return {k: to_string(v) for k, v in item.items()} elif isinstance(item, list): return ', '.join([to_string(x) for x in item]) elif isinstance(item, datetime): return item.strftime('%Y-%m-%d %H:%M:%S') else: return str(item) config = copy.deepcopy(config) keys_processing = get_keys('processing') keys_meta = get_keys('metadata') for k, v in kwargs.items(): if k in keys_processing: config['processing'][k] = v elif k in keys_meta: config['metadata'][k] = v else: raise KeyError("Parameter '{}' is not supported".format(k)) keys_path_relative = ['sar_dir', 'tmp_dir', 'ard_dir', 'wbm_dir', 'db_file'] work_dir = config['processing']['work_dir'] for k in keys_path_relative: v = config['processing'][k] if v is not None and work_dir in v: config['processing'][k] = v.replace(work_dir, '').strip('/\\') k = 'snap_gpt_args' v = config['processing'][k] if v is not None: v = ' '.join([str(x) for x in config['processing'][k]]) config['processing'][k] = v config = to_string(config) parser = configparser.ConfigParser() parser['METADATA'] = config['metadata'] parser['PROCESSING'] = config['processing'] with open(target, 'w') as configfile: parser.write(configfile)