Commit 54edb86

feat: add example__* tasks
These are bare-bones prompts that can run on the sample data shipped with the ETL. They are meant to be used when following the docs that step through the NLP workflow.
1 parent 947e08d commit 54edb86

File tree

15 files changed: +279 −24 lines changed
cumulus_etl/etl/pipeline.py

Lines changed: 3 additions & 0 deletions
@@ -174,6 +174,9 @@ async def run_pipeline(
     # record filesystem options like --s3-region before creating Roots
     store.set_user_fs_options(vars(args))

+    if args.dir_input == "%EXAMPLE%" and not os.path.exists(args.dir_input):
+        args.dir_input = os.path.join(os.path.dirname(__file__), "studies/example/ndjson")
+
     root_input = store.Root(args.dir_input)
     root_output = store.Root(args.dir_output)
     root_phi = store.Root(args.dir_phi, create=True)
Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
"""The example study"""

from .example_tasks import ExampleGpt4oTask as ExampleGpt4oTask
from .example_tasks import ExampleGpt4Task as ExampleGpt4Task
from .example_tasks import ExampleGpt5Task as ExampleGpt5Task
from .example_tasks import ExampleGptOss120bTask as ExampleGptOss120bTask
from .example_tasks import ExampleLlama4ScoutTask as ExampleLlama4ScoutTask
Lines changed: 65 additions & 0 deletions
@@ -0,0 +1,65 @@
"""Define tasks for the example/sample study"""

import json

import pydantic

from cumulus_etl import nlp
from cumulus_etl.etl import tasks


class AgeMention(pydantic.BaseModel):
    has_mention: bool | None = pydantic.Field(None)
    spans: list[str] = pydantic.Field(default_factory=list, description="Supporting text spans")
    age: int | None = pydantic.Field(None, description="The age of the patient")


class BaseExampleTask(tasks.BaseOpenAiTaskWithSpans):
    task_version = 0
    # Task Version History:
    # ** 0 (2025-08): Initial work, still in flux **

    system_prompt = (
        "You are a clinical chart reviewer.\n"
        "Your task is to extract patient-specific information from an unstructured clinical "
        "document and map it into a predefined Pydantic schema.\n"
        "\n"
        "Core Rules:\n"
        "1. Base all assertions ONLY on patient-specific information in the clinical document.\n"
        " - Never negate or exclude information just because it is not mentioned.\n"
        " - Never conflate family history or population-level risk with patient findings.\n"
        "2. Do not invent or infer facts beyond what is documented.\n"
        "3. Maintain high fidelity to the clinical document language when citing spans.\n"
        "4. Always produce structured JSON that conforms to the Pydantic schema provided below.\n"
        "\n"
        "Pydantic Schema:\n" + json.dumps(AgeMention.model_json_schema())
    )
    response_format = AgeMention


# Have a task for every ETL-supported model, to allow sites to choose whatever model works for them.


class ExampleGpt4Task(BaseExampleTask):
    name = "example__nlp_gpt4"
    client_class = nlp.Gpt4Model


class ExampleGpt4oTask(BaseExampleTask):
    name = "example__nlp_gpt4o"
    client_class = nlp.Gpt4oModel


class ExampleGpt5Task(BaseExampleTask):
    name = "example__nlp_gpt5"
    client_class = nlp.Gpt5Model


class ExampleGptOss120bTask(BaseExampleTask):
    name = "example__nlp_gpt_oss_120b"
    client_class = nlp.GptOss120bModel


class ExampleLlama4ScoutTask(BaseExampleTask):
    name = "example__nlp_llama4_scout"
    client_class = nlp.Llama4ScoutModel
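
To make the schema concrete, here is a small self-contained sketch (illustrative only, not part of the commit; it mirrors the `AgeMention` model above with made-up values) of the JSON schema text that gets appended to the system prompt, and of a response that conforms to it:

```python
import json

import pydantic


# Mirror of the AgeMention model above, for illustration only.
class AgeMention(pydantic.BaseModel):
    has_mention: bool | None = pydantic.Field(None)
    spans: list[str] = pydantic.Field(default_factory=list, description="Supporting text spans")
    age: int | None = pydantic.Field(None, description="The age of the patient")


# This is the schema text the system prompt ends with:
print(json.dumps(AgeMention.model_json_schema()))

# And a response the model might return for a note mentioning "a 42-year-old male"
# (the values here are invented, purely to show the shape of the output):
mention = AgeMention(has_mention=True, spans=["a 42-year-old male"], age=42)
print(mention.model_dump_json())
# {"has_mention":true,"spans":["a 42-year-old male"],"age":42}
```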
Two binary files changed (3.63 KB and 2.71 KB); binary content not shown.

cumulus_etl/etl/tasks/nlp_task.py

Lines changed: 16 additions & 8 deletions
@@ -6,6 +6,7 @@
 import re
 import string
 import sys
+import types
 import typing
 from collections.abc import AsyncIterator, Callable
 from typing import ClassVar
@@ -223,7 +224,7 @@ def convert_pydantic_fields_to_pyarrow(
     ) -> pyarrow.DataType:
         return pyarrow.struct(
             [
-                pyarrow.field(name, cls._convert_type_to_pyarrow(info.annotation))
+                pyarrow.field(name, cls._convert_type_to_pyarrow(info.annotation), nullable=True)
                 for name, info in fields.items()
             ]
         )
@@ -232,16 +233,23 @@ def convert_pydantic_fields_to_pyarrow(
     def _convert_type_to_pyarrow(cls, annotation) -> pyarrow.DataType:
         # Since we only need to handle a small amount of possible types, we just do this ourselves
         # rather than relying on an external library.
-        if issubclass(annotation, str):
+        if origin := typing.get_origin(annotation):  # e.g. "UnionType" or "list"
+            sub_type = typing.get_args(annotation)[0]
+            if issubclass(origin, types.UnionType):
+                # This is gonna be something like "str | None" so just grab first arg.
+                # We mark all our fields as nullable at the pyarrow layer.
+                return cls._convert_type_to_pyarrow(sub_type)
+            elif issubclass(origin, list):
+                # Note: does not handle struct types underneath yet
+                return pyarrow.list_(cls._convert_type_to_pyarrow(sub_type))
+        elif issubclass(annotation, str):
             return pyarrow.string()
         elif issubclass(annotation, bool):
             return pyarrow.bool_()
-        elif issubclass(typing.get_origin(annotation), list):
-            sub_type = typing.get_args(annotation)[0]
-            # Note: does not handle struct types underneath yet
-            return pyarrow.list_(cls._convert_type_to_pyarrow(sub_type))
-        else:
-            raise ValueError(f"Unsupported type {annotation}")  # pragma: no cover
+        elif issubclass(annotation, int):
+            return pyarrow.int32()
+
+        raise ValueError(f"Unsupported type {annotation}")  # pragma: no cover


 class BaseOpenAiTaskWithSpans(BaseOpenAiTask):
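
For a sense of what the new conversion handles, here is a standalone sketch (a simplified copy of the logic above, not the actual classmethod) run against the annotations used by the `AgeMention` fields:

```python
import types
import typing

import pyarrow


def convert(annotation) -> pyarrow.DataType:
    """Simplified copy of _convert_type_to_pyarrow, for illustration only."""
    if origin := typing.get_origin(annotation):  # e.g. UnionType or list
        sub_type = typing.get_args(annotation)[0]
        if issubclass(origin, types.UnionType):
            return convert(sub_type)  # "str | None" becomes just "str"
        elif issubclass(origin, list):
            return pyarrow.list_(convert(sub_type))
    elif issubclass(annotation, str):
        return pyarrow.string()
    elif issubclass(annotation, bool):
        return pyarrow.bool_()
    elif issubclass(annotation, int):
        return pyarrow.int32()
    raise ValueError(f"Unsupported type {annotation}")


print(convert(bool | None))  # bool
print(convert(list[str]))    # list<item: string>
print(convert(int | None))   # int32
```

Note that the `bool` branch stays ahead of the `int` branch, since `bool` is a subclass of `int` in Python.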

cumulus_etl/etl/tasks/task_factory.py

Lines changed: 6 additions & 1 deletion
@@ -5,7 +5,7 @@
 from typing import TypeVar

 from cumulus_etl import cli_utils, errors
-from cumulus_etl.etl.studies import covid_symptom, irae
+from cumulus_etl.etl.studies import covid_symptom, example, irae
 from cumulus_etl.etl.tasks import basic_tasks

 AnyTask = TypeVar("AnyTask", bound="EtlTask")  # noqa: F821
@@ -31,6 +31,11 @@ def get_nlp_tasks() -> list[type[AnyTask]]:
         covid_symptom.CovidSymptomNlpResultsGpt4Task,
         covid_symptom.CovidSymptomNlpResultsTask,
         covid_symptom.CovidSymptomNlpResultsTermExistsTask,
+        example.ExampleGpt4Task,
+        example.ExampleGpt4oTask,
+        example.ExampleGpt5Task,
+        example.ExampleGptOss120bTask,
+        example.ExampleLlama4ScoutTask,
         irae.IraeGptOss120bTask,
         irae.IraeGpt4oTask,
         irae.IraeGpt5Task,

cumulus_etl/nlp/openai.py

Lines changed: 15 additions & 4 deletions
@@ -31,9 +31,9 @@ async def post_init_check(self) -> None:
         try:
             models = self.client.models.list()
             names = {model.id async for model in models}
-        except openai.APIError:
+        except openai.APIError as exc:
             errors.fatal(
-                f"NLP server '{self.USER_ID}' is unreachable.\n"
+                f"NLP server '{self.USER_ID}' is unreachable: {exc}.\n"
                 f"If it's a local server, try running 'docker compose up {self.USER_ID} --wait'.",
                 errors.SERVICE_MISSING,
             )
@@ -45,6 +45,9 @@ async def post_init_check(self) -> None:
             )

     async def prompt(self, system: str, user: str, schema: BaseModel) -> chat.ParsedChatCompletion:
+        return await self._parse_prompt(system, user, schema)
+
+    async def _parse_prompt(self, system: str, user: str, schema) -> chat.ParsedChatCompletion:
         return await self.client.chat.completions.parse(
             model=self.MODEL_NAME,
             messages=[
@@ -75,12 +78,20 @@ async def pre_init_check(cls) -> None:
         errors.fatal("\n".join(messages), errors.ARGS_INVALID)

     def make_client(self) -> openai.AsyncOpenAI:
-        return openai.AsyncAzureOpenAI(api_version="2024-06-01")
+        return openai.AsyncAzureOpenAI(api_version="2024-10-21")


-class Gpt35Model(AzureModel):
+class Gpt35Model(AzureModel):  # deprecated, do not use in new code (doesn't support JSON schemas)
     MODEL_NAME = "gpt-35-turbo-0125"

+    # 3.5 doesn't support a pydantic JSON schema, so we do some work to keep it using the same API
+    # as the rest of our code.
+    async def prompt(self, system: str, user: str, schema: BaseModel) -> chat.ParsedChatCompletion:
+        response = await self._parse_prompt(system, user, {"type": "json_object"})
+        parsed = schema.model_validate_json(response.choices[0].message.content)
+        response.choices[0].message.parsed = parsed
+        return response
+

 class Gpt4Model(AzureModel):
     MODEL_NAME = "gpt-4"
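
The GPT-3.5 fallback leans on pydantic to turn the raw JSON-mode reply back into a parsed object; here is a tiny sketch of just that step (the schema and reply text are invented for illustration):

```python
import pydantic


class AgeMention(pydantic.BaseModel):  # stand-in schema, purely for illustration
    has_mention: bool | None = None
    age: int | None = None


# What Gpt35Model.prompt() does with response.choices[0].message.content:
raw_reply = '{"has_mention": true, "age": 42}'
parsed = AgeMention.model_validate_json(raw_reply)
print(parsed.age)  # 42
```

The real method then attaches that parsed object to `response.choices[0].message.parsed`, so callers see the same interface as the schema-aware models.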

docs/nlp/example.md

Lines changed: 116 additions & 0 deletions
@@ -0,0 +1,116 @@
---
title: Example Workflow
parent: NLP
grand_parent: ETL
nav_order: 1
# audience: engineer familiar with the project
# type: tutorial
---

# An Example NLP Workflow

Let's work through an end-to-end NLP workflow, as if you were doing a real study.
But we'll use an example study shipped with the ETL for testing purposes instead.

This will take us from running the actual NLP, then to chart review,
and finally to analyzing accuracy.

You don't need to prepare your own clinical notes for this run-through.
We'll use synthetic notes shipped with Cumulus ETL for this very purpose.

The example study we'll use is just a very simple age range study.
The NLP will only be tasked with extracting an age from a clinical note.

## The NLP Itself

Before we start, you'll need to have Cumulus ETL and your AWS infrastructure ready.
Follow the [setup instructions](../setup) if you haven't done so already, then come back here.

### Model Setup

You have a choice of model for this.
Real studies might require one specific model or another.
But this example task is fairly liberal.

Here are the options, along with the task name to use.

#### Azure Cloud Options
For these, you'll want to set a couple of variables first:
```sh
export AZURE_OPENAI_API_KEY=xxx
export AZURE_OPENAI_ENDPOINT=https://xxx.openai.azure.com/
```

Task names:
- GPT4: `example__nlp_gpt4`
- GPT4o: `example__nlp_gpt4o`
- GPT5: `example__nlp_gpt5`

#### Local (On-Prem) Options
For these, you'll need to start up the appropriate model on your machine:
```sh
docker compose up --wait gpt-oss-120b
```

Task names:
- GPT-OSS 120B (needs 80GB of GPU memory): `example__nlp_gpt_oss_120b`

### Running the ETL

Now that your model is ready, let's run the ETL on some notes!

Below is the command line to use.
You'll need to change the bucket names and paths to wherever you set up your AWS infrastructure.
And you'll want to change the task name as appropriate for your model.
Leave the odd-looking `%EXAMPLE%` bit in place;
that just tells Cumulus ETL to use its built-in example documents as the input.

The output and PHI bucket locations should be the same as in your normal ETL runs on raw FHIR data.
There's no actual PHI in this example run because the data is synthetic,
but normally there is, and that PHI bucket is where Cumulus ETL keeps caches of NLP results.

```sh
docker compose run --rm \
  cumulus-etl nlp \
  %EXAMPLE% \
  s3://my-output-bucket/ \
  s3://my-phi-bucket/ \
  --task example__nlp_gpt4
```

(If this were a real study, you'd probably do this a bit differently.
You'd point at your real DocumentReference resources, for example.
And you'd probably restrict the set of documents you run NLP on with an argument
like `--cohort-athena-table study__my_cohort`.
But for this run-through, we're going to hand-wave all the document selection pieces.)

### Running the Crawler

Whenever you write a new table to S3, you'll want to run your AWS Glue crawler again,
so that the table's schema gets set correctly in Athena.

First, confirm that your AWS CloudFormation templates have the `example__nlp_*` tables
configured in them. If not, try copying the Glue crawler definition from
[the sample template we provide](../setup/aws.md).

Then go to the AWS console, open the AWS Glue service, and in the sidebar under Data Catalog,
choose Crawlers.
You should see your crawler listed there. Select it, click Run, and wait for it to finish.

### Confirm the Data in Athena

While you're in the AWS console, switch to the Athena service and select the appropriate
Cumulus workgroup and database.

Then, if you run a query like the one below (assuming you used the GPT4 model),
you should see eight results with extracted ages.
```sql
select * from example__nlp_gpt4
```

**Congratulations!**
You've now run NLP on some synthetic clinical notes and uploaded the results to Athena.
Those extracted ages could now be post-processed by the `example` study to calculate age ranges,
and then confirmed with chart review by humans.

At least that's the flow you'd use for a real study.
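
For instance, here is a rough sketch of the kind of age-range bucketing that downstream step might do (illustrative only, not the actual `example` study code):

```python
def age_range(age: int | None) -> str | None:
    """Bucket an extracted age into a coarse ten-year range."""
    if age is None:
        return None
    low = (age // 10) * 10
    return f"{low}-{low + 9}"


print(age_range(42))    # 40-49
print(age_range(None))  # None
```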

docs/nlp.md renamed to docs/nlp/index.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
title: NLP
33
parent: ETL
44
nav_order: 5
5+
has_children: true
56
# audience: non-programmers, conversational tone, selling a bit
67
# type: explanation
78
---
