qinfengge

Java Real-time Message Push (1)

Preface#

A recent business requirement was to determine whether user-uploaded data is normal or abnormal and, when it is abnormal, to push an alarm message to the user in real time.
In most cases the client (browser) initiates the request and tells the server what data it needs.
Sometimes, however, the server needs to push messages to the client on its own initiative, as in the case above or in the QR code login commonly seen on web pages.

Technical Choices#

There are many ways to implement server-initiated push, and the choice depends on the business requirements.
The simplest and most common are short polling and long polling, which work as their names suggest.
More advanced options are SSE and WebSocket.
Going further, a message broker can be used, for example one that speaks the MQTT protocol.
Other push channels include SMS notifications or app notifications, such as Jiguang Push.

| Method | Initiated by | Principle | Mode |
| --- | --- | --- | --- |
| Short Polling | Client | At fixed intervals (e.g., every 1 second), the client sends a request to the server, and the server returns the latest data to the client's browser. | / |
| Long Polling | Server | The client sends a request to the server, which holds the connection until there is new information to return, then closes the connection. After processing the response, the client sends a new request to the server. | / |
| SSE (Server-sent Events) | Server | The client first registers a long-lived connection with the server; when an event occurs, the server sends messages to the registered clients. | One-way, only the server can push |
| WebSocket | Client/Server | Using WebSocket requires introducing new dependencies. Once the client and server complete the handshake, they share one channel and can send and receive messages in real time. | Full duplex, both server and client can push and receive |
| MQTT | Client/Server | MQTT is commonly used in IoT applications. It is a lightweight communication protocol based on the publish/subscribe model and built on TCP/IP. Its biggest advantage is that it can provide real-time, reliable messaging for remote devices with minimal code and limited bandwidth. | Publish/subscribe model, both server and client can push and receive |

Short and long polling are not covered further; the rest of this post focuses on SSE and MQTT.

SSE#

SSE is supported out of the box by Spring MVC (the SseEmitter class), so a Spring Boot web project needs no additional dependencies, which is its main advantage. Its drawbacks are equally clear: messages flow only from the server to the client, and it requires a modern browser (Chrome, Firefox, etc.; Internet Explorer does not support it). Over HTTP/1.1, browsers also limit the number of simultaneous connections to one domain to 6, so at most 6 SSE connections to the same server can be open at once, and every open connection costs memory and computing resources on the server.
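
For orientation, the text/event-stream format that SSE uses on the wire is simple: each event is a few "field: value" lines followed by a blank line. The sketch below formats one such event in plain Java. The class and method names (SseFrame, format) are made up for illustration; in the rest of this post Spring's SseEmitter does this framing for you.

```java
/** Illustrative helper (hypothetical name): formats one event in text/event-stream framing. */
final class SseFrame {

    private SseFrame() {
    }

    /**
     * @param id    optional event id, may be null
     * @param event optional event name, may be null (the browser then fires a plain "message" event)
     * @param data  payload; every line of it becomes its own "data:" line
     */
    static String format(String id, String event, String data) {
        StringBuilder sb = new StringBuilder();
        if (id != null) {
            sb.append("id: ").append(id).append('\n');
        }
        if (event != null) {
            sb.append("event: ").append(event).append('\n');
        }
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        // A blank line terminates the event
        return sb.append('\n').toString();
    }

    public static void main(String[] args) {
        System.out.print(format("1", "alarm", "temperature too high"));
    }
}
```

A browser EventSource reassembles multiple data lines into one e.data string joined by newlines, which is why the payload is split per line here.
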
First, create an SSE utility class.

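import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Component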
@Slf4j
public class SseEmitterUtils {
    /**
     * Current connection count
     */
    private static AtomicInteger count = new AtomicInteger(0);

    /**
     * Store SseEmitter information
     */
    private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    /**
     * Create user connection and return SseEmitter
     * @param key userId
     * @return SseEmitter
     */
    public static SseEmitter connect(String key) {
        if (sseEmitterMap.containsKey(key)) {
            return sseEmitterMap.get(key);
        }

        try {
            // Set timeout, 0 means no expiration. Default is 30 seconds
            SseEmitter sseEmitter = new SseEmitter(0L);
            // Register callbacks
            sseEmitter.onCompletion(completionCallBack(key));
            sseEmitter.onError(errorCallBack(key));
            sseEmitter.onTimeout(timeoutCallBack(key));
            sseEmitterMap.put(key, sseEmitter);
            // Increment count
            count.getAndIncrement();
            return sseEmitter;
        } catch (Exception e) {
            // Log at error level and include the exception so the cause is not lost
            log.error("Exception creating new SSE connection, current connection Key: {}", key, e);
        }
        return null;
    }

    /**
     * Send message to specified user
     * @param key userId
     * @param message message content
     */
    public static void sendMessage(String key, String message) {
        if (sseEmitterMap.containsKey(key)) {
            try {
                sseEmitterMap.get(key).send(message);
            } catch (IOException e) {
                log.error("User[{}] push exception:{}", key, e.getMessage());
                remove(key);
            }
        }
    }

    /**
     * Publish message to every connection in a group; requires keys of the form groupId + userId (matched by prefix)
     * @param groupId group id
     * @param message message content
     */
    public static void groupSendMessage(String groupId, String message) {
        if (!CollectionUtils.isEmpty(sseEmitterMap)) {
            sseEmitterMap.forEach((k, v) -> {
                try {
                    if (k.startsWith(groupId)) {
                        v.send(message, MediaType.APPLICATION_JSON);
                    }
                } catch (IOException e) {
                    log.error("User[{}] push exception:{}", k, e.getMessage());
                    remove(k);
                }
            });
        }
    }

    /**
     * Broadcast message
     * @param message message content
     */
    public static void batchSendMessage(String message) {
        sseEmitterMap.forEach((k, v) -> {
            try {
                v.send(message, MediaType.APPLICATION_JSON);
            } catch (IOException e) {
                log.error("User[{}] push exception:{}", k, e.getMessage());
                remove(k);
            }
        });
    }

    /**
     * Send message to a given set of users
     * @param message message content
     * @param ids user id set
     */
    public static void batchSendMessage(String message, Set<String> ids) {
        ids.forEach(userId -> sendMessage(userId, message));
    }

    /**
     * Remove connection
     * @param key userId
     */
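    public static void remove(String key) {
        // remove() can be called more than once for the same key (send failure plus callback),
        // so only decrement the count when an entry was actually removed
        if (sseEmitterMap.remove(key) != null) {
            count.getAndDecrement();
            log.info("Removed connection: {}", key);
        }
    }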

    /**
     * Get the keys (user ids) of the current connections
     * @return List of keys
     */
    public static List<String> getIds() {
        return new ArrayList<>(sseEmitterMap.keySet());
    }

    /**
     * Get current connection count
     * @return int
     */
    public static int getCount() {
        return count.intValue();
    }

    private static Runnable completionCallBack(String key) {
        return () -> {
            log.info("Connection ended: {}", key);
            remove(key);
        };
    }

    private static Runnable timeoutCallBack(String key) {
        return () -> {
            log.info("Connection timed out: {}", key);
            remove(key);
        };
    }

    private static Consumer<Throwable> errorCallBack(String key) {
        return throwable -> {
            log.info("Connection exception: {}", key);
            remove(key);
        };
    }
}
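
Because remove may be invoked more than once for the same key (for example from the IOException handler and again from a callback), the connection count is easiest to keep correct when it is read from the map instead of being tracked separately. A minimal plain-Java sketch of that idea follows. ConnectionRegistry and its type parameter C are hypothetical names; in the real class C would be SseEmitter.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry; C stands in for SseEmitter so the sketch runs without Spring. */
final class ConnectionRegistry<C> {

    private final Map<String, C> connections = new ConcurrentHashMap<>();

    /** Registers a connection and returns the one it replaced, or null if the key was new. */
    C register(String key, C connection) {
        return connections.put(key, connection);
    }

    /** Removes a connection; returns true only for the call that actually removed it. */
    boolean remove(String key) {
        return connections.remove(key) != null;
    }

    /** The count is read from the map, so repeated remove() calls cannot skew it. */
    int count() {
        return connections.size();
    }

    public static void main(String[] args) {
        ConnectionRegistry<String> registry = new ConnectionRegistry<>();
        registry.register("id_1", "emitter-1");
        System.out.println(registry.remove("id_1")); // first call removes the entry
        System.out.println(registry.remove("id_1")); // second call finds nothing to remove
        System.out.println(registry.count());
    }
}
```

Returning the replaced connection from register also gives the caller a chance to close a stale emitter when the same user reconnects.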

Then use the utility class to expose several endpoints: one for the client to subscribe, and others for the server to actively push messages.

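// javax.servlet applies to Spring Boot 2.x; on Spring Boot 3.x use jakarta.servlet.http.HttpServletRequest
import javax.servlet.http.HttpServletRequest;

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RequestMapping("/sse")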
@RestController
@Slf4j
@CrossOrigin
public class SSEEmitterController {

    /**
     * Create connection
     * @param id user id
     * @return SseEmitter
     */
    @GetMapping(path = "/subscribe/{id}", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
    public SseEmitter subscribe(@PathVariable String id) {
        return SseEmitterUtils.connect(id);
    }


    /**
     * Push message to specified user
     * @param id user id
     * @param content push content
     */
    @PostMapping(path = "/push")
    public void push(String id, String content) {
        SseEmitterUtils.sendMessage(id, content);
    }


    /**
     * Push message to specified group
     * @param groupId group id
     * @param content push content
     */
    @PostMapping(path = "/groupPush")
    public void groupPush(String groupId, String content) {
        SseEmitterUtils.groupSendMessage(groupId, content);
    }


    /**
     * Broadcast message
     * @param content push content
     */
    @PostMapping(path = "/pushAll")
    public void pushAll(String content) {
        SseEmitterUtils.batchSendMessage(content);
    }

    /**
     * Close connection
     * @param id user id
     * @param request request
     */
    @DeleteMapping(path = "/close/{id}")
    public void close(@PathVariable String id, HttpServletRequest request) {
        request.startAsync();
        SseEmitterUtils.remove(id);
    }

}

Finally, you can use the following HTML page for testing.

<!DOCTYPE html>
<html lang="en">

<head>
    <title>SSE</title>
    <meta charset="UTF-8">
    <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js" type="text/javascript"></script>
    <script>
        if (window.EventSource) {

            let sources = [];
            // Create connection  
            for (let i = 1; i < 10; i++) {
                let id = "id_" + i;
                sources[i] = new EventSource('http://localhost:8008/sse/subscribe/' + id);
            }

            /** 
             * Once the connection is established, the open event will be triggered 
             * Another way to write: source.onopen = function (event) {} 
             */
            // Connection open event
            sources.forEach(source => {
                let id = source.url.split('/').pop();
                source.addEventListener('open', function (e) {
                    setMessageInnerHTML(id + " connection opened")
                    console.log(id + " connection opened");
                });
            });

            /** 
             * Client receives data sent by the server 
             * Another way to write: source.onmessage = function (event) {} 
             */
            // Message event 
            sources.forEach(source => {
                let id = source.url.split('/').pop();
                source.addEventListener('message', function (e) {
                    setMessageInnerHTML(id + " received message:" + e.data)
                    console.log(id + " received message:" + e.data);
                });
            });

            /** 
             * If a communication error occurs (e.g., connection interruption), the error event will be triggered 
             * Another way to write: source.onerror = function (event) {} 
             */
            // Error handling
            sources.forEach(source => {

                let id = source.url.split('/').pop();

                source.addEventListener('error', function (e) {

                    // The event object has no readyState; read it from the EventSource (e.target)
                    if (e.target.readyState === EventSource.CLOSED) {
                        setMessageInnerHTML(id + " connection closed")
                        console.log(id + " connection closed");
                    } else {
                        setMessageInnerHTML(id + " connection error")
                        console.log(id + " connection error:", e);
                    }

                });

            });
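
            // Listen for the window close event and actively close the SSE connections. If the server is set to never expire, the server-side data must be cleaned up manually after the browser closes.
            // Registered inside the if block because sources is only in scope here.
            window.onbeforeunload = function () {
                sources.forEach(source => {
                    let id = source.url.split('/').pop();
                    source.close();
                    // The close endpoint is mapped with @DeleteMapping; keepalive asks the browser to finish the request during unload
                    fetch('http://localhost:8008/sse/close/' + id, { method: 'DELETE', keepalive: true });
                });
                console.log("close");
            };
        } else {
            setMessageInnerHTML("Browser does not support SSE");
        }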

        // Display messages on the webpage 
        function setMessageInnerHTML(innerHTML) {
            $("#contentDiv").append("<br/>" + innerHTML);
        } 
    </script>
</head>

<body>
    <div>
        <div>
            <div id="contentDiv" style="height:800px; width:1000px; overflow:scroll; background:#ccc;">
            </div>
        </div>
    </div>
</body>

</html>

Then open the page and call the push endpoints to test.

You can see that the browser keeps only 6 of the 9 connections open; the others do not get through.

MQTT#

To implement MQTT, you need to add the following dependencies on the server side.

<!-- MQTT dependency package -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>

Then you need to install an MQTT broker, such as EMQX, or use RabbitMQ with its MQTT plugin (rabbitmq_mqtt) enabled. Note that RabbitMQ does not support QoS level 2; such messages are downgraded to QoS 1.

The QoS message quality levels of MQTT can be explained in the following table:

| QoS Level | Description | Applicable Scenarios |
| --- | --- | --- |
| 0 | At most once delivery; messages are not acknowledged or retransmitted | Suitable for unimportant data transmission, such as sensor data |
| 1 | At least once delivery; messages are guaranteed to arrive but may be duplicated | Suitable for scenarios where delivery is required but duplicates are allowed |
| 2 | Exactly once delivery; messages arrive once and only once, with no duplicates | Suitable for scenarios where precise delivery is required and duplicates are not allowed |
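
Since QoS 1 may deliver the same message more than once, a consumer that must not act twice has to be idempotent. The sketch below shows one way to do that in plain Java by remembering which messages were already handled. DedupingConsumer is a hypothetical class, and the sketch assumes that the publisher puts its own unique messageId into every message; that ID is an application-level convention, not something taken from this post's code.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical wrapper that drops redelivered messages based on an application-level ID. */
final class DedupingConsumer {

    // Note: this set grows without bound; a real system would expire old IDs
    private final Set<String> seenIds = ConcurrentHashMap.newKeySet();
    private final Consumer<String> delegate;

    DedupingConsumer(Consumer<String> delegate) {
        this.delegate = delegate;
    }

    /** Returns true if the payload was handled, false if it was skipped as a duplicate. */
    boolean onMessage(String messageId, String payload) {
        if (!seenIds.add(messageId)) {
            return false; // already handled, ignore the redelivery
        }
        delegate.accept(payload);
        return true;
    }

    public static void main(String[] args) {
        DedupingConsumer consumer = new DedupingConsumer(p -> System.out.println("handled: " + p));
        consumer.onMessage("m-1", "alarm");
        consumer.onMessage("m-1", "alarm"); // duplicate delivery, not handled again
    }
}
```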

Below is an example using RabbitMQ as the broker.
First, configure the MQTT connection information.

server:
  port: 8008

spring:
  application:
    name: mqtt test project
  mqtt:
    url: tcp://127.0.0.1:1883
    username: guest
    password: guest
    clientId: serverClientId
    # Published topic -- default MQTT message push topic, can be specified when calling the interface
    pubTopic: testTopic
    # Subscribed topics
    subTopic: gps-topic,oil-topic,broadcast-topic,fault-topic
    completionTimeout: 3000

Then create a properties class that maps the configuration file.

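import lombok.Data;
import lombok.EqualsAndHashCode;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data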
@EqualsAndHashCode(callSuper = false)
@Component
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttProperties {

    /**
     * RabbitMQ connection username
     */
    private String username;
    /**
     * RabbitMQ connection password
     */
    private String password;
    /**
     * Push topic
     */
    private String pubTopic;
    /**
     * RabbitMQ MQTT connection address
     */
    private String url;

    /**
     * RabbitMQ MQTT connection client ID
     */
    private String clientId;

    /**
     * Subscribed topics
     */
    private String subTopic;

    /**
     * Timeout
     */
    private Integer completionTimeout;
}

Next, create a consumer.

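import java.util.Objects;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;

@Slf4j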
@Configuration
@RequiredArgsConstructor
public class MqttConsumer {

    private final MqttProperties mqttProperties;

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getUrl(), mqttProperties.getClientId(),
                        mqttProperties.getSubTopic().split(","));
        adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout());
        adapter.setConverter(new DefaultPahoMessageConverter());
        // Set message quality: 0-> at most once; 1-> at least once; 2-> exactly once
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                MessageHeaders headers = message.getHeaders();
                log.info("headers: {}", headers);
                String topic = Objects.requireNonNull(headers.get(MqttHeaders.RECEIVED_TOPIC)).toString();
                log.info("Received topic: {}", topic);
                String[] topics = mqttProperties.getSubTopic().split(",");
                for (String t : topics) {
                    if (t.equals(topic)) {
                        log.info("Subscribed topic: {}; received message for this topic: {}", topic, message.getPayload());
                    }
                }
            }

        };
    }
}
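
The handler above compares the received topic with each configured topic using equals, which is fine for the exact topic names in this configuration. If subTopic ever contained MQTT wildcards (+ for a single level, # for all remaining levels), an exact comparison would no longer match. A minimal sketch of wildcard matching follows. TopicFilter is a hypothetical helper, not part of Spring Integration or Paho, and it skips filter validation and the special treatment of topics that start with $.

```java
/** Hypothetical helper: checks whether a topic name matches an MQTT topic filter. */
final class TopicFilter {

    private TopicFilter() {
    }

    static boolean matches(String filter, String topic) {
        // limit -1 keeps empty trailing levels such as the one in "sport/"
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);
        for (int i = 0; i < filterLevels.length; i++) {
            if (filterLevels[i].equals("#")) {
                // Multi-level wildcard: only valid as the last level, matches everything below
                return i == filterLevels.length - 1;
            }
            if (i >= topicLevels.length) {
                return false; // the filter has more levels than the topic
            }
            if (!filterLevels[i].equals("+") && !filterLevels[i].equals(topicLevels[i])) {
                return false; // a literal level that differs
            }
        }
        return filterLevels.length == topicLevels.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("gps-topic", "gps-topic"));
        System.out.println(matches("device/+/gps", "device/42/gps"));
        System.out.println(matches("device/#", "device/42/oil"));
    }
}
```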

Then create a producer.

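import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Configuration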
@Slf4j
@RequiredArgsConstructor
public class MqttProvider {

    private final MqttProperties mqttProperties;

    @Bean
    @SneakyThrows
    public MqttPahoClientFactory mqttClientFactory() {

        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { mqttProperties.getUrl()});
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        // Set whether to clear session; if set to false, the server will retain the client's connection records.
        // If cleanSession is set to false, the server will not clear the session after the client disconnects,
        // allowing the client to receive messages from previously subscribed topics when reconnecting.
        options.setCleanSession(false);
        // Set timeout in seconds
        options.setConnectionTimeout(10);
        // Set the keep-alive interval in seconds; if nothing else is sent within this interval, the client sends a ping,
        // and the broker treats the client as offline after 1.5*20 seconds of silence. Keep-alive by itself does not reconnect.
        options.setKeepAliveInterval(20);
        // Reconnect after disconnection, but this method does not have a resubscription mechanism.
        // It will wait 1 second before attempting to reconnect, and for each failed reconnection attempt, the delay will double until it reaches 2 minutes, at which point the delay will remain at 2 minutes.
        options.setAutomaticReconnect(true);
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("publisherClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(mqttProperties.getPubTopic());
        // Set message quality during push: 0-> at most once; 1-> at least once; 2-> exactly once
        // In RabbitMQ, QoS 2 will be downgraded to QoS 1
        messageHandler.setDefaultQos(1);
        // Set whether messages are retained; if true, the broker stores the last message of each topic and delivers it to every new subscriber.
        // Publishing a retained message with an empty payload clears the retained message.
        messageHandler.setDefaultRetained(false);
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}
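
To make the reconnect schedule described in the comments above concrete (start at 1 second, double after every failed attempt, stay at 2 minutes once reached), here is a small plain-Java sketch that computes the waiting time per attempt. ReconnectBackoff and its constants are hypothetical names that model the description in the comment; this is not Paho's internal code.

```java
/** Hypothetical model of the reconnect delay schedule described in the comments. */
final class ReconnectBackoff {

    static final long INITIAL_DELAY_SECONDS = 1;
    static final long MAX_DELAY_SECONDS = 120;

    private ReconnectBackoff() {
    }

    /**
     * @param failedAttempts number of reconnect attempts that have already failed (0 for the first try)
     * @return seconds to wait before the next attempt
     */
    static long delaySeconds(int failedAttempts) {
        if (failedAttempts < 0) {
            throw new IllegalArgumentException("failedAttempts must not be negative");
        }
        long delay = INITIAL_DELAY_SECONDS;
        // Double once per failed attempt, but never go past the cap
        for (int i = 0; i < failedAttempts && delay < MAX_DELAY_SECONDS; i++) {
            delay = Math.min(delay * 2, MAX_DELAY_SECONDS);
        }
        return delay;
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 9; attempt++) {
            System.out.println("after " + attempt + " failures wait " + delaySeconds(attempt) + "s");
        }
    }
}
```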

Then create a gateway interface for sending messages.

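import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component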
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    /**
     * Send message to default topic
     */
    void sendToMqtt(String payload);

    /**
     * Send message to specified topic
     */
    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);

    /**
     * Send message to specified topic and set QoS
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

Finally, expose endpoints for sending messages.

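import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController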
@RequiredArgsConstructor
@RequestMapping("/mqtt")
public class MqttController {

    private final MqttGateway mqttGateway;

    @PostMapping("/sendToDefaultTopic")
    public void sendToDefaultTopic(String payload) {
        mqttGateway.sendToMqtt(payload);
    }

    @PostMapping("/sendToTopic")
    public void sendToTopic(String payload, String topic) {
        mqttGateway.sendToMqtt(payload, topic);
    }
}

You can use third-party MQTT software for testing, such as MQTTX.
After installation, open the software and create a connection.

It is important to note:

  1. Each connection's ClientId must be unique; if two connections use the same ClientId, the broker disconnects the earlier one, which can cause message loss.
  2. The server connection protocol can be given as TCP or MQTT; both mean an unencrypted MQTT connection (tcp://127.0.0.1:1883 in the configuration above).
  3. If using RabbitMQ, user permissions must be configured.
  4. If using RabbitMQ, select MQTT version 3.1.1 for the connection.
  5. If you want the consumer to receive the messages that the producer published while it was offline, disable Clean Session and subscribe with QoS 1 or higher.
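
The effect of the Clean Session setting can be illustrated with a toy model of the broker's session state. The sketch below is plain Java and only mimics the behavior described in the list: SessionModel and all of its methods are hypothetical, every client is assumed to be subscribed to every message with QoS 1, and no real MQTT traffic is involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of broker-side session state; not a real MQTT broker. */
final class SessionModel {

    private static final class Session {
        boolean online;
        final List<String> pending = new ArrayList<>();
        final List<String> received = new ArrayList<>();
    }

    private final Map<String, Session> sessions = new HashMap<>();

    /** cleanSession=true starts from scratch; false resumes the stored session if there is one. */
    void connect(String clientId, boolean cleanSession) {
        Session session = cleanSession ? null : sessions.get(clientId);
        if (session == null) {
            session = new Session();
            sessions.put(clientId, session);
        }
        session.online = true;
        // Deliver whatever was queued while the client was offline
        session.received.addAll(session.pending);
        session.pending.clear();
    }

    /** A clean session is discarded on disconnect; a persistent one is kept and marked offline. */
    void disconnect(String clientId, boolean cleanSession) {
        Session session = sessions.get(clientId);
        if (session == null) {
            return;
        }
        if (cleanSession) {
            sessions.remove(clientId);
        } else {
            session.online = false;
        }
    }

    /** Delivers to online clients and queues for offline persistent sessions. */
    void publish(String payload) {
        for (Session session : sessions.values()) {
            (session.online ? session.received : session.pending).add(payload);
        }
    }

    List<String> received(String clientId) {
        Session session = sessions.get(clientId);
        return session == null ? new ArrayList<>() : new ArrayList<>(session.received);
    }

    public static void main(String[] args) {
        SessionModel broker = new SessionModel();
        broker.connect("persistent", false);
        broker.connect("clean", true);
        broker.disconnect("persistent", false);
        broker.disconnect("clean", true);
        broker.publish("alarm-1"); // published while both clients are offline
        broker.connect("persistent", false);
        broker.connect("clean", true);
        System.out.println("persistent: " + broker.received("persistent"));
        System.out.println("clean: " + broker.received("clean"));
    }
}
```

In this model only the client that connected without Clean Session sees the message that was published while it was away.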

Once connected, call the endpoint to send a message and the subscribers will receive it.

Integrating Spring Boot with MQTT to achieve message sending and consumption, as well as message recovery after client disconnection
