# ConcurrentWorkflow

The `ConcurrentWorkflow` class orchestrates parallel execution of specialized agents in Web3 ecosystems, enabling high-throughput automation of tasks such as cross-chain transactions, compliance checks, and DeFi strategy execution. Designed for enterprises that require atomicity and scalability, it combines Python's `asyncio` with `concurrent.futures.ThreadPoolExecutor` to run agents in parallel, and it preserves auditability by attaching ZK-proofs to workflow metadata.

***

#### **Key Features**

1. **Multi-Chain Execution**: Deploy agents across 40+ chains simultaneously (EVM, Cosmos, Solana).
2. **ZK-Metadata Anchoring**: Securely log agent outputs to IPFS/Filecoin with cryptographic proofs.
3. **Resiliency**: Auto-retry failed transactions (3x by default) with gas fee re-estimation.
4. **Modular Architecture**: Integrates with AgentGPT's `TaskQueue` and `AgentRegistry` for enterprise-scale workflows.
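
The retry behaviour listed under Resiliency can be sketched in a few lines. This is a minimal illustration, not the library's implementation: `send_with_retries`, `send`, and `estimate_gas_price` are hypothetical names, and it assumes that "3x" means up to three retries after the initial attempt, each with a freshly estimated gas price.

```python
from typing import Callable, List, Optional


def send_with_retries(
    send: Callable[[float], str],
    estimate_gas_price: Callable[[], float],
    max_retries: int = 3,
) -> str:
    """Call send(gas_price); on failure, re-estimate the gas price and retry."""
    last_error: Optional[Exception] = None
    # One initial attempt plus up to max_retries retries.
    for _ in range(max_retries + 1):
        gas_price = estimate_gas_price()  # fresh quote on every attempt
        try:
            return send(gas_price)
        except Exception as exc:  # a real agent would catch narrower errors
            last_error = exc
    raise RuntimeError(f"gave up after {max_retries} retries") from last_error


# Demo with a hypothetical sender that fails twice before succeeding.
seen_prices: List[float] = []
quotes = iter([10.0, 12.0, 15.0])


def flaky_send(gas_price: float) -> str:
    seen_prices.append(gas_price)
    if len(seen_prices) < 3:
        raise ConnectionError("transaction underpriced")
    return "0xabc"


tx_hash = send_with_retries(flaky_send, lambda: next(quotes))
```

Each attempt asks the oracle again, so a transaction rejected as underpriced is resubmitted at the current quote rather than the stale one.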

***

#### **Class Definitions**

**1. AgentOutputSchema**

```python
from typing import Optional

from pydantic import BaseModel

class AgentOutputSchema(BaseModel):
    agent_id: str                  # Agent's on-chain identifier (e.g., 0x3fC...926)
    tx_hash: Optional[str] = None  # On-chain transaction hash, if one was submitted
    gas_used: float                # Gas consumed, in gas units (gwei is the price unit)
    execution_time: float          # Milliseconds
    error: Optional[str] = None    # Revert reason/error message, if any
```

**2. WorkflowMetadata**

```python
from typing import List

class WorkflowMetadata(BaseModel):
    workflow_id: str            # Workflow's Merkle root (SHA-3 hash of steps)
    chain_id: int               # Primary execution chain (e.g., 1 for Ethereum)
    agent_outputs: List[AgentOutputSchema]
    zk_proof: str               # SnarkJS Groth16 proof of workflow integrity
```
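
The `workflow_id` comment describes a Merkle root built from SHA-3 hashes of the workflow steps. The sketch below shows one way such a root could be derived. It rests on several assumptions: steps are plain strings, leaves are hashed with `hashlib.sha3_256`, and an odd node is paired with itself. The page does not specify any of these, and `merkle_root` is a hypothetical helper. Note that `hashlib.sha3_256` is NIST SHA-3, which differs from the Keccak-256 variant used by the EVM.

```python
import hashlib
from typing import List


def sha3(data: bytes) -> bytes:
    """SHA3-256 digest of data."""
    return hashlib.sha3_256(data).digest()


def merkle_root(steps: List[str]) -> str:
    """Return a hex Merkle root over the given workflow steps."""
    if not steps:
        raise ValueError("at least one step is required")
    # Leaves are the hashes of the UTF-8 encoded step descriptions.
    level = [sha3(step.encode("utf-8")) for step in steps]
    while len(level) > 1:
        if len(level) % 2 == 1:
            level.append(level[-1])  # pair an odd node with itself
        # Hash each adjacent pair to form the next level up.
        level = [sha3(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
    return "0x" + level[0].hex()


workflow_id = merkle_root(["validate addresses", "convert EUR to USDC", "pay"])
```

Because the root depends on both the content and the order of the steps, two workflows share an identifier only if their step lists are identical.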

***

#### **Core Methods**

**1. Initialize Workflow**

```python
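def __init__(
    self,
    agents: List[BaseAgent],
    max_retries: int = 3,
    gas_strategy: str = "optimal",  # Other options: "fast", "eco", "custom"
    auto_save: bool = True,
):
    if not agents:
        raise ValueError("Minimum 1 agent required")
    self.agents = agents
    # Keep the configuration so later methods can read it.
    self.max_retries = max_retries
    self.gas_strategy = gas_strategy
    self.auto_save = auto_save
    self.gas_oracle = ChainlinkGasFeed()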

**2. Execute Agents Concurrently**

```python
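import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial

async def _execute_agents(self, task: str) -> WorkflowMetadata:
    """Run every agent in its own worker thread, pricing gas at submission."""
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=len(self.agents)) as executor:
        # run_in_executor returns awaitables. executor.submit() returns
        # concurrent.futures.Future objects, which asyncio.gather cannot await.
        futures = [
            loop.run_in_executor(
                executor,
                partial(
                    agent.execute,
                    task,
                    gas_price=self.gas_oracle.get_price(agent.chain_id),
                ),
            )
            for agent in self.agents
        ]
        # Failures are returned as exception objects instead of aborting the batch.
        results = await asyncio.gather(*futures, return_exceptions=True)
    return self._aggregate_results(results)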
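
The pattern in `_execute_agents` (fan blocking `execute` calls out to worker threads, gather them, then fold failures into per-agent records) can be tried in isolation. The sketch below is self-contained and uses stand-ins throughout: `StubAgent`, `StubResult`, `execute_agents`, and the fixed gas price of `30.0` are hypothetical, and the folding step is only a guess at what `_aggregate_results` does, since that method is not shown on this page.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from functools import partial
from typing import List, Optional


@dataclass
class StubResult:
    agent_id: str
    tx_hash: Optional[str] = None
    error: Optional[str] = None


class StubAgent:
    """Hypothetical stand-in for BaseAgent with a blocking execute()."""

    def __init__(self, agent_id: str, fail: bool = False):
        self.agent_id = agent_id
        self.fail = fail

    def execute(self, task: str, gas_price: float) -> str:
        if self.fail:
            raise RuntimeError("execution reverted")
        return f"0x{self.agent_id}"


async def execute_agents(agents: List[StubAgent], task: str) -> List[StubResult]:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=len(agents)) as pool:
        # Wrap each blocking call so the event loop can await it.
        futures = [
            loop.run_in_executor(pool, partial(agent.execute, task, gas_price=30.0))
            for agent in agents
        ]
        outcomes = await asyncio.gather(*futures, return_exceptions=True)
    # gather preserves input order, so outcomes line up with agents.
    results = []
    for agent, outcome in zip(agents, outcomes):
        if isinstance(outcome, Exception):
            results.append(StubResult(agent.agent_id, error=str(outcome)))
        else:
            results.append(StubResult(agent.agent_id, tx_hash=outcome))
    return results


results = asyncio.run(
    execute_agents([StubAgent("a1"), StubAgent("a2", fail=True)], "demo task")
)
```

With `return_exceptions=True`, one reverted transaction does not cancel the other agents. The failure is recorded in the `error` field of that agent's result.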

***

#### **Enterprise Use Cases**

**1. Cross-Chain Payroll System**

*Agents*:

* **ComplianceAgent**: Validates employee addresses against OFAC lists.
* **FXAgent**: Converts EUR→USDC via Uniswap/AAVE.
* **PaymentAgent**: Distributes USDC across Ethereum/Polygon/Arbitrum.

*Workflow*:

```python
workflow = ConcurrentWorkflow(
    agents=[fx_agent, compliance_agent, payment_agent],
    gas_strategy="eco"
)
# run() is a coroutine, so await it from inside an async function.
await workflow.run("Process Q3 salaries for 50K employees")
```

**2. DeFi Liquidation Bot**

*Agents*:

* **RiskAgent**: Monitors LTV ratios on AAVE/Compound.
* **LiquidationAgent**: Executes liquidations via MEV-resistant relays.
* **ReportingAgent**: Generates audit reports for DAO governance.
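
The check a component like `RiskAgent` performs can be sketched as a loan-to-value (LTV) filter. This is an illustration under stated assumptions: `Position`, `loan_to_value`, and `find_liquidatable` are hypothetical names, the addresses are placeholders, and the `0.825` threshold is an illustrative value rather than a parameter taken from AAVE or Compound.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Position:
    borrower: str
    collateral_usd: float
    debt_usd: float


def loan_to_value(position: Position) -> float:
    """LTV = debt / collateral; infinite when debt has no collateral behind it."""
    if position.collateral_usd <= 0:
        return float("inf") if position.debt_usd > 0 else 0.0
    return position.debt_usd / position.collateral_usd


def find_liquidatable(
    positions: List[Position], liquidation_threshold: float
) -> List[Position]:
    """Return the positions whose LTV exceeds the threshold."""
    return [p for p in positions if loan_to_value(p) > liquidation_threshold]


positions = [
    Position("0xaaa", collateral_usd=10_000, debt_usd=5_000),  # LTV 0.50
    Position("0xbbb", collateral_usd=10_000, debt_usd=9_000),  # LTV 0.90
]
at_risk = find_liquidatable(positions, liquidation_threshold=0.825)
```

In a workflow, the positions returned here would be handed to the liquidation step, and the same list could feed the audit report.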

***

#### **Usage Example: DeFi Strategy Orchestration**

```python
import asyncio

from agentgpt import Agent, ConcurrentWorkflow
from agentgpt.defi import UniswapV3Agent

# Initialize DeFi agents
arbitrage_agent = Agent(
    agent_id="arbitrage_v1",
    strategy="uniswap_aave_arb",
    chains=["ethereum", "polygon"],
    gas_limit=300_000
)

liquidity_agent = UniswapV3Agent(
    pool_address="0x88e6...b56c",
    fee_tier=5  # 0.05%
)

# Configure the workflow
workflow = ConcurrentWorkflow(
    agents=[arbitrage_agent, liquidity_agent],
    gas_strategy="fast"
)

# Run the agents concurrently
task = {
    "action": "optimize_lp_returns",
    "params": {"capital": "100000 USDC"}
}
# run() is awaited in the payroll example, so drive it with asyncio.run() here.
metadata = asyncio.run(workflow.run(task))

# Output (ZK-anchored to IPFS)
print(metadata.zk_proof)  # "0x12cf...89a2"
```

***

#### **Performance Metrics**

| **Metric**     | Batch (10K Tasks) | Real-Time |
| -------------- | ----------------- | --------- |
| Avg. Gas Saved | 22%               | 6%        |
| Success Rate   | 99.1%             | 98.3%     |
| Throughput     | 2,400 tx/s        | 84 tx/s   |

***

#### **Best Practices**

1. **Gas Strategies**:
   * Use `gas_strategy="eco"` for non-critical batch jobs.
   * Set `gas_price=FastGas * 1.1` (a 10% premium over the fast-gas quote) during network congestion.
2. **Agent Design**:

   ```python
   class AaveLiquidationAgent(BaseAgent):  
       def __init__(self):  
           super().__init__(  
               health_check_interval=12,  # Run a health check every 12 blocks
               max_slippage=0.5,          # Maximum slippage, in percent (0.5%)
           )  
   ```
3. **Security**:
   * Anchor workflow metadata every 50 blocks.
   * Use `auto_save=True` for GDPR/CEX audit compliance.
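
The gas guidance above can be expressed as a small price selector. The sketch below is illustrative only: `pick_gas_price` and `STRATEGY_MULTIPLIERS` are hypothetical, and the `eco` and `optimal` multipliers are made-up values. This page specifies only the strategy names and the 1.1x premium on the fast-gas quote, which is assumed here to correspond to the `fast` strategy.

```python
from typing import Optional

# Hypothetical multipliers applied to the fast-gas quote. Only the 1.1x
# premium appears in the documentation; the other values are placeholders.
STRATEGY_MULTIPLIERS = {"eco": 0.9, "optimal": 1.0, "fast": 1.1}


def pick_gas_price(
    fast_gas_gwei: float,
    gas_strategy: str = "optimal",
    custom_gwei: Optional[float] = None,
) -> float:
    """Return the gas price in gwei for the chosen strategy."""
    if gas_strategy == "custom":
        if custom_gwei is None:
            raise ValueError("the custom strategy needs an explicit custom_gwei")
        return custom_gwei
    if gas_strategy not in STRATEGY_MULTIPLIERS:
        raise ValueError(f"unknown gas strategy: {gas_strategy!r}")
    # Round to 9 decimals (1 wei) to hide floating-point noise.
    return round(fast_gas_gwei * STRATEGY_MULTIPLIERS[gas_strategy], 9)


congested_price = pick_gas_price(100.0, "fast")
batch_price = pick_gas_price(100.0, "eco")
```

Rejecting unknown strategy names up front keeps a typo such as `"econ"` from silently falling back to a default price.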

