First draft of pipeline/component architecture. Example using the RAG… #1

Closed
50 commits
7d54050
First draft of pipeline/component architecture. Example using the RAG…
stellasia Jul 17, 2024
1519b2c
More complex implementation of pipeline to deal with branching and ag…
stellasia Jul 17, 2024
897f1fa
Introduce Store to add flexibility as where to store pipeline results…
stellasia Jul 18, 2024
1391732
Test RAG with new Pipeline implementation
stellasia Jul 18, 2024
86554af
File refactoring
stellasia Jul 18, 2024
c48c3f3
Pipeline orchestration with async support
stellasia Jul 18, 2024
edb9af7
Import sorting
stellasia Jul 18, 2024
7823e6e
Pipeline rerun + exception on cyclic graph (for now)
stellasia Jul 19, 2024
ccd5970
Mypy
stellasia Jul 19, 2024
dfe6d9d
Python version compat
stellasia Jul 19, 2024
a38f895
Rename process->run for Components for consistency with Pipeline
stellasia Jul 19, 2024
3d3f988
Move components test in the example folder - add some tests
stellasia Jul 19, 2024
2bcd347
Race condition fix - documentation - ruff
stellasia Jul 22, 2024
d550888
Fix import sorting
stellasia Jul 22, 2024
b307fa0
mypy on tests
stellasia Jul 22, 2024
aeebd24
Mark test as async
stellasia Jul 22, 2024
266a908
Tests were not testing...
stellasia Jul 22, 2024
082764a
Ability to create Pipeline templates
stellasia Jul 23, 2024
542def8
Ruff
stellasia Jul 23, 2024
8612e09
Future + header
stellasia Jul 23, 2024
7153978
Renaming + update import structure to make it more compatible with re…
stellasia Jul 23, 2024
f0edf15
Check input parameters before starting the pipeline
stellasia Jul 23, 2024
9c8bbd3
Introduce output model for component - Validate pipeline before runni…
stellasia Jul 24, 2024
d059882
Import..
stellasia Jul 24, 2024
3a20fa3
Finally installed pre-commit hooks...
stellasia Jul 24, 2024
95fdf60
Finally installed pre-commit hooks...
stellasia Jul 24, 2024
2613bae
Finally installed pre-commit hooks... and struggling with pydantic..
stellasia Jul 24, 2024
a1710e7
Mypy on examples
stellasia Jul 24, 2024
945f84e
Add missing header
stellasia Jul 24, 2024
ce005a7
Update doc
stellasia Jul 24, 2024
4bb9ff2
Fix import in doc
stellasia Jul 24, 2024
c66aeee
Update changelog
stellasia Jul 24, 2024
9da467e
Update docs/source/user_guide_pipeline.rst
stellasia Jul 24, 2024
2eb5832
Merge branch 'kgb/pipeline' of https://github.com/stellasia/neo4j-gen…
stellasia Jul 24, 2024
ad82891
Refactor tests folder to match src structure
stellasia Jul 25, 2024
405ee1f
Move exceptions to separate file and rename them to make it clearer t…
stellasia Jul 25, 2024
dcf96bb
Mypy
stellasia Jul 25, 2024
a6fb080
Rename def => config
stellasia Jul 25, 2024
1b7e7eb
Introduce generic type to remove most of the "type: ignore" comments
stellasia Jul 25, 2024
0e69d83
Remove unnecessary comment
stellasia Jul 25, 2024
4b2c083
Ruff
stellasia Jul 25, 2024
e5a7d29
Document and test is_cyclic method
stellasia Jul 25, 2024
f68f5f8
Remove find_all method from store (simplify data retrieval)
stellasia Jul 25, 2024
852eaa0
value is not a list anymore (or, if it is, it's on purpose)
stellasia Jul 25, 2024
96a0bd8
Remove comments, fix example in doc
stellasia Jul 26, 2024
5666a44
Remove core directory - move files to /pipeline
stellasia Jul 26, 2024
c7dae43
Expose stores from pipeline subpackage
stellasia Jul 26, 2024
fef5385
Ability to pass the full output of one component to the next one - us…
stellasia Jul 29, 2024
d7202c4
Component subclasses can return DataModel
stellasia Jul 29, 2024
1103d25
Add note on async + schema to illustrate parameter propagation
stellasia Jul 29, 2024
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,9 @@

 ## Next

+### Added
+- Introduced support for Component/Pipeline flexible architecture
+
 ## 0.3.1

 ### Fixed
2 changes: 1 addition & 1 deletion docs/source/api.rst
@@ -80,7 +80,7 @@ Embedder
 SentenceTransformerEmbeddings
 ================================

-.. autoclass:: neo4j_genai.embeddings.SentenceTransformerEmbeddings
+.. autoclass:: neo4j_genai.embeddings.sentence_transformers.SentenceTransformerEmbeddings
     :members:

 **********
6 changes: 4 additions & 2 deletions docs/source/index.rst
@@ -30,7 +30,8 @@ Python versions supported:
 Topics
 ******

-+ :ref:`user-guide`
++ :ref:`user-guide-rag`
++ :ref:`user-guide-pipeline`
 + :ref:`api-documentation`
 + :ref:`types-documentation`

@@ -39,7 +40,8 @@ Topics
    :caption: Contents:
    :hidden:

-   user_guide.rst
+   user_guide_rag.rst
+   user_guide_pipeline.rst
    api.rst
    types.rst

81 changes: 81 additions & 0 deletions docs/source/user_guide_pipeline.rst
@@ -0,0 +1,81 @@
.. _user-guide-pipeline:

User Guide: Pipeline
####################

This page explains how to create components and connect them in a pipeline.

.. note::

    Pipelines run asynchronously; see the examples below.

******************************
Creating Components
******************************

Components are asynchronous units of work that perform simple tasks,
such as chunking documents or saving results to Neo4j.
This package includes a few default components, but developers can create
their own by following these steps:

1. Create a subclass of the Pydantic `neo4j_genai.pipeline.DataModel` to represent the data returned by the component
2. Create a subclass of `neo4j_genai.pipeline.Component`
3. Define a `run` method in this new class, declaring the required inputs as parameters and the `DataModel` subclass from step 1 as the return type
4. Implement the `run` method: it is an `async` method, so tasks can be parallelized and awaited within it.

An example is given below, where a `ComponentAdd` is created to add two numbers together and return
the resulting sum:

.. code:: python

    from neo4j_genai.pipeline import Component, DataModel

    class IntResultModel(DataModel):
        result: int

    class ComponentAdd(Component):
        async def run(self, number1: int, number2: int = 1) -> IntResultModel:
            return IntResultModel(result=number1 + number2)
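Because `run` is a coroutine, a component can also be awaited on its own, outside any pipeline. The sketch below illustrates this with stand-in classes: the plain `dataclass` and the base-class-free `ComponentAdd` are assumptions made so the snippet runs without the package installed; the real base classes come from `neo4j_genai.pipeline`.

```python
import asyncio
from dataclasses import dataclass


# Stand-in for a DataModel subclass (assumption: a dataclass instead of Pydantic).
@dataclass
class IntResultModel:
    result: int


# Stand-in for a Component subclass (assumption: no base class needed here).
class ComponentAdd:
    async def run(self, number1: int, number2: int = 1) -> IntResultModel:
        return IntResultModel(result=number1 + number2)


async def main() -> IntResultModel:
    component = ComponentAdd()
    # number2 is omitted, so its default value of 1 is used.
    return await component.run(number1=10)


output = asyncio.run(main())
print(output.result)
```

Calling `component.run(...)` without `await` only creates the coroutine; the addition happens once it is awaited or passed to `asyncio.run`.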


***************************************
Connecting Components within a Pipeline
***************************************

The ultimate aim of creating components is to assemble them into a complex pipeline
for a specific purpose, such as building a Knowledge Graph from text data.

Here's how to create a simple pipeline and propagate results from one component to another
(detailed explanations follow):

.. code:: python

    import asyncio
    from neo4j_genai.pipeline import Pipeline

    pipe = Pipeline()
    pipe.add_component("a", ComponentAdd())
    pipe.add_component("b", ComponentAdd())

    pipe.connect("a", "b", {"number2": "a.result"})
    asyncio.run(pipe.run({"a": {"number1": 10, "number2": 1}, "b": {"number1": 4}}))
    # result: 10+1+4 = 15

1. First, a pipeline is created, and two components named "a" and "b" are added to it.
2. Next, the two components are connected so that "b" runs after "a", with the "number2" parameter for component "b" being the result of component "a".
3. Finally, the pipeline is run with 10 and 1 as input parameters for "a". Component "b" will receive 4 as "number1" (as specified in the pipeline.run parameters) and 11 (10 + 1, the result of "a") as "number2".

The data flow is illustrated in the diagram below:

.. code-block::

    10 ---\
           Component "a" -> 11
    1 ----/                   \
                               \
                                Component "b" -> 15
    4 -------------------------/
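The data flow above can be reproduced with plain `asyncio`, which may help clarify what a mapping such as `{"number2": "a.result"}` means. This is only a sketch under stated assumptions: `add`, `resolve_inputs` and `run_chain` are hypothetical helpers written for this illustration, and the way the actual `Pipeline` resolves its input configuration may differ.

```python
import asyncio
from typing import Any


async def add(number1: int, number2: int = 1) -> dict[str, int]:
    # Plays the role of ComponentAdd.run, returning a dict instead of a DataModel.
    return {"result": number1 + number2}


def resolve_inputs(
    input_config: dict[str, str], results: dict[str, dict[str, Any]]
) -> dict[str, Any]:
    # Turn {"number2": "a.result"} into {"number2": <value of result from "a">}.
    resolved: dict[str, Any] = {}
    for param, path in input_config.items():
        component_name, field = path.split(".", 1)
        resolved[param] = results[component_name][field]
    return resolved


async def run_chain() -> dict[str, dict[str, int]]:
    results: dict[str, dict[str, int]] = {}
    # "a" only uses the user-provided inputs.
    results["a"] = await add(number1=10, number2=1)
    # "b" mixes a user-provided input with a value propagated from "a".
    propagated = resolve_inputs({"number2": "a.result"}, results)
    results["b"] = await add(number1=4, **propagated)
    return results


results = asyncio.run(run_chain())
print(results)
```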

.. warning::

    Cycles are not allowed in a Pipeline.
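To make the restriction concrete, here is a minimal sketch of how a cycle between connected components could be detected with a depth-first search. It is not the implementation from this pull request: `has_cycle` and the adjacency-dict representation of the connections are assumptions chosen for illustration.

```python
def has_cycle(edges: dict[str, list[str]]) -> bool:
    """Return True if the directed graph described by `edges` contains a cycle.

    `edges` maps a component name to the names of the components it feeds.
    """
    unvisited, in_progress, done = 0, 1, 2
    nodes = set(edges) | {target for targets in edges.values() for target in targets}
    state = {node: unvisited for node in nodes}

    def visit(node: str) -> bool:
        state[node] = in_progress
        for successor in edges.get(node, []):
            # Reaching a node that is still being explored means we looped back.
            if state[successor] == in_progress:
                return True
            if state[successor] == unvisited and visit(successor):
                return True
        state[node] = done
        return False

    return any(state[node] == unvisited and visit(node) for node in nodes)


print(has_cycle({"a": ["b"], "b": ["c"]}))  # linear chain
print(has_cycle({"a": ["b"], "b": ["a"]}))  # "b" feeds back into "a"
```

A check of this kind can run before any component is executed, so an invalid configuration fails early instead of waiting forever on a result that can never be produced.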
4 changes: 2 additions & 2 deletions docs/source/user_guide.rst → docs/source/user_guide_rag.rst
@@ -1,6 +1,6 @@
-.. _user-guide:
+.. _user-guide-rag:

-User Guide
+User Guide: RAG
 #################

 This guide provides a starting point for using the Neo4j GenAI Python package
Empty file added examples/pipeline/__init__.py
Empty file.
117 changes: 117 additions & 0 deletions examples/pipeline/kg_builder.py
@@ -0,0 +1,117 @@
# Copyright (c) "Neo4j"
# Neo4j Sweden AB [https://neo4j.com]
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# https://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import asyncio
import logging
from typing import Any

from neo4j_genai.pipeline import Component, DataModel
from pydantic import BaseModel, validate_call

logging.basicConfig(level=logging.DEBUG)


class DocumentChunkModel(DataModel):
    chunks: list[str]


class DocumentChunker(Component):
    async def run(self, text: str) -> DocumentChunkModel:
        chunks = [t.strip() for t in text.split(".") if t.strip()]
        return DocumentChunkModel(chunks=chunks)


class SchemaModel(DataModel):
    data_schema: str


class SchemaBuilder(Component):
    async def run(self, schema: str) -> SchemaModel:
        return SchemaModel(data_schema=schema)


class EntityModel(BaseModel):
    label: str
    properties: dict[str, str]


class Neo4jGraph(DataModel):
    entities: list[dict[str, Any]]
    relations: list[dict[str, Any]]


class ERExtractor(Component):
    async def _process_chunk(self, chunk: str, schema: str) -> dict[str, Any]:
        return {
            "entities": [{"label": "Person", "properties": {"name": "John Doe"}}],
            "relations": [],
        }

    async def run(self, chunks: list[str], schema: str) -> Neo4jGraph:
        tasks = [self._process_chunk(chunk, schema) for chunk in chunks]
        result = await asyncio.gather(*tasks)
        merged_result: dict[str, Any] = {"entities": [], "relations": []}
        for res in result:
            merged_result["entities"] += res["entities"]
            merged_result["relations"] += res["relations"]
        return Neo4jGraph(
            entities=merged_result["entities"], relations=merged_result["relations"]
        )


class WriterModel(DataModel):
    status: str
    entities: list[EntityModel]
    relations: list[EntityModel]


class Writer(Component):
    @validate_call
    async def run(self, graph: Neo4jGraph) -> WriterModel:
        entities = graph.entities
        relations = graph.relations
        return WriterModel(
            status="OK",
            entities=[EntityModel(**e) for e in entities],
            relations=[EntityModel(**r) for r in relations],
        )


if __name__ == "__main__":
    from neo4j_genai.pipeline import Pipeline

    pipe = Pipeline()
    pipe.add_component("chunker", DocumentChunker())
    pipe.add_component("schema", SchemaBuilder())
    pipe.add_component("extractor", ERExtractor())
    pipe.add_component("writer", Writer())
    pipe.connect("chunker", "extractor", input_config={"chunks": "chunker.chunks"})
    pipe.connect("schema", "extractor", input_config={"schema": "schema.data_schema"})
    pipe.connect(
        "extractor",
        "writer",
        input_config={"graph": "extractor"},
    )

    pipe_inputs = {
        "chunker": {
            "text": """Graphs are everywhere.
            GraphRAG is the future of Artificial Intelligence.
            Robots are already running the world."""
        },
        "schema": {"schema": "Person OWNS House"},
    }
    print(asyncio.run(pipe.run(pipe_inputs)))
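The chunking and fan-out/merge logic of `DocumentChunker` and `ERExtractor` can be tried in isolation. The following sketch uses only the standard library; `split_sentences`, `process_chunk` and `extract` are hypothetical names, and the per-chunk "extraction" simply wraps the chunk text rather than calling a model.

```python
import asyncio
from typing import Any


def split_sentences(text: str) -> list[str]:
    # Same rule as DocumentChunker: split on "." and drop empty pieces.
    return [t.strip() for t in text.split(".") if t.strip()]


async def process_chunk(chunk: str) -> dict[str, Any]:
    # Placeholder for real per-chunk work (e.g. an LLM call); yields control once.
    await asyncio.sleep(0)
    return {
        "entities": [{"label": "Chunk", "properties": {"text": chunk}}],
        "relations": [],
    }


async def extract(chunks: list[str]) -> dict[str, Any]:
    # Process all chunks concurrently; gather returns results in input order.
    partials = await asyncio.gather(*(process_chunk(c) for c in chunks))
    merged: dict[str, Any] = {"entities": [], "relations": []}
    for partial in partials:
        merged["entities"].extend(partial["entities"])
        merged["relations"].extend(partial["relations"])
    return merged


text = """Graphs are everywhere.
GraphRAG is the future of Artificial Intelligence.
Robots are already running the world."""
chunks = split_sentences(text)
graph = asyncio.run(extract(chunks))
print(len(chunks), len(graph["entities"]))
```

Since `asyncio.gather` preserves the order of its inputs, the merged entity list follows the order of the chunks even if the individual tasks finish at different times.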
112 changes: 112 additions & 0 deletions examples/pipeline/rag.py
@@ -0,0 +1,112 @@
# Copyright (c) "Neo4j"
# Neo4j Sweden AB [https://neo4j.com]
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# https://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This example illustrates how to use a Pipeline with
the existing Retriever and LLM interfaces. It consists
of creating a Component wrapper around each of the
required objects.
"""

from __future__ import annotations

import asyncio

import neo4j
from neo4j_genai.embeddings.openai import OpenAIEmbeddings
from neo4j_genai.generation import PromptTemplate, RagTemplate
from neo4j_genai.llm import LLMInterface, OpenAILLM
from neo4j_genai.pipeline import Component, Pipeline
from neo4j_genai.pipeline.component import DataModel
from neo4j_genai.pipeline.types import ComponentConfig, ConnectionConfig, PipelineConfig
from neo4j_genai.retrievers import VectorRetriever
from neo4j_genai.retrievers.base import Retriever


class StringDataModel(DataModel):
    result: str


class RetrieverComponent(Component):
    def __init__(self, retriever: Retriever) -> None:
        self.retriever = retriever

    async def run(self, query: str) -> StringDataModel:
        res = self.retriever.search(query_text=query)
        return StringDataModel(result="\n".join(c.content for c in res.items))


class PromptTemplateComponent(Component):
    def __init__(self, prompt: PromptTemplate) -> None:
        self.prompt = prompt

    # `context` receives "retrieve.result", which is a single string.
    async def run(self, query: str, context: str) -> StringDataModel:
        prompt = self.prompt.format(query, context, examples="")
        return StringDataModel(result=prompt)


class LLMComponent(Component):
    def __init__(self, llm: LLMInterface) -> None:
        self.llm = llm

    async def run(self, prompt: str) -> StringDataModel:
        llm_response = self.llm.invoke(prompt)
        return StringDataModel(result=llm_response.content)


if __name__ == "__main__":
    driver = neo4j.GraphDatabase.driver(
        "bolt://localhost:7687",
        auth=("neo4j", "password"),
        database="neo4j",
    )
    embedder = OpenAIEmbeddings()
    retriever = VectorRetriever(
        driver, index_name="moviePlotsEmbedding", embedder=embedder
    )
    prompt_template = RagTemplate()
    llm = OpenAILLM(model_name="gpt-4o")

    pipe = Pipeline.from_template(
        PipelineConfig(
            components=[
                ComponentConfig(
                    name="retrieve", component=RetrieverComponent(retriever)
                ),
                ComponentConfig(
                    name="augment", component=PromptTemplateComponent(prompt_template)
                ),
                ComponentConfig(name="generate", component=LLMComponent(llm)),
            ],
            connections=[
                ConnectionConfig(
                    start="retrieve",
                    end="augment",
                    input_config={"context": "retrieve.result"},
                ),
                ConnectionConfig(
                    start="augment",
                    end="generate",
                    input_config={"prompt": "augment.result"},
                ),
            ],
        )
    )

    query = "A movie about the US presidency"
    result = asyncio.run(
        pipe.run({"retrieve": {"query": query}, "augment": {"query": query}})
    )
    print(result["generate"]["result"])

    driver.close()
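One point worth noting about the wrappers in this example: `retriever.search` and `llm.invoke` are called synchronously inside `async` methods, so each call holds the event loop until it returns. A possible way to keep the loop free, sketched below, is to hand the blocking call to a worker thread with `asyncio.to_thread` (Python 3.9+). `SlowRetriever` and this variant of `RetrieverComponent` are hypothetical stand-ins, not part of this pull request.

```python
import asyncio
import time


class SlowRetriever:
    """Hypothetical stand-in for a synchronous retriever."""

    def search(self, query_text: str) -> list[str]:
        time.sleep(0.1)  # simulates blocking I/O such as a database round trip
        return [f"result for {query_text}"]


class RetrieverComponent:
    def __init__(self, retriever: SlowRetriever) -> None:
        self.retriever = retriever

    async def run(self, query: str) -> str:
        # Run the blocking call in a worker thread so other tasks can progress.
        items = await asyncio.to_thread(self.retriever.search, query_text=query)
        return "\n".join(items)


async def main() -> list[str]:
    component = RetrieverComponent(SlowRetriever())
    # Both searches overlap instead of running one after the other.
    return await asyncio.gather(component.run("q1"), component.run("q2"))


outputs = asyncio.run(main())
print(outputs)
```

Whether this is appropriate depends on the thread safety of the wrapped object, which is not established here for the Neo4j driver or the LLM clients.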