Workflows

Sikka Agent's Workflows module orchestrates multiple agents that coordinate and collaborate to solve complex problems.

Overview

Workflows allow you to create sophisticated multi-agent systems where specialized agents work together on complex tasks. Key capabilities include:

  • Directed Acyclic Graph (DAG) based workflows for sequential or parallel processing
  • Role-playing configurations for simulating interactions between agents
  • Task distribution across worker agents with different specializations
  • Hierarchical agent structures with supervisor-worker relationships

Core Workflow Types

Role-Playing

Create simulated interactions between agents with different roles.

Parameters

Parameter | Type | Description | Default
RolePlayingConfig | Class | Configuration for role-playing setup | N/A
assistant_role_name | str | Name/role of the assistant agent | Required
user_role_name | str | Name/role of the user agent | Required
task_prompt | str | Task description for the interaction | Required
with_task_specify | bool | Whether to specify the task further | True
with_task_planner | bool | Whether to plan the task | True
model | ModelConfigure | Model configuration | Required

Returns

  • result.result: Final output of the role-playing interaction

Code Example

from sikkaagent.workflows.roleplaying import RolePlayingConfig
from sikkaagent.workflows.workforce import Workforce
from sikkaagent.models import ModelConfigure
from sikkaagent.utils.enums import ModelPlatformType
from sikkaagent.workflows.task import Task

# Set up model
model = ModelConfigure(
    model="llama3.1:8b",
    model_platform=ModelPlatformType.OLLAMA,
    url="http://localhost:11434/v1"
)

# Define role-playing configuration
role_config = RolePlayingConfig(
    assistant_role_name="Tech Expert",
    user_role_name="Business Analyst",
    task_prompt="Discuss the business implications of generative AI",
    with_task_specify=True,
    with_task_planner=True,
    model=model
)

# Create workforce and add role-playing worker
workforce = Workforce(model=model, description='Role Playing Discussion')
workforce.add_role_playing_worker(
    model,
    'AI Discussion Group',
    assistant_role_name="Tech Expert",
    user_role_name="Business Analyst",
    config=role_config,
    assistant_agent_kwargs={
        "system_prompt": "You are a technology expert with deep knowledge of AI systems."
    },
    user_agent_kwargs={
        "system_prompt": "You are a business analyst focused on ROI and implementation."
    },
    chat_turn_limit=10  # Maximum conversation turns
)

# Create and process the task
task = Task(
    content="Analyze the business implications of generative AI in healthcare",
    id='discussion-task'
)

# Run the discussion
result = workforce.process_task(task)
print(result.result)

Workforce

Sikka Agent's Workforce module orchestrates and coordinates different types of agent work: role-playing workers, single-agent workers, and coordinated task agents. It serves as a flexible framework for multi-agent collaboration.

Key Features

  • Intelligent Task Decomposition: Automatically breaks down complex tasks into manageable subtasks using LLM-powered task planning
  • Dynamic Worker Assignment: Uses a coordinator agent to intelligently assign tasks to the most suitable worker based on capabilities
  • Multiple Worker Types: Supports single agent workers, role-playing workers, and nested workforces
  • Adaptive Failure Handling: Robust mechanisms for handling task failures with automatic retries and dynamic worker creation
  • Hierarchical Organization: Create complex hierarchies of workers for sophisticated workflows
  • Task Channel Communication: Asynchronous task distribution and result collection through a centralized channel

Worker Types

Worker Type | Description
SingleAgentWorker | Individual agent with specific capabilities, optionally equipped with tools
RolePlayingWorker | Simulates interactions between multiple agents with different roles
Workforce | Nested workforce for hierarchical task organization

Methods

Method | Description
add_single_agent_worker() | Add a worker powered by a single ChatAgent
add_role_playing_worker() | Add a worker that uses role-playing between agents
add_workforce() | Add a nested workforce for hierarchical organization
process_task() | Main entry point to process a task through the workforce

Parameters

Parameter | Type | Description | Default
description | str | Description of the workforce | Required
model | str or ModelConfigure | Model to use for coordination | "gpt-4o-mini"
children | list[BaseNode] | Initial list of worker nodes | []
coordinator_agent_kwargs | dict | Additional arguments for the coordinator agent | None
task_agent_kwargs | dict | Additional arguments for the task agent | None
new_worker_agent_kwargs | dict | Additional arguments for dynamically created worker agents | None

Returns from process_task()

  • task: The processed task with results, state information, and any subtasks

Code Stub

# Import necessary modules
from sikkaagent.workflows.workforce import Workforce
from sikkaagent.agents.chat_agent import ChatAgent
from sikkaagent.workflows.roleplaying import RolePlayingConfig
from sikkaagent.workflows.task import Task
from sikkaagent.tools import SearchToolkit
from sikkaagent.models import ModelConfigure
from sikkaagent.utils.enums import ModelPlatformType


# 1. Configure model
model = ModelConfigure(model="llama3.1:8b", model_platform=ModelPlatformType.OLLAMA)

# 2. Create workforce
workforce = Workforce(model=model, description="Research Team")

# 3. Add a single agent worker with tools
researcher = ChatAgent(model=model, tools=[*SearchToolkit().get_tools()])
workforce.add_single_agent_worker("Researcher", researcher)

# 4. Add a role-playing worker
role_config = RolePlayingConfig(
    user_role_name="User",
    assistant_role_name="Analyst",
    task_prompt="Analyze the research findings",  # example value; task_prompt is listed as required
    model=model
)
workforce.add_role_playing_worker(
    model, "Analysis Team", "Analyst", "User", role_config
)

# 5. Create and process a task
task = Task(content="Research quantum computing advancements")
result = workforce.process_task(task)

DAG Workflow

Sikka Agent's DAG workflow provides a graph-based system for orchestrating agent interactions with conditional, dynamic, and parallel flow control.

Understanding DAGs in Sikka Agent

A DAG is a directed graph with no cycles, meaning you can't create loops where a node eventually points back to itself. This structure is perfect for workflows where tasks need to be executed in a specific order with clear dependencies.
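The no-cycles rule can be illustrated with a minimal sketch that uses Python's standard-library graphlib rather than any Sikka Agent API. The node names and the `execution_order` helper are illustrative assumptions: a topological sort succeeds on a DAG and fails as soon as an edge closes a loop.

```python
from graphlib import TopologicalSorter, CycleError


def execution_order(dependencies):
    """Return a valid execution order, or None if the graph contains a cycle.

    `dependencies` maps each node to the set of nodes it depends on.
    """
    try:
        return list(TopologicalSorter(dependencies).static_order())
    except CycleError:
        return None


# research -> analysis -> writing: a valid DAG
dag = {"analysis": {"research"}, "writing": {"analysis"}}
print(execution_order(dag))

# Adding writing -> research closes a loop, so no valid order exists
cyclic = {"analysis": {"research"}, "writing": {"analysis"}, "research": {"writing"}}
print(execution_order(cyclic))
```

The first call prints the nodes in dependency order; the second prints None because the graph is no longer acyclic.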

Core DAG Concepts

  • Nodes: Processing units that perform specific tasks
  • Edges: Connections between nodes that define the flow of execution
  • State: Data that flows through the graph and gets transformed by nodes
  • Execution Flow: The path taken through the graph during workflow execution

Understanding the Pregel BSP Implementation

Sikka Agent's DAG workflow is built on the Pregel Bulk Synchronous Parallel (BSP) model, originally developed by Google for large-scale graph processing. This model is particularly well-suited for workflows because:

  1. Deterministic Execution: The BSP model ensures consistent results regardless of execution environment
  2. State Isolation: Each node operates on its own copy of the state, preventing side effects
  3. Synchronization Barriers: Ensures all parallel tasks complete before proceeding to the next step
  4. Message Passing: Allows nodes to communicate results through a shared state

How Pregel BSP Works in Sikka Agent

The workflow execution follows these steps:

  1. Superstep Initialization: The workflow begins with an initial state
  2. Parallel Computation: During a superstep, all eligible nodes execute in parallel
  3. Barrier Synchronization: The system waits for all nodes in the current superstep to complete
  4. State Merging: Results from all nodes are merged into a unified state
  5. Next Superstep: The workflow proceeds to the next set of nodes with the merged state
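To make the superstep idea concrete, here is a rough sketch of how nodes could be grouped into supersteps from their dependencies. It uses the standard-library graphlib and is not Sikka Agent's scheduler; `plan_supersteps` and the node names are assumptions made for illustration.

```python
from graphlib import TopologicalSorter


def plan_supersteps(dependencies):
    """Group nodes into supersteps; each group depends only on earlier groups."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    supersteps = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # nodes eligible in this superstep
        supersteps.append(ready)
        sorter.done(*ready)  # barrier: the whole group finishes before the next one
    return supersteps


deps = {
    "process_a": {"data_source"},
    "process_b": {"data_source"},
    "aggregator": {"process_a", "process_b"},
}
for index, group in enumerate(plan_supersteps(deps), start=1):
    print(f"superstep {index}: {group}")
```

Nodes in the same group have no dependencies on each other, which is what makes them safe to run in parallel within one superstep.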

This approach is ideal for:

  • Sequential Processing: When tasks must be executed in a specific order
  • Conditional Branching: When the next step depends on the result of the current step
  • Parallel Execution: When multiple independent tasks can run simultaneously
  • Dynamic Routing: When the workflow path is determined at runtime

When to Use DAGs

DAGs are ideal for:

  1. Multi-step processes where each step depends on previous steps
  2. Complex decision trees with conditional branching
  3. Parallel task execution where independent tasks can run simultaneously
  4. Role-playing scenarios where different agents need to interact in a structured way

Designing Effective DAG Workflows

Sequential Processing

For sequential workflows where tasks must be executed in a specific order:

  1. Create nodes for each processing step
  2. Connect them with standard edges in the desired sequence
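  3. Use the add_edge() method to define the flow

# Sequential workflow pattern
# (step1_function, step2_function, step3_function are placeholders for your own node functions)
from sikkaagent.workflows.dag import StateGraph, State, START, END

graph = StateGraph(State)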

# Define nodes
graph.add_node("step1", step1_function)
graph.add_node("step2", step2_function)
graph.add_node("step3", step3_function)

# Connect sequentially: START → step1 → step2 → step3 → END
graph.add_edge(START, "step1")
graph.add_edge("step1", "step2")
graph.add_edge("step2", "step3")
graph.add_edge("step3", END)

Choosing Node Types

Select the appropriate node type based on the task:

  1. STANDARD: For custom Python functions that process state

    # Define a function that transforms state
    def process_data(state):
        state["processed_data"] = transform(state.get("raw_data", ""))
        return state
    
    # Add as standard node
    graph.add_node("processor", process_data)
    

  2. LLM_WORKER: For tasks requiring LLM processing

    # Add LLM worker node
    graph.add_llm_worker_node(
        id="content_creator",
        system_prompt="Write engaging articles",
        model=model
    )
    

  3. LLM_ROLEPLAYING: For simulating interactions between agents

    # Add roleplaying node
    graph.add_llm_roleplaying_node(
        id="interview",
        assistant_role_name="Interviewer",
        user_role_name="Candidate",
        model=model
    )
    

  4. DECISION: For branching logic

    # Define decision function
    def route_by_sentiment(state):
        return "positive_path" if "positive" in state.get("sentiment", "") else "negative_path"
    
    # Add as decision node
    graph.add_node("decision", route_by_sentiment, node_type=NodeType.DECISION)
    

Designing Edge Connections

Edges define how data flows between nodes:

  1. STANDARD: Simple connections for sequential flow

    # Connect two nodes sequentially
    graph.add_edge("node_a", "node_b")
    

  2. CONDITIONAL: Edges with conditions

    # Define condition function
    def quality_check(state):
        return state.get("quality_score", 0) > 80
    
    # Add conditional edge
    graph.add_edge("validator", "high_quality_path", condition=quality_check)
    

  3. DYNAMIC: Edges with targets determined at runtime

    # Define routing function
    def route_by_category(state):
        category = state.get("category", "")
        return f"{category}_processor" if category else "default_processor"
    
    # Add dynamic edge
    graph.add_edge("classifier", route_by_category, edge_type=EdgeType.DYNAMIC)
    

  4. PARALLEL: Edges for concurrent execution paths

    # Set up parallel processing with join point
    graph.add_parallel_split(
        "data_source",                         # Source node
        ["process_a", "process_b", "process_c"], # Parallel nodes
        join_node="aggregator"                 # Join node
    )
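The difference between conditional and dynamic edges can be sketched in plain Python. This is an illustration of the routing idea only, not how StateGraph resolves edges internally; the edge dictionaries and the `resolve_next` helper are hypothetical.

```python
def resolve_next(state, edges):
    """Return the target node ids for the outgoing edges of the current node.

    Each edge is a dict with a "target" (a node id, or a callable for dynamic
    edges) and an optional "condition" callable.
    """
    targets = []
    for edge in edges:
        condition = edge.get("condition")
        if condition is not None and not condition(state):
            continue  # conditional edge whose condition is false: skip it
        target = edge["target"]
        # A dynamic edge computes its target from the state at runtime
        targets.append(target(state) if callable(target) else target)
    return targets


edges = [
    {"target": "high_quality_path", "condition": lambda s: s.get("quality_score", 0) > 80},
    {"target": lambda s: f"{s['category']}_processor" if s.get("category") else "default_processor"},
]
print(resolve_next({"quality_score": 90, "category": "news"}, edges))
print(resolve_next({"quality_score": 50}, edges))
```

A conditional edge has a fixed target that is either taken or skipped, while a dynamic edge is always taken but picks its target from the state.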
    

Key Features

  • LLM Worker and Roleplaying Nodes: Specialized nodes for LLM-based processing and agent roleplaying
  • Conditional and Dynamic Routing: Sophisticated flow control with conditional branching
  • Parallel Execution: Execute multiple nodes simultaneously for improved performance
  • Sophisticated State Management: Enhanced state tracking across workflow execution
  • Visualization Capabilities: Generate visual representations of workflow graphs

Node Types

Node Type | Description | When to Use
STANDARD | Standard processing node | Custom Python functions for data transformation, validation, or any custom logic
LLM_WORKER | Node that uses an LLM for processing | Text generation, summarization, classification, or any NLP task
LLM_ROLEPLAYING | Node that uses LLM roleplaying | Simulating conversations, interviews, or any multi-agent interaction
DECISION | Decision node for branching | Routing logic based on state conditions
AGGREGATOR | Node that aggregates results from multiple nodes | Combining results from parallel processing paths
PARALLEL_SPLIT | Node that initiates parallel execution | Starting multiple independent processing paths
PARALLEL_JOIN | Node that synchronizes parallel execution | Waiting for all parallel paths to complete before continuing

Edge Types

Edge Type | Description | When to Use
STANDARD | Standard edge between nodes | Simple sequential flow from one node to another
CONDITIONAL | Edge with a condition for traversal | When the next step depends on a condition
DYNAMIC | Edge with dynamic target determination | When the next node is determined at runtime
PARALLEL | Edge for parallel execution paths | When multiple tasks can be executed concurrently

State Management

The state object is a dictionary that flows through the graph and gets transformed by nodes:

# Example of a state object
{
    "user_input": "Analyze this document about quantum computing",
    "results": {
        "research": "Quantum computing uses quantum bits or qubits...",
        "analysis": "The document covers three main areas..."
    },
    "current_node": "writing",
    "execution_path": ["research", "analysis", "writing"],
    "shared_context": {
        "key_topics": ["quantum entanglement", "quantum supremacy"]
    }
}

Key state fields:

  • user_input: The initial input to the workflow
  • results: Output from each node, keyed by node ID
  • current_node: The currently executing node
  • execution_path: History of executed nodes
  • shared_context: Data shared between nodes
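The sketch below shows one way these fields could evolve as nodes run. The source does not show how the library populates them, so `record_result` is a hypothetical helper; only the field names come from the state example above.

```python
def record_result(state, node_id, output):
    """Return a new state with `output` stored under `node_id`."""
    new_state = dict(state)
    new_state["results"] = {**state.get("results", {}), node_id: output}
    new_state["current_node"] = node_id
    new_state["execution_path"] = [*state.get("execution_path", []), node_id]
    return new_state


state = {"user_input": "Analyze this document about quantum computing"}
state = record_result(state, "research", "Quantum computing uses quantum bits or qubits...")
state = record_result(state, "analysis", "The document covers three main areas...")
print(state["current_node"])
print(state["execution_path"])
```

Returning a new dictionary instead of mutating the input keeps earlier states intact, which makes intermediate states easier to inspect.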

Parameters

Parameter | Type | Description | Default
StateGraph | Class | Core class for defining workflow structure | N/A
State | Class | Base state type for graph nodes | N/A
START | Constant | Special node marking workflow beginning | N/A
END | Constant | Special node marking workflow completion | N/A
NodeType | Enum | Types of nodes in the workflow graph | N/A
EdgeType | Enum | Types of edges in the workflow graph | N/A
ExecutionMode | Enum | Execution modes for the workflow | N/A

Returns

  • compiled_workflow: Executable workflow object with invoke(), stream(), and visualize() methods
  • invoke() returns Dict: Final state containing workflow results
  • stream() returns Generator: Stream of intermediate states during execution
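The relationship between invoke() and stream() can be pictured with a toy stand-in. `TinyWorkflow` below is not the Sikka Agent class and makes no claim about its internals; it only assumes that stream() yields the state after each node and that invoke() returns the last of those states.

```python
class TinyWorkflow:
    """Toy stand-in for a compiled workflow; not the Sikka Agent class."""

    def __init__(self, steps):
        self.steps = steps  # ordered list of (node_id, function) pairs

    def stream(self, initial_state):
        """Yield the state after each node runs."""
        state = dict(initial_state)
        for node_id, fn in self.steps:
            state = fn(dict(state))  # each node works on a fresh copy
            state["current_node"] = node_id
            yield state

    def invoke(self, initial_state):
        """Run to completion and return only the final state."""
        final = dict(initial_state)
        for state in self.stream(initial_state):
            final = state
        return final


workflow = TinyWorkflow([
    ("upper", lambda s: {**s, "text": s["text"].upper()}),
    ("exclaim", lambda s: {**s, "text": s["text"] + "!"}),
])
for state in workflow.stream({"text": "hello"}):
    print(state["current_node"], state["text"])
print(workflow.invoke({"text": "hello"}))
```

Use streaming when intermediate progress matters (logging, UI updates) and invoke when only the final state is needed.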

Basic Example

# Import necessary modules
from sikkaagent.models import ModelConfigure
from sikkaagent.utils.enums import ModelPlatformType
from sikkaagent.workflows.dag import StateGraph, State, START, END

# 1. Create model
model = ModelConfigure(model="llama3.1:8b", model_platform=ModelPlatformType.OLLAMA)

# 2. Create workflow graph
graph = StateGraph(State)

# 3. Add LLM worker nodes
graph.add_llm_worker_node(id="research", system_prompt="Research facts", model=model)
graph.add_llm_worker_node(id="analysis", system_prompt="Analyze information", model=model)
graph.add_llm_worker_node(id="writing", system_prompt="Create content", model=model)

# 4. Define workflow structure: START → research → analysis → writing → END
graph.add_edge(START, "research")
graph.add_edge("research", "analysis")
graph.add_edge("analysis", "writing")
graph.add_edge("writing", END)

# 5. Compile and run workflow
workflow = graph.compile()
result = workflow.invoke({"user_input": "Summarize AI advancements"})

Advanced Example: Conditional Branching

# Import necessary modules
from sikkaagent.workflows.dag import StateGraph, State, START, END, EdgeType

# `model` is the ModelConfigure instance created in the Basic Example

# 1. Create workflow graph
graph = StateGraph(State)

# 2. Add nodes
graph.add_llm_worker_node(id="research", system_prompt="Research facts", model=model)
graph.add_llm_worker_node(id="analysis", system_prompt="Analyze information", model=model)

# 3. Add formatter node
def format_output(state):
    # Format research and analysis into a report
    state["final_output"] = f"Report: {state['results']['research']} + {state['results']['analysis']}"
    return state

graph.add_node("formatter", format_output)

# 4. Add decision function for dynamic routing
def decide_next_step(state):
    # Route based on analysis content
    return "research" if "further research needed" in state["results"]["analysis"] else "formatter"

# 5. Connect with conditional branching
graph.add_edge(START, "research")
graph.add_edge("research", "analysis")
graph.add_edge("analysis", decide_next_step, edge_type=EdgeType.DYNAMIC)
graph.add_edge("formatter", END)

# 6. Run workflow
workflow = graph.compile()
result = workflow.invoke({"user_input": "Analyze quantum computing"})

Parallel Execution with Pregel BSP Model

Sikka Agent's DAG implementation uses the Pregel Bulk Synchronous Parallel (BSP) model for parallel execution. This model follows a superstep approach:

  1. Compute: Execute all tasks in parallel
  2. Communicate: Collect outputs and messages
  3. Synchronize: Wait for all tasks to complete before proceeding
  4. Merge: Combine results from all parallel tasks

Each node operates on its own copy of the state and can't see updates from other nodes until the next superstep. This ensures state isolation and deterministic execution.

# Import necessary modules
from sikkaagent.workflows.dag import StateGraph, State, START, END, ExecutionMode, NodeType

# 1. Create workflow graph
graph = StateGraph(State)

# 2. Define processor functions
def process1(state):
    # Process first data stream
    state["result1"] = "Processed data 1"
    return state

def process2(state):
    # Process second data stream
    state["result2"] = "Processed data 2"
    return state

def process3(state):
    # Process third data stream
    state["result3"] = "Processed data 3"
    return state

# 3. Define aggregator function
def aggregate_results(state):
    # Combine all results
    state["final_result"] = f"{state.get('result1')} + {state.get('result2')} + {state.get('result3')}"
    return state

# 4. Add nodes to graph
graph.add_node("process1", process1)
graph.add_node("process2", process2)
graph.add_node("process3", process3)
graph.add_node("aggregator", aggregate_results, node_type=NodeType.AGGREGATOR)
graph.add_node("data_source", lambda state: state)  # Simple pass-through

# 5. Connect with parallel execution pattern
graph.add_edge(START, "data_source")
graph.add_parallel_split(
    "data_source",                      # Source node
    ["process1", "process2", "process3"],  # Parallel nodes
    join_node="aggregator"              # Join node
)
graph.add_edge("aggregator", END)

# 6. Compile with parallel execution and run
workflow = graph.compile(execution_mode=ExecutionMode.PARALLEL)
result = workflow.invoke({"input": "Test data"})

Important: Nodes within a superstep run in parallel, but the synchronization barrier means only one superstep runs at a time. Execution within a superstep is therefore parallel, while execution across supersteps is sequential.

Understanding the Parallel Execution Flow

  1. Superstep Execution: During a superstep, all parallel tasks are executed simultaneously using asyncio.gather()
  2. State Isolation: Each task receives its own copy of the state to ensure isolation
  3. Barrier Synchronization: After all tasks complete, the system waits at a synchronization barrier
  4. State Merging: Results from all parallel tasks are merged into a single state
  5. Next Superstep: The workflow proceeds to the next superstep with the merged state

This approach ensures deterministic execution while allowing parallel processing of independent tasks.
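The five steps above can be sketched with asyncio.gather() in a few lines. This is a simplified illustration, not the library's implementation: `run_superstep` is a hypothetical helper, and the last-writer-wins merge via dict.update() is an assumption, since the source does not describe the actual merge strategy.

```python
import asyncio
import copy


async def run_superstep(state, node_functions):
    """Run all nodes of one superstep concurrently on isolated state copies."""

    async def run_node(fn):
        return fn(copy.deepcopy(state))  # each node sees its own copy

    # gather() returns only once every node has finished: the barrier
    results = await asyncio.gather(*(run_node(fn) for fn in node_functions))

    merged = dict(state)
    for node_state in results:  # merge in a fixed order for determinism
        merged.update(node_state)  # assumed strategy: later nodes overwrite earlier keys
    return merged


def process1(state):
    state["result1"] = "Processed data 1"
    return state


def process2(state):
    state["result2"] = "Processed data 2"
    return state


initial = {"input": "Test data"}
merged = asyncio.run(run_superstep(initial, [process1, process2]))
print(merged)
print(initial)
```

Because each node mutates only its own deep copy, the initial state is left untouched and neither node can observe the other's writes before the merge. The node functions here are synchronous, so they interleave on the event loop rather than truly overlapping; real LLM calls would be awaited I/O.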

LLM Roleplaying

# Import necessary modules
from sikkaagent.workflows.dag import StateGraph, State, START, END

# `model` is the ModelConfigure instance created in the Basic Example

# 1. Create workflow graph
graph = StateGraph(State)

# 2. Add roleplaying node
graph.add_llm_roleplaying_node(
    id="expert_discussion",
    assistant_role_name="Quantum Physicist",
    user_role_name="Computer Scientist",
    model=model
)

# 3. Connect to workflow: START → expert_discussion → END
graph.add_edge(START, "expert_discussion")
graph.add_edge("expert_discussion", END)

# 4. Run workflow
workflow = graph.compile()
result = workflow.invoke({"user_input": "Discuss quantum computing impact"})

Visualization and Monitoring

# Generate Mermaid diagram for documentation or UI display
mermaid = workflow.visualize(format="mermaid")

# Generate DOT format for use with Graphviz
dot = workflow.visualize(format="dot")

# Stream execution to monitor progress in real-time
for state in workflow.stream({"user_input": "Analyze quantum computing"}):
    # Track current node and status
    current = state.get("current_node")
    status = state.get("status", {}).get(current, "Unknown")

    # Display intermediate results as they become available
    if "results" in state and current in state["results"]:
        print(f"Node {current} ({status}): {state['results'][current][:50]}...")
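For readers unfamiliar with Mermaid, the sketch below shows what a flowchart for the basic research workflow could look like as text. The exact output of visualize(format="mermaid") is not shown in the source, so `to_mermaid` is a hypothetical helper that only demonstrates the Mermaid flowchart syntax.

```python
def to_mermaid(edges):
    """Render (source, target) pairs as a top-down Mermaid flowchart."""
    lines = ["graph TD"]
    for source, target in edges:
        lines.append(f"    {source} --> {target}")
    return "\n".join(lines)


edges = [
    ("START", "research"),
    ("research", "analysis"),
    ("analysis", "writing"),
    ("writing", "END"),
]
print(to_mermaid(edges))
```

The resulting text can be pasted into any Mermaid renderer to view the graph.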

Integration with Other Modules

Workflows leverage other Sikka Agent capabilities:

# With tools and memory
from sikkaagent.tools import SearchToolkit, RetrievalToolkit
from sikkaagent.memories import Memory
from sikkaagent.storages import InMemoryStorage
from sikkaagent.models import ModelConfigure
from sikkaagent.utils.enums import ModelPlatformType
from sikkaagent.agents import ChatAgent
from sikkaagent.workflows.dag import StateGraph, State, START, END

# Create model configuration
model = ModelConfigure(
    model="llama3.1:8b",
    model_platform=ModelPlatformType.OLLAMA,
    url="http://localhost:11434/v1"
)

# Create memory
memory = Memory(storage=InMemoryStorage())

# Create tool-enabled agent
tool_agent = ChatAgent(
    model=model,
    tools=[SearchToolkit(), RetrievalToolkit()],
    memory=memory,
    system_prompt="You are a research agent with access to search and retrieval tools."
)

# Create workflow with the enhanced agent
graph = StateGraph(State)
graph.add_node("research", tool_agent)
graph.add_edge(START, "research")
graph.add_edge("research", END)

# Run workflow
workflow = graph.compile()
result = workflow.invoke({"user_input": "Research quantum computing advances"})

Best Practices

  • Agent Specialization: Design agents with clear, focused responsibilities
  • Information Flow: Carefully plan data flow between workflow stages
  • Error Handling: Implement robust error handling across all nodes
  • Prompt Engineering: Craft system prompts that clearly define agent roles
  • Testing: Test workflows with diverse inputs to ensure robustness
  • State Management: Design state objects that efficiently capture necessary information
  • Visualization: Use the visualization capabilities to understand and debug complex workflows
  • Parallel Processing: Leverage parallel execution for independent tasks to improve performance
  • Dynamic Routing: Use conditional and dynamic routing for adaptive workflows that respond to intermediate results
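As one possible approach to the error-handling practice above, a node function can be wrapped so that failures are recorded in the state instead of aborting the workflow. The `safe_node` decorator and the "errors" state key are assumptions for illustration, not part of the Sikka Agent API.

```python
import functools


def safe_node(fn):
    """Wrap a node function so exceptions are recorded in state instead of raised."""

    @functools.wraps(fn)
    def wrapper(state):
        try:
            return fn(state)
        except Exception as exc:
            # Keep the incoming state and note which node failed and why
            errors = {**state.get("errors", {}), fn.__name__: str(exc)}
            return {**state, "errors": errors}

    return wrapper


@safe_node
def parse_score(state):
    return {**state, "score": int(state["raw_score"])}


print(parse_score({"raw_score": "87"}))
print(parse_score({"raw_score": "n/a"}))
```

A downstream decision function could then check the "errors" key and route to a fallback path.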