qinfengge

qinfengge

醉后不知天在水,满船清梦压星河
github
email
telegram

javaリアルタイムメッセージプッシュ(1)

前言#

最近、ユーザーがアップロードしたデータが異常か正常かを判断する必要があるビジネス要件があり、異常な場合にはリアルタイムでユーザーに警報メッセージをプッシュする必要があります。
ほとんどの場合、通常はクライアント(ブラウザ)がサーバー(サーバー)にリクエストを送信し、必要なデータを通知します。
しかし、一部の状況では、サーバーがクライアントにメッセージをプッシュする必要があります。例えば、上記の状況や、ウェブページで一般的なQR コードログインなどです。

技術選択#

サーバーが積極的にプッシュする方法はいくつかあり、ビジネスに応じて選択する必要があります。
シンプルで一般的な方法は、長いポーリングと短いポーリングです。名前からしてシンプルで粗暴です。
さらに進んだ方法として、SSE や WebSocket があります。
さらに深いレベルでは、さまざまなメッセージキュー(MQTT など)があります。
他のプッシュ方法としては、SMS プッシュやアプリ通知(例えば、極光 Push)があります。

実現方式実現原理モード
短輪詢クライアント特定の時間間隔(例えば 1 秒ごと)で、クライアントがサーバーにリクエストを送信し、サーバーが最新のデータをクライアントのブラウザに返します。/
長輪詢サーバークライアントがサーバーにリクエストを送信し、サーバーがリクエストを受け取った後、接続を保持し、新しいメッセージがあるまで応答情報を返さず、クライアントが応答情報を処理した後に新しいリクエストをサーバーに送信します。/
SSE(Server-sent Events)サーバークライアントがサーバーに長接続を登録し、サーバーがイベントを取得した後、登録されたクライアントにメッセージを送信できます。サーバーからのみプッシュ
Websocketクライアント / サーバーWebSocket を使用するには新しい依存関係を導入する必要があり、一度クライアントとサーバーのハンドシェイクが成功すると、同じチャンネルにいてリアルタイムでメッセージを送受信できます。フルデュプレックス、サーバーとクライアントの両方がプッシュと受信が可能
MQTTクライアント / サーバーMQTT は IoT ビジネスで一般的に使用される、パブリッシュ / サブスクライブモデルに基づく軽量通信プロトコルで、TCP/IP プロトコルの上に構築されています。MQTT の最大の利点は、非常に少ないコードと限られた帯域幅で、リモートデバイスにリアルタイムで信頼性の高いメッセージサービスを提供できることです。パブリッシュ / サブスクライブモデル、サーバーとクライアントの両方がプッシュと受信が可能

長 / 短ポーリングについては繰り返しませんが、今回はSSEMQTTの方法について主に説明します。

SSE#

Spring Boot では SSE がネイティブにサポートされており、他の依存関係をインポートする必要はありません。これが利点ですが、欠点も明らかで、サーバーからの一方向のプッシュのみをサポートし、高度なブラウザ(Chrome、Firefox など)のみをサポートします。ブラウザの制限により、各ウェブページは最大 6 つの長接続を保持できます。
最初に SSE ユーティリティクラスを作成します。

@Component
@Slf4j
public class SseEmitterUtils {
    /**
     * 現在の接続数
     */
    private static AtomicInteger count = new AtomicInteger(0);

    /**
     * SseEmitter情報を格納
     */
    private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    /**
     * ユーザー接続を作成し、SseEmitterを返す
     * @param key userId
     * @return SseEmitter
     */
    public static SseEmitter connect(String key) {
        if (sseEmitterMap.containsKey(key)) {
            return sseEmitterMap.get(key);
        }

        try {
            // タイムアウト時間を設定、0は期限なし。デフォルトは30秒
            SseEmitter sseEmitter = new SseEmitter(0L);
            // コールバックを登録
            sseEmitter.onCompletion(completionCallBack(key));
            sseEmitter.onError(errorCallBack(key));
            sseEmitter.onTimeout(timeoutCallBack(key));
            sseEmitterMap.put(key, sseEmitter);
            // 数量+1
            count.getAndIncrement();
            return sseEmitter;
        } catch (Exception e) {
            log.info("新しいSSE接続の作成に失敗しました。現在の接続キーは:{}", key);
        }
        return null;
    }

    /**
     * 指定されたユーザーにメッセージを送信
     * @param key userId
     * @param message メッセージ内容
     */
    public static void sendMessage(String key, String message) {
        if (sseEmitterMap.containsKey(key)) {
            try {
                sseEmitterMap.get(key).send(message);
            } catch (IOException e) {
                log.error("ユーザー[{}]のプッシュに失敗しました:{}", key, e.getMessage());
                remove(key);
            }
        }
    }

    /**
     * 同じグループの人にメッセージを公開、条件:key + groupId
     * @param groupId グループID
     * @param message メッセージ内容
     */
    public static void groupSendMessage(String groupId, String message) {
        if (!CollectionUtils.isEmpty(sseEmitterMap)) {
            sseEmitterMap.forEach((k, v) -> {
                try {
                    if (k.startsWith(groupId)) {
                        v.send(message, MediaType.APPLICATION_JSON);
                    }
                } catch (IOException e) {
                    log.error("ユーザー[{}]のプッシュに失敗しました:{}", k, e.getMessage());
                    remove(k);
                }
            });
        }
    }

    /**
     * ブロードキャストメッセージを送信
     * @param message メッセージ内容
     */
    public static void batchSendMessage(String message) {
        sseEmitterMap.forEach((k, v) -> {
            try {
                v.send(message, MediaType.APPLICATION_JSON);
            } catch (IOException e) {
                log.error("ユーザー[{}]のプッシュに失敗しました:{}", k, e.getMessage());
                remove(k);
            }
        });
    }

    /**
     * メッセージをグループ送信
     * @param message メッセージ内容
     * @param ids ユーザーIDの集合
     */
    public static void batchSendMessage(String message, Set<String> ids) {
        ids.forEach(userId -> sendMessage(userId, message));
    }

    /**
     * 接続を削除
     * @param key userId
     */
    public static void remove(String key) {
        sseEmitterMap.remove(key);
        // 数量-1
        count.getAndDecrement();
        log.info("接続を削除:{}", key);
    }

    /**
     * 現在の接続情報を取得
     * @return Map
     */
    public static List<String> getIds() {
        return new ArrayList<>(sseEmitterMap.keySet());
    }

    /**
     * 現在の接続数を取得
     * @return int
     */
    public static int getCount() {
        return count.intValue();
    }

    private static Runnable completionCallBack(String key) {
        return () -> {
            log.info("接続終了:{}", key);
            remove(key);
        };
    }

    private static Runnable timeoutCallBack(String key) {
        return () -> {
            log.info("接続タイムアウト:{}", key);
            remove(key);
        };
    }

    private static Consumer<Throwable> errorCallBack(String key) {
        return throwable -> {
            log.info("接続異常:{}", key);
            remove(key);
        };
    }
}

次に、ユーティリティクラスを使用していくつかのインターフェースを実装し、クライアントが購読し、サーバーが積極的にメッセージをプッシュできるようにします。

@RequestMapping("/sse")
@RestController
@Slf4j
@CrossOrigin
public class SSEEmitterController {

    /**
     * 接続を作成
     * @param id ユーザーID
     * @return SseEmitter
     */
    @GetMapping(path = "/subscribe/{id}", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
    public SseEmitter subscribe(@PathVariable String id) {
        return SseEmitterUtils.connect(id);
    }


    /**
     * 指定されたユーザーにメッセージをプッシュ
     * @param id ユーザーID
     * @param content プッシュ内容
     */
    @PostMapping(path = "/push")
    public void push(String id, String content) {
        SseEmitterUtils.sendMessage(id, content);
    }


    /**
     * 指定されたグループにメッセージをプッシュ
     * @param groupId グループID
     * @param content プッシュ内容
     */
    @PostMapping(path = "/groupPush")
    public void groupPush(String groupId, String content) {
        SseEmitterUtils.groupSendMessage(groupId, content);
    }


    /**
     * ブロードキャストメッセージ
     * @param content プッシュ内容
     */
    @PostMapping(path = "/pushAll")
    public void pushAll(String content) {
        SseEmitterUtils.batchSendMessage(content);
    }

    /**
     * 接続を閉じる
     * @param id ユーザーID
     * @param request リクエスト
     */
    @DeleteMapping(path = "/close/{id}")
    public void close(@PathVariable String id, HttpServletRequest request) {
        request.startAsync();
        SseEmitterUtils.remove(id);
    }

}

最後に、以下の HTML ページを使用してテストできます。

<!DOCTYPE html>
<html lang="en">

<head>
    <title>SSE</title>
    <meta charset="UTF-8">
    <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js" type="text/javascript"></script>
    <script>
        if (window.EventSource) {

            let sources = [];
            // 接続を作成  
            for (let i = 1; i < 10; i++) {
                let id = "id_" + i;
                sources[i] = new EventSource('http://localhost:8008/sse/subscribe/' + id);
            }

            /** 
             * 接続が確立されると、openイベントがトリガーされます 
             * 別の書き方:source.onopen = function (event) {} 
             */
            // 接続オープンイベント
            sources.forEach(source => {
                let id = source.url.split('/').pop();
                source.addEventListener('open', function (e) {
                    setMessageInnerHTML(id + "接続オープン")
                    console.log(id + "接続オープン");
                });
            });

            /** 
             * クライアントがサーバーから送信されたデータを受信 
             * 別の書き方:source.onmessage = function (event) {} 
             */
            // メッセージイベント 
            sources.forEach(source => {
                let id = source.url.split('/').pop();
                source.addEventListener('message', function (e) {
                    setMessageInnerHTML(id + "メッセージを受信:" + e.data)
                    console.log(id + "メッセージを受信:" + e.data);
                });
            });

            /** 
             * 通信エラーが発生した場合(接続が中断された場合など)、errorイベントがトリガーされます 
             * 別の書き方:source.onerror = function (event) {} 
             */
            // エラーハンドリング
            sources.forEach(source => {

                let id = source.url.split('/').pop();

                source.addEventListener('error', function (e) {

                    if (e.readyState === EventSource.CLOSED) {
                        setMessageInnerHTML(id + "接続閉鎖")
                        console.log(id + "接続閉鎖");
                    } else {
                        setMessageInnerHTML(id + "接続エラー:", e)
                        console.log(id + "接続エラー:", e);
                    }

                });

            });
        } else {
            setMessageInnerHTML("ブラウザはSSEをサポートしていません");
        }

        // ウィンドウ閉鎖イベントをリッスンし、SSE接続を手動で閉じます。サーバーが期限なしに設定されている場合、ブラウザが閉じられた後にサーバーデータを手動でクリーンアップします。 
        window.onbeforeunload = function () {
            source.close();
            const httpRequest = new XMLHttpRequest();
            httpRequest.open('GET', 'http://localhost:8008/sse/close/' + id, true);
            httpRequest.send();
            console.log("close");
        };

        // メッセージをウェブページに表示 
        function setMessageInnerHTML(innerHTML) {
            $("#contentDiv").append("<br/>" + innerHTML);
        } 
    </script>
</head>

<body>
    <div>
        <div>
            <div id="contentDiv" style="height:800px; width:1000px; overflow:scroll; background:#ccc;">
            </div>
        </div>
    </div>
</body>

</html>

次に、インターフェースを呼び出してテストします。

image

ブラウザは最終的に 6 つの接続のみを保持し、他の接続はすべて破棄されることがわかります。

MQTT#

MQTT を実現するには、サーバーに以下の依存関係を追加する必要があります。

<!--mqtt依存パッケージ-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>

次に、ミドルウェアをインストールする必要があります。例えば、EMQXや、直接 RabbitMq を使用することもできます。RabbitMq は QOS2 レベルのメッセージ品質をサポートしていないことに注意してください。

image

MQTT の QoS メッセージ品質レベルについては、以下の表の説明を参照してください。

QoS レベル説明適用シーン
0最多 1 回の配信、メッセージは確認されず再送信されない重要でないデータ転送に適している、例えばセンサーのデータなど
1最低 1 回の配信、メッセージが少なくとも 1 回配信されることを保証しますが、重複する可能性がありますメッセージの配信を保証する必要があるが、重複を許可するシーンに適しています
21 回のみの配信、メッセージが 1 回のみ配信されることを保証し、重複を許可しませんメッセージの正確な配信を保証する必要があり、重複を許可しないシーンに適しています

以下は RabbitMQ ミドルウェアを使用する例です。
MQTT プロトコル情報を設定します。

server:
  port: 8008

spring:
  application:
    name: mqttテストプロジェクト
  mqtt:
    url: tcp://127.0.0.1:1883
    username: guest
    password: guest
    clientId: serverClientId
      # 発行するトピック--MQTTのデフォルトのメッセージプッシュトピック、実際には呼び出しインターフェースで指定できます
    pubTopic: testTopic
      # 購読するトピック
    subTopic: gps-topic,oil-topic,broadcast-topic,fault-topic
    completionTimeout: 3000

次に、設定ファイルのエンティティクラスマッピングを作成します。

@Data
@EqualsAndHashCode(callSuper = false)
@Component
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttProperties {

    /**
     * RabbitMQ接続ユーザー名
     */
    private String username;
    /**
     * RabbitMQ接続パスワード
     */
    private String password;
    /**
     * プッシュトピック
     */
    private String pubTopic;
    /**
     * RabbitMQのMQTT接続アドレス
     */
    private String url;

    /**
     * RabbitMQのMQTT接続クライアントID
     */
    private String clientId;

    /**
     * 購読トピック
     */
    private String subTopic;

    /**
     * タイムアウト時間
     */
    private Integer completionTimeout;
}

次に、コンシューマーを作成します。

@Slf4j
@Configuration
@RequiredArgsConstructor
public class MqttConsumer {

    private final MqttProperties mqttProperties;

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getUrl(), mqttProperties.getClientId(),
                        mqttProperties.getSubTopic().split(","));
        adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout());
        adapter.setConverter(new DefaultPahoMessageConverter());
        // メッセージ品質を設定:0->最多1回;1->最低1回;2->1回のみ
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                MessageHeaders headers = message.getHeaders();
                log.info("headers: {}", headers);
                String topic = Objects.requireNonNull(headers.get(MqttHeaders.RECEIVED_TOPIC)).toString();
                log.info("購読トピックは: {}", topic);
                String[] topics = mqttProperties.getSubTopic().split(",");
                for (String t : topics) {
                    if (t.equals(topic)) {
                        log.info("購読トピックは:{};このトピックのメッセージを受信しました:{}",topic, message.getPayload());
                    }
                }
            }

        };
    }
}

次に、プロデューサーを作成します。

@Configuration
@Slf4j
@RequiredArgsConstructor
public class MqttProvider {

    private final MqttProperties mqttProperties;

    @Bean
    @SneakyThrows
    public MqttPahoClientFactory mqttClientFactory() {

        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { mqttProperties.getUrl()});
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        // セッションをクリアするかどうかを設定します。ここでfalseに設定すると、サーバーはクライアントの接続記録を保持します。
        // cleanSessionをfalseに設定すると、クライアントがオフラインになった後、サーバーはセッションをクリアしません。
        // 再接続後、以前に購読したトピックのメッセージを受信できます。クライアントがオンラインになると、オフラインの間のメッセージを受信します。
        options.setCleanSession(false);
        // タイムアウト時間を設定、単位は秒
        options.setConnectionTimeout(10);
        // セッションのハートビート時間を設定、単位は秒。サーバーは1.5*20秒ごとにクライアントにメッセージを送信し、クライアントがオンラインかどうかを確認しますが、この方法には再接続のメカニズムはありません。
        options.setKeepAliveInterval(20);
        // 切断後に再接続しますが、この方法には再購読のメカニズムはありません。
        // 再接続を試みる前に、最初に1秒待機し、各失敗した再接続試行の遅延が倍増し、2分に達すると遅延が2分に固定されます。
        options.setAutomaticReconnect(true);
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("publisherClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(mqttProperties.getPubTopic());
        // プッシュ時のメッセージ品質を設定:0->最多1回;1->最低1回;2->1回のみ
        // RabbitMQではQOS2はQOS1にダウングレードされます。
        messageHandler.setDefaultQos(1);
        // メッセージを保持するかどうかを設定します。trueに設定すると、メッセージは再接続時に送信されます。
        // 新しい空の保持メッセージを送信することでのみクリアできます。
        messageHandler.setDefaultRetained(false);
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}

次に、メッセージを送信するためのゲートウェイインターフェースを作成します。

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    /**
     * デフォルトトピックにメッセージを送信
     */
    void sendToMqtt(String payload);

    /**
     * 指定されたトピックにメッセージを送信
     */
    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);

    /**
     * 指定されたトピックにメッセージを送信し、QOSを設定
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

最後に、メッセージを送信するためのインターフェースを作成します。

@RestController
@RequiredArgsConstructor
@RequestMapping("/mqtt")
public class MqttController {

    private final MqttGateway mqttGateway;

    @PostMapping("/sendToDefaultTopic")
    public void sendToDefaultTopic(String payload) {
        mqttGateway.sendToMqtt(payload);
    }

    @PostMapping("/sendToTopic")
    public void sendToTopic(String payload, String topic) {
        mqttGateway.sendToMqtt(payload, topic);
    }
}

サードパーティの MQTT ソフトウェアを使用してテストできます。例えば、mqttxを使用します。
インストールが完了したら、ソフトウェアを開いて接続を作成します。
image

注意が必要です:

  1. 各接続の ClientId は異なり、一意である必要があります。同じ ClientId は接続が競合し、メッセージが失われる可能性があります。
  2. サーバー接続プロトコルは TCP でも MQTT でもかまいません。
  3. RabbitMQ を使用する場合は、ユーザーの権限を設定する必要があります。
  4. RabbitMQ の場合、バージョンは 3.1.1 を選択してください。
  5. 消費者がオフラインの後もプロデューサーのメッセージを受信したい場合は、Clean Session をオフにしてください。

接続が成功したら、インターフェースを呼び出してメッセージを送信すると、購読者が受信できます。

image

Spring Boot と MQTT を統合してメッセージの送信と消費を実現し、クライアントの切断後のメッセージの復元

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。