天天看點

Mina源碼閱讀筆記(四)—Mina的連接配接IoConnector2

接着Mina源碼閱讀筆記(四)—Mina的連接配接IoConnector1,,我們繼續:

AbstractIoAcceptor:

001

package

org.apache.mina.core.rewrite.service;

002

003

import

java.io.IOException;

004

import

java.net.SocketAddress;

005

import

java.util.ArrayList;

006

import

java.util.Collections;

007

import

java.util.HashSet;

008

import

java.util.List;

009

import

java.util.Set;

010

import

java.util.concurrent.Executor;

011

012

public

abstract

class

AbstractIoAcceptor 

extends

AbstractIoService 

implements

013

IoAcceptor {

014

015

private

final

List<SocketAddress> defaultLocalAddresses = 

new

ArrayList<SocketAddress>();

016

017

private

final

List<SocketAddress> unmodifiableDeffaultLocalAddresses = Collections

018

.unmodifiableList(defaultLocalAddresses);

019

020

private

final

Set<SocketAddress> boundAddresses = 

new

HashSet<SocketAddress>();

021

022

private

boolean

disconnectOnUnbind = 

true

;

023

024

025

protected

final

Object bindLock = 

new

Object();

026

027

034

protected

AbstractIoAcceptor(Object param, Executor executor) {

035

super

(param, executor);

036

defaultLocalAddresses.add(

null

);

037

}

038

039

@Override

040

public

SocketAddress getLocalAddress() {

041

042

Set<SocketAddress> localAddresses = getLocalAddresses();

043

if

(localAddresses.isEmpty()) {

044

return

null

;

045

}

046

return

localAddresses.iterator().next();

047

}

048

049

@Override

050

public

final

Set<SocketAddress> getLocalAddresses() {

051

Set<SocketAddress> localAddresses = 

new

HashSet<SocketAddress>();

052

synchronized

(boundAddresses) {

053

localAddresses.addAll(boundAddresses);

054

}

055

return

localAddresses;

056

}

057

058

@Override

059

public

void

bind(SocketAddress localAddress) 

throws

IOException {

060

// TODO Auto-generated method stub

061

062

}

063

064

@Override

065

public

void

bind(Iterable<? 

extends

SocketAddress> localAddresses)

066

throws

IOException {

067

// TODO isDisposing()

068

069

if

(localAddresses == 

null

) {

070

throw

new

IllegalArgumentException(

"localAddresses"

);

071

}

072

073

List<SocketAddress> localAddressesCopy = 

new

ArrayList<SocketAddress>();

074

075

for

(SocketAddress a : localAddresses) {

076

// TODO check address type

077

localAddressesCopy.add(a);

078

}

079

080

if

(localAddressesCopy.isEmpty()) {

081

throw

new

IllegalArgumentException(

"localAddresses is empty"

);

082

}

083

084

boolean

active = 

false

;

085

086

synchronized

(bindLock) {

087

synchronized

(boundAddresses) {

088

if

(boundAddresses.isEmpty()) {

089

active = 

true

;

090

}

091

}

092

}

093

094

if

(getHandler() == 

null

) {

095

throw

new

IllegalArgumentException(

"handler is not set"

);

096

}

097

098

try

{

099

Set<SocketAddress> addresses = bindInternal(localAddressesCopy);

100

101

synchronized

(boundAddresses) {

102

boundAddresses.addAll(addresses);

103

}

104

catch

(IOException e) {

105

throw

e;

106

catch

(RuntimeException e) {

107

throw

e;

108

catch

(Throwable e) {

109

throw

new

RuntimeException(

"Filed ti bind"

);

110

}

111

112

if

(active){

113

//do sth

114

}

115

}

116

117

protected

abstract

Set<SocketAddress> bindInternal(

118

List<? 

extends

SocketAddress> localAddress) 

throws

Exception;

119

120

@Override

121

public

void

unbind(SocketAddress localAddress) {

122

// TODO Auto-generated method stub

123

124

}

125

}

polling:

01

package

org.apache.mina.core.rewrite.polling;

02

03

import

java.net.SocketAddress;

04

import

java.nio.channels.ServerSocketChannel;

05

import

java.util.List;

06

import

java.util.Set;

07

import

java.util.concurrent.Executor;

08

import

java.util.concurrent.Semaphore;

09

import

java.util.concurrent.atomic.AtomicReference;

10

11

import

org.apache.mina.core.rewrite.service.AbstractIoAcceptor;

12

13

public

abstract

class

AbstractPollingIoAcceptor 

extends

AbstractIoAcceptor {

14

15

private

final

Semaphore lock = 

new

Semaphore(

1

);

16

17

private

volatile

boolean

selectable;

18

19

private

AtomicReference<Acceptor> acceptorRef = 

new

AtomicReference<Acceptor>();

20

21

24

protected

int

backlog = 

50

;

25

26

32

protected

AbstractPollingIoAcceptor(Object param, Executor executor) {

33

super

(param, executor);

34

// TODO Auto-generated constructor stub

35

}

36

37

42

protected

abstract

void

init() 

throws

Exception;

43

44

protected

abstract

void

destory() 

throws

Exception;

45

46

protected

abstract

int

select() 

throws

Exception;

47

48

protected

abstract

ServerSocketChannel open(SocketAddress localAddress) 

throws

Exception;

49

50

@Override

51

protected

Set<SocketAddress> bindInternal(

52

List<? 

extends

SocketAddress> localAddress) 

throws

Exception {

53

// ...

54

try

{

55

lock.acquire();

56

Thread.sleep(

10

);

57

finally

{

58

lock.release();

59

}

60

// ...

61

return

null

;

62

}

63

64

71

private

class

Acceptor 

implements

Runnable {

72

@Override

73

public

void

run() {

74

assert

(acceptorRef.get() == 

this

);

75

76

int

nHandles = 

;

77

78

lock.release();

79

80

while

(selectable) {

81

try

{

82

int

selected = select();

83

84

// nHandles+=registerHandles();

85

86

if

(nHandles == 

) {

87

acceptorRef.set(

null

);

88

// ...

89

}

90

catch

(Exception e) {

91

92

}

93

}

94

}

95

}

96

}

好了最後看NioSoeketAcceptor:

001

package

org.apache.mina.rewrite.transport.socket.nio;

002

003

import

java.net.InetSocketAddress;

004

import

java.net.ServerSocket;

005

import

java.net.SocketAddress;

006

import

java.nio.channels.SelectionKey;

007

import

java.nio.channels.Selector;

008

import

java.nio.channels.ServerSocketChannel;

009

import

java.util.concurrent.Executor;

010

011

import

org.apache.mina.core.rewrite.polling.AbstractPollingIoAcceptor;

012

import

org.apache.mina.rewrite.transport.socket.SocketAcceptor;

013

014

public

final

class

NioSocketAcceptor 

extends

AbstractPollingIoAcceptor

015

implements

SocketAcceptor {

016

017

private

volatile

Selector selector;

018

019

protected

NioSocketAcceptor(Object param, Executor executor) {

020

super

(param, executor);

021

// TODO Auto-generated constructor stub

022

}

023

024

@Override

025

public

int

getManagedSessionCount() {

026

// TODO Auto-generated method stub

027

return

;

028

}

029

030

037

@Override

038

public

InetSocketAddress getLocalAddress() {

039

// TODO Auto-generated method stub

040

return

null

;

041

}

042

043

@Override

044

public

void

setDefaultLocalAddress(InetSocketAddress localAddress) {

045

// TODO Auto-generated method stub

046

047

}

048

049

@Override

050

public

boolean

isReuseAddress() {

051

// TODO Auto-generated method stub

052

return

false

;

053

}

054

055

@Override

056

protected

void

init() 

throws

Exception {

057

selector = Selector.open();

058

}

059

060

@Override

061

protected

void

destory() 

throws

Exception {

062

if

(selector != 

null

) {

063

selector.close();

064

}

065

}

066

067

@Override

068

protected

int

select() 

throws

Exception {

069

return

selector.select();

070

}

071

072

@Override

073

protected

void

dispose0() 

throws

Exception {

074

// TODO Auto-generated method stub

075

076

}

077

078

protected

ServerSocketChannel open(SocketAddress localAddress)

079

throws

Exception {

080

ServerSocketChannel channel =ServerSocketChannel.open();

081

082

boolean

success=

false

;

083

084

try

{

085

channel.configureBlocking(

false

);

086

087

ServerSocket socket=channel.socket();

088

089

socket.setReuseAddress(isReuseAddress());

090

091

socket.bind(localAddress);

092

093

channel.register(selector, SelectionKey.OP_ACCEPT);

094

095

success=

true

;

096

}

finally

{

097

if

(!success){

098

//close(channel);

099

}

100

}

101

return

channel;

102

}

103

104

@Override

105

public

boolean

isActive() {

106

// TODO Auto-generated method stub

107

return

false

;

108

}

109

110

}

------------------------------------------------------

到此為止将連接配接部分都寫完了,在連接配接部分還有些零碎的東西,比如handler、polling,這些都隻是稍稍提了一下,具體後面會在介紹其他部分是肯定還會碰上,我還是想把重心放在最主要的部分去寫,下一篇應該要寫到session了。

繼續閱讀