Running Java WebSockets Microservices with Jetty 9.1

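  The goal of this example is to build a WebSocket server that accepts messages from client programs and broadcasts them to all other clients currently connected. Assume the following message model: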

package com.example.services;

// Simple message model: who sent the message and its text.
public class Message {
    private String username;
    private String message;

    // No-argument constructor, used when a message is decoded field by field.
    public Message() {
    }

    public Message( final String username, final String message ) {
        this.username = username;
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    public String getUsername() {
        return username;
    }

    public void setMessage( final String message ) {
        this.message = message;
    }

    public void setUsername( final String username ) {
        this.username = username;
    }
}

The client code:

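package com.example.services;

import java.io.IOException;
import java.util.logging.Logger;

import javax.websocket.ClientEndpoint;
import javax.websocket.EncodeException;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;

@ClientEndpoint
public class BroadcastClientEndpoint {
    private static final Logger log = Logger.getLogger( 
        BroadcastClientEndpoint.class.getName() );

    // Called once, right after the connection to the server is established.
    @OnOpen
    public void onOpen( final Session session ) throws IOException, EncodeException  {
        session.getBasicRemote().sendObject( new Message( "Client", "Hello!" ) );
    }

    // Called for every message the server sends to this client.
    @OnMessage
    public void onMessage( final Message message ) {
        log.info( String.format( "Received message '%s' from '%s'",
            message.getMessage(), message.getUsername() ) );
    }
}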

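@OnOpen is invoked when the client connects to the server, and @OnMessage is invoked every time the server sends a message to the client. Messages travel as JSON text, so a decoder converts each incoming string into a Message: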

    // Nested static class; the enclosing class is not shown here.
    // Needs javax.json.*, javax.websocket.{DecodeException, Decoder, EndpointConfig},
    // java.io.StringReader and java.util.Collections.
    public static class MessageDecoder implements Decoder.Text< Message > {
        private JsonReaderFactory factory = Json.createReaderFactory( Collections.< String, Object >emptyMap() );

        @Override
        public void init( final EndpointConfig config ) {
        }

        @Override
        public Message decode( final String str ) throws DecodeException {
            final Message message = new Message();
            // Parse the JSON text and copy its fields into the Message.
            try( final JsonReader reader = factory.createReader( new StringReader( str ) ) ) {
                final JsonObject json = reader.readObject();
                message.setUsername( json.getString( "username" ) );
                message.setMessage( json.getString( "message" ) );
            }
            return message;
        }

        @Override
        public boolean willDecode( final String str ) {
            return true;
        }

        @Override
        public void destroy() {
        }
    }

We need to tell the client that we have a JSON encoder and decoder, which is done by extending the annotation on the BroadcastClientEndpoint class:

@ClientEndpoint( encoders = { MessageEncoder.class }, decoders = { MessageDecoder.class } )
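The annotation refers to a MessageEncoder that is not shown in this text. To make the wire format concrete, here is a dependency-free sketch of the JSON text such an encoder could produce. The field names "username" and "message" are the ones MessageDecoder reads; the class and method names (WireFormatSketch, escape, toJson) are hypothetical, and a real encoder would more likely build the object with the javax.json API rather than by hand.

```java
// Dependency-free illustration of the JSON text exchanged between client and
// server. Only the field names come from MessageDecoder; everything else in
// this class is a hypothetical sketch, not the article's MessageEncoder.
final class WireFormatSketch {

    // Escapes the characters that must not appear raw inside a JSON string.
    static String escape(final String value) {
        final StringBuilder out = new StringBuilder();
        for (final char c : value.toCharArray()) {
            switch (c) {
                case '"':  out.append("\\\""); break;
                case '\\': out.append("\\\\"); break;
                case '\n': out.append("\\n");  break;
                case '\r': out.append("\\r");  break;
                case '\t': out.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the \\uXXXX form.
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        return out.toString();
    }

    // Builds a JSON object with the two fields the decoder expects.
    static String toJson(final String username, final String message) {
        return "{\"username\":\"" + escape(username)
            + "\",\"message\":\"" + escape(message) + "\"}";
    }

    public static void main(final String[] args) {
        // prints {"username":"Client","message":"Hello!"}
        System.out.println(toJson("Client", "Hello!"));
    }
}
```

Whatever the encoder implementation looks like, its output has to stay readable by MessageDecoder, so both sides must agree on these two field names.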

The code that starts the client:

import java.net.URI;
import java.util.UUID;

import javax.websocket.ContainerProvider;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;

import org.eclipse.jetty.websocket.jsr356.ClientContainer;

public class ClientStarter {
    public static void main( final String[] args ) throws Exception {
        // Short random name so that clients can be told apart in the logs.
        final String client = UUID.randomUUID().toString().substring( 0, 8 );
        final WebSocketContainer container = ContainerProvider.getWebSocketContainer();    
        final String uri = "ws://localhost:8080/broadcast"; 

        try( Session session = container.connectToServer( BroadcastClientEndpoint.class, URI.create( uri ) ) ) {
            for( int i = 1; i <= 10; ++i ) {
                session.getBasicRemote().sendObject( new Message( client, "Message #" + i ) );
                Thread.sleep( 1000 );
            }
        }

        // Application doesn't exit if container's threads are still running
        ( ( ClientContainer )container ).stop();
    }
}

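The client connects to the URL ws://localhost:8080/broadcast, picks a random client name (taken from a UUID), and sends 10 messages with a 1-second delay between them (just to make sure we have time to receive them all back).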
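As a small illustration of what the client generates, the sketch below reproduces the name and the message texts without any WebSocket dependency. The class and method names (ClientNameSketch, randomClientName, messageTexts) are hypothetical; only the UUID expression and the "Message #" prefix are taken from the client code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical helper that mirrors the values ClientStarter produces.
final class ClientNameSketch {

    // First 8 characters of a random UUID, i.e. 8 lowercase hex digits.
    static String randomClientName() {
        return UUID.randomUUID().toString().substring(0, 8);
    }

    // The ten message texts the client sends: "Message #1" .. "Message #10".
    static List<String> messageTexts() {
        final List<String> texts = new ArrayList<>();
        for (int i = 1; i <= 10; ++i) {
            texts.add("Message #" + i);
        }
        return texts;
    }

    public static void main(final String[] args) {
        System.out.println(randomClientName() + " sends " + messageTexts());
    }
}
```

Because the name is random for every run, several clients started in parallel show up under different names in the log output.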

The server-side code:

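import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import javax.websocket.EncodeException;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

@ServerEndpoint( 
    value = "/broadcast", 
    encoders = { MessageEncoder.class }, 
    decoders = { MessageDecoder.class } 
)
public class BroadcastServerEndpoint {
    // Shared by all endpoint instances: every currently open session.
    private static final Set< Session > sessions = 
        Collections.synchronizedSet( new HashSet< Session >() );

    @OnOpen
    public void onOpen( final Session session ) {
        sessions.add( session );
    }

    @OnClose
    public void onClose( final Session session ) {
        sessions.remove( session );
    }

    @OnMessage
    public void onMessage( final Message message, final Session client ) 
            throws IOException, EncodeException {
        // Iterating over a synchronized set requires holding its lock.
        synchronized( sessions ) {
            for( final Session session: sessions ) {
                session.getBasicRemote().sendObject( message );
            }
        }
    }
}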
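The broadcast logic of the endpoint can be tried out without a WebSocket container. The following is a minimal sketch under stated assumptions: Peer is a hypothetical stand-in for a Session that simply records what it receives, and CopyOnWriteArraySet is swapped in for the synchronized HashSet as one possible way to iterate safely while sessions come and go. As in the endpoint, the loop delivers to every registered peer, the sender included.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Container-free sketch of the register / unregister / broadcast cycle.
final class BroadcastSketch {

    // Hypothetical stand-in for a connected WebSocket session.
    static final class Peer {
        final String name;
        final List<String> inbox = new ArrayList<>();

        Peer(final String name) {
            this.name = name;
        }

        void send(final String text) {
            inbox.add(text);
        }
    }

    // Iteration works on a snapshot, so concurrent add/remove is safe.
    private final Set<Peer> peers = new CopyOnWriteArraySet<>();

    void onOpen(final Peer peer) {
        peers.add(peer);
    }

    void onClose(final Peer peer) {
        peers.remove(peer);
    }

    // Every currently registered peer receives the message.
    void onMessage(final String text) {
        for (final Peer peer : peers) {
            peer.send(text);
        }
    }

    public static void main(final String[] args) {
        final BroadcastSketch hub = new BroadcastSketch();
        final Peer a = new Peer("a");
        final Peer b = new Peer("b");
        hub.onOpen(a);
        hub.onOpen(b);
        hub.onMessage("Message #1");
        hub.onClose(b);
        hub.onMessage("Message #2");
        System.out.println(a.inbox); // [Message #1, Message #2]
        System.out.println(b.inbox); // [Message #1]
    }
}
```

A copy-on-write set trades more expensive add and remove operations for lock-free iteration, which tends to suit a workload with many broadcasts and comparatively few connects and disconnects.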

To make this server endpoint available, we register it with a Jetty server. Jetty 9.1 can run in embedded mode:

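import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.websocket.jsr356.server.deploy.WebSocketServerContainerInitializer;
import org.springframework.web.context.ContextLoaderListener;
import org.springframework.web.context.support.AnnotationConfigWebApplicationContext;

public class ServerStarter  {
    public static void main( String[] args ) throws Exception {
        Server server = new Server( 8080 );

        // Servlet context with a default servlet mapped to everything.
        final ServletHolder servletHolder = new ServletHolder( new DefaultServlet() );
        final ServletContextHandler context = new ServletContextHandler();
        context.setContextPath( "/" );
        context.addServlet( servletHolder, "/*" );

        // Create the 'root' Spring application context
        context.addEventListener( new ContextLoaderListener() );   
        context.setInitParameter( "contextClass", AnnotationConfigWebApplicationContext.class.getName() );
        context.setInitParameter( "contextConfigLocation", AppConfig.class.getName() );

        server.setHandler( context );
        // Create the (still empty) WebSockets container for this context.
        WebSocketServerContainerInitializer.configureContext( context );       

        server.start();
        server.join(); 
    }
}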

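The most important part is WebSocketServerContainerInitializer.configureContext: it creates the WebSockets container. At this point the container is still empty, because we have not yet registered our server endpoint with it.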

Spring's AppConfig takes care of that:

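import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.websocket.DeploymentException;
import javax.websocket.server.ServerContainer;
import javax.websocket.server.ServerEndpoint;
import javax.websocket.server.ServerEndpointConfig;

import org.eclipse.jetty.websocket.jsr356.server.AnnotatedServerEndpointConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.WebApplicationContext;

@Configuration
public class AppConfig  {
    @Inject private WebApplicationContext context;
    private ServerContainer container;

    // Lets Spring create (and wire) endpoint instances instead of the container.
    public class SpringServerEndpointConfigurator extends ServerEndpointConfig.Configurator {
        @Override
        public < T > T getEndpointInstance( Class< T > endpointClass ) 
                throws InstantiationException {
            return context.getAutowireCapableBeanFactory().createBean( endpointClass );   
        }
    }

    @Bean
    public ServerEndpointConfig.Configurator configurator() {
        return new SpringServerEndpointConfigurator();
    }

    @PostConstruct
    public void init() throws DeploymentException {
        // Look up the WebSockets container created by ServerStarter.
        container = ( ServerContainer )context.getServletContext().
            getAttribute( javax.websocket.server.ServerContainer.class.getName() );

        // Register the endpoint, routing instance creation through Spring.
        container.addEndpoint( 
            new AnnotatedServerEndpointConfig( 
                BroadcastServerEndpoint.class, 
                BroadcastServerEndpoint.class.getAnnotation( ServerEndpoint.class )  
            ) {
                @Override
                public Configurator getConfigurator() {
                    return configurator();
                }
            }
        );
    }  
}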

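By default, the container creates a new instance of the server endpoint for every new client connection, and it does so by calling the constructor. SpringServerEndpointConfigurator overrides getEndpointInstance so that these instances are created through Spring's bean factory instead.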
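The difference between plain constructor-based instantiation and a custom instance hook can be shown without Spring or a WebSocket container. This is a sketch under assumptions: it presumes the reason for the custom configurator is dependency injection (suggested by the call to createBean), and every name in it (EndpointFactorySketch, Greeter, Endpoint, Configurator, ConstructorConfigurator, InjectingConfigurator) is hypothetical rather than part of JSR-356 or Spring.

```java
// Compares "just call the constructor" with a factory hook that also wires
// dependencies. All types here are hypothetical stand-ins.
final class EndpointFactorySketch {

    // Collaborator that an endpoint depends on.
    static final class Greeter {
        String greet(final String name) {
            return "Hello, " + name + "!";
        }
    }

    // Endpoint with a dependency that something has to inject.
    public static final class Endpoint {
        Greeter greeter; // stays null unless a factory sets it

        public Endpoint() {
        }
    }

    // Hook in the spirit of getEndpointInstance: decides how instances are made.
    interface Configurator {
        <T> T getEndpointInstance(Class<T> endpointClass) throws Exception;
    }

    // Constructor-only behaviour: nothing is injected.
    static final class ConstructorConfigurator implements Configurator {
        @Override
        public <T> T getEndpointInstance(final Class<T> endpointClass) throws Exception {
            return endpointClass.getDeclaredConstructor().newInstance();
        }
    }

    // Custom behaviour: create the instance, then wire its dependencies.
    static final class InjectingConfigurator implements Configurator {
        @Override
        public <T> T getEndpointInstance(final Class<T> endpointClass) throws Exception {
            final T instance = endpointClass.getDeclaredConstructor().newInstance();
            if (instance instanceof Endpoint) {
                ((Endpoint) instance).greeter = new Greeter();
            }
            return instance;
        }
    }

    public static void main(final String[] args) throws Exception {
        final Endpoint plain = new ConstructorConfigurator().getEndpointInstance(Endpoint.class);
        final Endpoint wired = new InjectingConfigurator().getEndpointInstance(Endpoint.class);
        System.out.println(plain.greeter == null);         // true
        System.out.println(wired.greeter.greet("Client")); // Hello, Client!
    }
}
```

An endpoint created by constructor alone would fail on first use of its collaborator, which is the kind of problem a factory hook is meant to avoid.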

The way we retrieve the WebSockets container here follows Jetty's setup: it is looked up as the servlet context attribute named "javax.websocket.server.ServerContainer".

Finally, build and run:

mvn clean package

java -jar target/jetty-web-sockets-jsr356-0.0.1-SNAPSHOT-server.jar   # run the server

java -jar target/jetty-web-sockets-jsr356-0.0.1-SNAPSHOT-client.jar   # run a client (repeat to start yet another one)

Part of the output:

Nov 29, 2013 9:21:29 PM com.example.services.BroadcastClientEndpoint onMessage

INFO: Received message 'Hello!' from 'Client'

INFO: Received message 'Message #1' from '392f68ef'

INFO: Received message 'Message #2' from '8e3a869d'

INFO: Received message 'Message #7' from 'ca3a06d0'

Nov 29, 2013 9:21:30 PM com.example.services.BroadcastClientEndpoint onMessage

INFO: Received message 'Message #4' from '6cb82119'

INFO: Received message 'Message #2' from '392f68ef'

INFO: Received message 'Message #3' from '8e3a869d'

INFO: Received message 'Message #8' from 'ca3a06d0'

Nov 29, 2013 9:21:31 PM com.example.services.BroadcastClientEndpoint onMessage

INFO: Received message 'Message #5' from '6cb82119'