🐿 声明式 SSE 接口
# SSE
SSE(Server-Sent Events),也被称作“事件流”(Event Stream),是一种旨在通过HTTP协议来实现服务器主动向客户端推送数据的技术手段。 它可以在客户端与服务器之间建立一条持久化的HTTP长连接,并通过这条连接实现服务器向客户端的实时数据推送,但客户端不能通过 SSE 向服务端发送数据。
自v1.6.0版本起,Forest 提供了对 SSE 的支持,并且支持声名式和编程式两种 SSE 请求方式。
# 声明式 SSE 接口
# 接口定义
SSE 的声名式接口定义方式和普通 HTTP 接口定义方式类型,只需要将ForestSSE类作为接口方法的返回值类型即可,调用该方法后即可返回ForestSSE类的对象实例。
ForestSSE是 Forest 的 SSE 控制器类,通过此类对象可以进行监听、添加消息回调函数等操作。
public interface SSEClient {
    
    // ForestSSE 为 Forest 的 SSE 控制器类
    // 只要以此类作为方法的返回值类型,该方法就自动成为 SSE 请求方法
    @Get(url = "/sse", contentType = "text/event-stream")
    ForestSSE testSSE();
}
 2
3
4
5
6
7
# 监听事件
调用接口方法后会返回ForestSSE类对象,此时变化把普通的 HTTP 请求转换为 SSE 控制器,但不会发送任何实际的网络请求。
sseClient.testSSE(); // 把普通的 HTTP 请求转换为 SSE 控制器,但不会发送任何实际的网络请求
 得到 SSE 控制器之后,便可调用listen()方法开始监听 SSE 的事件流,并阻塞当前调用的线程。
sseClient.testSSE().listen(); // 开始监听 SSE 事件流,并阻塞当前线程
 # 异步监听
ForestSSE 控制器中的listen()方法是一种同步的监听方法,另外 Forest 也提供了异步的监听方法:asyncListen()
sseClient.testSSE().asyncListen(); // 开始异步监听 SSE 事件流,并且不会阻塞当前线程
 ForestSSE 类还提供了await()方法,用于阻塞等待当前线程,直到异步监听结束为止。
ForestSSE sse = sseClient.testSSE().asyncListen(); // 开始异步监听 SSE 事件流
sse.await(); // 阻塞并等待异步监听结束
 2
# 行模式
通常情况下,SSE 的消息流有一套标准的格式,一组消息分多行,每一行都是name:value形式的键值对字符串,每组消息之间用空白行隔开,具体形式如下:
id:1
event:user
data:{"name":"张三"}
id:2
event:user
data:{"name":"李四"}
id:3
event:user
data:{"name":"王五"}
 2
3
4
5
6
7
8
9
10
11
这种格式的消息,可以通过制定行模式为多行模式MULTI_LINES来读取解析
sseClient.testSSE()
        .setOnMessage(event -> { // 处理多行模式的消息
            event.id(); // 消息名为 id 的值,这里应得到 1, 2, ...
            event.event(); // user
            event.data(); // {"name": "张三"}, ...
        })
        .listen(SSELinesMode.MULTI_LINES);
 2
3
4
5
6
7
SSE 的消息除了有标准格式,还有很多非标准的格式,比如每行都是一条JSON字符串,每一行都是单独的消息,它们之间没有空白行分割
{"name":"a"}
{"name":"b"}
{"name":"c"}
 2
3
对于这种类型的消息,就要使用单行模式(SINGLE_LINE)
sseClient.testSSE()
        .setOnMessage(event -> { // 处理单行模式的消息
            String str = event.value(); // 获取字符串类型的消息值
            MyUser user = event.value(MyUser.class); // 获取消息值并转换为自定义类型
        })
        .listen(SSELinesMode.SINGLE_LINE);
 2
3
4
5
6
listen方法不传参数的情况下,默认为AUTO,AUTO模式会自动识别需要采用的行模式
// AUTO 模式会自动识别需要采用的行模式
sseClient.testSSE().listen(); // 默认行模式为 AUTO
 2
# 事件处理
在上面的例子中,我们可以通过listen()和asyncListen()方法进行事件监听,但若在此时接受到实际事件消息,并不会进行任何处理。
如要针对不同事件内容,进行响应的处理,就需要选择一种事件处理方式来处理接受到的事件。
在 Forest 中事件处理的方式也有三种:SSE 回调函数、自定义 SSE 控制器、以及 SSE 拦截器。
# SSE 回调函数
SSE 回调函数来处理事件的方式最为简便和直接,在调用接口返回 ForestSSE 对象后,即可调用setOnMessage方法,来设置处 SSE 的事件消息
sseClient.testSSE() // 调用接口方法后返回 ForestSSE 对象
        .setOnOpen(event -> {
            // SSE 开始监听时调用
            // event 为 EventSource 事件源对象
            event.request(); // 获取 Forest 请求对象
            event.response(); // 获取 Forest 响应对象
        })
        .setOnClose(event -> {
            // SSE 结束监听时调用
        })
        .setOnMessage(event -> {
            event.id(); // 消息名为 id 的值
            event.event(); // 消息名为 event 的值
            event.data(); // 消息名为 data 的值
            event.value("text"); // 获取非标准名称的消息值,如: text
        })
        .listen(); // 开始监听 SSE 事件流,默认AUTO模式
 2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
除了基本的消息监听处理功能之外,也可以调用setOnOpen、setOnClose、addOnData、addOnEvent
等方法来设置处理 SSE 不同生命周期、不同类型事件的回调函数
sseClient.testSSE() // 调用接口方法后返回 ForestSSE 对象
        .setOnOpen(event -> {
            // SSE 开始监听时调用
            // event 为 EventSource 事件源对象
            event.request(); // 获取 Forest 请求对象
            event.response(); // 获取 Forest 响应对象
        })
        .setOnClose(event -> {
            // SSE 结束监听时调用
        })
        .setOnMessage(event -> { // 处理每一组 SSE 消息
            event.id(); // 消息名为 id 的值
            event.event(); // 消息名为 event 的值
            event.data(); // 消息名为 data 的值
            event.value("text"); // 获取非标准名称的消息值,如: text
        })
        .addOnData((event, name, value) -> {
            // 处理事件消息名称为 data 的事件
            // name 为事件消息名称 (同 eventSource.name())
            // value 为事件消息的值
            event.sse(); // 获取 SSE 控制器
            event.request(); // 获取 Forest 请求对象
            event.response(); // 获取 Forest 响应对象
            event.rawData(); // 获取事件消息的原始数据
            event.name(); // 获取事件消息的名称,如 data、event、id 等
            event.value(); // 获取事件消息的值,返回字符串类型数据
            // 获取事件消息的值,并将消息转换为指定的自定义类型
            event.value(MyUser.class);
            // 获取事件消息的值,并将消息转换为指定的自定义泛型类型
            event.value(new TypeReference<List<MyUser>>() {});
        })
        .addOnEvent((event, name, value) -> {
            // 处理事件消息名称为 event 的事件
            // 参数内容与上面的 addOnData 回调函数参数相同
        })
        .addOnId((event, name, value) -> {
            // 处理事件消息名称为 id 的事件
            // 参数内容与上面的 addOnData 或 addOnEvent 回调函数参数相同
        })
        .addOnRetry((event, name, value) -> {
            // 处理事件消息名称为 retry 的事件
            // 参数内容与上面的几个回调函数参数相同
        })
        .listen(); // 开始监听 SSE 事件流
 2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
addOnData、addOnEvent以及addOnId等函数都是监听内置固定消息名称的方法,也可以调用addConsumer方法来处理非固定的,通过字符串参数指定的消息名称的事件
sseClient.testSSE() // 调用接口方法后返回 ForestSSE 对象
        .addConsumer("data", (event, name, value) -> {
            // 通过字符串参数指定要监听的消息名称
            // 第一个参数传入 "data",就会监听消息名称为 data 的事件
            // 参数内容与上面的 addOnData、addOnId 等回调函数参数一致
        })
        .addConsumer("event", (event, name, value) -> {
            // 通过字符串参数,来指定要监听消息名为 event 的事件
        })
        .addConsumer("push", (event, name, value) -> {
            // 当然也可以指定非标准的消息名称
        })
        .listen(); // 开始监听 SSE 事件流
 2
3
4
5
6
7
8
9
10
11
12
13
# 自定义 SSE 控制器
如果不想在调用 SSE 接口的地方写一大堆 SSE 事件的处理代码,想把事件监听和接受到事件后的事件处理逻辑代码进行解耦,独立到另一个地方的话,可以使用 自定义 SSE 控制器
// 自定义 SSE 控制器类,需要继承 ForestSSE
public class MySSEHandler extends ForestSSE {
    
    @Override
    protected void onOpen(EventSource event) {
        // SSE 开始监听时调用
        System.out.println("onOpen");
    }
    @Override
    protected void onClose(EventSource event) {
        // SSE 结束监听时调用
        System.out.println("onClose");
    }
    // 1.6.4+ 版本可以重写 onMessage 方法来接收 SSE 消息
    @Override
    public void onMessage(EventSource event) {
        event.id(); // 获取消息名称为 id 的消息值
        event.event(); // 获取消息名称为 event 的消息值
        event.data(); // 获取消息名称为 data 的消息值
        event.retry(); // 获取消息名称为 retry 的消息值
        event.value("text"); // 获取非标准消息名称为 text 的消息值
    }
}
 2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
然后将接口方法中的返回类型改成自定义的 SSE 控制器类即可
public interface SSEClient {
    
    // 返回类型改成自定义的 SSE 控制器类
    @Get(url = "/sse", contentType = "text/event-stream")
    MySSEHandler testSSE();
}
 2
3
4
5
6
完成这一步后,在调用时就无需再写回调函数了
// 开始监听后,收到消息时会自动执行 MySSEHandler 中的事件处理方法
sseClient.testSSE().listen();
 2
关于具体如何实现自定义 SSE 控制器,请参见《自定义 SSE 控制器》
# SSE 拦截器
除了回调函数和自定义 SSE 控制器两种方式外,还可以使用 SSE 拦截器来实现 SSE 事件的处理
相比于 SSE 控制器,SSE 拦截器为单例对象,不会在每次发起 SSE 请求时创建一个实例,并且可以注入到 Spring 上下文中,可调用 Spring 上下文中的对象和资源
// 自定义 SSE 拦截器
// 一定要加上 @Component 注解,不加上就不能注入到 Spring 上下文
@Component
public class MySSEInterceptor implements SSEInterceptor {
    
    // 可以注入 Spring 上下文中的对象
    @Resource
    private MyUserService myUserService;
    @Override
    public void onSSEOpen(EventSource event) {
        // SSE 开始监听时调用
        System.out.println("onOpen");
    }
    @Override
    public void onSSEClose(EventSource event) {
        // SSE 结束监听时调用
        System.out.println("onClose");
    }
    // 1.6.4+ 版本可以重写 onMessage 方法来接收 SSE 消息
    @Override
    public void onMessage(EventSource event) {
        event.id(); // 获取消息名称为 id 的消息值
        event.event(); // 获取消息名称为 event 的消息值
        event.data(); // 获取消息名称为 data 的消息值
        event.retry(); // 获取消息名称为 retry 的消息值
        event.value("text"); // 获取非标准消息名称为 text 的消息值
    }
}
 2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
定义完 SSE 拦截器之后,只要像普通拦截器那样绑定到接口方法上接口
public interface SSEClient {
    
    // 将自定义 SSE 拦截器绑定到该接口方法上
    // 返回类型仍然为 ForestSSE
    @Get(
        url = "/sse",
        contentType = "text/event-stream",
        interceptor = MySSEInterceptor.class
    )
    ForestSSE testSSE();
}
 2
3
4
5
6
7
8
9
10
11
完成这一步后,在调用时就无需再写回调函数了
// 开始监听后,收到消息时会自动执行 MySSEInterceptor 拦截器中的事件处理方法
sseClient.testSSE().listen();
 2
关于 SSE 拦截器的详情,请参见《SSE 拦截器》
# 关闭 SSE 事件流
可以调用 SSE 控制器及其子类(也就是自定义 SSE 控制器类)的close()方法
使用此方法需要先获得 SSE 控制器对象
// 通过异步监听获得 SSE 控制器对象
ForestSSE sse = sseClient.testSSE().asyncListen();
 2
或是先获得 SSE 控制器,再进行同步监听
// 先获得 SSE 控制器对象
ForestSSE sse = sseClient.testSSE();
// 再进行同步监听
sse.listen();
 2
3
4
也可以通过EventSource对象获得 SSE 控制器
ForestSSE sse;
sseClient.testSSE()
        .setOnOpen(event -> {
            // SSE 开始监听时调用
            sse = event.sse();
        });
 2
3
4
5
6
7
在获得到 SSE 控制器之后即可调用close()方法关闭 SSE 事件流
sse.close(); // 手动关闭事件流,停止 SSE 监听
 第二种方法是,调用EventSource对象的close()方法,效果相同
sseClient.testSSE()
        .addOnData((event, name, value) -> {
            // 处理事件消息名称为 data 的事件
            if ("close".equals(value)) {
                event.close(); // 手动关闭事件流,停止 SSE 监听
            }
        })
        .listen();
 2
3
4
5
6
7
8