前言#
最近有個業務需求,需要判斷用戶上傳的數據是異常還是正常,如果是異常的情況下需要即時的推送報警消息到用戶。
在大部分情況下,通常是客戶端(瀏覽器)主動發送請求到服務端(伺服器),告訴它需要什麼數據。
但也有一部分情況,需要服務端主動的給客戶端推送消息,比如上面的情況,又比如網頁端常見的掃碼登錄
技術選擇#
實現服務端主動推送的方法有很多種,需要根據業務進行選擇。
簡單且常見的方法是,長輪詢,和短輪詢。聽名字就知道簡單粗暴。
更進階的是,SSE 和 websocket。
更深層的就是各種消息隊列,如 MQTT。
其他的推送也有短信推送或者 APP 通知,比如極光 Push。
實現方式 | 實現 | 原理 | 模式 |
---|---|---|---|
短輪詢 | 客戶端 | 在特定的的時間間隔(如每 1 秒),由客戶端對服務端發出請求,然後由伺服器返回最新的數據給客戶端的瀏覽器。 | / |
長輪詢 | 服務端 | 客戶端向伺服器發送請求,伺服器接到請求後hold住連接 ,直到有新消息才返回響應信息並關閉連接,客戶端處理完響應信息後再向伺服器發送新的請求。 | / |
SSE(Server-sent Events) | 服務端 | 客戶端先向伺服器註冊一個長連接,伺服器獲取到事件後可以發送消息到已註冊的客戶端 | 只能由伺服器推送 |
Websocket | 客戶端 / 服務端 | 使用 websocket 需要引入新的依賴,一旦客戶端和伺服器握手成功,它們就處在同一頻道,可以即時的收發消息了 | 全雙工,伺服器和客戶端都既能推送又能接收 |
Mqtt | 客戶端 / 服務端 | mqtt 常用在物聯網業務中,是一種基於發布 / 訂閱模式的輕量級通訊協議,該協議構建在 TCP/IP 協議上。 MQTT 最大的優點在於可以以極少的代碼和有限的帶寬,為遠程設備提供即時可靠的消息服務 | 發布 / 訂閱模式,伺服器和客戶端都既能推送又能接收 |
對於長 / 短輪詢不再贅述,本次主要講解SSE和MQTT方式。
SSE#
在 spring boot 中 SSE 是原生支持的,不需要再導入其他的依賴,這是它的優點,但是它的缺點也很明顯,只支持伺服器單向推送,只支持高級瀏覽器(chrome,Firefox 等)。受限於瀏覽器的限制,每個網頁最多只能保持最多 6 個的長連接;更多的連接可能會佔用更多的內存和計算資源。
首先創建一個 sse 工具類
@Component
@Slf4j
public class SseEmitterUtils {
/**
* 當前連接數
*/
private static AtomicInteger count = new AtomicInteger(0);
/**
* 存儲 SseEmitter 信息
*/
private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
/**
* 創建用戶連接並返回 SseEmitter
* @param key userId
* @return SseEmitter
*/
public static SseEmitter connect(String key) {
if (sseEmitterMap.containsKey(key)) {
return sseEmitterMap.get(key);
}
try {
// 設置超時時間,0表示不過期。默認30秒
SseEmitter sseEmitter = new SseEmitter(0L);
// 註冊回調
sseEmitter.onCompletion(completionCallBack(key));
sseEmitter.onError(errorCallBack(key));
sseEmitter.onTimeout(timeoutCallBack(key));
sseEmitterMap.put(key, sseEmitter);
// 數量+1
count.getAndIncrement();
return sseEmitter;
} catch (Exception e) {
log.info("創建新的SSE連接異常,當前連接Key為:{}", key);
}
return null;
}
/**
* 給指定用戶發送消息
* @param key userId
* @param message 消息內容
*/
public static void sendMessage(String key, String message) {
if (sseEmitterMap.containsKey(key)) {
try {
sseEmitterMap.get(key).send(message);
} catch (IOException e) {
log.error("用戶[{}]推送異常:{}", key, e.getMessage());
remove(key);
}
}
}
/**
* 向同組人發布消息,要求:key + groupId
* @param groupId 群組id
* @param message 消息內容
*/
public static void groupSendMessage(String groupId, String message) {
if (!CollectionUtils.isEmpty(sseEmitterMap)) {
sseEmitterMap.forEach((k, v) -> {
try {
if (k.startsWith(groupId)) {
v.send(message, MediaType.APPLICATION_JSON);
}
} catch (IOException e) {
log.error("用戶[{}]推送異常:{}", k, e.getMessage());
remove(k);
}
});
}
}
/**
* 廣播群發消息
* @param message 消息內容
*/
public static void batchSendMessage(String message) {
sseEmitterMap.forEach((k, v) -> {
try {
v.send(message, MediaType.APPLICATION_JSON);
} catch (IOException e) {
log.error("用戶[{}]推送異常:{}", k, e.getMessage());
remove(k);
}
});
}
/**
* 群發消息
* @param message 消息內容
* @param ids 用戶id集合
*/
public static void batchSendMessage(String message, Set<String> ids) {
ids.forEach(userId -> sendMessage(userId, message));
}
/**
* 移除連接
* @param key userId
*/
public static void remove(String key) {
sseEmitterMap.remove(key);
// 數量-1
count.getAndDecrement();
log.info("移除連接:{}", key);
}
/**
* 獲取當前連接信息
* @return Map
*/
public static List<String> getIds() {
return new ArrayList<>(sseEmitterMap.keySet());
}
/**
* 獲取當前連接數量
* @return int
*/
public static int getCount() {
return count.intValue();
}
private static Runnable completionCallBack(String key) {
return () -> {
log.info("結束連接:{}", key);
remove(key);
};
}
private static Runnable timeoutCallBack(String key) {
return () -> {
log.info("連接超時:{}", key);
remove(key);
};
}
private static Consumer<Throwable> errorCallBack(String key) {
return throwable -> {
log.info("連接異常:{}", key);
remove(key);
};
}
}
然後用工具類實現幾個接口,以便客戶端實現訂閱和服務端主動推送消息。
@RequestMapping("/sse")
@RestController
@Slf4j
@CrossOrigin
public class SSEEmitterController {
/**
* 創建連接
* @param id 用戶id
* @return SseEmitter
*/
@GetMapping(path = "/subscribe/{id}", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
public SseEmitter subscribe(@PathVariable String id) {
return SseEmitterUtils.connect(id);
}
/**
* 向指定用戶推送消息
* @param id 用戶id
* @param content 推送內容
*/
@PostMapping(path = "/push")
public void push(String id, String content) {
SseEmitterUtils.sendMessage(id, content);
}
/**
* 向指定群組推送消息
* @param groupId 群組id
* @param content 推送內容
*/
@PostMapping(path = "/groupPush")
public void groupPush(String groupId, String content) {
SseEmitterUtils.groupSendMessage(groupId, content);
}
/**
* 廣播消息
* @param content 推送內容
*/
@PostMapping(path = "/pushAll")
public void pushAll(String content) {
SseEmitterUtils.batchSendMessage(content);
}
/**
* 關閉連接
* @param id 用戶id
* @param request 請求
*/
@DeleteMapping(path = "/close/{id}")
public void close(@PathVariable String id, HttpServletRequest request) {
request.startAsync();
SseEmitterUtils.remove(id);
}
}
最後你可以使用下面的 HTML 頁面進行測試
<!DOCTYPE html>
<html lang="en">
<head>
<title>SSE</title>
<meta charset="UTF-8">
<script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js" type="text/javascript"></script>
<script>
if (window.EventSource) {
let sources = [];
// 創建連接
for (let i = 1; i < 10; i++) {
let id = "id_" + i;
sources[i] = new EventSource('http://localhost:8008/sse/subscribe/' + id);
}
/**
* 連接一旦建立,就會觸發open事件
* 另一種寫法:source.onopen = function (event) {}
*/
// 連接打開事件
sources.forEach(source => {
let id = source.url.split('/').pop();
source.addEventListener('open', function (e) {
setMessageInnerHTML(id + "連接打開")
console.log(id + "連接打開");
});
});
/**
* 客戶端收到伺服器發來的數據
* 另一種寫法:source.onmessage = function (event) {}
*/
// 消息事件
sources.forEach(source => {
let id = source.url.split('/').pop();
source.addEventListener('message', function (e) {
setMessageInnerHTML(id + "收到消息:" + e.data)
console.log(id + "收到消息:" + e.data);
});
});
/**
* 如果發生通信錯誤(比如連接中斷),就會觸發error事件
* 另一種寫法:source.onerror = function (event) {}
*/
// 錯誤處理
sources.forEach(source => {
let id = source.url.split('/').pop();
source.addEventListener('error', function (e) {
if (e.readyState === EventSource.CLOSED) {
setMessageInnerHTML(id + "連接關閉")
console.log(id + "連接關閉");
} else {
setMessageInnerHTML(id + "連接錯誤:", e)
console.log(id + "連接錯誤:", e);
}
});
});
} else {
setMessageInnerHTML("瀏覽器不支持SSE");
}
// 監聽窗口關閉事件,主動去關閉sse連接,如果伺服器設置永不過期,瀏覽器關閉後手動清理伺服器數據
window.onbeforeunload = function () {
source.close();
const httpRequest = new XMLHttpRequest();
httpRequest.open('GET', 'http://localhost:8008/sse/close/' + id, true);
httpRequest.send();
console.log("close");
};
// 將消息顯示在網頁上
function setMessageInnerHTML(innerHTML) {
$("#contentDiv").append("<br/>" + innerHTML);
}
</script>
</head>
<body>
<div>
<div>
<div id="contentDiv" style="height:800px; width:1000px; overflow:scroll; background:#ccc;">
</div>
</div>
</div>
</body>
</html>
然後調用接口進行測試
可以看到,瀏覽器最終只保留了 6 個連接,其它的都被丟棄掉。
MQTT#
要實現 mqtt 需要在伺服器添加下面的依賴
<!--mqtt依賴包-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
然後需要安裝中間件,比如EMQX,又或者直接使用 RabbitMq。需要注意的是 rabbitMq 不支持 QOS2 級別的消息等級
關於 mqtt 的 qos 消息質量等級可以看下表的解釋
QoS 級別 | 描述 | 适用場景 |
---|---|---|
0 | 最多一次交付,消息不會被確認或重傳 | 適用於不重要的數據傳輸,如傳感器數據等 |
1 | 至少一次交付,確保消息至少被傳遞一次,但可能重複傳遞 | 適用於需要確保消息傳遞,但允許重複傳遞的場景 |
2 | 只有一次交付,確保消息僅被傳遞一次,不允許重複傳遞 | 適用於需要確保消息精確傳遞,且不允許重複傳遞的場景 |
下面以使用 rabbitmq 中間件為例
配置 mqtt 協議信息
server:
port: 8008
spring:
application:
name: mqtt測試項目
mqtt:
url: tcp://127.0.0.1:1883
username: guest
password: guest
clientId: serverClientId
#發布的主題--MQTT-默認的消息推送主題,實際可在調用接口時指定
pubTopic: testTopic
#訂閱的主題
subTopic: gps-topic,oil-topic,broadcast-topic,fault-topic
completionTimeout: 3000
然後創建配置文件的實體類映射
@Data
@EqualsAndHashCode(callSuper = false)
@Component
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttProperties {
/**
* RabbitMQ連接用戶名
*/
private String username;
/**
* RabbitMQ連接密碼
*/
private String password;
/**
* 推送主題
*/
private String pubTopic;
/**
* RabbitMQ的MQTT連接地址
*/
private String url;
/**
* RabbitMQ的MQTT連接客戶端ID
*/
private String clientId;
/**
* 訂閱主題
*/
private String subTopic;
/**
* 超時時間
*/
private Integer completionTimeout;
}
接著創建一個消費者
@Slf4j
@Configuration
@RequiredArgsConstructor
public class MqttConsumer {
private final MqttProperties mqttProperties;
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getUrl(), mqttProperties.getClientId(),
mqttProperties.getSubTopic().split(","));
adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout());
adapter.setConverter(new DefaultPahoMessageConverter());
//設置消息質量:0->至多一次;1->至少一次;2->只有一次
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
MessageHeaders headers = message.getHeaders();
log.info("headers: {}", headers);
String topic = Objects.requireNonNull(headers.get(MqttHeaders.RECEIVED_TOPIC)).toString();
log.info("訂閱主題為: {}", topic);
String[] topics = mqttProperties.getSubTopic().split(",");
for (String t : topics) {
if (t.equals(topic)) {
log.info("訂閱主題為:{};接收到該主題消息為:{}",topic, message.getPayload());
}
}
}
};
}
}
再創建一個生產者
@Configuration
@Slf4j
@RequiredArgsConstructor
public class MqttProvider {
private final MqttProperties mqttProperties;
@Bean
@SneakyThrows
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { mqttProperties.getUrl()});
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
// 設置是否清空session,這裡如果設置為false表示伺服器會保留客戶端的連接記錄,
// 把配置里的 cleanSession 設為false,客戶端掉線後 伺服器端不會清除session,
// 當重連後可以接收之前訂閱主題的消息。當客戶端上線後會接受到它離線的這段時間的消息
options.setCleanSession(false);
// 設置超時時間 單位為秒
options.setConnectionTimeout(10);
// 設置會話心跳時間 單位為秒 伺服器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制
options.setKeepAliveInterval(20);
// 斷開後重連,但這個方法並沒有重新訂閱的機制
// 在嘗試重新連接之前,它將首先等待1秒,對於每次失敗的重新連接嘗試,延遲將加倍,直到達到2分鐘,此時延遲將保持在2分鐘。
options.setAutomaticReconnect(true);
factory.setConnectionOptions(options);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("publisherClient", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(mqttProperties.getPubTopic());
// 設置推送時的消息質量:0->至多一次;1->至少一次;2->只有一次
// 在rabbitmq 中 qos2會被降級為qos1
messageHandler.setDefaultQos(1);
// 設置是否保留消息,設置為true時保留消息會在每次重連時發送
// 除非發送一條內容空白的新的保留信息才能清除
messageHandler.setDefaultRetained(false);
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}
再創建一個網關接口用於發送消息
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
/**
* 發送消息到默認topic
*/
void sendToMqtt(String payload);
/**
* 發送消息到指定topic
*/
void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
/**
* 發送消息到指定topic並設置QOS
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
最後同樣編寫一個接口用於發送消息
@RestController
@RequiredArgsConstructor
@RequestMapping("/mqtt")
public class MqttController {
private final MqttGateway mqttGateway;
@PostMapping("/sendToDefaultTopic")
public void sendToDefaultTopic(String payload) {
mqttGateway.sendToMqtt(payload);
}
@PostMapping("/sendToTopic")
public void sendToTopic(String payload, String topic) {
mqttGateway.sendToMqtt(payload, topic);
}
}
可以使用第三方 mqtt 軟件進行測試,例如mqttx
安裝完成後,打開軟件創建一個連接
需要注意的是
- 每次連接的 ClinetId 應該是不同且唯一的,相同的 ClinetId 可能會因連接擠掉而丟失消息
- 伺服器連接協議可以是 tcp,也可以是 mqtt
- 如果是 rabbitmq,需要配置用戶的權限
- 如果是 rabbitmq,版本請選擇 3.1.1
- 如果想要消費者離線後仍能收到生產者的消息,請關閉 Clean Session。
連接成功後,調用接口發送消息訂閱者就能收到了