Source code for model_catalogs.utils

"""
Utilities to help with catalogs.
"""

import fnmatch
import pathlib
import re

import cf_xarray  # noqa
import numpy as np
import pandas as pd
import requests
import yaml

from intake.catalog import Catalog
from siphon.catalog import TDSCatalog

import model_catalogs as mc


[docs]def astype(value, type_): """Return `value` as type `type_`. Particularly made to work correctly for returning string, `PosixPath`, or `Timestamp` as list. """ if not isinstance(value, type_): if type_ == list and isinstance( value, (str, pathlib.PurePath, pd.Timestamp, Catalog) ): return [value] return type_(value) return value
[docs]def status(urlpath, suffix=".das"): """Check status of server for urlpath. Parameters ---------- urlpath : str Path to file location to check. Returns ------- bool If True, server was reachable. """ resp = requests.get(urlpath + suffix) if resp.status_code != 200: status = False else: status = True return status
[docs]def file2dt(filename): """Return Timestamp of NOAA OFS filename ...without reading in the filename to xarray. See `docs <https://model-catalogs.readthedocs.io/en/latest/aggregations.html>`_ for details on the formula. Most NOAA OFS models have 1 timestep per file, but NYOFS has 6. Parameters ---------- filename : str Filename for which to decipher datetime. Can be full path or just file name. Returns ------- Timestamp pandas Timestamp of the time(s) in the file. Examples -------- >>> url = 'https://www.ncei.noaa.gov/thredds/dodsC/model-cbofs-files/2022/07/nos.cbofs.fields.n001.20220701.t00z.nc' >>> mc.filename2datetime(url) Timestamp('2022-06-30 19:00:00') """ # strip off path if present since can mess up the matching filename = filename.split("/")[-1] # read in date from filename date = pd.to_datetime(filename, format="%Y%m%d", exact=False) # has time 00:00 # pull timing cycle from filename regex = re.compile(".t[0-9]{2}z.") cycle = int(regex.findall(filename)[0][2:4]) # NYOFS: multiple times per file if fnmatch.fnmatch(filename, "*.nowcast.*"): date = [date + pd.Timedelta(f"{cycle - dt} hours") for dt in range(6)[::-1]] # NYOFS: multiple times per file elif fnmatch.fnmatch(filename, "*.forecast.*"): # models all have different forecast lengths! Though only nyofs is left in this category if "nyofs" in filename: nfiles = 54 date = [date + pd.Timedelta(f"{cycle + 1 + dt} hours") for dt in range(nfiles)] # Main style of NOAA OFS files, 1 file per time step elif fnmatch.fnmatch(filename, "*.n???.*") or fnmatch.fnmatch(filename, "*.f???.*"): # number of hours in repeat cycle for forecast if "wcofs" in filename: tshift = 24 else: tshift = 6 # pull hours from filename regex = re.compile(".[n,f][0-9]{3}.") hour = int(regex.findall(filename)[0][2:-1]) # calculate hours dt = cycle + hour # if nowcast file, subtract dt hours if fnmatch.fnmatch(filename, "*.n???.*"): dt -= tshift # construct datetime. dt might be negative. date += pd.Timedelta(f"{dt} hours") return date
[docs]def get_fresh_parameter(filename, source): """Get freshness parameter, based on the filename. A freshness parameter is stored in ``__init__`` for required scenarios which is looked up using the logic in this function, based on the filename. The source is checked for most types of actions for an overriding freshness parameter value, otherwise the default is used. Parameters ---------- filename : Path Filename to determine freshness. source : Intake Source Source from which to check for an overriding freshness parameter. Is not used for "compiled" catalog files. Returns ------- str mu, a pandas Timedelta-interpretable string describing the amount of time that filename should be considered fresh before needing to be recalculated. """ # a start or end datetime file if filename.parent == mc.CACHE_PATH_AVAILABILITY: if source is None: raise ValueError("source cannot be None for this freshness calculation.") # which type are we after if "start" in filename.name: parameter = "start" elif "end" in filename.name: parameter = "end" elif "catrefs" in filename.name: parameter = "catrefs" # check for overriding freshness parameter in source metadata if "freshness" in source.metadata and parameter in source.metadata["freshness"]: mu = source.metadata["freshness"][parameter] else: mu = mc.FRESH[parameter] # a file of file locs for aggregation elif filename.parent == mc.CACHE_PATH_FILE_LOCS: if source is None: raise ValueError("source cannot be None for this freshness calculation.") parameter = "file_locs" # check for overriding freshness parameter in source metadata if "freshness" in source.metadata and parameter in source.metadata["freshness"]: mu = source.metadata["freshness"][parameter] else: mu = mc.FRESH[parameter] # a compiled catalog file elif filename.parent == mc.CACHE_PATH_COMPILED: mu = mc.FRESH["compiled"] return mu
[docs]def is_fresh(filename, source=None): """Check if file called filename is fresh. If filename doesn't exist, return False. Parameters ---------- filename : Path Filename to determine freshness source : Intake Source Source from which to check for an overriding freshness parameter. Is not used for "compiled" catalog files. Returns ------- Boolean True if fresh and False if not or if filename is not found. """ now = pd.Timestamp.today(tz="UTC") try: filetime = pd.Timestamp(filename.stat().st_mtime_ns).tz_localize("UTC") mu = get_fresh_parameter(filename, source=source) return now - filetime < pd.Timedelta(mu) except FileNotFoundError: return False
[docs]def find_bbox(ds, dd=None, alpha=None): """Determine bounds and boundary of model. Parameters ---------- ds: Dataset xarray Dataset containing model output. dd: int, optional Number to decimate model output lon/lat, as a stride. alpha: float, optional Number for alphashape to determine what counts as the convex hull. Larger number is more detailed, 1 is a good starting point. Returns ------- List Contains the name of the longitude and latitude variables for ds, geographic bounding box of model output (`[min_lon, min_lat, max_lon, max_lat]`), low res and high res wkt representation of model boundary. """ import shapely.geometry hasmask = False try: lon = ds.cf["longitude"].values lat = ds.cf["latitude"].values lonkey = ds.cf["longitude"].name latkey = ds.cf["latitude"].name except KeyError: if "lon_rho" in ds: lonkey = "lon_rho" latkey = "lat_rho" else: lonkey = list(ds.cf[["longitude"]].coords.keys())[0] # need to make sure latkey matches lonkey grid latkey = f"lat{lonkey[3:]}" # In case there are multiple grids, just take first one; # they are close enough lon = ds[lonkey].values lat = ds[latkey].values # check for corresponding mask (rectilinear and curvilinear grids) if any([var for var in ds.data_vars if "mask" in var]): if ("mask_rho" in ds) and (lonkey == "lon_rho"): maskkey = lonkey.replace("lon", "mask") elif "mask" in ds: maskkey = "mask" else: maskkey = None if maskkey in ds: lon = ds[lonkey].where(ds[maskkey] == 1).values lon = lon[~np.isnan(lon)].flatten() lat = ds[latkey].where(ds[maskkey] == 1).values lat = lat[~np.isnan(lat)].flatten() hasmask = True # This is structured, rectilinear # GFS, RTOFS, HYCOM if (lon.ndim == 1) and ("nele" not in ds.dims) and not hasmask: nlon, nlat = ds["lon"].size, ds["lat"].size lonb = np.concatenate(([lon[0]] * nlat, lon[:], [lon[-1]] * nlat, lon[::-1])) latb = np.concatenate((lat[:], [lat[-1]] * nlon, lat[::-1], [lat[0]] * nlon)) # boundary = np.vstack((lonb, latb)).T p = shapely.geometry.Polygon(zip(lonb, latb)) p0 = p.simplify(1) # Now using the more simplified version because all of these models are boxes p1 = p0 elif hasmask or ("nele" in ds.dims): # unstructured assertion = ( "dd and alpha need to be defined in the catalog metadata for this model." ) assert dd is not None and alpha is not None, assertion # need to calculate concave hull or alphashape of grid import alphashape # downsample a bit to save time, still should clearly see shape of domain lon, lat = lon[::dd], lat[::dd] pts = list(zip(lon, lat)) # low res, same as convex hull p0 = alphashape.alphashape(pts, 0.0) p1 = alphashape.alphashape(pts, alpha) # useful things to look at: p.wkt #shapely.geometry.mapping(p) return lonkey, latkey, list(p0.bounds), p1.wkt
# return lonkey, latkey, list(p0.bounds), p0.wkt, p1.wkt
[docs]def filedates2df(filelocs): """Set up dataframe of datetimes to filenames. Parameters ---------- filelocs : list of str File locations. Returns ------- DataFrame Contains the index datetimes corresponding to file locations (column 'filenames'). """ # 1+ number of times possible from mc.file2dt, need the number of filenames to match filedates, filenames = [], [] for fname in filelocs: filedate = mc.astype(mc.file2dt(fname), list) filenames.extend([fname] * len(filedate)) filedates.extend(filedate) # Make dataframe df = pd.DataFrame(index=filedates, data={"filenames": filenames}) # Sort resulting df by filenames and then by index which is the datetime of each file df = df.reset_index().sort_values(by=["index", "filenames"]).set_index("index") # df = df.sort_values(axis="index", by="filenames").sort_index() # remove rows if index is duplicated, sorting makes it so nowcast files are kept df = df[~df.index.duplicated(keep="last")] return df
[docs]def agg_for_date(date, strings, filetype, is_forecast=False, pattern=None): """Select NOAA OFS-style nowcast/forecast files for aggregation. This function finds the files whose path includes the given date, regardless of times which might change the date forward or backward. Parameters ---------- date: str of datetime, pd.Timestamp Date of day to find model output files for. Doesn't pay attention to hours/minutes seconds. strings: list List of strings to be filtered. Expected to be file locations from a thredds catalog. filetype: str Which filetype to use. Every NOAA OFS model has "fields" available, but some have "regulargrid" or "2ds" also. This availability information is in the catalog metadata for the model under `filetypes` metadata. is_forecast: bool, optional If True, then date is the last day of the time period being sought and the forecast files should be brought in along with the nowcast files, to get the model output the length of the forecast out in time. The forecast files brought in will have the latest timing cycle of the day that is available. If False, all nowcast files (for all timing cycles) are brought in. pattern: str, optional If a model file pattern doesn't match that assumed in this code, input one that will work. Currently only NYOFS doesn't match but the pattern is built into the catalog file. Returns ------- List Contains URLs for where to find all of the model output files that match the keyword arguments. List is not sorted correctly for times (this happens later). """ date = astype(date, pd.Timestamp) if pattern is None: pattern = date.strftime(f"*{filetype}*.n*.%Y%m%d.t??z.*") else: pattern = eval(f"f'{pattern}'") # if using forecast, find nowcast and forecast files for the latest full timing cycle if is_forecast: import re # Find the most recent, complete timing cycle for the forecast day regex = re.compile(".t[0-9]{2}z.") # substrings: list of repeated strings of hours, e.g. ['12', '06', '00', '12', ...] subs = [substr[2:4] for substr in regex.findall("".join(strings))] # unique str times, in increasing order, e.g. ['00', '06', '12'] times = sorted(list(set(subs))) # choose the timing cycle that is latest but also has the most times available # sometimes the forecast files aren't available yet so don't want to use that time cycle = sorted(times, key=subs.count)[-1] # noqa: F841 # find all nowcast files with only timing "cycle" # replace '.t??z.' in pattern with '.t{cycle}z.' to get the latest timing cycle only pattern1 = pattern.replace(".t??z.", ".t{cycle}z.").replace(".n*.", ".[n,f]*.") pattern1 = eval(f"f'{pattern1}'") # use `eval` to sub in value of `cycle` # sort nowcast files alone to get correct time order fnames_cycle = fnmatch.filter(strings, pattern1) # Include the nowcast files between the start of the day and when the time series # represented in fnames begins # fnames_now = find_nowcast_cycles(strings, pattern) fnames_nowcast = fnmatch.filter(strings, pattern) fnames = fnames_cycle + fnames_nowcast # combine if len(fnames) == 0: raise ValueError( f"Error finding filenames. Filenames found so far: {fnames}. " "Maybe you have the wrong source for the days requested." ) # if not using forecast, find all nowcast files matching pattern else: fnames = fnmatch.filter(strings, pattern) return fnames
[docs]def find_catrefs(catloc): """Find hierarchy of catalog references for thredds catalog. Parameters ---------- catloc: str Search in thredds catalog structure from base catalog, catloc. Returns ------- list Contains tuples containing the hierarchy of directories in the thredds catalog structure to get to where the datafiles start. """ # 0th level catalog cat = TDSCatalog(catloc) catrefs_to_check = list(cat.catalog_refs) # only keep numerical directories catrefs_to_check = [catref for catref in catrefs_to_check if catref.isnumeric()] # 1st level catalog catrefs_to_check1 = [ cat.catalog_refs[catref].follow().catalog_refs for catref in catrefs_to_check ] # Combine the catalog references together catrefs = [ (catref0, catref1) for catref0, catrefs1 in zip(catrefs_to_check, catrefs_to_check1) for catref1 in catrefs1 ] # Check first one to see if there are more catalog references or not cat_ref_test = ( cat.catalog_refs[catrefs[0][0]] .follow() .catalog_refs[catrefs[0][1]] .follow() .catalog_refs ) # If there are more catalog references, run another level of catalog and combine, ## 2 if len(cat_ref_test) > 0: catrefs2 = [ cat.catalog_refs[catref[0]] .follow() .catalog_refs[catref[1]] .follow() .catalog_refs for catref in catrefs ] catrefs = [ (catref[0], catref[1], catref22) for catref, catref2 in zip(catrefs, catrefs2) for catref22 in catref2 ] return catrefs
[docs]def find_filelocs(catref, catloc, filetype="fields"): """Find thredds file locations. Parameters ---------- catref: tuple 2 or 3 labels describing the directories from catlog to get the data locations. catloc: str Base thredds catalog location. filetype: str Which filetype to use. Every NOAA OFS model has "fields" available, but some have "regulargrid" or "2ds" also (listed in separate catalogs in the model name). Returns ------- list Locations of files found from catloc to hierarchical location described by catref. """ filelocs = [] cat = TDSCatalog(catloc) if len(catref) == 2: catref1, catref2 = catref cat1 = cat.catalog_refs[catref1].follow() cat2 = cat1.catalog_refs[catref2].follow() last_cat = cat2 datasets = cat2.datasets elif len(catref) == 3: catref1, catref2, catref3 = catref cat1 = cat.catalog_refs[catref1].follow() cat2 = cat1.catalog_refs[catref2].follow() cat3 = cat2.catalog_refs[catref3].follow() last_cat = cat3 datasets = cat3.datasets for dataset in datasets: if ( "stations" not in dataset and "vibrioprob" not in dataset and filetype in dataset ): url = last_cat.datasets[dataset].access_urls["OPENDAP"] filelocs.append(url) return filelocs
[docs]def calculate_boundaries(cats, save_files=True, return_boundaries=False): """Calculate boundary information for all models. This loops over all input catalogs and will try with multiple model_source if necessary (in case servers aren't working) to access the example model output files and calculate the bounding box and numerical domain boundary. The numerical domain boundary is calculated using `alpha_shape` with previously-chosen parameters stored in the original model catalog files. The bounding box and boundary string representation (as WKT) are then saved to files. The files are saved the first time you run this function, so this function should only be rerun if you suspect that a model domain has changed or you have a new model catalog. Parameters ---------- cats : Catalog, list of Catalogs The Catalog or Catalogs for which to find boundaries. save_files : boolean, optional Whether to save files or not. Defaults to True. Saves to ``mc.FILE_PATH_BOUNDARIES(cat_loc.name)``. return_boundaries : boolean, optional Whether to return boundaries information from this call. Defaults to False. Examples -------- Calculate boundary information for CBOFS: >>> import model_catalogs as mc >>> main_cat = mc.setup() >>> mc.calculate_boundaries(main_cat["CBOFS"]) """ # loop over all orig catalogs boundaries = {} for cat in mc.astype(cats, list): # loop over available sources and use the first that works for model_source in list(cat): source_orig = cat[model_source] source_transform = mc.transform_source(source_orig) # need to make catalog to transfer information properly from # source_orig to source_transform cat_transform = mc.make_catalog( source_transform, full_cat_name=cat.name, # model name full_cat_description=cat.description, full_cat_metadata=cat.metadata, cat_driver=mc.process.DatasetTransform, cat_path=None, save_catalog=False, ) if cat_transform[model_source].status: # read in model output ds = cat_transform[model_source].to_dask() break # find boundary information for model if "alpha_shape" in cat.metadata: dd, alpha = cat.metadata["alpha_shape"] else: dd, alpha = None, None lonkey, latkey, bbox, wkt = mc.find_bbox(ds, dd=dd, alpha=alpha) ds.close() # save boundary info to file if save_files: with open(mc.FILE_PATH_BOUNDARIES(cat.name.lower()), "w") as outfile: yaml.dump({"bbox": bbox, "wkt": wkt}, outfile, default_flow_style=False) if return_boundaries: boundaries[cat.name] = {"bbox": bbox, "wkt": wkt} if return_boundaries: return boundaries