前言#
最近有個業務需求,需要判斷用戶上傳的數據是異常還是正常,如果是異常的情況下需要即時的推送報警消息到用戶。
在大部分情況下,通常是客戶端(瀏覽器)主動發送請求到服務端(伺服器),告訴它需要什麼數據。
但也有一部分情況,需要服務端主動的給客戶端推送消息,比如上面的情況,又比如網頁端常見的掃碼登錄
技術選擇#
實現服務端主動推送的方法有很多種,需要根據業務進行選擇。
簡單且常見的方法是,長輪詢,和短輪詢。聽名字就知道簡單粗暴。
更進階的是,SSE 和 websocket。
更深層的就是各種消息隊列,如 MQTT。
其他的推送也有短信推送或者 APP 通知,比如極光 Push。
| 實現方式 | 實現 | 原理 | 模式 | 
|---|---|---|---|
| 短輪詢 | 客戶端 | 在特定的的時間間隔(如每 1 秒),由客戶端對服務端發出請求,然後由伺服器返回最新的數據給客戶端的瀏覽器。 | / | 
| 長輪詢 | 服務端 | 客戶端向伺服器發送請求,伺服器接到請求後 hold住連接,直到有新消息才返回響應信息並關閉連接,客戶端處理完響應信息後再向伺服器發送新的請求。 | / | 
| SSE(Server-sent Events) | 服務端 | 客戶端先向伺服器註冊一個長連接,伺服器獲取到事件後可以發送消息到已註冊的客戶端 | 只能由伺服器推送 | 
| Websocket | 客戶端 / 服務端 | 使用 websocket 需要引入新的依賴,一旦客戶端和伺服器握手成功,它們就處在同一頻道,可以即時的收發消息了 | 全雙工,伺服器和客戶端都既能推送又能接收 | 
| Mqtt | 客戶端 / 服務端 | mqtt 常用在物聯網業務中,是一種基於發布 / 訂閱模式的輕量級通訊協議,該協議構建在 TCP/IP 協議上。 MQTT 最大的優點在於可以以極少的代碼和有限的帶寬,為遠程設備提供即時可靠的消息服務 | 發布 / 訂閱模式,伺服器和客戶端都既能推送又能接收 | 
對於長 / 短輪詢不再贅述,本次主要講解SSE和MQTT方式。
SSE#
在 spring boot 中 SSE 是原生支持的,不需要再導入其他的依賴,這是它的優點,但是它的缺點也很明顯,只支持伺服器單向推送,只支持高級瀏覽器(chrome,Firefox 等)。受限於瀏覽器的限制,每個網頁最多只能保持最多 6 個的長連接;更多的連接可能會佔用更多的內存和計算資源。
首先創建一個 sse 工具類
@Component
@Slf4j
public class SseEmitterUtils {
    /**
     * 當前連接數
     */
    private static AtomicInteger count = new AtomicInteger(0);
    /**
     * 存儲 SseEmitter 信息
     */
    private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
    /**
     * 創建用戶連接並返回 SseEmitter
     * @param key userId
     * @return SseEmitter
     */
    public static SseEmitter connect(String key) {
        if (sseEmitterMap.containsKey(key)) {
            return sseEmitterMap.get(key);
        }
        try {
            // 設置超時時間,0表示不過期。默認30秒
            SseEmitter sseEmitter = new SseEmitter(0L);
            // 註冊回調
            sseEmitter.onCompletion(completionCallBack(key));
            sseEmitter.onError(errorCallBack(key));
            sseEmitter.onTimeout(timeoutCallBack(key));
            sseEmitterMap.put(key, sseEmitter);
            // 數量+1
            count.getAndIncrement();
            return sseEmitter;
        } catch (Exception e) {
            log.info("創建新的SSE連接異常,當前連接Key為:{}", key);
        }
        return null;
    }
    /**
     * 給指定用戶發送消息
     * @param key userId
     * @param message 消息內容
     */
    public static void sendMessage(String key, String message) {
        if (sseEmitterMap.containsKey(key)) {
            try {
                sseEmitterMap.get(key).send(message);
            } catch (IOException e) {
                log.error("用戶[{}]推送異常:{}", key, e.getMessage());
                remove(key);
            }
        }
    }
    /**
     * 向同組人發布消息,要求:key + groupId
     * @param groupId 群組id
     * @param message 消息內容
     */
    public static void groupSendMessage(String groupId, String message) {
        if (!CollectionUtils.isEmpty(sseEmitterMap)) {
            sseEmitterMap.forEach((k, v) -> {
                try {
                    if (k.startsWith(groupId)) {
                        v.send(message, MediaType.APPLICATION_JSON);
                    }
                } catch (IOException e) {
                    log.error("用戶[{}]推送異常:{}", k, e.getMessage());
                    remove(k);
                }
            });
        }
    }
    /**
     * 廣播群發消息
     * @param message 消息內容
     */
    public static void batchSendMessage(String message) {
        sseEmitterMap.forEach((k, v) -> {
            try {
                v.send(message, MediaType.APPLICATION_JSON);
            } catch (IOException e) {
                log.error("用戶[{}]推送異常:{}", k, e.getMessage());
                remove(k);
            }
        });
    }
    /**
     * 群發消息
     * @param message 消息內容
     * @param ids 用戶id集合
     */
    public static void batchSendMessage(String message, Set<String> ids) {
        ids.forEach(userId -> sendMessage(userId, message));
    }
    /**
     * 移除連接
     * @param key userId
     */
    public static void remove(String key) {
        sseEmitterMap.remove(key);
        // 數量-1
        count.getAndDecrement();
        log.info("移除連接:{}", key);
    }
    /**
     * 獲取當前連接信息
     * @return Map
     */
    public static List<String> getIds() {
        return new ArrayList<>(sseEmitterMap.keySet());
    }
    /**
     * 獲取當前連接數量
     * @return int
     */
    public static int getCount() {
        return count.intValue();
    }
    private static Runnable completionCallBack(String key) {
        return () -> {
            log.info("結束連接:{}", key);
            remove(key);
        };
    }
    private static Runnable timeoutCallBack(String key) {
        return () -> {
            log.info("連接超時:{}", key);
            remove(key);
        };
    }
    private static Consumer<Throwable> errorCallBack(String key) {
        return throwable -> {
            log.info("連接異常:{}", key);
            remove(key);
        };
    }
}
然後用工具類實現幾個接口,以便客戶端實現訂閱和服務端主動推送消息。
@RequestMapping("/sse")
@RestController
@Slf4j
@CrossOrigin
public class SSEEmitterController {
    /**
     * 創建連接
     * @param id 用戶id
     * @return SseEmitter
     */
    @GetMapping(path = "/subscribe/{id}", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
    public SseEmitter subscribe(@PathVariable String id) {
        return SseEmitterUtils.connect(id);
    }
    /**
     * 向指定用戶推送消息
     * @param id 用戶id
     * @param content 推送內容
     */
    @PostMapping(path = "/push")
    public void push(String id, String content) {
        SseEmitterUtils.sendMessage(id, content);
    }
    /**
     * 向指定群組推送消息
     * @param groupId 群組id
     * @param content 推送內容
     */
    @PostMapping(path = "/groupPush")
    public void groupPush(String groupId, String content) {
        SseEmitterUtils.groupSendMessage(groupId, content);
    }
    /**
     * 廣播消息
     * @param content 推送內容
     */
    @PostMapping(path = "/pushAll")
    public void pushAll(String content) {
        SseEmitterUtils.batchSendMessage(content);
    }
    /**
     * 關閉連接
     * @param id 用戶id
     * @param request 請求
     */
    @DeleteMapping(path = "/close/{id}")
    public void close(@PathVariable String id, HttpServletRequest request) {
        request.startAsync();
        SseEmitterUtils.remove(id);
    }
}
最後你可以使用下面的 HTML 頁面進行測試
<!DOCTYPE html>
<html lang="en">
<head>
    <title>SSE</title>
    <meta charset="UTF-8">
    <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js" type="text/javascript"></script>
    <script>
        if (window.EventSource) {
            let sources = [];
            // 創建連接  
            for (let i = 1; i < 10; i++) {
                let id = "id_" + i;
                sources[i] = new EventSource('http://localhost:8008/sse/subscribe/' + id);
            }
            /** 
             * 連接一旦建立,就會觸發open事件 
             * 另一種寫法:source.onopen = function (event) {} 
             */
            // 連接打開事件
            sources.forEach(source => {
                let id = source.url.split('/').pop();
                source.addEventListener('open', function (e) {
                    setMessageInnerHTML(id + "連接打開")
                    console.log(id + "連接打開");
                });
            });
            /** 
             * 客戶端收到伺服器發來的數據 
             * 另一種寫法:source.onmessage = function (event) {} 
             */
            // 消息事件 
            sources.forEach(source => {
                let id = source.url.split('/').pop();
                source.addEventListener('message', function (e) {
                    setMessageInnerHTML(id + "收到消息:" + e.data)
                    console.log(id + "收到消息:" + e.data);
                });
            });
            /** 
             * 如果發生通信錯誤(比如連接中斷),就會觸發error事件 
             * 另一種寫法:source.onerror = function (event) {} 
             */
            // 錯誤處理
            sources.forEach(source => {
                let id = source.url.split('/').pop();
                source.addEventListener('error', function (e) {
                    if (e.readyState === EventSource.CLOSED) {
                        setMessageInnerHTML(id + "連接關閉")
                        console.log(id + "連接關閉");
                    } else {
                        setMessageInnerHTML(id + "連接錯誤:", e)
                        console.log(id + "連接錯誤:", e);
                    }
                });
            });
        } else {
            setMessageInnerHTML("瀏覽器不支持SSE");
        }
        // 監聽窗口關閉事件,主動去關閉sse連接,如果伺服器設置永不過期,瀏覽器關閉後手動清理伺服器數據 
        window.onbeforeunload = function () {
            source.close();
            const httpRequest = new XMLHttpRequest();
            httpRequest.open('GET', 'http://localhost:8008/sse/close/' + id, true);
            httpRequest.send();
            console.log("close");
        };
        // 將消息顯示在網頁上 
        function setMessageInnerHTML(innerHTML) {
            $("#contentDiv").append("<br/>" + innerHTML);
        } 
    </script>
</head>
<body>
    <div>
        <div>
            <div id="contentDiv" style="height:800px; width:1000px; overflow:scroll; background:#ccc;">
            </div>
        </div>
    </div>
</body>
</html>
然後調用接口進行測試
可以看到,瀏覽器最終只保留了 6 個連接,其它的都被丟棄掉。
MQTT#
要實現 mqtt 需要在伺服器添加下面的依賴
<!--mqtt依賴包-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>
然後需要安裝中間件,比如EMQX,又或者直接使用 RabbitMq。需要注意的是 rabbitMq 不支持 QOS2 級別的消息等級
關於 mqtt 的 qos 消息質量等級可以看下表的解釋
| QoS 級別 | 描述 | 适用場景 | 
|---|---|---|
| 0 | 最多一次交付,消息不會被確認或重傳 | 適用於不重要的數據傳輸,如傳感器數據等 | 
| 1 | 至少一次交付,確保消息至少被傳遞一次,但可能重複傳遞 | 適用於需要確保消息傳遞,但允許重複傳遞的場景 | 
| 2 | 只有一次交付,確保消息僅被傳遞一次,不允許重複傳遞 | 適用於需要確保消息精確傳遞,且不允許重複傳遞的場景 | 
下面以使用 rabbitmq 中間件為例
配置 mqtt 協議信息
server:
  port: 8008
spring:
  application:
    name: mqtt測試項目
  mqtt:
    url: tcp://127.0.0.1:1883
    username: guest
    password: guest
    clientId: serverClientId
      #發布的主題--MQTT-默認的消息推送主題,實際可在調用接口時指定
    pubTopic: testTopic
      #訂閱的主題
    subTopic: gps-topic,oil-topic,broadcast-topic,fault-topic
    completionTimeout: 3000
然後創建配置文件的實體類映射
@Data
@EqualsAndHashCode(callSuper = false)
@Component
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttProperties {
    /**
     * RabbitMQ連接用戶名
     */
    private String username;
    /**
     * RabbitMQ連接密碼
     */
    private String password;
    /**
     * 推送主題
     */
    private String pubTopic;
    /**
     * RabbitMQ的MQTT連接地址
     */
    private String url;
    /**
     * RabbitMQ的MQTT連接客戶端ID
     */
    private String clientId;
    /**
     * 訂閱主題
     */
    private String subTopic;
    /**
     * 超時時間
     */
    private Integer completionTimeout;
}
接著創建一個消費者
@Slf4j
@Configuration
@RequiredArgsConstructor
public class MqttConsumer {
    private final MqttProperties mqttProperties;
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getUrl(), mqttProperties.getClientId(),
                        mqttProperties.getSubTopic().split(","));
        adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout());
        adapter.setConverter(new DefaultPahoMessageConverter());
        //設置消息質量:0->至多一次;1->至少一次;2->只有一次
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                MessageHeaders headers = message.getHeaders();
                log.info("headers: {}", headers);
                String topic = Objects.requireNonNull(headers.get(MqttHeaders.RECEIVED_TOPIC)).toString();
                log.info("訂閱主題為: {}", topic);
                String[] topics = mqttProperties.getSubTopic().split(",");
                for (String t : topics) {
                    if (t.equals(topic)) {
                        log.info("訂閱主題為:{};接收到該主題消息為:{}",topic, message.getPayload());
                    }
                }
            }
        };
    }
}
再創建一個生產者
@Configuration
@Slf4j
@RequiredArgsConstructor
public class MqttProvider {
    private final MqttProperties mqttProperties;
    @Bean
    @SneakyThrows
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { mqttProperties.getUrl()});
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        // 設置是否清空session,這裡如果設置為false表示伺服器會保留客戶端的連接記錄,
        // 把配置里的 cleanSession 設為false,客戶端掉線後 伺服器端不會清除session,
        // 當重連後可以接收之前訂閱主題的消息。當客戶端上線後會接受到它離線的這段時間的消息
        options.setCleanSession(false);
        // 設置超時時間 單位為秒
        options.setConnectionTimeout(10);
        // 設置會話心跳時間 單位為秒 伺服器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制
        options.setKeepAliveInterval(20);
        // 斷開後重連,但這個方法並沒有重新訂閱的機制
        // 在嘗試重新連接之前,它將首先等待1秒,對於每次失敗的重新連接嘗試,延遲將加倍,直到達到2分鐘,此時延遲將保持在2分鐘。
        options.setAutomaticReconnect(true);
        factory.setConnectionOptions(options);
        return factory;
    }
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("publisherClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(mqttProperties.getPubTopic());
        // 設置推送時的消息質量:0->至多一次;1->至少一次;2->只有一次
        // 在rabbitmq 中 qos2會被降級為qos1
        messageHandler.setDefaultQos(1);
        // 設置是否保留消息,設置為true時保留消息會在每次重連時發送
        // 除非發送一條內容空白的新的保留信息才能清除
        messageHandler.setDefaultRetained(false);
        return messageHandler;
    }
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}
再創建一個網關接口用於發送消息
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
    /**
     * 發送消息到默認topic
     */
    void sendToMqtt(String payload);
    /**
     * 發送消息到指定topic
     */
    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
    /**
     * 發送消息到指定topic並設置QOS
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
最後同樣編寫一個接口用於發送消息
@RestController
@RequiredArgsConstructor
@RequestMapping("/mqtt")
public class MqttController {
    private final MqttGateway mqttGateway;
    @PostMapping("/sendToDefaultTopic")
    public void sendToDefaultTopic(String payload) {
        mqttGateway.sendToMqtt(payload);
    }
    @PostMapping("/sendToTopic")
    public void sendToTopic(String payload, String topic) {
        mqttGateway.sendToMqtt(payload, topic);
    }
}
可以使用第三方 mqtt 軟件進行測試,例如mqttx
安裝完成後,打開軟件創建一個連接
需要注意的是
- 每次連接的 ClinetId 應該是不同且唯一的,相同的 ClinetId 可能會因連接擠掉而丟失消息
- 伺服器連接協議可以是 tcp,也可以是 mqtt
- 如果是 rabbitmq,需要配置用戶的權限
- 如果是 rabbitmq,版本請選擇 3.1.1
- 如果想要消費者離線後仍能收到生產者的消息,請關閉 Clean Session。
連接成功後,調用接口發送消息訂閱者就能收到了
