import ast
import os

import numpy as np
import pandas as pd
import requests
from tqdm import tqdm
tqdm.pandas() # enables .progress_apply
[docs]
class CheckList():
    """
    Load a GRIIS checklist from GBIF.

    Parameters
    ----------
    filePath : str
        Path to a tab-separated checklist file that contains a
        ``speciesKey`` column. Each cell may hold a single key, a
        stringified list of keys (e.g. ``"[123, 456]"``), the marker
        ``"Uncertain"`` or be empty.

    Attributes
    ----------
    filePath : str
        The path the checklist was loaded from.
    species : list of int
        The unique speciesKey(s) found in the checklist.
    """

    def __init__(self, filePath: str):
        self.filePath = filePath
        # Parse the file once, up front; `species` is the working list that
        # `_add_species` / `_remove_species` modify afterwards.
        self.species = self._load_GRIIS(filePath)

    @staticmethod
    def _parse_species_key(value):
        """Turn one raw ``speciesKey`` cell into a Python object.

        Non-string values are returned unchanged. Strings are parsed with
        ``ast.literal_eval`` (so ``"[1, 2]"`` becomes a list); a string that
        is not a Python literal, such as a bare ``Uncertain``, is returned
        stripped instead of raising.
        """
        if not isinstance(value, str):
            return value
        try:
            return ast.literal_eval(value)
        except (ValueError, SyntaxError):
            return value.strip()

    def _load_GRIIS(self, filePath):
        """
        Load the checklist and extract speciesKey(s).

        Parameters
        ----------
        filePath : str
            Path to the tab-separated checklist file (must contain a
            ``speciesKey`` column).

        Returns
        -------
        list
            The unique speciesKey(s) in the checklist as ``int``, in order of
            first appearance. Empty cells and ``"Uncertain"`` entries are
            skipped.
        """
        df_merged = pd.read_csv(filePath, sep="\t")
        df_merged["speciesKey"] = df_merged["speciesKey"].apply(
            self._parse_species_key
        )
        # One row per key, so list-valued cells contribute every element.
        df_exploded = df_merged.explode("speciesKey")
        return [
            int(key)
            for key in df_exploded["speciesKey"].unique()
            if pd.notnull(key) and key != "Uncertain"
        ]

    def _add_species(self, new_species):
        """
        Add new species to the checklist.

        Keys already present (or repeated within ``new_species``) are added
        only once; existing order is preserved and new keys are appended.

        Parameters
        ----------
        new_species : list
            A list of speciesKey(s) to add to the checklist.

        Returns
        -------
        None
        """
        # A set gives O(1) membership tests instead of rescanning the list.
        known = set(self.species)
        for sp in new_species:
            if sp not in known:
                known.add(sp)
                self.species.append(sp)

    def _remove_species(self, rem_species):
        """
        Remove species from the checklist.

        Parameters
        ----------
        rem_species : list
            A list of speciesKey(s) to remove from the checklist. Keys that
            are not in the checklist are ignored.

        Returns
        -------
        None
        """
        to_drop = set(rem_species)
        self.species = [sp for sp in self.species if sp not in to_drop]
[docs]
def get_species_under_genus(taxon_key):
    """
    Get all the keys of the species listed under a specific genus.

    Pages through the GBIF ``/species/{taxon_key}/children`` endpoint and
    collects the ``key`` of every child whose rank is ``SPECIES``. Paging
    stops on a non-200 response, on an empty page, or when the response
    reports ``endOfRecords``.

    Parameters
    ----------
    taxon_key : int
        The GBIF taxonKey of the genus.

    Returns
    -------
    list
        A list of speciesKey(s) under the specified genus, or
        ``["Uncertain"]`` when none were found.

    Raises
    ------
    requests.RequestException
        If a request fails at the network level or exceeds the timeout.
    """
    url = f"https://api.gbif.org/v1/species/{taxon_key}/children"
    species_keys = []
    offset = 0
    limit = 1000
    while True:
        params = {"rank": "species", "limit": limit, "offset": offset}
        # Without a timeout a stalled connection would block the caller
        # indefinitely; 10 s matches the value used by get_speciesKey.
        response = requests.get(url, params=params, timeout=10)
        if response.status_code != 200:
            break
        data = response.json()
        results = data.get("results", [])
        if not results:
            break
        # Filter client-side as well: only keep children ranked as species.
        species_keys.extend(
            res["key"]
            for res in results
            if res.get("rank", "").upper() == "SPECIES"
        )
        # Stop as soon as the API says this was the last page, which saves
        # one extra round trip per genus.
        if data.get("endOfRecords", False):
            break
        offset += limit
    return species_keys or ["Uncertain"]
[docs]
def get_speciesKey(sciname):
    """
    Resolve a scientific name to its GBIF taxonKey.

    The name is matched (strictly) against the GBIF backbone. A match of
    rank SPECIES yields its ``usageKey``; a match of rank GENUS yields the
    keys of all species-rank children of that genus. Any other outcome
    (no match, other rank, request failure) yields ``["Uncertain"]``.

    Parameters
    ----------
    sciname : str
        The scientific name to resolve.

    Returns
    -------
    list
        A list of resolved speciesKey(s) or ["Uncertain"] if unresolved.
        If fetching a later page of genus children fails, the keys gathered
        so far are returned.
    """
    try:
        # Query GBIF backbone for the name
        response = requests.get(
            "https://api.gbif.org/v1/species/match",
            params={"name": sciname, "strict": True},
            timeout=10,
        )
        result = response.json()
    except (requests.RequestException, ValueError):
        # Network failure, timeout or a body that is not JSON.
        return ["Uncertain"]

    if not isinstance(result, dict) or "usageKey" not in result:
        return ["Uncertain"]

    taxon_key = result["usageKey"]
    # `or ""` guards against an explicit null rank in the response.
    rank = str(result.get("rank") or "").upper()

    # Case 1: SPECIES
    # NOTE(review): for a synonym this is presumably the synonym's own key
    # rather than the accepted species key — confirm this is intended.
    if rank == "SPECIES":
        return [taxon_key]

    # Case 3: Other ranks or unresolvable
    if rank != "GENUS":
        return ["Uncertain"]

    # Case 2: GENUS — query children directly from GBIF API
    children_url = f"https://api.gbif.org/v1/species/{taxon_key}/children"
    all_species_keys = []
    offset = 0
    limit = 1000
    while True:
        try:
            children_response = requests.get(
                children_url,
                params={"rank": "species", "limit": limit, "offset": offset},
                timeout=10,
            )
            children_data = children_response.json()
        except (requests.RequestException, ValueError):
            # Best effort: keep whatever earlier pages delivered.
            break
        if not isinstance(children_data, dict):
            break
        results = children_data.get("results", [])
        if not results:
            break
        all_species_keys.extend(
            child["key"]
            for child in results
            if isinstance(child, dict)
            and "key" in child
            and str(child.get("rank") or "").upper() == "SPECIES"
        )
        # Honour the paging flag so no extra request is needed to discover
        # that the previous page was the last one.
        if children_data.get("endOfRecords", False):
            break
        offset += limit
    return all_species_keys if all_species_keys else ["Uncertain"]
[docs]
def split_event_date(eventDate):
    """
    Interpret the event date as introduction date and date of last seen,
    when this information is available in the checklist.

    Only values of the form ``"<intro>/<outro>"`` (exactly one ``/``) are
    split. Whitespace around each part is removed and an empty part is
    reported as missing.

    Parameters
    ----------
    eventDate : str
        Text string of eventDate

    Returns
    -------
    pd.Series
        A two-element series: introduction date first, date last seen
        second. Both are NaN when ``eventDate`` is not a string or does not
        contain exactly two ``/``-separated parts.
    """
    if not isinstance(eventDate, str):
        return pd.Series([np.nan, np.nan])

    # Strip every part, not just the whole string, so "1950 / 2018" yields
    # clean values that later date parsing can handle.
    parts = [part.strip() for part in eventDate.split('/')]
    if len(parts) != 2:
        return pd.Series([np.nan, np.nan])

    intro, outro = (part if part else np.nan for part in parts)
    return pd.Series([intro, outro])
[docs]
def do_taxon_matching(dirPath):
    """
    Match keys between taxon.txt and distribution.txt

    Every ``scientificName`` in ``taxon.txt`` is resolved with
    ``get_speciesKey`` (one lookup per row, shown with a progress bar) and
    the resulting ``speciesKey`` column is left-joined onto
    ``distribution.txt`` via the ``id`` column.

    Parameters
    ----------
    dirPath : str
        Path to the directory of the checklist. A trailing path separator
        is optional.

    Returns
    -------
    None
        Saves a new checklist file 'merged_distr.txt' in the checklist
        directory.
    """
    # os.path.join works with or without a trailing separator on dirPath,
    # unlike plain string concatenation.
    taxon = os.path.join(dirPath, "taxon.txt")
    distribution = os.path.join(dirPath, "distribution.txt")
    df_t = pd.read_csv(taxon, sep="\t")
    df_dist = pd.read_csv(distribution, sep="\t")

    # progress_apply is provided by the module-level tqdm.pandas() call.
    df_t["speciesKey"] = df_t["scientificName"].progress_apply(get_speciesKey)

    # Left join keeps every distribution row, matched or not.
    df_merged = df_dist.merge(df_t[['id', 'speciesKey']], on='id', how='left')
    df_merged.to_csv(
        os.path.join(dirPath, 'merged_distr.txt'), sep='\t', index=False
    )
# The rest assumes already a merged dataset
[docs]
def read_checklist(filePath, cl_type='detailed', locality='Belgium'):
    """
    Read a GRIIS checklist and extract speciesKey(s) and, for detailed
    checklists, a time series of species numbers over time.

    Parameters
    ----------
    filePath : str
        Path to the directory of the checklist (must contain
        distribution.txt, and also taxon.txt if cl_type is not 'detailed').
        A trailing path separator is optional.
    cl_type : str
        Type of checklist: 'detailed' (with eventDate; the speciesKey is
        taken from the last ``/``-separated segment of ``id``) or any other
        value for a simple checklist (species are resolved by name through
        ``get_speciesKey``).
    locality : str
        The locality to filter on for the time series (default is
        'Belgium'). Only used when cl_type is 'detailed'.

    Returns
    -------
    tuple or list
        For cl_type 'detailed', a tuple containing:
        - array of unique speciesKey(s) from every row of distribution.txt
          (not restricted to ``locality``)
        - pd.DataFrame with columns 'introDate' (the date of the change),
          'total' (species introduced minus species last seen on that date)
          and 'cumulative_total' (running sum of 'total').
        Otherwise, a plain list of speciesKey(s); no time series is built.
    """
    distribution = os.path.join(filePath, "distribution.txt")

    if cl_type == 'detailed':
        df_cl = pd.read_csv(distribution, sep='\t', low_memory=False)
        # The key is the last path segment of the id.
        df_cl["speciesKey"] = (
            df_cl["id"].str.rsplit("/", n=1).str[-1].astype("int64")
        )
        species_to_keep = df_cl["speciesKey"].astype("int64").unique()

        # 1. Keep rows for the requested locality that have an eventDate
        df = df_cl[df_cl["locality"] == locality].copy()
        df = df[df["eventDate"].notna()]

        # 2. Split eventDate into introDate and outroDate. Building the
        #    frame explicitly also works when no row matched the locality.
        parts = pd.DataFrame(
            [split_event_date(value).tolist() for value in df["eventDate"]],
            columns=["introDate", "outroDate"],
            index=df.index,
        )
        # Only plain years parse with this format; anything else -> NaT.
        df["introDate"] = pd.to_datetime(
            parts["introDate"], format="%Y", errors="coerce"
        )
        df["outroDate"] = pd.to_datetime(
            parts["outroDate"], format="%Y", errors="coerce"
        )

        # 3. Count arrivals and disappearances per date (rows with a
        #    missing date are dropped first).
        in_species = (
            df.dropna(subset=["introDate"])
            .groupby("introDate", sort=True)["id"]
            .count()
            .rename_axis("date")
            .reset_index(name="nspec")
        )
        out_species = (
            df.dropna(subset=["outroDate"])
            .groupby("outroDate", sort=True)["id"]
            .count()
            .rename_axis("date")
            .reset_index(name="nspeco")
        )

        # 4. Merge on one shared date key so a date that only has
        #    disappearances keeps its real date instead of becoming NaT
        #    (which would sort last and distort the cumulative total).
        n_species = pd.merge(in_species, out_species, how="outer", on="date")
        n_species = n_species.sort_values("date").reset_index(drop=True)

        # 5. Dates present on one side only count as zero on the other
        n_species["nspec"] = n_species["nspec"].fillna(0).astype(int)
        n_species["nspeco"] = n_species["nspeco"].fillna(0).astype(int)

        # 6. Net change per date and running total over time
        n_species["total"] = n_species["nspec"] - n_species["nspeco"]
        n_species["cumulative_total"] = n_species["total"].cumsum()

        # Keep the original output column name for the date.
        tot_species = n_species.rename(columns={"date": "introDate"})[
            ["introDate", "total", "cumulative_total"]
        ].copy()
        return species_to_keep, tot_species

    taxon = os.path.join(filePath, "taxon.txt")
    df_t = pd.read_csv(taxon, sep="\t")
    df_dist = pd.read_csv(distribution, sep="\t")

    # Resolve every name; each result is a list of keys
    df_t["speciesKey"] = df_t["scientificName"].apply(get_speciesKey)
    # Explode so each speciesKey gets its own row
    df_t_exploded = df_t.explode("speciesKey")
    # Left join keeps distribution rows without a taxon match (NaN key)
    df_merged = df_dist.merge(
        df_t_exploded[['id', 'speciesKey']], on='id', how='left'
    )
    # Drop unmatched rows (NaN) and unresolved names before converting
    species_to_keep = [
        int(x)
        for x in df_merged["speciesKey"].unique()
        if pd.notnull(x) and x != "Uncertain"
    ]
    return species_to_keep