Expose schema enforcement via SimpleKGPipeline #315

Merged
merged 2 commits into from
Mar 25, 2025
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,9 @@

- Added the `run_with_context` method to `Component`. This method includes a `context_` parameter, which provides information about the pipeline from which the component is executed (e.g., the `run_id`). It also enables the component to send events to the pipeline's callback function.

### Fixed

- Added `enforce_schema` parameter to `SimpleKGPipeline` for optional schema enforcement.

## 1.6.0

8 changes: 7 additions & 1 deletion src/neo4j_graphrag/experimental/pipeline/kg_builder.py
@@ -25,7 +25,10 @@
from neo4j_graphrag.experimental.components.kg_writer import KGWriter
from neo4j_graphrag.experimental.components.pdf_loader import DataLoader
from neo4j_graphrag.experimental.components.text_splitters.base import TextSplitter
from neo4j_graphrag.experimental.components.types import LexicalGraphConfig
from neo4j_graphrag.experimental.components.types import (
LexicalGraphConfig,
SchemaEnforcementMode,
)
from neo4j_graphrag.experimental.pipeline.config.object_config import ComponentType
from neo4j_graphrag.experimental.pipeline.config.runner import PipelineRunner
from neo4j_graphrag.experimental.pipeline.config.template_pipeline import (
@@ -61,6 +64,7 @@ class SimpleKGPipeline:
- dict: following the SchemaRelation schema, ie with label, description and properties keys

potential_schema (Optional[List[tuple]]): A list of potential schema relationships.
enforce_schema (str): Validation of the extracted entities/rels against the provided schema. Defaults to "NONE", where schema enforcement will be ignored even if the schema is provided. Possible values "None" or "STRICT".
from_pdf (bool): Determines whether to include the PdfLoader in the pipeline.
If True, expects `file_path` input in `run` methods.
If False, expects `text` input in `run` methods.
@@ -81,6 +85,7 @@ def __init__(
entities: Optional[Sequence[EntityInputType]] = None,
relations: Optional[Sequence[RelationInputType]] = None,
potential_schema: Optional[List[tuple[str, str, str]]] = None,
enforce_schema: str = "NONE",
Contributor

Should this type be Optional[SchemaEnforcementMode] instead and default to None?

Contributor Author

Two things here:

  • we wanted SimpleKGPipeline to accept a string and convert it internally to the enum value (for consistency with on_error param)
  • as for Optional, I don't mind. I initially made it similar to what we have with on_error, but I also thought that keeping it like this would avoid extra conditions for None values. It is also "semantically" different: SchemaEnforcementMode.NONE means we know which enforcement option the user chose (no enforcement), whereas None means the choice of schema enforcement is unset or unknown. But maybe we don't care? wdyt?

Contributor

I see, in that case can we use an Enum here? It's more explicit imo

e.g.

class SchemaEnforced(Enum):
    default = "NONE"
    strict = "STRICT"

then the default is

    enforce_schema: str = SchemaEnforced.default
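
The trade-off discussed in this thread can be sketched in isolation. The snippet below is only an illustration: `EnforcementMode`, `resolve_mode`, and `resolve_optional_mode` are hypothetical names, and the enum is assumed to be a str-valued stand-in for `SchemaEnforcementMode` with the two values named in the docstring. It shows a plain string being converted to an enum member, and the extra branch an `Optional` default would need.

```python
from enum import Enum
from typing import Optional


class EnforcementMode(str, Enum):
    """Hypothetical stand-in for SchemaEnforcementMode (assumed str-valued)."""

    NONE = "NONE"
    STRICT = "STRICT"


def resolve_mode(value: str = "NONE") -> EnforcementMode:
    # Lookup by value: raises ValueError for anything other than
    # "NONE" or "STRICT". Passing an existing member returns it unchanged.
    return EnforcementMode(value)


def resolve_optional_mode(value: Optional[str] = None) -> Optional[EnforcementMode]:
    # The Optional variant has to special-case the "unset" state,
    # which is the extra condition mentioned in the thread.
    if value is None:
        return None
    return EnforcementMode(value)
```

With a string default, callers can pass either `"STRICT"` or an enum member, and "no enforcement" stays an explicit, known choice instead of being conflated with "not set".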

from_pdf: bool = True,
text_splitter: Optional[TextSplitter] = None,
pdf_loader: Optional[DataLoader] = None,
@@ -100,6 +105,7 @@ def __init__(
entities=entities or [],
relations=relations or [],
potential_schema=potential_schema,
enforce_schema=SchemaEnforcementMode(enforce_schema),
from_pdf=from_pdf,
pdf_loader=ComponentType(pdf_loader) if pdf_loader else None,
kg_writer=ComponentType(kg_writer) if kg_writer else None,
14 changes: 14 additions & 0 deletions tests/unit/experimental/pipeline/test_kg_builder.py
@@ -151,6 +151,20 @@ def test_simple_kg_pipeline_on_error_invalid_value() -> None:
)


def test_simple_kg_pipeline_enforce_schema_invalid_value() -> None:
llm = MagicMock(spec=LLMInterface)
driver = MagicMock(spec=neo4j.Driver)
embedder = MagicMock(spec=Embedder)

with pytest.raises(PipelineDefinitionError):
SimpleKGPipeline(
llm=llm,
driver=driver,
embedder=embedder,
enforce_schema="INVALID_VALUE",
)
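
The test above expects a `PipelineDefinitionError`, not a bare `ValueError`, when the string is not a valid mode. This diff does not show how the library translates the failure, so the following is only a self-contained sketch of one plausible shape. `TinyPipeline`, `DefinitionError`, and `EnforcementMode` are hypothetical stand-ins, not the library's classes.

```python
from enum import Enum


class EnforcementMode(str, Enum):
    """Hypothetical stand-in for SchemaEnforcementMode."""

    NONE = "NONE"
    STRICT = "STRICT"


class DefinitionError(Exception):
    """Hypothetical stand-in for PipelineDefinitionError."""


class TinyPipeline:
    """Toy constructor that validates enforce_schema up front."""

    def __init__(self, enforce_schema: str = "NONE") -> None:
        try:
            # Convert the user-facing string to the enum member.
            self.enforce_schema = EnforcementMode(enforce_schema)
        except ValueError as exc:
            # Re-raise as a domain-specific error so callers see a
            # configuration problem rather than an enum lookup failure.
            raise DefinitionError(
                f"invalid enforce_schema value: {enforce_schema!r}"
            ) from exc
```

Failing at construction time means a misspelled mode is reported before any document is processed.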


@mock.patch(
"neo4j_graphrag.experimental.components.kg_writer.get_version",
return_value=((5, 23, 0), False, False),