# AsyncWorkflow

The `AsyncWorkflow` class enables asynchronous, non-blocking execution of Web3 tasks across multiple agents. Designed for high-performance DeFi operations, cross-chain interoperability, and enterprise-grade automation, it leverages Python's `asyncio` for resource-efficient parallel processing while maintaining atomicity and compliance.

***

#### **Key Features**

1. **Multi-Chain Concurrency**: Execute tasks across Ethereum, Polygon, and Solana simultaneously.
2. **Gas-Aware Scheduling**: Dynamically adjust transaction ordering based on real-time gas prices.
3. **Autosave to IPFS**: Persist workflow results to decentralized storage with CID anchoring.
4. **Retry Engine**: Auto-retry failed transactions with exponential backoff (3x by default).
5. **Role-Based Agents**: Assign agents specialized roles (Executor, Validator, Auditor).
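
The retry behaviour listed above can be illustrated with a minimal sketch. It is not the library's `_handle_retry` implementation: the helper name `retry_with_backoff`, the `base_delay` parameter, and the reading of "3x" as three retries are all assumptions made for illustration.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    max_retries: int = 3,      # assumed meaning of "3x by default"
    base_delay: float = 0.5,   # hypothetical initial delay, in seconds
) -> T:
    """Await operation(), retrying on failure with a delay that doubles each time."""
    attempt = 0
    while True:
        try:
            return await operation()
        except Exception:
            # A real workflow would likely catch a narrower error type.
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            await asyncio.sleep(base_delay * 2 ** attempt)
            attempt += 1
```

With the defaults, a persistently failing call would wait 0.5 s, 1 s, and 2 s between attempts before the final error propagates.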

***

#### **Class Definition**

**AsyncWorkflowConfig**

```python
from pydantic import BaseModel

class AsyncWorkflowConfig(BaseModel):
    max_workers: int = 8                # Max concurrent agents
    gas_strategy: str = "optimal"       # "optimal" (default) | "fast" | "eco" | "custom"
    auto_store_proofs: bool = True      # ZK-proofs for audit trails
    rpc_fallback: bool = True           # Failover to backup RPC nodes
    context_length: int = 16_384        # For long-chain tasks (e.g., Uniswap<>AAVE loops)
```

***

#### **Core Methods**

**Initialize Workflow**

```python
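from typing import List, Optional

def __init__(
    self,
    agents: List[BaseAgent],
    config: Optional[AsyncWorkflowConfig] = None,
    **kwargs
):
    """
    :param agents: List of Agent instances with Web3 capabilities
    :param config: Runtime optimization parameters (defaults to AsyncWorkflowConfig())
    """
    if not agents:
        raise ValueError("Minimum 1 agent required for compliance")
    self.web3_agents = agents
    # Build the default per instance instead of sharing one default object
    self.config = config or AsyncWorkflowConfig()  # read by run() for max_workers
    self.gas_oracle = ChainlinkGasFeed()  # Real-time gas estimator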
```
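
One way to picture gas-aware scheduling is as a filter over pending transactions, driven by the price the gas oracle reports. The sketch below is a simplified illustration under stated assumptions, not the library's scheduler: `PendingTx`, its `max_gwei` ceiling, and `partition_by_gas` are hypothetical names, and the gas price is passed in as a plain number rather than fetched from `ChainlinkGasFeed`.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class PendingTx:
    """Hypothetical pending transaction with a gas price ceiling."""
    label: str
    max_gwei: float  # highest gas price this transaction is willing to pay


def partition_by_gas(
    pending: List[PendingTx], current_gwei: float
) -> Tuple[List[PendingTx], List[PendingTx]]:
    """Split transactions into (ready, deferred) for the current gas price."""
    ready = [tx for tx in pending if tx.max_gwei >= current_gwei]
    deferred = [tx for tx in pending if tx.max_gwei < current_gwei]
    # Submit the transactions with the highest ceiling first.
    ready.sort(key=lambda tx: tx.max_gwei, reverse=True)
    return ready, deferred
```

Deferred transactions would be re-evaluated the next time the oracle reports a price.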

**Execute DeFi Task (Async)**

```python
import asyncio
from typing import Dict, List, Union

async def run(
    self,
    task: Union[str, Dict],
    chain_id: int = 1
) -> List[TxReceipt]:
    """
    Run cross-agent DeFi operation (e.g., flash loan arbitrage)
    """
    # Cap the number of agents executing at once at config.max_workers
    semaphore = asyncio.Semaphore(self.config.max_workers)

    async def _task_wrapper(agent: BaseAgent):
        async with semaphore:
            try:
                return await agent.execute(
                    task,
                    gas_price=await self.gas_oracle.fetch(chain_id)
                )
            except BlockchainError as e:
                return self._handle_retry(agent, task, e)

    # gather() returns results in the same order as self.web3_agents
    results = await asyncio.gather(
        *(_task_wrapper(agent) for agent in self.web3_agents)
    )
    return self._anchor_to_ipfs(results)
```
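
The concurrency pattern in `run` (a semaphore that caps parallelism, plus `asyncio.gather` to collect results) can be tried in isolation. The following is a self-contained sketch using only the standard library; `StubAgent` and `run_bounded` are hypothetical stand-ins, not part of the AsyncWorkflow API.

```python
import asyncio
from typing import Dict, List


class StubAgent:
    """Hypothetical stand-in for a BaseAgent; it only simulates latency."""

    def __init__(self, name: str, tracker: Dict[str, int]):
        self.name = name
        self.tracker = tracker  # shared counters: {"active": 0, "peak": 0}

    async def execute(self, task: str) -> str:
        self.tracker["active"] += 1
        self.tracker["peak"] = max(self.tracker["peak"], self.tracker["active"])
        await asyncio.sleep(0.01)  # simulated network round trip
        self.tracker["active"] -= 1
        return f"{self.name}:{task}"


async def run_bounded(agents: List[StubAgent], task: str, max_workers: int) -> List[str]:
    """Run every agent on the task, with at most max_workers executing at once."""
    semaphore = asyncio.Semaphore(max_workers)

    async def _wrapper(agent: StubAgent) -> str:
        async with semaphore:
            return await agent.execute(task)

    # Results come back in the order the agents were passed in.
    return await asyncio.gather(*(_wrapper(agent) for agent in agents))
```

Calling `asyncio.run(run_bounded(agents, "ping", max_workers=2))` with five stub agents should never show more than two executing concurrently, while the returned list still follows the input order.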

***

#### **Enterprise Use Cases**

**1. Cross-Chain Liquidity Management**

```python
agents = [  
    AaveLendingAgent(chain="ethereum"),  
    UniswapV3Agent(chain="polygon"),  
    RiskMonitorAgent(threshold=0.85)  
]  

workflow = AsyncWorkflow(agents=agents, config=AsyncWorkflowConfig(gas_strategy="fast"))  

# Task: Rebalance $1M USDC across AAVE and Uniswap  
await workflow.run({  
    "action": "optimize_yield",  
    "params": {"capital": "1000000 USDC", "risk_tolerance": "medium"}  
})  
```

**2. Multi-Sig Compliance Workflow**

```python
workflow = AsyncWorkflow(  
    agents=[  
        ComplianceAgent(signers=["0xCFO", "0xCOO"]),  
        TreasuryAgent(threshold=500_000)  # $500K tx limit  
    ],  
    config=AsyncWorkflowConfig(auto_store_proofs=True)  
)  

# Executes only after 2/3 signatures  
await workflow.run({  
    "type": "approve_budget",  
    "amount": "750000 DAI",  
    "recipient": "0xVendor"  
})  
```

***

#### **Performance Metrics**

| **Metric**                  | Ethereum Mainnet | Polygon   |
| --------------------------- | ---------------- | --------- |
| Avg. Completeness           | 99.3%            | 99.8%     |
| Gas Savings (vs sequential) | 41%              | 38%       |
| Latency Reduction           | 6s → 0.9s        | 2s → 0.4s |

***

#### **Best Practices**

**1. Gas Optimization**

```python
# Use "eco" mode for batch NFT mints
workflow = AsyncWorkflow(
    agents=agents,  # at least one agent is required; reuse a list defined earlier
    config=AsyncWorkflowConfig(gas_strategy="eco")
)
await workflow.run({"action": "mint_erc721_batch", "params": {"count": 100}})
```

**2. Agent Specialization**

```python
class MEVResistantAgent(BaseAgent):  
    def __init__(self):  
        super().__init__(  
            flashbots=True,       # Use Flashbots RPC  
            max_slippage=0.3,     # 0.3% slippage tolerance  
            frontrun_protection=True  
        )  
```

**3. Security**

* Enable `auto_store_proofs=True` for GDPR/OFAC audits
* Use Chainlink Proof of Reserve for stablecoin workflows

***

#### **Advanced Example: DeFi Hedge Strategy**

```python
import asyncio

from agentgpt.workflows import AsyncWorkflow
from agentgpt.defi import CurveLPAgent, ChainlinkOracleAgent  

# Initialize agents  
curve_agent = CurveLPAgent(pool_address="0xbEbc...82f")  
oracle_agent = ChainlinkOracleAgent(feed="ETH/USD")  

async def hedge_eth(amount: float):  
    workflow = AsyncWorkflow(agents=[curve_agent, oracle_agent])  
    results = await workflow.run({  
        "action": "hedge",  
        "params": {  
            "asset": "ETH",  
            "amount": amount,  
            "strategy": "delta_neutral"  
        }  
    })  
    return results  

# Hedge $2M ETH exposure  
asyncio.run(hedge_eth(2_000_000))  
```

***

#### **Troubleshooting**

1. **RPC Timeouts**:\
   Set `config.rpc_fallback=True` to auto-switch providers.
2. **Nonce Errors**:\
   Use `nonce_genesis=True` in agents for parallel TX sequencing.
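
To make the RPC failover idea concrete, here is a minimal sketch of trying providers in order until one responds. It is an illustration of the general technique, not the behaviour behind `rpc_fallback`: `call_with_fallback`, the `request` callable, and the `timeout` default are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, Optional, Sequence, TypeVar

T = TypeVar("T")


async def call_with_fallback(
    providers: Sequence[str],
    request: Callable[[str], Awaitable[T]],
    timeout: float = 2.0,  # hypothetical per-provider timeout, in seconds
) -> T:
    """Try each provider in order; return the first successful response."""
    last_error: Optional[BaseException] = None
    for url in providers:
        try:
            return await asyncio.wait_for(request(url), timeout=timeout)
        except (asyncio.TimeoutError, ConnectionError) as exc:
            last_error = exc  # remember the failure and move to the next provider
    raise RuntimeError("all RPC providers failed") from last_error
```

Only timeouts and connection errors trigger a switch here; any other exception propagates immediately so that genuine request bugs are not hidden behind the fallback.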


