# feat: add example__* tasks #437
**`cumulus_etl/etl/studies/example/__init__.py`** (new file, +7 lines)

```python
"""The example study"""

from .example_tasks import ExampleGpt4oTask as ExampleGpt4oTask
from .example_tasks import ExampleGpt4Task as ExampleGpt4Task
from .example_tasks import ExampleGpt5Task as ExampleGpt5Task
from .example_tasks import ExampleGptOss120bTask as ExampleGptOss120bTask
from .example_tasks import ExampleLlama4ScoutTask as ExampleLlama4ScoutTask
```
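The `import X as X` form marks these names as explicit re-exports for type checkers, so downstream code can import tasks from the package root rather than the submodule. A quick illustration (the import path matches the test file later in this diff):

```python
# Thanks to the re-exports above, callers don't need to know about the
# example_tasks submodule:
from cumulus_etl.etl.studies.example import ExampleGpt4Task
```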
**`cumulus_etl/etl/studies/example/example_tasks.py`** (new file, +65 lines)

```python
"""Define tasks for the example/sample study"""

import json

import pydantic

from cumulus_etl import nlp
from cumulus_etl.etl import tasks


class AgeMention(pydantic.BaseModel):
    has_mention: bool | None = pydantic.Field(None)
    spans: list[str] = pydantic.Field(default_factory=list, description="Supporting text spans")
    age: int | None = pydantic.Field(None, description="The age of the patient")


class BaseExampleTask(tasks.BaseOpenAiTaskWithSpans):
    task_version = 0
    # Task Version History:
    # ** 0 (2025-08): Initial work, still in flux **

    system_prompt = (
        "You are a clinical chart reviewer.\n"
        "Your task is to extract patient-specific information from an unstructured clinical "
        "document and map it into a predefined Pydantic schema.\n"
        "\n"
        "Core Rules:\n"
        "1. Base all assertions ONLY on patient-specific information in the clinical document.\n"
        "   - Never negate or exclude information just because it is not mentioned.\n"
        "   - Never conflate family history or population-level risk with patient findings.\n"
        "2. Do not invent or infer facts beyond what is documented.\n"
        "3. Maintain high fidelity to the clinical document language when citing spans.\n"
        "4. Always produce structured JSON that conforms to the Pydantic schema provided below.\n"
        "\n"
        "Pydantic Schema:\n" + json.dumps(AgeMention.model_json_schema())
    )
    response_format = AgeMention


# Have a task for every ETL-supported model, to allow sites to choose whatever model works
# for them.


class ExampleGpt4Task(BaseExampleTask):
    name = "example__nlp_gpt4"
    client_class = nlp.Gpt4Model


class ExampleGpt4oTask(BaseExampleTask):
    name = "example__nlp_gpt4o"
    client_class = nlp.Gpt4oModel


class ExampleGpt5Task(BaseExampleTask):
    name = "example__nlp_gpt5"
    client_class = nlp.Gpt5Model


class ExampleGptOss120bTask(BaseExampleTask):
    name = "example__nlp_gpt_oss_120b"
    client_class = nlp.GptOss120bModel


class ExampleLlama4ScoutTask(BaseExampleTask):
    name = "example__nlp_llama4_scout"
    client_class = nlp.Llama4ScoutModel
```
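As a quick sanity check of the schema above, here's a minimal self-contained sketch of the kind of JSON reply the system prompt asks the model for (the span text is invented for illustration):

```python
import pydantic


class AgeMention(pydantic.BaseModel):
    # Trimmed copy of the schema defined in example_tasks.py above.
    has_mention: bool | None = None
    spans: list[str] = pydantic.Field(default_factory=list)
    age: int | None = None


# A well-formed reply, matching the JSON schema embedded in the system prompt:
reply = '{"has_mention": true, "spans": ["The patient is a 45-year-old male"], "age": 45}'
mention = AgeMention.model_validate_json(reply)
assert mention.age == 45 and mention.has_mention
```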
**`cumulus_etl/nlp` OpenAI/Azure client** (modified)

```diff
@@ -31,9 +31,9 @@ async def post_init_check(self) -> None:
         try:
             models = self.client.models.list()
             names = {model.id async for model in models}
-        except openai.APIError:
+        except openai.APIError as exc:
             errors.fatal(
-                f"NLP server '{self.USER_ID}' is unreachable.\n"
+                f"NLP server '{self.USER_ID}' is unreachable: {exc}.\n"
                 f"If it's a local server, try running 'docker compose up {self.USER_ID} --wait'.",
                 errors.SERVICE_MISSING,
             )
@@ -45,6 +45,9 @@ async def post_init_check(self) -> None:
         )

+    async def prompt(self, system: str, user: str, schema: BaseModel) -> chat.ParsedChatCompletion:
+        return await self._parse_prompt(system, user, schema)
+
     async def _parse_prompt(self, system: str, user: str, schema) -> chat.ParsedChatCompletion:
         return await self.client.chat.completions.parse(
             model=self.MODEL_NAME,
             messages=[
@@ -75,12 +78,20 @@ async def pre_init_check(cls) -> None:
         errors.fatal("\n".join(messages), errors.ARGS_INVALID)

     def make_client(self) -> openai.AsyncOpenAI:
-        return openai.AsyncAzureOpenAI(api_version="2024-06-01")
+        return openai.AsyncAzureOpenAI(api_version="2024-10-21")


-class Gpt35Model(AzureModel):
+class Gpt35Model(AzureModel):  # deprecated, do not use in new code (doesn't support JSON schemas)
     MODEL_NAME = "gpt-35-turbo-0125"

+    # 3.5 doesn't support a pydantic JSON schema, so we do some work to keep it using the same API
+    # as the rest of our code.
+    async def prompt(self, system: str, user: str, schema: BaseModel) -> chat.ParsedChatCompletion:
+        response = await self._parse_prompt(system, user, {"type": "json_object"})
+        parsed = schema.model_validate_json(response.choices[0].message.content)
+        response.choices[0].message.parsed = parsed
+        return response
+

 class Gpt4Model(AzureModel):
     MODEL_NAME = "gpt-4"
```

> **Review thread on lines -78 to +81** (the `api_version` bump):
>
> **Author:** This gets us onto latest - I believe the only change is deprecating something we aren't using and adding batch processing (which we'll be interested in at some point).
>
> **Reviewer:** Should this be a CLI arg with a default value?
>
> **Author:** Naw, because this isn't a user-visible thing. I think largely even if it were, you'd still want to ratchet it upward, because who knows when they'll drop an old API version. But mostly it's just for us to know how to call into it.

> **Review thread on lines +87 to +88** (the GPT-3.5 JSON workaround):
>
> **Author:** I realized this issue while testing the model for this branch - I think it's broken right now in main for the covid study (but 🤷) - this gets it working again.
>
> **Reviewer:** TBH, we can probably remove 3.5 at this point?
>
> **Author:** Well, that becomes a question of "when do we feel comfortable deleting code for the covid study?" and an issue of reproducibility. I realize 3.5 is harder to hit today, but not impossible (we can still do it through Azure at BCH).
>
> **Reviewer:** We should maybe talk about that at a weekly meeting? But that shouldn't block this PR.
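To see the two `prompt()` styles side by side outside of the diff, here is a self-contained sketch. The client setup and model names are illustrative assumptions, not the project's configuration: schema-capable models parse directly into a pydantic type, while GPT-3.5 only offers generic JSON mode, so the reply is validated after the fact (mirroring the `Gpt35Model` override above).

```python
import openai
import pydantic


class AgeMention(pydantic.BaseModel):
    age: int | None = None


async def prompt_native(client: openai.AsyncOpenAI, system: str, user: str) -> AgeMention | None:
    # Schema-capable models can parse straight into the pydantic type.
    response = await client.chat.completions.parse(
        model="gpt-4o",  # illustrative model name
        messages=[{"role": "system", "content": system}, {"role": "user", "content": user}],
        response_format=AgeMention,
    )
    return response.choices[0].message.parsed


async def prompt_fallback(client: openai.AsyncOpenAI, system: str, user: str) -> AgeMention:
    # GPT-3.5 only supports generic JSON mode; validate the raw content afterwards.
    response = await client.chat.completions.create(
        model="gpt-3.5-turbo",  # illustrative model name
        messages=[{"role": "system", "content": system}, {"role": "user", "content": user}],
        response_format={"type": "json_object"},
    )
    return AgeMention.model_validate_json(response.choices[0].message.content)
```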
**New documentation page** (new file, +119 lines)

```yaml
---
title: Example Workflow
parent: NLP
grand_parent: ETL
nav_order: 1
# audience: engineer familiar with the project
# type: tutorial
---
```
# An Example NLP Workflow

Let's work through an end-to-end NLP workflow, as if you were doing a real study.
But instead, we'll use an example study shipped with the ETL for testing purposes.

This will take us from running the actual NLP, to chart review,
and finally to analyzing accuracy.

You don't need to prepare your own clinical notes for this run-through.
We'll use synthetic notes shipped with Cumulus ETL for this very purpose.

The example study we'll use is just a very simple age-range study.
The NLP will only be tasked with extracting an age from a clinical note.
## The NLP Itself

Before we start, you'll need to have Cumulus ETL and your AWS infrastructure ready.
Follow the [setup instructions](../setup) if you haven't done so already, then come back here.

### Model Setup

You have a choice of model for this.
Real studies might require one specific model or another,
but this example task is fairly liberal.

Here are the options, along with the task name to use for each.

#### Azure Cloud Options
For these, you'll want to set a couple of environment variables first:
```sh
export AZURE_OPENAI_API_KEY=xxx
export AZURE_OPENAI_ENDPOINT=https://xxx.openai.azure.com/
```
Task names:
- GPT4: `example__nlp_gpt4`
- GPT4o: `example__nlp_gpt4o`
- GPT5: `example__nlp_gpt5`

This should cost you less than 15 cents to run, and could be much less depending on the model.
We'll use fewer than five thousand tokens.

#### Local (On-Prem) Options
For these, you'll need to start up the appropriate model on your machine:
```sh
docker compose up --wait gpt-oss-120b
```

Task names:
- GPT-OSS 120B (needs 80GB of GPU memory): `example__nlp_gpt_oss_120b`
### Running the ETL

Now that your model is ready, let's run the ETL on some notes!

Below is the command line to use.
You'll need to change the bucket names and paths to wherever you set up your AWS infrastructure,
and you'll want to change the task name as appropriate for your model.
Leave the odd-looking `%EXAMPLE%` bit in place;
that just tells Cumulus ETL to use its built-in example documents as the input.

The output and PHI bucket locations should be the same as for your normal ETL runs on raw FHIR data.
There's no actual PHI in this example run because of the synthetic data,
but normally there is, and that PHI bucket is where Cumulus ETL keeps caches of NLP results.
```sh
docker compose run --rm \
  cumulus-etl nlp \
  %EXAMPLE% \
  s3://my-output-bucket/ \
  s3://my-phi-bucket/ \
  --task example__nlp_gpt4
```
(If this were a real study, you'd probably do this a bit differently.
For example, you'd point at your real DocumentReference resources,
and you'd probably restrict the set of documents you run NLP on with an argument
like `--cohort-athena-table study__my_cohort`.
But for this run-through, we're going to hand-wave all the document selection pieces.)
### Running the Crawler

Whenever you write a new table to S3, you'll want to run your AWS Glue crawler again,
so that the table's schema gets set correctly in Athena.

First, confirm that your AWS CloudFormation templates have the `example__nlp_*` tables
configured in them. If not, try copying the Glue crawler definition from
[the sample template we provide](../setup/aws.md).

Then go to your AWS console, open the AWS Glue service, and in the sidebar under Data Catalog,
choose Crawlers.
You should see your crawler listed there. Select it, click Run, and wait for it to finish.
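If you prefer the command line over the console, the standard AWS CLI equivalents look like this (the crawler name here is an assumption; use whatever name your CloudFormation template created):

```sh
aws glue start-crawler --name cumulus-etl-crawler
# Poll until the state returns to READY:
aws glue get-crawler --name cumulus-etl-crawler --query Crawler.State
```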
### Confirm the Data in Athena

While you're in the AWS console, switch to the Athena service and select the appropriate
Cumulus workgroup and database.

Then, if you run a query like the one below (assuming you used the GPT4 model),
you should see eight results with extracted ages.
```sql
select * from example__nlp_gpt4
```
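Once the crawler has set the schema, you could also pull out just the extracted fields. This is a hypothetical refinement: the column names are assumed to mirror the `AgeMention` schema and may not match the actual table layout.

```sql
select age, spans from example__nlp_gpt4 where has_mention
```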
**Congratulations!**
You've now run NLP on some synthetic clinical notes and uploaded the results to Athena.
Those extracted ages could now be post-processed by the `example` study to calculate age ranges,
and then confirmed with human chart review.

At least, that's the flow you'd use for a real study.
**New test file for the example study** (new file, +29 lines)

```python
"""Tests for etl/studies/example/"""

import ddt
import pydantic

from cumulus_etl.etl.studies.example.example_tasks import AgeMention
from tests.etl import BaseEtlSimple
from tests.nlp.utils import OpenAITestCase


@ddt.ddt
class TestExampleTask(OpenAITestCase, BaseEtlSimple):
    """Test case for example tasks"""

    def default_content(self) -> pydantic.BaseModel:
        return AgeMention(has_mention=True, spans=["year-old"], age=20)

    @ddt.data(
        "example__nlp_gpt_oss_120b",
        "example__nlp_gpt4",
        "example__nlp_gpt4o",
        "example__nlp_gpt5",
        "example__nlp_llama4_scout",
    )
    async def test_basic_etl(self, task_name):
        for _ in range(8):
            self.mock_response()
        await self.run_etl(tasks=[task_name], input_path="%EXAMPLE%")
        self.assertEqual(self.mock_create.call_count, 8)
```
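Note the `input_path="%EXAMPLE%"`: it's a sentinel string rather than a real folder, as the review thread below discusses. As a hedged sketch of the general idea (the function and data-directory names are assumptions, not the project's actual implementation), resolution could look like:

```python
import pathlib

EXAMPLE_SENTINEL = "%EXAMPLE%"


def resolve_input_path(input_path: str) -> pathlib.Path:
    """Map the sentinel onto synthetic notes bundled with the package."""
    if input_path == EXAMPLE_SENTINEL:
        # Hypothetical location of the bundled example documents.
        return pathlib.Path(__file__).parent / "example_data"
    return pathlib.Path(input_path)
```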
> **Review thread** (on the `%EXAMPLE%` input path):
>
> **Author:** Is this `%EXAMPLE%` approach gross? Got a better idea?
>
> **Reviewer:** Two approaches come to mind:
>
> **Author:** Like `--use-example-data` or something? But then what do we do with the normally-required input folder positional CLI arg? Make it not required if we see the other argument? Adds some complexity to the code but could work. And adds a new CLI arg to the pile.
>
> Yeah, that would be the traditional approach, and more similar to "real" studies, which is nice. But... it adds several steps to the instructions and means that the docker compose lines have to have the extra complexity of "OK, now volume mount where you put that folder and refer to it on the command line from the mounted location" stuff - which, again, they'll have to do eventually for real data. But I was hoping to avoid that for the simple workflow case.
>
> Were you just brainstorming, and/or do you dislike `%EXAMPLE%`? Like, how much on a scale of 1-10? 😄 I dislike it about a 3, and slightly prefer it to the above I think, which I'd put at 4-5 maybe.
>
> **Reviewer:** It's not my favorite, but I think it's... fine? I like the real study approach, but only with the idea of "get everything to use the example study for demonstration purposes", which is maybe a bit more of a lift, but I don't think it's a must-do.
>
> **Author:** Ah, that's an interesting thought - to re-use this elsewhere. I'd have a concern about viability, though. Like, these docs are chosen to match what the NLP task needs - certain "final" status codes, having a clear age listed in the doc, etc. If we use them elsewhere, we might struggle to make the data fit all use cases - and/or add more data that would turn this small token test into a larger one.
>
> HOWEVER, I do intend to make a little `example` study for the Library - I was thinking of having it built in like `core` and `discovery`, again preferring ease of use over some realism for the non-NLP bits of this workflow. If that approach is no good, we would make a new repo for it, and that might be a reasonable place to put this data.
>
> **Author:** We talked about this separately. I think we're fine with the current approach for now, but we want this document and its parent "NLP overview for execs" doc to expand a bit and likely move to the global cumulus docs repo. But this hand-wavy approach around "where do the docs come from" is fine for this "NLP overview for engineers" doc.