Skip to content

Commit 8b6cf43

Browse files
willtaistellasia
andauthored
Pdf to text component (#103)
* Pipeline (#81) * First draft of pipeline/component architecture. Example using the RAG pipeline. * More complex implementation of pipeline to deal with branching and aggregations - no async yet * Introduce Store to add flexibility as where to store pipeline results - Only return the leaf components results by default * Test RAG with new Pipeline implementation * File refactoring * Pipeline orchestration with async support * Import sorting * Pipeline rerun + exception on cyclic graph (for now) * Mypy * Python version compat * Rename process->run for Components for consistency with Pipeline * Move components test in the example folder - add some tests * Race condition fix - documentation - ruff * Fix import sorting * mypy on tests * Mark test as async * Tests were not testing... * Ability to create Pipeline templates * Ruff * Future + header * Renaming + update import structure to make it more compatible with rest of the repo * Check input parameters before starting the pipeline * Introduce output model for component - Validate pipeline before running - More unit tests * Import.. * Finally installed pre-commit hooks... * Finally installed pre-commit hooks... * Finally installed pre-commit hooks... and struggling with pydantic.. * Mypy on examples * Add missing header * Update doc * Fix import in doc * Update changelog * Update docs/source/user_guide_pipeline.rst Co-authored-by: willtai <wtaisen@gmail.com> * Refactor tests folder to match src structure * Move exceptions to separate file and rename them to make it clearer they are related to pipeline * Mypy * Rename def => config * Introduce generic type to remove most of the "type: ignore" comments * Remove unnecessary comment * Ruff * Document and test is_cyclic method * Remove find_all method from store (simplify data retrieval) * value is not a list anymore (or, if it is, it's on purpose) * Remove comments, fix example in doc * Remove core directory - move files to /pipeline * Expose stores from pipeline subpackage * Ability to pass the full output of one component to the next one - useful when a component accepts a pydantic model as input * Component subclasses can return DataModel * Add note on async + schema to illustrate parameter propagation --------- Co-authored-by: willtai <wtaisen@gmail.com> * Add documentation for pipeline exceptions (#90) * Add schema for kg builder (#88) * Add schema for kg builder and tests * Fixed mypy checks * Reverted kg builder example with schema * Revert to List and Dict due to Python3.8 issue with using get_type_hints * Added properties to Entity and Relation * Add test for missing properties * Fix type annotations in test * Add property types * Refactored entity, relation, and property types * Unused import * Moved schema to components/ (#96) * Add PDFLoader Component * Added tests * Remove pypdf check * Refactored examples * Moved to experimental folder * Exposed fs to run() --------- Co-authored-by: Estelle Scifo <stellasia@users.noreply.github.com>
1 parent dd3dcfe commit 8b6cf43

File tree

11 files changed

+202
-23
lines changed

11 files changed

+202
-23
lines changed

docs/source/user_guide_pipeline.rst

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@ This page provides information about how to create a pipeline.
1010

1111
Pipelines run asynchronously, see examples below.
1212

13-
******************************
13+
14+
*******************
1415
Creating Components
15-
******************************
16+
*******************
1617

1718
Components are asynchronous units of work that perform simple tasks,
1819
such as chunking documents or saving results to Neo4j.
Binary file not shown.

examples/pipeline/kg_builder_with_schema.py renamed to examples/pipeline/kg_builder_from_pdf.py

Lines changed: 62 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,24 @@
1818
import logging
1919
from typing import Any
2020

21+
import neo4j
22+
from langchain_text_splitters import CharacterTextSplitter
23+
from neo4j_genai.experimental.components.entity_relation_extractor import (
24+
LLMEntityRelationExtractor,
25+
OnError,
26+
)
27+
from neo4j_genai.experimental.components.kg_writer import Neo4jWriter
28+
from neo4j_genai.experimental.components.pdf_loader import PdfLoader
2129
from neo4j_genai.experimental.components.schema import (
2230
SchemaBuilder,
2331
SchemaEntity,
2432
SchemaRelation,
2533
)
34+
from neo4j_genai.experimental.components.text_splitters.langchain import (
35+
LangChainTextSplitterAdapter,
36+
)
2637
from neo4j_genai.experimental.pipeline import Component, DataModel
38+
from neo4j_genai.llm import OpenAILLM
2739
from pydantic import BaseModel, validate_call
2840

2941
logging.basicConfig(level=logging.DEBUG)
@@ -86,7 +98,7 @@ async def run(self, graph: Neo4jGraph) -> WriterModel:
8698
)
8799

88100

89-
if __name__ == "__main__":
101+
async def main(neo4j_driver: neo4j.Driver) -> dict[str, Any]:
90102
from neo4j_genai.experimental.pipeline import Pipeline
91103

92104
# Instantiate Entity and Relation objects
@@ -96,34 +108,62 @@ async def run(self, graph: Neo4jGraph) -> WriterModel:
96108
label="ORGANIZATION",
97109
description="A structured group of people with a common purpose.",
98110
),
111+
SchemaEntity(label="LOCATION", description="A location or place."),
99112
SchemaEntity(
100-
label="AGE",
113+
label="HORCRUX",
114+
description="A magical item in the Harry Potter universe.",
101115
),
102116
]
103117
relations = [
104118
SchemaRelation(
105-
label="EMPLOYED_BY", description="Indicates employment relationship."
119+
label="SITUATED_AT", description="Indicates the location of a person."
106120
),
107121
SchemaRelation(
108-
label="ORGANIZED_BY",
109-
description="Indicates organization responsible for an event.",
122+
label="LED_BY",
123+
description="Indicates the leader of an organization.",
110124
),
111125
SchemaRelation(
112-
label="ATTENDED_BY", description="Indicates attendance at an event."
126+
label="OWNS",
127+
description="Indicates the ownership of an item such as a Horcrux.",
128+
),
129+
SchemaRelation(
130+
label="INTERACTS", description="The interaction between two people."
113131
),
114132
]
115133
potential_schema = [
116-
("PERSON", "EMPLOYED_BY", "ORGANIZATION"),
117-
("ORGANIZATION", "ATTENDED_BY", "PERSON"),
134+
("PERSON", "SITUATED_AT", "LOCATION"),
135+
("PERSON", "INTERACTS", "PERSON"),
136+
("PERSON", "OWNS", "HORCRUX"),
137+
("ORGANIZATION", "LED_BY", "PERSON"),
118138
]
119139

120140
# Set up the pipeline
121141
pipe = Pipeline()
122-
pipe.add_component("chunker", DocumentChunker())
142+
pipe.add_component("pdf_loader", PdfLoader())
143+
pipe.add_component(
144+
"splitter",
145+
LangChainTextSplitterAdapter(
146+
# chunk_size=50 for the sake of this demo
147+
CharacterTextSplitter(chunk_size=50, chunk_overlap=10, separator=".")
148+
),
149+
)
123150
pipe.add_component("schema", SchemaBuilder())
124-
pipe.add_component("extractor", ERExtractor())
125-
pipe.add_component("writer", Writer())
126-
pipe.connect("chunker", "extractor", input_config={"chunks": "chunker.chunks"})
151+
pipe.add_component(
152+
"extractor",
153+
LLMEntityRelationExtractor(
154+
llm=OpenAILLM(
155+
model_name="gpt-4o",
156+
model_params={
157+
"max_tokens": 1000,
158+
"response_format": {"type": "json_object"},
159+
},
160+
),
161+
on_error=OnError.RAISE,
162+
),
163+
)
164+
pipe.add_component("writer", Neo4jWriter(neo4j_driver))
165+
pipe.connect("pdf_loader", "splitter", input_config={"text": "pdf_loader.text"})
166+
pipe.connect("splitter", "extractor", input_config={"chunks": "splitter"})
127167
pipe.connect("schema", "extractor", input_config={"schema": "schema"})
128168
pipe.connect(
129169
"extractor",
@@ -132,15 +172,20 @@ async def run(self, graph: Neo4jGraph) -> WriterModel:
132172
)
133173

134174
pipe_inputs = {
135-
"chunker": {
136-
"text": """Graphs are everywhere.
137-
GraphRAG is the future of Artificial Intelligence.
138-
Robots are already running the world."""
175+
"pdf_loader": {
176+
"filepath": "examples/pipeline/Harry Potter and the Death Hallows Summary.pdf"
139177
},
140178
"schema": {
141179
"entities": entities,
142180
"relations": relations,
143181
"potential_schema": potential_schema,
144182
},
145183
}
146-
print(asyncio.run(pipe.run(pipe_inputs)))
184+
return await pipe.run(pipe_inputs)
185+
186+
187+
if __name__ == "__main__":
188+
with neo4j.GraphDatabase.driver(
189+
"bolt://localhost:7687", auth=("neo4j", "password")
190+
) as driver:
191+
print(asyncio.run(main(driver)))

poetry.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ weaviate-client = {version = "^4.6.1", optional = true}
3636
pinecone-client = {version = "^4.1.0", optional = true}
3737
types-mock = "^5.1.0.20240425"
3838
eval-type-backport = "^0.2.0"
39+
pypdf = "^4.3.1"
3940

4041
[tool.poetry.group.dev.dependencies]
4142
pylint = "^3.1.0"

src/neo4j_genai/exceptions.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,13 @@ class SchemaFetchError(Neo4jGenAiError):
106106
pass
107107

108108

109-
class SchemaValidationError(Exception):
109+
class SchemaValidationError(Neo4jGenAiError):
110110
"""Custom exception for errors in schema configuration."""
111111

112112
pass
113+
114+
115+
class PdfLoaderError(Neo4jGenAiError):
116+
"""Custom exception for errors in PDF loader."""
117+
118+
pass
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
# Copyright (c) "Neo4j"
2+
# Neo4j Sweden AB [https://neo4j.com]
3+
# #
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
# #
8+
# https://www.apache.org/licenses/LICENSE-2.0
9+
# #
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
import io
16+
from abc import abstractmethod
17+
from pathlib import Path
18+
from typing import Optional, Union
19+
20+
import fsspec
21+
import pypdf
22+
from fsspec import AbstractFileSystem
23+
from fsspec.implementations.local import LocalFileSystem
24+
25+
from neo4j_genai.exceptions import PdfLoaderError
26+
from neo4j_genai.experimental.pipeline import Component, DataModel
27+
28+
29+
class PdfDocument(DataModel):
30+
text: str
31+
32+
33+
class DataLoader(Component):
34+
"""
35+
Interface for loading data of various input types.
36+
"""
37+
38+
@abstractmethod
39+
async def run(self, filepath: Path) -> PdfDocument:
40+
pass
41+
42+
43+
def is_default_fs(fs: fsspec.AbstractFileSystem) -> bool:
44+
return isinstance(fs, LocalFileSystem) and not fs.auto_mkdir
45+
46+
47+
class PdfLoader(DataLoader):
48+
@staticmethod
49+
def load_file(
50+
file: Union[Path, str],
51+
fs: Optional[AbstractFileSystem] = None,
52+
) -> str:
53+
"""Parse PDF file and return text."""
54+
if not isinstance(file, Path):
55+
file = Path(file)
56+
57+
fs = fs or LocalFileSystem()
58+
59+
try:
60+
with fs.open(file, "rb") as fp:
61+
stream = fp if is_default_fs(fs) else io.BytesIO(fp.read())
62+
pdf = pypdf.PdfReader(stream)
63+
num_pages = len(pdf.pages)
64+
text_parts = (
65+
pdf.pages[page].extract_text() for page in range(num_pages)
66+
)
67+
full_text = "\n".join(text_parts)
68+
69+
return full_text
70+
except Exception as e:
71+
raise PdfLoaderError(e)
72+
73+
async def run(
74+
self,
75+
filepath: Path,
76+
fs: Optional[AbstractFileSystem] = None,
77+
) -> PdfDocument:
78+
return PdfDocument(text=self.load_file(filepath, fs))

src/neo4j_genai/experimental/components/schema.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ def check_schema(cls, data: Dict[str, Any]) -> Dict[str, Any]:
9292
)
9393
if entity2 not in entities:
9494
raise SchemaValidationError(
95-
f"Entity '{entity1}' is not defined in the provided entities."
95+
f"Entity '{entity2}' is not defined in the provided entities."
9696
)
9797

9898
return data
Binary file not shown.

0 commit comments

Comments
 (0)