import os
import geopandas as gpd
import pandas as pd
import xarray as xr
import numpy as np
import sparse
import dask.array as da
import matplotlib.pyplot as plt
import matplotlib
import gcsfs
import folium
from folium import Choropleth
from IPython.display import display
from b3alien.utils.runtime import in_jupyter
matplotlib.use("Agg")
"""
Create a class object of the occurrence cube
"""
[docs]
class OccurrenceCube():
"""
Load a GeoParquet file (local or from GCS) into a sparse xarray cube.
Parameters
----------
filepath : str
Path to the GeoParquet file (e.g. 'gs://bucket/file.parquet').
dims : list or tuple, optional
Dimension names. Default is ['time', 'cell', 'species'].
coords : dict, optional
Optional coordinates to assign to the cube.
index_col : str or list, optional
Column(s) to use for reshaping if needed.
Returns
-------
b3cube.OccurrenceCube
A sparse data cube loaded from the GeoParquet file.
self.df contains a geopandas.DataFrame
self.data a sparse xarray.Xarray
"""
def __init__(self, filepath: str, source='geoparquet', gproject='', dims=None, coords=None, index_col=None):
self.filepath = filepath
self.source = source
self.gproject = gproject
self.dims = dims or ("time", "cell", "species")
self.coords = coords
self.index_col = index_col
# Load Data
if source == 'gbif':
self.df = self._load_gbifcsv(filepath)
else:
self.df = self._load_geoparquet(filepath, gproject)
# Create cube
self.data = self._create_xcube(self.df)
[docs]
def _load_gbifcsv(self, path):
"""
Load a GBIF CSV file from local disk using Pandas. Assumes tab-separated values.
"""
df = pd.read_csv(path, sep='\t')
return df
[docs]
def _load_geoparquet(self, path, gproject):
"""
Load a GeoParquet file from local disk or GCS using GeoPandas.
"""
if path.startswith("gs://"):
fs = gcsfs.GCSFileSystem(project=gproject)
with fs.open(path) as f:
gdf = gpd.read_parquet(f)
else:
gdf = gpd.read_parquet(path)
if 'geometry' not in gdf.columns:
raise ValueError("The input file must contain a 'geometry' column.")
# Ensure geometry is parsed and valid
gdf["geometry"] = gdf["geometry"].apply(wkt.loads) if gdf["geometry"].dtype == object else gdf["geometry"]
gdf = gdf.set_geometry("geometry")
return gdf
[docs]
def _create_xcube(self, df, dims=("time", "cell", "species")):
"""
This function converts a GeoDataFrame into a sparse xarray cube with geometry metadata in case a GeoParquet file was loaded.
In case of a pure GBIF cube, the geometry is ignored.
"""
# Convert to categorical
df["yearmonth"] = pd.Categorical(df["yearmonth"])
df["cellCode"] = pd.Categorical(df["cellCode"])
df["specieskey"] = pd.Categorical(df["specieskey"])
# Encode to integers
time_codes = df["yearmonth"].cat.codes.values
cell_codes = df["cellCode"].cat.codes.values
species_codes = df["specieskey"].cat.codes.values
# Build sparse cube
sparse_cube = sparse.COO(
coords=[time_codes, cell_codes, species_codes],
data=df["occurrences"].astype("float32").values,
shape=(
df["yearmonth"].cat.categories.size,
df["cellCode"].cat.categories.size,
df["specieskey"].cat.categories.size
)
)
# Prepare coordinates for the DataArray
coords = {
dims[0]: df["yearmonth"].cat.categories,
dims[1]: df["cellCode"].cat.categories,
dims[2]: df["specieskey"].cat.categories,
}
# Conditionally add geometry if the column exists
if "geometry" in df.columns:
cell_categories = df["cellCode"].cat.categories
geometry_per_cell = df.drop_duplicates("cellCode").set_index("cellCode").loc[cell_categories]["geometry"]
coords["geometry"] = (dims[1], geometry_per_cell.values)
# Create xarray DataArray
cube = xr.DataArray(
sparse_cube,
dims=self.dims,
coords=coords,
name="occurrences"
)
return cube
[docs]
def _species_richness(self, normalized=False):
"""
Calculate species richness per cell from the sparse cube.
Parameters
----------
normalized : bool
Whether to compute normalized richness (richness / total occurrences).
Returns
-------
None
Sets self.richness as a DataFrame with columns 'cell' and 'richness' or 'normalized_richness'.
"""
# 1. Binary presence
presence = (self.data > 0)
# 2. Collapse time dimension using logical OR → was the species *ever* seen in this cell?
presence_any_time = presence.any(dim="time") # shape: (cell, species)
# 3. Sum species per cell (species richness)
species_richness = presence_any_time.sum(dim="species") # shape: (cell,)
total_occurrences = self.data.sum(dim=["time", "species"])
if not normalized:
# 4. Get the non-zero values and indices
coords = species_richness.data.coords # (1D arrays of indices)
values = species_richness.data.data # the richness values
# 5. Convert integer cell indices to real labels (from .coords['cell'])
cell_labels = species_richness.coords["cell"].values
richness_df = pd.DataFrame({
"cell": cell_labels[coords[0]],
"richness": values
})
self.richness = richness_df
else:
epsilon = 1e-6
normalized_richness = species_richness / (total_occurrences + epsilon)
coords = normalized_richness.data.coords
values = normalized_richness.data.data
cell_labels = normalized_richness.coords["cell"].values
# Build a DataFrame
norm_df = pd.DataFrame({
"cell": cell_labels[coords[0]],
"normalized_richness": values
})
self.richness = norm_df
[docs]
def _filter_species(self, speciesKey):
"""
Filter the cube to only include data for a specific speciesKey.
Parameters
----------
speciesKey : int or str
The GBIF speciesKey to filter on.
Returns
-------
None
Updates self.df and self.data to only include the specified speciesKey.
"""
self.df = self.df[self.df['specieskey'].eq(speciesKey)]
self.data = self.data.sel(species=speciesKey)
[docs]
def find_correlations(cube: OccurrenceCube, top_n=10):
"""
Calculates species co-occurrence using efficient matrix operations.
This method identifies pairs of species that occur in the same 'cell'
at the same 'time'.
Parameters
----------
cube (OccurrenceCube): An instance of the OccurrenceCube class.
top_n (int): The number of top correlated pairs to return.
Returns
-------
list: A list of tuples, where each tuple contains the species names
and their co-occurrence count, sorted in descending order.
"""
if cube.data is None:
print("Data cube is not loaded. Cannot find correlations.")
return []
# 1. Create a boolean cube for presence (occurrence > 0)
# Using int32 to prevent overflow when summing large arrays.
presence = (cube.data > 0).astype('int32')
# 2. Compute the co-occurrence matrix using broadcasting.
# This multiplies the presence array with itself along a new 'species_copy' dimension
# and sums over the 'time' and 'cell' dimensions.
# The result is a (species x species) matrix.
co_occurrence = (presence * presence.rename({'species': 'species_copy'})).sum(dim=['time', 'cell'])
# 3. If the data is a Dask array, trigger the computation.
# Then, explicitly densify the sparse array before converting to pandas.
computed_co = co_occurrence.compute()
df_co = xr.DataArray(
computed_co.data.todense(),
dims=computed_co.dims,
coords=computed_co.coords
).to_pandas()
df_co.columns.name = 'species'
df_co.index.name = 'species'
# 4. Extract the upper triangle to avoid duplicate pairs (A,B vs B,A)
# and self-pairs (A,A). k=1 excludes the diagonal.
df_upper = df_co.where(np.triu(np.ones(df_co.shape), k=1).astype(bool))
# 5. Stack the matrix to get a Series of (species, species) -> count
stacked_pairs = df_upper.stack().sort_values(ascending=False)
# Filter out pairs with zero co-occurrences
strongest_pairs = stacked_pairs[stacked_pairs > 0]
if strongest_pairs.empty:
print("No co-occurring species pairs found in the provided dataset.")
return []
# 6. Create a mapping from specieskey to species name for readability
species_map = cube.df.drop_duplicates('specieskey').set_index('specieskey')['species']
# 7. Format the output
results = []
for (key1, key2), count in strongest_pairs.head(top_n).items():
name1 = species_map.get(key1, f"Unknown ({key1})")
name2 = species_map.get(key2, f"Unknown ({key2})")
results.append(((name1, name2), int(count)))
return results
[docs]
def filter_time_window(df, start_year, end_year, cols=["year", "rate"]):
"""Filter a two column dataframe based on year window.
Parameters
----------
df: pandas.DataFrame
A two column dataframe with one time axs
start_year : np.int
Integer number of the earliest date to filter on.
end_year : np.int
Integer number of the latest date to filter on.
cols: list of str, optional
List of the names of the columns of the dataframe
Returns
-------
time: pandas.Series
Filtered time series
filtered_var: pandas.Series
Filtered variable corresponding to the time series
"""
filtered = df[(df[cols[0]] >= start_year) & (df[cols[0]] <= end_year)].reset_index(drop=True)
return filtered[cols[0]], filtered[cols[1]]
[docs]
def aggregate_count_per_cell(cube, taxonRank, taxon):
"""
Aggregate the counts per taxonomic level per cell. This can be used as a
normalization factor
Parameters
----------
cube: OccurrenceCube
An OccurrenceCube with geometries added
taxonRank: dwc.taxonRank term
Level at which the aggregation needs to be performed
taxon: str
Name of the taxon at which aggregation needs to be performed
plot: bool, optional
whether the aggregated count per cell needs to be plotted on a map
Returns
-------
gdf: GeoDataFrame
dataframe containing geometries and the aggregated occurrence count
"""
# Step 1: Filter by the desired class
filtered_df = cube.df[cube.df[taxonRank] == taxon]
# Step 2: Select only the relevant columns
count_str = taxonRank + "count"
columns_to_keep = ["yearmonth", "cellCode", taxonRank, count_str, "geometry"]
subset_df = filtered_df[columns_to_keep]
# Step 3: Drop duplicate rows based on those columns
gdf = subset_df.drop_duplicates()
return gdf
[docs]
def plot_richness(cube, normalized=False, html_path='richness_map.html'):
"""
Plot species richness from an OccurrenceCube instance.
Parameters
----------
cube : OccurrenceCube
An instance of the cube with `.data` and `.richness` available.
normalized : bool
Whether to use normalized richness.
html_path : str
Path to save the map if not in Jupyter.
Returns
-------
None
"""
# Compute richness if needed
if not hasattr(cube, "richness"):
cube._species_richness(normalized=normalized)
# Extract richness and geometry directly from cube
richness_df = cube.richness
geometry = cube.data.coords["geometry"].values
cell_labels = cube.data.coords["cell"].values
# Create GeoDataFrame
gdf_plot = gpd.GeoDataFrame(
richness_df,
geometry=geometry[richness_df["cell"].map(dict(zip(cell_labels, range(len(cell_labels)))))]
)
gdf_plot.set_crs("EPSG:4326", inplace=True) # ensure WGS84 for folium
# Optionally simplify geometry
gdf_plot["geometry"] = gdf_plot["geometry"].simplify(0.01)
# Center map
centroid = gdf_plot.geometry.union_all().centroid
m = folium.Map(location=[centroid.y, centroid.x], zoom_start=7)
# Plot
Choropleth(
geo_data=gdf_plot,
name='Species Richness',
data=gdf_plot,
columns=['cell', 'richness' if not normalized else 'normalized_richness'],
key_on='feature.properties.cell',
fill_color='YlGnBu',
fill_opacity=0.7,
line_opacity=0.2,
legend_name='Species Richness'
).add_to(m)
folium.LayerControl().add_to(m)
if in_jupyter():
display(m)
else:
m.save(html_path)
print(f"Map saved as HTML to {os.path.abspath(html_path)}")
[docs]
def cumulative_species(cube, species_to_keep):
"""
Calculate the cumulative number of species in a OccurrenceCube.
Parameters
----------
cube : b3alien.b3cube.OccurrenceCube
Species OccurrenceCube from GBIF.
species_to_keep : numpy.array
Array of GBIF speciesKeys that need to be taken into account to calculate the cumulative species number of a cube.
geom : str, optional
Returns
-------
df1 : pandas.DataFrame
Sparse dataframe that still contains the cumulative sum per grid cell.
df2 : pandas.DataFrame
Cumulative dataframe cell independent.
"""
# Wrap sparse array in Dask array with one or more chunks
dask_sparse_array = da.from_array(cube.data.data, chunks=(100, 100, 1000)) # tune chunking for your use case
# Replace data in cube
cube_dask_sparse = cube.data.copy(data=dask_sparse_array)
species_mask = cube_dask_sparse["species"].isin(species_to_keep)
filtered_cube = cube_dask_sparse.where(species_mask, drop=True)
# Grab the underlying sparse.COO object from Dask
sparse_block = filtered_cube.data.compute() # Warning: loads full filtered cube into RAM!
# Extract sparse coordinates
coords = sparse_block.coords # shape: (ndim, nnz)
data = sparse_block.data # non-zero values
# Map indices to labels
time_labels = filtered_cube.coords["time"].values
species_labels = filtered_cube.coords["species"].values
cell_labels = filtered_cube.coords["cell"].values
# Use the sparse indices to create a DataFrame
df_sparse = pd.DataFrame({
"time": time_labels[coords[0]],
"cell": cell_labels[coords[1]],
"species": species_labels[coords[2]],
"occurrences": sparse_block.data
})
# Drop duplicates and compute cumulative species count
df_sparse = df_sparse.drop_duplicates()
df_sparse["seen"] = 1
df_time = (
df_sparse.groupby("time")["species"]
.nunique()
.cumsum()
.reset_index(name="cumulative_species")
)
df_time["time"] = pd.to_datetime(df_time["time"], format="%Y-%M", errors="coerce")
# fix to have the real cumsum
# Step 1: Remove duplicates (species × time)
df_sparse_unique = df_sparse[["time", "species"]].drop_duplicates()
# Step 2: Sort by time
df_sparse_unique = df_sparse_unique.sort_values("time")
# Step 3: Track cumulative species using a set
seen_species = set()
cumulative = []
for time, group in df_sparse_unique.groupby("time"):
new_species = set(group["species"])
seen_species.update(new_species)
cumulative.append((time, len(seen_species)))
# Step 4: Create cumulative DataFrame
df_cumulative = pd.DataFrame(cumulative, columns=["time", "cumulative_species"])
df_cumulative["time"] = pd.to_datetime(df_cumulative["time"], format="%Y-%M", errors="coerce")
return df_sparse, df_cumulative
[docs]
def plot_cumsum(df_cumulative):
"""
Create a plot of the cumulative number of species.
Parameters
----------
df_cumulative : pandas.DataFrame
Datagrame containing the cumulative number over tume.
Returns
-------
matplotlib.plot
A plot of the cumulative number of species.
"""
plt.figure(figsize=(10, 5))
plt.plot(df_cumulative["time"], df_cumulative["cumulative_species"], marker="o")
plt.title("Cumulative Unique Species Over Time")
plt.xlabel("Time")
plt.ylabel("Unique Species Observed")
plt.xticks(rotation=45)
plt.grid(True)
plt.tight_layout()
plt.show()
[docs]
def filter_multiple_cells(df_sparse):
"""
Only count a species established when it is present in more than one cell.
Parameters
----------
df_sparse : pandas.DataFrame
Datagrame containing the species richness per grid cell.
Returns
-------
pandas.DataFrame
Cumulative species when in multiple cells.
"""
# Ensure cell is in your DataFrame
assert "cell" in df_sparse.columns
# 1. Count unique cells per (time, species)
species_cell_counts = (
df_sparse.groupby(["time", "species"])["cell"]
.nunique()
.reset_index(name="cell_count")
)
# 2. Keep only species seen in at least 2 cells
species_multi_cell = species_cell_counts[species_cell_counts["cell_count"] >= 2]
# 3. Track cumulative set of species across time
species_multi_cell = species_multi_cell.sort_values("time")
seen_species = set()
cumulative = []
for time, group in species_multi_cell.groupby("time"):
new_species = set(group["species"])
seen_species.update(new_species)
cumulative.append((time, len(seen_species)))
df_cumulative_cells = pd.DataFrame(cumulative,
columns=["time",
"cumulative_species_2cells"])
return df_cumulative_cells
[docs]
def filter_multiple_occ(df_sparse):
"""
Only count a species established when there are multiple occurrences in a cell.
Parameters
----------
df_sparse : pandas.DataFrame
Datagrame containing the species richness per grid cell.
Returns
-------
pandas.DataFrame
Cumulative species when multiple occurrences in a cell.
"""
# Ensure 'occurrences' and 'cell' are present
assert "occurrences" in df_sparse.columns and "cell" in df_sparse.columns
# 1. Total occurrences per (time, species, cell)
species_cell_occ = (
df_sparse.groupby(["time", "species", "cell"])["occurrences"]
.sum()
.reset_index()
)
# 2. Filter for species that had ≥ 2 occurrences in any cell
species_with_2occ = (
species_cell_occ[species_cell_occ["occurrences"] >= 2]
.drop_duplicates(subset=["time", "species"])
)
# 3. Cumulative species count logic
species_with_2occ = species_with_2occ.sort_values("time")
seen_species = set()
cumulative = []
for time, group in species_with_2occ.groupby("time"):
new_species = set(group["species"])
seen_species.update(new_species)
cumulative.append((time, len(seen_species)))
df_cumulative_occ = pd.DataFrame(cumulative,
columns=["time",
"cumulative_species_2occ"])
return df_cumulative_occ
[docs]
def calculate_rate(df_cumulative):
"""
Calculate the rate of establishment from the cumulative distribution.
Parameters
----------
df_cumulative : pandas.DataFrame
Datagrame containing the cumulative distribution.
Returns
-------
s1 : pandas.Series
Series of the time axis.
s2 : pandas.Series
Series of the rate of establishment.
"""
# --- Processing GBIF data (Monthly) to get an approximate annual rate ---
df_cumulative["time"] = pd.to_datetime(df_cumulative["time"])
df_cumulative_rate = df_cumulative.sort_values(by="time").copy()
# Group data by year and calculate the total species difference for each year
annual_data = df_cumulative_rate.groupby(df_cumulative_rate['time'].dt.year).agg(
cumulative_species=('cumulative_species', 'last'), # Get the last cumulative species value for the year
first_time=('time', 'first') # Get the first timestamp for the year
)
# Calculate annual rate using the grouped data
annual_rate_gbif = []
annual_time_gbif = []
for i in range(1, len(annual_data)):
current_year_data = annual_data.iloc[i]
previous_year_data = annual_data.iloc[i - 1]
species_diff = current_year_data['cumulative_species'] - previous_year_data['cumulative_species']
time_diff_years = (current_year_data['first_time'] - previous_year_data['first_time']).days / 365.25
annual_rate = species_diff / time_diff_years
annual_rate_gbif.append(annual_rate)
annual_time_gbif.append(current_year_data.name) # Year is the index after groupby
annual_time_gbif = [int(year) for year in annual_time_gbif]
# Convert the lists to Pandas Series for easier plotting
annual_rate_gbif_series = pd.Series(annual_rate_gbif)
annual_time_gbif_series = pd.Series(annual_time_gbif)
return annual_time_gbif, annual_rate_gbif
[docs]
def get_survey_effort(cube, dateFormat='%Y-%m', calc_type='total'):
"""
Estimate the survey effort in an OccurrenceCube.
Parameters
----------
cube : b3alien.b3cube.OccurrenceCube
Species OccurrenceCube from GBIF.
dateFormat : str, optional
Dateformat stored in the OccurrenceCube. Default is '%Y-%m'
calc_type : str, optional
Type of survey effort to be calculated.
'distinct' : total number of distinct observers
'total' : total number of occurrences
Default is total.
Returns
-------
df : pandas.DataFrame
Dataframe containing time and the chosen measurement for survey effort.
"""
if calc_type == 'distinct':
# Group by 'yearmonth' and sum
distinct_observers_over_time = cube.df.groupby('yearmonth', observed=True)['distinctobservers'].sum()
# Convert the index to datetime (if it's not already)
distinct_observers_over_time.index = pd.to_datetime(
distinct_observers_over_time.index, format=dateFormat, errors='coerce'
)
# Resample to yearly frequency using new 'YE' standard
distinct_observers_yearly = distinct_observers_over_time.resample('YE').sum()
# Filter for years from 1900 onward
distinct_observers_filtered = distinct_observers_yearly[distinct_observers_yearly.index.year >= 1900]
# Convert to DataFrame
df = distinct_observers_filtered.reset_index()
df.columns = ['date', 'distinct_observers'] # Rename columns for clarity
return df
else:
total_occurrences_over_time = cube.data.sum(dim=['cell', 'species'])
# Convert the time coordinates to datetime objects if they aren't already
total_occurrences_over_time['time'] = pd.to_datetime(total_occurrences_over_time['time'].values, format=dateFormat, errors='coerce')
df = pd.DataFrame({
'time': total_occurrences_over_time['time'].values,
'total_occurrences': total_occurrences_over_time.data.data
})
# (Optional) Drop rows with invalid or missing time
df = df.dropna(subset=['time'])
return df