Running a Java WebSockets Microservice with Jetty 9.1

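The goal of this example is to build a WebSocket server that accepts messages from client programs and broadcasts them to all clients that are currently connected. Assume the following message model: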

package com.example.services;

// Simple message model exchanged between the clients and the server.
public class Message {
    private String username;
    private String message;

    // No-argument constructor, used by the decoder.
    public Message() {
    }

    public Message( final String username, final String message ) {
        this.username = username;
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    public String getUsername() {
        return username;
    }

    public void setMessage( final String message ) {
        this.message = message;
    }

    public void setUsername( final String username ) {
        this.username = username;
    }
}

Client code:

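import java.io.IOException;
import java.util.logging.Logger;

import javax.websocket.ClientEndpoint;
import javax.websocket.EncodeException;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;

@ClientEndpoint
public class BroadcastClientEndpoint {
    private static final Logger log = Logger.getLogger(
        BroadcastClientEndpoint.class.getName() );

    // Called once the connection to the server has been established.
    @OnOpen
    public void onOpen( final Session session ) throws IOException, EncodeException {
        session.getBasicRemote().sendObject( new Message( "Client", "Hello!" ) );
    }

    // Called for every message the server sends to this client.
    @OnMessage
    public void onMessage( final Message message ) {
        log.info( String.format( "Received message '%s' from '%s'",
            message.getMessage(), message.getUsername() ) );
    }
}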

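@OnOpen is called when the client connects to the server, and @OnMessage is called each time the server sends a message to the client. Messages travel as JSON text, so incoming text has to be decoded back into a Message: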

    // Static nested class; the 'static' modifier implies it is declared
    // inside another class (assumed here to be Message).
    public static class MessageDecoder implements Decoder.Text< Message > {
        private JsonReaderFactory factory = Json.createReaderFactory( Collections.< String, Object >emptyMap() );

        @Override
        public void init( final EndpointConfig config ) {
        }

        // Parses the JSON text and copies its two fields into a Message.
        @Override
        public Message decode( final String str ) throws DecodeException {
            final Message message = new Message();

            try( final JsonReader reader = factory.createReader( new StringReader( str ) ) ) {
                final JsonObject json = reader.readObject();
                message.setUsername( json.getString( "username" ) );
                message.setMessage( json.getString( "message" ) );
            }

            return message;
        }

        @Override
        public boolean willDecode( final String str ) {
            return true;
        }

        @Override
        public void destroy() {
        }
    }

We need to tell the client that we have a JSON encoder and decoder, by extending the annotation on the BroadcastClientEndpoint class:

@ClientEndpoint( encoders = { MessageEncoder.class }, decoders = { MessageDecoder.class } )
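The MessageEncoder referenced in the annotation is not listed in this article. As a rough, dependency-free sketch of the JSON text the codec pair is expected to exchange, the snippet below builds an object with the two keys the decoder reads (username and message). The class name MessageJsonSketch and its escape helper are hypothetical; a real MessageEncoder would more likely implement Encoder.Text< Message > and rely on a JSON library instead of string concatenation.

```java
// Hypothetical, JDK-only stand-in that shows the wire format; not the article's MessageEncoder.
final class MessageJsonSketch {
    private MessageJsonSketch() {
    }

    // Escapes quotes, backslashes and control characters for use inside a JSON string.
    static String escape( final String value ) {
        final StringBuilder out = new StringBuilder();
        for( final char c : value.toCharArray() ) {
            if( c == '"' || c == '\\' ) {
                out.append( '\\' ).append( c );
            } else if( c < 0x20 ) {
                out.append( String.format( "\\u%04x", ( int )c ) );
            } else {
                out.append( c );
            }
        }
        return out.toString();
    }

    // Produces the JSON object with the keys that MessageDecoder reads back.
    static String toJson( final String username, final String message ) {
        return "{\"username\":\"" + escape( username )
            + "\",\"message\":\"" + escape( message ) + "\"}";
    }

    public static void main( final String[] args ) {
        // prints {"username":"Client","message":"Hello!"}
        System.out.println( toJson( "Client", "Hello!" ) );
    }
}
```

Whatever the encoder looks like, the key names it writes have to match the ones the decoder passes to json.getString.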

The code that starts the client:

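import java.net.URI;
import java.util.UUID;

import javax.websocket.ContainerProvider;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;

import org.eclipse.jetty.websocket.jsr356.ClientContainer;

public class ClientStarter {
    public static void main( final String[] args ) throws Exception {
        // Short random client name: the first 8 characters of a UUID
        final String client = UUID.randomUUID().toString().substring( 0, 8 );

        final WebSocketContainer container = ContainerProvider.getWebSocketContainer();
        final String uri = "ws://localhost:8080/broadcast";

        // The session is closed automatically when the block exits
        try( Session session = container.connectToServer( BroadcastClientEndpoint.class, URI.create( uri ) ) ) {
            for( int i = 1; i <= 10; ++i ) {
                session.getBasicRemote().sendObject( new Message( client, "Message #" + i ) );
                Thread.sleep( 1000 );
            }
        }

        // Application doesn't exit if container's threads are still running
        ( ( ClientContainer )container ).stop();
    }
}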

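The client connects to ws://localhost:8080/broadcast, picks a random client name (the first eight characters of a UUID), and sends 10 messages with a 1-second delay between them (just to make sure there is time to receive them all back).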
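The naming and message numbering can be tried without a server. The sketch below is only an illustration: ClientNamingSketch and its two helper methods are hypothetical names, and the one-second pause is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical helper that isolates how the client name and message texts are built.
final class ClientNamingSketch {
    private ClientNamingSketch() {
    }

    // A UUID string starts with 8 hexadecimal digits, so this yields names like "392f68ef".
    static String randomClientName() {
        return UUID.randomUUID().toString().substring( 0, 8 );
    }

    // Builds "Message #1" .. "Message #<count>", the texts the client sends in its loop.
    static List< String > messageTexts( final int count ) {
        final List< String > texts = new ArrayList<>();
        for( int i = 1; i <= count; ++i ) {
            texts.add( "Message #" + i );
        }
        return texts;
    }

    public static void main( final String[] args ) {
        System.out.println( randomClientName() + " sends " + messageTexts( 10 ) );
    }
}
```

Eight hexadecimal characters are enough to tell a handful of demo clients apart, but they are not a guaranteed-unique identifier.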

The server-side code:

@ServerEndpoint(
    value = "/broadcast",
    encoders = { MessageEncoder.class },
    decoders = { MessageDecoder.class }
)
public class BroadcastServerEndpoint {
    // Static, because a new endpoint instance is created for every connection
    private static final Set< Session > sessions =
        Collections.synchronizedSet( new HashSet< Session >() );

    @OnOpen
    public void onOpen( final Session session ) {
        sessions.add( session );
    }

    @OnClose
    public void onClose( final Session session ) {
        sessions.remove( session );
    }

    @OnMessage
    public void onMessage( final Message message, final Session client )
            throws IOException, EncodeException {
        // Iteration over a synchronizedSet must be guarded by its own lock
        synchronized( sessions ) {
            for( final Session session: sessions ) {
                session.getBasicRemote().sendObject( message );
            }
        }
    }
}
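The broadcast logic can be exercised without a WebSocket container. The following is a minimal sketch under stated assumptions: the Peer interface is a hypothetical stand-in for javax.websocket.Session, and BroadcastSketch, join, leave and broadcast are illustrative names that do not appear in the article's code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, container-free model of the server endpoint's bookkeeping.
final class BroadcastSketch {
    // Stand-in for javax.websocket.Session (assumption, not the real API).
    interface Peer {
        void send( String text );
    }

    private final Set< Peer > peers = Collections.synchronizedSet( new HashSet< Peer >() );

    void join( final Peer peer ) {
        peers.add( peer );
    }

    void leave( final Peer peer ) {
        peers.remove( peer );
    }

    // Delivers the text to every registered peer, the sender included.
    void broadcast( final String text ) {
        // The synchronized wrapper does not protect iteration, so lock explicitly.
        synchronized( peers ) {
            for( final Peer peer : peers ) {
                peer.send( text );
            }
        }
    }

    public static void main( final String[] args ) {
        final BroadcastSketch hub = new BroadcastSketch();
        final List< String > inboxA = new ArrayList<>();
        final List< String > inboxB = new ArrayList<>();
        final Peer a = inboxA::add;
        final Peer b = inboxB::add;

        hub.join( a );
        hub.join( b );
        hub.broadcast( "Hello!" );
        hub.leave( b );
        hub.broadcast( "Bye" );

        // prints [Hello!, Bye] [Hello!]
        System.out.println( inboxA + " " + inboxB );
    }
}
```

Because the loop covers the whole set, a sender also receives its own messages, which matches the log output at the end of this article. Holding the lock while sending keeps the sketch simple; a busier server might copy the set first and send outside the lock.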

To make this server endpoint available, we register it with a Jetty server. Jetty 9.1 can run in embedded mode:

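import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.websocket.jsr356.server.deploy.WebSocketServerContainerInitializer;
import org.springframework.web.context.ContextLoaderListener;
import org.springframework.web.context.support.AnnotationConfigWebApplicationContext;

public class ServerStarter  {
    public static void main( String[] args ) throws Exception {
        Server server = new Server( 8080 );

        // Create the 'root' Spring application context
        final ServletHolder servletHolder = new ServletHolder( new DefaultServlet() );
        final ServletContextHandler context = new ServletContextHandler();

        context.setContextPath( "/" );
        context.addServlet( servletHolder, "/*" );
        context.addEventListener( new ContextLoaderListener() );
        context.setInitParameter( "contextClass", AnnotationConfigWebApplicationContext.class.getName() );
        context.setInitParameter( "contextConfigLocation", AppConfig.class.getName() );

        server.setHandler( context );

        // Create the (still empty) WebSockets container for this context
        WebSocketServerContainerInitializer.configureContext( context );

        server.start();
        server.join();
    }
}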

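The most important part is the call to WebSocketServerContainerInitializer.configureContext, which creates the WebSockets container. At this point the container is empty, because our server endpoint has not been registered with it yet.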

The Spring AppConfig class takes care of that:

@Configuration
public class AppConfig  {
    @Inject private WebApplicationContext context;
    private ServerContainer container;

    // Lets Spring create (and autowire) the endpoint instances
    public class SpringServerEndpointConfigurator extends ServerEndpointConfig.Configurator {
        @Override
        public < T > T getEndpointInstance( Class< T > endpointClass )
                throws InstantiationException {
            return context.getAutowireCapableBeanFactory().createBean( endpointClass );
        }
    }

    @Bean
    public ServerEndpointConfig.Configurator configurator() {
        return new SpringServerEndpointConfigurator();
    }

    @PostConstruct
    public void init() throws DeploymentException {
        // Look up the container created by configureContext in ServerStarter
        container = ( ServerContainer )context.getServletContext().
            getAttribute( javax.websocket.server.ServerContainer.class.getName() );

        // Register the endpoint and plug in the Spring-aware configurator
        container.addEndpoint(
            new AnnotatedServerEndpointConfig(
                BroadcastServerEndpoint.class,
                BroadcastServerEndpoint.class.getAnnotation( ServerEndpoint.class )
            ) {
                @Override
                public Configurator getConfigurator() {
                    return configurator();
                }
            }
        );
    }
}

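By default, the container creates a new instance of the server endpoint for every new client connection by calling its constructor. The custom configurator above lets Spring create that instance instead, so that its dependencies can be injected.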
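The one-instance-per-connection behaviour also explains why shared state such as the session set has to be static. The sketch below imitates the constructor-based strategy with plain reflection; PerConnectionInstanceSketch, its nested Endpoint class and newEndpointInstance are hypothetical names, and no WebSocket container is involved.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of per-connection endpoint instances.
final class PerConnectionInstanceSketch {
    private PerConnectionInstanceSketch() {
    }

    // Stand-in endpoint: the static counter is shared, the id belongs to one instance.
    static class Endpoint {
        static final AtomicInteger created = new AtomicInteger();
        final int id = created.incrementAndGet();
    }

    // Creates an endpoint through its no-argument constructor, once per connection.
    static < T > T newEndpointInstance( final Class< T > endpointClass ) {
        try {
            return endpointClass.getDeclaredConstructor().newInstance();
        } catch( final ReflectiveOperationException ex ) {
            throw new IllegalStateException( "Cannot instantiate " + endpointClass, ex );
        }
    }

    public static void main( final String[] args ) {
        final Endpoint first = newEndpointInstance( Endpoint.class );
        final Endpoint second = newEndpointInstance( Endpoint.class );

        // Two connections get two distinct objects that still see the same static state.
        System.out.println( ( first != second ) + " " + ( second.id - first.id ) );
    }
}
```

An instance field would only ever see the sessions of its own connection, whereas a static collection is visible to every endpoint instance.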

The way we retrieve the WebSockets container here is Jetty-specific: it is read from the servlet context attribute named "javax.websocket.server.ServerContainer".

Finally, build and run:

mvn clean package

java -jar target/jetty-web-sockets-jsr356-0.0.1-SNAPSHOT-server.jar # run server

java -jar target/jetty-web-sockets-jsr356-0.0.1-SNAPSHOT-client.jar # run yet another client

Part of the output:

Nov 29, 2013 9:21:29 PM com.example.services.BroadcastClientEndpoint onMessage

INFO: Received message 'Hello!' from 'Client'

INFO: Received message 'Message #1' from '392f68ef'

INFO: Received message 'Message #2' from '8e3a869d'

INFO: Received message 'Message #7' from 'ca3a06d0'

Nov 29, 2013 9:21:30 PM com.example.services.BroadcastClientEndpoint onMessage

INFO: Received message 'Message #4' from '6cb82119'

INFO: Received message 'Message #2' from '392f68ef'

INFO: Received message 'Message #3' from '8e3a869d'

INFO: Received message 'Message #8' from 'ca3a06d0'

Nov 29, 2013 9:21:31 PM com.example.services.BroadcastClientEndpoint onMessage

INFO: Received message 'Message #5' from '6cb82119'