3 changes: 3 additions & 0 deletions cumulus_etl/etl/pipeline.py
@@ -174,6 +174,9 @@ async def run_pipeline(
    # record filesystem options like --s3-region before creating Roots
    store.set_user_fs_options(vars(args))

    if args.dir_input == "%EXAMPLE%" and not os.path.exists(args.dir_input):
        args.dir_input = os.path.join(os.path.dirname(__file__), "studies/example/ndjson")
Comment on lines +177 to +178
Contributor Author:
Is this %EXAMPLE% approach gross? Got a better idea?

Contributor:
two approaches come to mind:

  • have a special cli arg which will use a specific on disk path just for this
  • make this study a separate git repo, and then have someone install it in some location they can get easy path access to

Contributor Author:
> have a special cli arg which will use a specific on disk path just for this

Like --use-example-data or something? But then what do we do with the normally-required input folder positional CLI arg? Make it not required if we see the other argument? Adds some complexity to the code but could work. And adds a new CLI arg to the pile.

> make this study a separate git repo, and then have someone install it in some location they can get easy path access to

Yeah that would be the traditional approach, and more similar to "real" studies, which is nice. But... it adds several steps to the instructions and means that the docker compose lines have to have the extra complexity of "OK now volume mount where you put that folder and refer to it on the command line from the mounted location" stuff - which again, they'll have to do eventually for real data. But I was hoping to avoid that for the simple workflow case.


Were you just brainstorming, and/or do you dislike %EXAMPLE%? Like, how much on a scale of 1-10? 😄 I dislike it about a 3. And slightly prefer it to the above I think, which I'd put at 4-5 maybe.
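
For concreteness, a minimal argparse sketch of the hypothetical `--use-example-data` flag discussed above. The flag name and fallback logic are invented for illustration; this is not what the PR implements.

```python
import argparse
import os

parser = argparse.ArgumentParser()
# The input dir becomes optional so the flag can stand in for it.
parser.add_argument("dir_input", nargs="?")
parser.add_argument("--use-example-data", action="store_true")
args = parser.parse_args()

if args.use_example_data:
    # Point at the bundled example documents (hypothetical layout).
    args.dir_input = os.path.join(os.path.dirname(__file__), "studies/example/ndjson")
elif not args.dir_input:
    parser.error("dir_input is required unless --use-example-data is given")
```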

Contributor:
it's not my favorite, but I think it's... fine?

I like the real study approach, but only with the idea of 'get everything to use the example study for demonstration purposes', which is maybe a bit more of a lift. I don't think it's a must-do, though.

Contributor Author:
Ah, that's an interesting thought - to re-use this elsewhere. I'd have a concern about viability though. Like, these docs are chosen for matching what the NLP task needs - certain "final" status codes, having a clear age listed in the doc, etc. If we use them elsewhere, we might struggle to make the data fit all use cases - and/or add more data that would make this small token test into a larger one.

HOWEVER, I do intend to make a little example study for the Library - I was thinking of having it built-in like core and discovery, again preferring ease-of-use over some realism for the non-NLP bits of this workflow. If that approach is no good, we would make a new repo for it, and that might be a reasonable place to put this data.

Contributor Author:
We talked about this separately. I think we're fine with this current approach for now, but we want this document and its parent "NLP overview for execs" doc to expand a bit and likely move to the global cumulus docs repo. But this hand-wavy approach around "where do the docs come from" is fine for this "NLP overview for engineers" doc.


    root_input = store.Root(args.dir_input)
    root_output = store.Root(args.dir_output)
    root_phi = store.Root(args.dir_phi, create=True)
7 changes: 7 additions & 0 deletions cumulus_etl/etl/studies/example/__init__.py
@@ -0,0 +1,7 @@
"""The example study"""

from .example_tasks import ExampleGpt4oTask as ExampleGpt4oTask
from .example_tasks import ExampleGpt4Task as ExampleGpt4Task
from .example_tasks import ExampleGpt5Task as ExampleGpt5Task
from .example_tasks import ExampleGptOss120bTask as ExampleGptOss120bTask
from .example_tasks import ExampleLlama4ScoutTask as ExampleLlama4ScoutTask
65 changes: 65 additions & 0 deletions cumulus_etl/etl/studies/example/example_tasks.py
@@ -0,0 +1,65 @@
"""Define tasks for the example/sample study"""

import json

import pydantic

from cumulus_etl import nlp
from cumulus_etl.etl import tasks


class AgeMention(pydantic.BaseModel):
    has_mention: bool | None = pydantic.Field(None)
    spans: list[str] = pydantic.Field(default_factory=list, description="Supporting text spans")
    age: int | None = pydantic.Field(None, description="The age of the patient")


class BaseExampleTask(tasks.BaseOpenAiTaskWithSpans):
    task_version = 0
    # Task Version History:
    # ** 0 (2025-08): Initial work, still in flux **

    system_prompt = (
        "You are a clinical chart reviewer.\n"
        "Your task is to extract patient-specific information from an unstructured clinical "
        "document and map it into a predefined Pydantic schema.\n"
        "\n"
        "Core Rules:\n"
        "1. Base all assertions ONLY on patient-specific information in the clinical document.\n"
        " - Never negate or exclude information just because it is not mentioned.\n"
        " - Never conflate family history or population-level risk with patient findings.\n"
        "2. Do not invent or infer facts beyond what is documented.\n"
        "3. Maintain high fidelity to the clinical document language when citing spans.\n"
        "4. Always produce structured JSON that conforms to the Pydantic schema provided below.\n"
        "\n"
        "Pydantic Schema:\n" + json.dumps(AgeMention.model_json_schema())
    )
    response_format = AgeMention


# Have a task for every ETL-supported model, to allow sites to choose whatever model works for them.


class ExampleGpt4Task(BaseExampleTask):
    name = "example__nlp_gpt4"
    client_class = nlp.Gpt4Model


class ExampleGpt4oTask(BaseExampleTask):
    name = "example__nlp_gpt4o"
    client_class = nlp.Gpt4oModel


class ExampleGpt5Task(BaseExampleTask):
    name = "example__nlp_gpt5"
    client_class = nlp.Gpt5Model


class ExampleGptOss120bTask(BaseExampleTask):
    name = "example__nlp_gpt_oss_120b"
    client_class = nlp.GptOss120bModel


class ExampleLlama4ScoutTask(BaseExampleTask):
    name = "example__nlp_llama4_scout"
    client_class = nlp.Llama4ScoutModel
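
For reference, a sketch of roughly what `AgeMention.model_json_schema()` produces, and therefore what gets appended to the system prompt above. This is illustrative; exact titles, defaults, and key order vary by pydantic version.

```python
# Approximate schema dump, written out as a Python dict (not captured from a real run).
{
    "title": "AgeMention",
    "type": "object",
    "properties": {
        "has_mention": {"anyOf": [{"type": "boolean"}, {"type": "null"}], "default": None},
        "spans": {
            "type": "array",
            "items": {"type": "string"},
            "description": "Supporting text spans",
        },
        "age": {
            "anyOf": [{"type": "integer"}, {"type": "null"}],
            "default": None,
            "description": "The age of the patient",
        },
    },
}
```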
Binary files not shown.
24 changes: 16 additions & 8 deletions cumulus_etl/etl/tasks/nlp_task.py
@@ -6,6 +6,7 @@
import re
import string
import sys
import types
import typing
from collections.abc import AsyncIterator, Callable
from typing import ClassVar
@@ -223,7 +224,7 @@ def convert_pydantic_fields_to_pyarrow(
    ) -> pyarrow.DataType:
        return pyarrow.struct(
            [
-                pyarrow.field(name, cls._convert_type_to_pyarrow(info.annotation))
+                pyarrow.field(name, cls._convert_type_to_pyarrow(info.annotation), nullable=True)
                for name, info in fields.items()
            ]
        )
@@ -232,16 +233,23 @@
    def _convert_type_to_pyarrow(cls, annotation) -> pyarrow.DataType:
        # Since we only need to handle a small amount of possible types, we just do this ourselves
        # rather than relying on an external library.
-        if issubclass(annotation, str):
+        if origin := typing.get_origin(annotation):  # e.g. "UnionType" or "list"
+            sub_type = typing.get_args(annotation)[0]
+            if issubclass(origin, types.UnionType):
+                # This is gonna be something like "str | None" so just grab the first arg.
+                # We mark all our fields as nullable at the pyarrow layer.
+                return cls._convert_type_to_pyarrow(sub_type)
+            elif issubclass(origin, list):
+                # Note: does not handle struct types underneath yet
+                return pyarrow.list_(cls._convert_type_to_pyarrow(sub_type))
+        elif issubclass(annotation, str):
            return pyarrow.string()
        elif issubclass(annotation, bool):
            return pyarrow.bool_()
-        elif issubclass(typing.get_origin(annotation), list):
-            sub_type = typing.get_args(annotation)[0]
-            # Note: does not handle struct types underneath yet
-            return pyarrow.list_(cls._convert_type_to_pyarrow(sub_type))
-        else:
-            raise ValueError(f"Unsupported type {annotation}")  # pragma: no cover
+        elif issubclass(annotation, int):
+            return pyarrow.int32()
+
+        raise ValueError(f"Unsupported type {annotation}")  # pragma: no cover


class BaseOpenAiTaskWithSpans(BaseOpenAiTask):
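To make the new union handling concrete, here is a small sketch (not part of the PR) of the struct the converter should now produce for the `AgeMention` model defined earlier:

```python
import pyarrow

# With the change above: "bool | None" and "int | None" unwrap to their first
# argument, list[str] maps to list_(string()), and every field is nullable.
expected = pyarrow.struct(
    [
        pyarrow.field("has_mention", pyarrow.bool_(), nullable=True),
        pyarrow.field("spans", pyarrow.list_(pyarrow.string()), nullable=True),
        pyarrow.field("age", pyarrow.int32(), nullable=True),
    ]
)
```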
7 changes: 6 additions & 1 deletion cumulus_etl/etl/tasks/task_factory.py
@@ -5,7 +5,7 @@
from typing import TypeVar

from cumulus_etl import cli_utils, errors
-from cumulus_etl.etl.studies import covid_symptom, irae
+from cumulus_etl.etl.studies import covid_symptom, example, irae
from cumulus_etl.etl.tasks import basic_tasks

AnyTask = TypeVar("AnyTask", bound="EtlTask") # noqa: F821
@@ -31,6 +31,11 @@ def get_nlp_tasks() -> list[type[AnyTask]]:
        covid_symptom.CovidSymptomNlpResultsGpt4Task,
        covid_symptom.CovidSymptomNlpResultsTask,
        covid_symptom.CovidSymptomNlpResultsTermExistsTask,
        example.ExampleGpt4Task,
        example.ExampleGpt4oTask,
        example.ExampleGpt5Task,
        example.ExampleGptOss120bTask,
        example.ExampleLlama4ScoutTask,
        irae.IraeGptOss120bTask,
        irae.IraeGpt4oTask,
        irae.IraeGpt5Task,
19 changes: 15 additions & 4 deletions cumulus_etl/nlp/openai.py
@@ -31,9 +31,9 @@ async def post_init_check(self) -> None:
        try:
            models = self.client.models.list()
            names = {model.id async for model in models}
-        except openai.APIError:
+        except openai.APIError as exc:
            errors.fatal(
-                f"NLP server '{self.USER_ID}' is unreachable.\n"
+                f"NLP server '{self.USER_ID}' is unreachable: {exc}.\n"
                f"If it's a local server, try running 'docker compose up {self.USER_ID} --wait'.",
                errors.SERVICE_MISSING,
            )
@@ -45,6 +45,9 @@ async def post_init_check(self) -> None:
            )

    async def prompt(self, system: str, user: str, schema: BaseModel) -> chat.ParsedChatCompletion:
        return await self._parse_prompt(system, user, schema)

    async def _parse_prompt(self, system: str, user: str, schema) -> chat.ParsedChatCompletion:
        return await self.client.chat.completions.parse(
            model=self.MODEL_NAME,
            messages=[
@@ -75,12 +78,20 @@ async def pre_init_check(cls) -> None:
            errors.fatal("\n".join(messages), errors.ARGS_INVALID)

    def make_client(self) -> openai.AsyncOpenAI:
-        return openai.AsyncAzureOpenAI(api_version="2024-06-01")
+        return openai.AsyncAzureOpenAI(api_version="2024-10-21")
Comment on lines -78 to +81
Contributor Author:
This gets us onto the latest - I believe the only change is deprecating something we aren't using and adding batch processing (which we'll be interested in at some point).

Contributor:
should this be a cli arg with a default value?

Contributor Author:
Naw, because this isn't a user-visible thing. I think largely even if it were, you'd still want to ratchet it upward, because who knows when they'll drop an old API version. But mostly it's just for us to know how to call into it.



-class Gpt35Model(AzureModel):
+class Gpt35Model(AzureModel):  # deprecated, do not use in new code (doesn't support JSON schemas)
    MODEL_NAME = "gpt-35-turbo-0125"

    # 3.5 doesn't support a pydantic JSON schema, so we do some work to keep it using the same API
    # as the rest of our code.
Comment on lines +87 to +88
Contributor Author:
I realized this issue while testing the model for this branch - I think it's broken right now in main for the covid study (but 🤷) - this gets it working again.

Contributor:
TBH we can probably remove 3.5 at this point?

Contributor Author:
Well, that becomes a question of "when do we feel comfortable deleting code for the covid study?" and an issue of reproducibility.

I realize 3.5 is harder to hit today, but not impossible (we can still do it through Azure at BCH).

Contributor:
we should maybe talk about that at a weekly meeting? but that shouldn't block this PR

    async def prompt(self, system: str, user: str, schema: BaseModel) -> chat.ParsedChatCompletion:
        response = await self._parse_prompt(system, user, {"type": "json_object"})
        parsed = schema.model_validate_json(response.choices[0].message.content)
        response.choices[0].message.parsed = parsed
        return response


class Gpt4Model(AzureModel):
    MODEL_NAME = "gpt-4"
119 changes: 119 additions & 0 deletions docs/nlp/example.md
@@ -0,0 +1,119 @@
---
title: Example Workflow
parent: NLP
grand_parent: ETL
nav_order: 1
# audience: engineer familiar with the project
# type: tutorial
---

# An Example NLP Workflow

Let's work through an end-to-end NLP workflow, as if you were doing a real study.
But we'll use an example study shipped with the ETL for testing purposes instead.

This will take us from the initial NLP run, then to chart review,
then finally to analyzing accuracy.

You don't need to prepare your own clinical notes for this run-through.
We'll use synthetic notes shipped with Cumulus ETL for this very purpose.

The example study we'll use is just a very simple age range study.
The NLP will only be tasked with extracting an age from a clinical note.

## The NLP Itself

Before we start, you'll need to have Cumulus ETL and your AWS infrastructure ready.
Follow the [setup instructions](../setup) if you haven't done so already, then come back here.

### Model Setup

You have a choice of model for this.
Real studies might require one specific model or another.
But this example task is fairly liberal.

Here are the options, along with the task name to use.

#### Azure Cloud Options
For these, you'll want to set a couple variables first:
```sh
export AZURE_OPENAI_API_KEY=xxx
export AZURE_OPENAI_ENDPOINT=https://xxx.openai.azure.com/
```

Task names:
- GPT4: `example__nlp_gpt4`
- GPT4o: `example__nlp_gpt4o`
- GPT5: `example__nlp_gpt5`

This should cost you less than 15 cents to run and could be much less depending on the model.
We'll use less than five thousand tokens.

#### Local (On-Prem) Options
For these, you'll need to start up the appropriate model on your machine:
```sh
docker compose up --wait gpt-oss-120b
```

Task names:
- GPT-OSS 120B (needs 80GB of GPU memory): `example__nlp_gpt_oss_120b`

### Running the ETL

Now that your model is ready, let's run the ETL on some notes!

Below is the command line to use.
You'll need to change the bucket names and paths to wherever you set up your AWS infrastructure.
And you'll want to change the task name as appropriate for your model.
Leave the odd-looking `%EXAMPLE%` bit in place;
that just tells Cumulus ETL to use its built-in example documents as the input.

The output and PHI bucket locations should be the same as for your normal ETL runs on raw FHIR data.
There's no actual PHI in this example run because of the synthetic data,
but normally there is, and that PHI bucket is where Cumulus ETL keeps caches of NLP results.

```sh
docker compose run --rm \
cumulus-etl nlp \
%EXAMPLE% \
s3://my-output-bucket/ \
s3://my-phi-bucket/ \
--task example__nlp_gpt4
```

(If this were a real study, you'd probably do this a bit differently.
You'd point at your real DocumentReference resources for example.
And you'd probably restrict the set of documents you run NLP on with an argument
like `--cohort-athena-table study__my_cohort`.
But for this run-through, we're going to hand-wave all the document selection pieces.)

### Running the Crawler

Whenever you write a new table to S3, you'll want to run your AWS Glue crawler again,
so that the table's schema gets set correctly in Athena.

First, confirm that your AWS CloudFormation templates have the `example__nlp_*` tables
configured in them. If not, try copying the Glue crawler definition from
[the sample template we provide](../setup/aws.md).

Then go to your AWS console, in the AWS Glue service, in the sidebar under Data Catalog, and
choose Crawlers.
You should see your crawler listed there. Select it, click Run, and wait for it to finish.
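
If you'd rather script this step than click through the console, here's a boto3 sketch. The crawler name below is a placeholder; use whatever name your template created.

```python
import time

import boto3

glue = boto3.client("glue")
glue.start_crawler(Name="my-cumulus-crawler")  # placeholder crawler name

# Poll until the crawler finishes and returns to the READY state.
while glue.get_crawler(Name="my-cumulus-crawler")["Crawler"]["State"] != "READY":
    time.sleep(10)
```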

### Confirm the Data in Athena

While you're in the AWS console, switch to the Athena service and select the appropriate
Cumulus workgroup and database.

Then, if you run a query like the one below (assuming you used the GPT4 model),
you should see eight results with extracted ages.
```sql
select * from example__nlp_gpt4
```

**Congratulations!**
You've now run NLP on some synthetic clinical notes and uploaded the results to Athena.
Those extracted ages could now be post-processed by the `example` study to calculate age ranges,
and then confirmed with chart review by humans.

At least that's the flow you'd use for a real study.
1 change: 1 addition & 0 deletions docs/nlp.md → docs/nlp/index.md
@@ -2,6 +2,7 @@
title: NLP
parent: ETL
nav_order: 5
has_children: true
# audience: non-programmers, conversational tone, selling a bit
# type: explanation
---
8 changes: 8 additions & 0 deletions docs/setup/cumulus-aws-template.yaml
@@ -211,6 +211,14 @@ Resources:
          - !Sub "s3://${S3Bucket}/${EtlSubdir}/irae__nlp_llama4_scout"
        CreateNativeDeltaTable: True
        WriteManifest: False
      - DeltaTables:
          - !Sub "s3://${S3Bucket}/${EtlSubdir}/example__nlp_gpt4"
          - !Sub "s3://${S3Bucket}/${EtlSubdir}/example__nlp_gpt4o"
          - !Sub "s3://${S3Bucket}/${EtlSubdir}/example__nlp_gpt5"
          - !Sub "s3://${S3Bucket}/${EtlSubdir}/example__nlp_gpt_oss_120b"
          - !Sub "s3://${S3Bucket}/${EtlSubdir}/example__nlp_llama4_scout"
        CreateNativeDeltaTable: True
        WriteManifest: False

####################################################
# Athena queries and where to store them
2 changes: 1 addition & 1 deletion docs/studies/index.md
@@ -13,7 +13,7 @@ In addition to the default basic-FHIR-oriented Cumulus ETL tasks like `condition`,
which simply strip identifying information and largely leave the FHIR alone,
there are also more interesting study-oriented tasks.

-These tend to be [NLP](../nlp.md) tasks that extract information from clinical notes.
+These tend to be [NLP](../nlp) tasks that extract information from clinical notes.

They aren't run by default,
but you can provide the ones you are interested in with the `--task` parameter.
4 changes: 2 additions & 2 deletions tests/covid_symptom/test_covid_gpt.py
@@ -53,7 +53,7 @@ async def test_gpt4_changes(self):

    async def test_happy_path(self):
        self.make_json("DocumentReference", "1", **i2b2_mock_data.documentreference("foo"))
-        self.mock_response()
+        self.mock_response(parsed=False)

        task = covid_symptom.CovidSymptomNlpResultsGpt35Task(self.job_config, self.scrubber)
        await task.run()
@@ -75,7 +75,7 @@ async def test_happy_path(self):
                "seed": 12345,
                "temperature": 0,
                "timeout": 120,
-                "response_format": CovidSymptoms,
+                "response_format": {"type": "json_object"},
            },
            self.mock_create.call_args_list[0][1],
        )
29 changes: 29 additions & 0 deletions tests/nlp/test_example.py
@@ -0,0 +1,29 @@
"""Tests for etl/studies/example/"""

import ddt
import pydantic

from cumulus_etl.etl.studies.example.example_tasks import AgeMention
from tests.etl import BaseEtlSimple
from tests.nlp.utils import OpenAITestCase


@ddt.ddt
class TestExampleTask(OpenAITestCase, BaseEtlSimple):
"""Test case for example tasks"""

def default_content(self) -> pydantic.BaseModel:
return AgeMention(has_mention=True, spans=["year-old"], age=20)

@ddt.data(
"example__nlp_gpt_oss_120b",
"example__nlp_gpt4",
"example__nlp_gpt4o",
"example__nlp_gpt5",
"example__nlp_llama4_scout",
)
async def test_basic_etl(self, task_name):
for _ in range(8):
self.mock_response()
await self.run_etl(tasks=[task_name], input_path="%EXAMPLE%")
self.assertEqual(self.mock_create.call_count, 8)