news 2026/4/23 14:19:57

AgentScope深入学习-Pipeline与消息

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
AgentScope深入学习-Pipeline与消息

协调的艺术:Pipeline 与消息系统核心解析

请关注公众号【碳硅化合物AI】

摘要

多智能体系统的核心是协调。AgentScope 通过 Pipeline 和消息系统实现了优雅的多智能体编排。本文将深入分析 MsgHub、Pipeline 模式以及消息系统的设计。你会发现,消息(Msg)不仅是数据载体,更是整个框架的统一接口;MsgHub 通过订阅机制实现了自动消息广播;Pipeline 提供了多种编排模式,让多智能体协作变得简单而强大。通过阅读本文,你会理解多智能体如何通过消息进行通信,如何通过 Pipeline 进行编排,以及这些机制背后的设计考量。

入口类与类关系

消息系统的类层次

消息系统非常简单,但设计精妙:

Pipeline 系统的类层次

Pipeline 提供了多种编排模式:

关键代码:Msg 数据结构

Msg 是框架的核心数据结构:

class Msg: """The message class in agentscope.""" def __init__( self, name: str, content: str | Sequence[ContentBlock], role: Literal["user", "assistant", "system"], metadata: dict[str, JSONSerializableObject] | None = None, timestamp: str | None = None, invocation_id: str | None = None, ) -> None: self.name = name self.content = content self.role = role self.metadata = metadata self.id = shortuuid.uuid() self.timestamp = timestamp or datetime.now().strftime(...)

这个设计非常巧妙:

  • 统一接口:所有智能体间通信都使用 Msg
  • 多模态支持:content 可以是字符串或 ContentBlock 列表
  • 元数据支持:metadata 字段可以存储结构化输出等额外信息
  • 自动标识:每个消息都有唯一的 id 和 timestamp

关键流程分析

消息创建和传递流程

消息在智能体间的传递非常直接:

多智能体对话流程

使用 MsgHub 的多智能体对话流程:

Pipeline 执行流程

顺序 Pipeline 的执行流程:

关键技术点

1. 消息作为统一数据结构的设计

Msg 类在 AgentScope 中扮演着核心角色,它是:

  • 智能体间通信的媒介:所有智能体都通过 Msg 交换信息
  • 与 LLM API 的桥梁:Formatter 将 Msg 转换为 API 所需格式
  • 记忆存储的单元:Memory 存储的是 Msg 对象列表
  • UI 显示的数据源:前端可以直接显示 Msg 对象

这种统一设计避免了数据格式转换的复杂性。无论消息来自哪里、要去哪里,都是同一个数据结构。

2. 多模态消息块系统

Msg 支持多模态内容,通过 ContentBlock 系统实现:

class TextBlock(TypedDict, total=False): type: Required[Literal["text"]] text: str class ToolUseBlock(TypedDict, total=False): type: Required[Literal["tool_use"]] id: Required[str] name: Required[str] input: Required[dict[str, object]] class ToolResultBlock(TypedDict, total=False): type: Required[Literal["tool_result"]] id: Required[str] name: Required[str] output: Required[str | List[ContentBlock]] class ImageBlock(TypedDict, total=False): type: Required[Literal["image"]] source: Required[Base64Source | URLSource] class AudioBlock(TypedDict, total=False): type: Required[Literal["audio"]] source: Required[Base64Source | URLSource] ContentBlock = ( ToolUseBlock | ToolResultBlock | TextBlock | ThinkingBlock | ImageBlock | AudioBlock | VideoBlock )

这种设计让消息可以包含:

  • 文本内容
  • 工具调用和结果
  • 图像、音频、视频等多媒体内容
  • 思考过程(thinking block)

所有内容都统一在一个消息对象中,非常灵活。

3. 消息中心(MsgHub)的设计

MsgHub 通过订阅机制实现自动消息广播。关键代码:

def _reset_subscriber(self) -> None: """Reset the subscriber for agent in `self.participant`""" if self.enable_auto_broadcast: for agent in self.participants: agent.reset_subscribers(self.name, self.participants)

当智能体在 MsgHub 中回复时,消息会自动广播给其他参与者。这个机制通过 AgentBase 的订阅系统实现:

# 在 AgentBase 中defreset_subscribers(self,hub_name:str,subscribers:list[AgentBase])->None:"""设置订阅者,当智能体回复时,消息会自动发送给订阅者"""self._subscribers[hub_name]=subscribers

这种设计让多智能体对话变得非常简单:

asyncwithMsgHub([agent1,agent2,agent3]):awaitagent1()# agent2 和 agent3 自动收到消息awaitagent2()# agent1 和 agent3 自动收到消息awaitagent3()# agent1 和 agent2 自动收到消息

4. 不同 Pipeline 模式的实现

AgentScope 提供了多种 Pipeline 模式:

顺序 Pipeline(SequentialPipeline)

async def sequential_pipeline( agents: list[AgentBase], msg: Msg | list[Msg] | None = None, ) -> Msg | list[Msg] | None: """执行智能体序列,前一个的输出作为下一个的输入""" for agent in agents: msg = await agent(msg) return msg

扇出 Pipeline(FanoutPipeline)

async def fanout_pipeline( agents: list[AgentBase], msg: Msg | list[Msg] | None = None, enable_gather: bool = True, ) -> list[Msg]: """将同一个输入分发给多个智能体""" if enable_gather: # 并发执行 tasks = [asyncio.create_task(agent(deepcopy(msg))) for agent in agents] return await asyncio.gather(*tasks) else: # 顺序执行 return [await agent(deepcopy(msg)) for agent in agents]

这些 Pipeline 模式让多智能体编排变得非常灵活。你可以:

  • 顺序执行:前一个智能体的输出作为下一个的输入(适合流水线场景)
  • 并发执行:多个智能体同时处理同一个输入(适合并行分析场景)
  • 动态编排:在 MsgHub 中动态添加或删除参与者

5. 消息的序列化和反序列化

Msg 支持完整的序列化:

def to_dict(self) -> dict: """Convert the message into JSON dict data.""" return { "id": self.id, "name": self.name, "role": self.role, "content": self.content, "metadata": self.metadata, "timestamp": self.timestamp, } @classmethod def from_dict(cls, json_data: dict) -> "Msg": """Load a message object from the given JSON data.""" new_obj = cls( name=json_data["name"], content=json_data["content"], role=json_data["role"], metadata=json_data.get("metadata", None), timestamp=json_data.get("timestamp", None), invocation_id=json_data.get("invocation_id", None), ) new_obj.id = json_data.get("id", new_obj.id) return new_obj

这种设计让消息可以:

  • 保存到文件或数据库
  • 通过网络传输
  • 在不同进程间共享
  • 用于调试和日志记录

总结

Pipeline 和消息系统是 AgentScope 框架中实现多智能体协调的核心:

  1. 消息系统:通过统一的 Msg 数据结构,实现了智能体间通信、API 交互、记忆存储的统一接口
  2. MsgHub:通过订阅机制实现了自动消息广播,让多智能体对话变得简单优雅
  3. Pipeline:提供了多种编排模式,支持顺序、并发、动态等多种场景

这些系统的设计都体现了 AgentScope 的核心理念:透明、模块化、易用。在下一篇文章中,我们会分析扩展机制的实现,了解如何为框架添加新功能。


版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/23 9:49:27

合肥工业大学团队首创TIMAR:3D虚拟人实现真实对话交互

这项由合肥工业大学陈俊杰团队主导,联合中国科学技术大学、上海交通大学、中国电信人工智能研究院、西北工业大学、阿联酋大学和安徽理工大学等多家机构合作完成的研究,于2024年12月发表在arXiv预印本平台(论文编号:arXiv:2512.15…

作者头像 李华
网站建设 2026/4/23 9:46:31

维也纳大学团队破解超双曲几何在强化学习中的训练难题

在人工智能的世界里,有一个一直困扰研究者们的问题:如何让机器像人类一样理解事物之间的层次关系?当你下棋时,每一步棋都会开启无数种可能的未来,这些可能性像树枝一样层层分叉。传统的AI系统在处理这种树状结构时就像…

作者头像 李华
网站建设 2026/4/23 11:15:38

28、电脑网络连接与音乐播放指南

电脑网络连接与音乐播放指南 1. 网络连接准备 一些互联网服务提供商(ISP)会为用户提供无线路由器/调制解调器,甚至会派技术人员到用户家中设置网络。不妨向ISP咨询一下是否有此服务。 2. 无线路由器设置 无线路连接给每个手机用户带来了便利,但对于电脑而言,它也带来了…

作者头像 李华
网站建设 2026/4/22 21:06:32

如何用Excalidraw做思维导图?替代XMind的新思路

如何用 Excalidraw 做思维导图?替代 XMind 的新思路 在一场远程产品评审会上,主讲人还在手忙脚乱地展开 XMind 文件、调整缩放比例时,另一位工程师直接打开一个链接,对着摄像头说:“我来画一下我的理解。” 话音未落&a…

作者头像 李华
网站建设 2026/4/18 10:21:08

ByteDance研究团队如何用轨迹场重新定义4D视频理解

在ByteDance Seed、香港科技大学、浙江大学和达特茅斯学院的研究团队合作下,一项突破性的研究在2025年10月发表于arXiv(论文编号:2510.13802),这项研究为我们理解和处理视频内容提供了全新的视角。由Xinhang Liu、Yuxi…

作者头像 李华
网站建设 2026/4/23 13:03:10

16、高效利用Windows系统:搜索、打印与扫描全攻略

高效利用Windows系统:搜索、打印与扫描全攻略 一、Cortana数字助理的使用 Windows 10 中包含了一个友好的个人数字助理Cortana,它不仅能帮你找到丢失的文件,还能提供关于你和周围环境的有用信息,比如当地的天气更新、回家路上的交通信息,甚至附近受欢迎的餐厅列表。它还…

作者头像 李华