Source code for s1ard.ancillary

import os
import io
import sys
import logging
import requests
import hashlib
import tempfile
import zipfile
import dateutil.parser
from pathlib import Path
from multiformats import multihash
import binascii
from lxml import etree
from datetime import datetime, timedelta, timezone
from osgeo import gdal, ogr, osr
import numpy as np
import spatialist
from spatialist.raster import Raster, rasterize
from spatialist.vector import bbox, intersect, boundary, vectorize, Vector, crsConvert
import pyroSAR
from pyroSAR.ancillary import Lock, LockCollection
from pyroSAR import identify_many
import s1ard
from s1ard.processors.registry import load_processor
from collections import defaultdict
from typing import Callable, List, TypeVar

log = logging.getLogger('s1ard')

T = TypeVar('T')  # any type
K = TypeVar('K')  # key


[docs] def check_scene_consistency(scenes): """ Check the consistency of a scene selection. The following pyroSAR object attributes must be the same: - sensor - acquisition_mode - product - frameNumber (data take ID) Parameters ---------- scenes: list[str or pyroSAR.drivers.ID] Returns ------- Raises ------ RuntimeError """ scenes = identify_many(scenes) for attr in ['sensor', 'acquisition_mode', 'product', 'frameNumber']: values = set([getattr(x, attr) for x in scenes]) if not len(values) == 1: msg = f"scene selection differs in attribute '{attr}': {values}" raise RuntimeError(msg)
[docs] def check_spacing(spacing): """ Check whether the spacing fits into the MGRS tile boundaries. Parameters ---------- spacing: int or float the target pixel spacing in meters Returns ------- """ # 109800 m is the edge length of one tile. # the overlap between tiles is either 9780 or 9840 m. overlap_edges = [9780, 9840, 109800] options = [] for i in range(1, (spacing + 1) * 10): if all([x % (i / 10) == 0 for x in overlap_edges]): options.append(i / 10) if spacing not in options: raise RuntimeError(f'target spacing of {spacing} m does not align ' f'with MGRS tile size and overlaps.\nOptions: {options}')
[docs] def generate_unique_id(encoded_str, length=4): """ Returns a unique product identifier as a hexadecimal string. The CRC-16 algorithm used to compute the unique identifier is CRC-CCITT (0xFFFF). The resulting CRC value is truncated to the number of hexadecimal characters specified by the `length` argument. Parameters ---------- encoded_str: bytes A string that should be used to generate a unique id from. The string needs to be encoded; e.g.: `'abc'.encode()`. length: int, optional The desired length of the output string in hexadecimal characters (max: 4). Values higher than 4 will be capped at 4, since CRC-16 only produces 16 bits. Returns ------- p_id: str The unique product identifier (upper-case hexadecimal string). """ crc = binascii.crc_hqx(encoded_str, 0xffff) max_length = 4 # Max characters for 16-bit CRC length = max(1, min(length, max_length)) # Clamp between 1 and 4 mask = (1 << (length * 4)) - 1 # Each hex digit = 4 bits p_id = f'{crc & mask:0{length}X}' return p_id
[docs] def get_max_ext(geometries, buffer=None, crs=None): """ Gets the maximum extent from a list of geometries. Parameters ---------- geometries: list[spatialist.vector.Vector] List of :class:`~spatialist.vector.Vector` geometries. buffer: float or None The buffer in units of the geometries' CRS to add to the extent. crs: str or int or None The target CRS of the extent. If None (default) the extent is expressed in the CRS of the input geometries. Returns ------- max_ext: dict The maximum extent of the selected :class:`~spatialist.vector.Vector` geometries including the chosen buffer. """ max_ext = {} crs_list = [] for geo in geometries: crs_list.append(f"EPSG:{geo.getProjection('epsg')}") if len(max_ext.keys()) == 0: max_ext = geo.extent else: ext = geo.extent for key in ['xmin', 'ymin']: if ext[key] < max_ext[key]: max_ext[key] = ext[key] for key in ['xmax', 'ymax']: if ext[key] > max_ext[key]: max_ext[key] = ext[key] crs_list = list(set(crs_list)) if len(crs_list) > 1: raise RuntimeError(f'The input geometries are in different CRSs: {crs_list}') max_ext = dict(max_ext) if buffer is not None: max_ext['xmin'] -= buffer max_ext['xmax'] += buffer max_ext['ymin'] -= buffer max_ext['ymax'] += buffer if crs is not None: with bbox(coordinates=max_ext, crs=crs_list[0]) as geo: geo.reproject(projection=crs) max_ext = geo.extent return max_ext
[docs] def set_logging(config, debug=False): """ Set logging for the current process. Parameters ---------- config: dict Dictionary of the parsed config parameters for the current process. debug: bool Set logging level to DEBUG? Returns ------- logging.Logger The log handler for the current process. """ level = logging.DEBUG if debug else logging.INFO logger = logging.getLogger('s1ard') logger.setLevel(level) log_format = "[%(asctime)s] [%(levelname)7s] %(message)s" formatter = logging.Formatter(fmt=log_format, datefmt='%Y-%m-%d %H:%M:%S') logfile = config['processing']['logfile'] if logfile is not None: os.makedirs(os.path.dirname(logfile), exist_ok=True) handler = logging.FileHandler(filename=logfile, mode='a') else: handler = logging.StreamHandler(sys.stdout) logger.addHandler(handler) # Add header first with simple formatting formatter_simple = logging.Formatter("%(message)s") handler.setFormatter(formatter_simple) _log_process_config(logger=logger, config=config) # Use normal formatting from here on handler.setFormatter(formatter) # add pyroSAR logger log_pyro = logging.getLogger('pyroSAR') log_pyro.setLevel(level) log_pyro.addHandler(handler) return logger
[docs] def group_by_attr(items: List[T], key_fn: Callable[[T], K]) -> List[List[T]]: """ Group items based on a key function. :param items: The list of arbitrary items to group. :param key_fn: A function that extracts a key from each item. :returns: A list of groups, where each group is a list of items with the same key. Example ------- >>> list_in = ['abc', 'axy', 'brt', 'btk'] >>> print(group_by_attr(list_in, lambda x: x[0])) [['abc', 'axy'], ['brt', 'btk']] >>> list_in = [{'a': 1}, {'a': 2}, {'a': 1}, {'a': 2}] >>> print(group_by_attr(list_in, lambda x: x['a'])) [[{'a': 1}, {'a': 1}], [{'a': 2}, {'a': 2}]] """ grouped = defaultdict(list) for item in items: key = key_fn(item) grouped[key].append(item) return list(grouped.values())
[docs] def group_by_time(scenes, time=3): """ Group scenes by their acquisition time difference. Parameters ---------- scenes:list[pyroSAR.drivers.ID or str] a list of image names time: int or float a time difference in seconds by which to group the scenes. The default of 3 seconds incorporates the overlap between SLCs. Returns ------- list[list[pyroSAR.drivers.ID]] a list of sub-lists containing the file names of the grouped scenes """ # sort images by time stamp scenes = identify_many(scenes, sortkey='start') if len(scenes) < 2: return [scenes] groups = [[scenes[0]]] group = groups[0] for i in range(1, len(scenes)): start = datetime.strptime(scenes[i].start, '%Y%m%dT%H%M%S') stop_pred = datetime.strptime(scenes[i - 1].stop, '%Y%m%dT%H%M%S') diff = abs((stop_pred - start).total_seconds()) if diff <= time: group.append(scenes[i]) else: groups.append([scenes[i]]) group = groups[-1] return groups
def _log_process_config(logger, config): """ Adds a header to the logfile, which includes information about the current processing configuration. Parameters ---------- logger: logging.Logger The logger to which the header is added to. config: dict Dictionary of the parsed config parameters for the current process. """ sw_versions = { 's1ard': s1ard.__version__, 'python': sys.version, 'python-pyroSAR': pyroSAR.__version__, 'python-spatialist': spatialist.__version__, 'python-GDAL': gdal.__version__} processor_name = config['processing']['processor'] processor = load_processor(processor_name) sw_versions.update(processor.version_dict()) max_len_sw = len(max(sw_versions.keys(), key=len)) max_len_main = len(max(config['processing'].keys(), key=len)) max_len_meta = len(max(config['metadata'].keys(), key=len)) max_len_proc = len(max(config[processor_name].keys(), key=len)) max_len = max(max_len_sw, max_len_main, max_len_meta, max_len_proc) + 4 lines = [] lines.append('=' * 100) for section in ['PROCESSING', processor_name.upper(), 'METADATA']: lines.append(f'{section}') for k, v in config[section.lower()].items(): if k == 'dem_prepare_mode': continue if isinstance(v, datetime): val = v.isoformat() else: val = v lines.append(f"{k: <{max_len}}{val}") lines.append('=' * 100) lines.append('SOFTWARE') for k, v in sw_versions.items(): lines.append(f"{k: <{max_len}}{v}") lines.append('=' * 100) header = '\n'.join(lines) logger.info(header)
[docs] def vrt_add_overviews(vrt, overviews, resampling='AVERAGE'): """ Add overviews to an existing VRT file. Existing overviews will be overwritten. Parameters ---------- vrt: str the VRT file overviews: list[int] the overview levels resampling: str the overview resampling method Returns ------- """ tree = etree.parse(vrt) root = tree.getroot() ovr = root.find('OverviewList') if ovr is None: ovr = etree.SubElement(root, 'OverviewList') ovr.text = ' '.join([str(x) for x in overviews]) ovr.attrib['resampling'] = resampling.lower() etree.indent(root) tree.write(vrt, pretty_print=True, xml_declaration=False, encoding='utf-8')
[docs] def buffer_min_overlap(geom1, geom2, percent=1, step=None): """ Buffer a rectangular geometry to a minimum overlap with a second geometry. The geometry is iteratively buffered until the minimum overlap is reached. If the overlap of the input geometries is already larger than the defined threshold, a copy of the original geometry is returned. Parameters ---------- geom1: spatialist.vector.Vector the geometry to be buffered geom2: spatialist.vector.Vector the reference geometry to intersect with percent: int or float the minimum overlap in percent of `geom1` step: int or float or None the buffering step size. If None, the step size is 0.1 % of the average rectangle corner length. Returns ------- """ geom1_crs = geom1.getProjection('epsg') geom2_crs = geom2.getProjection('epsg') if geom1_crs != geom2_crs: raise ValueError('both geometries must have the same CRS') geom2_area = geom2.getArea() ext = geom1.extent ext2 = ext.copy() if step is None: xdist = ext['xmax'] - ext['xmin'] ydist = ext['ymax'] - ext['ymin'] step = (xdist + ydist) / 2 / 1000 buffer = 0 overlap = 0 while overlap <= percent: xbuf = buffer * step ybuf = buffer * step ext2['xmin'] = ext['xmin'] - xbuf ext2['xmax'] = ext['xmax'] + xbuf ext2['ymin'] = ext['ymin'] - ybuf ext2['ymax'] = ext['ymax'] + ybuf with bbox(ext2, geom1_crs) as geom3: ext3 = geom3.extent with intersect(geom2, geom3) as inter: inter_area = inter.getArea() overlap = inter_area / geom2_area * 100 buffer += 1 return bbox(ext3, geom1_crs)
[docs] def date_to_utc(date, as_datetime=False, str_format='%Y%m%dT%H%M%S'): """ convert a date object to a UTC date string or datetime object. Parameters ---------- date: str or datetime or None the date object to convert; timezone-unaware dates are interpreted as UTC. as_datetime: bool return a datetime object instead of a string? str_format: str the output string format (ignored if `as_datetime` is True) Returns ------- str or datetime or None the date string or datetime object in UTC time zone """ if date is None: return date elif isinstance(date, str): out = dateutil.parser.parse(date) elif isinstance(date, datetime): out = date else: raise TypeError('date must be a string, datetime object or None') if out.tzinfo is None: out = out.replace(tzinfo=timezone.utc) else: out = out.astimezone(timezone.utc) if not as_datetime: out = out.strftime(str_format) return out
[docs] def buffer_time(start, stop, as_datetime=False, str_format='%Y%m%dT%H%M%S', **kwargs): """ Time range buffering Parameters ---------- start: str the start time date object to convert; timezone-unaware dates are interpreted as UTC. stop: str the stop time date object to convert; timezone-unaware dates are interpreted as UTC. as_datetime: bool return datetime objects instead of strings? str_format: str the output string format (ignored if `as_datetime` is True) kwargs time arguments passed to :func:`datetime.timedelta` Returns ------- tuple[str | datetime] the buffered start and stop time as string or datetime object """ td = timedelta(**kwargs) start = date_to_utc(start, as_datetime=True) - td stop = date_to_utc(stop, as_datetime=True) + td if not as_datetime: start = start.strftime(str_format) stop = stop.strftime(str_format) return start, stop
[docs] def get_kml(): """ Download the Sentinel-2 MGRS grid KML file. The target folder is ~/s1ard. Returns ------- str the path to the KML file """ remote = ('https://sentiwiki.copernicus.eu/__attachments/1692737/' 'S2A_OPER_GIP_TILPAR_MPC__20151209T095117_V20150622T000000_21000101T000000_B00.zip') local_path = os.path.join(os.path.expanduser('~'), '.s1ard') os.makedirs(local_path, exist_ok=True) local = os.path.join(local_path, os.path.basename(remote).replace('.zip', '.kml')) with Lock(local): if not os.path.isfile(local): log.info(f'downloading MGRS grid KML file to {local_path}') r = requests.get(remote) with zipfile.ZipFile(io.BytesIO(r.content)) as zf: zf.extractall(local_path) return local
[docs] def compute_hash(file_path, algorithm='sha256', chunk_size=8192, multihash_encode=True): """ Compute the (multi)hash of a file using the specified algorithm. Parameters ---------- file_path: str Path to the file. algorithm: str Hash algorithm to use (default is 'sha256'). chunk_size: int Size of chunks to read from the file in bytes (default is 8192). multihash_encode: bool Encode the hash according to the `multihash specification <https://github.com/multiformats/multihash>`_ (default is True)? The hash generated by `hashlib` will be wrapped using :func:`multiformats.multihash.wrap`. Returns ------- str the hexadecimal hash string of the file. See Also -------- :mod:`hashlib` :mod:`multiformats.multihash` """ # lookup between hashlib and multihash algorithm names; to be extended if necessary algo_lookup = {'sha1': 'sha1', 'sha256': 'sha2-256', 'sha512': 'sha2-512'} if algorithm not in algo_lookup.keys(): raise ValueError(f'Hash algorithm must be one of {algo_lookup.keys()}') hash_func = getattr(hashlib, algorithm)() with open(file_path, 'rb') as f: while chunk := f.read(chunk_size): hash_func.update(chunk) if multihash_encode: digest = hash_func.digest() mh = multihash.wrap(digest, algo_lookup[algorithm]) return mh.hex() else: return hash_func.hexdigest()
[docs] def datamask(measurement, dm_ras, dm_vec): """ Create data masks for a given image file. The created raster data mask does not contain a simple mask of nodata values. Rather, a boundary vector geometry containing all valid pixels is created and then rasterized. This boundary geometry (single polygon) is saved as `dm_vec`. In this case `dm_vec` is returned. If the input image only contains nodata values, no raster data mask is created, and an empty dummy vector mask is created. In this case the function will return `None`. Parameters ---------- measurement: str the binary image file dm_ras: str the name of the raster data mask dm_vec: str the name of the vector data mask Returns ------- str or None `dm_vec` if the vector data mask contains a geometry or None otherwise """ def mask_from_array(arr, dm_vec, dm_ras, ref): """ Parameters ---------- arr: np.ndarray dm_vec: str dm_ras: str ref: spatialist.raster.Raster Returns ------- str or None """ # create a dummy vector mask if the mask only contains 0 values if len(arr[arr == 1]) == 0: Path(dm_vec).touch(exist_ok=False) return None # vectorize the raster data mask with vectorize(target=arr, reference=ref) as vec: # compute a valid data boundary geometry (vector data mask) with boundary(vec, expression="value=1") as bounds: # rasterize the vector data mask if not os.path.isfile(dm_ras): rasterize(vectorobject=bounds, reference=ref, outname=dm_ras) # write the vector data mask bounds.write(outfile=dm_vec) return dm_vec if os.path.isfile(dm_vec) and os.path.isfile(dm_ras): return None if os.path.getsize(dm_vec) == 0 else dm_vec with LockCollection([dm_vec, dm_ras]): if not os.path.isfile(dm_vec): if not os.path.isfile(dm_ras): with Raster(measurement) as ras: arr = ras.array() # create a nodata mask mask = ~np.isnan(arr) del arr out = mask_from_array(arr=mask, dm_vec=dm_vec, dm_ras=dm_ras, ref=ras) else: # read the raster data mask with Raster(dm_ras) as ras: mask = ras.array() out = mask_from_array(arr=mask, dm_vec=dm_vec, dm_ras=dm_ras, ref=ras) del mask else: if os.path.getsize(dm_vec) == 0: out = None else: out = dm_vec return out
[docs] def get_tmp_name(suffix): """ Get the name of a temporary file with defined suffix. Files are placed in a subdirectory 's1ard' of the regular temporary directory so the latter is not flooded with too many files in case they are not properly deleted. Parameters ---------- suffix: str the file suffix/extension, e.g. '.tif' Returns ------- """ tmpdir = os.path.join(tempfile.gettempdir(), 's1ard') os.makedirs(tmpdir, exist_ok=True) return tempfile.NamedTemporaryFile(suffix=suffix, dir=tmpdir).name
[docs] def combine_polygons(vector, crs=4326, multipolygon=False, layer_name='combined'): """ Combine polygon vector objects into one. The output is a single vector object with the polygons either stored in separate features or combined into a single multipolygon geometry. Parameters ---------- vector: spatialist.vector.Vector or list[spatialist.vector.Vector] the input vector object(s). Providing only one object only makes sense when `multipolygon=True`. crs: int or str the target CRS. Default: EPSG:4326 multipolygon: bool combine all polygons into one multipolygon? Default False: write each polygon into a separate feature. layer_name: str the layer name of the output vector object. Returns ------- spatialist.vector.Vector """ if not isinstance(vector, list): vector = [vector] ############################################################################## # check geometry types geometry_names = [] field_defs = [] for item in vector: field_defs.extend(item.fieldDefs) for feature in item.layer: geom = feature.GetGeometryRef() geometry_names.append(geom.GetGeometryName()) item.layer.ResetReading() geom = None geometry_names = list(set(geometry_names)) if not all(x == 'POLYGON' for x in geometry_names): raise RuntimeError('All geometries must be of type POLYGON') ############################################################################## vec = Vector(driver='Memory') srs_out = crsConvert(crs, 'osr') if multipolygon: geom_type = ogr.wkbMultiPolygon geom_out = [ogr.Geometry(geom_type)] else: geom_type = ogr.wkbPolygon geom_out = [] fields = [] vec.addlayer(name=layer_name, srs=srs_out, geomType=geom_type) for item in vector: fieldnames = item.fieldnames if item.srs.IsSame(srs_out): coord_trans = None else: coord_trans = osr.CoordinateTransformation(item.srs, srs_out) for feature in item.layer: geom = feature.GetGeometryRef() if coord_trans is not None: geom.Transform(coord_trans) if multipolygon: geom_out[0].AddGeometry(geom.Clone()) else: fields.append({x: feature.GetField(x) for x in fieldnames}) geom_out.append(geom.Clone()) item.layer.ResetReading() geom = None if multipolygon: geom_out = geom_out[0].UnionCascaded() vec.addfeature(geom_out) else: for field_def in field_defs: if field_def.GetName() not in vec.fieldnames: vec.layer.CreateField(field_def) for i, geom in enumerate(geom_out): vec.addfeature(geometry=geom, fields=fields[i]) geom_out = None return vec