Architecture
Pipeline stages
The EMPI pipeline runs in six stages. Stages 1–2 and 4–6 are shared SQL models. Stage 3 uses adapter-specific Python models for Snowflake (Snowpark) or Spark-family (PySpark) execution.
┌─────────────────────────────────────────────────────────┐
│ 1. STAGING │
│ empi_pre__eligibility ──┐ │
│ empi_pre__patient ──────┼─→ stg_empi_source_person │
│ │ (combined, deduped) │
│ └─→ stg_empi_runtime │
│ (config loaded) │
├─────────────────────────────────────────────────────────┤
│ 2. INPUT PREPARATION │
│ stg_empi_source_person ──→ int_empi_input │
│ stg_empi_overrides ──────→ (manual review state) │
├─────────────────────────────────────────────────────────┤
│ 3. LINKAGE (adapter-specific Python) │
│ Snowflake: sf_empi_scored_pairs → sf_empi_cluster │
│ Spark: py_empi_scored_pairs → py_empi_cluster │
├─────────────────────────────────────────────────────────┤
│ 4. PAIR DECISIONS │
│ scored_pairs + overrides ──→ empi_pair_decisions │
│ (apply thresholds, hard rules, manual overrides) │
├─────────────────────────────────────────────────────────┤
│ 5. SURVIVORSHIP │
│ cluster_assignments + source_person │
│ ──→ int_empi_survivorship_selected │
│ ──→ empi_person_attrs │
│ ──→ empi_person_attrs_provenance │
├─────────────────────────────────────────────────────────┤
│ 6. FINAL OUTPUTS │
│ person_crosswalk, work_queue, core__person, │
│ input_layer.* (remapped tables) │
└─────────────────────────────────────────────────────────┘
Probabilistic matching
The package uses Splink v4 to implement Fellegi-Sunter probabilistic record linkage:
- Blocking: candidate pairs are generated using configurable blocking predicates (e.g., exact match on DOB + last name, or exact match on SSN). This avoids comparing every record to every other record.
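The blocking step can be illustrated with a minimal pure-Python sketch (not the package's Splink/SQL implementation): group records by a blocking key and generate pairs only within each group. The names `candidate_pairs` and `block_key` are hypothetical.

```python
from itertools import combinations

def candidate_pairs(records, block_key):
    """Group records by a blocking key, then pair only within each block.

    `records` is a list of dicts; `block_key` maps a record to its key.
    Records whose key is None (missing data) generate no candidates.
    """
    blocks = {}
    for rec in records:
        key = block_key(rec)
        if key is not None:
            blocks.setdefault(key, []).append(rec)
    for group in blocks.values():
        yield from combinations(group, 2)

records = [
    {"id": 1, "dob": "1980-01-01", "last": "smith"},
    {"id": 2, "dob": "1980-01-01", "last": "smith"},
    {"id": 3, "dob": "1975-06-30", "last": "jones"},
]
# Blocking predicate: exact match on DOB + last name.
pairs = list(candidate_pairs(records, lambda r: (r["dob"], r["last"])))
```

Only records 1 and 2 share a blocking key, so a single candidate pair is produced instead of all three possible pairs.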
- Scoring: each candidate pair is scored across multiple comparison columns (first name, last name, DOB, SSN, email, phone, address) using method-specific similarity functions (exact match, Jaro-Winkler fuzzy, date comparison).
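Fellegi-Sunter scoring sums a per-column log-likelihood weight over the comparison columns. A sketch with illustrative m/u probabilities (not the package's trained values) and a stand-in exact-match similarity function:

```python
import math

# Illustrative m/u probabilities per comparison column:
# m = P(values agree | same person), u = P(values agree | different people).
MU = {
    "first_name": (0.90, 0.010),
    "last_name":  (0.90, 0.005),
    "dob":        (0.95, 0.001),
}

def match_weight(rec_a, rec_b, similarity):
    """Fellegi-Sunter log2 match weight across comparison columns."""
    weight = 0.0
    for col, (m, u) in MU.items():
        if similarity(rec_a.get(col), rec_b.get(col)):
            weight += math.log2(m / u)              # agreement weight
        else:
            weight += math.log2((1 - m) / (1 - u))  # disagreement weight
    return weight

exact = lambda a, b: a is not None and a == b
a = {"first_name": "ann", "last_name": "lee", "dob": "1980-01-01"}
b = {"first_name": "ann", "last_name": "lee", "dob": "1980-01-01"}
c = {"first_name": "bob", "last_name": "ray", "dob": "1999-12-31"}

w_match = match_weight(a, b, exact)     # all columns agree: large positive
w_nonmatch = match_weight(a, c, exact)  # all columns disagree: negative
```

In the actual pipeline the per-column similarity functions are method-specific (exact, Jaro-Winkler, date comparison) rather than plain equality.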
- Thresholding: pairs above `upper_threshold` are auto-matched. Pairs below `lower_threshold` are auto-rejected. Pairs in between are routed to the manual review work queue.
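The three-way routing reduces to a small decision function. A sketch with illustrative threshold values (the package's thresholds are configurable; the name `route_pair` is hypothetical):

```python
def route_pair(weight, lower_threshold=5.0, upper_threshold=15.0):
    """Route a scored pair into one of three bands by match weight."""
    if weight >= upper_threshold:
        return "auto_match"
    if weight < lower_threshold:
        return "auto_reject"
    return "work_queue"  # clerical-review band between the thresholds
```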
Clustering
After scoring, connected-component analysis groups records into person clusters:
- Snowflake uses SQL-based label propagation.
- Spark uses distributed union-find.
Each cluster gets a stable person_id. Clustering rebuilds from current pair decisions on every run, so manual overrides and new scores take effect immediately.
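The union-find variant used on Spark can be sketched in pure Python (a single-machine stand-in, not the distributed implementation). Using the smallest record id in each connected component as its label gives a deterministic, stable cluster identifier:

```python
def cluster(edges):
    """Union-find over matched pairs; returns {record_id: root_id}."""
    parent = {}

    def find(x):
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving
            x = parent[x]
        return x

    def union(a, b):
        ra, rb = find(a), find(b)
        if ra != rb:
            # Always attach the larger root under the smaller one, so the
            # smallest id in each component ends up as its label.
            if rb < ra:
                ra, rb = rb, ra
            parent[rb] = ra

    for a, b in edges:
        union(a, b)
    return {x: find(x) for x in parent}

# Pairs (p1,p2) and (p2,p3) chain into one cluster; (p4,p5) form another.
assignments = cluster([("p1", "p2"), ("p2", "p3"), ("p4", "p5")])
```

Because the input is just the current set of matched pairs, re-running this over fresh pair decisions rebuilds clusters from scratch, which is how overrides take effect immediately.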
Survivorship
For each person cluster, the package selects a "golden record" of consolidated attributes using per-attribute strategies defined in `empi_survivorship_rules`:
| Strategy | Behavior |
|---|---|
| `most_frequent` | Most common non-null value across source records |
| `most_recent` | Value from the record with the latest timestamp |
| `source_priority` | Value from the highest-priority source (e.g., clinical over claims) |
Each strategy has a configurable timestamp column and priority ordering. The `_default` row sets the fallback strategy for any attribute not explicitly configured.
Provenance is tracked in `empi_person_attrs_provenance`, a tall table recording which source record contributed each attribute value.
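The three strategies can be sketched in pure Python (not the package's SQL implementation). The record fields `updated_at` and `source`, the priority map, and the function name `survive` are all illustrative; returning the winning record alongside the value is what makes provenance tracking possible:

```python
from collections import Counter

# Hypothetical source priority: lower number = higher priority.
SOURCE_PRIORITY = {"clinical": 0, "claims": 1}

def survive(records, attr, strategy):
    """Pick the golden value for one attribute.

    Returns (value, winning_record) so the caller can also record
    provenance: which source record contributed the value.
    """
    candidates = [r for r in records if r.get(attr) is not None]
    if not candidates:
        return None, None
    if strategy == "most_frequent":
        top = Counter(r[attr] for r in candidates).most_common(1)[0][0]
        winner = next(r for r in candidates if r[attr] == top)
    elif strategy == "most_recent":
        winner = max(candidates, key=lambda r: r["updated_at"])
    elif strategy == "source_priority":
        winner = min(candidates, key=lambda r: SOURCE_PRIORITY[r["source"]])
    else:
        raise ValueError(f"unknown strategy: {strategy}")
    return winner[attr], winner

cluster_records = [
    {"first_name": "Ann",  "updated_at": "2023-01-01", "source": "claims"},
    {"first_name": "Ann",  "updated_at": "2023-06-01", "source": "claims"},
    {"first_name": "Anne", "updated_at": "2024-02-01", "source": "clinical"},
]
golden_mf, _ = survive(cluster_records, "first_name", "most_frequent")
golden_mr, _ = survive(cluster_records, "first_name", "most_recent")
golden_sp, _ = survive(cluster_records, "first_name", "source_priority")
```

Note how the three strategies can disagree on the same cluster: the majority value, the newest value, and the clinical-source value need not coincide.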
Manual review loop
The package supports a human-in-the-loop review workflow:
- EMPI run publishes `work_queue` with pairs in the clerical-review band.
- Analyst reviews pairs in the Manual Review App and records decisions.
- App writes `must_link` or `cannot_link` actions to the `overrides` table, and optionally writes `rematch_requests` for split/carve operations.
- Next EMPI run reads overrides, applies them to pair decisions, and rebuilds clusters.
Both `overrides` and `rematch_requests` are incremental tables with `full_refresh: false` protection, so manual review state survives `dbt build --full-refresh`.
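The override-application step in stage 4 amounts to letting manual decisions win over thresholds. A minimal sketch, assuming hypothetical pair keys and illustrative threshold values (the function name `apply_overrides` is not from the package):

```python
def apply_overrides(scored, overrides, lower=5.0, upper=15.0):
    """Combine model scores with manual-review actions.

    `scored` maps pair_key -> match weight; `overrides` maps
    pair_key -> "must_link" | "cannot_link". Overrides take precedence,
    so reviewed pairs never reappear in the work queue.
    """
    decisions = {}
    for pair, weight in scored.items():
        action = overrides.get(pair)
        if action == "must_link":
            decisions[pair] = "match"
        elif action == "cannot_link":
            decisions[pair] = "no_match"
        elif weight >= upper:
            decisions[pair] = "match"
        elif weight < lower:
            decisions[pair] = "no_match"
        else:
            decisions[pair] = "review"
    return decisions

scored = {"p1|p2": 20.0, "p3|p4": 8.0, "p5|p6": 8.0, "p7|p8": 2.0}
overrides = {"p3|p4": "must_link", "p5|p6": "cannot_link"}
decisions = apply_overrides(scored, overrides)
```

The two mid-band pairs (weight 8.0) land on opposite sides purely because of their recorded review actions; rebuilding clusters from these decisions is what propagates the overrides into `person_id` assignments.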
Adapter implementations
| Capability | Snowflake | Databricks / Fabric |
|---|---|---|
| Scoring engine | Snowpark + JAROWINKLER_SIMILARITY() | PySpark + Splink v4 |
| Clustering | SQL label propagation | Distributed union-find |
| Python model materialization | Snowpark DataFrame | PySpark DataFrame |
| Incremental strategy | merge on pair_key | merge (Databricks) / table (Fabric) |
Both adapters produce identical output schemas, so all downstream SQL models are shared.