天天看点

Mina框架IoHandler与IoProcessor详解

我们已经知道,iohandler是开发网络应用程序的时候,与实际业务逻辑相关的组件,即属于mina核心框架之外的应用层组件。从mina 官方文档上,我们几乎没有看到对ioprocessor的说明,实际上ioprocessor对实际使用mina框架的开发人员透明,无需你去了解它的实现逻辑,它在mina中用来处理实际的i/o操作。

我们分析的思路是,先分别对iohandler与ioprocessor进行单独分析,然后再阐述它们之间的不同以及联系。

iohandler

当我们通过iosession执行相关操作的时候,如写数据,这些事件会触发mina框架抽象的ioservice实例,从而调用mina框架底层的相关组件进行处理。这时,配置的iohandler就被用来处理mina所触发的相关事件,处理这些事件的操作被抽象出来。

实际上,iohandler的继承层次非常简单,也说明了基于mina框架开发实际网络应用程序,对业务逻辑的处理也还是相对比较容易的。看一下 iohandler的继承层次,如图所示:

Mina框架IoHandler与IoProcessor详解

iohandler接口所定义的操作,一共定义了7个处理事件的操作,如下所示:

<code>1</code>

<code>public</code> <code>interface</code> <code>iohandler {</code>

<code>2</code>

<code></code><code>void</code> <code>sessioncreated(iosession session)</code><code>throws</code> <code>exception;</code>

<code>3</code>

<code></code><code>void</code> <code>sessionclosed(iosession session)</code><code>throws</code> <code>exception;</code>

<code>4</code>

<code></code><code>void</code> <code>sessionidle(iosession session, idlestatus status)</code><code>throws</code> <code>exception;</code>

<code>5</code>

<code></code><code>void</code> <code>exceptioncaught(iosession session, throwable cause)</code><code>throws</code> <code>exception;</code>

<code>6</code>

<code></code><code>void</code> <code>messagereceived(iosession session, object message)</code><code>throws</code> <code>exception;</code>

<code>7</code>

<code></code><code>void</code> <code>messagesent(iosession session, object message)</code><code>throws</code> <code>exception;</code>

<code>8</code>

<code>}</code>

因为iohandler是一个接口,所以如果使用该接口我们就必须实现所有的方法,mina通过使用iohandleradapter来默认实现 iohandler接口,并在iohandleradapter中全部给出空实现,如果我们要开发自己的iohandler,可以继承自iohandleradapter,根据需要选择重写指定的处理mina事件的方法,而对于你不感兴趣的方法就默认不给予实现(默认使用 iohandleradapter的空实现)。

那么,mina调用iohandler的时机是什么呢?又是如何调用的呢?

其实,根据mina的架构,我们知道,在客户端主动发起i/o操作请求以后,会等待mina触发相应的事件,在经过一组iofilter之后,在 iofilter链的最后一个iofilter被调用将要结束的时候,会调用我们注册的iohandler实现,经过处理来满足实际业务逻辑需要。我们可以在defaultiofilterchain中看到一个内部iofilter实现类tailfilter,在该类里调用了 iohandler封装的逻辑,代码如下所示:

<code>01</code>

<code>private</code> <code>static</code> <code>class</code> <code>tailfilter</code><code>extends</code> <code>iofilteradapter {</code>

<code>02</code>

<code></code><code>@override</code>

<code>03</code>

<code></code><code>public</code> <code>void</code> <code>sessioncreated(nextfilter nextfilter, iosession session)</code><code>throws</code><code>exception {</code>

<code>04</code>

<code></code><code>try</code> <code>{</code>

<code>05</code>

<code></code><code>session.gethandler().sessioncreated(session);</code>

<code>06</code>

<code></code><code>}</code><code>finally</code> <code>{</code>

<code>07</code>

<code></code><code>// notify the related future.</code>

<code>08</code>

<code></code><code>connectfuture future = (connectfuture) session.removeattribute(session_created_future);</code>

<code>09</code>

<code></code><code>if</code> <code>(future !=</code><code>null</code><code>) {</code>

<code>10</code>

<code></code><code>future.setsession(session);</code>

<code>11</code>

<code></code><code>}</code>

<code>12</code>

<code>13</code>

<code>14</code>

<code>15</code>

<code>16</code>

<code></code><code>public</code> <code>void</code> <code>sessionopened(nextfilter nextfilter, iosession session)</code><code>throws</code><code>exception {</code>

<code>17</code>

<code></code><code>session.gethandler().sessionopened(session);</code>

<code>18</code>

<code>19</code>

<code>20</code>

<code>21</code>

<code></code><code>public</code> <code>void</code> <code>sessionclosed(nextfilter nextfilter, iosession session)</code><code>throws</code><code>exception {</code>

<code>22</code>

<code></code><code>abstractiosession s = (abstractiosession) session;</code>

<code>23</code>

<code>24</code>

<code></code><code>s.gethandler().sessionclosed(session);</code>

<code>25</code>

<code>26</code>

<code>27</code>

<code></code><code>s.getwriterequestqueue().dispose(session);</code>

<code>28</code>

<code>29</code>

<code>30</code>

<code></code><code>s.getattributemap().dispose(session);</code>

<code>31</code>

<code>32</code>

<code>33</code>

<code></code><code>// remove all filters.</code>

<code>34</code>

<code></code><code>session.getfilterchain().clear();</code>

<code>35</code>

<code>36</code>

<code></code><code>if</code> <code>(s.getconfig().isusereadoperation()) {</code>

<code>37</code>

<code></code><code>s.offerclosedreadfuture();</code>

<code>38</code>

<code>39</code>

<code>40</code>

<code>41</code>

<code>42</code>

<code>43</code>

<code>44</code>

<code>45</code>

<code>46</code>

<code></code><code>public</code> <code>void</code> <code>sessionidle(nextfilter nextfilter, iosession session, idlestatus status)</code><code>throws</code> <code>exception {</code>

<code>47</code>

<code></code><code>session.gethandler().sessionidle(session, status);</code>

<code>48</code>

<code>49</code>

<code>50</code>

<code>51</code>

<code></code><code>public</code> <code>void</code> <code>exceptioncaught(nextfilter nextfilter, iosession session, throwable cause)</code><code>throws</code> <code>exception {</code>

<code>52</code>

<code>53</code>

<code>54</code>

<code></code><code>s.gethandler().exceptioncaught(s, cause);</code>

<code>55</code>

<code>56</code>

<code>57</code>

<code></code><code>s.offerfailedreadfuture(cause);</code>

<code>58</code>

<code>59</code>

<code>60</code>

<code>61</code>

<code>62</code>

<code>63</code>

<code></code><code>public</code> <code>void</code> <code>messagereceived(nextfilter nextfilter, iosession session, object message)</code><code>throws</code> <code>exception {</code>

<code>64</code>

<code>65</code>

<code></code><code>if</code> <code>(!(message</code><code>instanceof</code> <code>iobuffer)) {</code>

<code>66</code>

<code></code><code>s.increasereadmessages(system.currenttimemillis());</code>

<code>67</code>

<code></code><code>}</code><code>else</code> <code>if</code> <code>(!((iobuffer) message).hasremaining()) {</code>

<code>68</code>

<code>69</code>

<code>70</code>

<code>71</code>

<code>72</code>

<code></code><code>session.gethandler().messagereceived(s, message);</code>

<code>73</code>

<code>74</code>

<code>75</code>

<code></code><code>s.offerreadfuture(message);</code>

<code>76</code>

<code>77</code>

<code>78</code>

<code>79</code>

<code>80</code>

<code>81</code>

<code></code><code>public</code> <code>void</code> <code>messagesent(nextfilter nextfilter, iosession session, writerequest writerequest)</code><code>throws</code> <code>exception {</code>

<code>82</code>

<code></code><code>session.gethandler().messagesent(session, writerequest.getmessage());</code>

<code>83</code>

<code>84</code>

<code>85</code>

<code>86</code>

<code></code><code>public</code> <code>void</code> <code>filterwrite(nextfilter nextfilter, iosession session, writerequest writerequest)</code><code>throws</code> <code>exception {</code>

<code>87</code>

<code></code><code>nextfilter.filterwrite(session, writerequest);</code>

<code>88</code>

<code>89</code>

<code>90</code>

<code>91</code>

<code></code><code>public</code> <code>void</code> <code>filterclose(nextfilter nextfilter, iosession session)</code><code>throws</code> <code>exception {</code>

<code>92</code>

<code></code><code>nextfilter.filterclose(session);</code>

<code>93</code>

<code>94</code>

在上面的代码中,正好调用了iohandler接口定义的7个处理事件的方法。如果你还想知道iofilterchain实例是在何时被调用的, 可以跟踪mina的源码。

* ioprocessor

基于网络的端到端的通信,mina通过一个iosession对象(任何获取到一个iosession实例的持有者都可以)来间接执行 i/o操作,如发送数据、读取数据等。在mina内部,当一个iosession调用对应的方法,则直接触发ioprocessor来对指定的事件进行处理,它基于ractor模式来简化网络传输的实现(事实上,java nio就是基于reactor模式实现,属于同步非阻塞io模式)。

我们看一下ioprocessor相关类的继承关系,如图所示:

Mina框架IoHandler与IoProcessor详解

看到上面abstractpollingconnectionlessioacceptor,我们知道,它同时也是ioservice的实现,用于网络通信中的服务端,处理接收请求。可见,对于基于udp/ip的传输,ioacceptor和ioprocessor的处理是实现在一起的,可能实际处理的逻辑本身比较简单,放到一起能够更好地表达它们之间的紧密联系。

下面我们看一下ioprocessor接口的定义,如下所示:

<code>public</code> <code>interface</code> <code>ioprocessor&lt;s</code><code>extends</code> <code>iosession&gt; {</code>

<code></code><code>boolean</code> <code>isdisposing();</code>

<code></code><code>boolean</code> <code>isdisposed();</code>

<code></code><code>void</code> <code>dispose();</code>

<code></code><code>void</code> <code>add(s session);</code>

<code></code><code>void</code> <code>flush(s session);</code>

<code></code><code>void</code> <code>write(s session, writerequest writerequest);</code>

<code></code><code>void</code> <code>updatetrafficcontrol(s session);</code>

<code></code><code>void</code> <code>remove(s session);</code>

我们根据上面接口总结一下,一个ioprocessor实际处理了如下内容:

添加iosession实例,主要是使用ioprocessor内部的一个iosession队列newsessions来存放。

flush指定iosession实例到ioprocessor内部的flushingsessions队列。

write一个iosession实例对应的writerequest,主要是将一个writerequest加入到 iosession实例所持有的writerequestqueue writerequestqueue队列。至于加入到队列的请求何时处理,其实我们可以参考ioprocessor的实现abstractpollingioprocessor 类,其内部有一个org.apache.mina.core.polling.abstractpollingioprocessor.processor 线程类,这个线程会在调用一个ioprocessor的方法public final void add(s session)的时候被启动(实际,在调用dispose()、add(s session)、remove(s session)这三个方法的时候,都会尝试着去启动一个processor线程,如果没有启动则会启动这个线程),然后反复循环检测并处理队列中的写请求。

当ioprocessor关闭与一个iosession实例实例相关的连接,则会将这个iosession实例从 removingsessions队列中移除。

控制处理事件的通信量,主要是控制读写:如果没有注册读操作(selectionkey.op_read),则注册一个,否则当一个 读操作已经就绪,则进行读数据的处理;如果没有注册写操作(selectionkey.op_write),则注册一个,否则当一个写操 作已经就绪,则进行写数据的处理。

释放所有与ioprocessor有关的资源。

总结

经过上面的对比,我们已经能够知道iohandler与ioprocessor的本质区别。

iohandler是在iofilterchain执行中最后一个iofilter中被调用,比如,经过iofilterchain进行 codec、logging等等操作之后,已经将通信层的数据转化成我们需要的业务对象数据,这时就可以调用我们定义的iohandler实 现来进行处理。

ioprocessor是与实际的socket或channel相关的操作紧密相关的,也就是说,它是支撑mina进行处理底层实际i/o请 求的处理器。