2 openEO remote processing

In this second exercise we use the EODC openEO service as our data source and, more importantly, as our processing center.

2.1 Setting up a Python session

We again begin by loading openEO and some additional packages.

import numpy as np
import xarray as xr
from datetime import datetime
import os
from openeo_flood_mapper_local.view_flood_map import view_flood_map
import openeo
from openeo.processes import ProcessBuilder, array_element, add, multiply, sin, cos, mask, exp, median
2.2 Connect to the EODC openEO Backend
Establish a connection to the EODC backend with openeo.connect(). This returns a connection object, which is the entry point for discovering and processing collections on the backend.
= "https://openeo.eodc.eu"
backend = openeo.connect(backend)
connection connection.authenticate_oidc()
Authenticated using refresh token.
<Connection to 'https://openeo.eodc.eu/openeo/1.1.0/' with OidcBearerAuth>
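With an authenticated connection in hand, we can discover what the backend offers before loading anything. As a minimal sketch, using the openeo client's standard discovery helpers:

# list the ids of all collections exposed by the EODC backend
print(connection.list_collection_ids())

# fetch the full metadata document of a single collection
connection.describe_collection("SENTINEL1_SIG0_20M")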
2.3 Load Collections from the EODC
We can first have a look at the metadata available at the EODC for the required collections (SENTINEL1_SIG0_20M, SENTINEL1_HPAR, and SENTINEL1_MPLIA), like so:
"SENTINEL1_SIG0_20M") connection.collection_metadata(
We can then load the collections. This is done with the method load_collection() and the collection ids listed above. During loading we also apply some initial filtering on the spatial and temporal extent. More importantly, we have to filter SENTINEL1_HPAR and SENTINEL1_MPLIA for the descending orbit “D080” in order to calculate the correct reference backscatter signatures. We therefore use the following filtering criteria.
= {"west": 21.93, "south": 39.47, "east": 22.23, "north": 39.64}
spatial_extent = ["2018-02-28T04:00:00Z", "2018-02-28T05:00:00Z"]
sensing_date = {
props "sat:orbit_state": lambda x: openeo.processes.eq(x, "descending"),
"sat:relative_orbit": lambda x: openeo.processes.eq(x, 80)
}
sig0_dc = connection.load_collection(
    "SENTINEL1_SIG0_20M",
    spatial_extent=spatial_extent,
    temporal_extent=sensing_date,
    bands=["VV"]
).mean_time()
"SENTINEL1_HPAR") connection.collection_metadata(
hparam_dc = connection.load_collection(
    "SENTINEL1_HPAR",
    spatial_extent=spatial_extent,
    temporal_extent="2019",
    properties=props
).mean_time()
/home/mschobbe/miniconda3/envs/openeo-flood-mapper-local/lib/python3.10/site-packages/openeo/rest/connection.py:1188: UserWarning: SENTINEL1_HPAR property filtering with properties that are undefined in the collection metadata (summaries): sat:orbit_state, sat:relative_orbit.
return DataCube.load_collection(
"SENTINEL1_MPLIA") connection.collection_metadata(
plia_dc = connection.load_collection(
    "SENTINEL1_MPLIA",
    spatial_extent=spatial_extent,
    temporal_extent=["2020-01-01", "2020-12-31"],
    bands=["MPLIA"],
    properties=props
).mean_time()
/home/mschobbe/miniconda3/envs/openeo-flood-mapper-local/lib/python3.10/site-packages/openeo/rest/connection.py:1188: UserWarning: SENTINEL1_MPLIA property filtering with properties that are undefined in the collection metadata (summaries): sat:orbit_state, sat:relative_orbit.
return DataCube.load_collection(
2.4 openEO Analysis at the EODC
The remainder of the workflow is similar to the local processing, with some minor differences in the naming of objects.
def water_backscatter(plia_dc):
    return (plia_dc.band('MPLIA') * -0.394181 + -4.142015). \
        add_dimension('bands', 'wbsc', 'bands')
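In other words, water_backscatter() encodes a linear model of the expected water backscatter as a function of the projected local incidence angle, with the slope and intercept hard-coded above:

$$\sigma^0_{water} = -0.394181\,\theta_{PLIA} - 4.142015$$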
Here we again apply the water_backscatter() function to the incidence angle datacube, as follows:
water_bsc_dc = water_backscatter(plia_dc)
water_bsc_dc
To initiate the processing we create a batch job with the create_job() method, which submits the JSON representation of the process graph to the backend. Only by starting this job at the EODC backend is the processing actually performed, like so:
water_bsc_res = water_bsc_dc.save_result(format="NetCDF")
water_bsc_job = water_bsc_res.create_job(title="water_bsc_greece_flood_2018_as_NetCDF_py")
water_bsc_job.start_job()
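Batch jobs run asynchronously on the backend. The following is a minimal sketch of how to inspect the submitted process graph and poll the job until completion; it assumes the DataCube.to_json(), BatchJob.status(), and BatchJob.logs() methods of the openeo Python client, whose behaviour may differ between client versions:

import time

# print the JSON process graph that create_job() submits to the backend
print(water_bsc_res.to_json())

# poll the job; status() returns strings such as "created", "queued",
# "running", "finished" or "error"
while water_bsc_job.status() in ("created", "queued", "running"):
    time.sleep(30)
print(water_bsc_job.status())
print(water_bsc_job.logs())  # backend logs are useful for debugging failures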
We can then download the results.
"data/watter_backscatter/example.nc") water_bsc_job.download_result(
And view the retrieved data.
# note: open under a new name so the openEO datacube water_bsc_dc,
# which is reused further below, is not overwritten
water_bsc_ds = xr.open_dataset("data/water_backscatter/example.nc")
water_bsc_ds
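For a quick visual check we can plot the retrieved band with xarray's built-in matplotlib plotting. This sketch assumes the NetCDF stores the expected water backscatter under the variable name "wbsc" (the band label assigned by water_backscatter()); inspect water_bsc_ds to confirm the actual name:

# plot the expected water backscatter band
water_bsc_ds["wbsc"].plot(cmap="viridis")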
The following code replicates the definition of the harmonic model from the local flood mapping pipeline.
def harmonic_expected_backscatter(data, dtime_str):
    w = np.pi * 2 / 365
    dt = datetime.strptime(dtime_str, "%Y-%m-%d")
    t = dt.timetuple().tm_yday
    wt = w * t

    M0 = data.band('M0')
    S1 = data.band('S1')
    S2 = data.band('S2')
    S3 = data.band('S3')
    C1 = data.band('C1')
    C2 = data.band('C2')
    C3 = data.band('C3')
    hm_c1 = (M0 + S1 * np.sin(wt)) + (C1 * np.cos(wt))
    hm_c2 = ((hm_c1 + S2 * np.sin(2 * wt)) + C2 * np.cos(2 * wt))
    hm_c3 = ((hm_c2 + S3 * np.sin(3 * wt)) + C3 * np.cos(3 * wt))
    return hm_c3.add_dimension('bands', 'hbsc', 'bands')
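For reference, this function evaluates a third-order harmonic (seasonal) model of the expected land backscatter at day of year $t$:

$$\hat{\sigma}^0_{land}(t) = M_0 + \sum_{k=1}^{3} \left[ S_k \sin(k\omega t) + C_k \cos(k\omega t) \right], \qquad \omega = \frac{2\pi}{365},$$

where the mean $M_0$ and the coefficients $S_k$, $C_k$ are the bands of the SENTINEL1_HPAR collection.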
Apply this function to the datacube for the time slice of the flooding event.

land_bsc_dc = harmonic_expected_backscatter(hparam_dc, '2018-02-01')
In turn, we define the Bayesian classification model, as follows:
def bayesian_flood_decision(x: ProcessBuilder) -> ProcessBuilder:
    nf_std = 2.754041
    sig0 = x.array_element(index=0)
    std = x.array_element(index=1)
    wbsc = x.array_element(index=2)
    hbsc = x.array_element(index=3)

    f_prob = (1.0 / (std * np.sqrt(2 * np.pi))) * np.exp(-0.5 * \
        (((sig0 - wbsc) / nf_std) ** 2))
    nf_prob = (1.0 / (nf_std * np.sqrt(2 * np.pi))) * np.exp(-0.5 * \
        (((sig0 - hbsc) / nf_std) ** 2))

    evidence = (nf_prob * 0.5) + (f_prob * 0.5)
    f_post_prob = (f_prob * 0.5) / evidence
    nf_post_prob = (nf_prob * 0.5) / evidence

    # flood if flood class has higher probability
    return f_post_prob.gt(nf_post_prob)
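Spelled out, the function computes Gaussian likelihoods of the observed backscatter $\sigma^0$ under the flood ($F$) and non-flood ($NF$) hypotheses, centred on the water and harmonic land backscatter respectively, and combines them with equal priors of $0.5$ via Bayes' rule:

$$P(F \mid \sigma^0) = \frac{p(\sigma^0 \mid F)\,P(F)}{p(\sigma^0 \mid F)\,P(F) + p(\sigma^0 \mid NF)\,P(NF)}$$

A pixel is classified as flooded when $P(F \mid \sigma^0) > P(NF \mid \sigma^0)$, which with two classes is equivalent to $P(F \mid \sigma^0) > 0.5$.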
And then execute this on a combined data cube, as follows:
std_dc = hparam_dc.band('STD').add_dimension('bands', 'std', 'bands')
sig0_dc = sig0_dc.reduce_bands('mean').add_dimension('bands', 'sig0', 'bands')
# the merge order determines the band order that bayesian_flood_decision
# reads via array_element: sig0 (0), std (1), wbsc (2), hbsc (3)
decision_in_dc = sig0_dc. \
    merge_cubes(std_dc). \
    merge_cubes(water_bsc_dc). \
    merge_cubes(land_bsc_dc). \
    merge_cubes(plia_dc)
flood_dc = decision_in_dc.reduce_bands(bayesian_flood_decision). \
    add_dimension('bands', 'dec', 'bands')

flood_dc = flood_dc.merge_cubes(decision_in_dc)
Finally, we can again send this processing pipeline off to the EODC.
flood_res = flood_dc.save_result(format="NetCDF")
flood_job = flood_res.create_job(title="flood_greece_flood_2018_as_NetCDF_py")
flood_job.start_job()
And retrieve the data, like so:
"data/thessaly_floodmap.nc") flood_job.download_result(
Now let’s have a look at the result of the processing job performed at the EODC.
"data/thessaly_floodmap.nc").dec) view_flood_map(xr.open_dataset(
We can then extend this pipeline to include the postprocessing steps.
- Masking of Exceeding Incidence Angles
mask_ia = (flood_dc.band("MPLIA") >= 27) * (flood_dc.band("MPLIA") <= 48)
flood_dc = flood_dc * mask_ia
flood_res = flood_dc.save_result(format="NetCDF")
flood_job = flood_res.create_job(title="flood_greece_flood_2018_as_NetCDF_py")
flood_job.start_job()
"data/thessaly_floodmap_plia.nc") flood_job.download_result(
- Identification of Conflicting Distributions
water_bsc_threshold = flood_dc.band("wbsc") + 0.5 * 2.754041
mask_conflict = flood_dc.band("hbsc") > water_bsc_threshold
flood_dc = flood_dc * mask_conflict
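That is, pixels are retained only where the expected land backscatter exceeds the water backscatter by at least half the non-flood standard deviation ($\sigma_{NF} = 2.754041$), so that the two class distributions are separable enough for a reliable decision:

$$\hat{\sigma}^0_{land} > \sigma^0_{water} + 0.5\,\sigma_{NF}$$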
flood_res = flood_dc.save_result(format="NetCDF")
flood_job = flood_res.create_job(title="flood_greece_flood_2018_as_NetCDF_py")
flood_job.start_job()
"data/thessaly_floodmap_plia_distr.nc") flood_job.download_result(
- Removal of Measurement Outliers
land_bsc_lower = flood_dc.band("hbsc") - 3 * flood_dc.band("std")
land_bsc_upper = flood_dc.band("hbsc") + 3 * flood_dc.band("std")
water_bsc_upper = flood_dc.band("wbsc") + 3 * 2.754041

mask_land_outliers = (flood_dc.band("sig0") > land_bsc_lower) * (flood_dc.band("sig0") < land_bsc_upper)
mask_water_outliers = flood_dc.band("sig0") < water_bsc_upper
flood_dc = flood_dc * (mask_land_outliers | mask_water_outliers)
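In formula form, a measurement $\sigma^0$ is kept only when it is plausible under at least one of the two hypotheses, with $\sigma_{NF} = 2.754041$ as above:

$$\hat{\sigma}^0_{land} - 3\,\sigma_{STD} < \sigma^0 < \hat{\sigma}^0_{land} + 3\,\sigma_{STD} \quad \text{(land)}, \qquad \sigma^0 < \sigma^0_{water} + 3\,\sigma_{NF} \quad \text{(water)}$$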
flood_res = flood_dc.save_result(format="NetCDF")
flood_job = flood_res.create_job(title="flood_greece_flood_2018_as_NetCDF_py")
flood_job.start_job()
"data/thessaly_floodmap_plia_distr_out.nc") flood_job.download_result(
- Denial of High Uncertainty on Decision
mask_uncertainty = flood_dc.band("dec") > 0.8
flood_dc = flood_dc * mask_uncertainty
flood_res = flood_dc.save_result(format="NetCDF")
flood_job = flood_res.create_job(title="flood_greece_flood_2018_as_NetCDF_py")
flood_job.start_job()
"data/thessaly_floodmap_plia_distr_out_den.nc") flood_job.download_result(