Sentinel-2 → NDVI z-score cube → rolling-window tail-dependence vs center pixel → Lexcube
This end-to-end recipe mirrors the R-style tail-dependence workflow: stream
Sentinel-2 L2A data with cubo, compute NDVI, standardize per pixel, and
calculate rolling partial tail Spearman correlations against the center pixel.
The example uses a fixed 90-day window, optional spatial coarsening and time
striding to stay within memory limits, and visualizes the bottom-tail, top-tail,
and difference cubes with Lexcube.
Dependencies: `cubo`, `lexcube`, `xarray`, `rioxarray`, `matplotlib`. Uncomment the `pip install` line if needed.
# Requirements (uncomment if needed in your environment):
# !pip install -q cubo lexcube xarray rioxarray matplotlib
from __future__ import annotations
import warnings
import gc
import numpy as np
import xarray as xr
import matplotlib.pyplot as plt
import cubo
import lexcube
# -------------------------------------------------------------------
# Helper functions: ranks + partial tail Spearman
# -------------------------------------------------------------------
def _rank_1d(a: np.ndarray) -> np.ndarray:
"""
Simple 1D rank function (1..n) with 'ordinal' handling of ties.
For continuous NDVI, ties should be rare; if you need exact
'average' tie handling, swap in scipy.stats.rankdata.
"""
a = np.asarray(a)
order = np.argsort(a)
ranks = np.empty_like(order, dtype=float)
ranks[order] = np.arange(1, len(a) + 1, dtype=float)
return ranks
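# If exact 'average' tie handling matters (e.g., for quantized inputs), a
# drop-in replacement via scipy could look like this (sketch; scipy is an
# extra dependency not in the install line above):
# def _rank_1d(a: np.ndarray) -> np.ndarray:
#     from scipy.stats import rankdata
#     return rankdata(np.asarray(a), method="average").astype(float)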
def partial_tail_spearman(
x: np.ndarray,
y: np.ndarray,
b: float = 0.5,
min_t: int = 5,
) -> tuple[float, float, float]:
"""
    Partial Spearman correlations in the lower and upper tails, following a
    Ghosh et al.-style decomposition.
x, y : 1D arrays (time series)
b : split parameter (e.g., 0.5 → lower vs upper half in (u+v) space)
min_t: minimum number of valid observations to compute a tail statistic
Returns:
(left_tail_corr, right_tail_corr, left_minus_right)
"""
x = np.asarray(x)
y = np.asarray(y)
# Drop NaNs
mask = np.isfinite(x) & np.isfinite(y)
x = x[mask]
y = y[mask]
Tot = x.size
if Tot < min_t:
return np.nan, np.nan, np.nan
    # Normalized ranks u, v in (0, 1), approximately uniform
u = _rank_1d(x) / (Tot + 1.0)
v = _rank_1d(y) / (Tot + 1.0)
uv_sum = u + v
# Lower (left) tail and upper (right) tail masks
left_mask = (uv_sum > 0.0) & (uv_sum < 2.0 * b)
right_mask = (uv_sum > 2.0 * b) & (uv_sum < 2.0)
# Global means & variances (as in the R code)
u_mean = u.mean()
v_mean = v.mean()
u_var = np.var(u, ddof=1)
v_var = np.var(v, ddof=1)
denom = (Tot - 1.0) * np.sqrt(u_var * v_var)
if denom <= 0.0:
return np.nan, np.nan, np.nan
# Left (bottom) tail
if left_mask.sum() >= min_t:
cor_left = np.sum((u[left_mask] - u_mean) * (v[left_mask] - v_mean)) / denom
else:
cor_left = np.nan
# Right (top) tail
if right_mask.sum() >= min_t:
cor_right = np.sum((u[right_mask] - u_mean) * (v[right_mask] - v_mean)) / denom
else:
cor_right = np.nan
return cor_left, cor_right, cor_left - cor_right
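# Optional sanity check on synthetic data (illustrative; safe to delete).
# Strongly comonotonic series should show clear positive dependence in both
# tails, and bottom + top should approximately recover the full Spearman rho.
_rng = np.random.default_rng(0)
_x = _rng.normal(size=200)
_y = _x + 0.1 * _rng.normal(size=200)
_lo, _hi, _diff = partial_tail_spearman(_x, _y, b=0.5)
print(f"sanity check: bottom={_lo:.2f}, top={_hi:.2f}, diff={_diff:.2f}")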
# -------------------------------------------------------------------
# 0) User parameters
# -------------------------------------------------------------------
LAT = 43.89 # Kyle, South Dakota
LON = -102.18
START = "2023-06-01"
END = "2024-09-30"
EDGE_SIZE = 1028 # raw pixels on a side
RESOLUTION = 10 # meters
CLOUD_LT = 40 # max global cloud cover %
STD_EPS = 1e-4 # min std for valid z-scores
MIN_T = 5 # min time steps inside a window before computing stats
# 💾 MEMORY CONTROL KNOBS
COARSEN_FACTOR = 4 # 1 = none, 2 = 2x2 blocks, 4 = 4x4, etc.
TIME_STRIDE = 2 # 1 = keep all times, 2 = every other, etc.
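# Rough budget: one float32 cube costs T * (EDGE_SIZE / COARSEN_FACTOR)**2 * 4
# bytes. For example, T = 50, EDGE_SIZE = 1028, COARSEN_FACTOR = 4 gives
# 50 * 257**2 * 4 B, roughly 13 MB per cube (the raw two-band download at
# full resolution is far larger).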
# Rolling window parameters (fixed window length in days)
WINDOW_DAYS = 90
# Tail dependence parameter
TAIL_B = 0.5 # split parameter in (u+v) space, analogous to b in the R code
# Display parameters
CORR_VMIN, CORR_VMAX = -1.0, 1.0
CORR_CMAP = "RdBu_r"
# -------------------------------------------------------------------
# 1) Build Sentinel-2 cube (B04=Red, B08=NIR)
# -------------------------------------------------------------------
with warnings.catch_warnings():
warnings.simplefilter("ignore")
s2 = cubo.create(
lat=LAT,
lon=LON,
collection="sentinel-2-l2a",
bands=["B04", "B08"],
start_date=START,
end_date=END,
edge_size=EDGE_SIZE,
resolution=RESOLUTION,
query={"eo:cloud_cover": {"lt": CLOUD_LT}},
)
# Ensure dims ordering is (time, y, x, band)
if "band" in s2.dims and tuple(s2.dims) != ("time", "y", "x", "band"):
    s2 = s2.transpose("time", "y", "x", "band")
# Optional chunking for dask-based workflows
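# 'time' stays a single chunk: the apply_ufunc call in step 5 reduces over the
# full time axis as a core dimension, which dask cannot split across chunks.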
s2 = s2.chunk({"time": -1, "y": 256, "x": 256})
print("Raw Sentinel-2 cube:", s2)
# -------------------------------------------------------------------
# 2) Compute NDVI through time (NO spyndex)
# -------------------------------------------------------------------
NIR = s2.sel(band="B08") # (time, y, x)
RED = s2.sel(band="B04")
eps = 1e-6
ndvi = (NIR - RED) / (NIR + RED + eps)
ndvi = ndvi.rename("ndvi").astype("float32")
ndvi.attrs.update({
"long_name": "Normalized Difference Vegetation Index",
"formula": "(B08 - B04) / (B08 + B04)",
"bands": "B08(NIR), B04(Red)",
})
# Free S2 cube to save memory
del s2, NIR, RED
gc.collect()
print("NDVI cube shape:", ndvi.shape)
# -------------------------------------------------------------------
# 3) NDVI z-scores per pixel
# -------------------------------------------------------------------
ndvi_mean = ndvi.mean(dim="time", skipna=True)
ndvi_std = ndvi.std(dim="time", skipna=True)
valid_std = ndvi_std > STD_EPS
ndvi_z = xr.where(
valid_std,
(ndvi - ndvi_mean) / ndvi_std,
np.nan
)
ndvi_z = ndvi_z.rename("ndvi_zscore").astype("float32")
ndvi_z.attrs.update({
"long_name": "Standardized NDVI anomaly (z-score)",
"definition": "z = (NDVI - mean_t NDVI) / std_t NDVI at each pixel",
"baseline_period": f"{START} to {END}",
"note": f"Pixels with std < {STD_EPS} set to NaN.",
})
del ndvi, ndvi_mean, ndvi_std
gc.collect()
print("NDVI z-score cube shape (pre-downsampling):", ndvi_z.shape)
# -------------------------------------------------------------------
# 4) Optional spatial coarsening + temporal subsampling
# -------------------------------------------------------------------
if COARSEN_FACTOR > 1:
ndvi_z = ndvi_z.coarsen(
y=COARSEN_FACTOR,
x=COARSEN_FACTOR,
boundary="trim"
).mean()
ndvi_z.attrs["coarsen_factor"] = COARSEN_FACTOR
if TIME_STRIDE > 1:
ndvi_z = ndvi_z.isel(time=slice(0, None, TIME_STRIDE))
ndvi_z.attrs["time_stride"] = TIME_STRIDE
ndvi_z = ndvi_z.chunk({"time": -1, "y": 256, "x": 256})
time = ndvi_z["time"]
T = ndvi_z.sizes["time"]
print(f"NDVI z cube shape after downsampling: {ndvi_z.shape} (time, y, x)")
print(f"Number of time steps after downsampling: T = {T}")
# -------------------------------------------------------------------
# 5) Build tail-dependence cubes vs center pixel (fixed rolling window)
# -------------------------------------------------------------------
# Identify the center pixel on the coarsened grid
y_size = ndvi_z.sizes["y"]
x_size = ndvi_z.sizes["x"]
y_center_idx = y_size // 2
x_center_idx = x_size // 2
# Full time series of the reference (center) pixel
ref_full = ndvi_z.isel(y=y_center_idx, x=x_center_idx) # (time,)
print(
"Reference pixel (coarsened grid):",
f"y={float(ndvi_z['y'].values[y_center_idx])},",
f"x={float(ndvi_z['x'].values[x_center_idx])}"
)
print(f"Using rolling window of {WINDOW_DAYS} days.")
print(f"Minimum time steps per window: MIN_T = {MIN_T}")
print(f"Tail split parameter b = {TAIL_B}")
tail_left_slices = [] # bottom tail dependence
tail_right_slices = [] # top tail dependence
tail_diff_slices = [] # bottom - top
for ti in range(T):
t_end = time.values[ti]
t_start = t_end - np.timedelta64(WINDOW_DAYS, "D")
# Subset to the rolling window [t_start, t_end]
ndvi_sub = ndvi_z.sel(time=slice(t_start, t_end)) # (time, y, x)
ref_sub = ref_full.sel(time=slice(t_start, t_end)) # (time,)
# Require at least MIN_T observations in the window
if ndvi_sub.sizes["time"] < MIN_T:
continue
# Vectorized partial tail Spearman across (y, x)
tail_left, tail_right, tail_diff = xr.apply_ufunc(
partial_tail_spearman,
ndvi_sub,
ref_sub,
input_core_dims=[["time"], ["time"]],
output_core_dims=[[], [], []],
vectorize=True,
dask="parallelized",
kwargs={"b": TAIL_B, "min_t": MIN_T},
output_dtypes=[np.float32, np.float32, np.float32],
)
# Attach time coordinate (window end time) and collect slices
for arr, collection in [
(tail_left, tail_left_slices),
(tail_right, tail_right_slices),
(tail_diff, tail_diff_slices),
]:
arr = arr.expand_dims("time")
arr = arr.assign_coords(time=("time", [t_end]))
collection.append(arr)
del ndvi_sub, ref_sub, tail_left, tail_right, tail_diff
gc.collect()
if not tail_left_slices:
raise RuntimeError(
f"No tail-dependence slices computed; T={T}, MIN_T={MIN_T}, "
f"WINDOW_DAYS={WINDOW_DAYS}. Try reducing MIN_T or WINDOW_DAYS, "
"or ensuring enough valid observations."
)
# Concatenate into three cubes
tail_left_cube = xr.concat(tail_left_slices, dim="time").rename("tail_dep_bottom")
tail_right_cube = xr.concat(tail_right_slices, dim="time").rename("tail_dep_top")
tail_diff_cube = xr.concat(tail_diff_slices, dim="time").rename("tail_dep_diff")
shared_attrs = {
"min_time_points": MIN_T,
"reference_pixel_y": float(ndvi_z["y"].values[y_center_idx]),
"reference_pixel_x": float(ndvi_z["x"].values[x_center_idx]),
"coarsen_factor": COARSEN_FACTOR,
"time_stride": TIME_STRIDE,
"window_days": WINDOW_DAYS,
"tail_b": TAIL_B,
"note": (
"Tail dependence from partial Spearman over normalized ranks (u, v); "
"left=bottom tail, right=top tail, diff=left-right."
),
}
tail_left_cube.attrs.update({
"long_name": "Bottom-tail partial Spearman vs center pixel",
**shared_attrs,
})
tail_right_cube.attrs.update({
"long_name": "Top-tail partial Spearman vs center pixel",
**shared_attrs,
})
tail_diff_cube.attrs.update({
"long_name": "Difference in tail dependence (bottom - top) vs center pixel",
**shared_attrs,
})
del ndvi_z, ref_full
gc.collect()
print("Tail-dependence cube shapes:",
"bottom:", tail_left_cube.shape,
"top:", tail_right_cube.shape,
"diff:", tail_diff_cube.shape)
# -------------------------------------------------------------------
# 6) QA: median tail-dependence difference over space through time
# -------------------------------------------------------------------
fig, ax = plt.subplots(figsize=(6, 3))
tail_diff_med = tail_diff_cube.median(dim=["y", "x"], skipna=True).to_series()
tail_diff_med.plot(ax=ax)
ax.axhline(0.0, ls="--", lw=1, alpha=0.7)
ax.set_ylabel("Median bottom - top tail dep")
ax.set_xlabel("Date")
ax.set_ylim(CORR_VMIN, CORR_VMAX)
ax.set_title(f"Median tail-dependence difference (rolling {WINDOW_DAYS} days)")
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# -------------------------------------------------------------------
# 7) Lexcube visualization (example: tail_diff_cube)
# -------------------------------------------------------------------
# Clip to the desired color range before plotting
corr_display = tail_diff_cube.clip(CORR_VMIN, CORR_VMAX)
w = lexcube.Cube3DWidget(
corr_display,
cmap=CORR_CMAP,
vmin=CORR_VMIN,
vmax=CORR_VMAX,
title=(
f"Tail-dependence difference (bottom - top) vs center pixel "
f"(rolling {WINDOW_DAYS} days) • coarsen={COARSEN_FACTOR} "
f"• t_stride={TIME_STRIDE} • b={TAIL_B}"
),
)
w.plot()
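# Static fallback outside Jupyter (plain matplotlib via xarray; illustrative):
# tail_diff_cube.isel(time=-1).plot(cmap=CORR_CMAP, vmin=CORR_VMIN, vmax=CORR_VMAX)
# plt.show()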
Tips

- Increase `TIME_STRIDE` or `COARSEN_FACTOR` if memory becomes an issue.
- Lower `MIN_T` or `WINDOW_DAYS` if you see the runtime error about missing tail-dependence slices.
- Swap `lexcube.Cube3DWidget` for `pipe(...) | v.show_cube_lexcube(...)` if you prefer the CubeDynamics visualization verbs.
- To target a different reference pixel, change the `y_center_idx`/`x_center_idx` selections before computing the rolling window; see the sketch below.
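For example, to anchor on a specific map coordinate instead of the grid center, something like this could replace the center-pixel lines in step 5 (a sketch; `y0`/`x0` are hypothetical coordinates that must fall inside the cube's extent, in its UTM CRS):

# Hypothetical target coordinates in the cube's CRS (replace with your own):
y0, x0 = 4_869_000.0, 726_000.0
y_center_idx = int(np.abs(ndvi_z["y"].values - y0).argmin())
x_center_idx = int(np.abs(ndvi_z["x"].values - x0).argmin())
ref_full = ndvi_z.isel(y=y_center_idx, x=x_center_idx)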