前言#
最近有个业务需求,需要判断用户上传的数据是异常还是正常,如果是异常的情况下需要实时的推送报警消息到用户。
在大部分情况下,通常是客户端(浏览器)主动发送请求到服务端(服务器),告诉它需要什么数据。
但也有一部分情况,需要服务端主动的给客户端推送消息,比如上面的情况,又比如网页端常见的扫码登录
技术选择#
实现服务端主动推送的方法有很多种,需要根据业务进行选择。
简单且常见的方法是,长轮询,和短轮询。听名字就知道简单粗暴。
更进阶的是,SSE 和 websocket。
更深层的就是各种消息队列,如 MQTT。
其他的推送也有短信推送或者 APP 通知,比如极光 Push。
实现方式 | 实现 | 原理 | 模式 |
---|---|---|---|
短轮询 | 客户端 | 在特定的的时间间隔(如每 1 秒),由客户端对服务端发出请求,然后由服务器返回最新的数据给客户端的浏览器。 | / |
长轮询 | 服务端 | 客户端向服务器发送请求,服务器接到请求后hold住连接 ,直到有新消息才返回响应信息并关闭连接,客户端处理完响应信息后再向服务器发送新的请求。 | / |
SSE(Server-sent Events) | 服务端 | 客户端先向服务器注册一个长连接,服务器获取到事件后可以发送消息到已注册的客户端 | 只能由服务器推送 |
Websocket | 客户端 / 服务端 | 使用 websocket 需要引入新的依赖,一旦客户端和服务端握手成功,它们就处在同一频道,可以实时的收发消息了 | 全双工,服务端和客户端都既能推送又能接收 |
Mqtt | 客户端 / 服务端 | mqtt 常用在物联网业务中,是一种基于发布 / 订阅模式的轻量级通讯协议,该协议构建在 TCP/IP 协议上。 MQTT 最大的有点在于可以以极少的代码和有限的带宽,为远程设备提供实时可靠的消息服务 | 发布 / 订阅模式,服务端和客户端都既能推送又能接收 |
对于长 / 短轮询不再赘述,本次主要讲解SSE和MQTT方式。
SSE#
在 spring boot 中 SSE 是原生支持的,不需要再导入其他的依赖,这是它的优点,但是它的缺点也很明显,只支持服务端单向推送,只支持高级浏览器(chrome,Firefox 等)。受限于浏览器的限制,每个网页最多只能保持最多 6 个的长连接;更多的连接可能会占用更多的内存和计算资源。
首先创建一个 sse 工具类
@Component
@Slf4j
public class SseEmitterUtils {
/**
* 当前连接数
*/
private static AtomicInteger count = new AtomicInteger(0);
/**
* 存储 SseEmitter 信息
*/
private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
/**
* 创建用户连接并返回 SseEmitter
* @param key userId
* @return SseEmitter
*/
public static SseEmitter connect(String key) {
if (sseEmitterMap.containsKey(key)) {
return sseEmitterMap.get(key);
}
try {
// 设置超时时间,0表示不过期。默认30秒
SseEmitter sseEmitter = new SseEmitter(0L);
// 注册回调
sseEmitter.onCompletion(completionCallBack(key));
sseEmitter.onError(errorCallBack(key));
sseEmitter.onTimeout(timeoutCallBack(key));
sseEmitterMap.put(key, sseEmitter);
// 数量+1
count.getAndIncrement();
return sseEmitter;
} catch (Exception e) {
log.info("创建新的SSE连接异常,当前连接Key为:{}", key);
}
return null;
}
/**
* 给指定用户发送消息
* @param key userId
* @param message 消息内容
*/
public static void sendMessage(String key, String message) {
if (sseEmitterMap.containsKey(key)) {
try {
sseEmitterMap.get(key).send(message);
} catch (IOException e) {
log.error("用户[{}]推送异常:{}", key, e.getMessage());
remove(key);
}
}
}
/**
* 向同组人发布消息,要求:key + groupId
* @param groupId 群组id
* @param message 消息内容
*/
public static void groupSendMessage(String groupId, String message) {
if (!CollectionUtils.isEmpty(sseEmitterMap)) {
sseEmitterMap.forEach((k, v) -> {
try {
if (k.startsWith(groupId)) {
v.send(message, MediaType.APPLICATION_JSON);
}
} catch (IOException e) {
log.error("用户[{}]推送异常:{}", k, e.getMessage());
remove(k);
}
});
}
}
/**
* 广播群发消息
* @param message 消息内容
*/
public static void batchSendMessage(String message) {
sseEmitterMap.forEach((k, v) -> {
try {
v.send(message, MediaType.APPLICATION_JSON);
} catch (IOException e) {
log.error("用户[{}]推送异常:{}", k, e.getMessage());
remove(k);
}
});
}
/**
* 群发消息
* @param message 消息内容
* @param ids 用户id集合
*/
public static void batchSendMessage(String message, Set<String> ids) {
ids.forEach(userId -> sendMessage(userId, message));
}
/**
* 移除连接
* @param key userId
*/
public static void remove(String key) {
sseEmitterMap.remove(key);
// 数量-1
count.getAndDecrement();
log.info("移除连接:{}", key);
}
/**
* 获取当前连接信息
* @return Map
*/
public static List<String> getIds() {
return new ArrayList<>(sseEmitterMap.keySet());
}
/**
* 获取当前连接数量
* @return int
*/
public static int getCount() {
return count.intValue();
}
private static Runnable completionCallBack(String key) {
return () -> {
log.info("结束连接:{}", key);
remove(key);
};
}
private static Runnable timeoutCallBack(String key) {
return () -> {
log.info("连接超时:{}", key);
remove(key);
};
}
private static Consumer<Throwable> errorCallBack(String key) {
return throwable -> {
log.info("连接异常:{}", key);
remove(key);
};
}
}
然后用工具类实现几个接口,以便客户端实现订阅和服务端主动推送消息。
@RequestMapping("/sse")
@RestController
@Slf4j
@CrossOrigin
public class SSEEmitterController {
/**
* 创建连接
* @param id 用户id
* @return SseEmitter
*/
@GetMapping(path = "/subscribe/{id}", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
public SseEmitter subscribe(@PathVariable String id) {
return SseEmitterUtils.connect(id);
}
/**
* 向指定用户推送消息
* @param id 用户id
* @param content 推送内容
*/
@PostMapping(path = "/push")
public void push(String id, String content) {
SseEmitterUtils.sendMessage(id, content);
}
/**
* 向指定群组推送消息
* @param groupId 群组id
* @param content 推送内容
*/
@PostMapping(path = "/groupPush")
public void groupPush(String groupId, String content) {
SseEmitterUtils.groupSendMessage(groupId, content);
}
/**
* 广播消息
* @param content 推送内容
*/
@PostMapping(path = "/pushAll")
public void pushAll(String content) {
SseEmitterUtils.batchSendMessage(content);
}
/**
* 关闭连接
* @param id 用户id
* @param request 请求
*/
@DeleteMapping(path = "/close/{id}")
public void close(@PathVariable String id, HttpServletRequest request) {
request.startAsync();
SseEmitterUtils.remove(id);
}
}
最后你可以使用下面的 HTML 页面进行测试
<!DOCTYPE html>
<html lang="en">
<head>
<title>SSE</title>
<meta charset="UTF-8">
<script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js" type="text/javascript"></script>
<script>
if (window.EventSource) {
let sources = [];
// 创建连接
for (let i = 1; i < 10; i++) {
let id = "id_" + i;
sources[i] = new EventSource('http://localhost:8008/sse/subscribe/' + id);
}
/**
* 连接一旦建立,就会触发open事件
* 另一种写法:source.onopen = function (event) {}
*/
// 连接打开事件
sources.forEach(source => {
let id = source.url.split('/').pop();
source.addEventListener('open', function (e) {
setMessageInnerHTML(id + "连接打开")
console.log(id + "连接打开");
});
});
/**
* 客户端收到服务器发来的数据
* 另一种写法:source.onmessage = function (event) {}
*/
// 消息事件
sources.forEach(source => {
let id = source.url.split('/').pop();
source.addEventListener('message', function (e) {
setMessageInnerHTML(id + "收到消息:" + e.data)
console.log(id + "收到消息:" + e.data);
});
});
/**
* 如果发生通信错误(比如连接中断),就会触发error事件
* 另一种写法:source.onerror = function (event) {}
*/
// 错误处理
sources.forEach(source => {
let id = source.url.split('/').pop();
source.addEventListener('error', function (e) {
if (e.readyState === EventSource.CLOSED) {
setMessageInnerHTML(id + "连接关闭")
console.log(id + "连接关闭");
} else {
setMessageInnerHTML(id + "连接错误:", e)
console.log(id + "连接错误:", e);
}
});
});
} else {
setMessageInnerHTML("浏览器不支持SSE");
}
// 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据
window.onbeforeunload = function () {
source.close();
const httpRequest = new XMLHttpRequest();
httpRequest.open('GET', 'http://localhost:8008/sse/close/' + id, true);
httpRequest.send();
console.log("close");
};
// 将消息显示在网页上
function setMessageInnerHTML(innerHTML) {
$("#contentDiv").append("<br/>" + innerHTML);
}
</script>
</head>
<body>
<div>
<div>
<div id="contentDiv" style="height:800px; width:1000px; overflow:scroll; background:#ccc;">
</div>
</div>
</div>
</body>
</html>
然后调用接口进行测试
可以看到,浏览器最终只保留了 6 个连接,其它的都被丢弃掉。
MQTT#
要实现 mqtt 需要在服务端添加下面的依赖
<!--mqtt依赖包-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
然后需要安装中间件,比如EMQX,又或者直接使用 RabbitMq。需要注意的是 rabbitMq 不支持 QOS2 级别的消息等级
关于 mqtt 的 qos 消息质量等级可以看下表的解释
QoS 级别 | 描述 | 适用场景 |
---|---|---|
0 | 最多一次交付,消息不会被确认或重传 | 适用于不重要的数据传输,如传感器数据等 |
1 | 至少一次交付,确保消息至少被传递一次,但可能重复传递 | 适用于需要确保消息传递,但允许重复传递的场景 |
2 | 只有一次交付,确保消息仅被传递一次,不允许重复传递 | 适用于需要确保消息精确传递,且不允许重复传递的场景 |
下面以使用 rabbitmq 中间件为例
配置 mqtt 协议信息
server:
port: 8008
spring:
application:
name: mqtt测试项目
mqtt:
url: tcp://127.0.0.1:1883
username: guest
password: guest
clientId: serverClientId
#发布的主题--MQTT-默认的消息推送主题,实际可在调用接口时指定
pubTopic: testTopic
#订阅的主题
subTopic: gps-topic,oil-topic,broadcast-topic,fault-topic
completionTimeout: 3000
然后创建配置文件的实体类映射
@Data
@EqualsAndHashCode(callSuper = false)
@Component
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttProperties {
/**
* RabbitMQ连接用户名
*/
private String username;
/**
* RabbitMQ连接密码
*/
private String password;
/**
* 推送主题
*/
private String pubTopic;
/**
* RabbitMQ的MQTT连接地址
*/
private String url;
/**
* RabbitMQ的MQTT连接客户端ID
*/
private String clientId;
/**
* 订阅主题
*/
private String subTopic;
/**
* 超时时间
*/
private Integer completionTimeout;
}
接着创建一个消费者
@Slf4j
@Configuration
@RequiredArgsConstructor
public class MqttConsumer {
private final MqttProperties mqttProperties;
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getUrl(), mqttProperties.getClientId(),
mqttProperties.getSubTopic().split(","));
adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout());
adapter.setConverter(new DefaultPahoMessageConverter());
//设置消息质量:0->至多一次;1->至少一次;2->只有一次
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
MessageHeaders headers = message.getHeaders();
log.info("headers: {}", headers);
String topic = Objects.requireNonNull(headers.get(MqttHeaders.RECEIVED_TOPIC)).toString();
log.info("订阅主题为: {}", topic);
String[] topics = mqttProperties.getSubTopic().split(",");
for (String t : topics) {
if (t.equals(topic)) {
log.info("订阅主题为:{};接收到该主题消息为:{}",topic, message.getPayload());
}
}
}
};
}
}
再创建一个生产者
@Configuration
@Slf4j
@RequiredArgsConstructor
public class MqttProvider {
private final MqttProperties mqttProperties;
@Bean
@SneakyThrows
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { mqttProperties.getUrl()});
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
// 把配置里的 cleanSession 设为false,客户端掉线后 服务器端不会清除session,
// 当重连后可以接收之前订阅主题的消息。当客户端上线后会接受到它离线的这段时间的消息
options.setCleanSession(false);
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
// 断开后重连,但这个方法并没有重新订阅的机制
// 在尝试重新连接之前,它将首先等待1秒,对于每次失败的重新连接尝试,延迟将加倍,直到达到2分钟,此时延迟将保持在2分钟。
options.setAutomaticReconnect(true);
factory.setConnectionOptions(options);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("publisherClient", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(mqttProperties.getPubTopic());
// 设置推送时的消息质量:0->至多一次;1->至少一次;2->只有一次
// 在rabbitmq 中 qos2会被降级为qos1
messageHandler.setDefaultQos(1);
// 设置是否保留消息,设置为true时保留消息会在每次重连时发送
// 除非发送一条内容空白的新的保留信息才能清除
messageHandler.setDefaultRetained(false);
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}
再创建一个网关接口用于发送消息
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
/**
* 发送消息到默认topic
*/
void sendToMqtt(String payload);
/**
* 发送消息到指定topic
*/
void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
/**
* 发送消息到指定topic并设置QOS
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
最后同样编写一个接口用于发送消息
@RestController
@RequiredArgsConstructor
@RequestMapping("/mqtt")
public class MqttController {
private final MqttGateway mqttGateway;
@PostMapping("/sendToDefaultTopic")
public void sendToDefaultTopic(String payload) {
mqttGateway.sendToMqtt(payload);
}
@PostMapping("/sendToTopic")
public void sendToTopic(String payload, String topic) {
mqttGateway.sendToMqtt(payload, topic);
}
}
可以使用第三方 mqtt 软件进行测试,例如mqttx
安装完成后,打开软件创建一个连接
需要注意的是
- 每次连接的 ClinetId 应该是不同且唯一的,相同的 ClinetId 可能会因连接挤掉而丢失消息
- 服务器连接协议可以是 tcp,也可以是 mqtt
- 如果是 rabbitmq,需要配置用户的权限
- 如果是 rabbitmq,版本请选择 3.1.1
- 如果想要消费者离线后仍能收到生产者的消息,请关闭 Clean Session。
连接成功后,调用接口发送消息订阅者就能收到了