funcf(left,rightchanint){//这个函数就把right的输出和left的输入联系起来了。left<-1+<-right}funcmain(){constn=10000leftmost:=make(chanint)right:=leftmostleft:=leftmost//创建长度为n的daisy链fori:=0;i 那么这个模式有什么用呢?它可以用来处理迭代算法,使得部分迭代运算并发执行。只要迭代的每个阶段都是相互独立的即可。比如,计算质数: 好了,下面我们换些基础的模式讲一下: 代码如下:用一个int表示ball(球),管道表示table(桌子),两个goroutine就是2个运动员,分别编号为1和2。 funcmain(){varBallinttable:=make(chanint)goplayer("2",table)goplayer("1",table)//首先把球放到“桌上”table<-Balltime.Sleep(1*time.Second)//1s后比赛结束……<-table}funcplayer(idstring,tablechanint){for{ball:=<-tablelog.Printf("%sgotball[%d]\n",id,ball)time.Sleep(50*time.Millisecond)log.Printf("%sbouncebackball[%d]\n",id,ball)ball++table<-ball}}输出如下: 1gotball[0]1bouncebackball[0]2gotball[1]2bouncebackball[1]1gotball[2]1bouncebackball[2]2gotball[3]2bouncebackball[3]1gotball[4]1bouncebackball[4]2gotball[5]2bouncebackball[5]代码简洁易懂,很好理解(看不懂的同学请不要拍我)。下面,我们增加一位选手,让3个运动员一块打球 goplayer("2",table)goplayer("3",table)goplayer("1",table)这下子热闹了,输出如下: 1gotball[0]1bouncebackball[0]2gotball[1]2bouncebackball[1]3gotball[2]3bouncebackball[2]1gotball[3]1bouncebackball[3]2gotball[4]2bouncebackball[4]3gotball[5]3bouncebackball[5]1gotball[6]1bouncebackball[6]2gotball[7]2bouncebackball[7]3gotball[8]3bouncebackball[8]看3个人有条不紊的相互击球。此时处女座一定非常满意,但是对于习惯了并发随机性的程序员来说,这实在有些过于美好:为什么它们的顺序如此协调,为什么1总是给2,2给3,3给1,而不是其他顺序呢? 划重点了啊: 也叫“扇入”,应该是并发编程里面比较普通的一个模式了。fan-in会从多个管道读取输入,并汇总到一个channel输出,形象的比喻如下图:示例代码如下 import("fmt""math/rand""os""runtime/trace""time")funcmain(){trace.Start(os.Stderr)c:=fanIn(boring(1),boring(2))fori:=0;i<10;i++{fmt.Println(<-c)}fmt.Println("You'rebothboring;I'mleaving.")trace.Stop()}funcfanIn(input1,input2<-chanint)<-chanint{c:=make(chanint)gofunc(){for{c<-<-input1}}()gofunc(){for{c<-<-input2}}()returnc}funcboring(msgint)<-chanint{c:=make(chanint)gofunc(){//Welaunchthegoroutinefrominsidethefunction.fori:=0;;i++{c<-msg*1000+itime.Sleep(time.Duration(rand.Intn(1e3))*time.Millisecond)}}()returnc//Returnthechanneltothecaller.}输出为:200020011001200210022003100320041004gotrace输出为(注意这是两次独立的运行结果):可以看到,两次的结果都汇入了main线程,并且顺序输出,没有丢失数据,也没有死锁。 当然,简单的情况,用select也可以。 select设计的目的就是在channel中间通讯,谁的数据先到达,哪个case分支先执行。 c1:=boring(1)c2:=boring(2)fori:=0;i<10;i++{select{casev:=<-c1:fmt.Println(v)casev:=<-c2:fmt.Println(v)}}Workers也叫FanOut(扇出),和扇入模式相反,工作模式是一个管道分发任务,多个goroutines来执行。示例代码如下: gotrace结果如下:圆柱体中心就是main进程中生成的pool进程,围绕它的是36个worker进程。蓝色箭头表示pool每隔10ms分发的任务,它们都被worker处理了。 server模式和fan_out类似,只不过它的worker线程是按需生成的,并且工作处理完毕后就释放。所以这种模式常应用到网站服务器上。在主进程中,有一个for循环,Accept函数一直阻塞着循环的进行,一旦有新的请求过来,Accept就会生成一个connection,然后主进程就创建一个子进程处理这个connection以及其他逻辑。 示例代码如下: import("fmt""net""os""runtime/trace""time")funchandler(cnet.Conn,chchanint){ch<-len(c.RemoteAddr().String())time.Sleep(10*time.Microsecond)c.Write([]byte("ok"))c.Close()}funclogger(chchanint){for{time.Sleep(1500*time.Millisecond)fmt.Println(<-ch)}}funcserver(lnet.Listener,chchanint){for{c,err:=l.Accept()iferr!=nil{continue}gohandler(c,ch)}}funcmain(){trace.Start(os.Stderr)l,err:=net.Listen("tcp",":5000")iferr!=nil{panic(err)}ch:=make(chanint)gologger(ch)goserver(l,ch)time.Sleep(10*time.Second)trace.Stop()}可以看到,主进程生成了一个tcp连接,启动了server和logger两个子进程。server用来监听外网的请求,一旦请求过来,就会生成一个handler进程,用来处理connection。同时,handler还会通过管道和logger通讯,logger负责异步记录相应日志。 这个程序运行时的输入需要模拟外部请求来产生,为此我写了一个脚本: #!/bin/shi=0while[[$i-lt20]];do#通过nc发起tcp请求。每秒请求一次echo"hello"$i|nclocalhost5000sleep1((++i))done运行时,先启动这个脚本,然后启动server或gotrace。 gotrace的运行结果如下: 为了解决这个问题,我们正好借助上面介绍的Worker模式,提高logger的并发性。 import("fmt""net""os""runtime/trace""time")funchandler(cnet.Conn,chchanint){ch<-0time.Sleep(50*time.Microsecond)c.Write([]byte("ok"))c.Close()}funclogger(wchchanint){for{fmt.Println(<-wch)//这里主要耗时time.Sleep(1500*time.Millisecond)}}funcpool(chchanint,nint){wch:=make(chanint)fori:=0;i 3D图如下:可以看到,此时server正好处理了10个请求。不再被logger拖延了。 注意我的题目是并发(concurrent)设计模式。那么并发和并行到底啥区别?? Parallelism(并行性):Aconditionthatariseswhenatleasttwothreadsareexecutingsimultaneously.这个就是狭义的并行,即线程、任务必须是同时进行的,否则不算parallelism。
入门goroutine并发设计模式以及goroutine可视化工具flybywind
THE END