之前寫實時推送(一)的時候,是用的 rabbitmq 的 mqtt 插件。在當時它能滿足我的需求,現在有一個新的需求,遠程喚醒,判斷設備是否在線,如果當前時間設備在線,則表示可以被遠程喚醒。 一開始,我還是想在 rabbitmq 的基礎上找找看有沒有什麼 API 能夠獲取到 mqtt 的在線設備。畢竟消息隊列也是用的這個,沒必要再增加中間件。然後我發現 mqtt 有很多約定的系統主題,這些系統主題維護了 mqtt broker 的狀態。 但正如我所說,系統主題是約定的配置,並非強制性的,且 rabbitmq 中的 mqtt 實現也只是作為插件補充。這是我在官方倉庫提的discussions
沒辦法,看来只能引入專業的 mqtt 中間件了,這裡我選了EMQX,主要是文檔確實詳細。
安裝 這裡我選擇使用 WSL2 的 Docker 安裝,安裝命令很簡單#
$ docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:v4.0.0
安裝成功後可以直接使用 http://localhost:18083
訪問控制面板,默認的賬號名是 admin
,密碼是 public
配置 安裝完成後還需要在程序中進行配置,因為版本切換成 mqtt V5 了,所以依賴和配置也要更改下#
依賴#
<!--mqtt依賴包-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.mqttv5.client</artifactId>
<version>1.2.5</version>
</dependency>
配置文件#
# mqtt協議配置
spring:
mqtt:
url: tcp://127.0.0.1:1883
username: admin
password: admin
clientId: serverClientId
#發布的主題--MQTT-默認的消息推送主題,實際可在調用接口時指定
pubTopic: testTopic
#訂閱的主題
subTopic: testTopic,remote-wake
completionTimeout: 30000
因為 emqx 默認是允許匿名連接的,所以用戶名和密碼可以省略。
生產者 接著創建生產者的配置類#
/**
* @author lza
* @date 2023/11/24-10:24
**/
@Configuration
@Slf4j
@RequiredArgsConstructor
public class MqttProviderConfig {
private final MqttProperties mqttProperties;
/**
* 客戶端對象
*/
private MqttClient providerClient;
/**
* 在bean初始化後連接到服務器
* @author xct
* @date 2021/7/30 16:48
*/
@PostConstruct
public void init(){
connect();
}
/**
* 客戶端連接服務端
* @author xct
* @date 2021/7/30 16:01
*/
@SneakyThrows
public void connect(){
//創建MQTT客戶端對象
providerClient = new MqttClient(mqttProperties.getUrl(), "providerClient", new MemoryPersistence());
//連接設置
MqttConnectionOptions options = new MqttConnectionOptions();
//是否清空session,設置為false表示服務器會保留客戶端的連接記錄(訂閱主題,qos),客戶端重連之後能獲取到服務器在客戶端斷開連接期間推送的消息
//設置為true表示每次連接到服務端都是以新的身份
options.setCleanStart(true);
//設置連接用戶名
options.setUserName(mqttProperties.getUsername());
//設置連接密碼
options.setPassword(mqttProperties.getPassword().getBytes());
//設置超時時間,單位為秒
options.setConnectionTimeout(100);
//設置心跳時間 單位為秒,表示服務器每隔1.5*20秒的時間向客戶端發送心跳判斷客戶端是否在線
options.setKeepAliveInterval(20);
//設置自動重連
options.setAutomaticReconnect(true);
//設置回調
providerClient.setCallback(new MqttProviderCallBack());
providerClient.connect(options);
}
/**
* 發布消息
*
* @param qos 服務質量等級
* 0只會發送一次,不管成不成功
* 1未成功會繼續發送,直到成功,可能會收到多次
* 2未成功會繼續發送,但會保證只收到一次
* @param retained 保留標誌
* 如果設置為true,服務端必須存儲這個應用消息和它的服務質量等級,當有訂閱者訂閱這個主題時,會把消息推送給這個訂閱者
* 但服務端對同一個主題只會保留一條retained消息(最後收到的那條)
* @param topic 主題
* @param message 消息
* @author xct
* @date 2021/7/30 16:27
*/
@SneakyThrows
public void publish(int qos, boolean retained, String topic, String message) {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
mqttMessage.setPayload(message.getBytes());
//主題目的地,用於發布/訂閱消息
MqttTopic mqttTopic = providerClient.getTopic(topic);
//提供一種機制來跟蹤消息的傳遞進度。
//用於在以非阻塞方式(在後台運行)執行發布時跟蹤消息的傳遞進度
MqttToken token;
//將指定消息發布到主題,但不等待消息傳遞完成。返回的token可用於跟蹤消息的傳遞狀態。
//一旦此方法乾淨地返回,消息就已被客戶端接受發布。當連接可用時,將在後台完成消息傳遞。
token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
}
}
主要做的事情就是根據配置文件的連接參數創建 mqtt 客戶端對象,然後創建了一個 publish
推送消息的方法。因為在創建生產者客戶端對象時指定了使用 MqttProviderCallBack
做回調函數,所以還需要創建這個回調類
生產者回調 生產者的回調方法按需實現#
/**
* @author lza
* @date 2023/11/24-10:34
**/
public class MqttProviderCallBack implements MqttCallback {
@Override
public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
System.out.println("生產者:與服務器斷開連接");
}
@Override
public void mqttErrorOccurred(MqttException e) {
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
}
@Override
public void deliveryComplete(IMqttToken iMqttToken) {
MqttClientInterface client = iMqttToken.getClient();
System.out.println(client.getClientId() + "發布消息成功!");
}
@Override
public void connectComplete(boolean b, String s) {
}
@Override
public void authPacketArrived(int i, MqttProperties mqttProperties) {
}
}
消費者 創建完生產者後,還需要有個消費者#
/**
* @author lza
* @date 2023/11/24-10:43
**/
@Configuration
@Slf4j
@RequiredArgsConstructor
public class MqttConsumerConfig {
private final MqttProperties mqttProperties;
/**
* 客戶端對象
*/
public MqttClient consumerClient;
/**
* 在bean初始化後連接到服務器
* @author xct
* @date 2021/7/30 16:48
*/
@PostConstruct
public void init(){
connect();
}
/**
* 客戶端連接服務端
* @author xct
* @date 2021/7/30 16:01
*/
@SneakyThrows
public void connect(){
//創建MQTT客戶端對象
consumerClient = new MqttClient(mqttProperties.getUrl(), "consumerClient", new MemoryPersistence());
//連接設置
MqttConnectionOptions options = new MqttConnectionOptions();
//是否清空session,設置為false表示服務器會保留客戶端的連接記錄(訂閱主題,qos),客戶端重連之後能獲取到服務器在客戶端斷開連接期間推送的消息
//設置為true表示每次連接到服務端都是以新的身份
options.setCleanStart(true);
//設置連接用戶名
options.setUserName(mqttProperties.getUsername());
//設置連接密碼
options.setPassword(mqttProperties.getPassword().getBytes());
//設置超時時間,單位為秒
options.setConnectionTimeout(100);
//設置心跳時間 單位為秒,表示服務器每隔1.5*20秒的時間向客戶端發送心跳判斷客戶端是否在線
options.setKeepAliveInterval(20);
//設置自動重連
options.setAutomaticReconnect(true);
//設置回調
consumerClient.setCallback(new MqttConsumerCallBack(this));
consumerClient.connect(options);
//訂閱主題
//消息等級,和主題數組一一對應,服務端將按照指定等級給訂閱了主題的客戶端推送消息
int[] qos = {1,1};
//主題
String[] topics = mqttProperties.getSubTopic().split(",");
//訂閱主題
consumerClient.subscribe(topics,qos);
}
/**
* 斷開連接
*
* @author xct
* @date 2021/8/2 09:30
*/
@SneakyThrows
public void disConnect() {
consumerClient.disconnect();
}
/**
* 訂閱主題
*
* @param topic 主題
* @param qos 消息等級
* @author xct
* @date 2021/7/30 17:12
*/
@SneakyThrows
public void subscribe(String topic, int qos) {
consumerClient.subscribe(topic, qos);
}
}
消費者同樣需要使用連接信息創建一個消費者的客戶端實例,也同樣需要指定消費者的回調函數,不同的是消費者的方法是 subscribe
訂閱方法和 disConnect
斷開連接方法。
消費者回調#
/**
* @author lza
* @date 2023/11/24-10:55
**/
public class MqttConsumerCallBack implements MqttCallback {
private final MqttConsumerConfig consumerConfig;
public MqttConsumerCallBack(MqttConsumerConfig consumerConfig) {
this.consumerConfig = consumerConfig;
}
@Override
public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
System.out.println("消費者;與服務器斷開連接");
}
@Override
public void mqttErrorOccurred(MqttException e) {
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.printf("接收消息主題 : %s%n",s);
System.out.printf("接收消息Qos : %d%n",mqttMessage.getQos());
System.out.printf("接收消息內容 : %s%n",new String(mqttMessage.getPayload()));
System.out.printf("接收消息retained : %b%n",mqttMessage.isRetained());
// 設置mqttV5 請求響應模式
List<UserProperty> userProperties = mqttMessage.getProperties().getUserProperties();
if("action".equals(userProperties.get(0).getKey()) && "remoteWake".equals(userProperties.get(0).getValue())){
MqttProperties properties = new MqttProperties();
if(mqttMessage.getProperties()!=null && StringUtils.hasText(mqttMessage.getProperties().getResponseTopic())){
MqttMessage responseMessage = new MqttMessage();
properties.setCorrelationData(mqttMessage.getProperties().getCorrelationData());
responseMessage.setProperties(properties);
responseMessage.setPayload("設備未連接".getBytes());
responseMessage.setQos(1);
responseMessage.setRetained(false);
consumerConfig.consumerClient.publish(mqttMessage.getProperties().getResponseTopic(),responseMessage);
}
}
}
@Override
public void deliveryComplete(IMqttToken iMqttToken) {
}
@Override
public void connectComplete(boolean b, String s) {
}
@Override
public void authPacketArrived(int i, MqttProperties mqttProperties) {
}
}
消費者的回調中最重要的就是獲取消息的方法 messageArrived
當消費者訂閱的主題接受到消息時就會進入這個方法。
如果一切正確的話,你可以在 emqx 的後台管理頁面看到這 2 個客戶端,生產者和消費者已經上線了。
請求響應 你也許已經看到在消費者的 messageArrived
回調方法中有請求響應的註釋,這也是為什麼要把 mqtt 的版本依賴從 3.1.1 切換到 5 的原因。#
請求響應模式是 mqtt V5 的一大版本特性,你可以查看這篇介紹文檔,和這篇包含有趣例子的文檔
通俗點講,我們都知道 HTTP 請求是明顯的請求 / 響應模型。前端通過接口請求後端,後端處理完數據後把結果返回給前端,無論失敗還是成功,前端總是能夠獲得返回值。 而在 mqtt 的推送訂閱模型中,因為生產者和消費者互不關心的背景(因為 mqtt 通常用在物聯網場景中,而物聯網的大部分場景都會對生產者或消費者過分寬容)。想象一下,你訂閱了一個公眾號,你不必關注這個公眾號什麼時候給你推送,你只需要保證不會漏收公眾號推送的消息。 舉個例子,假如你有一個溫度傳感器放在房間中,且是聯網的,你在外面想要通過手機 APP 獲取傳感器的當前溫度。
有點像 HTTP 請求吧,不同的是後端的數據變成傳感器採集的數據了。如果不用請求響應,當然也可以做,比如創建 2 個主題 A 和 B,請求時把消息發送到 A,然後設備訂閱 A,採集完數據後再把數據發送到 B 主題讓 APP 訂閱。
而請求響應怎麼做的呢?發送消息到 A 時,直接指定響應的主題,設備一收到消息,一看上面加粗寫著,有事到 B 找我,直接就把數據返回到 B 主題了。有點像在郵局寄信的流程。
在MQTTX中,連接 mqtt 時選擇版本為 5 就可以進行請求響應測試了
裡面增加了用戶屬性用作判斷,對應的是消費者的回調方法
// 設置mqttV5 請求響應模式
List<UserProperty> userProperties = mqttMessage.getProperties().getUserProperties();
if("action".equals(userProperties.get(0).getKey()) && "remoteWake".equals(userProperties.get(0).getValue())){
MqttProperties properties = new MqttProperties();
if(mqttMessage.getProperties()!=null && StringUtils.hasText(mqttMessage.getProperties().getResponseTopic())){
MqttMessage responseMessage = new MqttMessage();
properties.setCorrelationData(mqttMessage.getProperties().getCorrelationData());
responseMessage.setProperties(properties);
responseMessage.setPayload("設備未連接".getBytes());
responseMessage.setQos(1);
responseMessage.setRetained(false);
consumerConfig.consumerClient.publish(mqttMessage.getProperties().getResponseTopic(),responseMessage);
}
}
如果用戶屬性 action
的值為 remoteWake
, 則觸發響應模式,獲取到消息中的響應主題並自動回復。
上下線通知 請求響應只是一個擴展,最初的需求是獲取設備是否在線。 已經更換了 mqtt 中間件,那可以使用系統主題了吧? 答案是可以,但沒必要。因為 emqx 提供了更優雅的方式。 最簡單的方法,直接使用 emqx 的企業版 > 數據存儲的主要使用場景包括將客戶端上下線狀態,訂閱主題信息,消息內容,消息抵達後發送消息回執等操作記錄到 Redis、MySQL、PostgreSQL、MongoDB、Cassandra 等各種數據庫中。用戶也可以通過訂閱相關主題的方式來實現類似的功能,但是在企業版中內置了對這些持久化的支持;相比於前者,後者的執行效率更高,也能大大降低開發者的工作量。#
第二種方法,也是推薦的方法,就是使用webhook來維護一個上下線通知。
首先,創建一個接口,用於獲取 webhook 通知
private final static String CLIENT_IDS = "clientIds";
private final static String CONNECTED = "client_connected";
private final static String DISCONNECTED = "client_disconnected";
/**
* emqx webhook鉤子,用於監聽客戶端上下線
* @param vo 上下線vo
*/
@SaIgnore
@PostMapping("/onlineOrOffline")
public void onlineOrOffline(@RequestBody OnlineOrOfflineVo vo) {
System.err.println("客戶端:" + vo.getClientid() +
",動作:" + vo.getAction() +
",原因:" + vo.getReason());
List<Object> list = RedisUtils.getCacheList(CLIENT_IDS);
if (vo.getAction().equals(CONNECTED)) {
list.add(vo.getClientid());
// 先刪除原有的值
RedisUtils.deleteKeys(CLIENT_IDS);
// 去重
ArrayList<Object> distinct = CollUtil.distinct(list);
RedisUtils.setCacheList(CLIENT_IDS, distinct);
} else if (vo.getAction().equals(DISCONNECTED)){
list.remove(vo.getClientid());
// 先刪除原有的值
RedisUtils.deleteKeys(CLIENT_IDS);
RedisUtils.setCacheList(CLIENT_IDS, list);
}
}
去重的原因是,想象一下,假如 A 客戶端上線,此時 redis 裡面有 A,接著,服務 down 掉了,然後重啟,A 再次上線,那此時 redis 裡面就有 2 個 A 了。
然後在 emqx 的插件中選擇 webhook 插件開啟
接著進入 docker 容器內更改插件配置
docker exec -it emqx bash
cd /etc/plugins
修改插件
vi emqx_web_hook.conf
最主要的修改文件開頭的 Webhook URL,改成接口地址
接著修改下面的通知規則
去掉前面的註釋即可,這 2 個就是上下線通知的規則
web.hook.rule.client.connected.1 = {"action": "on_client_connected"}
web.hook.rule.client.disconnected.1 = {"action": "on_client_disconnected"}
完成後,客戶端上下線就能看到通知了
但這還不夠完美,假如 mqtt 中間件下線了,那此時 webhook 和 redis 保存的在線設備就沒用了啊。
別急,emqx 提供了HTTP API,可以獲取指定設備是否在線
事實上,API 中還提供了獲取集群下所有客戶端的信息的接口,但是是分頁的,而且一旦有大量的客戶端就不好搞了。所以還是用 webhook 來維護一個上下線通知,再使用接口判斷指定客戶端是否在線比較好,能滿足絕大部分的需求。
/**
* 使用emqx API 檢測客戶端是否在線
* @param clientId 客戶端id
* @return 是否在線
*/
@SaIgnore
@GetMapping("/checkClientStatus/{clientId}")
public R<Boolean> checkClientStatus(@PathVariable String clientId) {
// 發送GET請求
String url = "http://localhost:18083/api/v4/clients/" + clientId;
HttpRequest request = HttpUtil.createGet(url);
request.basicAuth("admin", "public");
HttpResponse response = request.execute();
ClientResponseDto dto = JSONUtil.toBean(response.body(), ClientResponseDto.class);
return R.ok(!ObjectUtil.isEmpty(dto.getData()) && dto.getData().get(0).getConnected());
}