之前写实时推送(一)的时候,是用的 rabbitmq 的 mqtt 插件。在当时它能满足我的需求,现在有一个新的需求,远程唤醒,判断设备是否在线,如果当前时间设备在线,则表示可以被远程唤醒。
一开始,我还是想在 rabbitmq 的基础上找找看有没有什么 API 能够获取到 mqtt 的在线设备。毕竟消息队列也是用的这个,没必要再增加中间件。然后我发现 mqtt 有很多约定的系统主题,这些系统主题维护了 mqtt broker 的状态。
但正如我所说,系统主题是约定的配置,并非强制性的,且 rabbitmq 中的 mqtt 实现也只是作为插件补充。这是我在官方仓库提的discussions
没办法,看来只能引入专业的 mqtt 中间件了,这里我选了EMQX, 主要是文档确实详细。
安装#
这里我选择使用 WSL2 的 Docker 安装,安装命令很简单
$ docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:v4.0.0
安装成功后可以直接使用 http://localhost:18083
访问控制面板,默认的账号名是 admin
,密码是 public
配置#
安装完成后还需要在程序中进行配置,因为版本切换成 mqtt V5 了,所以依赖和配置也要更改下
依赖#
<!--mqtt依赖包-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.mqttv5.client</artifactId>
<version>1.2.5</version>
</dependency>
配置文件#
# mqtt协议配置
spring:
mqtt:
url: tcp://127.0.0.1:1883
username: admin
password: admin
clientId: serverClientId
#发布的主题--MQTT-默认的消息推送主题,实际可在调用接口时指定
pubTopic: testTopic
#订阅的主题
subTopic: testTopic,remote-wake
completionTimeout: 30000
因为 emqx 默认是允许匿名连接的,所以用户名和密码可以省略。
生产者#
接着创建生产者的配置类
/**
* @author lza
* @date 2023/11/24-10:24
**/
@Configuration
@Slf4j
@RequiredArgsConstructor
public class MqttProviderConfig {
private final MqttProperties mqttProperties;
/**
* 客户端对象
*/
private MqttClient providerClient;
/**
* 在bean初始化后连接到服务器
* @author xct
* @date 2021/7/30 16:48
*/
@PostConstruct
public void init(){
connect();
}
/**
* 客户端连接服务端
* @author xct
* @date 2021/7/30 16:01
*/
@SneakyThrows
public void connect(){
//创建MQTT客户端对象
providerClient = new MqttClient(mqttProperties.getUrl(), "providerClient", new MemoryPersistence());
//连接设置
MqttConnectionOptions options = new MqttConnectionOptions();
//是否清空session,设置为false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
//设置为true表示每次连接到服务端都是以新的身份
options.setCleanStart(true);
//设置连接用户名
options.setUserName(mqttProperties.getUsername());
//设置连接密码
options.setPassword(mqttProperties.getPassword().getBytes());
//设置超时时间,单位为秒
options.setConnectionTimeout(100);
//设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
options.setKeepAliveInterval(20);
//设置自动重连
options.setAutomaticReconnect(true);
//设置回调
providerClient.setCallback(new MqttProviderCallBack());
providerClient.connect(options);
}
/**
* 发布消息
*
* @param qos 服务质量等级
* 0只会发送一次,不管成不成功
* 1未成功会继续发送,直到成功,可能会收到多次
* 2未成功会继续发送,但会保证只收到一次
* @param retained 保留标志
* 如果设置为true,服务端必须存储这个应用消息和它的服务质量等级,当有订阅者订 阅这个主题时,会把消息推送给这个订阅者
* 但服务端对同一个主题只会保留一条retained消息(最后收到的那条)
* @param topic 主题
* @param message 消息
* @author xct
* @date 2021/7/30 16:27
*/
@SneakyThrows
public void publish(int qos, boolean retained, String topic, String message) {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
mqttMessage.setPayload(message.getBytes());
//主题目的地,用于发布/订阅消息
MqttTopic mqttTopic = providerClient.getTopic(topic);
//提供一种机制来跟踪消息的传递进度。
//用于在以非阻塞方式(在后台运行)执行发布时跟踪消息的传递进度
MqttToken token;
//将指定消息发布到主题,但不等待消息传递完成。返回的token可用于跟踪消息的传递状态。
//一旦此方法干净地返回,消息就已被客户端接受发布。当连接可用时,将在后台完成消息传递。
token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
}
主要做的事情就是根据配置文件的连接参数创建 mqtt 客户端对象,然后创建了一个 publish
推送消息的方法。
因为在创建生产者客户端对象时指定了使用 MqttProviderCallBack
做回调函数,所以还需要创建这个回调类
生产者回调#
生产者的回调方法按需实现
/**
* @author lza
* @date 2023/11/24-10:34
**/
public class MqttProviderCallBack implements MqttCallback {
@Override
public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
System.out.println("生产者:与服务器断开连接");
}
@Override
public void mqttErrorOccurred(MqttException e) {
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
}
@Override
public void deliveryComplete(IMqttToken iMqttToken) {
MqttClientInterface client = iMqttToken.getClient();
System.out.println(client.getClientId() + "发布消息成功!");
}
@Override
public void connectComplete(boolean b, String s) {
}
@Override
public void authPacketArrived(int i, MqttProperties mqttProperties) {
}
}
消费者#
创建完生产者后,还需要有个消费者
/**
* @author lza
* @date 2023/11/24-10:43
**/
@Configuration
@Slf4j
@RequiredArgsConstructor
public class MqttConsumerConfig {
private final MqttProperties mqttProperties;
/**
* 客户端对象
*/
public MqttClient consumerClient;
/**
* 在bean初始化后连接到服务器
* @author xct
* @date 2021/7/30 16:48
*/
@PostConstruct
public void init(){
connect();
}
/**
* 客户端连接服务端
* @author xct
* @date 2021/7/30 16:01
*/
@SneakyThrows
public void connect(){
//创建MQTT客户端对象
consumerClient = new MqttClient(mqttProperties.getUrl(), "consumerClient", new MemoryPersistence());
//连接设置
MqttConnectionOptions options = new MqttConnectionOptions();
//是否清空session,设置为false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
//设置为true表示每次连接到服务端都是以新的身份
options.setCleanStart(true);
//设置连接用户名
options.setUserName(mqttProperties.getUsername());
//设置连接密码
options.setPassword(mqttProperties.getPassword().getBytes());
//设置超时时间,单位为秒
options.setConnectionTimeout(100);
//设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
options.setKeepAliveInterval(20);
//设置自动重连
options.setAutomaticReconnect(true);
//设置回调
consumerClient.setCallback(new MqttConsumerCallBack(this));
consumerClient.connect(options);
//订阅主题
//消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息
int[] qos = {1,1};
//主题
String[] topics = mqttProperties.getSubTopic().split(",");
//订阅主题
consumerClient.subscribe(topics,qos);
}
/**
* 断开连接
*
* @author xct
* @date 2021/8/2 09:30
*/
@SneakyThrows
public void disConnect() {
consumerClient.disconnect();
}
/**
* 订阅主题
*
* @param topic 主题
* @param qos 消息等级
* @author xct
* @date 2021/7/30 17:12
*/
@SneakyThrows
public void subscribe(String topic, int qos) {
consumerClient.subscribe(topic, qos);
}
}
消费者同样需要使用连接信息创建一个消费者的客户端实例,也同样需要指定消费者的回调函数,不同的是消费者的方法是 subscribe
订阅方法和 disConnect
断开连接方法。
消费者回调#
/**
* @author lza
* @date 2023/11/24-10:55
**/
public class MqttConsumerCallBack implements MqttCallback {
private final MqttConsumerConfig consumerConfig;
public MqttConsumerCallBack(MqttConsumerConfig consumerConfig) {
this.consumerConfig = consumerConfig;
}
@Override
public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
System.out.println("消费者;与服务器断开连接");
}
@Override
public void mqttErrorOccurred(MqttException e) {
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.printf("接收消息主题 : %s%n",s);
System.out.printf("接收消息Qos : %d%n",mqttMessage.getQos());
System.out.printf("接收消息内容 : %s%n",new String(mqttMessage.getPayload()));
System.out.printf("接收消息retained : %b%n",mqttMessage.isRetained());
// 设置mqttV5 请求响应模式
List<UserProperty> userProperties = mqttMessage.getProperties().getUserProperties();
if("action".equals(userProperties.get(0).getKey()) && "remoteWake".equals(userProperties.get(0).getValue())){
MqttProperties properties = new MqttProperties();
if(mqttMessage.getProperties()!=null && StringUtils.hasText(mqttMessage.getProperties().getResponseTopic())){
MqttMessage responseMessage = new MqttMessage();
properties.setCorrelationData(mqttMessage.getProperties().getCorrelationData());
responseMessage.setProperties(properties);
responseMessage.setPayload("设备未连接".getBytes());
responseMessage.setQos(1);
responseMessage.setRetained(false);
consumerConfig.consumerClient.publish(mqttMessage.getProperties().getResponseTopic(),responseMessage);
}
}
}
@Override
public void deliveryComplete(IMqttToken iMqttToken) {
}
@Override
public void connectComplete(boolean b, String s) {
}
@Override
public void authPacketArrived(int i, MqttProperties mqttProperties) {
}
}
消费者的回调中最重要的就是获取消息的方法 messageArrived
当消费者订阅的主题接受到消息时就会进入这个方法。
如果一切正确的话,你可以在 emqx 的后台管理页面看到这 2 个客户端,生产者和消费者已经上线了。
请求响应#
你也许已经看到在消费者的 messageArrived
回调方法中有请求响应的注释,这也是为什么要把 mqtt 的版本依赖从 3.1.1 切换到 5 的原因。
请求响应模式是 mqtt V5 的一大版本特性,你可以查看这篇介绍文档,和这篇包含有趣例子的文档
通俗点讲,我们都知道 HTTP 请求是明显的请求 / 响应模型。前端通过接口请求后端,后端处理完数据后把结果返回给前端,无论失败还是成功,前端总是能够获得返回值。
而在 mqtt 的推送订阅模型中,因为生产者和消费者互不关心的背景(因为 mqtt 通常用在物联网场景中,而物联网的大部分场景都会对生产者或消费者过分宽容)。想象一下,你订阅了一个公众号,你不必关注这个公众号什么时候给你推送,你只需要保证不会漏收公众号推送的消息。
举个例子,假如你有一个温度传感器放在房间中,且是联网的,你在外面想要通过手机 APP 获取传感器的当前温度。
有点像 HTTP 请求吧,不同的是后端的数据变成传感器采集的数据了。如果不用请求响应,当然也可以做,比如创建 2 个主题 A 和 B,请求时把消息发送到 A,然后设备订阅 A,采集完数据后再把数据发送到 B 主题让 APP 订阅。
而请求响应怎么做的呢?发送消息到 A 时,直接指定响应的主题,设备一收到消息,一看上面加粗写着,有事到 B 找我,直接就把数据返回到 B 主题了。有点像在邮局寄信的流程。
在MQTTX中,连接 mqtt 时选择版本为 5 就可以进行请求响应测试了
里面增加了用户属性用作判断,对应的是消费者的回调方法
// 设置mqttV5 请求响应模式
List<UserProperty> userProperties = mqttMessage.getProperties().getUserProperties();
if("action".equals(userProperties.get(0).getKey()) && "remoteWake".equals(userProperties.get(0).getValue())){
MqttProperties properties = new MqttProperties();
if(mqttMessage.getProperties()!=null && StringUtils.hasText(mqttMessage.getProperties().getResponseTopic())){
MqttMessage responseMessage = new MqttMessage();
properties.setCorrelationData(mqttMessage.getProperties().getCorrelationData());
responseMessage.setProperties(properties);
responseMessage.setPayload("设备未连接".getBytes());
responseMessage.setQos(1);
responseMessage.setRetained(false);
consumerConfig.consumerClient.publish(mqttMessage.getProperties().getResponseTopic(),responseMessage);
}
}
如果用户属性 action
的值为 remoteWake
, 则触发响应模式,获取到消息中的响应主题并自动回复。
上下线通知#
请求响应只是一个扩展,最初的需求是获取设备是否在线。
已经更换了 mqtt 中间件,那可以使用系统主题了吧?
答案是可以,但没必要。因为 emqx 提供了更优雅的方式。
最简单的方法,直接使用 emqx 的企业版
数据存储的主要使用场景包括将客户端上下线状态,订阅主题信息,消息内容,消息抵达后发送消息回执等操作记录到 Redis、MySQL、PostgreSQL、MongoDB、Cassandra 等各种数据库中。用户也可以通过订阅相关主题的方式来实现类似的功能,但是在企业版中内置了对这些持久化的支持;相比于前者,后者的执行效率更高,也能大大降低开发者的工作量。
第二种方法,也是推荐的方法,就是使用webhook来维护一个上下线通知。
首先,创建一个接口,用于获取 webhook 通知
private final static String CLIENT_IDS = "clientIds";
private final static String CONNECTED = "client_connected";
private final static String DISCONNECTED = "client_disconnected";
/**
* emqx webhook钩子,用于监听客户端上下线
* @param vo 上下线vo
*/
@SaIgnore
@PostMapping("/onlineOrOffline")
public void onlineOrOffline(@RequestBody OnlineOrOfflineVo vo) {
System.err.println("客户端:" + vo.getClientid() +
",动作:" + vo.getAction() +
",原因:" + vo.getReason());
List<Object> list = RedisUtils.getCacheList(CLIENT_IDS);
if (vo.getAction().equals(CONNECTED)) {
list.add(vo.getClientid());
// 先删除原有的值
RedisUtils.deleteKeys(CLIENT_IDS);
// 去重
ArrayList<Object> distinct = CollUtil.distinct(list);
RedisUtils.setCacheList(CLIENT_IDS, distinct);
} else if (vo.getAction().equals(DISCONNECTED)){
list.remove(vo.getClientid());
// 先删除原有的值
RedisUtils.deleteKeys(CLIENT_IDS);
RedisUtils.setCacheList(CLIENT_IDS, list);
}
}
去重的原因是,想象一下,假如 A 客户端上线,此时 redis 里面有 A,接着,服务 down 掉了,然后重启,A 再次上线,那此时 redis 里面就有 2 个 A 了。
然后在 emqx 的插件中选择 webhook 插件开启
接着进入 docker 容器内更改插件配置
docker exec -it emqx bash
cd /etc/plugins
修改插件
vi emqx_web_hook.conf
最主要的修改文件开头的 Webhook URL,改成接口地址
接着修改下面的通知规则
去掉前面的注释即可,这 2 个就是上下线通知的规则
web.hook.rule.client.connected.1 = {"action": "on_client_connected"}
web.hook.rule.client.disconnected.1 = {"action": "on_client_disconnected"}
完成后,客户端上下线就能看到通知了
但这还不够完美,假如 mqtt 中间件下线了,那此时 webhook 和 redis 保存的在线设备就没用了啊。
别急,emqx 提供了HTTP API,可以获取指定设备是否在线
事实上,API 中还提供了获取集群下所有客户端的信息的接口,但是是分页的,而且一旦有大量的客户端就不好搞了。所以还是用 webhook 来维护一个上下线通知,再使用接口判断指定客户端是否在线比较好,能满足绝大部分的需求。
/**
* 使用emqx API 检测客户端是否在线
* @param clientId 客户端id
* @return 是否在线
*/
@SaIgnore
@GetMapping("/checkClientStatus/{clientId}")
public R<Boolean> checkClientStatus(@PathVariable String clientId) {
// 发送GET请求
String url = "http://localhost:18083/api/v4/clients/" + clientId;
HttpRequest request = HttpUtil.createGet(url);
request.basicAuth("admin", "public");
HttpResponse response = request.execute();
ClientResponseDto dto = JSONUtil.toBean(response.body(), ClientResponseDto.class);
return R.ok(!ObjectUtil.isEmpty(dto.getData()) && dto.getData().get(0).getConnected());
}