前言
WebSocket是一种单TCP连接上的全双工通讯协议,客户端和服务端只需要完成一次握手,就可以建立持久性的连接,并进行双向数据传输。在项目中往往用于消息推送,页面实时刷新等功能。
本文主要介绍SpringBoot整合WebSocket常用的三种方式:
- 原生注解
- Spring封装API
- 基于STOMP消息
下面是spring对websocket的详细介绍
在Spring Boot项目中,你可以添加WebSocket Starter来引入WebSocket的支持
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
原生注解
配置类,往spring容器中注入 ServerEndpointExporter
/**
* 自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
* 要注意,如果使用独立的servlet容器,而不是直接使用springboot的内置容器,就不要注入ServerEndpointExporter,因为它将由容器自己提供和管理。
*/
@Configuration
public class WebSocketAnnotationConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
通过注解来具体实现
@Slf4j
@Component
@ServerEndpoint("/demo/{userName}") // WebSocket客户端建立连接的地址
public class WebSocketAnnotationController {
/**
* 用线程安全的map存放存活的session
*/
private static Map<String, Session> livingSessionMap = new ConcurrentHashMap<>();
/**
* 建立连接的回调方法
*
* @param session 与客户端的WebSocket连接会话
* @param userName 用户名,WebSocket支持路径参数
*/
@OnOpen
public void onOpen(Session session, @PathParam("userName") String userName) {
livingSessionMap.put(session.getId(), session);
sendMessageToAll(userName + " 连接成功!");
}
/**
* 收到客户端消息的回调方法
*
* @param message 客户端传过来的消息
* @param session 对应的session
*/
@OnMessage
public void onMessage(String message, Session session, @PathParam("userName") String userName) {
sendMessageToAll(userName + " : " + message);
}
/**
* 发生错误的回调方法
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error(error.getMessage());
}
/**
* 关闭连接的回调方法
*/
@OnClose
public void onClose(Session session, @PathParam("userName") String userName) {
livingSessionMap.remove(session.getId());
sendMessageToAll(userName + " 关闭连接!");
}
/**
* 单独发送消息
*
* @param session
* @param message
*/
public void sendMessage(Session session, String message) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 群发消息
*
* @param message
*/
public void sendMessageToAll(String message) {
livingSessionMap.forEach((sessionId, session) -> {
sendMessage(session, message);
});
}
}
这个类里用到的注解都是jdk自带的不是spring提供的,@ServerEndpoint类似于@RequestMapping,@OnOpen、@OnClose、@OnError都要有一个Session的参数,
而且@OnMessage第一个String类型的参数是客户端传入的值。
Html页面
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>demo</title>
<script src="http://code.jquery.com/jquery-1.8.0.min.js"></script>
</head>
<body>
<h1>WebSocketAnnotation</h1>
<div>
<label>用户</label>
<input id="user_name">
<button id="connect">连接</button>
<button id="disconnect">断开</button>
</div>
<div>
<label>发送消息</label>
<input type="text" id="send_msg">
<button id="send">发送</button>
</div>
<div>
<label>接收到的消息</label>
<textarea id="receive_msg" readonly="readonly"></textarea>
</div>
</body>
</html>
<script>
$(function () {
const prefixUrl = 'ws://localhost:8080/demo/';
let ws;//WebSocket连接对象
//判断当前浏览器是否支持WebSocket
if (!('WebSocket' in window)) {
alert('Not support websocket');
}
$('#connect').click(function () {
const userName = $('#user_name').val();
//创建WebSocket连接对象
ws = new WebSocket(prefixUrl + userName);
//连接成功建立的回调方法
ws.onopen = function (event) {
console.log('建立连接')
}
//接收到消息的回调方法
ws.onmessage = function (event) {
console.log('接收到的消息:' + event.data)
$('#receive_msg').append(event.data + '\n')
}
//连接发生错误的回调方法
ws.onerror = function (event) {
console.log('发生错误')
}
//连接关闭的回调方法
ws.onclose = function (event) {
console.log('关闭连接')
}
})
//发送消息
function sendMessage(message) {
ws.send(message);
}
//关闭连接
function closeWebSocket() {
ws.close();
}
//监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
window.onbeforeunload = function () {
wx.close();
}
//发送消息
$('#send').click(function () {
sendMessage($('#send_msg').val())
})
//点击断开连接
$('#disconnect').click(function () {
closeWebSocket();
})
})
</script>
Spring封装API
首先是拦截器,继承了HandshakeInterceptor,重写了握手前和握手后的两个回调方法,这边在握手前将url中的参数放到attributes中以便于后面获取
@Component
public class WebSocketIntercept implements HandshakeInterceptor {
/**
* 握手前
* @param request
* @param response
* @param wsHandler
* @param attributes
* @return
* @throws Exception
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
// TODO:这边因为参数只有userName=,所以简单处理
String query = request.getURI().getQuery();
String[] split = query.split("=");
attributes.put(split[0],split[1]);
return true;
}
/**
* 握手后
* @param request
* @param response
* @param wsHandler
* @param exception
*/
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
}
}
WebSocket的处理类,这边因为传输文本消息继承了TextWebSocketHandler,如果业务复杂可以继承它的父类AbstractWebSocketHandler,重写的方法和上面的注解实现的效果是一致的
注意:这边WebSocketSession的getAttributes方法里面的参数是上面拦截器存进去的
@Slf4j
@Component
public class WebSocketHandlerImp extends TextWebSocketHandler {
/**
* 用线程安全的map存放存活的session
*/
private static Map<String, WebSocketSession> livingSessionMap = new ConcurrentHashMap<>();
/**
* 建立连接的回调方法
* @param session
* @throws Exception
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
livingSessionMap.put(session.getId(), session);
String userName = String.valueOf(session.getAttributes().get("userName"));
sendMessageToAll(userName + " 连接成功!");
}
/**
* 收到客户端消息的回调方法
* @param session
* @param message
* @throws Exception
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String userName = String.valueOf(session.getAttributes().get("userName"));
sendMessageToAll(userName + " : " + message.getPayload());
}
/**
* 发生错误的回调方法
* @param session
* @param exception
* @throws Exception
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
log.error(exception.getMessage());
}
/**
* 关闭连接的回调方法
* @param session
* @param status
* @throws Exception
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
livingSessionMap.remove(session.getId());
String userName = String.valueOf(session.getAttributes().get("userName"));
sendMessageToAll(userName + " 关闭连接!");
}
/**
* 群发消息
*
* @param message
*/
public void sendMessageToAll(String message) {
livingSessionMap.forEach((sessionId, session) -> {
try {
session.sendMessage(new TextMessage(message));
} catch (IOException e) {
e.printStackTrace();
}
});
}
}
最后是配置类,要实现WebSocketConfigurer,类上要加注解@EnableWebSocket,将上面的拦截器和处理类添加进去,然后withsockJS表示支持SockJS
@Configuration
@EnableWebSocket
@RequiredArgsConstructor
public class WebSocketConfig implements WebSocketConfigurer {
private final WebSocketHandlerImp webSocketHandler;
private final WebSocketIntercept webSocketIntercept;
/**
* 注册WebSocketHandlers处理类和访问的地址
* @param registry
*/
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(webSocketHandler, "/demo2")
.addInterceptors(webSocketIntercept) //添加拦截器,获取url上的参数
.withSockJS();// 支持SockJS
}
}
Html页面
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>demo</title>
<script src="http://code.jquery.com/jquery-1.8.0.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/sockjs-client@1/dist/sockjs.min.js"></script>
</head>
<body>
<h1>WebSocketHandler</h1>
<div>
<label>用户</label>
<input id="user_name">
<button id="connect">连接</button>
<button id="disconnect">断开</button>
</div>
<div>
<label>发送消息</label>
<input type="text" id="send_msg">
<button id="send">发送</button>
</div>
<div>
<label>接收到的消息</label>
<textarea id="receive_msg" readonly="readonly"></textarea>
</div>
</body>
</html>
<script>
$(function () {
// const prefixUrl = 'ws://localhost:8080/demo2';
const socketUrl = 'http://localhost:8080/demo2';
let ws;//WebSocket连接对象
//判断当前浏览器是否支持WebSocket
if (!('WebSocket' in window)) {
alert('Not support websocket');
}
$('#connect').click(function () {
const userName = $('#user_name').val();
//创建WebSocket连接对象
// ws = new WebSocket(prefixUrl);
//创建SockJS对象
ws = new SockJS(socketUrl+'?userName='+userName);
//连接成功建立的回调方法
ws.onopen = function (event) {
console.log('建立连接')
}
//接收到消息的回调方法
ws.onmessage = function (event) {
console.log('接收到的消息:' + event.data)
$('#receive_msg').append(event.data + '\n')
}
//连接发生错误的回调方法
ws.onerror = function (event) {
console.log('发生错误')
}
//连接关闭的回调方法
ws.onclose = function (event) {
console.log('关闭连接')
}
})
//发送消息
function sendMessage(message) {
ws.send(message);
}
//关闭连接
function closeWebSocket() {
ws.close();
}
//监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
window.onbeforeunload = function () {
wx.close();
}
//发送消息
$('#send').click(function () {
sendMessage($('#send_msg').val())
})
//点击断开连接
$('#disconnect').click(function () {
closeWebSocket();
})
})
</script>
SockJS的api和WebSocket的完全一致,只有url的要求不同,WebSocket的开头是ws://或者wss://,而SockJS的开头是http://或https://
sockjs-client官方文档:https://github.com/sockjs/sockjs-client
基于STOMP消息
STOMP是一个用于C/S之间进行异步消息传输的简单文本协议;STOMP服务端被设计为客户端可以向其发送消息的一组目标地址,比如可以用定义/topic作为发布订阅模式,消息会被所有客户端接收到,还可以用/user作为点对点只有特定的客户端可以接收;STOMP客户端可以作为生产者send发送消息到指定地址,也可以作为消费者SUBSCRIBE订阅消息。
上图是spring中WebSocket的架构图
主要还是通过代码来分析
首先是配置类,要实现WebSocketMessageBrokerConfigurer,并且类上要加注解@EnableWebSocketMessageBroker
@Configuration
// 此注解表示使用STOMP协议来传输基于消息代理的消息,此时可以在@Controller类中使用@MessageMapping
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
private final WebSocketStompIntercept webSocketStompIntercept;
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
registry.setMessageSizeLimit(2048 * 2048);
registry.setSendBufferSizeLimit(2048 * 2048);
registry.setSendTimeLimit(2048 * 2048);
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 注册Stomp的端点
registry.addEndpoint("/demo3") // 添加STOMP协议的端点。这个HTTP URL是供WebSocket或SockJS客户端访问的地址
.setAllowedOriginPatterns("*") // 指定允许跨域访问
.withSockJS(); // 指定端点使用SockJS协议
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// 用户可以订阅来自以/topic、/queue为前缀的消息
registry.enableSimpleBroker( "/topic", "/queue");
// 全局使用的订阅前缀(客户端订阅路径上会体现出来)
registry.setApplicationDestinationPrefixes("/app");
// 点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user
registry.setUserDestinationPrefix("/user");
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
// 添加拦截器
registration.interceptors(webSocketStompIntercept);
}
}
然后是自定义的拦截器,实现ChannelInterceptor,然后我这边只是在服务器发送完消息后判断是否连接
@Slf4j
@Configuration
@RequiredArgsConstructor
public class WebSocketStompIntercept implements ChannelInterceptor {
private static Map<String, Object> livingSessionMap = new ConcurrentHashMap<>();
@Override
public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
StompCommand command = accessor.getCommand();
String sessionId = accessor.getSessionId();
Object headers = accessor.getMessageHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);
Object userName = null;
if (headers instanceof Map) {
userName = ((Map) headers).get("userName");
}
// 用户建立连接
if (StompCommand.CONNECT.equals(command)) {
livingSessionMap.put(sessionId, userName);
log.info(userName + " 建立连接!");
}
// 用户断开连接
if(StompCommand.DISCONNECT.equals(command)) {
userName = livingSessionMap.get(sessionId);
if (userName != null) {
livingSessionMap.remove(sessionId);
log.info(userName + " 断开连接!");
}
}
}
}
接着是Controller,首先@MessageMapping注解可以接受客户端send方法发过来的消息,@SubscribeMapping注解可以接受客户端subscribe方法的订阅,注解方法的返回值可以直接被客户端接收到。
然后发送消息可以通过SimpMessagingTemplate,也可以通过注解@SendTo来发送,如果是点对点的模式可以通过@SendToUser来实现。
@Slf4j
@Controller
@RequiredArgsConstructor
public class WebSocketStompController {
/**
* 可以在应用的任意地方发送消息
*/
private final SimpMessagingTemplate simpMessageTemplate;
@SubscribeMapping("/topic/chat") //处理subscribe发送的消息,subscribe方法的连接是/topic/chat
public String onSubscribe() {
return "init ok!";
}
@MessageMapping("/message") //处理send发送的消息,send方法的连接是/app/message
public void onSend(String message) {
simpMessageTemplate.convertAndSend("/topic/chat", message);
}
@MessageMapping("/guide_all")
@SendTo("/queue/greeting1")//广播推送,客户端订阅/queue/greeting1
public String onUser1(String message) {
return message;
}
@MessageMapping("/guide_user")
@SendToUser("/queue/greeting2")//精准推送,客户端订阅/user/queue/greeting2
public String onUser2(String message, Principal principal) {
return message;
}
}
最后是页面,引用了stompjs
stompjs官方文档:https://github.com/stomp-js/stomp-websocket
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>demo</title>
<script src="http://code.jquery.com/jquery-1.8.0.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/sockjs-client@1/dist/sockjs.min.js"></script>
<script src="https://cdn.bootcdn.net/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
</head>
<body>
<h1>WebSocketStomp</h1>
<div>
<label>用户</label>
<input id="user_name">
<button id="connect">连接</button>
<button id="disconnect">断开</button>
</div>
<div>
<label>发送消息</label>
<input type="text" id="send_msg">
<button id="send">发送</button>
</div>
<div>
<label>接收到的消息</label>
<textarea id="receive_msg" readonly="readonly"></textarea>
</div>
<div>
<button id="guide_all">广播推送</button>
<button id="guide_user">精准推送</button>
<div id="guide_msg"></div>
</div>
</body>
</html>
<script>
$(function () {
const socketUrl = 'http://localhost:8080/demo3';
let stompClient;//Stomp客户端对象
//判断当前浏览器是否支持WebSocket
if (!('WebSocket' in window)) {
alert('Not support websocket');
}
$('#connect').click(function () {
const userName = $('#user_name').val();
//创建SockJS对象
const ws = new SockJS(socketUrl);
stompClient = Stomp.over(ws);
stompClient.connect({userName: userName}, function() {
console.log('建立连接');
// 订阅服务器/topic/chat
stompClient.subscribe("/topic/chat", function (message) {
if (message.body) {
$('#receive_msg').append(message.body + '\n');
}
});
//服务端可以通过@SubscribeMapping("/topic/chat")来接收订阅
stompClient.subscribe("/app/topic/chat", function (message) {
console.log("app:" + message.body);
});
//==========================
//广播推送
stompClient.subscribe("/queue/greeting1", function (message) {
$("#guide_msg").html(message.body);
});
//精准推送
stompClient.subscribe("/user/queue/greeting2", function (message) {
$("#guide_msg").html(message.body);
});
}, function () {
// 连接失败
console.log('连接失败');
});
})
//发送消息
function sendMessage(message) {
if (stompClient != null) {
const userName = $("#user_name").val();
stompClient.send("/app/message", {}, userName + ":" + message);
} else {
alert('STOMP connection not established, please connect.');
}
}
//关闭连接
function closeWebSocket() {
stompClient.disconnect(function () {
console.log('关闭连接');
}, {});
}
//监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
window.onbeforeunload = function () {
stompClient.disconnect();
}
function pushMessage(broadcast) {
if (stompClient == null) {
alert('STOMP connection not established, please connect.');
return;
}
if (broadcast === true) {
stompClient.send("/app/guide_all", {}, "广播推送");
}else {
stompClient.send("/app/guide_user", {}, "精准推送");
}
}
//发送消息
$('#send').click(function () {
sendMessage($('#send_msg').val())
})
//广播推送
$("#guide_all").click(function () {
pushMessage(true);
})
//精确推送
$("#guide_user").click(function () {
pushMessage(false);
})
//点击断开连接
$('#disconnect').click(function () {
closeWebSocket();
})
})
</script>