qinfengge

Java Real-time Message Push (Part Two)

When I wrote Part 1 of this real-time push series, I used the MQTT plugin for RabbitMQ. It met my needs at the time, but there is now a new requirement: remote wake-up, which depends on knowing whether a device is online. A device that is currently online can be woken remotely. My first thought was to look for a RabbitMQ API that lists the MQTT devices currently connected; the message queue was already in place, so there would be no need for more middleware. I then found that MQTT brokers commonly publish their own state on system topics. These topics are a convention rather than a mandatory part of the protocol, though, and RabbitMQ's MQTT support is only a plugin, so it does not have to provide them. I raised this question in the discussions of the official repository.

That left no real choice but to introduce a dedicated MQTT broker. I chose EMQX, mainly because its documentation is thorough.

Installation#

Here, I chose to install using Docker on WSL2, and the installation command is very simple:

$ docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:v4.0.0

After successful installation, you can directly access the control panel at http://localhost:18083. The default username is admin, and the password is public.

image

Configuration#

After installation, the application also needs to be configured. Because the client now uses MQTT 5, both the dependencies and the configuration have to change.

Dependencies#

        <!-- MQTT dependencies -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
            <version>1.2.5</version>
        </dependency>

Configuration File#

# MQTT protocol configuration
spring:
  mqtt:
    url: tcp://127.0.0.1:1883
    username: admin
    password: admin
    clientId: serverClientId
    # Published topic -- MQTT default message push topic, can be specified when calling the interface
    pubTopic: testTopic
    # Subscribed topics
    subTopic: testTopic,remote-wake
    completionTimeout: 30000

Since EMQX allows anonymous connections by default, the username and password can be omitted.

Producer#

Next, create a configuration class for the producer.

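import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttToken;
import org.eclipse.paho.mqttv5.client.MqttTopic;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.springframework.context.annotation.Configuration;
// PostConstruct lives in javax.annotation on Spring Boot 2.x and in jakarta.annotation on 3.x
import javax.annotation.PostConstruct;

// MqttProperties here is the project's own class bound to the spring.mqtt settings, not Paho's MqttProperties

/**
 * @author lza
 * @date 2023/11/24-10:24
 **/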
@Configuration
@Slf4j
@RequiredArgsConstructor
public class MqttProviderConfig {

    private final MqttProperties mqttProperties;

    /**
     * Client object
     */
    private MqttClient providerClient;

    /**
     * Connect to the server after bean initialization
     * @author xct
     * @date 2021/7/30 16:48
     */
    @PostConstruct
    public void init(){
        connect();
    }

    /**
     * Client connects to the server
     * @author xct
     * @date 2021/7/30 16:01
     */
    @SneakyThrows
    public void connect(){
        // Create MQTT client object
        providerClient = new MqttClient(mqttProperties.getUrl(), "providerClient", new MemoryPersistence());
        // Connection settings
        MqttConnectionOptions options = new MqttConnectionOptions();
        // Clean Start: true makes the broker discard any existing session for this client ID, so every connection starts fresh
        // false resumes the existing session (subscriptions, queued QoS 1/2 messages), provided the broker still holds it
        options.setCleanStart(true);
        // Set connection username
        options.setUserName(mqttProperties.getUsername());
        // Set connection password
        options.setPassword(mqttProperties.getPassword().getBytes(java.nio.charset.StandardCharsets.UTF_8));
        // Set connection timeout, in seconds
        options.setConnectionTimeout(100);
        // Keep-alive interval in seconds: the client sends a PINGREQ when idle, and the broker drops the connection if it hears nothing for 1.5 * 20 = 30 seconds
        options.setKeepAliveInterval(20);
        // Set automatic reconnection
        options.setAutomaticReconnect(true);

        // Set callback
        providerClient.setCallback(new MqttProviderCallBack());
        providerClient.connect(options);
    }

    /**
     * Publish message
     *
     * @param qos      Quality of service level
     *                 0 will only send once, regardless of success or failure
     *                 1 will continue sending until successful, may receive multiple times
     *                 2 will continue sending until successful, but will ensure only received once
     * @param retained Retained flag
     *                 If set to true, the server must store this application message and its quality of service level, when there are subscribers to this topic, the message will be pushed to this subscriber
     *                 But the server will only retain one retained message for the same topic (the last one received)
     * @param topic    Topic
     * @param message  Message
     * @author xct
     * @date 2021/7/30 16:27
     */
    @SneakyThrows
    public void publish(int qos, boolean retained, String topic, String message) {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setRetained(retained);
        mqttMessage.setPayload(message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        // Topic object for the destination topic
        MqttTopic mqttTopic = providerClient.getTopic(topic);
        // publish() hands the message to the client and returns without waiting for delivery; delivery continues in the background.
        // The returned token tracks the delivery status.
        MqttToken token = mqttTopic.publish(mqttMessage);
        // Block until delivery completes, which makes this method synchronous
        token.waitForCompletion();
    }
}

The main task is to create the MQTT client object based on the connection parameters in the configuration file, and then create a publish method to push messages. Since the producer client object specifies the use of MqttProviderCallBack as the callback function, this callback class also needs to be created.

Producer Callback#

The producer's callback methods are implemented as needed.

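import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClientInterface;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

/**
 * @author lza
 * @date 2023/11/24-10:34
 **/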
public class MqttProviderCallBack implements MqttCallback {

    @Override
    public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
        System.out.println("Producer: Disconnected from the server");
    }

    @Override
    public void mqttErrorOccurred(MqttException e) {

    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {

    }

    @Override
    public void deliveryComplete(IMqttToken iMqttToken) {
        MqttClientInterface client = iMqttToken.getClient();
        System.out.println(client.getClientId() + " message published successfully!");
    }

    @Override
    public void connectComplete(boolean b, String s) {
    }

    @Override
    public void authPacketArrived(int i, MqttProperties mqttProperties) {

    }
}

Consumer#

After creating the producer, a consumer is also needed.

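import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.springframework.context.annotation.Configuration;
// PostConstruct lives in javax.annotation on Spring Boot 2.x and in jakarta.annotation on 3.x
import javax.annotation.PostConstruct;

// MqttProperties here is the project's own class bound to the spring.mqtt settings, not Paho's MqttProperties

/**
 * @author lza
 * @date 2023/11/24-10:43
 **/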
@Configuration
@Slf4j
@RequiredArgsConstructor
public class MqttConsumerConfig {

    private final MqttProperties mqttProperties;

    /**
     * Client object
     */
    public MqttClient consumerClient;

    /**
     * Connect to the server after bean initialization
     * @author xct
     * @date 2021/7/30 16:48
     */
    @PostConstruct
    public void init(){
        connect();
    }

    /**
     * Client connects to the server
     * @author xct
     * @date 2021/7/30 16:01
     */
    @SneakyThrows
    public void connect(){
        // Create MQTT client object
        consumerClient = new MqttClient(mqttProperties.getUrl(), "consumerClient", new MemoryPersistence());
        // Connection settings
        MqttConnectionOptions options = new MqttConnectionOptions();
        // Clean Start: true makes the broker discard any existing session for this client ID, so every connection starts fresh
        // false resumes the existing session (subscriptions, queued QoS 1/2 messages), provided the broker still holds it
        options.setCleanStart(true);
        // Set connection username
        options.setUserName(mqttProperties.getUsername());
        // Set connection password
        options.setPassword(mqttProperties.getPassword().getBytes(java.nio.charset.StandardCharsets.UTF_8));
        // Set connection timeout, in seconds
        options.setConnectionTimeout(100);
        // Keep-alive interval in seconds: the client sends a PINGREQ when idle, and the broker drops the connection if it hears nothing for 1.5 * 20 = 30 seconds
        options.setKeepAliveInterval(20);
        // Set automatic reconnection
        options.setAutomaticReconnect(true);

        // Set callback
        consumerClient.setCallback(new MqttConsumerCallBack(this));
        consumerClient.connect(options);

        // Subscribe to topics
        String[] topics = mqttProperties.getSubTopic().split(",");
        // One QoS level per topic; the broker delivers messages to this client at up to the requested level.
        // The array is sized from the topic list so the two always match
        int[] qos = new int[topics.length];
        java.util.Arrays.fill(qos, 1);
        consumerClient.subscribe(topics, qos);
    }

    /**
     * Disconnect
     *
     * @author xct
     * @date 2021/8/2 09:30
     */
    @SneakyThrows
    public void disConnect() {
        consumerClient.disconnect();
    }

    /**
     * Subscribe to topic
     *
     * @param topic Topic
     * @param qos   Message level
     * @author xct
     * @date 2021/7/30 17:12
     */
    @SneakyThrows
    public void subscribe(String topic, int qos) {
        consumerClient.subscribe(topic, qos);
    }
}

Like the producer, the consumer creates a client instance from the connection information and registers its own callback. The difference is in the methods it exposes: subscribe and disConnect instead of publish.
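The subscription call pairs each topic with a QoS entry, so the two arrays must have the same length however many topics subTopic lists. A minimal sketch of deriving both from the configured string follows; TopicSubscriptions and its methods are hypothetical helpers for illustration and are not part of the original project.

```java
import java.util.Arrays;

/** Hypothetical helper for illustration; not part of the original project. */
final class TopicSubscriptions {

    private TopicSubscriptions() {
    }

    /** Splits a comma-separated topic list, trimming whitespace and dropping empty entries. */
    static String[] parseTopics(String commaSeparated) {
        return Arrays.stream(commaSeparated.split(","))
                .map(String::trim)
                .filter(topic -> !topic.isEmpty())
                .toArray(String[]::new);
    }

    /** Builds a QoS array with one entry per topic, all set to the same level. */
    static int[] uniformQos(int topicCount, int qos) {
        if (qos < 0 || qos > 2) {
            throw new IllegalArgumentException("QoS must be 0, 1 or 2");
        }
        int[] levels = new int[topicCount];
        Arrays.fill(levels, qos);
        return levels;
    }
}
```

With the configuration shown earlier, parseTopics("testTopic,remote-wake") yields two topics, and uniformQos(2, 1) yields a matching two-element array that can be passed to subscribe.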

Consumer Callback#

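import java.util.List;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;
import org.springframework.util.StringUtils;

/**
 * @author lza
 * @date 2023/11/24-10:55
 **/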
public class MqttConsumerCallBack implements MqttCallback {

    private final MqttConsumerConfig consumerConfig;

    public MqttConsumerCallBack(MqttConsumerConfig consumerConfig) {
        this.consumerConfig = consumerConfig;
    }

    @Override
    public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
        System.out.println("Consumer: Disconnected from the server");
    }

    @Override
    public void mqttErrorOccurred(MqttException e) {

    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        System.out.printf("Received message topic: %s%n",s);
        System.out.printf("Received message QoS: %d%n",mqttMessage.getQos());
        System.out.printf("Received message content: %s%n",new String(mqttMessage.getPayload()));
        System.out.printf("Received message retained: %b%n",mqttMessage.isRetained());

        // MQTT 5 request-response: reply only to remoteWake requests that name a response topic
        MqttProperties requestProperties = mqttMessage.getProperties();
        if (requestProperties != null && StringUtils.hasText(requestProperties.getResponseTopic())) {
            List<UserProperty> userProperties = requestProperties.getUserProperties();
            // Check every user property instead of assuming the first one exists
            boolean remoteWake = userProperties != null && userProperties.stream()
                    .anyMatch(p -> "action".equals(p.getKey()) && "remoteWake".equals(p.getValue()));
            if (remoteWake) {
                MqttProperties properties = new MqttProperties();
                // Echo the correlation data so the requester can match this reply to its request
                properties.setCorrelationData(requestProperties.getCorrelationData());
                MqttMessage responseMessage = new MqttMessage();
                responseMessage.setProperties(properties);
                responseMessage.setPayload("Device not connected".getBytes());
                responseMessage.setQos(1);
                responseMessage.setRetained(false);
                consumerConfig.consumerClient.publish(requestProperties.getResponseTopic(), responseMessage);
            }
        }
    }

    @Override
    public void deliveryComplete(IMqttToken iMqttToken) {

    }

    @Override
    public void connectComplete(boolean b, String s) {

    }

    @Override
    public void authPacketArrived(int i, MqttProperties mqttProperties) {

    }
}

The most important method in the consumer's callback is messageArrived, which is triggered when the consumer receives a message on the subscribed topic.

If everything is correct, you can see both clients, the producer and the consumer, online in the EMQX backend management page.

image

Request-Response#

You may have noticed the request-response comment in the consumer's messageArrived callback method, which is also why the MQTT version dependency was switched from 3.1.1 to 5.

Request-response is a major feature of MQTT 5. One of the linked documents gives an introduction, and another walks through some interesting examples.

Put simply, HTTP is a clear request/response model: the frontend calls a backend interface, the backend processes the data and returns a result, and whether the call succeeds or fails, the frontend always gets a return value. MQTT's publish-subscribe model is different because publishers and subscribers are decoupled. MQTT is mostly used in IoT scenarios, where neither side usually needs to know about the other. Think of following a public account: you do not care when the account pushes a message; you only need to be sure you do not miss one.

Sometimes, though, you do need an answer. For example, suppose an internet-connected temperature sensor sits in a room, and you want to read its current temperature from a mobile app while you are out.

image

This resembles an HTTP request, except that the backend data is now the data collected by the sensor. You can do this without request-response, for example with two topics, A and B. The app publishes the request to A, which the device subscribes to. After collecting the data, the device publishes it to B, which the app subscribes to.

So what does request-response change? When publishing to A, the app specifies the response topic in the message itself. When the device receives the message, it sees that the reply should go to B and publishes the data straight to that topic. It is a little like sending a letter with a return address through the post office.
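The flow can be sketched without a real broker. The example below is a toy in-memory simulation, not Paho or EMQX code: Message, ToyBroker and roundTrip are hypothetical names, and the topics A and B are the ones from the description above. It shows the two properties that matter, the response topic the requester names and the correlation data the responder echoes back.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy simulation of MQTT 5 request-response; all names are hypothetical. */
final class RequestResponseSketch {

    /** Stand-in for a message carrying the two request-response properties. */
    static final class Message {
        final String payload;
        final String responseTopic;   // null when no reply is expected
        final byte[] correlationData; // opaque to the responder, echoed back unchanged

        Message(String payload, String responseTopic, byte[] correlationData) {
            this.payload = payload;
            this.responseTopic = responseTopic;
            this.correlationData = correlationData;
        }
    }

    /** In-memory broker: exact topic match, synchronous delivery. */
    static final class ToyBroker {
        private final Map<String, List<Consumer<Message>>> handlers = new HashMap<>();

        void subscribe(String topic, Consumer<Message> handler) {
            handlers.computeIfAbsent(topic, key -> new ArrayList<>()).add(handler);
        }

        void publish(String topic, Message message) {
            List<Consumer<Message>> subscribed = handlers.getOrDefault(topic, Collections.emptyList());
            for (Consumer<Message> handler : subscribed) {
                handler.accept(message);
            }
        }
    }

    /** Runs one request/reply exchange and returns "correlationId=payload" as seen by the app. */
    static String roundTrip(String correlationId) {
        ToyBroker broker = new ToyBroker();
        List<String> replies = new ArrayList<>();

        // Device: listens on A and replies to whichever topic the request names
        broker.subscribe("A", request -> {
            if (request.responseTopic != null) {
                broker.publish(request.responseTopic, new Message("23.5", null, request.correlationData));
            }
        });

        // App: listens on B and uses the correlation data to match the reply to its request
        broker.subscribe("B", reply -> replies.add(
                new String(reply.correlationData, StandardCharsets.UTF_8) + "=" + reply.payload));

        // App sends the request to A and names B as the response topic
        broker.publish("A", new Message("read-temperature", "B",
                correlationId.getBytes(StandardCharsets.UTF_8)));
        return String.join(",", replies);
    }
}
```

The device never hard-codes B. It learns the reply destination from each request, which is why several requesters can share one request topic while each listens on its own response topic.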

In MQTTX, when connecting to MQTT, select version 5 to conduct request-response testing.

image

User properties are added for judgment, corresponding to the consumer's callback method.

// MQTT 5 request-response: reply only to remoteWake requests that name a response topic
        MqttProperties requestProperties = mqttMessage.getProperties();
        if (requestProperties != null && StringUtils.hasText(requestProperties.getResponseTopic())) {
            List<UserProperty> userProperties = requestProperties.getUserProperties();
            // Check every user property instead of assuming the first one exists
            boolean remoteWake = userProperties != null && userProperties.stream()
                    .anyMatch(p -> "action".equals(p.getKey()) && "remoteWake".equals(p.getValue()));
            if (remoteWake) {
                MqttProperties properties = new MqttProperties();
                // Echo the correlation data so the requester can match this reply to its request
                properties.setCorrelationData(requestProperties.getCorrelationData());
                MqttMessage responseMessage = new MqttMessage();
                responseMessage.setProperties(properties);
                responseMessage.setPayload("Device not connected".getBytes());
                responseMessage.setQos(1);
                responseMessage.setRetained(false);
                consumerConfig.consumerClient.publish(requestProperties.getResponseTopic(), responseMessage);
            }
        }

If the user property action has the value remoteWake, it triggers the response mode, retrieves the response topic from the message, and automatically replies.

Online and Offline Notifications#

Request-response is just an extension; the initial requirement was to check whether the device is online. Now that the MQTT middleware has been changed, can we use system topics? The answer is yes, but it is unnecessary. EMQX provides a more elegant way.
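For comparison, here is roughly what the system-topic route involves. The sketch assumes the topic layout `$SYS/brokers/{node}/clients/{clientid}/connected` (and `/disconnected`) that EMQX describes for client presence events; treat that layout as an assumption to check against the documentation for your version. SysTopicParser and Presence are hypothetical names.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration; not part of the original project. */
final class SysTopicParser {

    // Assumed layout: $SYS/brokers/{node}/clients/{clientid}/connected|disconnected
    private static final Pattern PRESENCE_TOPIC =
            Pattern.compile("^\\$SYS/brokers/([^/]+)/clients/(.+)/(connected|disconnected)$");

    /** Client ID and state extracted from a presence topic. */
    static final class Presence {
        final String clientId;
        final boolean online;

        Presence(String clientId, boolean online) {
            this.clientId = clientId;
            this.online = online;
        }
    }

    private SysTopicParser() {
    }

    /** Returns the parsed presence event, or empty when the topic is not a presence topic. */
    static Optional<Presence> parse(String topic) {
        Matcher matcher = PRESENCE_TOPIC.matcher(topic);
        if (!matcher.matches()) {
            return Optional.empty();
        }
        return Optional.of(new Presence(matcher.group(2), "connected".equals(matcher.group(3))));
    }
}
```

A consumer would still have to subscribe to these topics and call something like parse from messageArrived, which is extra plumbing compared with the approaches below.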

The simplest method is to directly use the enterprise version of EMQX.

The main use cases for data storage include recording client online and offline status, subscription topic information, message content, message receipts after message arrival, etc., into various databases such as Redis, MySQL, PostgreSQL, MongoDB, Cassandra, etc. Users can also achieve similar functionality by subscribing to related topics, but the enterprise version has built-in support for these persistences; compared to the former, the latter has higher execution efficiency and can significantly reduce the developer's workload.

The second method, which is also recommended, is to use webhooks to maintain online and offline notifications.

First, create an endpoint to receive webhook notifications.

    private final static String CLIENT_IDS = "clientIds";
    private final static String CONNECTED = "client_connected";
    private final static String DISCONNECTED = "client_disconnected";

    /**
     * EMQX webhook endpoint that listens for clients going online and offline
     * @param vo Online or offline VO
     */
    @SaIgnore
    @PostMapping("/onlineOrOffline")
    public void onlineOrOffline(@RequestBody OnlineOrOfflineVo vo) {
        System.err.println("Client: " + vo.getClientid() +
            ", Action: " + vo.getAction() +
            ", Reason: " + vo.getReason());
        List<Object> list = RedisUtils.getCacheList(CLIENT_IDS);
        if (vo.getAction().equals(CONNECTED)) {
            list.add(vo.getClientid());
            // First delete the original value
            RedisUtils.deleteKeys(CLIENT_IDS);
            // Remove duplicates
            ArrayList<Object> distinct = CollUtil.distinct(list);
            RedisUtils.setCacheList(CLIENT_IDS, distinct);
        } else if (vo.getAction().equals(DISCONNECTED)){
            list.remove(vo.getClientid());
            // First delete the original value
            RedisUtils.deleteKeys(CLIENT_IDS);
            RedisUtils.setCacheList(CLIENT_IDS, list);
        }
    }

Duplicates are removed because Redis may already contain a client when its connect event arrives. If client A is online and recorded in Redis, then the service goes down and restarts, and A connects again, Redis would otherwise hold two entries for A.
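A set avoids the duplicate problem by construction. The sketch below keeps the online clients in an in-memory set to show the idea; OnlineClientRegistry is a hypothetical class, not the Redis-backed code above. With Redis, the analogous structure would be a Redis Set (SADD to add, SREM to remove, SISMEMBER to check), assuming your Redis utility exposes those operations.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory registry of online clients, keyed by client ID. */
final class OnlineClientRegistry {

    private static final String CONNECTED = "client_connected";
    private static final String DISCONNECTED = "client_disconnected";

    // A concurrent set: adding an ID that is already present is a no-op
    private final Set<String> online = ConcurrentHashMap.newKeySet();

    /** Applies one webhook event; unknown actions are ignored. */
    void onEvent(String action, String clientId) {
        if (CONNECTED.equals(action)) {
            online.add(clientId);
        } else if (DISCONNECTED.equals(action)) {
            online.remove(clientId);
        }
    }

    boolean isOnline(String clientId) {
        return online.contains(clientId);
    }

    int size() {
        return online.size();
    }
}
```

Receiving client_connected twice for the same client leaves a single entry, so no separate deduplication pass or delete-and-rewrite step is needed.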

Next, enable the webhook plugin in the EMQX plugins.

image

Then enter the Docker container to change the plugin configuration.

docker exec -it emqx bash
cd /opt/emqx/etc/plugins

image

Modify the plugin.

vi emqx_web_hook.conf

The main modification is to change the webhook URL at the beginning of the file to the interface address.

image

Next, modify the notification rules below.

image

Simply uncomment the following two rules for online and offline notifications.

web.hook.rule.client.connected.1     = {"action": "on_client_connected"}
web.hook.rule.client.disconnected.1  = {"action": "on_client_disconnected"}

Once completed, you will be able to see notifications for client online and offline.

image

This is still not perfect: if the MQTT broker itself goes offline, no more webhook events arrive, and the list of online devices stored in Redis goes stale.

Fortunately, EMQX also provides an HTTP API for checking whether a specific device is online.

The API also has an endpoint that returns information about all clients in the cluster, but it is paginated, which becomes cumbersome with a large number of clients. A better combination is to maintain online and offline state through the webhook and use the API to check a specific client; this covers the vast majority of needs.

    /**
     * Use the EMQX API to check whether the client is online
     * @param clientId Client ID
     * @return Whether online
     */
    @SaIgnore
    @GetMapping("/checkClientStatus/{clientId}")
    public R<Boolean> checkClientStatus(@PathVariable String clientId) {
        // Send GET request
        String url = "http://localhost:18083/api/v4/clients/" + clientId;
        HttpRequest request = HttpUtil.createGet(url);
        request.basicAuth("admin", "public");

        HttpResponse response = request.execute();

        ClientResponseDto dto = JSONUtil.toBean(response.body(), ClientResponseDto.class);
        return R.ok(!ObjectUtil.isEmpty(dto.getData()) && dto.getData().get(0).getConnected());
    }
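The same request can be built with the JDK's own java.net.http client (Java 11+) instead of the HTTP utility used above. This is a minimal sketch: EmqxClientLookup and buildRequest are hypothetical names, the base URL and credentials are the ones from this post, and the client ID is percent-encoded on the assumption that IDs may contain characters that are not safe in a URL path.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Base64;

/** Hypothetical helper for illustration; not part of the original project. */
final class EmqxClientLookup {

    private EmqxClientLookup() {
    }

    /** Builds a GET request for /api/v4/clients/{clientId} with HTTP Basic authentication. */
    static HttpRequest buildRequest(String baseUrl, String clientId, String user, String password) {
        // URLEncoder targets form encoding, so turn '+' back into %20 for use in a path segment
        String encodedId = URLEncoder.encode(clientId, StandardCharsets.UTF_8).replace("+", "%20");
        String credentials = Base64.getEncoder()
                .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/api/v4/clients/" + encodedId))
                .timeout(Duration.ofSeconds(5))
                .header("Authorization", "Basic " + credentials)
                .GET()
                .build();
    }
}
```

The request can then be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()), and the body parsed the same way as in the controller above. The timeout keeps the check from hanging when the broker is unreachable, which is exactly the case this endpoint is meant to catch.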

EMQX Documentation
EMQX Installation and Usage and Some Pitfalls
How to Monitor Client Online and Offline in MQTT EMQX and Use it Normally in Business
