SSE概述
Server-Sent Events
就是HTML5 服务器发送事件。
传统方式是网页必须向服务器发送接收新数据的请求,服务器才能向网页响应数据。使用Server-Sent Events
,服务器可以随时通过将消息推送到网页的方式来向网页发送新数据。
SSE通信协议
SSE是基于HTTP协议实现的一种推送数据的技术。SSE是单向的,它只支持从服务端向客户端发送消息,不支持客户端向服务端发送消息,与WebSocket这种全双工通信方式有本质区别。
主要特点
-
简单易用:SSE使用起来很简单,支持中断重连的机制,另外只支持文本格式内容的传输。
-
单向通信:只支持服务端向客户端主动推送数据
适用场景
SSE适用那种需要持续或间接向客户端更新数据、并且不需要客户端响应数据的应用场景,比如数据大屏。像ChatGPT就是使用的SSE来与前端进行交互。
使用SSE
前端创建EventSource
实例
这里我使用React项目来建立前端的项目,在其中也遇到了很多怪异的问题,在下面会特别说明。
import {FC, useState} from "react";
const MainView:FC = () => {
const [eventSource, setEventSource] = useState<EventSource|null>();
const [id, setId] = useState<any>();
const checkSupportEventSource = () => {
return typeof(EventSource) !== "undefined"
}
const initEventSource = () => {
const id = randomNumber();
const url = `/sse/test/${id}`;
console.log("链接URL: ", url);
const eventSource = new EventSource(url);
//链接创建成功回调
eventSource.onopen = (event) => {
console.log("onopen: ", event);
}
//默认消息回调函数
eventSource.onmessage = (event) => {
console.log("onmessage: ", event);
}
//错误处理
eventSource.onerror = (event) => {
console.log("onerror: ", event);
}
//自定义事件
eventSource.addEventListener("testEvent", (event) => {
console.log("testEvent message: ", event);
});
setId(id);
setEventSource(eventSource);
}
const randomNumber = () => {
return Math.floor(Math.random() * 10000000);
}
const handleOpenSSE = () => {
if (checkSupportEventSource()) {
console.log("浏览器支持EventSource,初始化链接");
initEventSource();
} else {
console.log("浏览器不支持EventSource");
}
}
const handleCloseSSE = () => {
if (eventSource) {
console.log("eventSource close");
//关闭事件流
eventSource.close();
setId(null);
setEventSource(null);
}
}
return <div style={{margin: "20px"}}>
<button onClick={handleOpenSSE}>开启SSE</button>
<button onClick={handleCloseSSE}>停止SSE</button>
</div>
}
export default MainView;
这里我着重说明一下:
1)如果使用了webpack的devServer插件,需要将gzip
的压缩功能关闭,否则客户端只能在服务端complete的时候才能收到所有消息
2)需要配置一下onProxyReq
,否则后端无法感知到客户端的链接断开状态,其配置如下(我本地启动的,所以target配置的是localhost:8080
):
devServer: {
historyApiFallback: true,
compress: false,
proxy: {
"/sse/*": {
target: "http://localhost:8080/",
changeOrigin: true,
secure: false,
onProxyReq: (proxyReq, req, res) => {
res.on('close', () => proxyReq.destroy())
}
},
},
}
创建服务端服务发送事件
服务端我使用Java语言,基于spring boot框架做实现,下面只简单贴一下控制层的代码,其他没有特殊的配置。
方式一:基于Servlet 3.0
的javax.servlet.AsyncContext
实现
package com.justin.controller;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Date;
import java.util.UUID;
@RequiredArgsConstructor
@RestController
@RequestMapping("/sse")
public class EventStreamDemoController {
@GetMapping(value = "/test/{id}", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
public void testSSE(HttpServletRequest request, HttpServletResponse response,
@PathVariable("id")String id) {
System.out.println("======start");
response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);
response.setCharacterEncoding("UTF-8");
response.setHeader("Cache-Control", "no-cache");
AsyncContext asyncContext = request.startAsync();
//永不超时
asyncContext.setTimeout(0);
//注册事件监听器
asyncContext.addListener(new AsyncListener() {
@Override
public void onComplete(AsyncEvent event) throws IOException {
System.out.println("onComplete: " + event);
}
@Override
public void onTimeout(AsyncEvent event) throws IOException {
System.out.println("onTimeout: " + event);
}
@Override
public void onError(AsyncEvent event) throws IOException {
System.out.println("onError: " + event);
asyncContext.complete();
}
@Override
public void onStartAsync(AsyncEvent event) throws IOException {
System.out.println("onStartAsync: " + event);
}
}, request, response);
new Thread(() -> {
try {
PrintWriter writer = asyncContext.getResponse().getWriter();
int count = 0;
while (true) {
if (count >= 100) {
break;
}
System.out.println("send message: " + count);
String time = DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss");
//发送自定义事件
writer.write("id: " + UUID.randomUUID() + "\n");
writer.write("event: testEvent" + "\n");
writer.write("data: " + ("[" + time + "]m1_" + count) + "\n");
writer.write("\n\n");
//发送默认事件,前端定义的onmessage回调函数会消费
writer.write("id: " + UUID.randomUUID() + "\n");
writer.write("data: " + ("[" + time + "]m2_" + count) + "\n");
writer.write("\n\n");
writer.flush();
count++;
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
asyncContext.complete();
}
}).start();
System.out.println("======end");
}
}
方式二:基于Spring Boot
的SseEmitter
实现
package com.justin.controller;
import lombok.RequiredArgsConstructor;
import org.apache.catalina.connector.ClientAbortException;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Date;
import java.util.UUID;
@RequiredArgsConstructor
@RestController
@RequestMapping("/sse")
public class EventStreamDemoController {
@GetMapping(value = "/test/{id}", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
public SseEmitter testSSE2(@PathVariable("id")String id) {
SseEmitter sseEmitter = new SseEmitter(0L);
sseEmitter.onError(Throwable::printStackTrace);
sseEmitter.onTimeout(() -> {
System.out.println("timeout");
});
sseEmitter.onCompletion(() -> {
System.out.println("onCompletion");
});
new Thread(() -> {
try {
int count = 0;
while (true) {
if (count >= 10) {
break;
}
String time = DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss");
//自定义事件
sseEmitter.send(SseEmitter.event()
.id(UUID.randomUUID().toString())
.name("testEvent")
.reconnectTime(100L)
.data("[" + time + "]a_" + count));
//默认事件
sseEmitter.send(SseEmitter.event()
.id(UUID.randomUUID().toString())
.reconnectTime(100L)
.data("[" + time + "]b_" + count));
count++;
Thread.sleep(1000);
}
} catch (ClientAbortException e) {
sseEmitter.complete();
e.printStackTrace();
} catch (Exception e) {
sseEmitter.completeWithError(e);
e.printStackTrace();
} finally {
sseEmitter.complete();
}
}).start();
System.out.println("======end");
return sseEmitter;
}
}
这两种方式任选其一,这里我演示了一次发送两个事件,一个是自定义事件testEvent
,一个是默认事件。
其他说明:在客户端主动断开链接的场景下,服务端可以感知到对应的断开状态,服务端定义的回调函数onError
会被调用,如果当前正在发送消息,也会报异常,这里面的保障机制仍需要自己完善。
查看结果
启动前端和后端的服务,我们可以看到前端可以持续的收到消息:
上面打印的是收到的消息日志。
从这张图中,我们可以看到实际收到了哪些消息,里面有两种类型的类型,一个是我自定义的事件消息testEvent
,另外一个就是默认发送的。
Event格式
event的格式就是一个很简单的文本内容,且必须是用UTF-8
编码,消息之间必须使用一对换行符进项分隔,作为一行中冒号之前的字符本质上就是一个注释,会被自动忽略。
每条消息由一行或多行文本组成,在这其中列出了该消息的字段,每个字段由字段名、冒号和该字段的文本值组成,下面是所涉及的字段:
字段名 | 说明 |
---|---|
id | 事件标识 |
event | 事件类型,如果指定了值,客户端需要使用addEventListener() 监听这个事件名称,如果没有指定值,则会使用onmessage 回调函数处理。 |
retry | 重连时间,单位为毫秒,如果客户端感知到与服务端的链接断开了,浏览器会等待相应的时间之后才会尝试重连,必须为整数,单位为:毫秒。 |
data | 消息的数据字段,当EventSource接收到以data:开头的多个连续行时,它会将它们连接起来,在每个行之间插入一个换行符。尾部换行符将被删除。 |
除了以上的字段之外,其他字段都将被忽略,下面展示一些例子。
单行数据体的消息
下面是两条消息。
data: some text 1
data: some text 2
多行数据体消息
下面是一条消息。
data: message
data: with two lines
自定义事件
下面是两条消息。
event: customEvent1
data: {"username": "1", "time": "11:22:00"}
event: customEvent2
data: some text
EventSource
EventSource是Web端与服务器发送的事件的接口。一个EventSource实例会建立一个与服务端的以text/event-stream
协议发送事件数据的HTTP长连接请求,这个连接会一直存在,直到调用了方法EventSource.close()
。
实例属性
readyState(只读)
描述连接状态:CONNECTING
(0
), OPEN
(1
), or CLOSED
(2
)
url(只读)
连接的服务器地址
withCredentials(只读)
布尔值,默认false
,url
如果为跨域地址,需要设置此值为true
,以支持跨域通信(另外后端服务需要针对跨域做对应的配置,否则也是不起作用的)
事件
error
当无法链接服务端的时候触发
message
当服务端发送过来消息时触发
open
当与服务端的链接建立成功时触发
遇到的问题
1)消息在服务端complete的时候,客户端才一次性的收到所有的消息
我使用的是react,并且基于webpack的devServer构建的前端服务,所以需要将gzip功能关闭:
devServer: {
......
compress: false,
......
}
2)服务端无法感知客户端连接的断开
我使用的是react,并且基于webpack的devServer构建的前端服务,所以需要配置onProxyReq
回调,如果无此配置,客户端断开的时候无法通知到对应的服务端:
devServer: {
......
proxy: {
"/sse/*": {
......
onProxyReq: (proxyReq, req, res) => {
res.on('close', () => proxyReq.destroy())
}
......
},
},
}