qinfengge

qinfengge

醉后不知天在水,满船清梦压星河
github
email
telegram

java即時消息推送(二)

之前寫實時推送(一)的時候,是用的 rabbitmq 的 mqtt 插件。在當時它能滿足我的需求,現在有一個新的需求,遠程喚醒,判斷設備是否在線,如果當前時間設備在線,則表示可以被遠程喚醒。 一開始,我還是想在 rabbitmq 的基礎上找找看有沒有什麼 API 能夠獲取到 mqtt 的在線設備。畢竟消息隊列也是用的這個,沒必要再增加中間件。然後我發現 mqtt 有很多約定系統主題,這些系統主題維護了 mqtt broker 的狀態。 但正如我所說,系統主題是約定的配置,並非強制性的,且 rabbitmq 中的 mqtt 實現也只是作為插件補充。這是我在官方倉庫提的discussions

沒辦法,看来只能引入專業的 mqtt 中間件了,這裡我選了EMQX,主要是文檔確實詳細。

安裝 這裡我選擇使用 WSL2 的 Docker 安裝,安裝命令很簡單#

$ docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:v4.0.0

安裝成功後可以直接使用 http://localhost:18083 訪問控制面板,默認的賬號名是 admin,密碼是 public

image

配置 安裝完成後還需要在程序中進行配置,因為版本切換成 mqtt V5 了,所以依賴和配置也要更改下#

依賴#

 <!--mqtt依賴包-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
            <version>1.2.5</version>
        </dependency>

配置文件#

# mqtt協議配置
spring:
  mqtt:
    url: tcp://127.0.0.1:1883
    username: admin
    password: admin
    clientId: serverClientId
    #發布的主題--MQTT-默認的消息推送主題,實際可在調用接口時指定
    pubTopic: testTopic
    #訂閱的主題
    subTopic: testTopic,remote-wake
    completionTimeout: 30000

因為 emqx 默認是允許匿名連接的,所以用戶名和密碼可以省略。

生產者 接著創建生產者的配置類#

/**
 * @author lza
 * @date 2023/11/24-10:24
 **/

@Configuration
@Slf4j
@RequiredArgsConstructor
public class MqttProviderConfig {

    private final MqttProperties mqttProperties;

    /**
     * 客戶端對象
     */
    private MqttClient providerClient;

    /**
     * 在bean初始化後連接到服務器
     * @author xct
     * @date 2021/7/30 16:48
     */
    @PostConstruct
    public void init(){
        connect();
    }

    /**
     * 客戶端連接服務端
     * @author xct
     * @date 2021/7/30 16:01
     */
    @SneakyThrows
    public void connect(){
        //創建MQTT客戶端對象
        providerClient = new MqttClient(mqttProperties.getUrl(), "providerClient", new MemoryPersistence());
        //連接設置
        MqttConnectionOptions options = new MqttConnectionOptions();
        //是否清空session,設置為false表示服務器會保留客戶端的連接記錄(訂閱主題,qos),客戶端重連之後能獲取到服務器在客戶端斷開連接期間推送的消息
        //設置為true表示每次連接到服務端都是以新的身份
        options.setCleanStart(true);
        //設置連接用戶名
        options.setUserName(mqttProperties.getUsername());
        //設置連接密碼
        options.setPassword(mqttProperties.getPassword().getBytes());
        //設置超時時間,單位為秒
        options.setConnectionTimeout(100);
        //設置心跳時間 單位為秒,表示服務器每隔1.5*20秒的時間向客戶端發送心跳判斷客戶端是否在線
        options.setKeepAliveInterval(20);
        //設置自動重連
        options.setAutomaticReconnect(true);

        //設置回調
        providerClient.setCallback(new MqttProviderCallBack());
        providerClient.connect(options);
    }

    /**
     * 發布消息
     *
     * @param qos      服務質量等級
     *                 0只會發送一次,不管成不成功
     *                 1未成功會繼續發送,直到成功,可能會收到多次
     *                 2未成功會繼續發送,但會保證只收到一次
     * @param retained 保留標誌
     *                 如果設置為true,服務端必須存儲這個應用消息和它的服務質量等級,當有訂閱者訂閱這個主題時,會把消息推送給這個訂閱者
     *                 但服務端對同一個主題只會保留一條retained消息(最後收到的那條)
     * @param topic    主題
     * @param message  消息
     * @author xct
     * @date 2021/7/30 16:27
     */
    @SneakyThrows
    public void publish(int qos, boolean retained, String topic, String message) {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setRetained(retained);
        mqttMessage.setPayload(message.getBytes());
        //主題目的地,用於發布/訂閱消息
        MqttTopic mqttTopic = providerClient.getTopic(topic);
        //提供一種機制來跟蹤消息的傳遞進度。
        //用於在以非阻塞方式(在後台運行)執行發布時跟蹤消息的傳遞進度
        MqttToken token;
        //將指定消息發布到主題,但不等待消息傳遞完成。返回的token可用於跟蹤消息的傳遞狀態。
        //一旦此方法乾淨地返回,消息就已被客戶端接受發布。當連接可用時,將在後台完成消息傳遞。
        token = mqttTopic.publish(mqttMessage);
        token.waitForCompletion();
    }
}

主要做的事情就是根據配置文件的連接參數創建 mqtt 客戶端對象,然後創建了一個 publish 推送消息的方法。因為在創建生產者客戶端對象時指定了使用 MqttProviderCallBack 做回調函數,所以還需要創建這個回調類

生產者回調 生產者的回調方法按需實現#

/**
 * @author lza
 * @date 2023/11/24-10:34
 **/

public class MqttProviderCallBack implements MqttCallback {



    @Override
    public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
        System.out.println("生產者:與服務器斷開連接");
    }

    @Override
    public void mqttErrorOccurred(MqttException e) {

    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {

    }

    @Override
    public void deliveryComplete(IMqttToken iMqttToken) {
        MqttClientInterface client = iMqttToken.getClient();
        System.out.println(client.getClientId() + "發布消息成功!");
    }

    @Override
    public void connectComplete(boolean b, String s) {
    }

    @Override
    public void authPacketArrived(int i, MqttProperties mqttProperties) {

    }


}

消費者 創建完生產者後,還需要有個消費者#

/**
 * @author lza
 * @date 2023/11/24-10:43
 **/

@Configuration
@Slf4j
@RequiredArgsConstructor
public class MqttConsumerConfig {

    private final MqttProperties mqttProperties;

    /**
     * 客戶端對象
     */
    public MqttClient consumerClient;

    /**
     * 在bean初始化後連接到服務器
     * @author xct
     * @date 2021/7/30 16:48
     */
    @PostConstruct
    public void init(){
        connect();
    }

    /**
     * 客戶端連接服務端
     * @author xct
     * @date 2021/7/30 16:01
     */
    @SneakyThrows
    public void connect(){
        //創建MQTT客戶端對象
        consumerClient = new MqttClient(mqttProperties.getUrl(), "consumerClient", new MemoryPersistence());
        //連接設置
        MqttConnectionOptions options = new MqttConnectionOptions();
        //是否清空session,設置為false表示服務器會保留客戶端的連接記錄(訂閱主題,qos),客戶端重連之後能獲取到服務器在客戶端斷開連接期間推送的消息
        //設置為true表示每次連接到服務端都是以新的身份
        options.setCleanStart(true);
        //設置連接用戶名
        options.setUserName(mqttProperties.getUsername());
        //設置連接密碼
        options.setPassword(mqttProperties.getPassword().getBytes());
        //設置超時時間,單位為秒
        options.setConnectionTimeout(100);
        //設置心跳時間 單位為秒,表示服務器每隔1.5*20秒的時間向客戶端發送心跳判斷客戶端是否在線
        options.setKeepAliveInterval(20);
        //設置自動重連
        options.setAutomaticReconnect(true);

        //設置回調
        consumerClient.setCallback(new MqttConsumerCallBack(this));
        consumerClient.connect(options);

        //訂閱主題
        //消息等級,和主題數組一一對應,服務端將按照指定等級給訂閱了主題的客戶端推送消息
        int[] qos = {1,1};
        //主題
        String[] topics = mqttProperties.getSubTopic().split(",");
        //訂閱主題
        consumerClient.subscribe(topics,qos);
    }

    /**
     * 斷開連接
     *
     * @author xct
     * @date 2021/8/2 09:30
     */
    @SneakyThrows
    public void disConnect() {
        consumerClient.disconnect();
    }


    /**
     * 訂閱主題
     *
     * @param topic 主題
     * @param qos   消息等級
     * @author xct
     * @date 2021/7/30 17:12
     */
    @SneakyThrows
    public void subscribe(String topic, int qos) {
        consumerClient.subscribe(topic, qos);
    }
}

消費者同樣需要使用連接信息創建一個消費者的客戶端實例,也同樣需要指定消費者的回調函數,不同的是消費者的方法是 subscribe 訂閱方法和 disConnect 斷開連接方法。

消費者回調#

/**
 * @author lza
 * @date 2023/11/24-10:55
 **/
public class MqttConsumerCallBack implements MqttCallback {

    private final MqttConsumerConfig consumerConfig;

    public MqttConsumerCallBack(MqttConsumerConfig consumerConfig) {
        this.consumerConfig = consumerConfig;
    }

    @Override
    public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
        System.out.println("消費者;與服務器斷開連接");
    }

    @Override
    public void mqttErrorOccurred(MqttException e) {

    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        System.out.printf("接收消息主題 : %s%n",s);
        System.out.printf("接收消息Qos : %d%n",mqttMessage.getQos());
        System.out.printf("接收消息內容 : %s%n",new String(mqttMessage.getPayload()));
        System.out.printf("接收消息retained : %b%n",mqttMessage.isRetained());

        // 設置mqttV5 請求響應模式
        List<UserProperty> userProperties = mqttMessage.getProperties().getUserProperties();
        if("action".equals(userProperties.get(0).getKey()) && "remoteWake".equals(userProperties.get(0).getValue())){
            MqttProperties properties = new MqttProperties();
            if(mqttMessage.getProperties()!=null && StringUtils.hasText(mqttMessage.getProperties().getResponseTopic())){
                MqttMessage responseMessage = new MqttMessage();
                properties.setCorrelationData(mqttMessage.getProperties().getCorrelationData());
                responseMessage.setProperties(properties);
                responseMessage.setPayload("設備未連接".getBytes());
                responseMessage.setQos(1);
                responseMessage.setRetained(false);
                consumerConfig.consumerClient.publish(mqttMessage.getProperties().getResponseTopic(),responseMessage);
            }
        }
    }

    @Override
    public void deliveryComplete(IMqttToken iMqttToken) {

    }

    @Override
    public void connectComplete(boolean b, String s) {

    }

    @Override
    public void authPacketArrived(int i, MqttProperties mqttProperties) {

    }
}

消費者的回調中最重要的就是獲取消息的方法 messageArrived 當消費者訂閱的主題接受到消息時就會進入這個方法。

如果一切正確的話,你可以在 emqx 的後台管理頁面看到這 2 個客戶端,生產者和消費者已經上線了。

image

請求響應 你也許已經看到在消費者的 messageArrived 回調方法中有請求響應的註釋,這也是為什麼要把 mqtt 的版本依賴從 3.1.1 切換到 5 的原因。#

請求響應模式是 mqtt V5 的一大版本特性,你可以查看這篇介紹文檔,和這篇包含有趣例子的文檔

通俗點講,我們都知道 HTTP 請求是明顯的請求 / 響應模型。前端通過接口請求後端,後端處理完數據後把結果返回給前端,無論失敗還是成功,前端總是能夠獲得返回值。 而在 mqtt 的推送訂閱模型中,因為生產者和消費者互不關心的背景(因為 mqtt 通常用在物聯網場景中,而物聯網的大部分場景都會對生產者或消費者過分寬容)。想象一下,你訂閱了一個公眾號,你不必關注這個公眾號什麼時候給你推送,你只需要保證不會漏收公眾號推送的消息。 舉個例子,假如你有一個溫度傳感器放在房間中,且是聯網的,你在外面想要通過手機 APP 獲取傳感器的當前溫度。

image

有點像 HTTP 請求吧,不同的是後端的數據變成傳感器採集的數據了。如果不用請求響應,當然也可以做,比如創建 2 個主題 A 和 B,請求時把消息發送到 A,然後設備訂閱 A,採集完數據後再把數據發送到 B 主題讓 APP 訂閱。

而請求響應怎麼做的呢?發送消息到 A 時,直接指定響應的主題,設備一收到消息,一看上面加粗寫著,有事到 B 找我,直接就把數據返回到 B 主題了。有點像在郵局寄信的流程。

MQTTX中,連接 mqtt 時選擇版本為 5 就可以進行請求響應測試了

image

裡面增加了用戶屬性用作判斷,對應的是消費者的回調方法

// 設置mqttV5 請求響應模式
        List<UserProperty> userProperties = mqttMessage.getProperties().getUserProperties();
        if("action".equals(userProperties.get(0).getKey()) && "remoteWake".equals(userProperties.get(0).getValue())){
            MqttProperties properties = new MqttProperties();
            if(mqttMessage.getProperties()!=null && StringUtils.hasText(mqttMessage.getProperties().getResponseTopic())){
                MqttMessage responseMessage = new MqttMessage();
                properties.setCorrelationData(mqttMessage.getProperties().getCorrelationData());
                responseMessage.setProperties(properties);
                responseMessage.setPayload("設備未連接".getBytes());
                responseMessage.setQos(1);
                responseMessage.setRetained(false);
                consumerConfig.consumerClient.publish(mqttMessage.getProperties().getResponseTopic(),responseMessage);
            }
        }

如果用戶屬性 action 的值為 remoteWake, 則觸發響應模式,獲取到消息中的響應主題並自動回復。

上下線通知 請求響應只是一個擴展,最初的需求是獲取設備是否在線。 已經更換了 mqtt 中間件,那可以使用系統主題了吧? 答案是可以,但沒必要。因為 emqx 提供了更優雅的方式。 最簡單的方法,直接使用 emqx 的企業版 > 數據存儲的主要使用場景包括將客戶端上下線狀態,訂閱主題信息,消息內容,消息抵達後發送消息回執等操作記錄到 Redis、MySQL、PostgreSQL、MongoDB、Cassandra 等各種數據庫中。用戶也可以通過訂閱相關主題的方式來實現類似的功能,但是在企業版中內置了對這些持久化的支持;相比於前者,後者的執行效率更高,也能大大降低開發者的工作量。#

第二種方法,也是推薦的方法,就是使用webhook來維護一個上下線通知。

首先,創建一個接口,用於獲取 webhook 通知

    private final static String CLIENT_IDS = "clientIds";
    private final static String CONNECTED = "client_connected";
    private final static String DISCONNECTED = "client_disconnected";
    
/**
     * emqx webhook鉤子,用於監聽客戶端上下線
     * @param vo 上下線vo
     */
    @SaIgnore
    @PostMapping("/onlineOrOffline")
    public void onlineOrOffline(@RequestBody OnlineOrOfflineVo vo) {
        System.err.println("客戶端:" + vo.getClientid() +
            ",動作:" + vo.getAction() +
            ",原因:" + vo.getReason());
        List<Object> list = RedisUtils.getCacheList(CLIENT_IDS);
        if (vo.getAction().equals(CONNECTED)) {
            list.add(vo.getClientid());
            // 先刪除原有的值
            RedisUtils.deleteKeys(CLIENT_IDS);
            // 去重
            ArrayList<Object> distinct = CollUtil.distinct(list);
            RedisUtils.setCacheList(CLIENT_IDS, distinct);
        } else if (vo.getAction().equals(DISCONNECTED)){
            list.remove(vo.getClientid());
            // 先刪除原有的值
            RedisUtils.deleteKeys(CLIENT_IDS);
            RedisUtils.setCacheList(CLIENT_IDS, list);
        }

    }

去重的原因是,想象一下,假如 A 客戶端上線,此時 redis 裡面有 A,接著,服務 down 掉了,然後重啟,A 再次上線,那此時 redis 裡面就有 2 個 A 了。

然後在 emqx 的插件中選擇 webhook 插件開啟

image

接著進入 docker 容器內更改插件配置

docker exec -it emqx bash
cd /etc/plugins

image

修改插件

vi emqx_web_hook.conf

最主要的修改文件開頭的 Webhook URL,改成接口地址

image

接著修改下面的通知規則

image

去掉前面的註釋即可,這 2 個就是上下線通知的規則

web.hook.rule.client.connected.1     = {"action": "on_client_connected"}
web.hook.rule.client.disconnected.1  = {"action": "on_client_disconnected"}

完成後,客戶端上下線就能看到通知了

image

但這還不夠完美,假如 mqtt 中間件下線了,那此時 webhook 和 redis 保存的在線設備就沒用了啊。

別急,emqx 提供了HTTP API,可以獲取指定設備是否在線

事實上,API 中還提供了獲取集群下所有客戶端的信息的接口,但是是分頁的,而且一旦有大量的客戶端就不好搞了。所以還是用 webhook 來維護一個上下線通知,再使用接口判斷指定客戶端是否在線比較好,能滿足絕大部分的需求。

/**
     * 使用emqx API 檢測客戶端是否在線
     * @param clientId 客戶端id
     * @return 是否在線
     */
    @SaIgnore
    @GetMapping("/checkClientStatus/{clientId}")
    public R<Boolean> checkClientStatus(@PathVariable String clientId) {
        // 發送GET請求
        String url = "http://localhost:18083/api/v4/clients/" + clientId;
        HttpRequest request = HttpUtil.createGet(url);
        request.basicAuth("admin", "public");

        HttpResponse response = request.execute();

        ClientResponseDto dto = JSONUtil.toBean(response.body(), ClientResponseDto.class);
        return R.ok(!ObjectUtil.isEmpty(dto.getData()) && dto.getData().get(0).getConnected());
    }

EMQX 文檔 > EMQX 安裝使用和部分坑 > MQTT EMQX 中如何監聽客戶端上下線?並在業務中正常使用

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。