qinfengge

qinfengge

醉后不知天在水,满船清梦压星河
github
email
telegram

java即時消息推送(一)

前言#

最近有個業務需求,需要判斷用戶上傳的數據是異常還是正常,如果是異常的情況下需要即時的推送報警消息到用戶。
在大部分情況下,通常是客戶端(瀏覽器)主動發送請求到服務端(伺服器),告訴它需要什麼數據。
但也有一部分情況,需要服務端主動的給客戶端推送消息,比如上面的情況,又比如網頁端常見的掃碼登錄

技術選擇#

實現服務端主動推送的方法有很多種,需要根據業務進行選擇。
簡單且常見的方法是,長輪詢,和短輪詢。聽名字就知道簡單粗暴。
更進階的是,SSE 和 websocket。
更深層的就是各種消息隊列,如 MQTT。
其他的推送也有短信推送或者 APP 通知,比如極光 Push。

實現方式實現原理模式
短輪詢客戶端在特定的的時間間隔(如每 1 秒),由客戶端對服務端發出請求,然後由伺服器返回最新的數據給客戶端的瀏覽器。/
長輪詢服務端客戶端向伺服器發送請求,伺服器接到請求後hold住連接,直到有新消息才返回響應信息並關閉連接,客戶端處理完響應信息後再向伺服器發送新的請求。/
SSE(Server-sent Events)服務端客戶端先向伺服器註冊一個長連接,伺服器獲取到事件後可以發送消息到已註冊的客戶端只能由伺服器推送
Websocket客戶端 / 服務端使用 websocket 需要引入新的依賴,一旦客戶端和伺服器握手成功,它們就處在同一頻道,可以即時的收發消息了全雙工,伺服器和客戶端都既能推送又能接收
Mqtt客戶端 / 服務端mqtt 常用在物聯網業務中,是一種基於發布 / 訂閱模式的輕量級通訊協議,該協議構建在 TCP/IP 協議上。 MQTT 最大的優點在於可以以極少的代碼和有限的帶寬,為遠程設備提供即時可靠的消息服務發布 / 訂閱模式,伺服器和客戶端都既能推送又能接收

對於長 / 短輪詢不再贅述,本次主要講解SSEMQTT方式。

SSE#

在 spring boot 中 SSE 是原生支持的,不需要再導入其他的依賴,這是它的優點,但是它的缺點也很明顯,只支持伺服器單向推送,只支持高級瀏覽器(chrome,Firefox 等)。受限於瀏覽器的限制,每個網頁最多只能保持最多 6 個的長連接;更多的連接可能會佔用更多的內存和計算資源。
首先創建一個 sse 工具類

@Component
@Slf4j
public class SseEmitterUtils {
    /**
     * 當前連接數
     */
    private static AtomicInteger count = new AtomicInteger(0);

    /**
     * 存儲 SseEmitter 信息
     */
    private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    /**
     * 創建用戶連接並返回 SseEmitter
     * @param key userId
     * @return SseEmitter
     */
    public static SseEmitter connect(String key) {
        if (sseEmitterMap.containsKey(key)) {
            return sseEmitterMap.get(key);
        }

        try {
            // 設置超時時間,0表示不過期。默認30秒
            SseEmitter sseEmitter = new SseEmitter(0L);
            // 註冊回調
            sseEmitter.onCompletion(completionCallBack(key));
            sseEmitter.onError(errorCallBack(key));
            sseEmitter.onTimeout(timeoutCallBack(key));
            sseEmitterMap.put(key, sseEmitter);
            // 數量+1
            count.getAndIncrement();
            return sseEmitter;
        } catch (Exception e) {
            log.info("創建新的SSE連接異常,當前連接Key為:{}", key);
        }
        return null;
    }

    /**
     * 給指定用戶發送消息
     * @param key userId
     * @param message 消息內容
     */
    public static void sendMessage(String key, String message) {
        if (sseEmitterMap.containsKey(key)) {
            try {
                sseEmitterMap.get(key).send(message);
            } catch (IOException e) {
                log.error("用戶[{}]推送異常:{}", key, e.getMessage());
                remove(key);
            }
        }
    }

    /**
     * 向同組人發布消息,要求:key + groupId
     * @param groupId 群組id
     * @param message 消息內容
     */
    public static void groupSendMessage(String groupId, String message) {
        if (!CollectionUtils.isEmpty(sseEmitterMap)) {
            sseEmitterMap.forEach((k, v) -> {
                try {
                    if (k.startsWith(groupId)) {
                        v.send(message, MediaType.APPLICATION_JSON);
                    }
                } catch (IOException e) {
                    log.error("用戶[{}]推送異常:{}", k, e.getMessage());
                    remove(k);
                }
            });
        }
    }

    /**
     * 廣播群發消息
     * @param message 消息內容
     */
    public static void batchSendMessage(String message) {
        sseEmitterMap.forEach((k, v) -> {
            try {
                v.send(message, MediaType.APPLICATION_JSON);
            } catch (IOException e) {
                log.error("用戶[{}]推送異常:{}", k, e.getMessage());
                remove(k);
            }
        });
    }

    /**
     * 群發消息
     * @param message 消息內容
     * @param ids 用戶id集合
     */
    public static void batchSendMessage(String message, Set<String> ids) {
        ids.forEach(userId -> sendMessage(userId, message));
    }

    /**
     * 移除連接
     * @param key userId
     */
    public static void remove(String key) {
        sseEmitterMap.remove(key);
        // 數量-1
        count.getAndDecrement();
        log.info("移除連接:{}", key);
    }

    /**
     * 獲取當前連接信息
     * @return Map
     */
    public static List<String> getIds() {
        return new ArrayList<>(sseEmitterMap.keySet());
    }

    /**
     * 獲取當前連接數量
     * @return int
     */
    public static int getCount() {
        return count.intValue();
    }

    private static Runnable completionCallBack(String key) {
        return () -> {
            log.info("結束連接:{}", key);
            remove(key);
        };
    }

    private static Runnable timeoutCallBack(String key) {
        return () -> {
            log.info("連接超時:{}", key);
            remove(key);
        };
    }

    private static Consumer<Throwable> errorCallBack(String key) {
        return throwable -> {
            log.info("連接異常:{}", key);
            remove(key);
        };
    }
}

然後用工具類實現幾個接口,以便客戶端實現訂閱和服務端主動推送消息。

@RequestMapping("/sse")
@RestController
@Slf4j
@CrossOrigin
public class SSEEmitterController {

    /**
     * 創建連接
     * @param id 用戶id
     * @return SseEmitter
     */
    @GetMapping(path = "/subscribe/{id}", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
    public SseEmitter subscribe(@PathVariable String id) {
        return SseEmitterUtils.connect(id);
    }


    /**
     * 向指定用戶推送消息
     * @param id 用戶id
     * @param content 推送內容
     */
    @PostMapping(path = "/push")
    public void push(String id, String content) {
        SseEmitterUtils.sendMessage(id, content);
    }


    /**
     * 向指定群組推送消息
     * @param groupId 群組id
     * @param content 推送內容
     */
    @PostMapping(path = "/groupPush")
    public void groupPush(String groupId, String content) {
        SseEmitterUtils.groupSendMessage(groupId, content);
    }


    /**
     * 廣播消息
     * @param content 推送內容
     */
    @PostMapping(path = "/pushAll")
    public void pushAll(String content) {
        SseEmitterUtils.batchSendMessage(content);
    }

    /**
     * 關閉連接
     * @param id 用戶id
     * @param request 請求
     */
    @DeleteMapping(path = "/close/{id}")
    public void close(@PathVariable String id, HttpServletRequest request) {
        request.startAsync();
        SseEmitterUtils.remove(id);
    }

}

最後你可以使用下面的 HTML 頁面進行測試

<!DOCTYPE html>
<html lang="en">

<head>
    <title>SSE</title>
    <meta charset="UTF-8">
    <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js" type="text/javascript"></script>
    <script>
        if (window.EventSource) {

            let sources = [];
            // 創建連接  
            for (let i = 1; i < 10; i++) {
                let id = "id_" + i;
                sources[i] = new EventSource('http://localhost:8008/sse/subscribe/' + id);
            }

            /** 
             * 連接一旦建立,就會觸發open事件 
             * 另一種寫法:source.onopen = function (event) {} 
             */
            // 連接打開事件
            sources.forEach(source => {
                let id = source.url.split('/').pop();
                source.addEventListener('open', function (e) {
                    setMessageInnerHTML(id + "連接打開")
                    console.log(id + "連接打開");
                });
            });

            /** 
             * 客戶端收到伺服器發來的數據 
             * 另一種寫法:source.onmessage = function (event) {} 
             */
            // 消息事件 
            sources.forEach(source => {
                let id = source.url.split('/').pop();
                source.addEventListener('message', function (e) {
                    setMessageInnerHTML(id + "收到消息:" + e.data)
                    console.log(id + "收到消息:" + e.data);
                });
            });

            /** 
             * 如果發生通信錯誤(比如連接中斷),就會觸發error事件 
             * 另一種寫法:source.onerror = function (event) {} 
             */
            // 錯誤處理
            sources.forEach(source => {

                let id = source.url.split('/').pop();

                source.addEventListener('error', function (e) {

                    if (e.readyState === EventSource.CLOSED) {
                        setMessageInnerHTML(id + "連接關閉")
                        console.log(id + "連接關閉");
                    } else {
                        setMessageInnerHTML(id + "連接錯誤:", e)
                        console.log(id + "連接錯誤:", e);
                    }

                });

            });
        } else {
            setMessageInnerHTML("瀏覽器不支持SSE");
        }

        // 監聽窗口關閉事件,主動去關閉sse連接,如果伺服器設置永不過期,瀏覽器關閉後手動清理伺服器數據 
        window.onbeforeunload = function () {
            source.close();
            const httpRequest = new XMLHttpRequest();
            httpRequest.open('GET', 'http://localhost:8008/sse/close/' + id, true);
            httpRequest.send();
            console.log("close");
        };

        // 將消息顯示在網頁上 
        function setMessageInnerHTML(innerHTML) {
            $("#contentDiv").append("<br/>" + innerHTML);
        } 
    </script>
</head>

<body>
    <div>
        <div>
            <div id="contentDiv" style="height:800px; width:1000px; overflow:scroll; background:#ccc;">
            </div>
        </div>
    </div>
</body>

</html>

然後調用接口進行測試

image

可以看到,瀏覽器最終只保留了 6 個連接,其它的都被丟棄掉。

MQTT#

要實現 mqtt 需要在伺服器添加下面的依賴

<!--mqtt依賴包-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>

然後需要安裝中間件,比如EMQX,又或者直接使用 RabbitMq。需要注意的是 rabbitMq 不支持 QOS2 級別的消息等級

image

關於 mqtt 的 qos 消息質量等級可以看下表的解釋

QoS 級別描述适用場景
0最多一次交付,消息不會被確認或重傳適用於不重要的數據傳輸,如傳感器數據等
1至少一次交付,確保消息至少被傳遞一次,但可能重複傳遞適用於需要確保消息傳遞,但允許重複傳遞的場景
2只有一次交付,確保消息僅被傳遞一次,不允許重複傳遞適用於需要確保消息精確傳遞,且不允許重複傳遞的場景

下面以使用 rabbitmq 中間件為例
配置 mqtt 協議信息

server:
  port: 8008

spring:
  application:
    name: mqtt測試項目
  mqtt:
    url: tcp://127.0.0.1:1883
    username: guest
    password: guest
    clientId: serverClientId
      #發布的主題--MQTT-默認的消息推送主題,實際可在調用接口時指定
    pubTopic: testTopic
      #訂閱的主題
    subTopic: gps-topic,oil-topic,broadcast-topic,fault-topic
    completionTimeout: 3000

然後創建配置文件的實體類映射

@Data
@EqualsAndHashCode(callSuper = false)
@Component
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttProperties {

    /**
     * RabbitMQ連接用戶名
     */
    private String username;
    /**
     * RabbitMQ連接密碼
     */
    private String password;
    /**
     * 推送主題
     */
    private String pubTopic;
    /**
     * RabbitMQ的MQTT連接地址
     */
    private String url;

    /**
     * RabbitMQ的MQTT連接客戶端ID
     */
    private String clientId;

    /**
     * 訂閱主題
     */
    private String subTopic;

    /**
     * 超時時間
     */
    private Integer completionTimeout;
}

接著創建一個消費者

@Slf4j
@Configuration
@RequiredArgsConstructor
public class MqttConsumer {

    private final MqttProperties mqttProperties;

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getUrl(), mqttProperties.getClientId(),
                        mqttProperties.getSubTopic().split(","));
        adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout());
        adapter.setConverter(new DefaultPahoMessageConverter());
        //設置消息質量:0->至多一次;1->至少一次;2->只有一次
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                MessageHeaders headers = message.getHeaders();
                log.info("headers: {}", headers);
                String topic = Objects.requireNonNull(headers.get(MqttHeaders.RECEIVED_TOPIC)).toString();
                log.info("訂閱主題為: {}", topic);
                String[] topics = mqttProperties.getSubTopic().split(",");
                for (String t : topics) {
                    if (t.equals(topic)) {
                        log.info("訂閱主題為:{};接收到該主題消息為:{}",topic, message.getPayload());
                    }
                }
            }

        };
    }
}

再創建一個生產者

@Configuration
@Slf4j
@RequiredArgsConstructor
public class MqttProvider {

    private final MqttProperties mqttProperties;

    @Bean
    @SneakyThrows
    public MqttPahoClientFactory mqttClientFactory() {

        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { mqttProperties.getUrl()});
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        // 設置是否清空session,這裡如果設置為false表示伺服器會保留客戶端的連接記錄,
        // 把配置里的 cleanSession 設為false,客戶端掉線後 伺服器端不會清除session,
        // 當重連後可以接收之前訂閱主題的消息。當客戶端上線後會接受到它離線的這段時間的消息
        options.setCleanSession(false);
        // 設置超時時間 單位為秒
        options.setConnectionTimeout(10);
        // 設置會話心跳時間 單位為秒 伺服器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制
        options.setKeepAliveInterval(20);
        // 斷開後重連,但這個方法並沒有重新訂閱的機制
        // 在嘗試重新連接之前,它將首先等待1秒,對於每次失敗的重新連接嘗試,延遲將加倍,直到達到2分鐘,此時延遲將保持在2分鐘。
        options.setAutomaticReconnect(true);
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("publisherClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(mqttProperties.getPubTopic());
        // 設置推送時的消息質量:0->至多一次;1->至少一次;2->只有一次
        // 在rabbitmq 中 qos2會被降級為qos1
        messageHandler.setDefaultQos(1);
        // 設置是否保留消息,設置為true時保留消息會在每次重連時發送
        // 除非發送一條內容空白的新的保留信息才能清除
        messageHandler.setDefaultRetained(false);
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}

再創建一個網關接口用於發送消息

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    /**
     * 發送消息到默認topic
     */
    void sendToMqtt(String payload);

    /**
     * 發送消息到指定topic
     */
    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);

    /**
     * 發送消息到指定topic並設置QOS
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

最後同樣編寫一個接口用於發送消息

@RestController
@RequiredArgsConstructor
@RequestMapping("/mqtt")
public class MqttController {

    private final MqttGateway mqttGateway;

    @PostMapping("/sendToDefaultTopic")
    public void sendToDefaultTopic(String payload) {
        mqttGateway.sendToMqtt(payload);
    }

    @PostMapping("/sendToTopic")
    public void sendToTopic(String payload, String topic) {
        mqttGateway.sendToMqtt(payload, topic);
    }
}

可以使用第三方 mqtt 軟件進行測試,例如mqttx
安裝完成後,打開軟件創建一個連接

image

需要注意的是

  1. 每次連接的 ClinetId 應該是不同且唯一的,相同的 ClinetId 可能會因連接擠掉而丟失消息
  2. 伺服器連接協議可以是 tcp,也可以是 mqtt
  3. 如果是 rabbitmq,需要配置用戶的權限
  4. 如果是 rabbitmq,版本請選擇 3.1.1
  5. 如果想要消費者離線後仍能收到生產者的消息,請關閉 Clean Session。

連接成功後,調用接口發送消息訂閱者就能收到了

image

springboot 整合 mqtt 實現消息發送和消費,以及客戶端斷線重連之後的消息恢復

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。