天天看点

如何合理地估算线程池大小?

如何合理地估算线程池大小?

这个问题虽然看起来很小,却并不那么容易回答。大家如果有更好的方法欢迎赐教,先来一个天真的估算方法:假设要求一个系统的tps(transaction per second或者task per second)至少为20,然后假设每个transaction由一个线程完成,继续假设平均每个线程处理一个transaction的时间为4s。那么问题转化为:

如何设计线程池大小,使得可以在1s内处理完20个transaction?

计算过程很简单,每个线程的处理能力为0.25tps,那么要达到20tps,显然需要20/0.25=80个线程。

很显然这个估算方法很天真,因为它没有考虑到cpu数目。一般服务器的cpu核数为16或者32,如果有80个线程,那么肯定会带来太多不必要的线程上下文切换开销。

再来第二种简单的但不知是否可行的方法(n为cpu总核数):

如果是cpu密集型应用,则线程池大小设置为n+1

如果是io密集型应用,则线程池大小设置为2n+1

如果一台服务器上只部署这一个应用并且只有这一个线程池,那么这种估算或许合理,具体还需自行测试验证。

接下来在这个文档:服务器性能io优化 中发现一个估算公式:

<code>1</code>

<code>最佳线程数目 = ((线程等待时间+线程cpu时间)/线程cpu时间 )* cpu数目</code>

比如平均每个线程cpu运行时间为0.5s,而线程等待时间(非cpu运行时间,比如io)为1.5s,cpu核心数为8,那么根据上面这个公式估算得到:((0.5+1.5)/0.5)*8=32。这个公式进一步转化为:

<code>最佳线程数目 = (线程等待时间与线程cpu时间之比 + 1)* cpu数目</code>

可以得出一个结论:

线程等待时间所占比例越高,需要越多线程。线程cpu时间所占比例越高,需要越少线程。

上一种估算方法也和这个结论相合。

一个系统最快的部分是cpu,所以决定一个系统吞吐量上限的是cpu。增强cpu处理能力,可以提高系统吞吐量上限。但根据短板效应,真实的系统吞吐量并不能单纯根据cpu来计算。那要提高系统吞吐量,就需要从“系统短板”(比如网络延迟、io)着手:

尽量提高短板操作的并行化比率,比如多线程下载技术

增强短板能力,比如用nio替代io

第一条可以联系到amdahl定律,这条定律定义了串行系统并行化后的加速比计算公式:

<code>加速比=优化前系统耗时 / 优化后系统耗时</code>

加速比越大,表明系统并行化的优化效果越好。addahl定律还给出了系统并行度、cpu数目和加速比的关系,加速比为speedup,系统串行化比率(指串行执行代码所占比率)为f,cpu数目为n:

<code>speedup &lt;=</code><code>1</code> <code>/ (f + (</code><code>1</code><code>-f)/n)</code>

当n足够大时,串行化比率f越小,加速比speedup越大。

写到这里,我突然冒出一个问题。

是否使用线程池就一定比使用单线程高效呢?

答案是否定的,比如redis就是单线程的,但它却非常高效,基本操作都能达到十万量级/s。从线程这个角度来看,部分原因在于:

多线程带来线程上下文切换开销,单线程就没有这种开销

当然“redis很快”更本质的原因在于:redis基本都是内存操作,这种情况下单线程可以很高效地利用cpu。而多线程适用场景一般是:存在相当比例的io和网络操作。

所以即使有上面的简单估算方法,也许看似合理,但实际上也未必合理,都需要结合系统真实情况(比如是io密集型或者是cpu密集型或者是纯内存操作)和硬件环境(cpu、内存、硬盘读写速度、网络状况等)来不断尝试达到一个符合实际的合理估算值。

最后来一个“dark magic”估算方法(因为我暂时还没有搞懂它的原理),使用下面的类:

<code>001</code>

<code>package</code> <code>pool_size_calculate;</code>

<code>002</code>

<code>003</code>

<code>import</code> <code>java.math.bigdecimal;</code>

<code>004</code>

<code>import</code> <code>java.math.roundingmode;</code>

<code>005</code>

<code>import</code> <code>java.util.timer;</code>

<code>006</code>

<code>import</code> <code>java.util.timertask;</code>

<code>007</code>

<code>import</code> <code>java.util.concurrent.blockingqueue;</code>

<code>008</code>

<code>009</code>

<code>/**</code>

<code>010</code>

<code></code><code>* a class that calculates the optimal thread pool boundaries. it takes the</code>

<code>011</code>

<code></code><code>* desired target utilization and the desired work queue memory consumption as</code>

<code>012</code>

<code></code><code>* input and retuns thread count and work queue capacity.</code>

<code>013</code>

<code></code><code>*</code>

<code>014</code>

<code></code><code>* @author niklas schlimm</code>

<code>015</code>

<code>016</code>

<code></code><code>*/</code>

<code>017</code>

<code>public</code> <code>abstract</code> <code>class</code> <code>poolsizecalculator {</code>

<code>018</code>

<code>019</code>

<code></code><code>/**</code>

<code>020</code>

<code></code><code>* the sample queue size to calculate the size of a single {@link runnable}</code>

<code>021</code>

<code></code><code>* element.</code>

<code>022</code>

<code>023</code>

<code></code><code>private</code> <code>final</code> <code>int</code> <code>sample_queue_size =</code><code>1000</code><code>;</code>

<code>024</code>

<code>025</code>

<code>026</code>

<code></code><code>* accuracy of test run. it must finish within 20ms of the testtime</code>

<code>027</code>

<code></code><code>* otherwise we retry the test. this could be configurable.</code>

<code>028</code>

<code>029</code>

<code></code><code>private</code> <code>final</code> <code>int</code> <code>epsylon =</code><code>20</code><code>;</code>

<code>030</code>

<code>031</code>

<code>032</code>

<code></code><code>* control variable for the cpu time investigation.</code>

<code>033</code>

<code>034</code>

<code></code><code>private</code> <code>volatile</code> <code>boolean</code> <code>expired;</code>

<code>035</code>

<code>036</code>

<code>037</code>

<code></code><code>* time (millis) of the test run in the cpu time calculation.</code>

<code>038</code>

<code>039</code>

<code></code><code>private</code> <code>final</code> <code>long</code> <code>testtime =</code><code>3000</code><code>;</code>

<code>040</code>

<code>041</code>

<code>042</code>

<code></code><code>* calculates the boundaries of a thread pool for a given {@link runnable}.</code>

<code>043</code>

<code>044</code>

<code></code><code>* @param targetutilization</code>

<code>045</code>

<code>046</code>

<code></code><code>throw</code> <code>new</code> <code>illegalstateexception(</code><code>"test not accurate"</code><code>);</code>

<code>047</code>

<code></code><code>}</code>

<code>048</code>

<code></code><code>expired =</code><code>false</code><code>;</code>

<code>049</code>

<code></code><code>start = system.currenttimemillis();</code>

<code>050</code>

<code></code><code>timer timer =</code><code>new</code> <code>timer();</code>

<code>051</code>

<code></code><code>timer.schedule(</code><code>new</code> <code>timertask() {</code>

<code>052</code>

<code></code><code>public</code> <code>void</code> <code>run() {</code>

<code>053</code>

<code></code><code>expired =</code><code>true</code><code>;</code>

<code>054</code>

<code>055</code>

<code></code><code>}, testtime);</code>

<code>056</code>

<code></code><code>while</code> <code>(!expired) {</code>

<code>057</code>

<code></code><code>task.run();</code>

<code>058</code>

<code>059</code>

<code></code><code>start = system.currenttimemillis() - start;</code>

<code>060</code>

<code></code><code>timer.cancel();</code>

<code>061</code>

<code></code><code>}</code><code>while</code> <code>(math.abs(start - testtime) &gt; epsylon);</code>

<code>062</code>

<code></code><code>collectgarbage(</code><code>3</code><code>);</code>

<code>063</code>

<code>064</code>

<code>065</code>

<code></code><code>private</code> <code>void</code> <code>collectgarbage(</code><code>int</code> <code>times) {</code>

<code>066</code>

<code></code><code>for</code> <code>(</code><code>int</code> <code>i =</code><code>0</code><code>; i &lt; times; i++) {</code>

<code>067</code>

<code></code><code>system.gc();</code>

<code>068</code>

<code></code><code>try</code> <code>{</code>

<code>069</code>

<code></code><code>thread.sleep(</code><code>10</code><code>);</code>

<code>070</code>

<code></code><code>}</code><code>catch</code> <code>(interruptedexception e) {</code>

<code>071</code>

<code></code><code>thread.currentthread().interrupt();</code>

<code>072</code>

<code></code><code>break</code><code>;</code>

<code>073</code>

<code>074</code>

<code>075</code>

<code>076</code>

<code>077</code>

<code>078</code>

<code></code><code>* calculates the memory usage of a single element in a work queue. based on</code>

<code>079</code>

<code></code><code>* heinz kabbutz' ideas</code>

<code>080</code>

<code>081</code>

<code>082</code>

<code></code><code>* @return memory usage of a single {@link runnable} element in the thread</code>

<code>083</code>

<code></code><code>* pools work queue</code>

<code>084</code>

<code>085</code>

<code></code><code>public</code> <code>long</code> <code>calculatememoryusage() {</code>

<code>086</code>

<code></code><code>blockingqueue queue = createworkqueue();</code>

<code>087</code>

<code></code><code>for</code> <code>(</code><code>int</code> <code>i =</code><code>0</code><code>; i &lt; sample_queue_size; i++) {</code>

<code>088</code>

<code></code><code>queue.add(creattask());</code>

<code>089</code>

<code>090</code>

<code></code><code>long</code> <code>mem0 = runtime.getruntime().totalmemory()</code>

<code>091</code>

<code></code><code>- runtime.getruntime().freememory();</code>

<code>092</code>

<code></code><code>long</code> <code>mem1 = runtime.getruntime().totalmemory()</code>

<code>093</code>

<code>094</code>

<code></code><code>queue =</code><code>null</code><code>;</code>

<code>095</code>

<code></code><code>collectgarbage(</code><code>15</code><code>);</code>

<code>096</code>

<code></code><code>mem0 = runtime.getruntime().totalmemory()</code>

<code>097</code>

<code>098</code>

<code></code><code>queue = createworkqueue();</code>

<code>099</code>

<code>100</code>

<code>101</code>

<code>102</code>

<code>103</code>

<code></code><code>mem1 = runtime.getruntime().totalmemory()</code>

<code>104</code>

<code>105</code>

<code></code><code>return</code> <code>(mem1 - mem0) / sample_queue_size;</code>

<code>106</code>

<code>107</code>

<code>108</code>

<code>109</code>

<code></code><code>* create your runnable task here.</code>

<code>110</code>

<code>111</code>

<code></code><code>* @return an instance of your runnable task under investigation</code>

<code>112</code>

<code>113</code>

<code></code><code>protected</code> <code>abstract</code> <code>runnable creattask();</code>

<code>114</code>

<code>115</code>

<code>116</code>

<code></code><code>* return an instance of the queue used in the thread pool.</code>

<code>117</code>

<code>118</code>

<code></code><code>* @return queue instance</code>

<code>119</code>

<code>120</code>

<code></code><code>protected</code> <code>abstract</code> <code>blockingqueue createworkqueue();</code>

<code>121</code>

<code>122</code>

<code>123</code>

<code></code><code>* calculate current cpu time. various frameworks may be used here,</code>

<code>124</code>

<code></code><code>* depending on the operating system in use. (e.g.</code>

<code>125</code>

<code>126</code>

<code></code><code>* measurement, the more accurate the results for thread count boundaries.</code>

<code>127</code>

<code>128</code>

<code></code><code>* @return current cpu time of current thread</code>

<code>129</code>

<code>130</code>

<code></code><code>protected</code> <code>abstract</code> <code>long</code> <code>getcurrentthreadcputime();</code>

<code>131</code>

<code>132</code>

<code>}</code>

然后自己继承这个抽象类并实现它的三个抽象方法,比如下面是我写的一个示例(任务是请求网络数据),其中我指定期望cpu利用率为1.0(即100%),任务队列总大小不超过100,000字节:

<code>01</code>

<code>02</code>

<code>03</code>

<code>import</code> <code>java.io.bufferedreader;</code>

<code>04</code>

<code>import</code> <code>java.io.ioexception;</code>

<code>05</code>

<code>import</code> <code>java.io.inputstreamreader;</code>

<code>06</code>

<code>import</code> <code>java.lang.management.managementfactory;</code>

<code>07</code>

<code>08</code>

<code>import</code> <code>java.net.httpurlconnection;</code>

<code>09</code>

<code>import</code> <code>java.net.url;</code>

<code>10</code>

<code>11</code>

<code>import</code> <code>java.util.concurrent.linkedblockingqueue;</code>

<code>12</code>

<code>13</code>

<code>public</code> <code>class</code> <code>simplepoolsizecaculatorimpl</code><code>extends</code> <code>poolsizecalculator {</code>

<code>14</code>

<code>15</code>

<code></code><code>@override</code>

<code>16</code>

<code></code><code>protected</code> <code>runnable creattask() {</code>

<code>17</code>

<code></code><code>return</code> <code>new</code> <code>asynciotask();</code>

<code>18</code>

<code>19</code>

<code>20</code>

<code>21</code>

<code></code><code>protected</code> <code>blockingqueue createworkqueue() {</code>

<code>22</code>

<code></code><code>return</code> <code>new</code> <code>linkedblockingqueue(</code><code>1000</code><code>);</code>

<code>23</code>

<code>24</code>

<code>25</code>

<code>26</code>

<code></code><code>protected</code> <code>long</code> <code>getcurrentthreadcputime() {</code>

<code>27</code>

<code></code><code>return</code> <code>managementfactory.getthreadmxbean().getcurrentthreadcputime();</code>

<code>28</code>

<code>29</code>

<code>30</code>

<code></code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) {</code>

<code>31</code>

<code></code><code>poolsizecalculator poolsizecalculator =</code><code>new</code> <code>simplepoolsizecaculatorimpl();</code>

<code>32</code>

<code></code><code>poolsizecalculator.calculateboundaries(</code><code>new</code> <code>bigdecimal(</code><code>1.0</code><code>),</code><code>new</code><code>bigdecimal(</code><code>100000</code><code>));</code>

<code>33</code>

<code>34</code>

<code>35</code>

<code>36</code>

<code>37</code>

<code>38</code>

<code></code><code>* 自定义的异步io任务</code>

<code>39</code>

<code></code><code>* @author will</code>

<code>40</code>

<code>41</code>

<code>42</code>

<code>class</code> <code>asynciotask</code><code>implements</code> <code>runnable {</code>

<code>43</code>

<code>44</code>

<code>45</code>

<code>46</code>

<code></code><code>httpurlconnection connection =</code><code>null</code><code>;</code>

<code>47</code>

<code></code><code>bufferedreader reader =</code><code>null</code><code>;</code>

<code>48</code>

<code>49</code>

<code>50</code>

<code></code><code>url geturl =</code><code>new</code> <code>url(geturl);</code>

<code>51</code>

<code>52</code>

<code></code><code>connection = (httpurlconnection) geturl.openconnection();</code>

<code>53</code>

<code></code><code>connection.connect();</code>

<code>54</code>

<code></code><code>reader =</code><code>new</code> <code>bufferedreader(</code><code>new</code> <code>inputstreamreader(</code>

<code>55</code>

<code></code><code>connection.getinputstream()));</code>

<code>56</code>

<code>57</code>

<code></code><code>string line;</code>

<code>58</code>

<code></code><code>while</code> <code>((line = reader.readline()) !=</code><code>null</code><code>) {</code>

<code>59</code>

<code></code><code>// empty loop</code>

<code>60</code>

<code>61</code>

<code>62</code>

<code>63</code>

<code></code><code>catch</code> <code>(ioexception e) {</code>

<code>64</code>

<code>65</code>

<code></code><code>}</code><code>finally</code> <code>{</code>

<code>66</code>

<code></code><code>if</code><code>(reader !=</code><code>null</code><code>) {</code>

<code>67</code>

<code>68</code>

<code></code><code>reader.close();</code>

<code>69</code>

<code>70</code>

<code></code><code>catch</code><code>(exception e) {</code>

<code>71</code>

<code>72</code>

<code>73</code>

<code>74</code>

<code></code><code>connection.disconnect();</code>

<code>75</code>

<code>76</code>

<code>77</code>

<code>78</code>

<code>79</code>

得到的输出如下:

<a href="http://ifeve.com/how-to-calculate-threadpool-size/#viewsource">查看源代码</a>

<code>target queue memory usage (bytes): 100000</code>

<code>createtask() produced pool_size_calculate.asynciotask which took 40 bytes in a queue</code>

<code>formula: 100000 / 40</code>

<code>* recommended queue capacity (bytes): 2500</code>

<code>number of cpu: 4</code>

<code>target utilization: 1</code>

<code>elapsed time (nanos): 3000000000</code>

<code>compute time (nanos): 47181000</code>

<code>wait time (nanos): 2952819000</code>

<code>formula: 4 * 1 * (1 + 2952819000 / 47181000)</code>

<code>* optimal thread count: 256</code>

推荐的任务队列大小为2500,线程数为256,有点出乎意料之外。我可以如下构造一个线程池:

<code>threadpoolexecutor pool =</code>

<code>2</code>