返回首页

Agent 框架深度解析:CrewAI

从角色、任务、工具和编排策略几个核心概念入手,拆解 CrewAI 的 Agent 协作模型。

在针对性的学习 CrewAI 之前,我大概了解了一下目前主要的 Agent 框架有 LangGraph、 CrewAI、AutoGen。其中 LangGraph 是脱胎自 LangChain,LangChain 是个大 IP,之前也了解过,感觉新鲜程度不高,AutoGen 是微软开源的,让我想起了之前解析 Excel XML 的悲惨经历,故第一篇先研究一下 CrewAI。

“角色胜于规则”的设计范式 1.0 版本 Crew

CrewAI 最初的设想是很先进的,它从一开始就与 LangChain 截然不同,摒弃了旧的研发范式,与”传统的 Agent 框架” 不太一样。它没有状态管理,也不严格控制流程,给 Agent 较大的自主性,强调 “角色胜于规则”(Roles over Rules) ,通过为每个Agent定义清晰的角色身份来驱动协作。

核心组件详解

CrewAI 包含了下述的组件 Tool 、Agent 、Task 、Crew。

  • Tool (工具):赋予Agent与外部世界交互的能力,可以是函数或 API,或者 MCP Server 中的 tool 。
  • Agent:通过 goal (目标) 和 backstory (背景故事) 来定义这个role (角色),使用tool(工具)赋予角色做事的能力,这种定义方式是 CrewAI 的标志性特征,它不仅简化了Agent的创建,还通过丰富的上下文帮助 LLM 更好地进行角色扮演。
  • Task (任务):分配给Agent的离散工作单元。每个任务都有明确的:description (描述)、 expected_output (预期输出)、agent(执行的 Agent),确保了工作目标的清晰性。
  • Crew (团队):作为总指挥,将Agent和任务组合在一起,并管理整个工作流程执行。组建 Crew 时除了需要 Agent 和 Task 外,还需要定义编排策略,当前有两种内置的编排策略:
    • Process.sequential:任务按顺序执行,后一个任务可以利用前一个任务的输出
    • Process.hierarchical:引入一个经理Agent,由它来动态地将任务分配给其他Agent
    • # TODO: consensual = 'consensual': 这个是在源码里发现的,翻译过来是协商,应该是多 Agent 协同。

实战教程:流水线诊断系统的实现

现在,我们通过一个流水线诊断系统来展示 CrewAI 的基本用法。这个系统模拟了对流水线组件进行故障诊断的场景。

定义自定义工具

CrewAI 支持自定义工具,我们 先创建一个 HTTP 组件日志获取 工具:

from crewai.tools import BaseTool
class PipelineDiagnosisTool(BaseTool):
    """HTTP 组件日志获取"""
    name: str = "Pipeline Diagnosis Tool"
    description: str = "HTTP 组件日志获取"
    def _run(self, components: str = "10086") -> str:
        """HTTP 组件日志获取,mock 一下直接返回 500"""
        result = components + " response:500"
        print(f"   {result}")
        return result

创建 Agent

使用 YAML 配置文件来定义Agent,提高可维护性:

# config/agents.yaml
http_component_diagnosis_agent:
  role: >
    三方HTTP组件诊断专家
  goal: >
    专门负责诊断和分析三方HTTP类型组件的故障和性能问题。
    识别HTTP请求失败、超时、响应异常等问题,并提供修复建议。
  backstory: >
    你是一名经验丰富的系统诊断专家,专注于HTTP组件的故障排查。
    具备深厚的网络协议知识和丰富的三方服务集成经验,能够快速定位
    HTTP组件相关的各种问题并提供有效的解决方案。
    所有问题使用中文回复

加载配置,创建 Agent:

# 加载配置
agents_config = yaml.safe_load(open("config/agents.yaml"))
llm = os.getenv("MODEL", "openai/DeepSeek-V3-Function-Call")
def http_component_diagnosis_agent() -> Agent:
    return Agent(
        config=agents_config["http_component_diagnosis_agent"],
        tools=[PipelineDiagnosisTool()],
        llm=llm,
        verbose=False,  
    )

这种基于配置文件的方法有几个优势:

  • 可维护性:Agent的角色、目标和背景故事都存储在配置文件中,便于修改和管理
  • 国际化支持:可以轻松为不同语言创建不同的配置文件
  • 团队协作:非技术人员也可以修改Agent的行为描述

创建和执行任务

有了Agent之后,我们需要为它们创建具体的任务:

from crewai import Task, Crew, Process
# 创建任务
diagnosis_task = Task(
    description="""
    🎯 诊断任务:
    
    📊 目标组件:HTTP组件 [123]
    📋 分析要求:
    1. 检查组件状态
    2. 识别潜在问题
    3. 提供修复建议
    
    ✅ 请使用诊断工具完成分析并输出清晰的结果
    """,
    expected_output="详细的组件诊断报告,包括问题分析和修复建议",
    agent=http_component_diagnosis_agent()
)

组建和启动 Crew

# 组建 Crew
diagnosis_crew = Crew(
    agents=[http_component_diagnosis_agent()],
    tasks=[diagnosis_task],
    process=Process.sequential,  # 顺序执行
    verbose=True  # 显示详细日志
)

多Agent协作示例

简单的多Agent协作示例:

# 创建两个Agent
analyst_agent = Agent(
    role='数据分析师',
    goal='分析数据并生成报告',
    backstory='您是一位经验丰富的数据分析专家',
    llm=llm
)
reviewer_agent = Agent(
    role='报告审核员',
    goal='审核分析报告的准确性',
    backstory='您是一位细致的质量控制专家',
    llm=llm
)
# 创建任务
analysis_task = Task(
    description="分析销售数据并生成初步报告",
    expected_output="包含关键指标的数据分析报告",
    agent=analyst_agent
)
review_task = Task(
    description="审核上述分析报告,提出改进建议",
    expected_output="审核后的最终报告",
    agent=reviewer_agent,
    context=[analysis_task]  # 依赖分析任务的结果
)
# 组建 Crew
crew = Crew(
    agents=[analyst_agent, reviewer_agent],
    tasks=[analysis_task, review_task],
    process=Process.sequential,
    verbose=True
)
# 执行
result = crew.kickoff()

深入理解 Crew.kickoff :如何执行 Tasks

kickoff 方法是 CrewAI 的心脏,负责启动和协调整个 Crew 的执行。kickoff 是啥意思?我们经常说 项目KO 战役 KO ,这里的 KO 就是 kickoff,启动的意思!

kickoff 方法的执行流程

CrewAI 的 kickoff 方法位于 crew.py 文件,它的执行可以分为以下几个关键阶段:

1. 初始化和准备阶段

def kickoff(
    self,
    inputs: Optional[Dict[str, Any]] = None,
) -> CrewOutput:
    # 设置上下文追踪
    ctx = baggage.set_baggage("crew_context", CrewContext(id=str(self.id), key=self.key))
    token = attach(ctx)
    
    try:
        # 执行预执行回调,这个可以预定义的
        for before_callback in self.before_kickoff_callbacks:
            if inputs is None:
                inputs = {}
            inputs = before_callback(inputs)
        
        # 发送启动事件
        crewai_event_bus.emit(
            self, CrewKickoffStartedEvent(crew_name=self.name, inputs=inputs)
        )

2. Agent 准备和配置阶段

# 重置任务输出处理器
self._task_output_handler.reset()
self._logging_color = "bold_purple"

if inputs is not None:
    self._inputs = inputs
    self._interpolate_inputs(inputs)  # 插值替换输入变量
self._set_tasks_callbacks()  # 设置任务回调

i18n = I18N(prompt_file=self.prompt_file)

# 为每个 Agent 设置配置
for agent in self.agents:
    agent.i18n = i18n
    agent.crew = self
    agent.set_knowledge(crew_embedder=self.embedder)
    if not agent.function_calling_llm:
        agent.function_calling_llm = self.function_calling_llm
    if not agent.step_callback:
        agent.step_callback = self.step_callback
    agent.create_agent_executor()  # 创建 Agent 执行器

3. 执行流程选择

if self.planning:
    self._handle_crew_planning()  # 可选的规划阶段

if self.process == Process.sequential:
    result = self._run_sequential_process()
elif self.process == Process.hierarchical:
    result = self._run_hierarchical_process()
else:
    raise NotImplementedError(f"The process '{self.process}' is not implemented yet.")

根据配置的 Process 类型选择不同的执行策略:

Sequential 执行策略

def _run_sequential_process(self) -> CrewOutput:
    """Executes tasks sequentially and returns the final output."""
    return self._execute_tasks(self.tasks)

def _get_agent_to_use(self, task: Task) -> Optional[BaseAgent]:
    if self.process == Process.hierarchical:
        return self.manager_agent
    return task.agent  # Sequential 模式直接返回任务绑定的 Agent

Sequential 模式特点: Agent 和 Task 的绑定关系在创建时就确定,每个任务直接由预先分配的 Agent 执行,简单线性,上下文依次传递,执行路径完全可控。

Hierarchical 执行策略

def _run_hierarchical_process(self) -> CrewOutput:
    """Creates and assigns a manager agent to make sure the crew completes the tasks."""
    self._create_manager_agent()  # 关键:创建 Manager Agent
    return self._execute_tasks(self.tasks)

def _create_manager_agent(self):
    i18n = I18N(prompt_file=self.prompt_file)
    if self.manager_agent is not None:
        # 使用自定义的 Manager Agent
        self.manager_agent.allow_delegation = True
        manager = self.manager_agent
    else:
        # 自动创建默认的 Manager Agent
        self.manager_llm = create_llm(self.manager_llm)
        manager = Agent(
            role=i18n.retrieve("hierarchical_manager_agent", "role"),
            goal=i18n.retrieve("hierarchical_manager_agent", "goal"),
            backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"),
            tools=AgentTools(agents=self.agents).tools(),  # 自动获得委派工具
            allow_delegation=True,
            llm=self.manager_llm,
            verbose=self.verbose,
        )
        self.manager_agent = manager
    manager.crew = self

def _get_agent_to_use(self, task: Task) -> Optional[BaseAgent]:
    if self.process == Process.hierarchical:
        return self.manager_agent  # 所有任务都由 Manager Agent 执行
    return task.agent

Hierarchical 模式特点: 也是任务依次执行,所有任务都由 Manager Agent 统一执行和调度,Manager Agent 可以将具体工作委派给其他 Agent,更好的全局控制和协调能力。

核心执行引擎:_execute_tasks 方法

无论选择哪种执行流程,最终都会调用 _execute_tasks 方法,这是 CrewAI 的核心执行引擎:

任务循环处理

def _execute_tasks(self, tasks: List[Task], start_index: Optional[int] = 0, was_replayed: bool = False) -> CrewOutput:
    task_outputs: List[TaskOutput] = []
    futures: List[Tuple[Task, Future[TaskOutput], int]] = []
    last_sync_output: Optional[TaskOutput] = None
    
    for task_index, task in enumerate(tasks):
        # 跳过已执行的任务(用于重播功能)
        if start_index is not None and task_index < start_index:
            if task.output:
                if task.async_execution:
                    task_outputs.append(task.output)
                else:
                    task_outputs = [task.output]
                    last_sync_output = task.output
            continue

Agent 选择和工具准备

agent_to_use = self._get_agent_to_use(task)
if agent_to_use is None:
    raise ValueError(f"No agent available for task: {task.description}")

# 工具优先级:任务工具 > Agent 工具
tools_for_task = task.tools or agent_to_use.tools or []
tools_for_task = self._prepare_tools(agent_to_use, task, tools_for_task)

self._log_task_start(task, agent_to_use.role)

条件任务处理

CrewAI 支持条件任务,可以根据前序任务的结果决定是否执行:

if isinstance(task, ConditionalTask):
    skipped_task_output = self._handle_conditional_task(
        task, task_outputs, futures, task_index, was_replayed
    )
    if skipped_task_output:
        task_outputs.append(skipped_task_output)
        continue

异步执行支持

CrewAI 原生支持异步任务执行,这是其高性能的关键特性:

if task.async_execution:
    # 异步执行:不阻塞后续任务
    context = self._get_context(task, [last_sync_output] if last_sync_output else [])
    future = task.execute_async(
        agent=agent_to_use,
        context=context,
        tools=cast(List[BaseTool], tools_for_task),
    )
    futures.append((task, future, task_index))
else:
    # 同步执行:等待前面的异步任务完成
    if futures:
        task_outputs = self._process_async_tasks(futures, was_replayed)
        futures.clear()
    
    context = self._get_context(task, task_outputs)
    task_output = task.execute_sync(
        agent=agent_to_use,
        context=context,
        tools=cast(List[BaseTool], tools_for_task),
    )
    task_outputs.append(task_output)

上下文管理机制

CrewAI 的上下文管理是其智能协作的核心:

def _get_context(self, task: Task, task_outputs: List[TaskOutput]) -> str:
    if not task.context:
        return ""
    
    context = (
        aggregate_raw_outputs_from_task_outputs(task_outputs)
        if task.context is NOT_SPECIFIED
        else aggregate_raw_outputs_from_tasks(task.context)
    )
    return context
  • 自动上下文:如果没有指定 context,自动使用前面所有任务的输出
  • 显式上下文:可以明确指定依赖的任务,只使用特定任务的输出

Memory 和状态管理

CrewAI 支持多种类型的 memory:

# 在 model_validator 中初始化 memory
def create_crew_memory(self) -> "Crew":
    self._external_memory = self.external_memory.set_crew(self) if self.external_memory else None
    self._long_term_memory = self.long_term_memory
    self._short_term_memory = self.short_term_memory
    self._entity_memory = self.entity_memory
    
    if self.memory:
        self._initialize_default_memories()
    
    return self
  • short_term_memory:存储当前会话的信息
  • long_term_memory:持久化重要信息
  • entity_memory:记住特定实体的信息
  • external_memory:支持外部存储系统

源码解读小结

通过源码分析,我们可以看到 CrewAI 的设计理念:

  1. 声明式配置:用户只需定义”做什么”,框架处理”怎么做”
  2. 智能协作:通过上下文自动传递,Agent 能够基于前序结果工作
  3. 高性能执行:支持异步任务和并行处理
  4. 灵活扩展:工具、memory、回调等都支持自定义扩展

这种设计让开发者能够专注于业务逻辑的实现,而不需要关心复杂的执行细节,体现了 CrewAI “角色胜于规则” 的核心理念。

控制与自主的平衡:2.0 版本 Flows

对比 LangGraph 的实现,CrewAI 的代码显著更短、更具声明性。开发者只需定义”谁做什么”(Agent和任务),而框架的 Process 则负责处理”如何做”(执行顺序和上下文传递)。这正是 CrewAI 在”简单性”方面的核心优势,它为许多标准化的多步工作流(如研究→分析→报告)提供了一条从想法到实现的最快路径。

虽然 Crews 在简化工作流方面表现出色,但其自主性也意味着开发者对执行过程的控制力较弱。为了应对更复杂、需要精确控制的状态和逻辑,CrewAI 引入了 Flows,看来新范式还是过于先进了呀。

Flows 是 CrewAI 对标 LangGraph 的解决方案,它允许开发者构建事件驱动的、非线性的工作流。通过 @start@listen@router 等装饰器,可以实现复杂的条件分支和并行执行。此外,Flows 支持显式的状态管理,开发者既可以使用灵活的字典式非结构化状态,也可以通过 Pydantic 模型定义类型安全的结构化状态,从而在任务之间精确地传递和管理数据。

Flow 的核心特性

  • 事件驱动的架构

    • @start() 装饰器标记流程的入口点
    • @listen() 装饰器创建事件监听器,当指定的方法完成后自动触发
    • @router 路由决策器
  • 状态管理

    • self.state 提供了全局状态存储
    • 每个 Flow 实例自动分配唯一 ID ,方便回滚
    • 状态在整个流程中持久化,方便数据传递
  • 可视化支持

    • flow.plot() 方法可以生成流程图,帮助理解工作流结构
  • 灵活的控制流

    • 支持条件分支和并行执行 OR AND
    • 可以动态决定下一步执行哪个方法
    • 比传统的 Crew 提供更精细的控制

Flows 实战案例:流水线诊断流程

基于前面的诊断系统,我们使用 Flows 来实现更复杂的事件驱动诊断流程:

from crewai.flow.flow import Flow, listen, start
import os
class PipelineDiagnosisFlow(Flow):
    model = os.getenv("MODEL", "openai/DeepSeek-V3-Function-Call")
    @start()
    def pre_process_pipeline(self):
        """预处理阶段:初始化流程状态"""
        print("Starting flow")
        # 每个流程状态都会自动获得唯一ID
        print(f"Flow State ID: {self.state['id']}")
        
        # 设置需要诊断的组件
        self.state["components"] = [123]
        print(f"Components: {self.state['components']}")
        
        return self.state["components"]
    @listen(pre_process_pipeline)
    def diagnosis_components(self, components):
        """诊断阶段:对组件进行详细诊断"""
        agent = http_component_diagnosis_agent()
        
        # 使用 kickoff 方法执行Agent任务
        response = agent.kickoff(
            messages=[
                {
                    "role": "user",
                    "content": f"Diagnose the components: {components}",
                },
            ],
        )
        # 将诊断结果存储到状态中
        self.state["diagnosis_result"] = response
        return response

启动和执行 Flow

# 创建并启动流程
flow = PipelineDiagnosisFlow()
# 可视化流程结构
flow.plot()
# 启动流程执行
result = flow.kickoff(
    inputs={"components": [123]}
)
print(f"Diagnosis result: {result}")

深入理解 Flow.kickoff :事件驱动与状态管理

同样 Flow.kickoff 也是 Flow 的核心方法,那么启动一个 Flow 后会发生什么呢,对比了其与 Crew 的差异,我主要关注不同的装饰器实现的事件驱动和新支持的状态管理。

装饰器与事件驱动

回忆下,主要有下面三个装饰器:

1. @start() - 流程入口点

@start()  # 无条件启动
def initialize(self):
    return "INIT_COMPLETE"

@start("external_event")  # 条件启动
def conditional_start(self):
    return "CONDITIONAL_COMPLETE"

2. @router() - 路由决策器

@router("initialize")
def decide_path(self):
    if self.state.get("emergency"):
        return "EMERGENCY_PATH"  # 返回值成为新的触发源
    return "NORMAL_PATH"

3. @listen() - 事件监听器

@listen("NORMAL_PATH")  # 监听单个事件
def normal_processing(self):
    pass

@listen(and_("task1", "task2"))  # AND 条件:所有完成才触发
def final_step(self):
    pass

@listen(or_("success", "timeout"))  # OR 条件:任一完成即触发
def cleanup(self):
    pass

@start 启动后立即执行

async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
    # 1. 状态管理:初始化或恢复持久化状态
    if inputs and "id" in inputs and self._persistence:
        stored_state = self._persistence.load_state(inputs["id"])
        if stored_state:
            self._restore_state(stored_state)  # 断点续传
    
    # 2. 并行启动:所有 @start 方法同时执行
    tasks = [self._execute_start_method(method) for method in self._start_methods]
    await asyncio.gather(*tasks)

事件链执行策略

@router 串行,@listen 并行

async def _execute_listeners(self, trigger_method: str, result: Any) -> None:
    # 第一层:串行执行 @router(确保决策一致性)
    while True:
        routers = self._find_triggered_methods(current_trigger, router_only=True)
        if not routers: break
        
        for router in routers:
            await self._execute_single_listener(router, result)
            router_result = self._method_outputs[-1]
            current_trigger = router_result  # 路由结果成为新的触发源
    
    # 第二层:并行执行 @listen(提升执行效率)
    for trigger in all_triggers:
        listeners = self._find_triggered_methods(trigger, router_only=False)
        if listeners:
            tasks = [self._execute_single_listener(name, result) for name in listeners]
            await asyncio.gather(*tasks)  # 并行执行

分层设计的优势:

  • Router 层(串行):保证路径决策的确定性,避免竞态条件
  • Listen 层(并行):无依赖的监听器同时执行,提升性能
  • 动态路由:Router 返回值可触发新的事件链,实现复杂流程控制

智能条件触发

Flow 支持复杂的条件逻辑,通过状态跟踪实现精确的依赖管理:

# OR 条件:任一触发即执行
if condition_type == "OR" and trigger_method in methods:
    triggered.append(listener_name)

# AND 条件:全部完成才触发(使用状态跟踪)
elif condition_type == "AND":
    if listener_name not in self._pending_and_listeners:
        self._pending_and_listeners[listener_name] = set(methods)  # 初始化待完成集合
    
    self._pending_and_listeners[listener_name].discard(trigger_method)  # 标记完成
    
    if not self._pending_and_listeners[listener_name]:  # 全部完成
        triggered.append(listener_name)
        self._pending_and_listeners.pop(listener_name)  # 清理状态

条件组合示例:

@listen(or_("payment_success", "free_trial"))     # 支付成功 OR 免费试用
@listen(and_("auth_passed", "quota_available"))   # 认证通过 AND 配额充足

双模式状态管理

Flow 支持两种状态管理模式,兼顾灵活性与类型安全:

模式一:字典状态(动态灵活)

class MyFlow(Flow):  # 默认字典状态
    @start()
    def begin(self):
        self.state["dynamic_field"] = "任意数据"  # 运行时动态添加字段
        return "started"

模式二:Pydantic 状态(类型安全)

class FlowState(BaseModel):
    components: List[int]
    status: str = "pending"
    result: Optional[str] = None

class MyFlow(Flow[FlowState]):
    initial_state = FlowState
    
    @start()
    def begin(self):
        self.state.components = [123]  # IDE 类型检查 + 自动完成
        return "started"

持久化机制:

  • 断点续传:通过 flow.kickoff(inputs={"id": "flow-123"}) 恢复状态
  • 自动序列化:字典直接存储,Pydantic 模型自动验证和转换
  • 状态隔离:每个 Flow 实例拥有独立的状态空间

总结

CrewAI 以其独特的”角色胜于规则”设计理念,为多 Agent 系统开发提供了一条简洁而强大的路径。通过 Crews 和 Flows 的双重范式,它既能满足快速开发的需求,又能应对复杂的业务逻辑。对于希望快速构建企业级多Agent应用的开发者而言,CrewAI 无疑是一个值得深入探索的优秀框架。

参考资料

相关笔记

  • 【WIP】Agent 框架深度解析(三):AutoGen - 同系列,AutoGen 的对话驱动范式
  • 三年四次转向,LangChain 1.0 终于来了 - LangGraph 状态图架构,与 CrewAI 形成对比
  • AI Agent 中的工具动态加载 - Agent 工具加载机制深入探讨
  • DeerFlow-学习笔记 - 字节开源 Agent 运行时平台,含 Skills 和子代理设计