天天看点

golang一个例子引出的几个问题1 如何测试客户端服务端?2 这里的wg变量有什么用?3 为什么要有gate这个channel buffer?4 这里的atomic什么作用?5 procs的作用?

这个例子是从go源码src/pkg/net/rpc/server_test.go截取出来的

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

<code>func benchmarkendtoendasync(dial func() (*client, error), b *testing.b) {</code>

<code>     </code><code>const</code> <code>maxconcurrentcalls = 100</code>

<code>     </code><code>b.stoptimer()</code>

<code>     </code><code>once.do(startserver)</code>

<code>     </code><code>client, err := dial()</code>

<code>     </code><code>if</code> <code>err != nil {</code>

<code>          </code><code>b.fatal(</code><code>"error dialing:"</code><code>, err)</code>

<code>     </code><code>}</code>

<code>     </code><code>// asynchronous calls</code>

<code>     </code><code>args := &amp;args{7, 8}</code>

<code>     </code><code>procs := 4 * runtime.gomaxprocs(-1)</code>

<code>     </code><code>send := int32(b.n)</code>

<code>     </code><code>recv := int32(b.n)</code>

<code>     </code><code>var</code> <code>wg sync.waitgroup</code>

<code>     </code><code>wg.add(procs)</code>

<code>     </code><code>gate := make(chan bool, maxconcurrentcalls)</code>

<code>     </code><code>res := make(chan *call, maxconcurrentcalls)</code>

<code>     </code><code>b.starttimer()</code>

<code>     </code><code>for</code> <code>p := 0; p &lt; procs; p++ {</code>

<code>          </code><code>go func() {</code>

<code>               </code><code>for</code> <code>atomic.addint32(&amp;send, -1) &gt;= 0 {</code>

<code>                    </code><code>gate &lt;- true</code>

<code>                    </code><code>reply :=</code><code>new</code><code>(reply)</code>

<code>                    </code><code>client.go(</code><code>"arith.add"</code><code>, args, reply, res)</code>

<code>               </code><code>}</code>

<code>          </code><code>}()</code>

<code>               </code><code>for</code> <code>call := range res {</code>

<code>                    </code><code>a := call.args.(*args).a</code>

<code>                    </code><code>b := call.args.(*args).b</code>

<code>                    </code><code>c := call.reply.(*reply).c</code>

<code>                    </code><code>if</code> <code>a+b != c {</code>

<code>                         </code><code>b.fatalf(</code><code>"incorrect reply: add: expected %d got %d"</code><code>, a+b, c)</code>

<code>                    </code><code>}</code>

<code>                    </code><code>&lt;-gate</code>

<code>                    </code><code>if</code> <code>atomic.addint32(&amp;recv, -1) == 0 {</code>

<code>                         </code><code>close(res)</code>

<code>               </code><code>wg.done()</code>

<code>     </code><code>wg.wait()</code>

<code>}</code>

这个代码用来对rpc的客户端go函数进行压力测试。

这里有几个地方值得揣摩下:

先使用startserver(这个函数里面具体是开启了一个routine)进行服务器服务。然后在每个测试用例中启动server,如果是benchtest的话记得这里的timer要在启动服务器行为之后再开启。

wg变量是sync.waitgroup类型,add增加计数,done减少计数,wait进行阻塞等待,等计数减为0的时候再停止阻塞。

这里如果不使用waitgroup进行wait阻塞的话,主routine会先于次routine先结束。会导致程序提早退出。

因此这里也给出了一个测试用例中测试异步函数的方法。就是使用waitgroup

看起来gate好像是没什么用啊,如果去掉gate呢?有可能会出现“rpc: discarding call reply due to insufficient done chan capacity”

这个gate完全是因为client.go这个函数,rpc包的client.go是异步的调用,虽然是异步调用,这个异步调用的最后一个done参数是一个channel buffer。

当client.go进行完rpc调用后,将信号传入这个channel buffer。但是这个channel buffer却是不会阻塞的。

具体看源码:

golang一个例子引出的几个问题1 如何测试客户端服务端?2 这里的wg变量有什么用?3 为什么要有gate这个channel buffer?4 这里的atomic什么作用?5 procs的作用?

这里select加了个default分支,说明了done是非阻塞的。看注释,作者认为这个buffer的大小容量应该由调用者来保证。rpc包并不保证容量大小。

方法有个两个:

这个方法就是gate的使用原因了。只有gate容量有剩余的时候才会容许调用client.go

在这个例子中,bench的channel最大只会是b.n,所以,如果我们分配的res的channel buffer大小为b.n也能解决这个问题。

这个方法导致的效果就是bench的时间变快了,但是mem分配增加了。

因为这里会有多个routine会对send和recive进行操作,这里就需要保证原子性。

多个并发routine对一个共享变量进行操作有两种方法,channel和锁。

这里当然使用channel也能起到原子操作的效果。sync包的atomic和sync的mutex都是锁的方式。

所以说这里其实可以使用channel,mutex,atomic三种方法。

bench test在运行前自身会调用runtime.gomaxprocs进行多核的设置,然后再每个处理器中并行运行测试。

这里的runtime.gomaxprocs(-1)是获取你要跑的cpu核数,这个核数是根据bench test的 -test.cpu设置的。具体可以看下src/testing/testing.go parsecpulist。在没有设置过gomaxprocs和test.cpu的情况下,这里的runtime.gomaxprocs就默认是1。

你可以使用-test.cpu 1,2,4来设置你的压力测试用例是有几个cpu,每个cpu是几核的。

这里的procs设置为处理器核数的4倍就是为了测试routine能分配远大于核数的个数,这样每个核承担的goroutine能大于1。

上面的for循环就是保证起的routine数是足够的。