Appendix F — The BayesFlow Pipeline: An Annotated Walkthrough

This appendix presents the complete, annotated source of scripts/ucloud_npe/amortized_particle_filter.py — the Python script that trains the Neural Posterior Estimation (NPE) workflow for Model A in Chapter 15.

All code blocks here have eval: false. They are shown for reading and learning, not executed when the book renders. The artifacts the script produces (CSV files, trained network checkpoints) are what Chapter 15 loads and plots.

Tip: How this appendix connects to Chapter 15

Chapter 15 explains why NPE is needed: the particle filter’s stochastic resampling step breaks differentiability, so HMC (Stan) cannot compute a gradient through the likelihood. This appendix shows how NPE is implemented in practice — every function the chapter discusses, annotated line by line.

Read the chapter first to understand the cognitive model and the motivation; come here when you want to understand or modify the pipeline itself.

F.1 0. Why Python? Why BayesFlow? Why uCloud?

The rest of this book uses R and Stan. This script is the exception, and the reason is technical rather than a matter of preference.

Why Python? BayesFlow 2.x is a Python library built on Keras 3. Keras supports JAX, TensorFlow, and PyTorch backends, all of which provide auto-differentiation through the neural network. There is no equivalent R library for amortized inference at this level of maturity.

Why BayesFlow? The NPE workflow requires three components: (1) a simulator that can generate arbitrarily many (θ, y) pairs, (2) a summary network that compresses each y into a fixed-size embedding, and (3) an inference network that learns the posterior p(θ | summary(y)). BayesFlow provides all three with a single coherent API and handles the training loop, validation, and diagnostics internally.
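To make the three components concrete, here is a minimal, hypothetical sketch of the NPE data flow with a toy Gaussian simulator. None of these toy functions appear in the real script, which defines its own prior() and observation_model() in §4:

import numpy as np

def toy_prior(rng):
    return {"theta": rng.normal()}             # (1) a parameter draw from the prior

def toy_simulator(theta, rng, T=10):
    return theta + rng.normal(size=T)          # (1) simulated data y given theta

def toy_summary(y):
    return np.array([y.mean(), y.std()])       # (2) fixed-size embedding of y

rng = np.random.default_rng(0)
draw = toy_prior(rng)
y = toy_simulator(draw["theta"], rng)
s = toy_summary(y)
# (3) An inference network would learn p(theta | summary(y)) from many such
# (theta, s) pairs; BayesFlow trains it jointly with the summary network.
print(draw["theta"], s)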

Why uCloud? Training on 20,000+ simulated datasets is slow on a laptop. uCloud gives students access to multi-core CPU nodes (and optionally GPU nodes) through a browser, so the training can be offloaded without installing anything locally beyond the scripts. See the step-by-step instructions in Chapter 15 (the “Step-by-step: running the NPE on uCloud” callout).


F.2 1. Setup: Imports, Backend Selection, and CLI Arguments

"""Amortized Bayesian inference for the rule-based Bayesian particle filter
(Model A, Chapter 15) using BayesFlow.

This script is self-contained: it re-implements the particle filter in NumPy
(no R / reticulate / sbi dependency), trains a BayesFlow workflow on a fixed
Kruschke (1993) schedule under one of three feedback scenarios, and writes
diagnostics + posterior artifacts that the v2 chapter consumes.

Inferred parameters
-------------------
    logit_eps        : Normal(0, 1.5)            -> eps in (0, 1)
    log_n_particles  : Uniform(log 1, log 100)   -> N in {1, ..., 100}
    logit_mu         : Normal(-3, 1.5)           -> mu in (0, 1)

Scenarios (selected via --scenario)
-----------------------------------
    static            : Kruschke labels held fixed across all trials.
    contingent_shift  : labels flip after the first half of trials.
    drift             : a 1-D category boundary on `height` drifts linearly
                        from 2.0 to 3.0 across trials; feedback is the side
                        of the (possibly drifting) boundary.

Run with:
    KERAS_BACKEND=jax python amortized_particle_filter.py --scenario static
"""

from __future__ import annotations

import argparse
import json
import os
from pathlib import Path

# The Keras backend must be fixed BEFORE keras or bayesflow are imported.
# Once either library is imported, switching backends raises an error.
# JAX is the recommended backend for BayesFlow 2.x: it JIT-compiles the
# forward pass and gives the best throughput on both CPU and GPU.
os.environ.setdefault("KERAS_BACKEND", "jax")

import matplotlib
matplotlib.use("Agg")  # non-interactive backend — required on headless servers
import matplotlib.pyplot as plt
import numpy as np

import bayesflow as bf

The CLI argument parser lets you run different experimental scenarios from the command line without editing the script. The three scenarios — static, contingent_shift, and drift — correspond to the three ecological structures the chapter evaluates (see §“Variants and Extensions” in Chapter 15).

SCENARIOS = ("static", "contingent_shift", "drift")

parser = argparse.ArgumentParser(description=__doc__,
                                 formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument("--scenario", choices=SCENARIOS, default="static",
                    help="Feedback regime (default: static).")
parser.add_argument("--n-pilot", type=int, default=20_000,
                    help="Pilot simulation budget for offline training.")
parser.add_argument("--epochs", type=int, default=100)
parser.add_argument("--batch-size", type=int, default=32)
parser.add_argument("--results-root", default="particle-filter-npe",
                    help="Top-level results directory; per-scenario subdirs are created.")
parser.add_argument("--n-ppc", type=int, default=50,
                    help="Number of posterior draws used for the PPC overlay.")
parser.add_argument("--shift-streak", type=int, default=6,
                    help="contingent_shift: flip remaining labels after the "
                         "agent hits this many consecutive correct responses.")
args = parser.parse_args()

Global constants mirror the structural commitments made in R in Chapter 15. Note that the prior hyperparameters here (LOGIT_EPS_MEAN, LOGIT_EPS_SD, etc.) must be identical to those in the R setup block; if you change the prior in one place you must change it in the other. A hypothetical cross-language guard that automates this check is sketched after the constants below.

RESULTS_DIR = Path(args.results_root) / args.scenario
RESULTS_DIR.mkdir(parents=True, exist_ok=True)

RNG_SEED = 2026
N_PILOT = args.n_pilot
N_VAL = 300   # held-out validation set for early stopping
N_TEST = 300  # held-out test set for diagnostics (never seen during training)
EPOCHS = args.epochs
BATCH_SIZE = args.batch_size

N_PARTICLES_MIN = 1
N_PARTICLES_MAX = 100
MAX_RULE_DIMS = 2
RESAMPLE_THRESHOLD = 0.5
N_FEATURES = 2

# Prior hyperparameters — must match the R setup block in Chapter 15.
LOGIT_EPS_MEAN, LOGIT_EPS_SD = 0.0, 1.5
LOGIT_MU_MEAN,  LOGIT_MU_SD  = -3.0, 1.5
LOG_N_LOW  = np.log(N_PARTICLES_MIN + 1e-6)
LOG_N_HIGH = np.log(N_PARTICLES_MAX)
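
The following guard is a hypothetical addition, not part of the original script. It assumes the R setup block exports its hyperparameters to a file named r_priors.json (e.g. via jsonlite::write_json); if that file exists, the Python side fails fast on any mismatch:

# Hypothetical cross-language consistency guard (file name is an assumption).
_r_priors_file = Path("r_priors.json")
if _r_priors_file.exists():
    _rp = json.loads(_r_priors_file.read_text())
    assert _rp["logit_eps_mean"] == LOGIT_EPS_MEAN, "logit_eps mean diverged"
    assert _rp["logit_eps_sd"]   == LOGIT_EPS_SD,   "logit_eps sd diverged"
    assert _rp["logit_mu_mean"]  == LOGIT_MU_MEAN,  "logit_mu mean diverged"
    assert _rp["logit_mu_sd"]    == LOGIT_MU_SD,    "logit_mu sd diverged"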

F.3 2. Reproducing the Kruschke Schedule in NumPy

The Kruschke (1993) stimulus set is defined identically to the R stimulus_info tibble in Chapter 15: 8 stimuli crossing height (1–4) and position (1–4) with a fixed ground-truth category structure. The only difference is that here it lives in NumPy arrays rather than a data frame, so the particle filter simulator can call it millions of times without leaving Python.

# Compare to `stimulus_info` in the R setup block of Chapter 15:
# stimulus 5→(height=1, position=2, cat=0), 3→(1,3,0), ...
STIMULUS_HEIGHT   = np.array([1, 1, 2, 2, 3, 3, 4, 4], dtype=np.float32)
STIMULUS_POSITION = np.array([2, 3, 1, 4, 1, 4, 2, 3], dtype=np.float32)
STIMULUS_CATEGORY = np.array([0, 0, 1, 0, 1, 0, 1, 1], dtype=np.int32)
N_BLOCKS = 8

def _kruschke_order(seed: int = 42):
    """Randomly permute the 8 stimuli within each block, then concatenate blocks."""
    rng = np.random.default_rng(seed)
    n_stim = len(STIMULUS_HEIGHT)
    return np.concatenate([rng.permutation(n_stim) for _ in range(N_BLOCKS)])

def _drift_boundary(t: int, T: int) -> float:
    """Linearly drift the height boundary from 2.0 (t=0) to 3.0 (t=T-1)."""
    return 2.0 + (t / max(T - 1, 1)) * 1.0
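
# Worked example: with T = 64 trials, _drift_boundary(0, 64) == 2.0,
# _drift_boundary(32, 64) ≈ 2.51, and _drift_boundary(63, 64) == 3.0.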

The build_schedule function selects the feedback regime. For static, the feedback is simply the ground-truth category. For drift, the category boundary moves across trials. For contingent_shift, the base feedback is the pre-shift labelling; the actual flip is computed inside the simulator (see below) because it depends on the agent’s response stream, which in turn depends on θ.

def build_schedule(scenario: str, seed: int = 42):
    """Returns (height, position, base_feedback).

    For `static` and `drift`, `base_feedback` is the actual feedback delivered
    on every trial. For `contingent_shift`, `base_feedback` is the pre-shift
    labelling — the actual delivered sequence is computed online inside the
    particle filter, because the shift fires only when the agent first hits a
    streak of consecutive correct responses.
    """
    order = _kruschke_order(seed)
    height   = STIMULUS_HEIGHT[order]
    position = STIMULUS_POSITION[order]
    base_feedback = STIMULUS_CATEGORY[order]
    T = height.size

    if scenario in ("static", "contingent_shift"):
        feedback = base_feedback.copy()
    elif scenario == "drift":
        feedback = np.array(
            [int(height[t] > _drift_boundary(t, T)) for t in range(T)],
            dtype=np.int32,
        )
    else:
        raise ValueError(f"Unknown scenario: {scenario}")
    return height, position, feedback.astype(np.float32)

HEIGHT, POSITION, BASE_FEEDBACK = build_schedule(args.scenario)
N_TRIALS = HEIGHT.size  # 64 trials (8 stimuli × 8 blocks)
OBS = np.stack([HEIGHT, POSITION], axis=1)
FEATURE_RANGE = np.array([[OBS[:, 0].min(), OBS[:, 0].max()],
                           [OBS[:, 1].min(), OBS[:, 1].max()]], dtype=np.float32)

SCENARIO    = args.scenario
SHIFT_STREAK = args.shift_streak

F.4 3. The Particle Filter in NumPy (Model A as a Simulator)

This section re-implements the R rule_particle_filter from Chapter 15 in pure NumPy. The cognitive logic is identical; the language change is necessary because BayesFlow’s simulator must be a Python callable.

Rule representation. Each particle is a tuple (dims, thresholds, ops, logic, pred_if_true) representing a conjunctive or disjunctive rule over one or two stimulus dimensions. This matches the algebraic structure described in Chapter 15 §“Rule Grammar.”

def _generate_rule(rng):
    """Sample one rule uniformly from the grammar (1 or 2 dimensions)."""
    n_dims = rng.integers(1, MAX_RULE_DIMS + 1)
    dims = rng.choice(N_FEATURES, size=n_dims, replace=False)
    thresholds = np.array(
        [rng.uniform(FEATURE_RANGE[d, 0], FEATURE_RANGE[d, 1]) for d in dims],
        dtype=np.float32,
    )
    ops = rng.integers(0, 2, size=n_dims).astype(np.int8)  # 0 = <=, 1 = >
    logic = rng.integers(0, 2) if n_dims == 2 else 1        # 0 = OR, 1 = AND
    pred_if_true = rng.integers(0, 2)                        # predicted category when rule fires
    return (dims.astype(np.int8), thresholds, ops, np.int8(logic), np.int8(pred_if_true))

def _evaluate_rule(rule, stimulus):
    """Return the category predicted by `rule` for `stimulus`."""
    dims, thr, ops, logic, pred = rule
    truths = np.empty(dims.size, dtype=bool)
    for k in range(dims.size):
        v = stimulus[dims[k]]
        truths[k] = (v > thr[k]) if ops[k] == 1 else (v <= thr[k])
    rule_true = bool(truths[0]) if dims.size == 1 else (
        truths.all() if logic == 1 else truths.any()
    )
    return int(pred) if rule_true else 1 - int(pred)

def _initialize_particles(n_particles, rng):
    return [_generate_rule(rng) for _ in range(n_particles)]
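
As an illustration (not in the original script), a hand-built one-dimensional rule “height > 2.5 → category 1” behaves as the grammar specifies:

# Illustrative only: dims index 0 is height, op 1 means ">".
rule = (np.array([0], dtype=np.int8),       # dims
        np.array([2.5], dtype=np.float32),  # thresholds
        np.array([1], dtype=np.int8),       # ops
        np.int8(1),                         # logic (ignored for 1-D rules)
        np.int8(1))                         # pred_if_true
assert _evaluate_rule(rule, np.array([3.0, 1.0])) == 1  # height 3 > 2.5 -> cat 1
assert _evaluate_rule(rule, np.array([2.0, 4.0])) == 0  # height 2 <= 2.5 -> cat 0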

The particle filter loop is the core cognitive model. On each trial it:

1. Computes the weighted-average category prediction across all particles.
2. Generates the agent’s noisy response (ε = error probability).
3. Updates particle weights using the feedback as evidence.
4. Resamples when the effective sample size falls below the threshold.
5. Mutates a fraction μ of particles (hypothesis exploration).

def particle_filter(eps: float, n_particles: int, mu: float, rng):
    """Run Model A on the fixed schedule. Returns (responses, probs, delivered).

    `delivered` is the per-trial feedback the agent actually saw — equal to
    BASE_FEEDBACK for `static` and `drift`. For `contingent_shift` it equals
    BASE_FEEDBACK up until the agent has accumulated SHIFT_STREAK consecutive
    correct responses; from the next trial onward the labels are flipped.
    """
    particles = _initialize_particles(n_particles, rng)
    weights   = np.full(n_particles, 1.0 / n_particles, dtype=np.float64)
    responses = np.empty(N_TRIALS, dtype=np.int8)
    probs     = np.empty(N_TRIALS, dtype=np.float32)
    delivered = np.empty(N_TRIALS, dtype=np.float32)

    flipped = False
    streak  = 0

    for t in range(N_TRIALS):
        stim = OBS[t]

        # Determine the label the agent sees on this trial.
        base     = int(BASE_FEEDBACK[t])
        true_cat = (1 - base) if (SCENARIO == "contingent_shift" and flipped) else base
        delivered[t] = float(true_cat)

        # Weighted-average category prediction.
        preds  = np.array([_evaluate_rule(p, stim) for p in particles], dtype=np.int8)
        p_cat1 = np.where(preds == 1, 1.0 - eps, eps)
        prob   = float(np.clip(np.sum(weights * p_cat1), 1e-9, 1 - 1e-9))
        probs[t] = prob

        # Noisy response.
        r_t = rng.binomial(1, prob)
        responses[t] = r_t

        # Streak / shift accounting (contingent_shift only).
        if SCENARIO == "contingent_shift" and not flipped:
            if int(r_t) == true_cat:
                streak += 1
                if streak >= SHIFT_STREAK:
                    flipped = True  # affects t+1 onward
            else:
                streak = 0

        # Bayesian weight update: reweight particles by how well they predicted
        # the delivered feedback.
        likelihoods = np.where(preds == true_cat, 1.0 - eps, eps)
        new_w = weights * likelihoods
        s     = new_w.sum()
        weights = new_w / s if s > 1e-12 else np.full(n_particles, 1.0 / n_particles)

        # Systematic resampling when ESS drops below the threshold.
        ess = 1.0 / np.sum(weights ** 2)
        if ess < n_particles * RESAMPLE_THRESHOLD:
            idx       = rng.choice(n_particles, size=n_particles, replace=True, p=weights)
            particles = [particles[i] for i in idx]
            weights   = np.full(n_particles, 1.0 / n_particles)

        # Mutation: replace a random fraction μ of particles with new random rules.
        if mu > 0:
            mask = rng.random(n_particles) < mu
            for i in np.where(mask)[0]:
                particles[i] = _generate_rule(rng)

    return responses, probs, delivered
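
A quick smoke test (illustrative; not part of the pipeline) runs one agent on the fixed schedule:

_rng_demo = np.random.default_rng(0)
resp, prob, fb = particle_filter(eps=0.1, n_particles=50, mu=0.05, rng=_rng_demo)
print(resp.shape, prob[:3])  # (64,) and the first three P(category 1) values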

F.5 4. The BayesFlow Prior and Simulator Wrapper

BayesFlow requires the generative model to be split into two callables: a prior() that returns a dictionary of parameter draws, and an observation_model() that takes those draws as keyword arguments and returns a dictionary of simulated observations.

def prior():
    """Sample one set of parameters from the joint prior."""
    rng = np.random.default_rng()
    return {
        # logit-scale sampling ensures the prior is specified on an unconstrained
        # real line; BayesFlow's inference network works in this unconstrained space.
        "logit_eps":       np.float32(rng.normal(LOGIT_EPS_MEAN, LOGIT_EPS_SD)),
        "log_n_particles": np.float32(rng.uniform(LOG_N_LOW, LOG_N_HIGH)),
        "logit_mu":        np.float32(rng.normal(LOGIT_MU_MEAN, LOGIT_MU_SD)),
    }

def observation_model(logit_eps, log_n_particles, logit_mu):
    """Run the particle filter for one draw from the prior.

    Returns a dict with key `trial_series`: a T × 4 array whose columns are
    [response, height, position, delivered_feedback]. This is the data tensor
    the summary network will compress into a fixed-size embedding.
    """
    rng = np.random.default_rng()

    # Inverse-transform to the natural scale.
    eps    = float(1.0 / (1.0 + np.exp(-logit_eps)))
    mu     = float(1.0 / (1.0 + np.exp(-logit_mu)))
    n_part = int(np.clip(np.round(np.exp(log_n_particles)),
                         N_PARTICLES_MIN, N_PARTICLES_MAX))

    responses, _, delivered = particle_filter(eps, n_part, mu, rng)

    # Stack into T × 4. The channel order matches what the R-side chunks expect.
    series = np.stack(
        [responses.astype(np.float32), HEIGHT, POSITION, delivered],
        axis=1,
    )
    return {"trial_series": series}

# bf.make_simulator wires prior() and observation_model() into a single
# callable that generates (θ, y) pairs in the format BayesFlow expects.
simulator = bf.make_simulator([prior, observation_model])
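
A two-draw sanity check (illustrative; this assumes the .sample(batch_size) method that BayesFlow 2.x simulator objects expose) shows the dictionary format the workflow consumes:

sims = simulator.sample(2)
print({k: np.asarray(v).shape for k, v in sims.items()})
# Expect keys logit_eps, log_n_particles, logit_mu, and trial_series,
# with trial_series of shape (2, 64, 4).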

F.6 5. The BayesFlow Adapter Chain, Summary Network, and Inference Network

The adapter chain is BayesFlow’s routing layer. It specifies:

  • which simulator outputs become the summary variables (input to the summary network),
  • which become the inference variables (the parameters the network is trying to infer),
  • any pre-processing steps (dtype casting, reshaping).

Think of it as the BayesFlow equivalent of Stan’s data and parameters blocks — it tells the framework what is data and what is latent.

adapter = (
    bf.Adapter()
    # Mark `trial_series` as a time series: shape is (batch, T, channels).
    # The summary network will process it along the time axis.
    .as_time_series(["trial_series"])
    # Ensure float32 throughout — JAX and PyTorch are happiest with float32.
    .convert_dtype("float64", "float32")
    # Concatenate the three parameters into a single vector that becomes the
    # inference network's target (the thing it learns to predict).
    .concatenate(["logit_eps", "log_n_particles", "logit_mu"],
                 into="inference_variables")
    # Rename for the networks' expected input keys.
    .rename("trial_series", "summary_variables")
)

The summary network compresses each T × 4 trial sequence into a 9-dimensional embedding. A TimeSeriesTransformer is used (rather than a plain MLP) because the trial sequence is ordered and path-dependent: what the agent learned in trial 10 depends on what happened in trials 1–9. A transformer can capture these long-range dependencies.

The inference network learns the mapping from the 9-dim summary to the posterior over (logit_ε, log N, logit_μ). FlowMatching is a continuous normalizing flow that is fast to train and produces smooth, well-calibrated posteriors.

summary_net = bf.networks.TimeSeriesTransformer(
    summary_dim=9,   # output dimensionality of the embedding
    time_axis=-2,    # time is the second-to-last axis (batch, time, channels)
)

inference_net = bf.networks.FlowMatching(
    subnet_kwargs={"widths": (256, 256, 256, 256)},  # 4-layer MLP inside the flow
)

# BasicWorkflow wires everything together and provides fit_offline(), sample(),
# compute_default_diagnostics(), and plot_default_diagnostics().
workflow = bf.BasicWorkflow(
    simulator=simulator,
    inference_network=inference_net,
    summary_network=summary_net,
    adapter=adapter,
    checkpoint_filepath=str(RESULTS_DIR),  # saves best weights here
)

F.7 6. Pre-Simulating the Training Budget

BayesFlow supports two training modes: online (simulate during training) and offline (simulate everything up-front, then train). We use offline training for reproducibility: the same 20,000 simulated datasets are used in every training run, so results do not vary across runs that happen to use different random training draws.

The budget is split into training (20,000), validation (300), and test (300) sets. The validation set drives early stopping; the test set is never seen during training and is used only for diagnostics.

N_TOTAL  = N_PILOT + N_VAL + N_TEST
SIM_CHUNK = 200   # simulate in chunks to show a progress bar

print(f"\nSimulating {N_TOTAL} datasets in chunks of {SIM_CHUNK}...")
chunks = []
from tqdm import tqdm
for start in tqdm(range(0, N_TOTAL, SIM_CHUNK), desc="Simulating", unit="batch"):
    n = min(SIM_CHUNK, N_TOTAL - start)
    chunks.append(workflow.simulate(n))

all_sims   = {k: np.concatenate([c[k] for c in chunks], axis=0) for k in chunks[0]}
train_data = {k: v[:N_PILOT]              for k, v in all_sims.items()}
val_data   = {k: v[N_PILOT:N_PILOT+N_VAL] for k, v in all_sims.items()}
test_data  = {k: v[N_PILOT+N_VAL:]        for k, v in all_sims.items()}

Prior predictive check (Phase 2 of the validation battery). Before training, we check that the prior predictive distribution is plausible: if the prior draws generate response sequences where cumulative accuracy never rises above chance, the prior is too diffuse and the network will have nothing useful to learn from.

import pandas as pd

n_pp_show = min(500, N_PILOT)
pp_idx     = np.random.default_rng(0).choice(N_PILOT, size=n_pp_show, replace=False)
pp_series  = train_data["trial_series"][pp_idx]   # (n_pp, T, 4)
pp_responses = pp_series[..., 0]
pp_delivered = pp_series[..., 3]
pp_correct   = (pp_responses == pp_delivered).astype(np.float32)
pp_cumacc    = np.cumsum(pp_correct, axis=1) / np.arange(1, N_TRIALS + 1)[None, :]
qs = np.quantile(pp_cumacc, [0.05, 0.25, 0.5, 0.75, 0.95], axis=0)

# Export quantiles so Chapter 15 can build a ggplot2 version of this figure.
pd.DataFrame({
    "trial": np.arange(1, N_TRIALS + 1),
    "q05": qs[0], "q25": qs[1], "q50": qs[2], "q75": qs[3], "q95": qs[4],
}).to_csv(RESULTS_DIR / "prior_predictive_curves.csv", index=False)

F.8 7. Training the Inference Network

Training is a standard Keras loop wrapped by fit_offline. Early stopping monitors validation loss; BayesFlow handles the learning rate schedule internally (do not add ReduceLROnPlateau — it conflicts with BayesFlow’s internal schedule).

import keras

callbacks = [
    keras.callbacks.EarlyStopping(
        monitor="val_loss",
        patience=15,               # stop if val_loss doesn't improve for 15 epochs
        restore_best_weights=True,
        verbose=1,
    ),
]

print(f"\nTraining offline for up to {EPOCHS} epochs (batch_size={BATCH_SIZE})...")
history = workflow.fit_offline(
    data=train_data,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    validation_data=val_data,
    callbacks=callbacks,
)

# Save training history so Chapter 15 can plot the loss curve.
with open(RESULTS_DIR / "history.json", "w") as f:
    json.dump(history.history, f)

Note: Expected training time on uCloud

On a 32-core AMD EPYC 9535 CPU node: approximately 2–3 hours per scenario for the full 20,000-simulation budget. On a GPU node: roughly 20–40 minutes per scenario (JAX XLA compilation adds ~10 minutes on first run). The three scenarios can be run in parallel in separate terminal windows if enough cores are allocated.


F.9 8. Diagnostics on the Held-Out Test Set

compute_default_diagnostics evaluates the trained network on the 300 held-out test cases and returns metrics that summarise calibration and recovery. We then export the raw numbers as CSVs so Chapter 15 can rebuild the diagnostic plots using ggplot2, consistent with the visual conventions of the rest of the book.

metrics = workflow.compute_default_diagnostics(test_data=test_data, as_data_frame=True)
metrics.to_csv(RESULTS_DIR / "metrics.csv")

_N_CAL  = N_TEST   # 300 test cases
_N_POST = 100      # posterior draws per test case
_N_SBC  = 99       # posterior draws for SBC rank computation

samples_cal = workflow.sample(conditions=test_data, num_samples=_N_POST)

# Back-transform from the unconstrained parameterisation to the natural scale.
eps_post_cal = 1.0 / (1.0 + np.exp(-np.asarray(samples_cal["logit_eps"]).squeeze(-1)))
n_post_cal   = np.exp(np.asarray(samples_cal["log_n_particles"]).squeeze(-1))
mu_post_cal  = 1.0 / (1.0 + np.exp(-np.asarray(samples_cal["logit_mu"]).squeeze(-1)))

eps_true_cal = 1.0 / (1.0 + np.exp(-test_data["logit_eps"].reshape(_N_CAL)))
n_true_cal   = np.exp(test_data["log_n_particles"].reshape(_N_CAL))
mu_true_cal  = 1.0 / (1.0 + np.exp(-test_data["logit_mu"].reshape(_N_CAL)))

Recovery scatter (posterior mean ± 90% CI vs ground truth):

recovery_rows = []
for pname, post_s, true_v in [
        ("eps",         eps_post_cal, eps_true_cal),
        ("n_particles", n_post_cal,   n_true_cal),
        ("mu",          mu_post_cal,  mu_true_cal)]:
    pm  = post_s.mean(axis=1)
    q05 = np.quantile(post_s, 0.05, axis=1)
    q95 = np.quantile(post_s, 0.95, axis=1)
    for i in range(_N_CAL):
        recovery_rows.append({"parameter": pname, "true_val": float(true_v[i]),
                               "post_mean": float(pm[i]),
                               "post_q05":  float(q05[i]),
                               "post_q95":  float(q95[i])})
pd.DataFrame(recovery_rows).to_csv(RESULTS_DIR / "recovery_data.csv", index=False)

SBC ranks — the marginal rank of the true parameter value within 99 posterior draws. Under a calibrated posterior, ranks should be uniform on {0, …, 99}:

samples_sbc = workflow.sample(conditions=test_data, num_samples=_N_SBC)
eps_sbc  = 1.0 / (1.0 + np.exp(-np.asarray(samples_sbc["logit_eps"]).squeeze(-1)))
n_sbc_s  = np.exp(np.asarray(samples_sbc["log_n_particles"]).squeeze(-1))
mu_sbc_s = 1.0 / (1.0 + np.exp(-np.asarray(samples_sbc["logit_mu"]).squeeze(-1)))

sbc_rows = []
for pname, post_s, true_v in [
        ("eps",         eps_sbc,  eps_true_cal),
        ("n_particles", n_sbc_s,  n_true_cal),
        ("mu",          mu_sbc_s, mu_true_cal)]:
    ranks = (post_s < true_v[:, None]).sum(axis=1)
    for i in range(_N_CAL):
        sbc_rows.append({"parameter": pname, "rank": int(ranks[i])})
pd.DataFrame(sbc_rows).to_csv(RESULTS_DIR / "sbc_ranks.csv", index=False)
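
As an optional extra (not in the script, and assuming SciPy is available), the exported ranks can be checked for uniformity with a chi-square test, binning the 100 possible ranks into 10 equal bins:

from scipy.stats import chisquare  # optional dependency, used only here

ranks_eps = np.array([row["rank"] for row in sbc_rows if row["parameter"] == "eps"])
counts, _ = np.histogram(ranks_eps, bins=10, range=(0, _N_SBC + 1))
print(chisquare(counts))  # a large p-value is consistent with uniform ranks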

Expected coverage — for each nominal credible-interval level α ∈ {0.05, …, 0.95}, the fraction of test cases where the true value falls inside the α-credible interval. This should track the diagonal:

_alphas = np.linspace(0.05, 0.95, 19)
cov_rows = []
for pname, post_s, true_v in [
        ("eps",         eps_post_cal, eps_true_cal),
        ("n_particles", n_post_cal,   n_true_cal),
        ("mu",          mu_post_cal,  mu_true_cal)]:
    for alpha in _alphas:
        lo  = np.quantile(post_s, (1 - alpha) / 2, axis=1)
        hi  = np.quantile(post_s, 1 - (1 - alpha) / 2, axis=1)
        emp = float(np.mean((true_v >= lo) & (true_v <= hi)))
        cov_rows.append({"parameter": pname, "nominal": float(alpha), "empirical": emp})
pd.DataFrame(cov_rows).to_csv(RESULTS_DIR / "coverage_data.csv", index=False)

Posterior contraction and z-scores — contraction measures how much the posterior variance shrinks relative to the prior; z-score measures how far the posterior mean is from the truth in posterior-SD units. A well-calibrated network should show high contraction and z-scores centred near zero:

_rng_p = np.random.default_rng(0)
prior_eps_s = 1.0 / (1.0 + np.exp(-_rng_p.normal(LOGIT_EPS_MEAN, LOGIT_EPS_SD, 20_000)))
prior_n_s   = np.exp(_rng_p.uniform(LOG_N_LOW, LOG_N_HIGH, 20_000))
prior_mu_s  = 1.0 / (1.0 + np.exp(-_rng_p.normal(LOGIT_MU_MEAN, LOGIT_MU_SD, 20_000)))

cont_rows = []
for pname, post_s, true_v, prior_s in [
        ("eps",         eps_post_cal, eps_true_cal, prior_eps_s),
        ("n_particles", n_post_cal,   n_true_cal,   prior_n_s),
        ("mu",          mu_post_cal,  mu_true_cal,  prior_mu_s)]:
    post_sd  = post_s.std(axis=1)
    prior_sd = float(prior_s.std())
    contraction = 1.0 - post_sd / prior_sd
    zscore      = (post_s.mean(axis=1) - true_v) / np.maximum(post_sd, 1e-9)
    for i in range(_N_CAL):
        cont_rows.append({"parameter": pname,
                          "contraction": float(contraction[i]),
                          "zscore":      float(zscore[i])})
pd.DataFrame(cont_rows).to_csv(RESULTS_DIR / "contraction_data.csv", index=False)

F.10 9. Amortized Inference on a Demo Subject

The key pay-off of amortization: once the network is trained, obtaining the posterior for any new observed sequence takes milliseconds (a single forward pass), regardless of how long training took. Here we apply the trained network to one held-out test subject to produce the posterior samples that Chapter 15 overlays against the Stan (Model B) posterior.

demo_idx = 0
demo_obs = {"trial_series": test_data["trial_series"][demo_idx:demo_idx + 1]}

# 2,000 posterior draws via a single forward pass.
samples = workflow.sample(conditions=demo_obs, num_samples=2000)

eps_post = 1 / (1 + np.exp(-np.asarray(samples["logit_eps"]).reshape(-1)))
n_post   = np.exp(np.asarray(samples["log_n_particles"]).reshape(-1))
mu_post  = 1 / (1 + np.exp(-np.asarray(samples["logit_mu"]).reshape(-1)))

# Save for the NPE-vs-Stan comparison in Chapter 15.
pd.DataFrame({"eps": eps_post, "n_particles": n_post, "mu": mu_post}).to_csv(
    RESULTS_DIR / "posterior_demo.csv", index=False
)

# Ground-truth parameter values for the demo subject (used as reference lines).
true_eps = float(1 / (1 + np.exp(-np.asarray(test_data["logit_eps"][demo_idx]).ravel()[0])))
true_n   = float(np.exp(np.asarray(test_data["log_n_particles"][demo_idx]).ravel()[0]))
true_mu  = float(1 / (1 + np.exp(-np.asarray(test_data["logit_mu"][demo_idx]).ravel()[0])))

pd.DataFrame({"eps": [true_eps], "n_particles": [true_n], "mu": [true_mu]}).to_csv(
    RESULTS_DIR / "demo_truth.csv", index=False
)

# The demo subject's response sequence, so Chapter 15 can refit Model B
# (HMC) on the same data and produce a like-for-like comparison.
demo_trial_series = np.asarray(test_data["trial_series"][demo_idx])  # (T, 4)
pd.DataFrame(demo_trial_series, columns=["y", "height", "position", "feedback"]).to_csv(
    RESULTS_DIR / "demo_responses.csv", index=False
)

F.11 10. Posterior Predictive Check

Phase 3 of the validation battery. For the demo subject we draw 50 parameter vectors from the posterior and simulate 50 response sequences. The observed cumulative accuracy curve should lie inside the 90% credible ribbon for almost every trial.

n_ppc = args.n_ppc  # default 50

obs_responses = test_data["trial_series"][demo_idx, :, 0].astype(np.int32)
obs_delivered = test_data["trial_series"][demo_idx, :, 3].astype(np.int32)
obs_cumacc    = np.cumsum(obs_responses == obs_delivered) / np.arange(1, N_TRIALS + 1)

ppc_idx    = np.random.default_rng(1).choice(eps_post.size, size=n_ppc, replace=False)
ppc_curves = np.empty((n_ppc, N_TRIALS), dtype=np.float32)
for j, i in enumerate(ppc_idx):
    rng_j  = np.random.default_rng(int(1_000_000 + i))
    n_j    = int(np.clip(round(n_post[i]), N_PARTICLES_MIN, N_PARTICLES_MAX))
    rep, _, rep_delivered = particle_filter(float(eps_post[i]), n_j, float(mu_post[i]), rng_j)
    correct       = (rep == rep_delivered.astype(np.int32)).astype(np.float32)
    ppc_curves[j] = np.cumsum(correct) / np.arange(1, N_TRIALS + 1)

ppc_q = np.quantile(ppc_curves, [0.05, 0.25, 0.5, 0.75, 0.95], axis=0)

pd.DataFrame({
    "trial":    np.arange(1, N_TRIALS + 1),
    "q05": ppc_q[0], "q25": ppc_q[1], "q50": ppc_q[2],
    "q75": ppc_q[3], "q95": ppc_q[4],
    "observed": obs_cumacc,
}).to_csv(RESULTS_DIR / "ppc_overlay_curves.csv", index=False)

print(f"\nDone. All artifacts in: {RESULTS_DIR.resolve()}")
print("To compare scenarios, run this script three times with "
      "--scenario static, contingent_shift, and drift.")

F.12 Summary: What the Script Produces

After a successful run the following files appear under particle-filter-npe/<scenario>/:

File                          Used by Chapter 15 for
----------------------------  ----------------------------------------
metrics.csv                   NPE diagnostic summary table
history.json                  Training loss curve
prior_predictive_curves.csv   Phase 2 (prior predictive) plot
recovery_data.csv             Phase 6 recovery scatter
sbc_ranks.csv                 Phase 6 SBC rank histogram
coverage_data.csv             Phase 6 expected coverage plot
contraction_data.csv          Phase 6 z-score / contraction plot
posterior_demo.csv            NPE-vs-Stan overlay
demo_truth.csv                Reference lines on the NPE-vs-Stan plot
demo_responses.csv            Input for the Stan (Model B) refit
ppc_overlay_curves.csv        Phase 3 (posterior predictive) plot

Copy particle-filter-npe/ into simdata/ch15_v2_bayesflow/ in your local book directory, and all NPE chunks in Chapter 15 will find the artifacts and render.
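
Before copying, a short illustrative check (not part of the script) can confirm that a run produced every artifact listed above:

from pathlib import Path

expected = [
    "metrics.csv", "history.json", "prior_predictive_curves.csv",
    "recovery_data.csv", "sbc_ranks.csv", "coverage_data.csv",
    "contraction_data.csv", "posterior_demo.csv", "demo_truth.csv",
    "demo_responses.csv", "ppc_overlay_curves.csv",
]
run_dir = Path("particle-filter-npe") / "static"  # adjust per scenario
missing = [f for f in expected if not (run_dir / f).exists()]
print("All artifacts present." if not missing else f"Missing: {missing}")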