Validation#

In order to validate our results, this notebook compare our results with the flood mapping from the ‘Global Flood Monitoring’(GFM) for the same area and time range.

from importlib.resources import files

import holoviews as hv
import hvplot.xarray  # noqa
import numpy as np
import pystac_client
import rioxarray  # noqa
import xarray as xr
from dask.distributed import Client
from dask_flood_mapper import flood
from odc import stac as odc_stac
# define time range and area
time_range = "2023-10-11/2023-10-25"
bounding_box = [12.3, 54.3, 13.1, 54.6]
client = Client(processes=False, threads_per_worker=2, n_workers=1, memory_limit="12GB")

As usual we perform Dask based flood mapping.

fd = flood.decision(bbox=bounding_box, datetime=time_range).compute()
fd
---------------------------------------------------------------------------
CPLE_HttpResponseError                    Traceback (most recent call last)
File rasterio/_base.pyx:310, in rasterio._base.DatasetBase.__init__()

File rasterio/_base.pyx:221, in rasterio._base.open_dataset()

File rasterio/_err.pyx:359, in rasterio._err.exc_wrap_pointer()

CPLE_HttpResponseError: CURL error: Failed to connect to data.eodc.eu port 443 after 133 ms: Couldn't connect to server

During handling of the above exception, another exception occurred:

RasterioIOError                           Traceback (most recent call last)
Cell In[4], line 1
----> 1 fd = flood.decision(bbox=bounding_box, datetime=time_range).compute()
      2 fd

File /opt/hostedtoolcache/Python/3.10.18/x64/lib/python3.10/site-packages/dask_flood_mapper/flood.py:156, in decision(bbox, datetime, dynamic, keep_masks)
     48 def decision(
     49     bbox: tuple[float, float, float, float],
     50     datetime: str,
   (...)
     53     keep_masks: bool = False,
     54 ) -> xr.DataArray:
     55     """Bayesian Flood Decision.
     56 
     57     Classify Sentinel-1 radar images by simple Bayes inference into flood (1)
   (...)
    154 
    155     """  # noqa: E501
--> 156     sig0_dc, hpar_dc, plia_dc = preprocess(bbox, datetime, dynamic=dynamic)
    157     flood_dc: xr.Dataset = calculate_flood_dc(sig0_dc, plia_dc, hpar_dc)
    158     flood_dc["wbsc"] = calc_water_likelihood(flood_dc)  # Water

File /opt/hostedtoolcache/Python/3.10.18/x64/lib/python3.10/site-packages/dask_flood_mapper/flood.py:298, in preprocess(bbox, datetime, dynamic)
    296 items_sig0: ItemCollection = search.item_collection()
    297 sig0_dc: xr.Dataset = prepare_dc(items_sig0, bbox, bands="VV")
--> 298 sig0_dc, orbit_sig0 = process_sig0_dc(sig0_dc, items_sig0, bands="VV")
    299 print("sigma naught datacube processed")  # noqa: T201
    301 if dynamic:

File /opt/hostedtoolcache/Python/3.10.18/x64/lib/python3.10/site-packages/dask_flood_mapper/processing.py:55, in process_sig0_dc(sig0_dc, items_sig0, bands)
     48 def process_sig0_dc(
     49     sig0_dc: xr.Dataset,
     50     items_sig0: ItemCollection,
     51     bands: str | list[str],
     52 ) -> tuple[xr.Dataset, np.ndarray]:
     53     """Process the sig0 datacube."""
     54     sig0_dc: xr.Dataset = (
---> 55         post_process_eodc_cube(sig0_dc, items_sig0, bands)
     56         .rename_vars({"VV": "sig0"})
     57         .assign_coords(orbit=("time", extract_orbit_names(items_sig0)))
     58         .dropna(dim="time", how="all")
     59         .sortby("time")
     60     )
     61     orbit_sig0: np.ndarray = order_orbits(sig0_dc)
     62     sig0_dc: xr.Dataset = sig0_dc.groupby("time").mean(skipna=True)

File /opt/hostedtoolcache/Python/3.10.18/x64/lib/python3.10/site-packages/xarray/util/deprecation_helpers.py:115, in _deprecate_positional_args.<locals>._decorator.<locals>.inner(*args, **kwargs)
    111     kwargs.update({name: arg for name, arg in zip_args})
    113     return func(*args[:-n_extra_args], **kwargs)
--> 115 return func(*args, **kwargs)

File /opt/hostedtoolcache/Python/3.10.18/x64/lib/python3.10/site-packages/xarray/core/dataset.py:6356, in Dataset.dropna(self, dim, how, thresh, subset)
   6354     if dim in array.dims:
   6355         dims = [d for d in array.dims if d != dim]
-> 6356         count += np.asarray(array.count(dims))
   6357         size += math.prod([self.sizes[d] for d in dims])
   6359 if thresh is not None:

File /opt/hostedtoolcache/Python/3.10.18/x64/lib/python3.10/site-packages/xarray/core/common.py:165, in AbstractArray.__array__(self, dtype)
    164 def __array__(self: Any, dtype: DTypeLike | None = None) -> np.ndarray:
--> 165     return np.asarray(self.values, dtype=dtype)

File /opt/hostedtoolcache/Python/3.10.18/x64/lib/python3.10/site-packages/xarray/core/variable.py:525, in Variable.values(self)
    522 @property
    523 def values(self):
    524     """The variable's data as a numpy.ndarray"""
--> 525     return _as_array_or_item(self._data)

File /opt/hostedtoolcache/Python/3.10.18/x64/lib/python3.10/site-packages/xarray/core/variable.py:323, in _as_array_or_item(data)
    309 def _as_array_or_item(data):
    310     """Return the given values as a numpy array, or as an individual item if
    311     it's a 0d datetime64 or timedelta64 array.
    312 
   (...)
    321     TODO: remove this (replace with np.asarray) once these issues are fixed
    322     """
--> 323     data = np.asarray(data)
    324     if data.ndim == 0:
    325         if data.dtype.kind == "M":

File /opt/hostedtoolcache/Python/3.10.18/x64/lib/python3.10/site-packages/dask/array/core.py:1693, in Array.__array__(self, dtype, **kwargs)
   1692 def __array__(self, dtype=None, **kwargs):
-> 1693     x = self.compute()
   1694     if dtype and x.dtype != dtype:
   1695         x = x.astype(dtype)

File /opt/hostedtoolcache/Python/3.10.18/x64/lib/python3.10/site-packages/dask/base.py:375, in DaskMethodsMixin.compute(self, **kwargs)
    351 def compute(self, **kwargs):
    352     """Compute this dask collection
    353 
    354     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    373     dask.compute
    374     """
--> 375     (result,) = compute(self, traverse=False, **kwargs)
    376     return result

File /opt/hostedtoolcache/Python/3.10.18/x64/lib/python3.10/site-packages/dask/base.py:661, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    658     postcomputes.append(x.__dask_postcompute__())
    660 with shorten_traceback():
--> 661     results = schedule(dsk, keys, **kwargs)
    663 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /opt/hostedtoolcache/Python/3.10.18/x64/lib/python3.10/site-packages/odc/loader/_builder.py:436, in _dask_loader_tyx()
    434 with rdr.restore_env(env, load_state):
    435     for ti, ti_srcs in enumerate(srcs):
--> 436         _fill_nd_slice(
    437             ti_srcs, gbox, cfg, chunk[ti], ydim=ydim, selection=selection
    438         )
    439     return chunk

File /opt/hostedtoolcache/Python/3.10.18/x64/lib/python3.10/site-packages/odc/loader/_builder.py:513, in _fill_nd_slice()
    510     return dst
    512 src, *rest = srcs
--> 513 yx_roi, pix = src.read(cfg, dst_gbox, dst=dst, selection=selection)
    514 assert len(yx_roi) == 2
    515 assert pix.ndim == dst.ndim

File /opt/hostedtoolcache/Python/3.10.18/x64/lib/python3.10/site-packages/odc/loader/_rio.py:115, in read()
    107 def read(
    108     self,
    109     cfg: RasterLoadParams,
   (...)
    113     selection: Optional[ReaderSubsetSelection] = None,
    114 ) -> tuple[tuple[slice, slice], np.ndarray]:
--> 115     return rio_read(self._src, cfg, dst_geobox, dst=dst, selection=selection)

File /opt/hostedtoolcache/Python/3.10.18/x64/lib/python3.10/site-packages/odc/loader/_rio.py:526, in rio_read()
    520     if cfg.fail_on_error:
    521         log.error(
    522             "Aborting load due to failure while reading: %s:%d",
    523             src.uri,
    524             src.band,
    525         )
--> 526         raise e
    527 except rasterio.errors.RasterioError as e:
    528     if cfg.fail_on_error:

File /opt/hostedtoolcache/Python/3.10.18/x64/lib/python3.10/site-packages/odc/loader/_rio.py:512, in rio_read()
    508     return roi, out.transpose([1, 2, 0])
    510 try:
    511     return fixup_out(
--> 512         _rio_read(src, cfg, dst_geobox, prep_dst(dst), selection=selection)
    513     )
    514 except (
    515     rasterio.errors.RasterioIOError,
    516     rasterio.errors.RasterBlockError,
    517     rasterio.errors.WarpOperationError,
    518     rasterio.errors.WindowEvaluationError,
    519 ) as e:
    520     if cfg.fail_on_error:

File /opt/hostedtoolcache/Python/3.10.18/x64/lib/python3.10/site-packages/odc/loader/_rio.py:560, in _rio_read()
    549 def _rio_read(
    550     src: RasterSource,
    551     cfg: RasterLoadParams,
   (...)
    556     # if resampling is `nearest` then ignore sub-pixel translation when deciding
    557     # whether we can just paste source into destination
    558     ttol = 0.9 if cfg.nearest else 0.05
--> 560     with rasterio.open(src.uri, "r", sharing=False) as rdr:
    561         assert isinstance(rdr, rasterio.DatasetReader)
    562         ovr_idx: Optional[int] = None

File /opt/hostedtoolcache/Python/3.10.18/x64/lib/python3.10/site-packages/rasterio/env.py:463, in wrapper()
    460     session = DummySession()
    462 with env_ctor(session=session):
--> 463     return f(*args, **kwds)

File /opt/hostedtoolcache/Python/3.10.18/x64/lib/python3.10/site-packages/rasterio/__init__.py:356, in open()
    353     path = _parse_path(raw_dataset_path)
    355 if mode == "r":
--> 356     dataset = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)
    357 elif mode == "r+":
    358     dataset = get_writer_for_path(path, driver=driver)(
    359         path, mode, driver=driver, sharing=sharing, **kwargs
    360     )

File rasterio/_base.pyx:312, in rasterio._base.DatasetBase.__init__()

RasterioIOError: CURL error: Failed to connect to data.eodc.eu port 443 after 133 ms: Couldn't connect to server

We mask the water bodies.

data_text = files("dask_flood_mapper.data").joinpath("wcover.nc")
wcover = xr.open_dataarray(data_text, decode_coords="all")
fd = fd.where(wcover != 80)

Reference Flood Map (GFM)#

As a reference of the validation we use the TUW component of the GFM service. This data can be accessed through STAC from EODC.

# Connect to STAC catalog
eodc_catalog = pystac_client.Client.open("https://stac.eodc.eu/api/v1/")

# Search for available images
search = eodc_catalog.search(collections="GFM", bbox=bounding_box, datetime=time_range)
items_GFM = search.item_collection()

crs = "EPSG:4326"

GFM_fd = odc_stac.load(
    items_GFM,
    bbox=bounding_box,
    crs=crs,
    bands=["tuw_flood_extent"],
    resolution=fd.rio.resolution()[0],
).tuw_flood_extent

# for accuracy overwrite the coordinates
GFM_fd = GFM_fd.assign_coords(
    {
        "longitude": fd.longitude.data,
        "latitude": fd.latitude.data,
    }
)

# no data
GFM_fd = GFM_fd.where(GFM_fd != GFM_fd.rio.nodata)

# mask water bodies
GFM_fd = GFM_fd.where(wcover != 80)

Validation plot#

Finally we can compare the results of the Dask flood map implementation with the TU Wien component of GFM.

common_times = np.intersect1d(GFM_fd.time.values, fd.time.values)


def synced_plot(t):
    plot1 = GFM_fd.sel(time=t).hvplot.image(
        x="longitude",
        y="latitude",
        title="GFM flood map",
        cmap=["rgba(0, 0, 1, 0.1)", "darkblue"],
    )
    plot2 = fd.sel(time=t).hvplot.image(
        x="longitude",
        y="latitude",
        title="dask-flood-mapper",
        cmap=["rgba(0, 0, 1, 0.1)", "darkblue"],
    )

    return (plot1 + plot2).cols(2)


hv.DynamicMap(synced_plot, kdims="time").redim.values(time=common_times)