SSE(Server-sent events)
SSE 它是基于 HTTP 协议的,一般意义上的 HTTP 协议是无法做到服务端主动向客户端推送消息的。有一种变通方法,就是服务器向客户端声明,发送的是流信息,本质上,这种通信就是以流信息的方式。
SSE 在服务器和客户端之间打开一个单向通道,服务端响应的不再是一次性的数据包而是
text/event-stream
类型的数据流信息,在有数据变更时从服务器流式传输到客户端。
SSE 与 WebSocket 作用相似,都可以建立服务端与浏览器之间的通信,实现服务端向客户端推送消息,两者区别:
-
SSE 是基于 HTTP 协议的,不需要特殊的协议或服务器实现即可工作,WebSocket 需单独服务器来处理协议;
-
SSE 单向通信,只能由服务端向客户端单向通信,webSocket 全双工通信,即通信的双方可以同时发送和接受信息。
-
SSE 实现简单开发成本低,无需引入其他组件,WebSocket 传输数据需做二次解析,开发门槛高一些。
-
SSE 默认支持断线重连,WebSocket 则需要自己实现。
-
SSE 只能传送文本消息,二进制数据需要经过编码后传送,WebSocket 默认支持传送二进制数据。
SSE 具有 WebSockets 在设计上缺乏的多种功能,例如:自动重新连接、事件 ID 和发送任意事件的能力。
编码
1.SseEmitterUtils
package com.demo.utils;
import cn.hutool.core.map.MapUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
* @ClassName:SseEmitterUtils.java
* @ClassPath:com.demo.utils.SseEmitterUtils.java
* @Description:SSE 服务器发送事件
* @Author:tanyp
* @Date:2022/9/13 11:03
@Slf4j
@Component
public class SseEmitterUtils {
// 当前连接数
private static AtomicInteger count = new AtomicInteger(0);
// 存储 SseEmitter 信息
private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
* @MonthName:connect
* @Description: 创建用户连接并返回 SseEmitter
* @Author:tanyp
* @Date:2022/9/13 11:09
* @Param: [userId]
* @return:org.springframework.web.servlet.mvc.method.annotation.SseEmitter
public static SseEmitter connect(String key) {
if (sseEmitterMap.containsKey(key)) {
return sseEmitterMap.get(key);
try {
// 设置超时时间,0表示不过期。默认30秒
SseEmitter sseEmitter = new SseEmitter(0L);
// 注册回调
sseEmitter.onCompletion(completionCallBack(key));
sseEmitter.onError(errorCallBack(key));
sseEmitter.onTimeout(timeoutCallBack(key));
sseEmitterMap.put(key, sseEmitter);
// 数量+1
count.getAndIncrement();
return sseEmitter;
} catch (Exception e) {
log.info("创建新的SSE连接异常,当前连接Key为:{}", key);
return null;
* @MonthName:sendMessage
* @Description: 给指定用户发送消息
* @Author:tanyp
* @Date:2022/9/13 11:10
* @Param: [userId, message]
* @return:void
public static void sendMessage(String key, String message) {
if (sseEmitterMap.containsKey(key)) {
try {
sseEmitterMap.get(key).send(message);
} catch (IOException e) {
log.error("用户[{}]推送异常:{}", key, e.getMessage());
remove(key);
* @MonthName:groupSendMessage
* @Description: 向同组人发布消息,要求:key + groupId
* @Author:tanyp
* @Date:2022/9/13 11:15
* @Param: [groupId, message]
* @return:void
public static void groupSendMessage(String groupId, String message) {
if (MapUtils.isNotEmpty(sseEmitterMap)) {
sseEmitterMap.forEach((k, v) -> {
try {
if (k.startsWith(groupId)) {
v.send(message, MediaType.APPLICATION_JSON);
} catch (IOException e) {
log.error("用户[{}]推送异常:{}", k, e.getMessage());
remove(k);
* @MonthName:batchSendMessage
* @Description: 广播群发消息
* @Author:tanyp
* @Date:2022/9/13 11:15
* @Param: [message]
* @return:void
public static void batchSendMessage(String message) {
sseEmitterMap.forEach((k, v) -> {
try {
v.send(message, MediaType.APPLICATION_JSON);
} catch (IOException e) {
log.error("用户[{}]推送异常:{}", k, e.getMessage());
remove(k);
* @MonthName:batchSendMessage
* @Description: 群发消息
* @Author:tanyp
* @Date:2022/9/13 11:16
* @Param: [message, ids]
* @return:void
public static void batchSendMessage(String message, Set<String> ids) {
ids.forEach(userId -> sendMessage(userId, message));
* @MonthName:remove
* @Description: 移除连接
* @Author:tanyp
* @Date:2022/9/13 11:17
* @Param: [userId]
* @return:void
public static void remove(String key) {
sseEmitterMap.remove(key);
// 数量-1
count.getAndDecrement();
log.info("移除连接:{}", key);
* @MonthName:getIds
* @Description: 获取当前连接信息
* @Author:tanyp
* @Date:2022/9/13 11:17
* @Param: []
* @return:java.util.List<java.lang.String>
public static List<String> getIds() {
return new ArrayList<>(sseEmitterMap.keySet());
* @MonthName:getUserCount
* @Description: 获取当前连接数量
* @Author:tanyp
* @Date:2022/9/13 11:18
* @Param: []
* @return:int
public static int getCount() {
return count.intValue();
private static Runnable completionCallBack(String key) {
return () -> {
log.info("结束连接:{}", key);
remove(key);
private static Runnable timeoutCallBack(String key) {
return () -> {
log.info("连接超时:{}", key);
remove(key);
private static Consumer<Throwable> errorCallBack(String key) {
return throwable -> {
log.info("连接异常:{}", key);
remove(key);
2.服务端
package com.demo.controller;
import com.demo.utils.SseEmitterUtils;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.servlet.http.HttpServletRequest;
* @ClassName:SSEController.java
* @ClassPath:com.demo.controller.SSEController.java
* @Description:SSE消息推送
* @Author:tanyp
* @Date:2022/9/13 11:29
@Slf4j
@RestController
@RequestMapping("/sse")
@Api(value = "sse", tags = "SSE消息推送")
public class SSEController {
@ApiOperation(value = "订阅消息", notes = "订阅消息")
@GetMapping(path = "subscribe/{id}", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
public SseEmitter subscribe(@PathVariable String id) {
return SseEmitterUtils.connect(id);
@ApiOperation(value = "发布消息", notes = "发布消息")
@GetMapping(path = "push")
public void push(String id, String content) {
SseEmitterUtils.sendMessage(id, content);
@ApiOperation(value = "清除连接", notes = "清除连接")
@GetMapping(path = "close")
public void close(String id, HttpServletRequest request) {
request.startAsync();
SseEmitterUtils.remove(id);
}
3.浏览器端
<!DOCTYPE html>
<html lang="en">
<title>SSE</title>
<meta charset="UTF-8">
<script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js" type="text/javascript"></script>
<script>
let source = null;
const id = "k000001";
if (window.EventSource) {
// 建立连接
source = new EventSource('http://localhost:8000/sse/subscribe/' + id);
setMessageInnerHTML("连接key:" + id);
* 连接一旦建立,就会触发open事件
* 另一种写法:source.onopen = function (event) {}
source.addEventListener('open', function (e) {
setMessageInnerHTML("建立连接。。。");
}, false);
* 客户端收到服务器发来的数据
* 另一种写法:source.onmessage = function (event) {}
source.addEventListener('message', function (e) {
setMessageInnerHTML(e.data);
* 如果发生通信错误(比如连接中断),就会触发error事件
* 另一种写法:source.onerror = function (event) {}
source.addEventListener('error', function (e) {
if (e.readyState === EventSource.CLOSED) {
setMessageInnerHTML("连接关闭");
} else {
console.log(e);
}, false);
} else {
setMessageInnerHTML("浏览器不支持SSE");
// 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据
window.onbeforeunload = function () {
source.close();
const httpRequest = new XMLHttpRequest();
httpRequest.open('GET', 'http://localhost:8000/sse/close/' + id, true);
httpRequest.send();
console.log("close");
// 将消息显示在网页上
function setMessageInnerHTML(innerHTML) {
$("#contentDiv").append("<br/>" + innerHTML);
</script>
</head>
<div id="contentDiv" style="height:800px; width:1000px; overflow:scroll; background:#ccc;">
</body>
</html>
注:SSE 是基于 HTTP 协议,目前除了 IE/Edge,其他浏览器都支持。