Skip to content

Streaming chunk accumulation #741

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
29ee128
preliminary code and pseudocode
nichwch May 7, 2024
66a5333
add chunk accumulation strategy to Validator base class
nichwch May 7, 2024
7831466
handle case where llm chunk > validator chunk in validator class
nichwch May 8, 2024
2dbae2e
added some questions
nichwch May 8, 2024
1e3544d
change stream_runner to handle the result of iterable validate
nichwch May 8, 2024
e9084e1
format
nichwch May 8, 2024
1269f68
change validator base to use a chunking function instead of specifyin…
nichwch May 10, 2024
b454cd5
connect streaming all the way down call chain, include validated chun…
nichwch May 10, 2024
b64ab4e
change execute_validator to handle streaming
nichwch May 10, 2024
bf2bd32
make validate take stream parameter, remove validate_stream in top le…
nichwch May 10, 2024
c79e9b2
use wyatts sentence splitting strategy
nichwch May 10, 2024
4583cb9
import nltk
nichwch May 10, 2024
f1b4a88
use stream-enabled execute_validator
nichwch May 14, 2024
289745c
format
nichwch May 14, 2024
58d8eed
fix bug where json_schema was being called with streaming
nichwch May 15, 2024
947f476
conditionally use old logic for json_schema to avoid breaking json_sc…
nichwch May 16, 2024
8b2c154
validate remainders
nichwch May 16, 2024
d6c3739
merge in main
nichwch May 16, 2024
0ab245c
new chunk span validation schema
nichwch May 16, 2024
a320464
field for reason that validation failed for a given span
nichwch May 16, 2024
93bb781
add validated_chunk to ValidationResult
nichwch May 17, 2024
1381821
add helper method to get a list of error spans relative to llm output
nichwch May 17, 2024
3ccdda1
conceptual question
nichwch May 17, 2024
6fdbcd1
Merge branch 'main' into nichwch/chunk-accumulation-rewrite
nichwch May 17, 2024
f455ae2
Merge branch 'nichwch/chunk-accumulation-rewrite' into nichwch/stream…
nichwch May 17, 2024
74485eb
turn chunking_function into class method
nichwch May 17, 2024
a39b5af
incomplete tests for streaming chunk accumulation
nichwch May 17, 2024
0ae850e
format
nichwch May 20, 2024
847dd0a
remove print
nichwch May 20, 2024
f0b3030
fix a few bugs uncovered by testing
nichwch May 20, 2024
e8b6069
tests (WIP) for streaming
nichwch May 20, 2024
628e490
Merge branch 'main' into nichwch/chunk-accumulation-rewrite
nichwch May 21, 2024
a9a91a1
merge
nichwch May 21, 2024
eec8e19
base model
nichwch May 21, 2024
8726a28
optional typing to avoid breaking existing validators
nichwch May 21, 2024
ba68eb6
top level helper function for spans on guard, patch validated_chunk i…
nichwch May 21, 2024
2607423
attempt to use openai finish_reason field
nichwch May 21, 2024
da720c3
add comment explaining problem with using openai finish_message
nichwch May 21, 2024
8bdb292
test error span behavior
nichwch May 22, 2024
0abac83
address some changes
nichwch May 28, 2024
8f45a0a
handle case where llm callable doesnt provide finished flag
nichwch May 28, 2024
dfcd3b8
Merge pull request #771 from guardrails-ai/nichwch/streaming-error-spans
nichwch May 28, 2024
fe56871
Merge branch 'feat/streaming-update' into nichwch/chunk-accumulation-…
CalebCourier May 30, 2024
b52b8cb
lint, type, and test fixes
CalebCourier May 30, 2024
0aede77
use status for validation_passed in streaming
CalebCourier May 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions guardrails/run/stream_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,19 +158,27 @@ def step(
chunk_text = self.get_chunk_text(chunk, api)
fragment += chunk_text

# 2. Parse the fragment
parsed_fragment, move_to_next = self.parse(
index, fragment, output_schema, verified
# 2. Parse the chunk
# I assume we have to parse the chunk before validating it...
parsed_chunk, move_to_next = self.parse(
index, chunk, output_schema, verified
)
if move_to_next:
# Continue to next chunk
continue

# 3. Run output validation
# If validator chunk size is smaller than LLM chunk size:
# split llm chunk down into validator-sized chunks
# Question: How can I tell what the validator chunk size is?

# If validator chunk size is larger, pass to validator.
# Validator will return None until it's accumulated enough
# Don't forget to validate incomplete chunks at the end.
validated_fragment = self.validate(
iteration,
index,
parsed_fragment,
parsed_chunk,
output_schema,
validate_subschema=True,
)
Expand Down
3 changes: 3 additions & 0 deletions guardrails/schema/string_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ def validate(
disable_tracer: Optional[bool] = True,
**kwargs,
) -> Any:
# TODO: add class field to track number of chunks accumulated
# If not enough chunks have been accumulated, emit None
# Once enough chunks have been accumulated, validate and emit the result
"""Validate a dictionary of data against the schema.

Args:
Expand Down
45 changes: 45 additions & 0 deletions guardrails/validator_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
Any,
Callable,
Dict,
Iterable,
List,
Literal,
Optional,
Expand All @@ -26,6 +27,8 @@
from guardrails.errors import ValidationError
from guardrails.utils.dataclass import dataclass

VALIDATOR_CHUNKING_STRATEGIES = Enum('VALIDATOR_CHUNKING_STRATEGIES', ['WORD', 'SENTENCE', 'PARAGRAPH'])

VALIDATOR_IMPORT_WARNING = """Accessing `{validator_name}` using
`from guardrails.validators import {validator_name}` is deprecated and
support will be removed after version 0.5.x. Please switch to the Guardrails Hub syntax:
Expand Down Expand Up @@ -174,6 +177,14 @@ class Filter:
class Refrain:
pass

def is_word(chunk:str) -> bool:
return ' ' in chunk

def is_sentence(chunk:str) -> bool:
return '.' in chunk

def is_paragraph(chunk:str) -> bool:
return '\n' in chunk

def check_refrain_in_list(schema: List) -> bool:
"""Checks if a Refrain object exists in a list.
Expand Down Expand Up @@ -390,6 +401,8 @@ class Validator(Runnable):

rail_alias: str = ""

chunking_strategy=VALIDATOR_CHUNKING_STRATEGIES.SENTENCE
accumulated_chunks = []
run_in_separate_process = False
override_value_on_pass = False
required_metadata_keys = []
Expand Down Expand Up @@ -452,6 +465,38 @@ def validate(self, value: Any, metadata: Dict[str, Any]) -> ValidationResult:
"""Validates a value and return a validation result."""
raise NotImplementedError

def validate_stream(self, chunk:Any, metadata: Dict[str, Any]) -> ValidationResult:
"""Validates a chunk emitted by an LLM.
If the LLM chunk is smaller than the validator's chunking strategy,
it will be accumulated until it reaches the desired size. In the meantime,
the validator will return None.

Otherwise, the validator will validate the chunk and return the result.
"""
# combine accumulated chunks and new chunk
self.accumulated_chunks.append(chunk)
# check if enough chunks have accumulated for validation
accumulated_enough = self.accumulated_enough_to_validate()
if not accumulated_enough:
return None
# if we've accumulated enough chunks, validate the accumulated chunks
accumulated_text = ''.join(self.accumulated_chunks)
# remove the accummulated chunks
self.accumulated_chunks = []
return self.validate(accumulated_text, metadata)


def accumulated_enough_to_validate(self) -> bool:
accumulated_text = ''.join(self.accumulated_chunks)
"""Check if the accumulated chunks are large enough to be validated."""
if(self.chunking_strategy == VALIDATOR_CHUNKING_STRATEGIES.WORD):
return is_word(accumulated_text)
if(self.chunking_strategy == VALIDATOR_CHUNKING_STRATEGIES.SENTENCE):
return is_sentence(accumulated_text)
if(self.chunking_strategy == VALIDATOR_CHUNKING_STRATEGIES.PARAGRAPH):
return is_paragraph(accumulated_text)


def to_prompt(self, with_keywords: bool = True) -> str:
"""Convert the validator to a prompt.

Expand Down