news 2026/4/23 9:55:30

go.dev博客阅读-pipelines

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
go.dev博客阅读-pipelines

这篇文章 2014年3月13日发表,作者 Sameer Ajmani

通过灵活的运用chan类型,在 Go 中更高效的处理数据,这里应用领域为健壮高效的流式数据处理,并在安全性问题上做了补充,例如程序异常、内存泄漏、Gc释放等

一些开源类库也沿用了其思想,例如MapReduces、并行处理等

这篇博客要以MapReduces或者生产消费模型的思想去阅读

博客开头的示例

一个比较基础的管道使用

将一组整数通过管道依次平方,最终输出结果

// 将要计算平方的数字,依次添加到chan中,并返回该只允许读的chan// 注意:该chan是无缓冲的,gen函数运行完后,内部的goroutine会依然运行,直到处理完毕funcgen(nums...int)<-chanint{out:=make(chanint)gofunc(){for_,n:=rangenums{out<-n}close(out)}()returnout}// 从传入的只读chan中读取数据,计算平方,再返回chanfuncsq(in<-chanint)<-chanint{out:=make(chanint)gofunc(){forn:=rangein{out<-n*n}close(out)}()returnout}funcTestExample(t*testing.T){// chan数据传输:gen → sq → sq → 打印forn:=rangesq(sq(gen(2,3))){t.Log(n)}}

输出

/Users/www/zero-core/mr/mr_test.go:39: 16 /Users/www/zero-core/mr/mr_test.go:39: 81

过程中的一些说明

  1. gensq方法中分别创建了各自的 chan 变量,用于写入数据,并返回
  2. 声明 chan 类型后,要养成 close 的习惯,close 后依然可以读,有减缓 Gc 压力
  3. sq(sq(gen(2, 3)))中,三个方法,通过传入 chan 参数实现数据流转,sq方法调用了两次
  4. gensq方法中的 chan 均为无缓冲通道,互相调用时为阻塞模型,也就意味着同一时刻只可能会有一段程序在执行(无论几核)

这里就是使用 chan 类型,实现了一个简陋的 MapReduces 过程

并行处理

官方着重提到的是并行,但至于是否多核并行还是依赖于并发实现

依旧是求平方的案例

// 原始数据无阻塞写入 chan, 注意,这里返回的时候有缓冲的 chanfuncgen(nums...int)<-chanint{out:=make(chanint,len(nums))for_,n:=rangenums{out<-n}close(out)returnout}// 读取传入的 chan, 并计算平方, 写入 chanfuncsq(in<-chanint)<-chanint{out:=make(chanint)gofunc(){forn:=rangein{out<-n*n}close(out)}()returnout}// 将传入的n个 chan ,用 n 个 goroutine 读取, 并将其写入到 out chan 中funcmerge(cs...<-chanint)<-chanint{varwg sync.WaitGroup out:=make(chanint)// 读取传入的 chan, 并将其写入到 out chan 中output:=func(c<-chanint){deferwg.Done()forn:=rangec{out<-n}}wg.Add(len(cs))for_,c:=rangecs{gooutput(c)}gofunc(){wg.Wait()close(out)}()returnout}funcTestExample(t*testing.T){// 将 2, 3, 4, 9 写入有缓冲的 chan,返回的 chan 用 2 个 sq 方法去接收(2个消费者)in:=gen(2,3,4,9)c1:=sq(in)c2:=sq(in)forn:=rangemerge(c1,c2){// 输出 4 9 81 16(顺序不定)t.Log(n)}}

输出

/Users/www/zero-core/mr/mr_test.go:68: 4 /Users/www/zero-core/mr/mr_test.go:68: 9 /Users/www/zero-core/mr/mr_test.go:68: 81

说明:

  1. c1、c2 相当于2个消费任务去执行,通过内部创建的 goroutinue 去模型多线程多核并行
  2. merge 方法将多个传入的 chan 输出,合并到一个 chan,保证 Reduces 阶段只会有1个输出出口
  3. ❌这里面有个不严谨漏洞,当取数据不是采用 range 方式或者 chan 数据没有取完, chan 的发送方就会阻塞

带取消功能的 chan

并行处理的代码改进,在每个方法中都引入done

funcgen(done<-chanstruct{},nums...int)<-chanint{out:=make(chanint)gofunc(){deferclose(out)for_,n:=rangenums{select{caseout<-n:case<-done:return}}}()returnout}funcsq(done<-chanstruct{},in<-chanint)<-chanint{out:=make(chanint)gofunc(){deferclose(out)forn:=rangein{select{caseout<-n*n:case<-done:return}}}()returnout}funcmerge(done<-chanstruct{},cs...<-chanint)<-chanint{varwg sync.WaitGroup out:=make(chanint)output:=func(c<-chanint){deferwg.Done()forn:=rangec{select{caseout<-n:case<-done:return}}}wg.Add(len(cs))for_,c:=rangecs{gooutput(c)}gofunc(){wg.Wait()close(out)}()returnout}funcTestExample(t*testing.T){done:=make(chanstruct{})deferclose(done)// 保证所有 goroutine 收到取消信号in:=gen(done,2,3,4,9)c1:=sq(done,in)c2:=sq(done,in)out:=merge(done,c1,c2)// 只消费2个值就退出t.Log(<-out)t.Log(<-out)// 此时 done 被 defer 关闭,所有 goroutine 安全退出}
  1. 在每个方法中,都加入了done,内部使用select来监听是否关闭,并return 释放协程
  2. 如果chan没有取完,通过 close 通知 done 的方式,保证不会存在僵尸协程泄漏

但,这个案例还有改进的一步,比如,chan 中有3个值,现在只取了1个就进行了 close 关闭,chan 随是释放了,但内部剩余的2个值可能会发生逃逸现象,等待系统 Gc 释放

如追求性能,一种写法是 close 后,通过手动读取释放,来减缓 Gc 的压力

// 不仅仅 close 还空读取deferfunc(){close(done)forrangeout{}}()

额外注意的点

在多任务消费读取生产数据时

funcgen(nums...int)<-chanint{out:=make(chanint)gofunc(){for_,n:=rangenums{out<-n}close(out)}()returnout}funcgen2(nums...int)<-chanint{out:=make(chanint,len(nums))for_,n:=rangenums{out<-n}close(out)returnout}

这两种方式实现过程结果一样,不同之处在于将生产数据变快,还是读取速度变快

gen循序渐进的放入生产计划中,gen2是一口气家在到生产计划中,具体采用哪种适业务而定

🧠🧠🧠🧠

对官方这篇博客,我的理解是

  1. 每个使用了 chan 的地方,应在适当的时候关闭且释放掉
  2. 每个使用了 chan 的地方应持续从输入 channel 读取,直到关闭或收到取消信号,而不是一口气读一口气写
  3. 不要完全依赖有缓冲的 chan 的 size 解决阻塞问题,缓冲的大小是一个容错作用
  4. 使用关闭的 channel 作为广播取消信号,通知所有上游 goroutine 停止工作。
  5. 使用 WaitGroup 时,务必确保所有任务完成后再关闭输出 channel,先 wait,再 close

原文出处 https://go.dev/blog/pipelines

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/16 16:39:44

Kotaemon医学文献检索:PubMed数据接入实战

Kotaemon医学文献检索&#xff1a;PubMed数据接入实战 在临床决策和科研探索中&#xff0c;医生与研究人员常常面临海量文献的筛选难题。一个关于“二甲双胍改善胰岛素抵抗”的问题&#xff0c;可能涉及成百上千篇论文&#xff0c;手动查阅既耗时又容易遗漏关键证据。而通用大模…

作者头像 李华
网站建设 2026/4/7 11:05:05

运维工程师必备:Ubuntu命令实战案例集

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 创建一个Ubuntu运维实战案例展示页面&#xff0c;包含10个典型场景&#xff1a;1) 监控CPU/内存使用&#xff08;top, free&#xff09;2) 分析磁盘空间&#xff08;df, du&#xf…

作者头像 李华
网站建设 2026/4/20 16:57:17

传统开发vsAI生成:wx-open-launch-app实现效率对比

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 请生成一个完整的wx-open-launch-app实现示例&#xff0c;包含前端页面和后端配置。要求展示从零开始到功能实现的完整流程&#xff0c;代码要优化且高效&#xff0c;包含性能考量。…

作者头像 李华
网站建设 2026/4/9 3:58:21

PathOfBuilding完全攻略:5步快速掌握Build规划的精髓

PathOfBuilding完全攻略&#xff1a;5步快速掌握Build规划的精髓 【免费下载链接】PathOfBuilding Offline build planner for Path of Exile. 项目地址: https://gitcode.com/GitHub_Trending/pa/PathOfBuilding 还在为流放之路中复杂的Build规划感到困惑吗&#xff1f…

作者头像 李华
网站建设 2026/4/22 19:23:52

传统VS现代:连接管理效率提升300%的秘诀

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 构建一个性能对比测试平台&#xff0c;比较处理discard long time none received connection问题时不同技术的效率。包含三个模块&#xff1a;1) 传统JDBC手动管理连接 2) Tomcat-D…

作者头像 李华
网站建设 2026/4/18 5:03:43

深度解锁ONNX转换:让AI模型在任意框架间自由流动

深度解锁ONNX转换&#xff1a;让AI模型在任意框架间自由流动 【免费下载链接】onnx Open standard for machine learning interoperability 项目地址: https://gitcode.com/gh_mirrors/onn/onnx 在AI开发的世界里&#xff0c;你是否曾遭遇"语言不通"的尴尬&am…

作者头像 李华