import os
import re
import time
import shutil
from datetime import datetime, timezone
import numpy as np
import pandas as pd
import geopandas as gpd
from lxml import etree
from time import gmtime, strftime
from copy import deepcopy
from scipy.interpolate import RBFInterpolator
from osgeo import gdal
from spatialist.vector import Vector, bbox, intersect
from spatialist.raster import Raster, Dtype
from spatialist.auxil import gdalwarp, gdalbuildvrt
from spatialist.ancillary import finder
from pyroSAR import identify, identify_many
from pyroSAR.ancillary import Lock
import s1ard
from s1ard import dem, ocn
from s1ard.metadata import extract, xml, stac
from s1ard.metadata.mapping import LERC_ERR_THRES
from s1ard.ancillary import generate_unique_id, vrt_add_overviews, datamask, get_tmp_name, combine_polygons
from s1ard.metadata.extract import copy_src_meta
from s1ard.processors.registry import load_processor
import logging
log = logging.getLogger('s1ard')
[docs]
def get_datasets(scenes, datadir, extent, epsg, processor_name):
"""
Collect processing output for a list of scenes. Reads metadata from all
source SLC/GRD scenes, finds matching output files in `datadir` and
filters both lists depending on the actual overlap of each SLC/GRD valid
data coverage with the current MGRS tile geometry. If no output is found
for any scene the function will raise an error.
To obtain the extent of valid data coverage, first a binary mask raster
file is created with the name `datamask.tif`, which is stored in the same
folder as the processing output as found by :func:`~s1ard.snap.find_datasets`.
Then, the boundary of this binary mask is computed and stored as `datamask.gpkg`
(see function :func:`spatialist.vector.boundary`).
If the provided `extent` does not overlap with this boundary, the output is
discarded. This scenario might occur when the scene's geometry read from its
metadata overlaps with the tile but the actual extent of data does not.
Parameters
----------
scenes: list[str]
List of scenes to process. Either an individual scene or multiple,
matching scenes (consecutive acquisitions).
datadir: str
The directory containing the SAR datasets processed from the source
scenes using pyroSAR. The function will raise an error if the processing
output cannot be found for all scenes in `datadir`.
extent: dict
Spatial extent of the MGRS tile, derived from a
:class:`~spatialist.vector.Vector` object.
epsg: int
The coordinate reference system as an EPSG code.
processor_name: str
The name of the used SAR processor. The function `find_datasets` of the
respective processor module is used.
Returns
-------
ids: list[:class:`pyroSAR.drivers.ID`]
List of :class:`~pyroSAR.drivers.ID` objects of all source SLC/GRD scenes
that overlap with the current MGRS tile.
datasets: list[dict]
List of SAR processing output files that match each :class:`~pyroSAR.drivers.ID`
object of `ids`. The format is a list of dictionaries per scene with keys as
described by e.g. :func:`s1ard.snap.find_datasets`.
See Also
--------
:func:`s1ard.snap.find_datasets`
"""
processor = load_processor(processor_name)
ids = identify_many(scenes)
datasets = []
for i, _id in enumerate(ids):
log.debug(f'collecting processing output for scene {os.path.basename(_id.scene)}')
files = processor.find_datasets(scene=_id.scene, outdir=datadir, epsg=epsg)
if files is not None:
base = os.path.splitext(os.path.basename(_id.scene))[0]
ocn = re.sub('(?:SLC_|GRD[FHM])_1', 'OCN__2', base)[:-5]
# allow 1 second tolerance
s_start = int(ocn[31])
s_stop = int(ocn[47])
ocn_list = list(ocn)
s = 1
ocn_list[31] = f'[{s_start - s}{s_start}{s_start + s}]'
ocn_list[47] = f'[{s_stop - s}{s_stop}{s_stop + s}]'
ocn = ''.join(ocn_list)
log.debug(f'searching for OCN products with pattern {ocn}')
ocn_match = finder(target=datadir, matchlist=[ocn], regex=True,
foldermode=2, recursive=False)
if len(ocn_match) > 0:
for v in ['owiNrcsCmod', 'owiEcmwfWindSpeed', 'owiEcmwfWindDirection']:
ocn_tif = os.path.join(ocn_match[0], f'{v}.tif')
if os.path.isfile(ocn_tif):
if v.endswith('Speed'):
files['wm_ref_speed'] = ocn_tif
elif v.endswith('Direction'):
files['wm_ref_direction'] = ocn_tif
else:
files['wm'] = ocn_tif
datasets.append(files)
else:
base = os.path.basename(_id.scene)
msg = f'cannot find processing output for scene {base} and CRS EPSG:{epsg}'
raise RuntimeError(msg)
i = 0
while i < len(datasets):
log.debug(f'checking tile overlap for scene {os.path.basename(ids[i].scene)}')
measurements = [datasets[i][x] for x in datasets[i].keys()
if re.search('[gs]-lin', x)]
dm_ras = os.path.join(os.path.dirname(measurements[0]), 'datamask.tif')
dm_vec = dm_ras.replace('.tif', '.gpkg')
dm_vec = datamask(measurement=measurements[0], dm_ras=dm_ras, dm_vec=dm_vec)
if dm_vec is None:
del ids[i], datasets[i]
continue
with Lock(dm_vec, soft=True):
with Vector(dm_vec) as bounds:
with bbox(extent, epsg) as tile_geom:
inter = intersect(bounds, tile_geom)
if inter is not None:
with Raster(dm_ras) as ras:
inter_min = ras.res[0] * ras.res[1]
if inter.getArea() < inter_min:
inter.close()
inter = None
if inter is None:
log.debug('no overlap, removing scene')
del ids[i]
del datasets[i]
else:
log.debug('overlap detected')
# Add dm_ras to the datasets if it overlaps with the current tile
datasets[i]['datamask'] = dm_ras
i += 1
inter.close()
return ids, datasets
[docs]
def create_vrt(src, dst, fun, relpaths=False, scale=None, offset=None, dtype=None,
args=None, options=None, overviews=None, overview_resampling=None):
"""
Creates a VRT file for the specified source dataset(s) and adds a pixel function that should be applied on the fly
when opening the VRT file.
Parameters
----------
src: str or list[str]
The input dataset(s).
dst: str
The output dataset.
fun: str
A `PixelFunctionType` that should be applied on the fly when opening the VRT file. The function is applied to a
band that derives its pixel information from the source bands. A list of possible options can be found here:
https://gdal.org/drivers/raster/vrt.html#default-pixel-functions.
Furthermore, the option 'decibel' can be specified, which will implement a custom pixel function that uses
Python code for decibel conversion (10*log10).
relpaths: bool
Should all `SourceFilename` XML elements with attribute `@relativeToVRT="0"` be updated to be paths relative to
the output VRT file? Default is False.
scale: int or None
The scale that should be applied when computing “real” pixel values from scaled pixel values on a raster band.
Will be ignored if `fun='decibel'`.
offset: float or None
The offset that should be applied when computing “real” pixel values from scaled pixel values on a raster band.
Will be ignored if `fun='decibel'`.
dtype: str or None
the data type of the written VRT file; default None: same data type as source data.
data type notations of GDAL (e.g. `Float32`) and numpy (e.g. `int8`) are supported.
args: dict or None
arguments for `fun` passed as `PixelFunctionArguments`. Requires GDAL>=3.5 to be read.
options: dict or None
Additional parameters passed to `gdal.BuildVRT`.
overviews: list[int] or None
Internal overview levels to be created for each raster file.
overview_resampling: str or None
Resampling method for overview levels.
Examples
--------
linear gamma0 backscatter as input:
>>> src = 's1a-iw-nrb-20220601t052704-043465-0530a1-32tpt-vh-g-lin.tif'
decibel scaling I:
use `log10` pixel function and additional `Scale` parameter.
Known to display well in QGIS, but `Scale` is ignored when reading array in Python.
>>> dst = src.replace('-lin.tif', '-log1.vrt')
>>> create_vrt(src=src, dst=dst, fun='log10', scale=10)
decibel scaling II:
use custom Python pixel function. Requires additional environment variable GDAL_VRT_ENABLE_PYTHON set to YES.
>>> dst = src.replace('-lin.tif', '-log2.vrt')
>>> create_vrt(src=src, dst=dst, fun='decibel')
decibel scaling III:
use `dB` pixel function with additional `PixelFunctionArguments`. Works best but requires GDAL>=3.5.
>>> dst = src.replace('-lin.tif', '-log3.vrt')
>>> create_vrt(src=src, dst=dst, fun='dB', args={'fact': 10})
"""
options = {} if options is None else options
gdalbuildvrt(src=src, dst=dst, **options)
tree = etree.parse(dst)
root = tree.getroot()
band = tree.find('VRTRasterBand')
band.attrib['subClass'] = 'VRTDerivedRasterBand'
if dtype is not None:
band.attrib['dataType'] = Dtype(dtype).gdalstr
if fun == 'decibel':
pxfun_language = etree.SubElement(band, 'PixelFunctionLanguage')
pxfun_language.text = 'Python'
pxfun_type = etree.SubElement(band, 'PixelFunctionType')
pxfun_type.text = fun
pxfun_code = etree.SubElement(band, 'PixelFunctionCode')
pxfun_code.text = etree.CDATA("""
import numpy as np
def decibel(in_ar, out_ar, xoff, yoff, xsize, ysize, raster_xsize, raster_ysize, buf_radius, gt, **kwargs):
np.multiply(np.log10(in_ar[0], where=in_ar[0]>0.0, out=out_ar, dtype='float32'), 10.0, out=out_ar, dtype='float32')
""")
else:
pixfun_type = etree.SubElement(band, 'PixelFunctionType')
pixfun_type.text = fun
if args is not None:
arg = etree.SubElement(band, 'PixelFunctionArguments')
for key, value in args.items():
arg.attrib[key] = str(value)
if scale is not None:
sc = etree.SubElement(band, 'Scale')
sc.text = str(scale)
if offset is not None:
off = etree.SubElement(band, 'Offset')
off.text = str(offset)
if any([overviews, overview_resampling]) is not None:
ovr = tree.find('OverviewList')
if ovr is None:
ovr = etree.SubElement(root, 'OverviewList')
if overview_resampling is not None:
ovr.attrib['resampling'] = overview_resampling.lower()
if overviews is not None:
ov = str(overviews)
for x in ['[', ']', ',']:
ov = ov.replace(x, '')
ovr.text = ov
if relpaths:
srcfiles = tree.xpath('//SourceFilename[@relativeToVRT="0"]')
for srcfile in srcfiles:
repl = os.path.relpath(srcfile.text, start=os.path.dirname(dst))
repl = repl.replace('\\', '/')
srcfile.text = repl
srcfile.attrib['relativeToVRT'] = '1'
etree.indent(root)
tree.write(dst, pretty_print=True, xml_declaration=False, encoding='utf-8')
[docs]
def create_rgb_vrt(outname, infiles, overviews, overview_resampling):
"""
Creation of the color composite VRT file.
Parameters
----------
outname: str
Full path to the output VRT file.
infiles: list[str]
A list of paths pointing to the linear scaled measurement backscatter files.
overviews: list[int]
Internal overview levels to be defined for the created VRT file.
overview_resampling: str
Resampling method applied to overview pyramids.
"""
# make sure order is right and co-polarization (VV or HH) is first
pols = [re.search('[hv]{2}', os.path.basename(f)).group() for f in infiles]
if pols[1] in ['vv', 'hh']:
infiles.reverse()
pols.reverse()
# format overview levels
ov = str(overviews)
for x in ['[', ']', ',']:
ov = ov.replace(x, '')
# create VRT file and change its content
gdalbuildvrt(src=infiles, dst=outname, separate=True)
tree = etree.parse(outname)
root = tree.getroot()
srs = tree.find('SRS').text
geotrans = tree.find('GeoTransform').text
bands = tree.findall('VRTRasterBand')
vrt_nodata = bands[0].find('NoDataValue').text
complex_src = [band.find('ComplexSource') for band in bands]
for cs in complex_src:
cs.remove(cs.find('NODATA'))
new_band = etree.SubElement(root, 'VRTRasterBand',
attrib={'dataType': 'Float32', 'band': '3',
'subClass': 'VRTDerivedRasterBand'})
new_band_na = etree.SubElement(new_band, 'NoDataValue')
new_band_na.text = 'nan'
pxfun_type = etree.SubElement(new_band, 'PixelFunctionType')
pxfun_type.text = 'mul'
for cs in complex_src:
new_band.append(deepcopy(cs))
src = new_band.findall('ComplexSource')[1]
fname = src.find('SourceFilename')
fname_old = fname.text
src_attr = src.find('SourceProperties').attrib
fname.text = etree.CDATA("""
<VRTDataset rasterXSize="{rasterxsize}" rasterYSize="{rasterysize}">
<SRS dataAxisToSRSAxisMapping="1,2">{srs}</SRS>
<GeoTransform>{geotrans}</GeoTransform>
<VRTRasterBand dataType="{dtype}" band="1" subClass="VRTDerivedRasterBand">
<NoDataValue>{vrt_nodata}</NoDataValue>
<PixelFunctionType>{px_fun}</PixelFunctionType>
<ComplexSource>
<SourceFilename relativeToVRT="1">{fname}</SourceFilename>
<SourceBand>1</SourceBand>
<SourceProperties RasterXSize="{rasterxsize}" RasterYSize="{rasterysize}" DataType="{dtype}" BlockXSize="{blockxsize}" BlockYSize="{blockysize}"/>
<SrcRect xOff="0" yOff="0" xSize="{rasterxsize}" ySize="{rasterysize}"/>
<DstRect xOff="0" yOff="0" xSize="{rasterxsize}" ySize="{rasterysize}"/>
</ComplexSource>
</VRTRasterBand>
<OverviewList resampling="{ov_resampling}">{ov}</OverviewList>
</VRTDataset>
""".format(rasterxsize=src_attr['RasterXSize'], rasterysize=src_attr['RasterYSize'], srs=srs, geotrans=geotrans,
dtype=src_attr['DataType'], px_fun='inv', fname=fname_old, vrt_nodata=vrt_nodata,
blockxsize=src_attr['BlockXSize'], blockysize=src_attr['BlockYSize'],
ov_resampling=overview_resampling.lower(), ov=ov))
bands = tree.findall('VRTRasterBand')
for band, col in zip(bands, ['Red', 'Green', 'Blue']):
color = etree.Element('ColorInterp')
color.text = col
band.insert(0, color)
ovr = etree.SubElement(root, 'OverviewList', attrib={'resampling': overview_resampling.lower()})
ovr.text = ov
etree.indent(root)
tree.write(outname, pretty_print=True, xml_declaration=False, encoding='utf-8')
[docs]
def calc_product_start_stop(src_ids, extent, epsg):
"""
Calculates the start and stop times of the ARD product.
The geolocation grid points including their azimuth time information are
extracted first from the metadata of each source product.
These grid points are then used to interpolate the azimuth time for the
coordinates of the MGRS tile extent. The lowest and highest interpolated
value are returned as product acquisition start and stop times of the
ARD product.
Parameters
----------
src_ids: list[pyroSAR.drivers.ID]
List of :class:`~pyroSAR.drivers.ID` objects of all source products
that overlap with the current MGRS tile.
extent: dict
Spatial extent of the MGRS tile, derived from a
:class:`~spatialist.vector.Vector` object.
epsg: int
The coordinate reference system of the extent as an EPSG code.
Returns
-------
tuple[datetime.datetime]
Start and stop time of the ARD product in UTC.
See Also
--------
pyroSAR.drivers.SAFE.geo_grid
scipy.interpolate.RBFInterpolator
"""
with bbox(extent, epsg) as tile_geom:
tile_geom.reproject(4326)
scene_geoms = [x.geometry() for x in src_ids]
with combine_polygons(scene_geoms) as scene_geom:
intersection = gpd.overlay(df1=tile_geom.to_geopandas(),
df2=scene_geom.to_geopandas(),
how='intersection')
tile_geom_pts = intersection.get_coordinates().to_numpy()
scene_geoms = None
# combine geo grid of all scenes into one
gdfs = []
for src_id in src_ids:
with src_id.geo_grid() as vec:
gdfs.append(vec.to_geopandas())
gdf = pd.concat(gdfs, ignore_index=True)
# remove duplicate points
gdf["xy"] = gdf.geometry.apply(lambda p: (p.x, p.y))
gdf = gdf.drop_duplicates(subset="xy").copy()
gdf.drop(columns="xy", inplace=True)
# get grid point coordinates and numerical time stamps for interpolation
gdf['timestamp'] = gdf['azimuthTime'].astype(np.int64) / 10 ** 9
gridpts = gdf.get_coordinates().to_numpy()
az_time = gdf['timestamp'].values
# perform interpolation
rbf = RBFInterpolator(y=gridpts, d=az_time)
interpolated = rbf(tile_geom_pts)
# check interpolation validity
if np.isnan(interpolated).any():
raise RuntimeError('The interpolated array contains NaN values.')
# Make sure the interpolated values do not exceed the actual values.
# This might happen when the source product geometries are slightly
# larger than the geo grid extent.
out = [max(min(interpolated), min(gdf['timestamp'])),
min(max(interpolated), max(gdf['timestamp']))]
# double-check that values are plausible
if out[0] < min(gdf['timestamp']) or out[1] > max(gdf['timestamp']):
raise RuntimeError('The interpolated values exceed the input range.')
if out[0] >= out[1]:
raise RuntimeError('The determined acquisition start is larger '
'than or equal to the acquisition end.')
return (datetime.fromtimestamp(x, tz=timezone.utc) for x in out)
[docs]
def create_data_mask(outname, datasets, extent, epsg, driver, creation_opt,
overviews, overview_resampling, dst_nodata, product_type,
lsm_encoding, wbm=None):
"""
Creation of the Data Mask image.
Parameters
----------
outname: str
Full path to the output data mask file.
datasets: list[dict]
List of processed output files that match the source scenes and overlap
with the current MGRS tile. An error will be thrown if not all datasets
contain a key `datamask`. The function will return without an error if
not all datasets contain a key `dm`.
extent: dict
Spatial extent of the MGRS tile, derived from a
:class:`~spatialist.vector.Vector` object.
epsg: int
The coordinate reference system as an EPSG code.
driver: str
GDAL driver to use for raster file creation.
creation_opt: list[str]
GDAL creation options to use for raster file creation. Should match
specified GDAL driver.
overviews: list[int]
Internal overview levels to be created for each raster file.
overview_resampling: str
Resampling method for overview levels.
dst_nodata: int or str
Nodata value to write to the output raster.
product_type: str
The type of ARD product that is being created. Either 'NRB' or 'ORB'.
lsm_encoding: dict
a dictionary containing the layover shadow mask encoding.
wbm: str or None
Path to a water body mask file with the dimensions of an MGRS tile.
Optional if `product_type='NRB', mandatory if `product_type='ORB'`.
"""
measurement_keys = [x for x in datasets[0].keys() if re.search('[gs]-lin', x)]
measurement = [scene[measurement_keys[0]] for scene in datasets]
datamask = [scene['datamask'] for scene in datasets]
ls = []
for scene in datasets:
if 'dm' in scene:
ls.append(scene['dm'])
else:
return # do not create a data mask if not all scenes have a layover-shadow mask
dm_bands = ['not layover, nor shadow',
'layover',
'shadow',
'ocean',
'lakes',
'rivers']
if product_type == 'ORB':
if wbm is None:
raise RuntimeError('Water body mask is required for ORB products')
tile_bounds = [extent['xmin'], extent['ymin'], extent['xmax'], extent['ymax']]
vrt_ls = '/vsimem/' + os.path.dirname(outname) + 'ls.vrt'
vrt_valid = '/vsimem/' + os.path.dirname(outname) + 'valid.vrt'
vrt_measurement = '/vsimem/' + os.path.dirname(outname) + 'measurement.vrt'
gdalbuildvrt(src=ls, dst=vrt_ls, outputBounds=tile_bounds, void=False)
gdalbuildvrt(src=datamask, dst=vrt_valid, outputBounds=tile_bounds, void=False)
gdalbuildvrt(src=measurement, dst=vrt_measurement, outputBounds=tile_bounds, void=False)
with Raster(vrt_ls) as ras_ls:
with bbox(extent, crs=epsg) as tile_vec:
ras_ls_res = ras_ls.res
rows = ras_ls.rows
cols = ras_ls.cols
geotrans = ras_ls.raster.GetGeoTransform()
proj = ras_ls.raster.GetProjection()
arr_dm = ras_ls.array()
# Get Water Body Mask
if wbm is not None:
with Raster(wbm) as ras_wbm:
ras_wbm_cols = ras_wbm.cols
cols_ratio = ras_wbm_cols / cols
if cols_ratio > 1:
# create low resolution VRT
res = int(ras_ls_res[0])
wbm_lowres = wbm.replace('.tif', f'_{res}m.vrt')
options = {'xRes': res, 'yRes': res,
'resampleAlg': 'mode'}
gdalbuildvrt(src=wbm, dst=wbm_lowres, **options)
with Raster(wbm_lowres) as ras_wbm_lowres:
arr_wbm = ras_wbm_lowres.array()
else:
arr_wbm = ras_wbm.array()
else:
del dm_bands[3:]
c = lsm_encoding['not layover, not shadow']
l = lsm_encoding['layover']
s = lsm_encoding['shadow']
ls = lsm_encoding['layover in shadow']
n = lsm_encoding['nodata']
# Extend the shadow class of the data mask with nodata values
# from backscatter data and create final array
with Raster(vrt_valid)[tile_vec] as ras_valid:
with Raster(vrt_measurement)[tile_vec] as ras_measurement:
arr_valid = ras_valid.array()
arr_measurement = ras_measurement.array()
arr_dm[~np.isfinite(arr_dm)] = n
arr_dm = np.where(((arr_valid == 1) & (np.isnan(arr_measurement))),
s, arr_dm)
arr_dm[arr_valid != 1] = n
del arr_measurement
del arr_valid
outname_tmp = '/vsimem/' + os.path.basename(outname) + '.vrt'
gdriver = gdal.GetDriverByName('GTiff')
ds_tmp = gdriver.Create(outname_tmp, rows, cols, len(dm_bands), gdal.GDT_Byte,
options=['ALPHA=UNSPECIFIED', 'PHOTOMETRIC=MINISWHITE'])
gdriver = None
ds_tmp.SetGeoTransform(geotrans)
ds_tmp.SetProjection(proj)
for i, name in enumerate(dm_bands):
band = ds_tmp.GetRasterBand(i + 1)
# not layover, nor shadow
if i == 0:
arr = arr_dm == c
# layover | layover in shadow
elif i == 1:
arr = (arr_dm == l) | (arr_dm == ls)
# shadow | layover in shadow
elif i == 2:
arr = (arr_dm == s) | (arr_dm == ls)
# ocean
elif i == 3:
arr = arr_wbm == 1
# lakes
elif i == 4:
arr = arr_wbm == 2
# rivers
elif i == 5:
arr = arr_wbm == 3
else:
raise ValueError(f'unknown array value: {i}')
arr = arr.astype('uint8')
arr[arr_dm == n] = dst_nodata
band.WriteArray(arr)
band.SetNoDataValue(dst_nodata)
band.SetDescription(name)
band.FlushCache()
band = None
del arr
ds_tmp.SetMetadataItem('TIFFTAG_DATETIME', strftime('%Y:%m:%d %H:%M:%S', gmtime()))
ds_tmp.BuildOverviews(overview_resampling, overviews)
outDataset_cog = gdal.GetDriverByName(driver).CreateCopy(outname, ds_tmp, strict=1, options=creation_opt)
outDataset_cog = None
ds_tmp = None
tile_vec = None
[docs]
def create_acq_id_image(outname, ref_tif, datasets, src_ids, extent,
epsg, driver, creation_opt, overviews, dst_nodata):
"""
Creation of the Acquisition ID image.
Parameters
----------
outname: str
Full path to the output data mask file.
ref_tif: str
Full path to any GeoTIFF file of the ARD product.
datasets: list[dict]
List of processed output files that match the source SLC scenes and overlap with the current MGRS tile.
src_ids: list[pyroSAR.drivers.ID]
List of :class:`~pyroSAR.drivers.ID` objects of all source SLC scenes that overlap with the current MGRS tile.
extent: dict
Spatial extent of the MGRS tile, derived from a :class:`~spatialist.vector.Vector` object.
epsg: int
The CRS used for the ARD product; provided as an EPSG code.
driver: str
GDAL driver to use for raster file creation.
creation_opt: list[str]
GDAL creation options to use for raster file creation. Should match specified GDAL driver.
overviews: list[int]
Internal overview levels to be created for each raster file.
dst_nodata: int or str
Nodata value to write to the output raster.
"""
src_scenes = [sid.scene for sid in src_ids]
# If there are two source scenes, make sure that the order of acquisitions in all lists is correct!
if len(src_scenes) > 1:
if not len(src_scenes) == 2 and len(datasets) == 2:
raise RuntimeError('expected lists `src_scenes` and `valid_mask_list` to be of length 2; length is '
'{} and {} respectively'.format(len(src_scenes), len(datasets)))
starts_src = [datetime.strptime(identify(f).start, '%Y%m%dT%H%M%S') for f in src_scenes]
start_valid = [re.search('[0-9]{8}T[0-9]{6}', os.path.basename(x)).group() for x in src_scenes]
start_valid = [datetime.strptime(x, '%Y%m%dT%H%M%S') for x in start_valid]
if starts_src[0] > starts_src[1]:
src_scenes.reverse()
starts_src.reverse()
if start_valid[0] != starts_src[0]:
datasets.reverse()
if start_valid[0] != starts_src[0]:
raise RuntimeError('failed to match order of lists `src_scenes` and `valid_mask_list`')
tile_bounds = [extent['xmin'], extent['ymin'], extent['xmax'], extent['ymax']]
arr_list = []
for dataset in datasets:
vrt_valid = '/vsimem/' + os.path.dirname(outname) + 'mosaic.vrt'
gdalbuildvrt(src=dataset['datamask'], dst=vrt_valid, outputBounds=tile_bounds, void=False)
with bbox(extent, crs=epsg) as tile_vec:
with Raster(vrt_valid)[tile_vec] as vrt_ras:
vrt_arr = vrt_ras.array()
arr_list.append(vrt_arr)
del vrt_arr
tile_vec = None
src_scenes_clean = [os.path.basename(src).replace('.zip', '').replace('.SAFE', '') for src in src_scenes]
tag = '{{"{src1}": 1}}'.format(src1=src_scenes_clean[0])
out_arr = np.full(arr_list[0].shape, dst_nodata)
out_arr[arr_list[0] == 1] = 1
if len(arr_list) == 2:
out_arr[arr_list[1] == 1] = 2
tag = '{{"{src1}": 1, "{src2}": 2}}'.format(src1=src_scenes_clean[0], src2=src_scenes_clean[1])
creation_opt.append('TIFFTAG_IMAGEDESCRIPTION={}'.format(tag))
with Raster(ref_tif) as ref_ras:
ref_ras.write(outname, format=driver, array=out_arr.astype('uint8'), nodata=dst_nodata, overwrite=True,
overviews=overviews, options=creation_opt)
[docs]
def wind_normalization(src, dst_wm, dst_wn, measurement, gapfill, bounds, epsg, driver,
creation_opt, dst_nodata, multithread, resolution=915):
"""
Create wind normalization layers. A wind model annotation layer is created and optionally
a wind normalization VRT.
Parameters
----------
src: list[str]
A list of OCN products as prepared by :func:`s1ard.ocn.extract`
dst_wm: str
The name of the wind model layer in the ARD product
dst_wn: str or None
The name of the wind normalization VRT. If None, no VRT will be created.
Requires `measurement` to point to a file.
measurement: str or None
The name of the measurement file used for wind normalization in `dst_wn`.
If None, no wind normalization VRT will be created.
gapfill: bool
Perform additional gap filling (:func:`s1ard.ocn.gapfill`)?
This is recommended if the Level-1 source product of `measurement` is GRD
in which case gaps are introduced between subsequently acquired scenes.
bounds: list[float]
the bounds of the MGRS tile
epsg: int
The EPSG code of the MGRS tile
driver: str
GDAL driver to use for raster file creation.
creation_opt: list[str]
GDAL creation options to use for raster file creation. Should match specified GDAL driver.
dst_nodata: float
Nodata value to write to the output raster.
multithread: bool
Should `gdalwarp` use multithreading?
resolution: int, optional
The target pixel resolution in meters. 915 is chosen as default because it is closest
to the OCN product resolution (1000) and still fits into the MGRS bounds
(``109800 % 915 == 0``).
Returns
-------
"""
if len(src) > 1:
cmod_mosaic = get_tmp_name(suffix='.tif')
gdalwarp(src=src, dst=cmod_mosaic)
if gapfill:
cmod_geo = get_tmp_name(suffix='.tif')
ocn.gapfill(src=cmod_mosaic, dst=cmod_geo, md=2, si=1)
os.remove(cmod_mosaic)
else:
cmod_geo = cmod_mosaic
cmod_geo_tmp = True
else:
cmod_geo = src[0]
cmod_geo_tmp = False
if not os.path.isfile(dst_wm):
log.info(f'creating {dst_wm}')
gdalwarp(src=cmod_geo,
dst=dst_wm,
outputBounds=bounds,
dstSRS=f'EPSG:{epsg}',
xRes=resolution, yRes=resolution,
resampleAlg='bilinear',
format=driver,
dstNodata=dst_nodata,
multithread=multithread,
creationOptions=creation_opt)
if cmod_geo_tmp:
os.remove(cmod_geo)
if dst_wn is not None and measurement is not None:
if not os.path.isfile(dst_wn):
log.info(f'creating {dst_wn}')
with Raster(measurement) as ras:
xres, yres = ras.res
create_vrt(src=[measurement, dst_wm], dst=dst_wn, fun='div',
options={'xRes': xres, 'yRes': yres}, relpaths=True)