
Adk Agent Builder
Install this skill when you are wiring Google ADK Python Workflow graphs with nesting, retries, custom nodes, and validation-aware graph design.
Install
npx skills add https://github.com/google/adk-python --skill adk-agent-builderWhat is this skill?
- 7-rule agent verification checklist for advanced workflow graphs (including no unconditional cycles)
- Nested Workflow pattern: use a Workflow as a node inside an outer Workflow pipeline
- RetryConfig quick reference (max_attempts, initial_delay) and per-node timeout/retry_config fields
- Custom node contract: override get_name() and run(); optional rerun_on_resume and wait_for_output
- Dynamic execution guidance via run_node with dedicated dynamic-nodes reference rules
Adoption & trust: 1 installs on skills.sh; 20k GitHub stars; 2/3 security scanners passed (skills.sh audits); trending (+100% hot-view momentum).
Recommended Skills
Journey fit
The readme is entirely about constructing and validating ADK Workflow agents in code, which is core product-building work rather than distribution, ops, or discovery. Agent-tooling is the right shelf because the content covers graph edges, nested workflows, dynamic nodes, and custom node types—the orchestration layer for ADK agents.
Common Questions / FAQ
Is Adk Agent Builder safe to install?
skills.sh reports 2 of 3 security scanners passed. Review the Security Audits panel on this page before installing in production.
SKILL.md
READMESKILL.md - Adk Agent Builder
# Advanced Workflow Patterns Reference Nested workflows, dynamic nodes, retry configuration, custom node types, and graph construction. ## 📋 Agent Verification Checklist (Advanced Patterns) Use this checklist when implementing complex workflows: - [ ] **Validation**: Does your graph follow all 7 validation rules? (e.g., no unconditional cycles) - [ ] **Custom Nodes**: If creating a custom node, did you override `get_name()` and `run()`? - [ ] **Dynamic Execution**: If using `run_node`, did you follow the rules in the dedicated dynamic-nodes reference? - [ ] **Waiting State**: Did you use `wait_for_output=True` if the node should stay in WAITING state until output is yielded? ## 💡 Quick Reference - **Retry**: `RetryConfig(max_attempts=5, initial_delay=1.0)` - **Custom Node Fields**: `rerun_on_resume`, `wait_for_output`, `retry_config`, `timeout` ## Nested Workflows A `Workflow` is both an agent and a node. Use one workflow inside another: ```python from google.adk.workflow import Workflow # Inner workflow inner = Workflow( name="inner_pipeline", edges=[ ('START', step_a), (step_a, step_b), ], ) # Outer workflow using inner as a node outer = Workflow( name="outer_pipeline", edges=[ ('START', pre_process), (pre_process, inner), # Nested workflow (inner, post_process), ], ) ``` The inner workflow receives the predecessor's output as its START input and its terminal output flows to the next node in the outer workflow. ## Dynamic Node Scheduling Schedule nodes at runtime using `ctx.run_node()`. See the dedicated [Dynamic Node Scheduling Reference](file:///Users/deanchen/Desktop/adk-workflow/.agents/skills/adk-workflow/references/dynamic-nodes.md) for detailed rules, examples, and best practices. ## Retry Configuration Configure automatic retry for nodes that may fail: ```python from google.adk.workflow import RetryConfig from google.adk.workflow import FunctionNode retry = RetryConfig( max_attempts=5, # Max attempts (default: 5). 0 or 1 = no retry initial_delay=1.0, # Seconds before first retry (default: 1.0) max_delay=60.0, # Max seconds between retries (default: 60.0) backoff_factor=2.0, # Delay multiplier per attempt (default: 2.0) jitter=1.0, # Randomness factor (default: 1.0, 0.0 = none) exceptions=None, # Exception types to retry (None = all) ) node = FunctionNode( flaky_api_call, name="api_call", retry_config=retry, ) ``` ### Retry delay formula ``` delay = initial_delay * (backoff_factor ^ attempt) delay = min(delay, max_delay) delay = delay * (1 + random(0, jitter)) ``` ### Accessing retry count ```python def my_node(ctx: Context, node_input: str) -> str: if ctx.retry_count > 0: print(f"Retry attempt {ctx.retry_count}") return "result" ``` ## Custom Node Types Subclass `BaseNode` for custom behavior: ```python from google.adk.workflow import BaseNode from google.adk.events.event import Event from google.adk.agents.context import Context from pydantic import ConfigDict, Field from typing import Any, AsyncGenerator from typing_extensions import override class BatchProcessorNode(BaseNode): """Processes items in batches.""" model_config = ConfigDict(arbitrary_types_allowed=True) name: str = Field(default="batch_processor") batch_size: int = Field(default=10) def __init__(self, *, name: str = "batch_processor", batch_size: int = 10): super().__init__() object.__setattr__(self, 'name', name) object.__setattr__(self, 'batch_size', batch_size) @override def get_name(self) -> str: return self.name @override async def run( self, *, ctx: Context, node_input: Any, ) -> AsyncGenerator[Any, None]: items = node_input if isinstance(node_input, list) else [node_input] results = [] for i in range(0, len(items), self.batch_size): batch = items[i:i + self.batch_size] batch_res