Source code for s1ard.metadata.stac

import os
import re
import sys
import shutil
import pystac
from spatialist.ancillary import finder
import logging

# Logger named 's1ard'; the functions in this module use it for status messages.
log = logging.getLogger('s1ard')


def make_catalog(directory, product_type, recursive=True, silent=False):
    """
    For a given directory of Sentinel-1 ARD products, this function will create a high-level STAC
    :class:`~pystac.catalog.Catalog` object serving as the STAC endpoint and lower-level STAC
    :class:`~pystac.collection.Collection` objects for each subdirectory corresponding to a unique MGRS tile ID.
    
    WARNING: The directory content will be reorganized into subdirectories based on the ARD type and unique MGRS tile
    IDs if this is not yet the case. The reorganization step asks for interactive confirmation and terminates the
    process if it is declined.
    
    Parameters
    ----------
    directory: str
        Path to a directory that contains ARD products.
    product_type: str
        Type of ARD products. Options: 'NRB' or 'ORB'. The value is converted to upper case.
    recursive: bool, optional
        Search `directory` recursively? Default is True.
    silent: bool, optional
        Should the output during directory reorganization be suppressed? Default is False.
    
    Returns
    -------
    catalog: pystac.catalog.Catalog
        STAC Catalog object
    
    Notes
    -----
    The returned STAC Catalog object contains Item asset hrefs that are absolute, whereas the actual on-disk files
    contain relative asset hrefs corresponding to the self-contained Catalog-Type. The returned in-memory STAC Catalog
    object deviates in this regard to ensure compatibility with the stackstac library:
    https://github.com/gjoseph92/stackstac/issues/20
    
    Collections are added to the catalog in sorted order of their MGRS tile ID so that repeated runs on the same
    data produce the same catalog layout.
    """
    overwrite = False
    product_type = product_type.upper()
    pattern = fr'^S1[AB]_(IW|EW|S[1-6])_{product_type}__1S(SH|SV|DH|DV|VV|HH|HV|VH)_[0-9]{{8}}T[0-9]{{6}}_[0-9]{{6}}_' \
              fr'[0-9A-F]{{6}}_[0-9A-Z]{{5}}_[0-9A-Z]{{4}}$'
    products = finder(target=directory, matchlist=[pattern], foldermode=2, regex=True, recursive=recursive)
    
    # From here on `directory` is the product-type subdirectory that holds the catalog
    directory = os.path.join(directory, product_type)
    
    # Check if Catalog already exists
    catalog_path = os.path.join(directory, 'catalog.json')
    if os.path.isfile(catalog_path):
        overwrite = True
        catalog = pystac.Catalog.from_file(catalog_path)
        item_ids = {item.id for item in catalog.get_all_items()}
        missing = {os.path.basename(prod) for prod in products} - item_ids
        if not missing:
            # Every product found on disk is already part of the catalog; nothing to rebuild.
            # See note in docstring - https://github.com/gjoseph92/stackstac/issues/20
            catalog.make_all_asset_hrefs_absolute()
            log.info(f"existing STAC endpoint found: {catalog_path}")
            return catalog
    
    products = _reorganize_by_tile(directory=directory, product_type=product_type, products=products,
                                   recursive=recursive, silent=silent)
    
    # Group the products by MGRS tile ID. The ID is read from the product name (basename) only, so that
    # directory names further up the path can neither be mistaken for a tile ID nor cause a product to be
    # assigned to the wrong collection.
    tile_regex = re.compile(r'_([0-9A-Z]{5})_')
    products_by_tile = {}
    for prod in products:
        tile = tile_regex.search(os.path.basename(prod)).group(1)
        products_by_tile.setdefault(tile, []).append(prod)
    
    # Placeholder extents; each collection's extent is replaced once its items are known
    sp_extent = pystac.SpatialExtent([None, None, None, None])
    tmp_extent = pystac.TemporalExtent([None, None])
    
    catalog = pystac.Catalog(id=f'{product_type.lower()}_catalog',
                             description=f'STAC Catalog of Sentinel-1 {product_type} products.',
                             title=f'STAC Catalog of Sentinel-1 {product_type} products.',
                             catalog_type=pystac.CatalogType.SELF_CONTAINED)
    
    for tile in sorted(products_by_tile):
        tile_collection = pystac.Collection(id=tile,
                                            description=f'STAC Collection of Sentinel-1 {product_type} products for '
                                                        f'MGRS tile {tile}.',
                                            title=f'STAC Collection of Sentinel-1 {product_type} products for '
                                                  f'MGRS tile {tile}.',
                                            extent=pystac.Extent(sp_extent, tmp_extent),
                                            keywords=['sar', 'backscatter', 'esa', 'copernicus', 'sentinel'],
                                            providers=[pystac.Provider(name='ESA',
                                                                       roles=[pystac.ProviderRole.LICENSOR,
                                                                              pystac.ProviderRole.PRODUCER])])
        catalog.add_child(tile_collection)
        
        items = []
        for prod in products_by_tile[tile]:
            # The STAC Item file is expected inside the product directory, named after the product
            item_path = os.path.join(prod, os.path.basename(prod) + '.json')
            item = pystac.read_file(href=item_path)
            items.append(item)
            tile_collection.add_item(item=item)
        
        tile_collection.extent = pystac.Extent.from_items(items=items)
    
    # Save Catalog and Collections on disk
    catalog.normalize_and_save(root_href=directory)
    
    # See note in docstring - https://github.com/gjoseph92/stackstac/issues/20
    catalog.make_all_asset_hrefs_absolute()
    
    if overwrite:
        log.info(f"existing STAC endpoint updated: {catalog_path}")
    else:
        log.info(f"new STAC endpoint created: {catalog_path}")
    return catalog


def _reorganize_by_tile(directory, product_type, products=None, recursive=True, silent=False):
    """
    Reorganizes a directory containing Sentinel-1 ARD products based on the ARD type and unique MGRS tile IDs.
    
    The user is asked for confirmation on the command line before anything is moved. Any answer other than
    'yes' aborts via :func:`sys.exit`.
    
    Parameters
    ----------
    directory: str
        Path to the target directory. One subdirectory per MGRS tile ID is created inside it.
    product_type: str
        Type of ARD products. Options: 'NRB' or 'ORB'.
    products: list[str] or None, optional
        List of ARD product paths. Will be created by searching the parent directory of `directory` if not provided.
    recursive: bool, optional
        Search for products recursively? Only used if `products` is None. Default is True.
    silent: bool, optional
        If False (default), a message for each ARD product is logged if it has been moved to a new location or not.
    
    Returns
    -------
    products_new: list[str]
        An updated list of ARD product paths. Each target path is listed once.
    
    Raises
    ------
    SystemExit
        If the user does not confirm the reorganization with 'yes'.
    """
    if products is None:
        parent_dir = os.path.dirname(directory)
        pattern = fr'^S1[AB]_(IW|EW|S[1-6])_{product_type}__1S(SH|SV|DH|DV|VV|HH|HV|VH)_[0-9]{{8}}T[0-9]{{6}}_' \
                  fr'[0-9]{{6}}_[0-9A-F]{{6}}_[0-9A-Z]{{5}}_[0-9A-Z]{{4}}$'
        products = finder(target=parent_dir, matchlist=[pattern], foldermode=2, regex=True, recursive=recursive)
    
    inp = input('WARNING:\n{}\nand the ARD products it contains will be reorganized into subdirectories '
                'based on unique MGRS tile IDs if this directory structure does not yet exist. '
                '\nDo you wish to continue? [yes|no] '.format(directory))
    if inp != 'yes':
        log.info('abort!')
        sys.exit(0)
    
    # Group products by MGRS tile ID. The ID is read from the product name (basename) only so that
    # directory names further up the path cannot be mistaken for a tile ID.
    tile_regex = re.compile(r'_([0-9A-Z]{5})_')
    tile_dict = {}
    for prod in products:
        tile = tile_regex.search(os.path.basename(prod)).group(1)
        tile_dict.setdefault(tile, []).append(prod)
    
    products_new = []
    listed = set()
    for tile, tile_products in tile_dict.items():
        tile_dir = os.path.join(directory, tile)
        os.makedirs(tile_dir, exist_ok=True)
        
        for old_dir in tile_products:
            name = os.path.basename(old_dir)
            new_dir = os.path.join(tile_dir, name)
            
            if os.path.abspath(old_dir) == os.path.abspath(new_dir):
                if not silent:
                    log.info(f"xx {name} already in {tile_dir} (skip!)")
            elif os.path.exists(new_dir):
                # shutil.move would place `old_dir` *inside* the existing `new_dir`; leave the duplicate untouched
                log.warning(f"{new_dir} already exists; duplicate product {old_dir} was not moved")
            else:
                shutil.move(old_dir, new_dir)
                if not silent:
                    log.info(f"-> {name} moved to {tile_dir}")
            
            if new_dir not in listed:
                listed.add(new_dir)
                products_new.append(new_dir)
    return products_new