如何合理地估算线程池大小?
这个问题虽然看起来很小,却并不那么容易回答。大家如果有更好的方法欢迎赐教,先来一个天真的估算方法:假设要求一个系统的tps(transaction per second或者task per second)至少为20,然后假设每个transaction由一个线程完成,继续假设平均每个线程处理一个transaction的时间为4s。那么问题转化为:
如何设计线程池大小,使得可以在1s内处理完20个transaction?
计算过程很简单,每个线程的处理能力为0.25tps,那么要达到20tps,显然需要20/0.25=80个线程。
很显然这个估算方法很天真,因为它没有考虑到cpu数目。一般服务器的cpu核数为16或者32,如果有80个线程,那么肯定会带来太多不必要的线程上下文切换开销。
再来第二种简单的但不知是否可行的方法(n为cpu总核数):
如果是cpu密集型应用,则线程池大小设置为n+1
如果是io密集型应用,则线程池大小设置为2n+1
如果一台服务器上只部署这一个应用并且只有这一个线程池,那么这种估算或许合理,具体还需自行测试验证。
接下来在这个文档:服务器性能io优化 中发现一个估算公式:
<code>1</code>
<code>最佳线程数目 = ((线程等待时间+线程cpu时间)/线程cpu时间 )* cpu数目</code>
比如平均每个线程cpu运行时间为0.5s,而线程等待时间(非cpu运行时间,比如io)为1.5s,cpu核心数为8,那么根据上面这个公式估算得到:((0.5+1.5)/0.5)*8=32。这个公式进一步转化为:
<code>最佳线程数目 = (线程等待时间与线程cpu时间之比 + 1)* cpu数目</code>
可以得出一个结论:
线程等待时间所占比例越高,需要越多线程。线程cpu时间所占比例越高,需要越少线程。
上一种估算方法也和这个结论相合。
一个系统最快的部分是cpu,所以决定一个系统吞吐量上限的是cpu。增强cpu处理能力,可以提高系统吞吐量上限。但根据短板效应,真实的系统吞吐量并不能单纯根据cpu来计算。那要提高系统吞吐量,就需要从“系统短板”(比如网络延迟、io)着手:
尽量提高短板操作的并行化比率,比如多线程下载技术
增强短板能力,比如用nio替代io
第一条可以联系到amdahl定律,这条定律定义了串行系统并行化后的加速比计算公式:
<code>加速比=优化前系统耗时 / 优化后系统耗时</code>
加速比越大,表明系统并行化的优化效果越好。addahl定律还给出了系统并行度、cpu数目和加速比的关系,加速比为speedup,系统串行化比率(指串行执行代码所占比率)为f,cpu数目为n:
<code>speedup <=</code><code>1</code> <code>/ (f + (</code><code>1</code><code>-f)/n)</code>
当n足够大时,串行化比率f越小,加速比speedup越大。
写到这里,我突然冒出一个问题。
是否使用线程池就一定比使用单线程高效呢?
答案是否定的,比如redis就是单线程的,但它却非常高效,基本操作都能达到十万量级/s。从线程这个角度来看,部分原因在于:
多线程带来线程上下文切换开销,单线程就没有这种开销
锁
当然“redis很快”更本质的原因在于:redis基本都是内存操作,这种情况下单线程可以很高效地利用cpu。而多线程适用场景一般是:存在相当比例的io和网络操作。
所以即使有上面的简单估算方法,也许看似合理,但实际上也未必合理,都需要结合系统真实情况(比如是io密集型或者是cpu密集型或者是纯内存操作)和硬件环境(cpu、内存、硬盘读写速度、网络状况等)来不断尝试达到一个符合实际的合理估算值。
最后来一个“dark magic”估算方法(因为我暂时还没有搞懂它的原理),使用下面的类:
<code>001</code>
<code>package</code> <code>pool_size_calculate;</code>
<code>002</code>
<code>003</code>
<code>import</code> <code>java.math.bigdecimal;</code>
<code>004</code>
<code>import</code> <code>java.math.roundingmode;</code>
<code>005</code>
<code>import</code> <code>java.util.timer;</code>
<code>006</code>
<code>import</code> <code>java.util.timertask;</code>
<code>007</code>
<code>import</code> <code>java.util.concurrent.blockingqueue;</code>
<code>008</code>
<code>009</code>
<code>/**</code>
<code>010</code>
<code></code><code>* a class that calculates the optimal thread pool boundaries. it takes the</code>
<code>011</code>
<code></code><code>* desired target utilization and the desired work queue memory consumption as</code>
<code>012</code>
<code></code><code>* input and retuns thread count and work queue capacity.</code>
<code>013</code>
<code></code><code>*</code>
<code>014</code>
<code></code><code>* @author niklas schlimm</code>
<code>015</code>
<code>016</code>
<code></code><code>*/</code>
<code>017</code>
<code>public</code> <code>abstract</code> <code>class</code> <code>poolsizecalculator {</code>
<code>018</code>
<code>019</code>
<code></code><code>/**</code>
<code>020</code>
<code></code><code>* the sample queue size to calculate the size of a single {@link runnable}</code>
<code>021</code>
<code></code><code>* element.</code>
<code>022</code>
<code>023</code>
<code></code><code>private</code> <code>final</code> <code>int</code> <code>sample_queue_size =</code><code>1000</code><code>;</code>
<code>024</code>
<code>025</code>
<code>026</code>
<code></code><code>* accuracy of test run. it must finish within 20ms of the testtime</code>
<code>027</code>
<code></code><code>* otherwise we retry the test. this could be configurable.</code>
<code>028</code>
<code>029</code>
<code></code><code>private</code> <code>final</code> <code>int</code> <code>epsylon =</code><code>20</code><code>;</code>
<code>030</code>
<code>031</code>
<code>032</code>
<code></code><code>* control variable for the cpu time investigation.</code>
<code>033</code>
<code>034</code>
<code></code><code>private</code> <code>volatile</code> <code>boolean</code> <code>expired;</code>
<code>035</code>
<code>036</code>
<code>037</code>
<code></code><code>* time (millis) of the test run in the cpu time calculation.</code>
<code>038</code>
<code>039</code>
<code></code><code>private</code> <code>final</code> <code>long</code> <code>testtime =</code><code>3000</code><code>;</code>
<code>040</code>
<code>041</code>
<code>042</code>
<code></code><code>* calculates the boundaries of a thread pool for a given {@link runnable}.</code>
<code>043</code>
<code>044</code>
<code></code><code>* @param targetutilization</code>
<code>045</code>
<code>046</code>
<code></code><code>throw</code> <code>new</code> <code>illegalstateexception(</code><code>"test not accurate"</code><code>);</code>
<code>047</code>
<code></code><code>}</code>
<code>048</code>
<code></code><code>expired =</code><code>false</code><code>;</code>
<code>049</code>
<code></code><code>start = system.currenttimemillis();</code>
<code>050</code>
<code></code><code>timer timer =</code><code>new</code> <code>timer();</code>
<code>051</code>
<code></code><code>timer.schedule(</code><code>new</code> <code>timertask() {</code>
<code>052</code>
<code></code><code>public</code> <code>void</code> <code>run() {</code>
<code>053</code>
<code></code><code>expired =</code><code>true</code><code>;</code>
<code>054</code>
<code>055</code>
<code></code><code>}, testtime);</code>
<code>056</code>
<code></code><code>while</code> <code>(!expired) {</code>
<code>057</code>
<code></code><code>task.run();</code>
<code>058</code>
<code>059</code>
<code></code><code>start = system.currenttimemillis() - start;</code>
<code>060</code>
<code></code><code>timer.cancel();</code>
<code>061</code>
<code></code><code>}</code><code>while</code> <code>(math.abs(start - testtime) > epsylon);</code>
<code>062</code>
<code></code><code>collectgarbage(</code><code>3</code><code>);</code>
<code>063</code>
<code>064</code>
<code>065</code>
<code></code><code>private</code> <code>void</code> <code>collectgarbage(</code><code>int</code> <code>times) {</code>
<code>066</code>
<code></code><code>for</code> <code>(</code><code>int</code> <code>i =</code><code>0</code><code>; i < times; i++) {</code>
<code>067</code>
<code></code><code>system.gc();</code>
<code>068</code>
<code></code><code>try</code> <code>{</code>
<code>069</code>
<code></code><code>thread.sleep(</code><code>10</code><code>);</code>
<code>070</code>
<code></code><code>}</code><code>catch</code> <code>(interruptedexception e) {</code>
<code>071</code>
<code></code><code>thread.currentthread().interrupt();</code>
<code>072</code>
<code></code><code>break</code><code>;</code>
<code>073</code>
<code>074</code>
<code>075</code>
<code>076</code>
<code>077</code>
<code>078</code>
<code></code><code>* calculates the memory usage of a single element in a work queue. based on</code>
<code>079</code>
<code></code><code>* heinz kabbutz' ideas</code>
<code>080</code>
<code>081</code>
<code>082</code>
<code></code><code>* @return memory usage of a single {@link runnable} element in the thread</code>
<code>083</code>
<code></code><code>* pools work queue</code>
<code>084</code>
<code>085</code>
<code></code><code>public</code> <code>long</code> <code>calculatememoryusage() {</code>
<code>086</code>
<code></code><code>blockingqueue queue = createworkqueue();</code>
<code>087</code>
<code></code><code>for</code> <code>(</code><code>int</code> <code>i =</code><code>0</code><code>; i < sample_queue_size; i++) {</code>
<code>088</code>
<code></code><code>queue.add(creattask());</code>
<code>089</code>
<code>090</code>
<code></code><code>long</code> <code>mem0 = runtime.getruntime().totalmemory()</code>
<code>091</code>
<code></code><code>- runtime.getruntime().freememory();</code>
<code>092</code>
<code></code><code>long</code> <code>mem1 = runtime.getruntime().totalmemory()</code>
<code>093</code>
<code>094</code>
<code></code><code>queue =</code><code>null</code><code>;</code>
<code>095</code>
<code></code><code>collectgarbage(</code><code>15</code><code>);</code>
<code>096</code>
<code></code><code>mem0 = runtime.getruntime().totalmemory()</code>
<code>097</code>
<code>098</code>
<code></code><code>queue = createworkqueue();</code>
<code>099</code>
<code>100</code>
<code>101</code>
<code>102</code>
<code>103</code>
<code></code><code>mem1 = runtime.getruntime().totalmemory()</code>
<code>104</code>
<code>105</code>
<code></code><code>return</code> <code>(mem1 - mem0) / sample_queue_size;</code>
<code>106</code>
<code>107</code>
<code>108</code>
<code>109</code>
<code></code><code>* create your runnable task here.</code>
<code>110</code>
<code>111</code>
<code></code><code>* @return an instance of your runnable task under investigation</code>
<code>112</code>
<code>113</code>
<code></code><code>protected</code> <code>abstract</code> <code>runnable creattask();</code>
<code>114</code>
<code>115</code>
<code>116</code>
<code></code><code>* return an instance of the queue used in the thread pool.</code>
<code>117</code>
<code>118</code>
<code></code><code>* @return queue instance</code>
<code>119</code>
<code>120</code>
<code></code><code>protected</code> <code>abstract</code> <code>blockingqueue createworkqueue();</code>
<code>121</code>
<code>122</code>
<code>123</code>
<code></code><code>* calculate current cpu time. various frameworks may be used here,</code>
<code>124</code>
<code></code><code>* depending on the operating system in use. (e.g.</code>
<code>125</code>
<code>126</code>
<code></code><code>* measurement, the more accurate the results for thread count boundaries.</code>
<code>127</code>
<code>128</code>
<code></code><code>* @return current cpu time of current thread</code>
<code>129</code>
<code>130</code>
<code></code><code>protected</code> <code>abstract</code> <code>long</code> <code>getcurrentthreadcputime();</code>
<code>131</code>
<code>132</code>
<code>}</code>
然后自己继承这个抽象类并实现它的三个抽象方法,比如下面是我写的一个示例(任务是请求网络数据),其中我指定期望cpu利用率为1.0(即100%),任务队列总大小不超过100,000字节:
<code>01</code>
<code>02</code>
<code>03</code>
<code>import</code> <code>java.io.bufferedreader;</code>
<code>04</code>
<code>import</code> <code>java.io.ioexception;</code>
<code>05</code>
<code>import</code> <code>java.io.inputstreamreader;</code>
<code>06</code>
<code>import</code> <code>java.lang.management.managementfactory;</code>
<code>07</code>
<code>08</code>
<code>import</code> <code>java.net.httpurlconnection;</code>
<code>09</code>
<code>import</code> <code>java.net.url;</code>
<code>10</code>
<code>11</code>
<code>import</code> <code>java.util.concurrent.linkedblockingqueue;</code>
<code>12</code>
<code>13</code>
<code>public</code> <code>class</code> <code>simplepoolsizecaculatorimpl</code><code>extends</code> <code>poolsizecalculator {</code>
<code>14</code>
<code>15</code>
<code></code><code>@override</code>
<code>16</code>
<code></code><code>protected</code> <code>runnable creattask() {</code>
<code>17</code>
<code></code><code>return</code> <code>new</code> <code>asynciotask();</code>
<code>18</code>
<code>19</code>
<code>20</code>
<code>21</code>
<code></code><code>protected</code> <code>blockingqueue createworkqueue() {</code>
<code>22</code>
<code></code><code>return</code> <code>new</code> <code>linkedblockingqueue(</code><code>1000</code><code>);</code>
<code>23</code>
<code>24</code>
<code>25</code>
<code>26</code>
<code></code><code>protected</code> <code>long</code> <code>getcurrentthreadcputime() {</code>
<code>27</code>
<code></code><code>return</code> <code>managementfactory.getthreadmxbean().getcurrentthreadcputime();</code>
<code>28</code>
<code>29</code>
<code>30</code>
<code></code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) {</code>
<code>31</code>
<code></code><code>poolsizecalculator poolsizecalculator =</code><code>new</code> <code>simplepoolsizecaculatorimpl();</code>
<code>32</code>
<code></code><code>poolsizecalculator.calculateboundaries(</code><code>new</code> <code>bigdecimal(</code><code>1.0</code><code>),</code><code>new</code><code>bigdecimal(</code><code>100000</code><code>));</code>
<code>33</code>
<code>34</code>
<code>35</code>
<code>36</code>
<code>37</code>
<code>38</code>
<code></code><code>* 自定义的异步io任务</code>
<code>39</code>
<code></code><code>* @author will</code>
<code>40</code>
<code>41</code>
<code>42</code>
<code>class</code> <code>asynciotask</code><code>implements</code> <code>runnable {</code>
<code>43</code>
<code>44</code>
<code>45</code>
<code>46</code>
<code></code><code>httpurlconnection connection =</code><code>null</code><code>;</code>
<code>47</code>
<code></code><code>bufferedreader reader =</code><code>null</code><code>;</code>
<code>48</code>
<code>49</code>
<code>50</code>
<code></code><code>url geturl =</code><code>new</code> <code>url(geturl);</code>
<code>51</code>
<code>52</code>
<code></code><code>connection = (httpurlconnection) geturl.openconnection();</code>
<code>53</code>
<code></code><code>connection.connect();</code>
<code>54</code>
<code></code><code>reader =</code><code>new</code> <code>bufferedreader(</code><code>new</code> <code>inputstreamreader(</code>
<code>55</code>
<code></code><code>connection.getinputstream()));</code>
<code>56</code>
<code>57</code>
<code></code><code>string line;</code>
<code>58</code>
<code></code><code>while</code> <code>((line = reader.readline()) !=</code><code>null</code><code>) {</code>
<code>59</code>
<code></code><code>// empty loop</code>
<code>60</code>
<code>61</code>
<code>62</code>
<code>63</code>
<code></code><code>catch</code> <code>(ioexception e) {</code>
<code>64</code>
<code>65</code>
<code></code><code>}</code><code>finally</code> <code>{</code>
<code>66</code>
<code></code><code>if</code><code>(reader !=</code><code>null</code><code>) {</code>
<code>67</code>
<code>68</code>
<code></code><code>reader.close();</code>
<code>69</code>
<code>70</code>
<code></code><code>catch</code><code>(exception e) {</code>
<code>71</code>
<code>72</code>
<code>73</code>
<code>74</code>
<code></code><code>connection.disconnect();</code>
<code>75</code>
<code>76</code>
<code>77</code>
<code>78</code>
<code>79</code>
得到的输出如下:
<a href="http://ifeve.com/how-to-calculate-threadpool-size/#viewsource">查看源代码</a>
<code>target queue memory usage (bytes): 100000</code>
<code>createtask() produced pool_size_calculate.asynciotask which took 40 bytes in a queue</code>
<code>formula: 100000 / 40</code>
<code>* recommended queue capacity (bytes): 2500</code>
<code>number of cpu: 4</code>
<code>target utilization: 1</code>
<code>elapsed time (nanos): 3000000000</code>
<code>compute time (nanos): 47181000</code>
<code>wait time (nanos): 2952819000</code>
<code>formula: 4 * 1 * (1 + 2952819000 / 47181000)</code>
<code>* optimal thread count: 256</code>
推荐的任务队列大小为2500,线程数为256,有点出乎意料之外。我可以如下构造一个线程池:
<code>threadpoolexecutor pool =</code>
<code>2</code>