天天看点

C++ 多线程编程总结

在开发C++程序时,一般在吞吐量、并发、实时性上有较高的要求。设计C++程序时,总结起来可以从如下几点提高效率:

l  并发

l  异步

l  缓存

下面将我平常工作中遇到一些问题例举一二,其设计思想无非以上三点。

  生产者-消费者模型是人们非常熟悉的模型,比如在某个服务器程序中,当User数据被逻辑模块修改后,就产生一个更新数据库的任务(produce),投递给IO模块任务队列,IO模块从任务队列中取出任务执行sql操作(consume)。

  设计通用的任务队列,示例代码如下:

  详细实现可参见:

<a href="http://ffown.googlecode.com/svn/trunk/fflib/include/detail/task_queue_impl.h">  http://ffown.googlecode.com/svn/trunk/fflib/include/detail/task_queue_impl.h</a>

<code>void</code> <code>task_queue_t::produce(</code><code>const</code> <code>task_t&amp; task_) {      </code>

<code>        </code><code>lock_guard_t lock(m_mutex);</code>

<code>        </code><code>if</code> <code>(m_tasklist-&gt;empty()){</code><code>//! 条件满足唤醒等待线程</code>

<code>            </code><code>m_cond.</code><code>signal</code><code>();</code>

<code>        </code><code>}</code>

<code>        </code><code>m_tasklist-&gt;push_back(task_);</code>

<code>    </code><code>}</code>

<code>int</code>   <code>task_queue_t::comsume(task_t&amp; task_){</code>

<code>        </code><code>while</code> <code>(m_tasklist-&gt;empty())</code><code>//! 当没有作业时,就等待直到条件满足被唤醒{</code>

<code>            </code><code>if</code> <code>(</code><code>false</code> <code>== m_flag){</code>

<code>                </code><code>return</code> <code>-1;</code>

<code>            </code><code>}</code>

<code>            </code><code>m_cond.wait();</code>

<code>        </code><code>task_ = m_tasklist-&gt;front();</code>

<code>        </code><code>m_tasklist-&gt;pop_front();</code>

<code>        </code><code>return</code> <code>0;</code>

<code>}</code>

  比如网络游戏服务器程序中,网络模块收到消息包,投递给逻辑层后立即返回,继续接受下一个消息包。逻辑线程在一个没有io操作的环境下运行,以保障实时性。示例:

<code>void</code> <code>handle_xx_msg(</code><code>long</code> <code>uid,</code><code>const</code> <code>xx_msg_t&amp; msg){</code>

<code>    </code><code>logic_task_queue-&gt;post(boost::bind(&amp;servie_t::proces, uid, msg));</code>

  注意,此模式下为单任务队列,每个任务队列单线程。

         上面的只是完成了io 和 cpu运算的并行,而cpu中逻辑操作是串行的。在某些场合,cpu逻辑运算部分也可实现并行,如游戏中用户A种菜和B种菜两种操作是完全可以并行的,因为两个操作没有共享数据。最简单的方式是A、B相关的操作被分配到不同的任务队列中。示例如下:    

<code>void</code> <code>handle_xx_msg(</code><code>long</code> <code>uid,</code><code>const</code> <code>xx_msg_t&amp; msg) {</code>

<code>  logic_task_queue_array[uid %</code><code>sizeof</code><code>(logic_task_queue_array)]-&gt;post(</code>

<code>    boost::bind(&amp;servie_t::proces, uid, msg));</code>

注意,此模式下为多任务队列,每个任务队列单线程。

  比如逻辑Service模块需要数据库模块异步载入用户数据,并做后续处理计算。而数据库模块拥有一个固定连接数的连接池,当执行SQL的任务到来时,选择一个空闲的连接,执行SQL,并把SQL 通过回调函数传递给逻辑层。其步骤如下:

n  预先分配好线程池,每个线程创建一个连接到数据库的连接

n  为数据库模块创建一个任务队列,所有线程都是这个任务队列的消费者

n  逻辑层想数据库模块投递sql执行任务,同时传递一个回调函数来接受sql执行结果

  示例如下:

<code>void</code> <code>db_t:load(</code><code>long</code> <code>uid_, boost::function&lt;</code><code>void</code> <code>(user_data_t&amp;) func_){</code>

<code>    </code><code>//! sql execute, construct user_data_t user</code>

<code>    </code><code>func_(user)</code>

<code>void</code> <code>process_user_data_loaded(user_data_t&amp;){</code>

<code>    </code><code>//! todo something</code>

<code>db_task_queue-&gt;post(boost::bind(&amp;db_t:load, uid, func));</code>

         注意,此模式下为单任务队列,每个任务队列多线程。

         本文主要讲C++多线程编程,日志系统不是为了提高程序效率,但是在程序调试、运行期排错上,日志是无可替代的工具,相信开发后台程序的朋友都会使用日志。常见的日志使用方式有如下几种:

n  流式,如logstream &lt;&lt; "start servie time[%d]" &lt;&lt; time(0) &lt;&lt; " app name[%s]" &lt;&lt; app_string.c_str() &lt;&lt; endl;

n  Printf 格式如:logtrace(LOG_MODULE, "start servie time[%d] app name[%s]", time(0), app_string.c_str());

  二者各有优缺点,流式是线程安全的,printf格式格式化字符串会更直接,但缺点是线程不安全,如果把app_string.c_str() 换成app_string (std::string),编译被通过,但是运行期会crash(如果运气好每次都crash,运气不好偶尔会crash)。我个人钟爱printf风格,可以做如下改进:

l  增加线程安全,利用C++模板的traits机制,可以实现线程安全。示例:

<code>template</code><code>&lt;</code><code>typename</code> <code>ARG1&gt;</code>

<code>void</code> <code>logtrace(</code><code>const</code> <code>char</code><code>* module,</code><code>const</code> <code>char</code><code>* fmt, ARG1 arg1){</code>

<code>    </code><code>boost::format s(fmt);</code>

<code>    </code><code>f % arg1;</code>

  这样,除了标准类型+std::string 传入其他类型将编译不能通过。这里只列举了一个参数的例子,可以重载该版本支持更多参数,如果你愿意,可以支持9个参数或更多。

l  为日志增加颜色,在printf中加入控制字符,可以再屏幕终端上显示颜色,Linux下示例:printf("\033[32;49;1m [DONE] \033[39;49;0m") 

  更多颜色方案参见:

l  每个线程启动时,都应该用日志打印该线程负责什么功能。这样,程序跑起来的时候通过top –H – p pid 可以得知那个功能使用cpu的多少。实际上,我的每行日志都会打印线程id,此线程id非pthread_id,而其实是线程对应的系统分配的进程id号。

         尽管已经有很多工具可以分析c++程序运行性能,但是其大部分还是运行在程序debug阶段。我们需要一种手段在debug和release阶段都能监控程序,一方面得知程序瓶颈之所在,一方面尽早发现哪些组件在运行期出现了异常。

         通常都是使用gettimeofday 来计算某个函数开销,可以精确到微妙。可以利用C++的确定性析构,非常方便的实现获取函数开销的小工具,示例如下

C++ 多线程编程总结
C++ 多线程编程总结

         Cost 应该被投递到性能统计管理器中,该管理器定时讲性能统计数据输出到文件中。

         很多编程语言已经内建了foreach,但是c++还没有。所以建议自己在需要遍历容器的地方编写foreach函数。习惯函数式编程的人应该会非常钟情使用foreach,使用foreach的好处多多少少有些,如:

         但主要是编程哲学上层面的。

示例:

<code>void</code> <code>user_mgr_t::foreach(boost::function&lt;</code><code>void</code> <code>(user_t&amp;)&gt; func_){</code>

<code>    </code><code>for</code> <code>(iterator it = m_users.begin(); it != m_users.end() ++it){</code>

<code>        </code><code>func_(it-&gt;second);</code>

  比如要实现dump 接口,不需要重写关于迭代器的代码

<code>void</code> <code>user_mgr_t:dump(){</code>

<code>    </code><code>struct</code> <code>lambda {</code>

<code>        </code><code>static</code> <code>void</code> <code>print(user_t&amp; user){</code>

<code>            </code><code>//! print(tostring(user);</code>

<code>    </code><code>};</code>

<code>    </code><code>this</code><code>-&gt;foreach(lambda::print);</code>

  实际上,上面的代码变通的生成了匿名函数,如果是c++ 11 标准的编译器,本可以写的更简洁一些:

  this-&gt;foreach([](user_t&amp; user) {} );

  但是我大部分时间编写的程序都要运行在centos 上,你知道吗它的gcc版本是gcc 4.1.2, 所以大部分时间我都是用变通的方式使用lambda函数。

  常见的使用任务队列实现异步的代码如下:

<code>void</code> <code>service_t:async_update_user(</code><code>long</code> <code>uid){</code>

<code>    </code><code>task_queue-&gt;post(boost::bind(&amp;service_t:sync_update_user_impl,</code><code>this</code><code>, uid));</code>

<code>void</code> <code>service_t:sync_update_user_impl(</code><code>long</code> <code>uid){</code>

<code>    </code><code>user_t&amp; user = get_user(uid);</code>

<code>    </code><code>user.update()</code>

  这样做的缺点是,一个接口要响应的写两遍函数,如果一个函数的参数变了,那么另一个参数也要跟着改动。并且代码也不是很美观。使用lambda可以让异步看起来更直观,仿佛就是在接口函数中立刻完成一样。示例代码:

<code>        </code><code>static</code> <code>void</code> <code>update_user_impl(service_t* servie,</code><code>long</code> <code>uid){</code>

<code>            </code><code>user_t&amp; user = servie-&gt;get_user(uid);</code>

<code>            </code><code>user.update();</code>

<code>    </code><code>task_queue-&gt;post(boost::bind(&amp;lambda:update_user_impl,</code><code>this</code><code>, uid));</code>

  这样当要改动该接口时,直接在该接口内修改代码,非常直观。

         Map/reduce的语义是先将任务划分为多个任务,投递到多个worker中并发执行,其产生的结果经reduce汇总后生成最终的结果。Shared_ptr的语义是什么呢?当最后一个shared_ptr析构时,将会调用托管对象的析构函数。语义和map/reduce过程非常相近。我们只需自己实现讲请求划分多个任务即可。示例过程如下:

l  定义请求托管对象,加入我们需要在10个文件中搜索“oh nice”字符串出现的次数,定义托管结构体如下:

<code>struct</code> <code>reducer{</code>

<code>    </code><code>void</code> <code>set_result(</code><code>int</code> <code>index,</code><code>long</code> <code>result) {</code>

<code>        </code><code>m_result[index] = result;</code>

<code>    </code><code>~reducer(){</code>

<code>        </code><code>long</code> <code>total = 0;</code>

<code>        </code><code>for</code> <code>(</code><code>int</code> <code>i = 0; i &lt;</code><code>sizeof</code><code>(m_result); ++i){</code>

<code>            </code><code>total += m_result[i];</code>

<code>        </code><code>//! post total to somewhere</code>

<code>    </code><code>long</code> <code>m_result[10];</code>

<code>};</code>

l  定义执行任务的 worker

<code>void</code> <code>worker_t:exe(</code><code>int</code> <code>index_, shared_ptr&lt;reducer&gt; ret) {</code>

<code>  ret-&gt;set_result(index, 100);</code>

l  将任务分割后,投递给不同的worker

<code>shared_ptr&lt;reducer&gt; ret(</code><code>new</code> <code>reducer());</code>

<code>for</code> <code>(</code><code>int</code> <code>i = 0; i &lt; 10; ++i)</code>

<code>{</code>

<code>    </code><code>task_queue[i]-&gt;post(boost::bind(&amp;worker_t:exe, i, ret));</code>