Workflows¶
Sikka Agent's Workflows module enables orchestrating multiple agents to solve complex problems through coordination and collaboration.
Overview¶
Workflows allow you to create sophisticated multi-agent systems where specialized agents work together on complex tasks. Key capabilities include:
- Directed Acyclic Graph (DAG) based workflows for sequential or parallel processing
- Role-playing configurations for simulating interactions between agents
- Task distribution across worker agents with different specializations
- Hierarchical agent structures with supervisor-worker relationships
Core Workflow Types¶
Role-Playing¶
Create simulated interactions between agents with different roles.
Parameters¶
Parameter | Type | Description | Default |
---|---|---|---|
RolePlayingConfig | Class | Configuration for role-playing setup | N/A |
assistant_role_name | str | Name/role of the assistant agent | Required |
user_role_name | str | Name/role of the user agent | Required |
task_prompt | str | Task description for the interaction | Required |
with_task_specify | bool | Whether to specify the task further | True |
with_task_planner | bool | Whether to plan the task | True |
model | ModelConfigure | Model configuration | Required |
Returns¶
result.result
: Final output of the role-playing interaction
Code Example¶
from sikkaagent.workflows.roleplaying import RolePlayingConfig
from sikkaagent.workflows.workforce import Workforce
from sikkaagent.models import ModelConfigure
from sikkaagent.utils.enums import ModelPlatformType
from sikkaagent.workflows.task import Task
# Set up model
model = ModelConfigure(
model="llama3.1:8b",
model_platform=ModelPlatformType.OLLAMA,
url="http://localhost:11434/v1"
)
# Define role-playing configuration
role_config = RolePlayingConfig(
assistant_role_name="Tech Expert",
user_role_name="Business Analyst",
task_prompt="Discuss the business implications of generative AI",
with_task_specify=True,
with_task_planner=True,
model=model
)
# Create workforce and add role-playing worker
workforce = Workforce(model=model, description='Role Playing Discussion')
workforce.add_role_playing_worker(
model,
'AI Discussion Group',
assistant_role_name="Tech Expert",
user_role_name="Business Analyst",
config=role_config,
assistant_agent_kwargs={
"system_prompt": "You are a technology expert with deep knowledge of AI systems."
},
user_agent_kwargs={
"system_prompt": "You are a business analyst focused on ROI and implementation."
},
chat_turn_limit=10 # Maximum conversation turns
)
# Create and process the task
task = Task(
content="Analyze the business implications of generative AI in healthcare",
id='discussion-task'
)
# Run the discussion
result = workforce.process_task(task)
print(result.result)
Workforce¶
Sikka Agent's Workforce module provides a powerful orchestration system for automating and coordinating different types of agent work, including roleplaying workers, single agent workers, and coordinated task agents. It serves as a flexible framework for complex multi-agent collaboration.
Key Features¶
- Intelligent Task Decomposition: Automatically breaks down complex tasks into manageable subtasks using LLM-powered task planning
- Dynamic Worker Assignment: Uses a coordinator agent to intelligently assign tasks to the most suitable worker based on capabilities
- Multiple Worker Types: Supports single agent workers, role-playing workers, and nested workforces
- Adaptive Failure Handling: Robust mechanisms for handling task failures with automatic retries and dynamic worker creation
- Hierarchical Organization: Create complex hierarchies of workers for sophisticated workflows
- Task Channel Communication: Asynchronous task distribution and result collection through a centralized channel
Worker Types¶
Worker Type | Description |
---|---|
SingleAgentWorker | Individual agent with specific capabilities, optionally equipped with tools |
RolePlayingWorker | Simulates interactions between multiple agents with different roles |
Workforce | Nested workforce for hierarchical task organization |
Methods¶
Method | Description |
---|---|
add_single_agent_worker() | Add a worker powered by a single ChatAgent |
add_role_playing_worker() | Add a worker that uses role-playing between agents |
add_workforce() | Add a nested workforce for hierarchical organization |
process_task() | Main entry point to process a task through the workforce |
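add_workforce() appears in the table above but not in the examples on this page. The following is a minimal sketch of hierarchical organization, assuming add_workforce() accepts a child Workforce instance; the other calls follow the Code Stub further down.
# Sketch: nesting a child workforce inside a parent (add_workforce signature assumed)
from sikkaagent.workflows.workforce import Workforce
from sikkaagent.agents.chat_agent import ChatAgent
from sikkaagent.models import ModelConfigure
from sikkaagent.utils.enums import ModelPlatformType

model = ModelConfigure(model="llama3.1:8b", model_platform=ModelPlatformType.OLLAMA)

# Parent workforce that coordinates high-level work
parent = Workforce(model=model, description="Product Team")

# Child workforce with its own specialized worker
research_team = Workforce(model=model, description="Research Sub-Team")
research_team.add_single_agent_worker("Researcher", ChatAgent(model=model))

# Attach the child workforce as a node of the parent for hierarchical task organization
parent.add_workforce(research_team)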
Parameters¶
Parameter | Type | Description | Default |
---|---|---|---|
description | str | Description of the workforce | Required |
model | str or ModelConfigure | Model to use for coordination | "gpt-4o-mini" |
children | list[BaseNode] | Initial list of worker nodes | [] |
coordinator_agent_kwargs | dict | Additional arguments for the coordinator agent | None |
task_agent_kwargs | dict | Additional arguments for the task agent | None |
new_worker_agent_kwargs | dict | Additional arguments for dynamically created worker agents | None |
Returns from process_task()¶
task
: The processed task with results, state information, and any subtasks
Code Stub¶
# Import necessary modules
from sikkaagent.workflows.workforce import Workforce
from sikkaagent.agents.chat_agent import ChatAgent
from sikkaagent.workflows.roleplaying import RolePlayingConfig
from sikkaagent.workflows.task import Task
from sikkaagent.tools import SearchToolkit
from sikkaagent.models import ModelConfigure
from sikkaagent.utils.enums import ModelPlatformType
# 1. Configure model
model = ModelConfigure(model="llama3.1:8b", model_platform=ModelPlatformType.OLLAMA)
# 2. Create workforce
workforce = Workforce(model=model, description="Research Team")
# 3. Add a single agent worker with tools
researcher = ChatAgent(model=model, tools=[*SearchToolkit().get_tools()])
workforce.add_single_agent_worker("Researcher", researcher)
# 4. Add a role-playing worker
role_config = RolePlayingConfig(
user_role_name="User",
assistant_role_name="Analyst",
model=model
)
workforce.add_role_playing_worker(
model, "Analysis Team", "Analyst", "User", role_config
)
# 5. Create and process a task
task = Task(content="Research quantum computing advancements")
result = workforce.process_task(task)
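The returned Task can then be inspected. result.result matches the role-playing example earlier on this page; the state and subtasks attribute names below are assumptions based on the Returns description above.
# Inspect the processed task (attribute names beyond .result are assumptions)
print(result.result)  # final output assembled by the workforce
for subtask in getattr(result, "subtasks", []) or []:
    print(subtask.id, getattr(subtask, "state", None))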
DAG Workflow¶
Sikka Agent workflow provides a powerful graph-based system for orchestrating complex agent interactions with advanced flow control.
Understanding DAGs in Sikka Agent¶
A DAG is a directed graph with no cycles, meaning you can't create loops where a node eventually points back to itself. This structure is perfect for workflows where tasks need to be executed in a specific order with clear dependencies.
Core DAG Concepts¶
- Nodes: Processing units that perform specific tasks
- Edges: Connections between nodes that define the flow of execution
- State: Data that flows through the graph and gets transformed by nodes
- Execution Flow: The path taken through the graph during workflow execution
Understanding the Pregel BSP Implementation¶
Sikka Agent's DAG workflow is built on the Pregel Bulk Synchronous Parallel (BSP) model, originally developed by Google for large-scale graph processing. This model is particularly well-suited for workflows because:
- Deterministic Execution: The BSP model ensures consistent results regardless of execution environment
- State Isolation: Each node operates on its own copy of the state, preventing side effects
- Synchronization Barriers: Ensures all parallel tasks complete before proceeding to the next step
- Message Passing: Allows nodes to communicate results through a shared state
How Pregel BSP Works in Sikka Agent¶
The workflow execution follows these steps:
- Superstep Initialization: The workflow begins with an initial state
- Parallel Computation: During a superstep, all eligible nodes execute in parallel
- Barrier Synchronization: The system waits for all nodes in the current superstep to complete
- State Merging: Results from all nodes are merged into a unified state
- Next Superstep: The workflow proceeds to the next set of nodes with the merged state
This approach is ideal for:
- Sequential Processing: When tasks must be executed in a specific order
- Conditional Branching: When the next step depends on the result of the current step
- Parallel Execution: When multiple independent tasks can run simultaneously
- Dynamic Routing: When the workflow path is determined at runtime
The plain-Python sketch below walks through one such superstep.
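The following is a framework-independent illustration of a single BSP superstep, written in plain Python with asyncio to mirror the compute / synchronize / merge cycle described above. It is a conceptual sketch of the model, not Sikka Agent's actual execution engine.
import asyncio
import copy

async def run_superstep(state, node_fns):
    """Illustrative BSP superstep: compute in parallel, barrier, then merge."""
    # Compute: every eligible node runs on its own copy of the state (state isolation)
    async def run_node(fn):
        local_state = copy.deepcopy(state)
        return fn(local_state)

    # Barrier synchronization: gather() waits for all nodes to finish
    partial_states = await asyncio.gather(*(run_node(fn) for fn in node_fns))

    # Merge: combine results from all nodes into a unified state for the next superstep
    merged = copy.deepcopy(state)
    for partial in partial_states:
        merged.update(partial)
    return merged

# Example: two independent "nodes" executed in one superstep
def node_a(s): s["a"] = "done A"; return s
def node_b(s): s["b"] = "done B"; return s

final_state = asyncio.run(run_superstep({"input": "data"}, [node_a, node_b]))
print(final_state)  # {'input': 'data', 'a': 'done A', 'b': 'done B'}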
When to Use DAGs¶
DAGs are ideal for:
- Multi-step processes where each step depends on previous steps
- Complex decision trees with conditional branching
- Parallel task execution where independent tasks can run simultaneously
- Role-playing scenarios where different agents need to interact in a structured way
Designing Effective DAG Workflows¶
Sequential Processing¶
For sequential workflows where tasks must be executed in a specific order:
- Create nodes for each processing step
- Connect them with standard edges in the desired sequence
- Use the add_edge() method to define the flow
# Sequential workflow pattern
from sikkaagent.workflows.dag import StateGraph, State, START, END

# Minimal stand-in node functions: each step records its work in the shared state
def step1_function(state): state["step1"] = "done"; return state
def step2_function(state): state["step2"] = "done"; return state
def step3_function(state): state["step3"] = "done"; return state

graph = StateGraph(State)
# Define nodes
graph.add_node("step1", step1_function)
graph.add_node("step2", step2_function)
graph.add_node("step3", step3_function)
# Connect sequentially: START → step1 → step2 → step3 → END
graph.add_edge(START, "step1")
graph.add_edge("step1", "step2")
graph.add_edge("step2", "step3")
graph.add_edge("step3", END)
Choosing Node Types¶
Select the appropriate node type based on the task:
- STANDARD: For custom Python functions that process state
- LLM_WORKER: For tasks requiring LLM processing
- LLM_ROLEPLAYING: For simulating interactions between agents
- DECISION: For branching logic
Designing Edge Connections¶
Edges define how data flows between nodes (a combined sketch follows this list):
- STANDARD: Simple connections for sequential flow
- CONDITIONAL: Edges with traversal conditions
- DYNAMIC: Edges whose targets are determined at runtime
- PARALLEL: Edges for concurrent execution paths
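As a quick orientation before the reference tables below, this sketch combines a STANDARD node, two LLM_WORKER nodes, and a DYNAMIC edge using only helpers that appear in the examples later on this page (add_node, add_llm_worker_node, add_edge with EdgeType.DYNAMIC). The model object is assumed to be configured as in those examples; CONDITIONAL and PARALLEL edges are covered by the dedicated examples further down.
# Sketch: choosing node and edge types (model assumed configured as in the examples below)
from sikkaagent.workflows.dag import StateGraph, State, START, END, EdgeType

graph = StateGraph(State)

# STANDARD node: a plain Python function that transforms the state
def validate(state):
    state["valid"] = bool(state.get("user_input"))
    return state
graph.add_node("validate", validate)

# LLM_WORKER nodes: LLM-backed processing
graph.add_llm_worker_node(id="summarize", system_prompt="Summarize the input", model=model)
graph.add_llm_worker_node(id="clarify", system_prompt="Ask for a clearer request", model=model)

# STANDARD edge for sequential flow, DYNAMIC edge for runtime routing
graph.add_edge(START, "validate")
graph.add_edge(
    "validate",
    lambda state: "summarize" if state.get("valid") else "clarify",
    edge_type=EdgeType.DYNAMIC,
)
graph.add_edge("summarize", END)
graph.add_edge("clarify", END)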
Key Features¶
- LLM Worker and Roleplaying Nodes: Specialized nodes for LLM-based processing and agent roleplaying
- Conditional and Dynamic Routing: Sophisticated flow control with conditional branching
- Parallel Execution: Execute multiple nodes simultaneously for improved performance
- Sophisticated State Management: Enhanced state tracking across workflow execution
- Visualization Capabilities: Generate visual representations of workflow graphs
Node Types¶
Node Type | Description | When to Use |
---|---|---|
STANDARD | Standard processing node | Custom Python functions for data transformation, validation, or any custom logic |
LLM_WORKER | Node that uses an LLM for processing | Text generation, summarization, classification, or any NLP task |
LLM_ROLEPLAYING | Node that uses LLM roleplaying | Simulating conversations, interviews, or any multi-agent interaction |
DECISION | Decision node for branching | Routing logic based on state conditions |
AGGREGATOR | Node that aggregates results from multiple nodes | Combining results from parallel processing paths |
PARALLEL_SPLIT | Node that initiates parallel execution | Starting multiple independent processing paths |
PARALLEL_JOIN | Node that synchronizes parallel execution | Waiting for all parallel paths to complete before continuing |
Edge Types¶
Edge Type | Description | When to Use |
---|---|---|
STANDARD | Standard edge between nodes | Simple sequential flow from one node to another |
CONDITIONAL | Edge with a condition for traversal | When the next step depends on a condition |
DYNAMIC | Edge with dynamic target determination | When the next node is determined at runtime |
PARALLEL | Edge for parallel execution paths | When multiple tasks can be executed concurrently |
State Management¶
The state object is a dictionary that flows through the graph and gets transformed by nodes:
# Example of a state object
{
"user_input": "Analyze this document about quantum computing",
"results": {
"research": "Quantum computing uses quantum bits or qubits...",
"analysis": "The document covers three main areas..."
},
"current_node": "writing",
"execution_path": ["research", "analysis", "writing"],
"shared_context": {
"key_topics": ["quantum entanglement", "quantum supremacy"]
}
}
Key state fields:
- user_input: The initial input to the workflow
- results: Output from each node, keyed by node ID
- current_node: The currently executing node
- execution_path: History of executed nodes
- shared_context: Data shared between nodes
The sketch below shows a custom node reading and updating these fields.
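For illustration, here is a custom node that reads a prior node's output and records derived data. The field names follow the example state above; direct writes to shared_context by a node are an assumption rather than a documented contract.
def tag_topics(state):
    # Read the output of an earlier node (results are keyed by node ID)
    research = state.get("results", {}).get("research", "")
    # Record derived data for later nodes via shared_context (assumed writable by nodes)
    topics = state.setdefault("shared_context", {}).setdefault("key_topics", [])
    if "qubit" in research.lower():
        topics.append("qubits")
    return state
Such a function would be registered like any other STANDARD node, e.g. graph.add_node("tag_topics", tag_topics).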
Parameters¶
Parameter | Type | Description | Default |
---|---|---|---|
StateGraph | Class | Core class for defining workflow structure | N/A |
State | Class | Base state type for graph nodes | N/A |
START | Constant | Special node marking workflow beginning | N/A |
END | Constant | Special node marking workflow completion | N/A |
NodeType | Enum | Types of nodes in the workflow graph | N/A |
EdgeType | Enum | Types of edges in the workflow graph | N/A |
ExecutionMode | Enum | Execution modes for the workflow | N/A |
Returns¶
compiled_workflow
: Executable workflow object with invoke(), stream(), and visualize() methods
invoke() returns Dict
: Final state containing workflow results
stream() returns Generator
: Stream of intermediate states during execution
Basic Example¶
# Import necessary modules
from sikkaagent.workflows.dag import StateGraph, State, START, END
from sikkaagent.models import ModelConfigure
from sikkaagent.utils.enums import ModelPlatformType
# 1. Create model
model = ModelConfigure(model="llama3.1:8b", model_platform=ModelPlatformType.OLLAMA)
# 2. Create workflow graph
graph = StateGraph(State)
# 3. Add LLM worker nodes
graph.add_llm_worker_node(id="research", system_prompt="Research facts", model=model)
graph.add_llm_worker_node(id="analysis", system_prompt="Analyze information", model=model)
graph.add_llm_worker_node(id="writing", system_prompt="Create content", model=model)
# 4. Define workflow structure: START → research → analysis → writing → END
graph.add_edge(START, "research")
graph.add_edge("research", "analysis")
graph.add_edge("analysis", "writing")
graph.add_edge("writing", END)
# 5. Compile and run workflow
workflow = graph.compile()
result = workflow.invoke({"user_input": "Summarize AI advancements"})
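Because invoke() returns the final state dictionary, individual node outputs can be read back from it. A usage sketch, assuming results are keyed by node ID as described under State Management:
# Final state holds per-node outputs under "results", keyed by node ID
print(result["results"]["writing"])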
Advanced Example: Conditional Branching¶
# Import necessary modules (this example reuses the model configured in the Basic Example above)
from sikkaagent.workflows.dag import StateGraph, State, START, END, EdgeType
# 1. Create workflow graph
graph = StateGraph(State)
# 2. Add nodes
graph.add_llm_worker_node(id="research", system_prompt="Research facts", model=model)
graph.add_llm_worker_node(id="analysis", system_prompt="Analyze information", model=model)
# 3. Add formatter node
def format_output(state):
    # Format research and analysis into a report
    state["final_output"] = f"Report: {state['results']['research']} + {state['results']['analysis']}"
    return state

graph.add_node("formatter", format_output)
# 4. Add decision function for dynamic routing
def decide_next_step(state):
    # Route based on analysis content
    return "research" if "further research needed" in state["results"]["analysis"] else "formatter"
# 5. Connect with conditional branching
graph.add_edge(START, "research")
graph.add_edge("research", "analysis")
graph.add_edge("analysis", decide_next_step, edge_type=EdgeType.DYNAMIC)
graph.add_edge("formatter", END)
# 6. Run workflow
workflow = graph.compile()
result = workflow.invoke({"user_input": "Analyze quantum computing"})
Parallel Execution with Pregel BSP Model¶
Sikka Agent's DAG implementation uses the Pregel Bulk Synchronous Parallel (BSP) model for parallel execution. This model follows a superstep approach:
- Compute: Execute all tasks in parallel
- Communicate: Collect outputs and messages
- Synchronize: Wait for all tasks to complete before proceeding
- Merge: Combine results from all parallel tasks
Each node operates on its own copy of the state and can't see updates from other nodes until the next superstep. This ensures state isolation and deterministic execution.
# Import necessary modules
from sikkaagent.workflows.dag import StateGraph, State, START, END, ExecutionMode, NodeType
# 1. Create workflow graph
graph = StateGraph(State)
# 2. Define processor functions
def process1(state):
    # Process first data stream
    state["result1"] = "Processed data 1"
    return state

def process2(state):
    # Process second data stream
    state["result2"] = "Processed data 2"
    return state

def process3(state):
    # Process third data stream
    state["result3"] = "Processed data 3"
    return state

# 3. Define aggregator function
def aggregate_results(state):
    # Combine all results
    state["final_result"] = f"{state.get('result1')} + {state.get('result2')} + {state.get('result3')}"
    return state
# 4. Add nodes to graph
graph.add_node("process1", process1)
graph.add_node("process2", process2)
graph.add_node("process3", process3)
graph.add_node("aggregator", aggregate_results, node_type=NodeType.AGGREGATOR)
graph.add_node("data_source", lambda state: state) # Simple pass-through
# 5. Connect with parallel execution pattern
graph.add_edge(START, "data_source")
graph.add_parallel_split(
"data_source", # Source node
["process1", "process2", "process3"], # Parallel nodes
join_node="aggregator" # Join node
)
graph.add_edge("aggregator", END)
# 6. Compile with parallel execution and run
workflow = graph.compile(execution_mode=ExecutionMode.PARALLEL)
result = workflow.invoke({"input": "Test data"})
Important: In the Pregel BSP model implementation, the system executes nodes in parallel during a superstep, but due to synchronization barriers, only one superstep executes at a time. This means that while nodes within a superstep run in parallel, the execution flow between supersteps is sequential.
Understanding the Parallel Execution Flow¶
- Superstep Execution: During a superstep, all parallel tasks are executed simultaneously using asyncio.gather()
- State Isolation: Each task receives its own copy of the state to ensure isolation
- Barrier Synchronization: After all tasks complete, the system waits at a synchronization barrier
- State Merging: Results from all parallel tasks are merged into a single state
- Next Superstep: The workflow proceeds to the next superstep with the merged state
This approach ensures deterministic execution while allowing parallel processing of independent tasks.
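As a usage note for the parallel example above, the merged state returned by invoke() should therefore carry each branch's contribution together with the aggregator's combined output:
# After workflow.invoke(...) in the parallel example above
print(result.get("result1"))       # "Processed data 1"
print(result.get("final_result"))  # combined string written by the aggregator node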
LLM Roleplaying¶
# Import necessary modules (model is assumed configured as in the earlier examples)
from sikkaagent.workflows.dag import StateGraph, State, START, END
# 1. Create workflow graph
graph = StateGraph(State)
# 2. Add roleplaying node
graph.add_llm_roleplaying_node(
id="expert_discussion",
assistant_role_name="Quantum Physicist",
user_role_name="Computer Scientist",
model=model
)
# 3. Connect to workflow: START → expert_discussion → END
graph.add_edge(START, "expert_discussion")
graph.add_edge("expert_discussion", END)
# 4. Run workflow
workflow = graph.compile()
result = workflow.invoke({"user_input": "Discuss quantum computing impact"})
Visualization and Monitoring¶
# Generate Mermaid diagram for documentation or UI display
mermaid = workflow.visualize(format="mermaid")
# Generate DOT format for use with Graphviz
dot = workflow.visualize(format="dot")
# Stream execution to monitor progress in real-time
for state in workflow.stream({"user_input": "Analyze quantum computing"}):
    # Track current node and status
    current = state.get("current_node")
    status = state.get("status", {}).get(current, "Unknown")
    # Display intermediate results as they become available
    if "results" in state and current in state["results"]:
        print(f"Node {current} ({status}): {state['results'][current][:50]}...")
Integration with Other Modules¶
Workflows leverage other Sikka Agent capabilities:
# With tools and memory
from sikkaagent.tools import SearchToolkit, RetrievalToolkit
from sikkaagent.memories import Memory
from sikkaagent.storages import InMemoryStorage
from sikkaagent.models import ModelConfigure
from sikkaagent.utils.enums import ModelPlatformType
from sikkaagent.agents import ChatAgent
from sikkaagent.workflows.dag import StateGraph, State, START, END
# Create model configuration
model = ModelConfigure(
model="llama3.1:8b",
model_platform=ModelPlatformType.OLLAMA,
url="http://localhost:11434/v1"
)
# Create memory
memory = Memory(storage=InMemoryStorage())
# Create tool-enabled agent
tool_agent = ChatAgent(
model=model,
tools=[*SearchToolkit().get_tools(), *RetrievalToolkit().get_tools()],
memory=memory,
system_prompt="You are a research agent with access to search and retrieval tools."
)
# Create workflow with the enhanced agent
graph = StateGraph(State)
graph.add_node("research", tool_agent)
graph.add_edge(START, "research")
graph.add_edge("research", END)
# Run workflow
workflow = graph.compile()
result = workflow.invoke({"user_input": "Research quantum computing advances"})
Best Practices¶
- Agent Specialization: Design agents with clear, focused responsibilities
- Information Flow: Carefully plan data flow between workflow stages
- Error Handling: Implement robust error handling across all nodes (see the sketch after this list)
- Prompt Engineering: Craft system prompts that clearly define agent roles
- Testing: Test workflows with diverse inputs to ensure robustness
- State Management: Design state objects that efficiently capture necessary information
- Visualization: Use the visualization capabilities to understand and debug complex workflows
- Parallel Processing: Leverage parallel execution for independent tasks to improve performance
- Dynamic Routing: Use conditional and dynamic routing for adaptive workflows that respond to intermediate results
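For the error-handling practice above, one common pattern is to catch exceptions inside a node and record them in the state so a downstream DECISION node can react. This is a sketch only: run_research is a hypothetical stand-in for the node's real work, and the errors field is not part of the documented state schema.
def safe_research(state):
    try:
        # run_research is a hypothetical helper standing in for the node's real logic
        state.setdefault("results", {})["research"] = run_research(state["user_input"])
    except Exception as exc:
        # "errors" is an illustrative field, not a documented part of the state
        state.setdefault("errors", {})["research"] = str(exc)
    return state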