文章
问答
冒泡
HTML5 服务器发送事件(Server-Sent Events)

SSE概述

Server-Sent Events就是HTML5 服务器发送事件。

传统方式是网页必须向服务器发送接收新数据的请求,服务器才能向网页响应数据。使用Server-Sent Events,服务器可以随时通过将消息推送到网页的方式来向网页发送新数据。

SSE通信协议

SSE是基于HTTP协议实现的一种推送数据的技术。SSE是单向的,它只支持从服务端向客户端发送消息,不支持客户端向服务端发送消息,与WebSocket这种全双工通信方式有本质区别。

主要特点

  • 简单易用:SSE使用起来很简单,支持中断重连的机制,另外只支持文本格式内容的传输。

  • 单向通信:只支持服务端向客户端主动推送数据

适用场景

SSE适用那种需要持续或间接向客户端更新数据、并且不需要客户端响应数据的应用场景,比如数据大屏。像ChatGPT就是使用的SSE来与前端进行交互。

使用SSE

前端创建EventSource实例

这里我使用React项目来建立前端的项目,在其中也遇到了很多怪异的问题,在下面会特别说明。

import {FC, useState} from "react";

const MainView:FC = () => {
  const [eventSource, setEventSource] = useState<EventSource|null>();
  const [id, setId] = useState<any>();

  const checkSupportEventSource = () => {
    return typeof(EventSource) !== "undefined"
  }

  const initEventSource = () => {
    const id = randomNumber();
    const url = `/sse/test/${id}`;
    console.log("链接URL: ", url);

    const eventSource = new EventSource(url);

    //链接创建成功回调
    eventSource.onopen = (event) => {
      console.log("onopen: ", event);
    }

    //默认消息回调函数
    eventSource.onmessage = (event) => {
      console.log("onmessage: ", event);
    }

    //错误处理
    eventSource.onerror = (event) => {
      console.log("onerror: ", event);
    }

    //自定义事件
    eventSource.addEventListener("testEvent", (event) => {
      console.log("testEvent message: ", event);
    });

    setId(id);
    setEventSource(eventSource);
  }

  const randomNumber = () => {
    return Math.floor(Math.random() * 10000000);
  }

  const handleOpenSSE = () => {
    if (checkSupportEventSource()) {
      console.log("浏览器支持EventSource,初始化链接");
      initEventSource();
    } else {
      console.log("浏览器不支持EventSource");
    }
  }

  const handleCloseSSE = () => {
    if (eventSource) {
      console.log("eventSource close");
      //关闭事件流
      eventSource.close();

      setId(null);
      setEventSource(null);
    }
  }

  return <div style={{margin: "20px"}}>
    <button onClick={handleOpenSSE}>开启SSE</button>&nbsp;&nbsp;&nbsp;&nbsp;
    <button onClick={handleCloseSSE}>停止SSE</button>
  </div>
}
export default MainView;

这里我着重说明一下:

1)如果使用了webpack的devServer插件,需要将gzip的压缩功能关闭,否则客户端只能在服务端complete的时候才能收到所有消息

2)需要配置一下onProxyReq,否则后端无法感知到客户端的链接断开状态,其配置如下(我本地启动的,所以target配置的是localhost:8080):

devServer: {
	historyApiFallback: true,
	compress: false,
	proxy: {
		"/sse/*": {
			target: "http://localhost:8080/",
			changeOrigin: true,
			secure: false,
			onProxyReq: (proxyReq, req, res) => {
				res.on('close', () => proxyReq.destroy())
			}
		},
	},
}

创建服务端服务发送事件

服务端我使用Java语言,基于spring boot框架做实现,下面只简单贴一下控制层的代码,其他没有特殊的配置。

方式一:基于Servlet 3.0javax.servlet.AsyncContext实现

package com.justin.controller;

import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Date;
import java.util.UUID;

@RequiredArgsConstructor
@RestController
@RequestMapping("/sse")
public class EventStreamDemoController {

    @GetMapping(value = "/test/{id}", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
    public void testSSE(HttpServletRequest request, HttpServletResponse response,
                        @PathVariable("id")String id) {
        System.out.println("======start");

        response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);
        response.setCharacterEncoding("UTF-8");
        response.setHeader("Cache-Control", "no-cache");

        AsyncContext asyncContext = request.startAsync();
        //永不超时
        asyncContext.setTimeout(0);

        //注册事件监听器
        asyncContext.addListener(new AsyncListener() {
            @Override
            public void onComplete(AsyncEvent event) throws IOException {
                System.out.println("onComplete: " + event);
            }

            @Override
            public void onTimeout(AsyncEvent event) throws IOException {
                System.out.println("onTimeout: " + event);
            }

            @Override
            public void onError(AsyncEvent event) throws IOException {
                System.out.println("onError: " + event);
                asyncContext.complete();
            }

            @Override
            public void onStartAsync(AsyncEvent event) throws IOException {
                System.out.println("onStartAsync: " + event);
            }
        }, request, response);

        new Thread(() -> {
            try {
                PrintWriter writer = asyncContext.getResponse().getWriter();
                int count = 0;
                while (true) {
                    if (count >= 100) {
                        break;
                    }

                    System.out.println("send message: " + count);

                    String time = DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss");

                    //发送自定义事件
                    writer.write("id: " + UUID.randomUUID() + "\n");
                    writer.write("event: testEvent" + "\n");
                    writer.write("data: " + ("[" + time + "]m1_" + count) + "\n");
                    writer.write("\n\n");

                    //发送默认事件,前端定义的onmessage回调函数会消费
                    writer.write("id: " + UUID.randomUUID() + "\n");
                    writer.write("data: " + ("[" + time + "]m2_" + count) + "\n");
                    writer.write("\n\n");
                    writer.flush();

                    count++;

                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                asyncContext.complete();
            }
        }).start();

        System.out.println("======end");
    }
}

方式二:基于Spring BootSseEmitter实现

package com.justin.controller;

import lombok.RequiredArgsConstructor;
import org.apache.catalina.connector.ClientAbortException;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.util.Date;
import java.util.UUID;

@RequiredArgsConstructor
@RestController
@RequestMapping("/sse")
public class EventStreamDemoController {

    @GetMapping(value = "/test/{id}", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
    public SseEmitter testSSE2(@PathVariable("id")String id) {
        SseEmitter sseEmitter = new SseEmitter(0L);
        sseEmitter.onError(Throwable::printStackTrace);
        sseEmitter.onTimeout(() -> {
            System.out.println("timeout");
        });
        sseEmitter.onCompletion(() -> {
            System.out.println("onCompletion");
        });

        new Thread(() -> {
            try {
                int count = 0;
                while (true) {
                    if (count >= 10) {
                        break;
                    }

                    String time = DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss");

                    //自定义事件
                    sseEmitter.send(SseEmitter.event()
                            .id(UUID.randomUUID().toString())
                            .name("testEvent")
                            .reconnectTime(100L)
                            .data("[" + time + "]a_" + count));

                    //默认事件
                    sseEmitter.send(SseEmitter.event()
                            .id(UUID.randomUUID().toString())
                            .reconnectTime(100L)
                            .data("[" + time + "]b_" + count));

                    count++;

                    Thread.sleep(1000);
                }
            } catch (ClientAbortException e) {
                sseEmitter.complete();
                e.printStackTrace();
            } catch (Exception e) {
                sseEmitter.completeWithError(e);
                e.printStackTrace();
            } finally {
                sseEmitter.complete();
            }
        }).start();

        System.out.println("======end");
        return sseEmitter;
    }
}

这两种方式任选其一,这里我演示了一次发送两个事件,一个是自定义事件testEvent,一个是默认事件。

其他说明:在客户端主动断开链接的场景下,服务端可以感知到对应的断开状态,服务端定义的回调函数onError会被调用,如果当前正在发送消息,也会报异常,这里面的保障机制仍需要自己完善。

查看结果

启动前端和后端的服务,我们可以看到前端可以持续的收到消息:

上面打印的是收到的消息日志。

从这张图中,我们可以看到实际收到了哪些消息,里面有两种类型的类型,一个是我自定义的事件消息testEvent,另外一个就是默认发送的。

Event格式

event的格式就是一个很简单的文本内容,且必须是用UTF-8编码,消息之间必须使用一对换行符进项分隔,作为一行中冒号之前的字符本质上就是一个注释,会被自动忽略。

每条消息由一行或多行文本组成,在这其中列出了该消息的字段,每个字段由字段名、冒号和该字段的文本值组成,下面是所涉及的字段:

字段名 说明
id 事件标识
event 事件类型,如果指定了值,客户端需要使用addEventListener()监听这个事件名称,如果没有指定值,则会使用onmessage回调函数处理。
retry 重连时间,单位为毫秒,如果客户端感知到与服务端的链接断开了,浏览器会等待相应的时间之后才会尝试重连,必须为整数,单位为:毫秒。
data 消息的数据字段,当EventSource接收到以data:开头的多个连续行时,它会将它们连接起来,在每个行之间插入一个换行符。尾部换行符将被删除。

除了以上的字段之外,其他字段都将被忽略,下面展示一些例子。

单行数据体的消息

下面是两条消息。

data: some text 1


data: some text 2

多行数据体消息

下面是一条消息。

data: message
data: with two lines

自定义事件

下面是两条消息。

event: customEvent1
data: {"username": "1", "time": "11:22:00"}


event: customEvent2
data: some text

EventSource

EventSource是Web端与服务器发送的事件的接口。一个EventSource实例会建立一个与服务端的以text/event-stream协议发送事件数据的HTTP长连接请求,这个连接会一直存在,直到调用了方法EventSource.close()

实例属性

readyState(只读)

描述连接状态:CONNECTING (0), OPEN (1), or CLOSED (2)

url(只读)

连接的服务器地址

withCredentials(只读)

布尔值,默认falseurl如果为跨域地址,需要设置此值为true,以支持跨域通信(另外后端服务需要针对跨域做对应的配置,否则也是不起作用的)

事件

error

当无法链接服务端的时候触发

message

当服务端发送过来消息时触发

open

当与服务端的链接建立成功时触发

遇到的问题

1)消息在服务端complete的时候,客户端才一次性的收到所有的消息

我使用的是react,并且基于webpack的devServer构建的前端服务,所以需要将gzip功能关闭:

devServer: {
	......
	compress: false,
	......
}

2)服务端无法感知客户端连接的断开

我使用的是react,并且基于webpack的devServer构建的前端服务,所以需要配置onProxyReq回调,如果无此配置,客户端断开的时候无法通知到对应的服务端:

devServer: {
	......
	proxy: {
		"/sse/*": {
			......
			onProxyReq: (proxyReq, req, res) => {
				res.on('close', () => proxyReq.destroy())
			}
			......
		},
	},
}

JavaSript
SSE

关于作者

justin
123456
获得点赞
文章被阅读