Source code for s1ard.ancillary

import os
import sys
import logging
from typing import Any
from datetime import datetime
from dateutil.parser import parse as dateparse
from pyroSAR import ID
from s1ard.config import version_dict

log = logging.getLogger('s1ard')


[docs] def grd_diff_lines(scene1: ID, scene2: ID) -> float: """ Determine the number of lines of overlap/gap between two GRD scenes. Parameters ---------- scene1: the first scene scene2: the second scene (must be acquired after `scene1`) Returns ------- the number of lines difference between the two scenes. Positive numbers indicate overlap, negative ones a gap. """ start1 = dateparse(scene1.meta['acquisition_time']['start']) stop1 = dateparse(scene1.meta['acquisition_time']['stop']) start2 = dateparse(scene2.meta['acquisition_time']['start']) if start1 >= start2: raise ValueError('scene1 start time must be before scene2 start time') t_line = ((stop1 - start1) / scene1.lines) return (((start2 - stop1) / t_line) - 1) * -1
[docs] def set_logging( config: dict[str, Any], debug: bool = False ) -> logging.Logger: """ Set logging for the current process. Parameters ---------- config: dict Dictionary of the parsed config parameters for the current process. debug: bool Set logging level to DEBUG? Returns ------- logging.Logger The log handler for the current process. """ level = logging.DEBUG if debug else logging.INFO logger = logging.getLogger('s1ard') logger.setLevel(level) log_format = "[%(asctime)s] [%(levelname)7s] %(message)s" formatter = logging.Formatter(fmt=log_format, datefmt='%Y-%m-%d %H:%M:%S') logfile = config['processing']['logfile'] if logfile is not None: os.makedirs(os.path.dirname(logfile), exist_ok=True) handler = logging.FileHandler(filename=logfile, mode='a') else: handler = logging.StreamHandler(sys.stdout) logger.addHandler(handler) # Add header first with simple formatting formatter_simple = logging.Formatter("%(message)s") handler.setFormatter(formatter_simple) _log_process_config(logger=logger, config=config) # Use normal formatting from here on handler.setFormatter(formatter) # add pyroSAR logger log_pyro = logging.getLogger('pyroSAR') log_pyro.setLevel(level) log_pyro.addHandler(handler) # add cesard logger log_cesard = logging.getLogger('cesard') log_cesard.setLevel(level) log_cesard.addHandler(handler) return logger
def _log_process_config( logger: logging.Logger, config: dict[str, Any] ) -> None: """ Adds a header to the logfile, which includes information about the current processing configuration. Parameters ---------- logger: The logger to which the header is added to. config: Dictionary of the parsed config parameters for the current process. """ processor_name = config['processing']['processor'] sw_versions = version_dict(processor_name=processor_name) max_len_sw = len(max(sw_versions.keys(), key=len)) max_len_main = len(max(config['processing'].keys(), key=len)) max_len_meta = len(max(config['metadata'].keys(), key=len)) max_len_proc = len(max(config[processor_name].keys(), key=len)) max_len = max(max_len_sw, max_len_main, max_len_meta, max_len_proc) + 4 lines = [] lines.append('=' * 100) for section in ['PROCESSING', processor_name.upper(), 'METADATA']: lines.append(f'{section}') for k, v in config[section.lower()].items(): if k == 'dem_prepare_mode': continue if isinstance(v, datetime): val = v.isoformat() else: val = v lines.append(f"{k: <{max_len}}{val}") lines.append('=' * 100) lines.append('SOFTWARE') for k, v in sw_versions.items(): lines.append(f"{k: <{max_len}}{v}") lines.append('=' * 100) header = '\n'.join(lines) logger.info(header)