Architecture

Pipeline stages

The EMPI pipeline runs in six stages. Stages 1–2 and 4–6 are shared SQL models. Stage 3 uses adapter-specific Python models for Snowflake (Snowpark) or Spark-family (PySpark) execution.

┌──────────────────────────────────────────────────────┐
│ 1. STAGING                                           │
│ empi_pre__eligibility ──┐                            │
│ empi_pre__patient ──────┼─→ stg_empi_source_person   │
│                         │   (combined, deduped)      │
│                         └─→ stg_empi_runtime         │
│                             (config loaded)          │
├──────────────────────────────────────────────────────┤
│ 2. INPUT PREPARATION                                 │
│ stg_empi_source_person ──→ int_empi_input            │
│ stg_empi_overrides ──────→ (manual review state)     │
├──────────────────────────────────────────────────────┤
│ 3. LINKAGE (adapter-specific Python)                 │
│ Snowflake: sf_empi_scored_pairs → sf_empi_cluster    │
│ Spark:     py_empi_scored_pairs → py_empi_cluster    │
├──────────────────────────────────────────────────────┤
│ 4. PAIR DECISIONS                                    │
│ scored_pairs + overrides ──→ empi_pair_decisions     │
│ (apply thresholds, hard rules, manual overrides)     │
├──────────────────────────────────────────────────────┤
│ 5. SURVIVORSHIP                                      │
│ cluster_assignments + source_person                  │
│   ──→ int_empi_survivorship_selected                 │
│   ──→ empi_person_attrs                              │
│   ──→ empi_person_attrs_provenance                   │
├──────────────────────────────────────────────────────┤
│ 6. FINAL OUTPUTS                                     │
│ person_crosswalk, work_queue, core__person,          │
│ input_layer.* (remapped tables)                      │
└──────────────────────────────────────────────────────┘

Probabilistic matching

The package implements Fellegi-Sunter probabilistic record linkage (via Splink v4 on Spark-family platforms, with an equivalent native Snowpark implementation on Snowflake):

  1. Blocking: candidate pairs are generated using configurable blocking predicates (e.g., exact match on DOB + last name, or exact match on SSN). This avoids comparing every record to every other record.
  2. Scoring: each candidate pair is scored across multiple comparison columns (first name, last name, DOB, SSN, email, phone, address) using method-specific similarity functions (exact match, Jaro-Winkler fuzzy, date comparison).
  3. Thresholding: pairs above upper_threshold are auto-matched. Pairs below lower_threshold are auto-rejected. Pairs in between are routed to the manual review work queue.
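The scoring-and-thresholding flow can be sketched in plain Python. This is a simplified illustration, not the package's actual Splink configuration: the m/u probabilities and threshold values below are invented, and in practice Splink estimates them from the data.

```python
import math

# Hypothetical m/u probabilities per comparison column:
# m = P(fields agree | records match), u = P(fields agree | records don't match).
FIELD_PARAMS = {
    "first_name": (0.90, 0.05),
    "last_name":  (0.90, 0.02),
    "dob":        (0.95, 0.01),
    "ssn":        (0.99, 0.001),
}

def match_weight(agreements: dict) -> float:
    """Fellegi-Sunter match weight: sum of log2 likelihood ratios."""
    w = 0.0
    for field, (m, u) in FIELD_PARAMS.items():
        if agreements.get(field):
            w += math.log2(m / u)              # agreement favours a match
        else:
            w += math.log2((1 - m) / (1 - u))  # disagreement counts against
    return w

def route(weight: float, lower: float = 5.0, upper: float = 15.0) -> str:
    """Threshold band: auto-match, auto-reject, or clerical review."""
    if weight >= upper:
        return "auto_match"
    if weight < lower:
        return "auto_reject"
    return "manual_review"

# Everything agrees except SSN: strong but not conclusive evidence.
pair = {"first_name": True, "last_name": True, "dob": True, "ssn": False}
print(route(match_weight(pair)))  # manual_review
```

A pair agreeing on all four fields clears the upper threshold and auto-matches; a pair agreeing on none falls below the lower threshold and auto-rejects.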

Clustering

After scoring, connected-component analysis groups records into person clusters:

  • Snowflake uses SQL-based label propagation.
  • Spark uses distributed union-find.

Each cluster gets a stable person_id. Clustering rebuilds from current pair decisions on every run, so manual overrides and new scores take effect immediately.
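A minimal single-process sketch of the connected-component step (the SQL label-propagation and distributed union-find variants implement the same idea at scale; the record IDs and pair list here are invented):

```python
def cluster(pairs):
    """Union-find over matched record pairs -> {record: cluster root}."""
    parent = {}

    def find(x):
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving keeps trees shallow
            x = parent[x]
        return x

    def union(a, b):
        ra, rb = find(a), find(b)
        if ra != rb:
            parent[max(ra, rb)] = min(ra, rb)  # smallest ID wins -> stable roots

    for a, b in pairs:
        union(a, b)
    return {x: find(x) for x in parent}

# r1-r2 and r2-r3 were matched, so r1/r2/r3 form one cluster; r4-r5 another.
assignments = cluster([("r1", "r2"), ("r2", "r3"), ("r4", "r5")])
print(assignments)
```

Electing the smallest record ID as the cluster root mirrors how a deterministic tie-break helps keep cluster identifiers stable across runs when the underlying pair decisions have not changed.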

Survivorship

For each person cluster, the package selects a "golden record" of consolidated attributes using per-attribute strategies defined in empi_survivorship_rules:

Strategy         Behavior
most_frequent    Most common non-null value across source records
most_recent      Value from the record with the latest timestamp
source_priority  Value from the highest-priority source (e.g., clinical over claims)

The timestamp column (used by most_recent) and the source priority ordering (used by source_priority) are configurable. The _default row sets the fallback strategy for any attribute not explicitly configured.
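The three strategies can be illustrated with a small sketch. The record fields, timestamp column name, and priority list below are hypothetical; the real rules live in empi_survivorship_rules:

```python
from collections import Counter

# One person cluster's source records, with metadata the strategies consume.
records = [
    {"source": "claims",   "updated_at": "2024-01-10", "phone": "555-0100"},
    {"source": "clinical", "updated_at": "2023-06-01", "phone": "555-0199"},
    {"source": "claims",   "updated_at": "2024-03-02", "phone": "555-0100"},
]

SOURCE_PRIORITY = ["clinical", "claims"]  # earlier = higher priority

def select(attr, strategy, recs):
    """Pick the surviving value for one attribute under one strategy."""
    recs = [r for r in recs if r.get(attr) is not None]
    if strategy == "most_frequent":
        return Counter(r[attr] for r in recs).most_common(1)[0][0]
    if strategy == "most_recent":
        return max(recs, key=lambda r: r["updated_at"])[attr]
    if strategy == "source_priority":
        return min(recs, key=lambda r: SOURCE_PRIORITY.index(r["source"]))[attr]
    raise ValueError(f"unknown strategy: {strategy}")

print(select("phone", "most_frequent", records))    # 555-0100 (2 of 3 records)
print(select("phone", "most_recent", records))      # 555-0100 (2024-03-02)
print(select("phone", "source_priority", records))  # 555-0199 (clinical wins)
```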

Provenance is tracked in empi_person_attrs_provenance, a tall table recording which source record contributed each attribute value.

Manual review loop

The package supports a human-in-the-loop review workflow:

  1. EMPI run publishes work_queue with pairs in the clerical-review band.
  2. Analyst reviews pairs in the Manual Review App and records decisions.
  3. App writes must_link or cannot_link actions to the overrides table, and optionally writes rematch_requests for split/carve operations.
  4. Next EMPI run reads overrides, applies them to pair decisions, and rebuilds clusters.

Both overrides and rematch_requests are incremental tables with full_refresh: false protection, so manual review state survives dbt build --full-refresh.
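How overrides interact with model scores can be sketched as follows (simplified; the function, field names, and threshold values are illustrative, not the package's actual schema):

```python
def decide(pair_key, score, overrides, lower=0.3, upper=0.9):
    """Manual overrides trump model scores; otherwise threshold as usual."""
    action = overrides.get(pair_key)
    if action == "must_link":
        return "match"
    if action == "cannot_link":
        return "non_match"
    if score >= upper:
        return "match"
    if score < lower:
        return "non_match"
    return "review"  # lands in work_queue for the next analyst pass

overrides = {("p1", "p2"): "cannot_link"}
print(decide(("p1", "p2"), 0.95, overrides))  # cannot_link beats a high score
print(decide(("p3", "p4"), 0.55, overrides))  # no override -> review band
```

Because clustering rebuilds from current pair decisions on every run, a cannot_link recorded today splits the affected cluster on the next run without any further intervention.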

Adapter implementations

Capability                    Snowflake                            Databricks / Fabric
Scoring engine                Snowpark + JAROWINKLER_SIMILARITY()  PySpark + Splink v4
Clustering                    SQL label propagation                Distributed union-find
Python model materialization  Snowpark DataFrame                   PySpark DataFrame
Incremental strategy          merge on pair_key                    merge (Databricks) / table (Fabric)

Both adapters produce identical output schemas, so all downstream SQL models are shared.