Skip to content

Commit 7596e1e

Browse files
Data Aggregation Cookbook (#151)
* Update cookbook docs * Update cookbooks * Minor docstring and error message fix * poetry.lock * docstring * Added provenance and bundle merge fhir helper, tidied up naming conventions * Make auth optional for public endpoints in fhirgateway * Add convenience extractor and accessors for metadata and patient resources to Document * Update cookbook examples * resolve merge conflict * Fix test * Safer logging * Update cookbook examples * Fix links in docs, add images, add tutorial tab * Edit docs * Add integration tests * Update docs * Add demo script
1 parent 8bc7bb4 commit 7596e1e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+6756
-1053
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,5 +166,5 @@ scrap/
166166
.ruff_cache/
167167
.python-version
168168
.cursor/
169-
scripts/
169+
.private/
170170
.idea/

README.md

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -168,18 +168,16 @@ fhir.add_source("epic", "fhir://fhir.epic.com/r4?client_id=epic_client_id")
168168
fhir.add_source("cerner", "fhir://fhir.cerner.com/r4?client_id=cerner_client_id")
169169

170170
@fhir.aggregate(Patient)
171-
def enrich_patient_data(id: str, source: str = None) -> Patient:
171+
def enrich_patient_data(id: str, source: str) -> Patient:
172172
"""Get patient data from any connected EHR and add AI enhancements"""
173-
patient = fhir.read(Patient, id, source)
174-
175-
# Add AI processing metadata
176-
patient.extension = patient.extension or []
177-
patient.extension.append({
178-
"url": "http://healthchain.org/fhir/ai-processed",
179-
"valueString": f"Enhanced by HealthChain from {source}"
180-
})
181-
182-
return patient
173+
bundle = fhir.search(
174+
Patient,
175+
{"_id": id},
176+
source,
177+
add_provenance=True,
178+
provenance_tag="ai-enhanced",
179+
)
180+
return bundle
183181

184182
app.register_gateway(fhir)
185183

cookbook/data/notereader_cda.xml

Lines changed: 2151 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Multi-Source FHIR Data Aggregation
4+
5+
Demonstrates aggregating patient data from multiple FHIR sources with
6+
simple pipeline processing and provenance tracking.
7+
8+
Requirements:
9+
- pip install healthchain python-dotenv
10+
11+
Run:
12+
- python data_aggregation.py
13+
"""
14+
15+
from typing import List
16+
17+
from dotenv import load_dotenv
18+
19+
from fhir.resources.bundle import Bundle
20+
from fhir.resources.condition import Condition
21+
from fhir.resources.annotation import Annotation
22+
23+
from healthchain.gateway import FHIRGateway, HealthChainAPI
24+
from healthchain.gateway.clients.fhir.base import FHIRAuthConfig
25+
from healthchain.pipeline import Pipeline
26+
from healthchain.io.containers import Document
27+
from healthchain.fhir import merge_bundles
28+
29+
30+
load_dotenv()
31+
32+
33+
# Epic FHIR Sandbox - configure via environment, then build connection string
34+
config = FHIRAuthConfig.from_env("EPIC")
35+
EPIC_URL = config.to_connection_string()
36+
37+
# Cerner Open Sandbox
38+
CERNER_URL = "fhir://fhir-open.cerner.com/r4/ec2458f2-1e24-41c8-b71b-0e701af7583d"
39+
40+
41+
def create_pipeline() -> Pipeline[Document]:
42+
"""Build simple pipeline for demo purposes."""
43+
pipeline = Pipeline[Document]()
44+
45+
@pipeline.add_node
46+
def deduplicate(doc: Document) -> Document:
47+
"""Remove duplicate conditions by resource ID."""
48+
conditions = doc.fhir.get_resources("Condition")
49+
unique = list({c.id: c for c in conditions if c.id}.values())
50+
doc.fhir.add_resources(unique, "Condition", replace=True)
51+
print(f"Deduplicated {len(unique)} conditions")
52+
return doc
53+
54+
@pipeline.add_node
55+
def add_annotation(doc: Document) -> Document:
56+
"""Add a note to each Condition indicating pipeline processing."""
57+
conditions = doc.fhir.get_resources("Condition")
58+
for condition in conditions:
59+
note_text = "This resource has been processed by healthchain pipeline"
60+
annotation = Annotation(text=note_text)
61+
condition.note = (condition.note or []) + [annotation]
62+
print(f"Added annotation to {len(conditions)} conditions")
63+
return doc
64+
65+
return pipeline
66+
67+
68+
def create_app():
69+
# Initialize gateway and add sources
70+
gateway = FHIRGateway()
71+
gateway.add_source("epic", EPIC_URL)
72+
gateway.add_source("cerner", CERNER_URL)
73+
74+
pipeline = create_pipeline()
75+
76+
@gateway.aggregate(Condition)
77+
def get_unified_patient(patient_id: str, sources: List[str]) -> Bundle:
78+
"""Aggregate conditions for a patient from multiple sources"""
79+
bundles = []
80+
for source in sources:
81+
try:
82+
bundle = gateway.search(
83+
Condition,
84+
{"patient": patient_id},
85+
source,
86+
add_provenance=True,
87+
provenance_tag="aggregated",
88+
)
89+
bundles.append(bundle)
90+
except Exception as e:
91+
print(f"Error from {source}: {e}")
92+
93+
# Merge bundles - OperationOutcome resources are automatically extracted
94+
merged_bundle = merge_bundles(bundles, deduplicate=True)
95+
96+
doc = Document(data=merged_bundle)
97+
doc = pipeline(doc)
98+
99+
# print([outcome.model_dump() for outcome in doc.fhir.operation_outcomes])
100+
101+
return doc.fhir.bundle.model_dump()
102+
103+
app = HealthChainAPI()
104+
app.register_gateway(gateway)
105+
106+
return app
107+
108+
109+
if __name__ == "__main__":
110+
import uvicorn
111+
112+
app = create_app()
113+
uvicorn.run(app, port=8888)
114+
# Runs at: http://127.0.0.1:8888/

cookbook/notereader_clinical_coding_fhir.py

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,17 @@
1313
- python notereader_clinical_coding_fhir.py # Demo and start server
1414
"""
1515

16-
import os
1716
import uvicorn
18-
from datetime import datetime, timezone
19-
2017
import healthchain as hc
18+
2119
from fhir.resources.documentreference import DocumentReference
22-
from fhir.resources.meta import Meta
2320
from spacy.tokens import Span
2421
from dotenv import load_dotenv
2522

26-
from healthchain.fhir import create_document_reference
23+
from healthchain.fhir import create_document_reference, add_provenance_metadata
2724
from healthchain.gateway.api import HealthChainAPI
2825
from healthchain.gateway.fhir import FHIRGateway
26+
from healthchain.gateway.clients.fhir.base import FHIRAuthConfig
2927
from healthchain.gateway.soap import NoteReaderService
3028
from healthchain.io import CdaAdapter, Document
3129
from healthchain.models import CdaRequest
@@ -35,14 +33,9 @@
3533

3634
load_dotenv()
3735

38-
39-
BILLING_URL = (
40-
f"fhir://api.medplum.com/fhir/R4/"
41-
f"?client_id={os.environ.get('MEDPLUM_CLIENT_ID')}"
42-
f"&client_secret={os.environ.get('MEDPLUM_CLIENT_SECRET')}"
43-
f"&token_url={os.environ.get('MEDPLUM_TOKEN_URL', 'https://api.medplum.com/oauth2/token')}"
44-
f"&scope={os.environ.get('MEDPLUM_SCOPE', 'openid')}"
45-
)
36+
# Load configuration from environment variables
37+
config = FHIRAuthConfig.from_env("MEDPLUM")
38+
BILLING_URL = config.to_connection_string()
4639

4740

4841
def create_pipeline():
@@ -84,7 +77,6 @@ def link_entities(doc: Document) -> Document:
8477

8578

8679
def create_app():
87-
"""Create production healthcare API."""
8880
pipeline = create_pipeline()
8981
cda_adapter = CdaAdapter()
9082

@@ -102,9 +94,8 @@ def ai_coding_workflow(request: CdaRequest):
10294

10395
for condition in doc.fhir.problem_list:
10496
# Add basic provenance tracking
105-
condition.meta = Meta(
106-
source="urn:healthchain:pipeline:cdi",
107-
lastUpdated=datetime.now(timezone.utc).isoformat(),
97+
condition = add_provenance_metadata(
98+
condition, source="epic-notereader", tag_code="cdi"
10899
)
109100
fhir_gateway.create(condition, source="billing")
110101

@@ -127,7 +118,7 @@ class NotereaderSandbox(ClinicalDocumentation):
127118

128119
def __init__(self):
129120
super().__init__()
130-
self.data_path = "./resources/uclh_cda.xml"
121+
self.data_path = "./data/notereader_cda.xml"
131122

132123
@hc.ehr(workflow="sign-note-inpatient")
133124
def load_clinical_document(self) -> DocumentReference:
167 KB
Loading
351 KB
Loading
42.9 KB
Loading
193 KB
Loading
266 KB
Loading

0 commit comments

Comments
(0)