文章
问答
冒泡
SpringBoot整合WebSocket

前言

WebSocket是一种单TCP连接上的全双工通讯协议,客户端和服务端只需要完成一次握手,就可以建立持久性的连接,并进行双向数据传输。在项目中往往用于消息推送,页面实时刷新等功能。

本文主要介绍SpringBoot整合WebSocket常用的三种方式:

  1. 原生注解
  2. Spring封装API
  3. 基于STOMP消息

下面是spring对websocket的详细介绍

在Spring Boot项目中,你可以添加WebSocket Starter来引入WebSocket的支持

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

 

原生注解

配置类,往spring容器中注入 ServerEndpointExporter

/**
* 自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
* 要注意,如果使用独立的servlet容器,而不是直接使用springboot的内置容器,就不要注入ServerEndpointExporter,因为它将由容器自己提供和管理。
*/
@Configuration
    public class WebSocketAnnotationConfig {

        @Bean
        public ServerEndpointExporter serverEndpointExporter() {
            return new ServerEndpointExporter();
        }
    }

通过注解来具体实现

@Slf4j
@Component
@ServerEndpoint("/demo/{userName}") // WebSocket客户端建立连接的地址
public class WebSocketAnnotationController {
    /**
     * 用线程安全的map存放存活的session
     */
    private static Map<String, Session> livingSessionMap = new ConcurrentHashMap<>();

    /**
     * 建立连接的回调方法
     *
     * @param session  与客户端的WebSocket连接会话
     * @param userName 用户名,WebSocket支持路径参数
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userName") String userName) {
        livingSessionMap.put(session.getId(), session);
        sendMessageToAll(userName + " 连接成功!");
    }

    /**
     * 收到客户端消息的回调方法
     *
     * @param message 客户端传过来的消息
     * @param session 对应的session
     */
    @OnMessage
    public void onMessage(String message, Session session, @PathParam("userName") String userName) {
        sendMessageToAll(userName + " : " + message);
    }

    /**
     * 发生错误的回调方法
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error(error.getMessage());
    }

    /**
     * 关闭连接的回调方法
     */
    @OnClose
    public void onClose(Session session, @PathParam("userName") String userName) {
        livingSessionMap.remove(session.getId());
        sendMessageToAll(userName + " 关闭连接!");
    }

    /**
     * 单独发送消息
     *
     * @param session
     * @param message
     */
    public void sendMessage(Session session, String message) {
        try {
            session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 群发消息
     *
     * @param message
     */
    public void sendMessageToAll(String message) {
        livingSessionMap.forEach((sessionId, session) -> {
            sendMessage(session, message);
        });
    }
}

这个类里用到的注解都是jdk自带的不是spring提供的,@ServerEndpoint类似于@RequestMapping,@OnOpen、@OnClose、@OnError都要有一个Session的参数,

而且@OnMessage第一个String类型的参数是客户端传入的值。

Html页面

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="UTF-8">
    <title>demo</title>
    <script src="http://code.jquery.com/jquery-1.8.0.min.js"></script>
  </head>
  <body>
    <h1>WebSocketAnnotation</h1>
    <div>
      <label>用户</label>
      <input id="user_name">
      <button id="connect">连接</button>
      <button id="disconnect">断开</button>
    </div>
    <div>
      <label>发送消息</label>
      <input type="text" id="send_msg">
      <button id="send">发送</button>
    </div>
    <div>
      <label>接收到的消息</label>
      <textarea id="receive_msg" readonly="readonly"></textarea>
    </div>
  </body>
</html>

<script>
  $(function () {

    const prefixUrl = 'ws://localhost:8080/demo/';

    let ws;//WebSocket连接对象

    //判断当前浏览器是否支持WebSocket
    if (!('WebSocket' in window)) {
      alert('Not support websocket');
    }

    $('#connect').click(function () {

      const userName = $('#user_name').val();

      //创建WebSocket连接对象
      ws = new WebSocket(prefixUrl + userName);

      //连接成功建立的回调方法
      ws.onopen = function (event) {
        console.log('建立连接')
      }

      //接收到消息的回调方法
      ws.onmessage = function (event) {
        console.log('接收到的消息:' + event.data)
        $('#receive_msg').append(event.data + '\n')
      }

      //连接发生错误的回调方法
      ws.onerror = function (event) {
        console.log('发生错误')
      }

      //连接关闭的回调方法
      ws.onclose = function (event) {
        console.log('关闭连接')
      }

    })

    //发送消息
    function sendMessage(message) {
      ws.send(message);
    }

    //关闭连接
    function closeWebSocket() {
      ws.close();
    }

    //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
    window.onbeforeunload = function () {
      wx.close();
    }


    //发送消息
    $('#send').click(function () {
      sendMessage($('#send_msg').val())
    })

    //点击断开连接
    $('#disconnect').click(function () {
      closeWebSocket();
    })
  })
</script>

 

Spring封装API

首先是拦截器,继承了HandshakeInterceptor,重写了握手前和握手后的两个回调方法,这边在握手前将url中的参数放到attributes中以便于后面获取

@Component
public class WebSocketIntercept implements HandshakeInterceptor {
    /**
     * 握手前
     * @param request
     * @param response
     * @param wsHandler
     * @param attributes
     * @return
     * @throws Exception
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        // TODO:这边因为参数只有userName=,所以简单处理
        String query = request.getURI().getQuery();
        String[] split = query.split("=");
        attributes.put(split[0],split[1]);
        return true;
    }

    /**
     * 握手后
     * @param request
     * @param response
     * @param wsHandler
     * @param exception
     */
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {

    }
}

WebSocket的处理类,这边因为传输文本消息继承了TextWebSocketHandler,如果业务复杂可以继承它的父类AbstractWebSocketHandler,重写的方法和上面的注解实现的效果是一致的

注意:这边WebSocketSession的getAttributes方法里面的参数是上面拦截器存进去的

@Slf4j
@Component
public class WebSocketHandlerImp extends TextWebSocketHandler {

    /**
     * 用线程安全的map存放存活的session
     */
    private static Map<String, WebSocketSession> livingSessionMap = new ConcurrentHashMap<>();

    /**
     * 建立连接的回调方法
     * @param session
     * @throws Exception
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        livingSessionMap.put(session.getId(), session);
        String userName = String.valueOf(session.getAttributes().get("userName"));
        sendMessageToAll(userName + " 连接成功!");
    }

    /**
     * 收到客户端消息的回调方法
     * @param session
     * @param message
     * @throws Exception
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        String userName = String.valueOf(session.getAttributes().get("userName"));
        sendMessageToAll(userName + " : " + message.getPayload());
    }

    /**
     * 发生错误的回调方法
     * @param session
     * @param exception
     * @throws Exception
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        log.error(exception.getMessage());
    }

    /**
     * 关闭连接的回调方法
     * @param session
     * @param status
     * @throws Exception
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        livingSessionMap.remove(session.getId());
        String userName = String.valueOf(session.getAttributes().get("userName"));
        sendMessageToAll(userName + " 关闭连接!");
    }


    /**
     * 群发消息
     *
     * @param message
     */
    public void sendMessageToAll(String message) {
        livingSessionMap.forEach((sessionId, session) -> {
            try {
                session.sendMessage(new TextMessage(message));
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }
}

最后是配置类,要实现WebSocketConfigurer,类上要加注解@EnableWebSocket,将上面的拦截器和处理类添加进去,然后withsockJS表示支持SockJS

@Configuration
@EnableWebSocket
@RequiredArgsConstructor
public class WebSocketConfig implements WebSocketConfigurer {
    private final WebSocketHandlerImp webSocketHandler;
    private final WebSocketIntercept webSocketIntercept;

    /**
     * 注册WebSocketHandlers处理类和访问的地址
     * @param registry
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(webSocketHandler, "/demo2")
                .addInterceptors(webSocketIntercept) //添加拦截器,获取url上的参数
                .withSockJS();// 支持SockJS
    }

}

Html页面

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>demo</title>
    <script src="http://code.jquery.com/jquery-1.8.0.min.js"></script>
    <script src="https://cdn.jsdelivr.net/npm/sockjs-client@1/dist/sockjs.min.js"></script>
</head>
<body>
    <h1>WebSocketHandler</h1>
    <div>
        <label>用户</label>
        <input id="user_name">
        <button id="connect">连接</button>
        <button id="disconnect">断开</button>
    </div>
    <div>
        <label>发送消息</label>
        <input type="text" id="send_msg">
        <button id="send">发送</button>
    </div>
    <div>
        <label>接收到的消息</label>
        <textarea id="receive_msg" readonly="readonly"></textarea>
    </div>
</body>
</html>

<script>
    $(function () {

        // const prefixUrl = 'ws://localhost:8080/demo2';
        const socketUrl = 'http://localhost:8080/demo2';

        let ws;//WebSocket连接对象

        //判断当前浏览器是否支持WebSocket
        if (!('WebSocket' in window)) {
            alert('Not support websocket');
        }

        $('#connect').click(function () {

            const userName = $('#user_name').val();

            //创建WebSocket连接对象
            // ws = new WebSocket(prefixUrl);
            //创建SockJS对象
            ws = new SockJS(socketUrl+'?userName='+userName);


            //连接成功建立的回调方法
            ws.onopen = function (event) {
                console.log('建立连接')
            }

            //接收到消息的回调方法
            ws.onmessage = function (event) {
                console.log('接收到的消息:' + event.data)
                $('#receive_msg').append(event.data + '\n')
            }

            //连接发生错误的回调方法
            ws.onerror = function (event) {
                console.log('发生错误')
            }

            //连接关闭的回调方法
            ws.onclose = function (event) {
                console.log('关闭连接')
            }

        })

        //发送消息
        function sendMessage(message) {
            ws.send(message);
        }

        //关闭连接
        function closeWebSocket() {
            ws.close();
        }

        //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
        window.onbeforeunload = function () {
            wx.close();
        }


        //发送消息
        $('#send').click(function () {
            sendMessage($('#send_msg').val())
        })

        //点击断开连接
        $('#disconnect').click(function () {
            closeWebSocket();
        })
    })
</script>

SockJS的api和WebSocket的完全一致,只有url的要求不同,WebSocket的开头是ws://或者wss://,而SockJS的开头是http://或https://

sockjs-client官方文档:https://github.com/sockjs/sockjs-client

 

基于STOMP消息

STOMP是一个用于C/S之间进行异步消息传输的简单文本协议;STOMP服务端被设计为客户端可以向其发送消息的一组目标地址,比如可以用定义/topic作为发布订阅模式,消息会被所有客户端接收到,还可以用/user作为点对点只有特定的客户端可以接收;STOMP客户端可以作为生产者send发送消息到指定地址,也可以作为消费者SUBSCRIBE订阅消息。

上图是spring中WebSocket的架构图

主要还是通过代码来分析

首先是配置类,要实现WebSocketMessageBrokerConfigurer,并且类上要加注解@EnableWebSocketMessageBroker

@Configuration
// 此注解表示使用STOMP协议来传输基于消息代理的消息,此时可以在@Controller类中使用@MessageMapping
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
    private final WebSocketStompIntercept webSocketStompIntercept;

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
        registry.setMessageSizeLimit(2048 * 2048);
        registry.setSendBufferSizeLimit(2048 * 2048);
        registry.setSendTimeLimit(2048 * 2048);
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // 注册Stomp的端点
        registry.addEndpoint("/demo3") // 添加STOMP协议的端点。这个HTTP URL是供WebSocket或SockJS客户端访问的地址
                .setAllowedOriginPatterns("*") // 指定允许跨域访问
                .withSockJS(); // 指定端点使用SockJS协议
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // 用户可以订阅来自以/topic、/queue为前缀的消息
        registry.enableSimpleBroker( "/topic", "/queue");
        // 全局使用的订阅前缀(客户端订阅路径上会体现出来)
        registry.setApplicationDestinationPrefixes("/app");
        // 点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user
        registry.setUserDestinationPrefix("/user");
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        // 添加拦截器
        registration.interceptors(webSocketStompIntercept);
    }
}

然后是自定义的拦截器,实现ChannelInterceptor,然后我这边只是在服务器发送完消息后判断是否连接

@Slf4j
@Configuration
@RequiredArgsConstructor
public class WebSocketStompIntercept implements ChannelInterceptor {

    private static Map<String, Object> livingSessionMap = new ConcurrentHashMap<>();

    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getCommand();
        String sessionId = accessor.getSessionId();
        Object headers = accessor.getMessageHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);
        Object userName = null;
        if (headers instanceof Map) {
            userName = ((Map) headers).get("userName");
        }

        // 用户建立连接
        if (StompCommand.CONNECT.equals(command)) {
            livingSessionMap.put(sessionId, userName);
            log.info(userName + " 建立连接!");
        }

        // 用户断开连接
        if(StompCommand.DISCONNECT.equals(command)) {
            userName = livingSessionMap.get(sessionId);
            if (userName != null) {
                livingSessionMap.remove(sessionId);
                log.info(userName + " 断开连接!");
            }
        }
    }

}

接着是Controller,首先@MessageMapping注解可以接受客户端send方法发过来的消息,@SubscribeMapping注解可以接受客户端subscribe方法的订阅,注解方法的返回值可以直接被客户端接收到。

然后发送消息可以通过SimpMessagingTemplate,也可以通过注解@SendTo来发送,如果是点对点的模式可以通过@SendToUser来实现。

@Slf4j
@Controller
@RequiredArgsConstructor
public class WebSocketStompController {

    /**
     * 可以在应用的任意地方发送消息
     */
    private final SimpMessagingTemplate simpMessageTemplate;

    @SubscribeMapping("/topic/chat") //处理subscribe发送的消息,subscribe方法的连接是/topic/chat
    public String onSubscribe() {
        return "init ok!";
    }

    @MessageMapping("/message") //处理send发送的消息,send方法的连接是/app/message
    public void onSend(String message) {
        simpMessageTemplate.convertAndSend("/topic/chat", message);
    }

    @MessageMapping("/guide_all")
    @SendTo("/queue/greeting1")//广播推送,客户端订阅/queue/greeting1
    public String onUser1(String message) {
        return message;
    }

    @MessageMapping("/guide_user")
    @SendToUser("/queue/greeting2")//精准推送,客户端订阅/user/queue/greeting2
    public String onUser2(String message, Principal principal) {
        return message;
    }

}

最后是页面,引用了stompjs

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>demo</title>
    <script src="http://code.jquery.com/jquery-1.8.0.min.js"></script>
    <script src="https://cdn.jsdelivr.net/npm/sockjs-client@1/dist/sockjs.min.js"></script>
    <script src="https://cdn.bootcdn.net/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
</head>
<body>
    <h1>WebSocketStomp</h1>
    <div>
        <label>用户</label>
        <input id="user_name">
        <button id="connect">连接</button>
        <button id="disconnect">断开</button>
    </div>
    <div>
        <label>发送消息</label>
        <input type="text" id="send_msg">
        <button id="send">发送</button>
    </div>
    <div>
        <label>接收到的消息</label>
        <textarea id="receive_msg" readonly="readonly"></textarea>
    </div>
    <div>
        <button id="guide_all">广播推送</button>
        <button id="guide_user">精准推送</button>
        <div id="guide_msg"></div>
    </div>
</body>
</html>

<script>
    $(function () {

        const socketUrl = 'http://localhost:8080/demo3';

        let stompClient;//Stomp客户端对象

        //判断当前浏览器是否支持WebSocket
        if (!('WebSocket' in window)) {
            alert('Not support websocket');
        }

        $('#connect').click(function () {

            const userName = $('#user_name').val();

            //创建SockJS对象
            const ws = new SockJS(socketUrl);
            stompClient = Stomp.over(ws);
            stompClient.connect({userName: userName}, function() {
                console.log('建立连接');

                // 订阅服务器/topic/chat
                stompClient.subscribe("/topic/chat", function (message) {
                    if (message.body) {
                        $('#receive_msg').append(message.body + '\n');
                    }
                });
                //服务端可以通过@SubscribeMapping("/topic/chat")来接收订阅
                stompClient.subscribe("/app/topic/chat", function (message) {
                    console.log("app:" + message.body);
                });
                //==========================
                //广播推送
                stompClient.subscribe("/queue/greeting1", function (message) {
                    $("#guide_msg").html(message.body);
                });
                //精准推送
                stompClient.subscribe("/user/queue/greeting2", function (message) {
                    $("#guide_msg").html(message.body);
                });
            }, function () {
                // 连接失败
                console.log('连接失败');
            });

        })

        //发送消息
        function sendMessage(message) {
            if (stompClient != null) {
                const userName = $("#user_name").val();
                stompClient.send("/app/message", {}, userName + ":" + message);
            } else {
                alert('STOMP connection not established, please connect.');
            }
        }

        //关闭连接
        function closeWebSocket() {
            stompClient.disconnect(function () {
                console.log('关闭连接');
            }, {});
        }

        //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
        window.onbeforeunload = function () {
            stompClient.disconnect();
        }

        function pushMessage(broadcast) {
            if (stompClient == null) {
                alert('STOMP connection not established, please connect.');
                return;
            }
            if (broadcast === true) {
                stompClient.send("/app/guide_all", {}, "广播推送");
            }else {
                stompClient.send("/app/guide_user", {}, "精准推送");
            }
        }


        //发送消息
        $('#send').click(function () {
            sendMessage($('#send_msg').val())
        })

        //广播推送
        $("#guide_all").click(function () {
            pushMessage(true);
        })

        //精确推送
        $("#guide_user").click(function () {
            pushMessage(false);
        })

        //点击断开连接
        $('#disconnect').click(function () {
            closeWebSocket();
        })
    })
</script>

  

springboot

关于作者

TimothyC
天不造人上之人,亦不造人下之人
获得点赞
文章被阅读