Skip to content

Convert to Mixins for Workflows #220

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion document_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
TokenFilter,
VoteFilter,
)
from services.workflows.vector_react import add_documents_to_vectors
from services.workflows.chat import add_documents_to_vectors

# Load environment variables
dotenv.load_dotenv()
Expand Down
17 changes: 13 additions & 4 deletions proposal_evaluation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ async def test_proposal_evaluation_workflow():
# Create a test proposal
proposal_id = await create_test_proposal(dao_id)

# Use a consistent test wallet ID
test_wallet_id = UUID("532fd36b-8a9d-4fdd-82d2-25ddcf007488")

# Test scenarios
scenarios = [
{
Expand All @@ -107,7 +110,7 @@ async def test_proposal_evaluation_workflow():
},
{
"name": "Auto-vote Enabled",
"auto_vote": False, # Fixed: Changed to True for auto-vote scenario
"auto_vote": True, # Corrected: Changed to True for auto-vote scenario
"confidence_threshold": 0.7,
"description": "Testing proposal evaluation with auto-voting",
},
Expand All @@ -128,14 +131,15 @@ async def test_proposal_evaluation_workflow():
if scenario["auto_vote"]:
result = await evaluate_and_vote_on_proposal(
proposal_id=proposal_id,
wallet_id=test_wallet_id, # Add wallet_id for auto-vote scenarios
auto_vote=scenario["auto_vote"],
confidence_threshold=scenario["confidence_threshold"],
dao_id=dao_id,
)
else:
result = await evaluate_proposal_only(
proposal_id=proposal_id,
wallet_id=UUID("532fd36b-8a9d-4fdd-82d2-25ddcf007488"),
wallet_id=test_wallet_id, # Use the same consistent wallet ID
)

# Print the results
Expand All @@ -145,8 +149,13 @@ async def test_proposal_evaluation_workflow():
print(f"Approval: {result['evaluation']['approve']}")
print(f"Confidence: {result['evaluation']['confidence_score']}")
print(f"Reasoning: {result['evaluation']['reasoning']}")
print(f"Token Usage: {result['token_usage']}")
print(f"Cost: ${result['token_costs']['total_cost']:.4f}")
print(
f"Total Token Usage by Model: {result.get('total_token_usage_by_model')}"
)
print(f"Total Cost by Model: {result.get('total_cost_by_model')}")
print(
f"Total Overall Cost: ${result.get('total_overall_cost', 0.0):.4f}"
)

if scenario["auto_vote"]:
print(f"Auto-voted: {result['auto_voted']}")
Expand Down
4 changes: 2 additions & 2 deletions services/runner/tasks/dao_proposal_evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ async def process_message(self, message: QueueMessage) -> Dict[str, Any]:
confidence = evaluation.get("confidence_score", 0.0)
reasoning = evaluation.get("reasoning", "No reasoning provided")
formatted_prompt = result.get("formatted_prompt", "No prompt provided")
total_cost = result.get("token_costs", {}).get("total_cost", 0.0)
model = result.get("model_info", {}).get("name", "Unknown")
total_cost = result.get("total_overall_cost", 0.0)
model = result.get("evaluation_model_info", {}).get("name", "Unknown")

logger.info(
f"Proposal {proposal.id} ({dao.name}): Evaluated with result "
Expand Down
4 changes: 2 additions & 2 deletions services/runner/tasks/dao_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
QueueMessageType,
)
from lib.logger import configure_logger
from services.workflows import execute_langgraph_stream
from services.workflows import execute_workflow_stream
from tools.tools_factory import filter_tools_by_names, initialize_tools

from ..base import BaseTask, JobContext, RunnerConfig, RunnerResult
Expand Down Expand Up @@ -181,7 +181,7 @@ async def _process_dao_message(self, message: QueueMessage) -> DAOProcessingResu
logger.debug(f"DAO deployment parameters: {tool_input}")

deployment_data = {}
async for chunk in execute_langgraph_stream(
async for chunk in execute_workflow_stream(
history=[], input_str=tool_input, tools_map=self.tools_map
):
if chunk["type"] == "result":
Expand Down
4 changes: 2 additions & 2 deletions services/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from backend.models import JobBase, JobCreate, StepCreate, Task, TaskFilter
from lib.logger import configure_logger
from lib.persona import generate_persona
from services.workflows import execute_langgraph_stream
from services.workflows import execute_workflow_stream
from tools.tools_factory import exclude_tools_by_names, initialize_tools

logger = configure_logger(__name__)
Expand Down Expand Up @@ -142,7 +142,7 @@ async def _process_job_stream(
["db_update_scheduled_task", "db_add_scheduled_task"], tools_map
)

stream_generator = execute_langgraph_stream(
stream_generator = execute_workflow_stream(
history=history,
input_str=task.prompt,
persona=persona,
Expand Down
65 changes: 20 additions & 45 deletions services/workflows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,21 @@
BaseWorkflowMixin,
ExecutionError,
LangGraphError,
PlanningCapability,
MessageContent,
MessageProcessor,
StateType,
StreamingCallbackHandler,
StreamingError,
ValidationError,
VectorRetrievalCapability,
)

# Enhanced ReAct workflow variants
from services.workflows.preplan_react import (
PreplanLangGraphService,
PreplanReactWorkflow,
PreplanState,
execute_preplan_react_stream,
# Remove all imports from deleted files and import from chat.py
from services.workflows.chat import (
ChatService,
ChatWorkflow,
execute_chat_stream,
)
from services.workflows.planning_mixin import PlanningCapability

# Special purpose workflows
from services.workflows.proposal_evaluation import (
Expand All @@ -30,15 +31,6 @@

# Core messaging and streaming components
# Core ReAct workflow components
from services.workflows.react import (
LangGraphService,
MessageContent,
MessageProcessor,
ReactState,
ReactWorkflow,
StreamingCallbackHandler,
execute_langgraph_stream,
)
from services.workflows.tweet_analysis import (
TweetAnalysisWorkflow,
analyze_tweet,
Expand All @@ -47,19 +39,11 @@
TweetGeneratorWorkflow,
generate_dao_tweet,
)
from services.workflows.vector_preplan_react import (
VectorPreplanLangGraphService,
VectorPreplanReactWorkflow,
VectorPreplanState,
execute_vector_preplan_stream,
)
from services.workflows.vector_react import (
VectorLangGraphService,
VectorReactState,
VectorReactWorkflow,
from services.workflows.vector_mixin import (
VectorRetrievalCapability,
add_documents_to_vectors,
execute_vector_langgraph_stream,
)
from services.workflows.web_search_mixin import WebSearchCapability

# Workflow service and factory
from services.workflows.workflow_service import (
Expand All @@ -76,7 +60,6 @@
"BaseWorkflowMixin",
"ExecutionError",
"LangGraphError",
"PlanningCapability",
"StateType",
"StreamingError",
"ValidationError",
Expand All @@ -96,22 +79,6 @@
"ReactState",
"ReactWorkflow",
"execute_langgraph_stream",
# PrePlan ReAct workflow
"PreplanLangGraphService",
"PreplanReactWorkflow",
"PreplanState",
"execute_preplan_react_stream",
# Vector ReAct workflow
"VectorLangGraphService",
"VectorReactState",
"VectorReactWorkflow",
"add_documents_to_vectors",
"execute_vector_langgraph_stream",
# Vector PrePlan ReAct workflow
"VectorPreplanLangGraphService",
"VectorPreplanReactWorkflow",
"VectorPreplanState",
"execute_vector_preplan_stream",
# Special purpose workflows
"ProposalEvaluationWorkflow",
"TweetAnalysisWorkflow",
Expand All @@ -120,4 +87,12 @@
"evaluate_and_vote_on_proposal",
"evaluate_proposal_only",
"generate_dao_tweet",
# Chat workflow
"ChatService",
"ChatWorkflow",
"execute_chat_stream",
# Mixins
"PlanningCapability",
"WebSearchCapability",
"add_documents_to_vectors",
]
Loading