qinfengge

qinfengge

醉后不知天在水,满船清梦压星河
github
email
telegram

java实时消息推送(二)

之前写实时推送(一)的时候,是用的 rabbitmq 的 mqtt 插件。在当时它能满足我的需求,现在有一个新的需求,远程唤醒,判断设备是否在线,如果当前时间设备在线,则表示可以被远程唤醒。
一开始,我还是想在 rabbitmq 的基础上找找看有没有什么 API 能够获取到 mqtt 的在线设备。毕竟消息队列也是用的这个,没必要再增加中间件。然后我发现 mqtt 有很多约定系统主题,这些系统主题维护了 mqtt broker 的状态。
但正如我所说,系统主题是约定的配置,并非强制性的,且 rabbitmq 中的 mqtt 实现也只是作为插件补充。这是我在官方仓库提的discussions

没办法,看来只能引入专业的 mqtt 中间件了,这里我选了EMQX, 主要是文档确实详细。

安装#

这里我选择使用 WSL2 的 Docker 安装,安装命令很简单

$ docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:v4.0.0

安装成功后可以直接使用 http://localhost:18083 访问控制面板,默认的账号名是 admin,密码是 public

image

配置#

安装完成后还需要在程序中进行配置,因为版本切换成 mqtt V5 了,所以依赖和配置也要更改下

依赖#

 <!--mqtt依赖包-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
            <version>1.2.5</version>
        </dependency>

配置文件#

# mqtt协议配置
spring:
  mqtt:
    url: tcp://127.0.0.1:1883
    username: admin
    password: admin
    clientId: serverClientId
    #发布的主题--MQTT-默认的消息推送主题,实际可在调用接口时指定
    pubTopic: testTopic
    #订阅的主题
    subTopic: testTopic,remote-wake
    completionTimeout: 30000

因为 emqx 默认是允许匿名连接的,所以用户名和密码可以省略。

生产者#

接着创建生产者的配置类

/**
 * @author lza
 * @date 2023/11/24-10:24
 **/


@Configuration
@Slf4j
@RequiredArgsConstructor
public class MqttProviderConfig {

    private final MqttProperties mqttProperties;

    /**
     * 客户端对象
     */
    private MqttClient providerClient;

    /**
     * 在bean初始化后连接到服务器
     * @author xct
     * @date 2021/7/30 16:48
     */
    @PostConstruct
    public void init(){
        connect();
    }

    /**
     * 客户端连接服务端
     * @author xct
     * @date 2021/7/30 16:01
     */
    @SneakyThrows
    public void connect(){
        //创建MQTT客户端对象
        providerClient = new MqttClient(mqttProperties.getUrl(), "providerClient", new MemoryPersistence());
        //连接设置
        MqttConnectionOptions options = new MqttConnectionOptions();
        //是否清空session,设置为false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
        //设置为true表示每次连接到服务端都是以新的身份
        options.setCleanStart(true);
        //设置连接用户名
        options.setUserName(mqttProperties.getUsername());
        //设置连接密码
        options.setPassword(mqttProperties.getPassword().getBytes());
        //设置超时时间,单位为秒
        options.setConnectionTimeout(100);
        //设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
        options.setKeepAliveInterval(20);
        //设置自动重连
        options.setAutomaticReconnect(true);

        //设置回调
        providerClient.setCallback(new MqttProviderCallBack());
        providerClient.connect(options);
    }

    /**
     * 发布消息
     *
     * @param qos      服务质量等级
     *                 0只会发送一次,不管成不成功
     *                 1未成功会继续发送,直到成功,可能会收到多次
     *                 2未成功会继续发送,但会保证只收到一次
     * @param retained 保留标志
     *                 如果设置为true,服务端必须存储这个应用消息和它的服务质量等级,当有订阅者订           阅这个主题时,会把消息推送给这个订阅者
     *                 但服务端对同一个主题只会保留一条retained消息(最后收到的那条)
     * @param topic    主题
     * @param message  消息
     * @author xct
     * @date 2021/7/30 16:27
     */
    @SneakyThrows
    public void publish(int qos, boolean retained, String topic, String message) {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setRetained(retained);
        mqttMessage.setPayload(message.getBytes());
        //主题目的地,用于发布/订阅消息
        MqttTopic mqttTopic = providerClient.getTopic(topic);
        //提供一种机制来跟踪消息的传递进度。
        //用于在以非阻塞方式(在后台运行)执行发布时跟踪消息的传递进度
        MqttToken token;
        //将指定消息发布到主题,但不等待消息传递完成。返回的token可用于跟踪消息的传递状态。
        //一旦此方法干净地返回,消息就已被客户端接受发布。当连接可用时,将在后台完成消息传递。
        token = mqttTopic.publish(mqttMessage);
        token.waitForCompletion();
    }

主要做的事情就是根据配置文件的连接参数创建 mqtt 客户端对象,然后创建了一个 publish 推送消息的方法。
因为在创建生产者客户端对象时指定了使用 MqttProviderCallBack 做回调函数,所以还需要创建这个回调类

生产者回调#

生产者的回调方法按需实现

/**
 * @author lza
 * @date 2023/11/24-10:34
 **/

public class MqttProviderCallBack implements MqttCallback {



    @Override
    public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
        System.out.println("生产者:与服务器断开连接");
    }

    @Override
    public void mqttErrorOccurred(MqttException e) {

    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {

    }

    @Override
    public void deliveryComplete(IMqttToken iMqttToken) {
        MqttClientInterface client = iMqttToken.getClient();
        System.out.println(client.getClientId() + "发布消息成功!");
    }

    @Override
    public void connectComplete(boolean b, String s) {
    }

    @Override
    public void authPacketArrived(int i, MqttProperties mqttProperties) {

    }


}

消费者#

创建完生产者后,还需要有个消费者

/**
 * @author lza
 * @date 2023/11/24-10:43
 **/

@Configuration
@Slf4j
@RequiredArgsConstructor
public class MqttConsumerConfig {

    private final MqttProperties mqttProperties;

    /**
     * 客户端对象
     */
    public MqttClient consumerClient;

    /**
     * 在bean初始化后连接到服务器
     * @author xct
     * @date 2021/7/30 16:48
     */
    @PostConstruct
    public void init(){
        connect();
    }

    /**
     * 客户端连接服务端
     * @author xct
     * @date 2021/7/30 16:01
     */
    @SneakyThrows
    public void connect(){
        //创建MQTT客户端对象
        consumerClient = new MqttClient(mqttProperties.getUrl(), "consumerClient", new MemoryPersistence());
        //连接设置
        MqttConnectionOptions options = new MqttConnectionOptions();
        //是否清空session,设置为false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
        //设置为true表示每次连接到服务端都是以新的身份
        options.setCleanStart(true);
        //设置连接用户名
        options.setUserName(mqttProperties.getUsername());
        //设置连接密码
        options.setPassword(mqttProperties.getPassword().getBytes());
        //设置超时时间,单位为秒
        options.setConnectionTimeout(100);
        //设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
        options.setKeepAliveInterval(20);
        //设置自动重连
        options.setAutomaticReconnect(true);

        //设置回调
        consumerClient.setCallback(new MqttConsumerCallBack(this));
        consumerClient.connect(options);

        //订阅主题
        //消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息
        int[] qos = {1,1};
        //主题
        String[] topics = mqttProperties.getSubTopic().split(",");
        //订阅主题
        consumerClient.subscribe(topics,qos);
    }

    /**
     * 断开连接
     *
     * @author xct
     * @date 2021/8/2 09:30
     */
    @SneakyThrows
    public void disConnect() {
        consumerClient.disconnect();
    }


    /**
     * 订阅主题
     *
     * @param topic 主题
     * @param qos   消息等级
     * @author xct
     * @date 2021/7/30 17:12
     */
    @SneakyThrows
    public void subscribe(String topic, int qos) {
        consumerClient.subscribe(topic, qos);
    }
}

消费者同样需要使用连接信息创建一个消费者的客户端实例,也同样需要指定消费者的回调函数,不同的是消费者的方法是 subscribe 订阅方法和 disConnect 断开连接方法。

消费者回调#

/**
 * @author lza
 * @date 2023/11/24-10:55
 **/
public class MqttConsumerCallBack implements MqttCallback {

    private final MqttConsumerConfig consumerConfig;

    public MqttConsumerCallBack(MqttConsumerConfig consumerConfig) {
        this.consumerConfig = consumerConfig;
    }

    @Override
    public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
        System.out.println("消费者;与服务器断开连接");
    }

    @Override
    public void mqttErrorOccurred(MqttException e) {

    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        System.out.printf("接收消息主题 : %s%n",s);
        System.out.printf("接收消息Qos : %d%n",mqttMessage.getQos());
        System.out.printf("接收消息内容 : %s%n",new String(mqttMessage.getPayload()));
        System.out.printf("接收消息retained : %b%n",mqttMessage.isRetained());

        // 设置mqttV5 请求响应模式
        List<UserProperty> userProperties = mqttMessage.getProperties().getUserProperties();
        if("action".equals(userProperties.get(0).getKey()) && "remoteWake".equals(userProperties.get(0).getValue())){
            MqttProperties properties = new MqttProperties();
            if(mqttMessage.getProperties()!=null && StringUtils.hasText(mqttMessage.getProperties().getResponseTopic())){
                MqttMessage responseMessage = new MqttMessage();
                properties.setCorrelationData(mqttMessage.getProperties().getCorrelationData());
                responseMessage.setProperties(properties);
                responseMessage.setPayload("设备未连接".getBytes());
                responseMessage.setQos(1);
                responseMessage.setRetained(false);
                consumerConfig.consumerClient.publish(mqttMessage.getProperties().getResponseTopic(),responseMessage);
            }
        }
    }

    @Override
    public void deliveryComplete(IMqttToken iMqttToken) {

    }

    @Override
    public void connectComplete(boolean b, String s) {

    }

    @Override
    public void authPacketArrived(int i, MqttProperties mqttProperties) {

    }
}

消费者的回调中最重要的就是获取消息的方法 messageArrived 当消费者订阅的主题接受到消息时就会进入这个方法。

如果一切正确的话,你可以在 emqx 的后台管理页面看到这 2 个客户端,生产者和消费者已经上线了。

image

请求响应#

你也许已经看到在消费者的 messageArrived 回调方法中有请求响应的注释,这也是为什么要把 mqtt 的版本依赖从 3.1.1 切换到 5 的原因。

请求响应模式是 mqtt V5 的一大版本特性,你可以查看这篇介绍文档,和这篇包含有趣例子的文档

通俗点讲,我们都知道 HTTP 请求是明显的请求 / 响应模型。前端通过接口请求后端,后端处理完数据后把结果返回给前端,无论失败还是成功,前端总是能够获得返回值。
而在 mqtt 的推送订阅模型中,因为生产者和消费者互不关心的背景(因为 mqtt 通常用在物联网场景中,而物联网的大部分场景都会对生产者或消费者过分宽容)。想象一下,你订阅了一个公众号,你不必关注这个公众号什么时候给你推送,你只需要保证不会漏收公众号推送的消息。
举个例子,假如你有一个温度传感器放在房间中,且是联网的,你在外面想要通过手机 APP 获取传感器的当前温度。

image

有点像 HTTP 请求吧,不同的是后端的数据变成传感器采集的数据了。如果不用请求响应,当然也可以做,比如创建 2 个主题 A 和 B,请求时把消息发送到 A,然后设备订阅 A,采集完数据后再把数据发送到 B 主题让 APP 订阅。

而请求响应怎么做的呢?发送消息到 A 时,直接指定响应的主题,设备一收到消息,一看上面加粗写着,有事到 B 找我,直接就把数据返回到 B 主题了。有点像在邮局寄信的流程。

MQTTX中,连接 mqtt 时选择版本为 5 就可以进行请求响应测试了

image

里面增加了用户属性用作判断,对应的是消费者的回调方法

// 设置mqttV5 请求响应模式
        List<UserProperty> userProperties = mqttMessage.getProperties().getUserProperties();
        if("action".equals(userProperties.get(0).getKey()) && "remoteWake".equals(userProperties.get(0).getValue())){
            MqttProperties properties = new MqttProperties();
            if(mqttMessage.getProperties()!=null && StringUtils.hasText(mqttMessage.getProperties().getResponseTopic())){
                MqttMessage responseMessage = new MqttMessage();
                properties.setCorrelationData(mqttMessage.getProperties().getCorrelationData());
                responseMessage.setProperties(properties);
                responseMessage.setPayload("设备未连接".getBytes());
                responseMessage.setQos(1);
                responseMessage.setRetained(false);
                consumerConfig.consumerClient.publish(mqttMessage.getProperties().getResponseTopic(),responseMessage);
            }
        }

如果用户属性 action 的值为 remoteWake, 则触发响应模式,获取到消息中的响应主题并自动回复。

上下线通知#

请求响应只是一个扩展,最初的需求是获取设备是否在线。
已经更换了 mqtt 中间件,那可以使用系统主题了吧?
答案是可以,但没必要。因为 emqx 提供了更优雅的方式。
最简单的方法,直接使用 emqx 的企业版

数据存储的主要使用场景包括将客户端上下线状态,订阅主题信息,消息内容,消息抵达后发送消息回执等操作记录到 Redis、MySQL、PostgreSQL、MongoDB、Cassandra 等各种数据库中。用户也可以通过订阅相关主题的方式来实现类似的功能,但是在企业版中内置了对这些持久化的支持;相比于前者,后者的执行效率更高,也能大大降低开发者的工作量。

第二种方法,也是推荐的方法,就是使用webhook来维护一个上下线通知。

首先,创建一个接口,用于获取 webhook 通知

    private final static String CLIENT_IDS = "clientIds";
    private final static String CONNECTED = "client_connected";
    private final static String DISCONNECTED = "client_disconnected";
    
/**
     * emqx webhook钩子,用于监听客户端上下线
     * @param vo 上下线vo
     */
    @SaIgnore
    @PostMapping("/onlineOrOffline")
    public void onlineOrOffline(@RequestBody OnlineOrOfflineVo vo) {
        System.err.println("客户端:" + vo.getClientid() +
            ",动作:" + vo.getAction() +
            ",原因:" + vo.getReason());
        List<Object> list = RedisUtils.getCacheList(CLIENT_IDS);
        if (vo.getAction().equals(CONNECTED)) {
            list.add(vo.getClientid());
            // 先删除原有的值
            RedisUtils.deleteKeys(CLIENT_IDS);
            // 去重
            ArrayList<Object> distinct = CollUtil.distinct(list);
            RedisUtils.setCacheList(CLIENT_IDS, distinct);
        } else if (vo.getAction().equals(DISCONNECTED)){
            list.remove(vo.getClientid());
            // 先删除原有的值
            RedisUtils.deleteKeys(CLIENT_IDS);
            RedisUtils.setCacheList(CLIENT_IDS, list);
        }

    }

去重的原因是,想象一下,假如 A 客户端上线,此时 redis 里面有 A,接着,服务 down 掉了,然后重启,A 再次上线,那此时 redis 里面就有 2 个 A 了。

然后在 emqx 的插件中选择 webhook 插件开启

image

接着进入 docker 容器内更改插件配置

docker exec -it emqx bash
cd /etc/plugins

image

修改插件

vi emqx_web_hook.conf

最主要的修改文件开头的 Webhook URL,改成接口地址

image

接着修改下面的通知规则

image

去掉前面的注释即可,这 2 个就是上下线通知的规则

web.hook.rule.client.connected.1     = {"action": "on_client_connected"}
web.hook.rule.client.disconnected.1  = {"action": "on_client_disconnected"}

完成后,客户端上下线就能看到通知了

image

但这还不够完美,假如 mqtt 中间件下线了,那此时 webhook 和 redis 保存的在线设备就没用了啊。

别急,emqx 提供了HTTP API,可以获取指定设备是否在线

事实上,API 中还提供了获取集群下所有客户端的信息的接口,但是是分页的,而且一旦有大量的客户端就不好搞了。所以还是用 webhook 来维护一个上下线通知,再使用接口判断指定客户端是否在线比较好,能满足绝大部分的需求。

/**
     * 使用emqx API 检测客户端是否在线
     * @param clientId 客户端id
     * @return 是否在线
     */
    @SaIgnore
    @GetMapping("/checkClientStatus/{clientId}")
    public R<Boolean> checkClientStatus(@PathVariable String clientId) {
        // 发送GET请求
        String url = "http://localhost:18083/api/v4/clients/" + clientId;
        HttpRequest request = HttpUtil.createGet(url);
        request.basicAuth("admin", "public");

        HttpResponse response = request.execute();

        ClientResponseDto dto = JSONUtil.toBean(response.body(), ClientResponseDto.class);
        return R.ok(!ObjectUtil.isEmpty(dto.getData()) && dto.getData().get(0).getConnected());
    }

EMQX 文档
EMQX 安装使用和部分坑
MQTT EMQX 中如何监听客户端上下线?并在业务中正常使用

加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。