import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.optimize import minimize
from scipy.optimize import fmin
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
from tqdm import tqdm
import warnings
def count_m(t, params):
"""Calculates the mean, mu, from Solow and Costello (2004)."""
m0 = params[0]
m1 = params[1]
m = np.exp(m0 + m1 * t)
return m
def count_pi(s, t, params):
"""Calculates the variable pi from Solow and Costello (2004)."""
pi0 = params[2]
pi1 = params[3]
pi2 = params[4]
exponent = np.clip(pi0 + pi1 * t + pi2 * np.exp(t - s), -700, 700)
num = np.exp(exponent)
pi = np.divide(num, (1 + num), out=np.zeros_like(num), where=(1 + num) != 0)
pi = np.where(np.isinf(num), 1, pi)
return pi
def count_p(t, params):
"""Calculates the value p from Solow and Costello (2004).
It uses matrix coding for efficiency.
"""
S = np.tile(np.arange(1, t + 1), (t, 1))
thing = 1 - count_pi(S, S.T, params)
thing[t - 1, :] = 1
up = np.triu(np.ones_like(thing), 1)
thing2 = np.tril(thing) + up
product = np.prod(thing2, axis=0)
pst = product * count_pi(np.arange(1, t + 1), t, params)
return pst
def count_lambda(params, N):
"""
This function calculates lambda from Solow and Costello, 2004.
params is a vector of parameters
N is the number of time points
Note: an additional offset parameter is included to allow for a non-zero baseline.
"""
lambda_result = np.zeros(N)
for t in range(1, N + 1):
S = np.arange(1, t + 1)
Am = count_m(S, params)
Ap = count_p(t, params)
lambda_result[t - 1] = np.dot(Am, Ap)
# apply the offset
offset = params[-1]
lambda_result += offset
# Ensure no negative expected values
lambda_result = np.clip(lambda_result, 0, None)
return lambda_result
def count_log_like(params, restrict, num_discov):
"""
Calculates the negative log likelihood from Solow and Costello (2004),
supporting optional parameter restrictions.
"""
f = np.where(restrict != 99)[0]
g = np.where(restrict == 99)[0]
new_params = params.copy()
new_params[g] = params[g]
new_params[f] = restrict[f]
# Use count_lambda (which now includes the offset)
lambda_values = count_lambda(new_params, len(num_discov))
# Compute log-likelihood components safely
summand2 = np.where(
lambda_values > 0,
num_discov * np.log(lambda_values) - lambda_values,
-lambda_values
)
LL = -np.sum(summand2)
return LL, lambda_values
[docs]
def simulate_solow_costello(annual_time_gbif, annual_rate_gbif, vis=False):
"""
Solow-Costello simulation of the rate of establishment.
Parameters
----------
annual_time_gbif : pandas.Series
Time series of the rate of establishment.
annual_rate_gbif : pandas.Series
Rates corresponding to the time series.
vis : bool, optional
Create a plot of the simulation. Default is False.
Returns
-------
C1: numpy.Series
Result of the simulation.
val1: numpy.Series
Parameters of the fitting.
"""
# global num_discov; # No need for global, pass as argument
num_discov = pd.Series(annual_rate_gbif).T # Load and transpose
T = pd.Series(annual_time_gbif) # np.arange(1851, 1996) # Create the time period
# options = optimset('TolFun',.01,'TolX',.01); # Tolerance is handled differently in scipy
guess = np.array([-1.1106, 0.0435, -1.4534, 0.1, 0.1]) # Initial guess
constr = 99 * np.ones_like(guess) # Constraint vector
vec1 = fmin(
lambda x: count_log_like(x, constr, num_discov)[0],
guess,
xtol=0.01,
ftol=0.01,
disp=0 # disables all output
)
val1 = count_log_like(vec1, constr, num_discov)[0] # Get the function value at the minimum
C1 = count_lambda(vec1, len(num_discov)) # Calculate the mean of Y
if vis:
# Create the plot
plt.plot(T, np.cumsum(num_discov), 'k-', T, np.cumsum(C1), 'k--')
plt.legend(['Discoveries', 'Unrestricted'])
plt.xlabel('Time')
plt.ylabel('Cumulative Discovery')
plt.show()
return C1, vec1
[docs]
def simulate_solow_costello_scipy(annual_time_gbif, annual_rate_gbif, vis=False):
"""
Solow-Costello simulation of the rate of establishment. Uses scipy's minimize for optimization.
Parameters
----------
annual_time_gbif : pandas.Series
Time series of the rate of establishment.
annual_rate_gbif : pandas.Series
Rates corresponding to the time series.
vis : bool, optional
Create a plot of the simulation. Default is False.
Returns
-------
C1: numpy.Series
Result of the simulation.
val1: numpy.Series
Parameters of the fitting.
"""
# global num_discov; # No need for global, pass as argument
num_discov = pd.Series(annual_rate_gbif).T # Load and transpose
T = pd.Series(annual_time_gbif) #np.arange(1851, 1996) # Create the time period
# options = optimset('TolFun',.01,'TolX',.01); # Tolerance is handled differently in scipy
guess = np.array([-1.1106, 0.0135, -1.4534, 0.0, 0.0, 0.0]) # Initial guess
constr = 99 * np.ones_like(guess)
# Objective function for minimize (returns scalar log-likelihood)
def objective(x):
return count_log_like(x, constr, num_discov)[0] # still log-likelihood
# Define bounds for each parameter
# These must match the size and meaning of `guess`
bounds = [
(-5, 5), # e.g., parameter 1: negative decay
(-1, 1), # e.g., parameter 2: rate between 0 and 1
(-5, 0), # e.g., parameter 3: another decay
(0.01, 2), # e.g., parameter 4: noise scale
(0.01, 2), # e.g., parameter 5: another scale
(-1, 1), # adding an additional offset
]
# Run bounded optimization
result = minimize(
objective,
guess,
method="Nelder-Mead",
options={"xatol": 0.01, "fatol": 0.01, "disp": False, "maxiter": 1000}
)
vec1 = result.x
C1 = count_lambda(vec1, len(num_discov)) # Calculate the mean of Y
if vis:
# Create the plot
plt.plot(T, np.cumsum(num_discov), 'k-', T, np.cumsum(C1), 'k--')
plt.legend(['Discoveries', 'Unrestricted'])
plt.xlabel('Time')
plt.ylabel('Cumulative Discovery')
plt.show()
return C1, vec1
import numpy as np
import pandas as pd
[docs]
def get_bootstrap_errors(annual_time, annual_rate, iterations=100):
"""
Perform bootstrap resampling to estimate the standard errors of the parameters and C1 values.
Parameters
----------
annual_time: pandas Series
Time points
annual_rate: pandas Series
Rates corresponding to the time points
iterations: int
Number of bootstrap iterations to perform
Returns
-------
vec1_mean
mean of the fitted parameters across bootstrap samples
vec1_std
standard error of the fitted parameters across bootstrap samples
C1_mean:
mean of the C1 values across bootstrap samples
C1_std:
standard error of the C1 values across bootstrap samples
"""
all_vec1 = []
all_C1 = []
print(f"Starting {iterations} bootstrap iterations...")
for i in range(iterations):
# 1. Resample the data with replacement
# This simulates alternative 'histories' of the same process
indices = np.random.choice(len(annual_rate), size=len(annual_rate), replace=True)
resampled_time = annual_time.iloc[indices].sort_values()
resampled_rate = annual_rate.iloc[indices] # Keep rates associated with their times
try:
# 2. Run your existing fitting function
C1_boot, vec1_boot = simulate_solow_costello_scipy(resampled_time, resampled_rate, vis=False)
all_vec1.append(vec1_boot)
all_C1.append(C1_boot)
except Exception as e:
# Skip iterations that fail to converge
continue
# Convert to numpy arrays for easier math
all_vec1 = np.array(all_vec1)
all_C1 = np.array(all_C1)
# 3. Calculate means and standard errors
vec1_mean = np.mean(all_vec1, axis=0)
vec1_std = np.std(all_vec1, axis=0)
C1_mean = np.mean(all_C1, axis=0)
C1_std = np.std(all_C1, axis=0)
return vec1_mean, vec1_std, C1_mean, C1_std
def bootstrap_worker(i, time_list, rate_list):
"""
Worker function for bootstrap analysis. Each worker will perform one bootstrap iteration.
Returns the fitted parameter and cumulative curve for that iteration.
"""
time_series = pd.Series(time_list)
rate_series = pd.Series(rate_list)
try:
# Fit once to get baseline model
with warnings.catch_warnings():
warnings.simplefilter("ignore", RuntimeWarning)
# Fit once to get baseline model
C1_fit, vec1 = simulate_solow_costello_scipy(time_series, rate_series)
residuals = rate_series.reset_index(drop=True) - C1_fit
# Bootstrap residuals and create new synthetic data
resampled_residuals = residuals.sample(frac=1, replace=True).reset_index(drop=True)
simulated_rate = C1_fit + resampled_residuals
# Fit again on simulated data
C1_sim, vec1_boot = simulate_solow_costello_scipy(time_series, simulated_rate)
return vec1_boot[1], np.cumsum(C1_sim)
except Exception:
return None
[docs]
def run_bootstrap_analysis(time_list, rate_list, n_iterations=200):
"""
Run the bootstrap analysis in parallel and aggregate results into a DataFrame.
Parameters
----------
time_list: list or pandas Series
Time points for the analysis.
rate_list: list or pandas Series
Rates corresponding to the time points.
n_iterations: int
Number of bootstrap iterations to perform.
Returns
-------
pandas DataFrame
A DataFrame containing the mean annual rates, cumulative values, and confidence intervals.
"""
param_samples = []
cumulative_samples = []
print(f"Starting {n_iterations} bootstrap iterations...")
# 1. Parallel Execution
with ProcessPoolExecutor() as executor:
# Passing time and rate lists to every worker
futures = [executor.submit(bootstrap_worker, i, time_list, rate_list) for i in range(n_iterations)]
for f in futures:
res = f.result()
if res is not None:
p_val, cum_vals = res
param_samples.append(p_val)
cumulative_samples.append(cum_vals)
if not param_samples:
raise RuntimeError("All bootstrap iterations failed. No valid samples to analyze.")
# Convert to numpy arrays
param_samples = np.array(param_samples)
cumulative_samples = np.array(cumulative_samples)
# 2. Extract Annual Rates (Deltas) from Cumulative Samples
# Since worker returns np.cumsum(C1_sim), we take the difference to get C1 back
rate_samples = np.diff(cumulative_samples, axis=1, prepend=0)
# 3. Calculate Statistics for Rates (Deltas)
rate_mean = np.mean(rate_samples, axis=0)
rate_std = np.std(rate_samples, axis=0)
# 4. Calculate Statistics for Cumulative
cum_mean = np.mean(cumulative_samples, axis=0)
cum_std = np.std(cumulative_samples, axis=0)
# 5. Print the Fit Parameter (vec1[1]) with error
p_mean = np.mean(param_samples)
p_std = np.std(param_samples)
print("\n" + "="*30)
print("FITTING PARAMETERS RESULTS")
print("="*30)
print(f"Parameter vec1[1]: {p_mean:.6f} ± {p_std:.6f}")
print("="*30 + "\n")
# 6. Build the Resulting DataFrame
df_results = pd.DataFrame({
'Year': time_list,
'Annual_Rate': rate_mean,
'Annual_Rate_Error': rate_std,
'Cumulative_Value': cum_mean,
'Cumulative_Error': cum_std,
'Lower_CI_95': rate_mean - (1.96 * rate_std),
'Upper_CI_95': rate_mean + (1.96 * rate_std)
})
return df_results
[docs]
def parallel_bootstrap_solow_costello(annual_time_gbif, annual_rate_gbif, n_iterations=1000, ci=95):
"""
Perform parallel bootstrapping of the Solow-Costello model
to estimate confidence intervals.
Parameters
----------
annual_time_gbif : pandas.Series
Time series of the rate of establishment.
annual_rate_gbif : pandas.Series
Rates corresponding to the time series.
n_iterations : int, optional
Number of bootstrap iterations. Default is 1000.
ci : float, optional
Confidence interval percentage. Default is 95.
Returns
-------
dict
A dictionary containing bootstrap results and confidence intervals.
"""
time_list = list(annual_time_gbif)
rate_list = list(annual_rate_gbif)
n_cores = max(1, multiprocessing.cpu_count() - 1)
beta1_samples = []
c1_curves = []
with ProcessPoolExecutor(max_workers=n_cores) as executor:
futures = [
executor.submit(bootstrap_worker, i, time_list, rate_list)
for i in range(n_iterations)
]
for f in tqdm(as_completed(futures), total=n_iterations, desc="Bootstrapping"):
try:
result = f.result()
if result is not None:
beta1, c1_curve = result
beta1_samples.append(beta1)
c1_curves.append(c1_curve)
except Exception as e:
print(f"Unhandled error in future: {e}")
if not beta1_samples:
raise RuntimeError("All bootstrap iterations failed. No valid samples.")
beta1_samples = np.array(beta1_samples)
c1_curves = np.array(c1_curves) # shape: (B, T)
ci_lower = np.percentile(beta1_samples, (100 - ci) / 2)
ci_upper = np.percentile(beta1_samples, 100 - (100 - ci) / 2)
ci_beta1 = (ci_lower, ci_upper)
lower_band = np.percentile(c1_curves, (100 - ci) / 2, axis=0)
upper_band = np.percentile(c1_curves, 100 - (100 - ci) / 2, axis=0)
mean_cumsum = np.mean(c1_curves, axis=0)
return {
"beta1_samples": beta1_samples,
"beta1_ci": ci_beta1,
"c1_mean": mean_cumsum,
"c1_lower": lower_band,
"c1_upper": upper_band,
"c1_all": c1_curves
}
[docs]
def plot_with_confidence(T, observed, results):
"""
Plot the observed cumulative discoveries
with bootstrap confidence intervals.
Parameters
----------
T : pandas.Series
Time series of the rate of establishment.
observed : pandas.Series
Observed cumulative discoveries.
results : dict
Dictionary containing bootstrap results and confidence intervals.
"""
plt.figure(figsize=(10, 6))
plt.plot(T, np.cumsum(observed), 'k-', label='Observed Discoveries')
plt.plot(T, results["c1_mean"], 'b--', label='Bootstrap Mean C1')
plt.fill_between(T, results["c1_lower"], results["c1_upper"], color='blue', alpha=0.3, label='95% CI')
plt.xlabel('Time')
plt.ylabel('Cumulative Discoveries')
plt.title(f'Solow-Costello Fit with CI (β₁ 95% CI: {results["beta1_ci"][0]:.4f} – {results["beta1_ci"][1]:.4f})')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()