Skip to content

Add entity / Relation extraction component #85

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
59a37a0
Pipeline (#81)
stellasia Jul 31, 2024
7388914
Entity / Relation extraction component
stellasia Jul 31, 2024
4171f65
Adds a Text Splitter (#82)
alexthomas93 Jul 31, 2024
93f893a
Add tests
stellasia Jul 31, 2024
60dcc8e
Merge branch 'feature/kg_builder' of https://github.com/neo4j/neo4j-g…
stellasia Jul 31, 2024
f1173b2
Keep it simple: remove deps to jinja for now
stellasia Aug 1, 2024
64f00b7
Merge
stellasia Aug 1, 2024
accdabd
Update example with existing components
stellasia Aug 1, 2024
123bf4b
log config in example
stellasia Aug 1, 2024
ebeda16
Fix tests
stellasia Aug 1, 2024
cf93935
Rm unused import
stellasia Aug 1, 2024
f66e197
Add copyright headers
stellasia Aug 1, 2024
08882ae
Rm debug code
stellasia Aug 1, 2024
2613f12
Try and fix tests
stellasia Aug 1, 2024
5f10cab
Unused import
stellasia Aug 1, 2024
b1f3df1
get_type_hints is failing for python 3.8/3.9, even when using __futur…
stellasia Aug 2, 2024
68458aa
Return model is also conditioned to the existence of the run method
stellasia Aug 2, 2024
947a2b0
Log when we do not raise exception to keep track of the failure
stellasia Aug 2, 2024
253d86b
Update prompt to match new KGwriter expected type
stellasia Aug 5, 2024
db8c5f7
Merge branch 'feature/kg_builder' of https://github.com/neo4j/neo4j-g…
stellasia Aug 5, 2024
a7747af
Fix test
stellasia Aug 6, 2024
1efcf64
Fix type for `examples`
stellasia Aug 6, 2024
19674ab
Merge branch 'feature/kg_builder' of https://github.com/neo4j/neo4j-g…
stellasia Aug 7, 2024
760cfb5
Merge branch 'feature/kg_builder' of https://github.com/neo4j/neo4j-g…
stellasia Aug 7, 2024
6b1c865
Use SchemaConfig as input for the ER Extractor component
stellasia Aug 7, 2024
9483d7a
The "base" EntityRelationExtractor is an ABC that must be subclassed
stellasia Aug 7, 2024
9ea75d9
Make node IDs unique across several runs of the pipeline by prefixing…
stellasia Aug 7, 2024
0c46445
Option to build lexical graph in the ERExtractor component
stellasia Aug 7, 2024
8d10bb2
Fix one test
stellasia Aug 7, 2024
5d329c2
Fix some more tests
stellasia Aug 7, 2024
ce02a76
Fix some more tests
stellasia Aug 7, 2024
1450bbe
Remove "type: ignore" comments
stellasia Aug 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 132 additions & 88 deletions examples/pipeline/kg_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,103 +15,147 @@
from __future__ import annotations

import asyncio
import logging
import logging.config
from typing import Any

from neo4j_genai.pipeline import Component, DataModel
from pydantic import BaseModel, validate_call

logging.basicConfig(level=logging.DEBUG)


class DocumentChunkModel(DataModel):
    # Output of DocumentChunker: the text pieces obtained from the input text.
    chunks: list[str]


class DocumentChunker(Component):
    """Pipeline component that cuts a text into chunks on the "." character."""

    async def run(self, text: str) -> DocumentChunkModel:
        """Split ``text`` on "." and return the non-empty, stripped pieces.

        Args:
            text: The raw text to split.

        Returns:
            A DocumentChunkModel whose ``chunks`` holds the pieces in their
            original order, with surrounding whitespace removed and empty
            pieces dropped.
        """
        stripped_pieces = (piece.strip() for piece in text.split("."))
        return DocumentChunkModel(
            chunks=[piece for piece in stripped_pieces if piece]
        )


class SchemaModel(DataModel):
    # Output of SchemaBuilder: the schema, kept as a plain string.
    data_schema: str


class SchemaBuilder(Component):
    """Pipeline component that wraps a schema string into a SchemaModel."""

    async def run(self, schema: str) -> SchemaModel:
        """Return ``schema`` unchanged, stored in ``SchemaModel.data_schema``.

        Args:
            schema: The schema description, passed through as-is.

        Returns:
            A SchemaModel carrying the given string.
        """
        wrapped = SchemaModel(data_schema=schema)
        return wrapped


class EntityModel(BaseModel):
    # A labelled item with string-valued properties; Writer builds these from
    # the raw dicts of both the entities and the relations of a Neo4jGraph.
    label: str
    properties: dict[str, str]


class Neo4jGraph(DataModel):
    # Output of ERExtractor: raw (dict-based) entities and relations merged
    # across all processed chunks.
    entities: list[dict[str, Any]]
    relations: list[dict[str, Any]]


class ERExtractor(Component):
    """Placeholder entity/relation extractor used by the example pipeline.

    Every chunk yields the same hard-coded result; the per-chunk results are
    then concatenated into a single Neo4jGraph.
    """

    async def _process_chunk(self, chunk: str, schema: str) -> dict[str, Any]:
        """Return the fixed extraction result for one chunk.

        Neither ``chunk`` nor ``schema`` is inspected: the result is always a
        single "Person" entity named "John Doe" and no relations.
        """
        person = {"label": "Person", "properties": {"name": "John Doe"}}
        return {"entities": [person], "relations": []}

    async def run(self, chunks: list[str], schema: str) -> Neo4jGraph:
        """Process all chunks concurrently and merge their results.

        Args:
            chunks: The text chunks to process.
            schema: The schema forwarded to each per-chunk call.

        Returns:
            A Neo4jGraph whose entities and relations are the concatenation,
            in chunk order, of the per-chunk results.
        """
        per_chunk = await asyncio.gather(
            *(self._process_chunk(chunk, schema) for chunk in chunks)
        )
        entities: list[dict[str, Any]] = []
        relations: list[dict[str, Any]] = []
        # asyncio.gather preserves the order of its awaitables, so the merged
        # lists follow the order of ``chunks``.
        for extracted in per_chunk:
            entities.extend(extracted["entities"])
            relations.extend(extracted["relations"])
        return Neo4jGraph(entities=entities, relations=relations)


class WriterModel(DataModel):
    # Output of Writer: a status string plus the validated entities and
    # relations (relations reuse the EntityModel shape).
    status: str
    entities: list[EntityModel]
    relations: list[EntityModel]


class Writer(Component):
    """Pipeline component that converts a Neo4jGraph into a WriterModel."""

    @validate_call
    async def run(self, graph: Neo4jGraph) -> WriterModel:
        """Build EntityModel instances from the raw dicts held by ``graph``.

        Args:
            graph: The graph whose ``entities`` and ``relations`` dicts are
                unpacked into EntityModel instances.

        Returns:
            A WriterModel with status "OK" and the converted items.
        """
        entity_models = [EntityModel(**raw) for raw in graph.entities]
        relation_models = [EntityModel(**raw) for raw in graph.relations]
        return WriterModel(
            status="OK",
            entities=entity_models,
            relations=relation_models,
        )


if __name__ == "__main__":
from neo4j_genai.pipeline import Pipeline

import neo4j
from langchain_text_splitters import CharacterTextSplitter
from neo4j_genai.components.entity_relation_extractor import (
LLMEntityRelationExtractor,
OnError,
)
from neo4j_genai.components.kg_writer import Neo4jWriter
from neo4j_genai.components.schema import (
SchemaBuilder,
SchemaEntity,
SchemaProperty,
SchemaRelation,
)
from neo4j_genai.components.text_splitters.langchain import LangChainTextSplitterAdapter
from neo4j_genai.llm import OpenAILLM
from neo4j_genai.pipeline import Pipeline

# Set the log level to DEBUG for all neo4j_genai.* loggers and send every
# record to the console.
logging.config.dictConfig(
    {
        "version": 1,
        "handlers": {
            "console": {
                "class": "logging.StreamHandler",
            }
        },
        # The root logger is configured through dictConfig's dedicated
        # top-level "root" key. Listing it under "loggers" only reaches the
        # real root logger on Python >= 3.9; on older versions it configures
        # an unrelated logger named "root" and nothing gets printed.
        "root": {
            "handlers": ["console"],
        },
        "loggers": {
            "neo4j_genai": {
                "level": "DEBUG",
            },
        },
    }
)


async def main(neo4j_driver: neo4j.Driver) -> dict[str, Any]:
"""This is where we define and run the KG builder pipeline, instantiating a few
components:
- Text Splitter: in this example we use a text splitter from the LangChain package
- Schema Builder: this component takes a list of entities, relationships and
possible triplets as inputs, validates them and returns a schema ready to use
for the rest of the pipeline
- LLM Entity Relation Extractor is an LLM-based entity and relation extractor:
based on the provided schema, the LLM will do its best to identify these
entities and their relations within the provided text
- KG writer: once entities and relations are extracted, they can be written
to a Neo4j database
"""
pipe = Pipeline()
pipe.add_component("chunker", DocumentChunker())
# define the components
pipe.add_component(
"splitter",
LangChainTextSplitterAdapter(
# chunk_size=50 for the sake of this demo
CharacterTextSplitter(chunk_size=50, chunk_overlap=10, separator=".")
),
)
pipe.add_component("schema", SchemaBuilder())
pipe.add_component("extractor", ERExtractor())
pipe.add_component("writer", Writer())
pipe.connect("chunker", "extractor", input_config={"chunks": "chunker.chunks"})
pipe.connect("schema", "extractor", input_config={"schema": "schema.data_schema"})
pipe.add_component(
"extractor",
LLMEntityRelationExtractor(
llm=OpenAILLM(
model_name="gpt-4o",
model_params={
"max_tokens": 1000,
"response_format": {"type": "json_object"},
},
),
on_error=OnError.RAISE,
),
)
pipe.add_component("writer", Neo4jWriter(neo4j_driver))
# define the execution order of the components
# and how the output of previous components must be used
pipe.connect("splitter", "extractor", input_config={"chunks": "splitter"})
pipe.connect("schema", "extractor", input_config={"schema": "schema"})
pipe.connect(
"extractor",
"writer",
input_config={"graph": "extractor"},
)

# user input:
# the initial text
# and the list of entities and relations we are looking for
pipe_inputs = {
"chunker": {
"text": """Graphs are everywhere.
GraphRAG is the future of Artificial Intelligence.
Robots are already running the world."""
"splitter": {
"text": """Albert Einstein was a German physicist born in 1879 who
wrote many groundbreaking papers especially about general relativity
and quantum mechanics. He worked for many different institutions, including
the University of Bern in Switzerland and the University of Oxford."""
},
"schema": {
"entities": [
SchemaEntity(
label="Person",
properties=[
SchemaProperty(name="name", type="STRING"),
SchemaProperty(name="place_of_birth", type="STRING"),
SchemaProperty(name="date_of_birth", type="DATE"),
],
),
SchemaEntity(
label="Organization",
properties=[
SchemaProperty(name="name", type="STRING"),
SchemaProperty(name="country", type="STRING"),
],
),
SchemaEntity(
label="Field",
properties=[
SchemaProperty(name="name", type="STRING"),
],
),
],
"relations": [
SchemaRelation(
label="WORKED_ON",
),
SchemaRelation(
label="WORKED_FOR",
),
],
"potential_schema": [
("Person", "WORKED_ON", "Field"),
("Person", "WORKED_FOR", "Organization"),
],
},
"schema": {"schema": "Person OWNS House"},
}
print(asyncio.run(pipe.run(pipe_inputs)))
# run the pipeline
return await pipe.run(pipe_inputs)


if __name__ == "__main__":
    # Connection settings for the Neo4j instance used by the example.
    uri = "bolt://localhost:7687"
    credentials = ("neo4j", "password")
    # Run the pipeline inside the driver's context manager and print its result.
    with neo4j.GraphDatabase.driver(uri, auth=credentials) as driver:
        pipeline_result = asyncio.run(main(driver))
        print(pipeline_result)
Loading