以前リアルタイムプッシュ(1)を書いたときは、rabbitmq の mqtt プラグインを使用していました。その時は私のニーズを満たしていましたが、今は新しい要求があり、リモートウェイクアップを判断するためにデバイスがオンラインかどうかを確認する必要があります。現在の時間にデバイスがオンラインであれば、リモートウェイクアップが可能であることを示します。
最初は、rabbitmq の基盤の上で、mqtt のオンラインデバイスを取得できる API がないか探そうと思いました。結局、メッセージキューもこれを使用しているので、ミドルウェアを増やす必要はありませんでした。そして、mqtt には多くの規約のシステムトピックがあり、これらのシステムトピックは mqtt ブローカーの状態を維持しています。
しかし、私が言ったように、システムトピックは規約の設定であり、強制的なものではなく、rabbitmq の mqtt 実装もプラグインとしての補完に過ぎません。これは私が公式リポジトリで提起したディスカッションです。
仕方がないので、専門の mqtt ミドルウェアを導入するしかないようです。ここではEMQXを選びました。主にドキュメントが非常に詳細だからです。
インストール#
ここでは WSL2 の Docker を使用してインストールすることにしました。インストールコマンドは非常に簡単です。
$ docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:v4.0.0
インストールが成功した後、直接 http://localhost:18083
にアクセスしてコントロールパネルを利用できます。デフォルトのユーザー名は admin
、パスワードは public
です。
設定#
インストールが完了した後、プログラム内で設定を行う必要があります。mqtt V5 にバージョンを切り替えたため、依存関係と設定も変更する必要があります。
依存関係#
<!--mqtt依存パッケージ-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.mqttv5.client</artifactId>
<version>1.2.5</version>
</dependency>
設定ファイル#
# mqttプロトコル設定
spring:
mqtt:
url: tcp://127.0.0.1:1883
username: admin
password: admin
clientId: serverClientId
#発行するトピック--MQTT-デフォルトのメッセージプッシュトピック、実際には呼び出しインターフェース時に指定可能
pubTopic: testTopic
#購読するトピック
subTopic: testTopic,remote-wake
completionTimeout: 30000
emqx はデフォルトで匿名接続を許可しているため、ユーザー名とパスワードは省略できます。
プロデューサー#
次に、プロデューサーの設定クラスを作成します。
/**
* @author lza
* @date 2023/11/24-10:24
**/
@Configuration
@Slf4j
@RequiredArgsConstructor
public class MqttProviderConfig {
private final MqttProperties mqttProperties;
/**
* クライアントオブジェクト
*/
private MqttClient providerClient;
/**
* bean初期化後にサーバーに接続
* @author xct
* @date 2021/7/30 16:48
*/
@PostConstruct
public void init(){
connect();
}
/**
* クライアントがサーバーに接続
* @author xct
* @date 2021/7/30 16:01
*/
@SneakyThrows
public void connect(){
//MQTTクライアントオブジェクトを作成
providerClient = new MqttClient(mqttProperties.getUrl(), "providerClient", new MemoryPersistence());
//接続設定
MqttConnectionOptions options = new MqttConnectionOptions();
//セッションをクリアするかどうか、falseに設定するとサーバーはクライアントの接続記録(購読トピック、qos)を保持し、クライアントが再接続した後にサーバーがクライアントの切断中にプッシュしたメッセージを取得できる
//trueに設定すると、毎回サーバーに接続する際に新しい身分として接続される
options.setCleanStart(true);
//接続ユーザー名を設定
options.setUserName(mqttProperties.getUsername());
//接続パスワードを設定
options.setPassword(mqttProperties.getPassword().getBytes());
//タイムアウト時間を設定、単位は秒
options.setConnectionTimeout(100);
//ハートビート時間を設定、単位は秒、サーバーが1.5*20秒ごとにクライアントにハートビートを送信し、クライアントがオンラインかどうかを判断する
options.setKeepAliveInterval(20);
//自動再接続を設定
options.setAutomaticReconnect(true);
//コールバックを設定
providerClient.setCallback(new MqttProviderCallBack());
providerClient.connect(options);
}
/**
* メッセージを発行
*
* @param qos サービス品質レベル
* 0は一度だけ送信し、成功かどうかに関係なく
* 1は未成功の場合、成功するまで送信し続け、複数回受信する可能性がある
* 2は未成功の場合、送信し続けるが、一度だけ受信することを保証する
* @param retained 保持フラグ
* trueに設定すると、サーバーはこのアプリケーションメッセージとそのサービス品質レベルを保存し、購読者がこのトピックを購読すると、メッセージをその購読者にプッシュする
* ただし、サーバーは同じトピックに対して1つのretainedメッセージ(最後に受信したもの)しか保持しない
* @param topic トピック
* @param message メッセージ
* @author xct
* @date 2021/7/30 16:27
*/
@SneakyThrows
public void publish(int qos, boolean retained, String topic, String message) {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
mqttMessage.setPayload(message.getBytes());
//トピックの宛先、メッセージを発行/購読するために使用
MqttTopic mqttTopic = providerClient.getTopic(topic);
//メッセージの配信進捗を追跡するメカニズムを提供
//非ブロッキング方式(バックグラウンドで実行)で発行を実行する際にメッセージの配信進捗を追跡するために使用
MqttToken token;
//指定されたメッセージをトピックに発行するが、メッセージの配信が完了するのを待たない。返されたトークンはメッセージの配信状態を追跡するために使用できる。
//このメソッドがクリーンに戻ると、メッセージはクライアントによって受け入れられた発行となる。接続が利用可能なときに、バックグラウンドでメッセージの配信が完了する。
token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
}
主に行うことは、設定ファイルの接続パラメータに基づいて mqtt クライアントオブジェクトを作成し、publish
メソッドでメッセージをプッシュすることです。
プロデューサークライアントオブジェクトを作成する際に MqttProviderCallBack
をコールバック関数として指定したため、このコールバッククラスも作成する必要があります。
プロデューサーコールバック#
プロデューサーのコールバックメソッドは必要に応じて実装します。
/**
* @author lza
* @date 2023/11/24-10:34
**/
public class MqttProviderCallBack implements MqttCallback {
@Override
public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
System.out.println("プロデューサー:サーバーとの接続が切断されました");
}
@Override
public void mqttErrorOccurred(MqttException e) {
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
}
@Override
public void deliveryComplete(IMqttToken iMqttToken) {
MqttClientInterface client = iMqttToken.getClient();
System.out.println(client.getClientId() + "メッセージの発行に成功しました!");
}
@Override
public void connectComplete(boolean b, String s) {
}
@Override
public void authPacketArrived(int i, MqttProperties mqttProperties) {
}
}
コンシューマー#
プロデューサーを作成した後、コンシューマーも必要です。
/**
* @author lza
* @date 2023/11/24-10:43
**/
@Configuration
@Slf4j
@RequiredArgsConstructor
public class MqttConsumerConfig {
private final MqttProperties mqttProperties;
/**
* クライアントオブジェクト
*/
public MqttClient consumerClient;
/**
* bean初期化後にサーバーに接続
* @author xct
* @date 2021/7/30 16:48
*/
@PostConstruct
public void init(){
connect();
}
/**
* クライアントがサーバーに接続
* @author xct
* @date 2021/7/30 16:01
*/
@SneakyThrows
public void connect(){
//MQTTクライアントオブジェクトを作成
consumerClient = new MqttClient(mqttProperties.getUrl(), "consumerClient", new MemoryPersistence());
//接続設定
MqttConnectionOptions options = new MqttConnectionOptions();
//セッションをクリアするかどうか、falseに設定するとサーバーはクライアントの接続記録(購読トピック、qos)を保持し、クライアントが再接続した後にサーバーがクライアントの切断中にプッシュしたメッセージを取得できる
//trueに設定すると、毎回サーバーに接続する際に新しい身分として接続される
options.setCleanStart(true);
//接続ユーザー名を設定
options.setUserName(mqttProperties.getUsername());
//接続パスワードを設定
options.setPassword(mqttProperties.getPassword().getBytes());
//タイムアウト時間を設定、単位は秒
options.setConnectionTimeout(100);
//ハートビート時間を設定、単位は秒、サーバーが1.5*20秒ごとにクライアントにハートビートを送信し、クライアントがオンラインかどうかを判断する
options.setKeepAliveInterval(20);
//自動再接続を設定
options.setAutomaticReconnect(true);
//コールバックを設定
consumerClient.setCallback(new MqttConsumerCallBack(this));
consumerClient.connect(options);
//トピックを購読
//メッセージレベル、トピック配列に一対一で対応し、サーバーは指定されたレベルで購読したトピックのクライアントにメッセージをプッシュします
int[] qos = {1,1};
//トピック
String[] topics = mqttProperties.getSubTopic().split(",");
//トピックを購読
consumerClient.subscribe(topics,qos);
}
/**
* 接続を切断
*
* @author xct
* @date 2021/8/2 09:30
*/
@SneakyThrows
public void disConnect() {
consumerClient.disconnect();
}
/**
* トピックを購読
*
* @param topic トピック
* @param qos メッセージレベル
* @author xct
* @date 2021/7/30 17:12
*/
@SneakyThrows
public void subscribe(String topic, int qos) {
consumerClient.subscribe(topic, qos);
}
}
コンシューマーも同様に接続情報を使用してコンシューマークライアントインスタンスを作成する必要があり、同様にコンシューマーのコールバック関数を指定する必要がありますが、異なるのはコンシューマーのメソッドが subscribe
購読メソッドと disConnect
接続切断メソッドであることです。
コンシューマーコールバック#
/**
* @author lza
* @date 2023/11/24-10:55
**/
public class MqttConsumerCallBack implements MqttCallback {
private final MqttConsumerConfig consumerConfig;
public MqttConsumerCallBack(MqttConsumerConfig consumerConfig) {
this.consumerConfig = consumerConfig;
}
@Override
public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
System.out.println("コンシューマー;サーバーとの接続が切断されました");
}
@Override
public void mqttErrorOccurred(MqttException e) {
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.printf("受信メッセージトピック : %s%n",s);
System.out.printf("受信メッセージQos : %d%n",mqttMessage.getQos());
System.out.printf("受信メッセージ内容 : %s%n",new String(mqttMessage.getPayload()));
System.out.printf("受信メッセージretained : %b%n",mqttMessage.isRetained());
// mqttV5リクエスト応答モードを設定
List<UserProperty> userProperties = mqttMessage.getProperties().getUserProperties();
if("action".equals(userProperties.get(0).getKey()) && "remoteWake".equals(userProperties.get(0).getValue())){
MqttProperties properties = new MqttProperties();
if(mqttMessage.getProperties()!=null && StringUtils.hasText(mqttMessage.getProperties().getResponseTopic())){
MqttMessage responseMessage = new MqttMessage();
properties.setCorrelationData(mqttMessage.getProperties().getCorrelationData());
responseMessage.setProperties(properties);
responseMessage.setPayload("デバイスが接続されていません".getBytes());
responseMessage.setQos(1);
responseMessage.setRetained(false);
consumerConfig.consumerClient.publish(mqttMessage.getProperties().getResponseTopic(),responseMessage);
}
}
}
@Override
public void deliveryComplete(IMqttToken iMqttToken) {
}
@Override
public void connectComplete(boolean b, String s) {
}
@Override
public void authPacketArrived(int i, MqttProperties mqttProperties) {
}
}
コンシューマーのコールバックで最も重要なのはメッセージを取得するメソッド messageArrived
です。コンシューマーが購読しているトピックがメッセージを受信すると、このメソッドに入ります。
すべてが正しければ、emqx のバックエンド管理ページでこの 2 つのクライアント、プロデューサーとコンシューマーがオンラインになっているのを見ることができます。
リクエスト応答#
あなたはおそらくコンシューマーの messageArrived
コールバックメソッドにリクエスト応答の注釈があるのを見たことがあるでしょう。これが mqtt のバージョン依存関係を 3.1.1 から 5 に切り替えた理由です。
リクエスト応答モードは mqtt V5 の大きなバージョン特性であり、この文書を参照することができます。また、面白い例を含む文書もあります。
簡単に言うと、私たちは HTTP リクエストが明確なリクエスト / 応答モデルであることを知っています。フロントエンドはバックエンドにインターフェースを通じてリクエストを行い、バックエンドはデータを処理した後、結果をフロントエンドに返します。失敗しても成功しても、フロントエンドは常に返り値を受け取ることができます。
しかし、mqtt のプッシュ購読モデルでは、プロデューサーとコンシューマーが互いに関心を持たない背景(mqtt は通常 IoT シーンで使用され、IoT のほとんどのシーンはプロデューサーまたはコンシューマーに対して過度に寛容である)を考えてみてください。想像してみてください、あなたはある公式アカウントを購読していて、その公式アカウントがいつプッシュしてくるかを気にする必要はありません。あなたはただ、その公式アカウントがプッシュしたメッセージを見逃さないことを保証する必要があります。
例えば、部屋に温度センサーがあり、ネットワークに接続されているとします。外出先でスマートフォンのアプリを通じてセンサーの現在の温度を取得したいと思っています。
HTTP リクエストのようなものですが、異なるのはバックエンドのデータがセンサーが収集したデータに変わることです。リクエスト応答を使用しない場合でも、2 つのトピック A と B を作成し、リクエスト時にメッセージを A に送信し、デバイスが A を購読し、データを収集した後に B トピックにデータを送信してアプリが購読することができます。
では、リクエスト応答はどのように行うのでしょうか?A にメッセージを送信する際に、応答のトピックを直接指定します。デバイスがメッセージを受け取ると、上に太字で書かれているのを見て、「B に来てください」となり、直接 B トピックにデータを返します。郵便局で手紙を送るプロセスに似ています。
MQTTXで mqtt に接続する際にバージョンを 5 に選択すれば、リクエスト応答のテストができます。
ユーザー属性を追加して判断に使用し、コンシューマーのコールバックメソッドに対応します。
// mqttV5リクエスト応答モードを設定
List<UserProperty> userProperties = mqttMessage.getProperties().getUserProperties();
if("action".equals(userProperties.get(0).getKey()) && "remoteWake".equals(userProperties.get(0).getValue())){
MqttProperties properties = new MqttProperties();
if(mqttMessage.getProperties()!=null && StringUtils.hasText(mqttMessage.getProperties().getResponseTopic())){
MqttMessage responseMessage = new MqttMessage();
properties.setCorrelationData(mqttMessage.getProperties().getCorrelationData());
responseMessage.setProperties(properties);
responseMessage.setPayload("デバイスが接続されていません".getBytes());
responseMessage.setQos(1);
responseMessage.setRetained(false);
consumerConfig.consumerClient.publish(mqttMessage.getProperties().getResponseTopic(),responseMessage);
}
}
ユーザー属性 action
の値が remoteWake
の場合、応答モードがトリガーされ、メッセージ内の応答トピックを取得して自動的に返信します。
オンライン・オフライン通知#
リクエスト応答は単なる拡張であり、最初の要求はデバイスがオンラインかどうかを取得することでした。
mqtt ミドルウェアを変更したので、システムトピックを使用できますか?
答えはできますが、必要ありません。なぜなら、emqx はより優雅な方法を提供しているからです。
最も簡単な方法は、emqx のエンタープライズ版を直接使用することです。
データストレージの主な使用シーンには、クライアントのオンライン・オフライン状態、購読トピック情報、メッセージ内容、メッセージ到達後のメッセージ確認などの操作を Redis、MySQL、PostgreSQL、MongoDB、Cassandra などのさまざまなデータベースに記録することが含まれます。ユーザーは関連トピックを購読することで同様の機能を実現できますが、エンタープライズ版ではこれらの永続化のサポートが組み込まれています。前者に比べて、後者の実行効率は高く、開発者の作業量を大幅に削減できます。
第二の方法、推奨される方法は、webhookを使用してオンライン・オフライン通知を維持することです。
まず、webhook 通知を取得するためのインターフェースを作成します。
private final static String CLIENT_IDS = "clientIds";
private final static String CONNECTED = "client_connected";
private final static String DISCONNECTED = "client_disconnected";
/**
* emqx webhookフック、クライアントのオンライン・オフラインを監視するためのもの
* @param vo オンライン・オフラインvo
*/
@SaIgnore
@PostMapping("/onlineOrOffline")
public void onlineOrOffline(@RequestBody OnlineOrOfflineVo vo) {
System.err.println("クライアント:" + vo.getClientid() +
",アクション:" + vo.getAction() +
",理由:" + vo.getReason());
List<Object> list = RedisUtils.getCacheList(CLIENT_IDS);
if (vo.getAction().equals(CONNECTED)) {
list.add(vo.getClientid());
// 既存の値を削除
RedisUtils.deleteKeys(CLIENT_IDS);
// 重複を排除
ArrayList<Object> distinct = CollUtil.distinct(list);
RedisUtils.setCacheList(CLIENT_IDS, distinct);
} else if (vo.getAction().equals(DISCONNECTED)){
list.remove(vo.getClientid());
// 既存の値を削除
RedisUtils.deleteKeys(CLIENT_IDS);
RedisUtils.setCacheList(CLIENT_IDS, list);
}
}
重複を排除する理由は、A クライアントがオンラインになったとき、redis に A があると仮定します。その後、サービスがダウンし、再起動すると、A が再度オンラインになると、redis には 2 つの A が存在することになります。
次に、emqx のプラグインで webhook プラグインを有効にします。
次に、docker コンテナ内に入り、プラグイン設定を変更します。
docker exec -it emqx bash
cd /etc/plugins
プラグインを変更します。
vi emqx_web_hook.conf
最も重要な変更は、ファイルの先頭にある Webhook URL をインターフェースアドレスに変更することです。
次に、通知ルールを変更します。
前のコメントを削除するだけで、これら 2 つがオンライン・オフライン通知のルールです。
web.hook.rule.client.connected.1 = {"action": "on_client_connected"}
web.hook.rule.client.disconnected.1 = {"action": "on_client_disconnected"}
完了すると、クライアントのオンライン・オフライン通知が表示されるようになります。
しかし、これだけでは完璧ではありません。もし mqtt ミドルウェアがオフラインになった場合、その時 webhook と redis に保存されたオンラインデバイスは無効になります。
心配しないでください。emqx はHTTP APIを提供しており、特定のデバイスがオンラインかどうかを取得できます。
実際、API にはクラスター内のすべてのクライアント情報を取得するインターフェースもありますが、ページネーションがあり、大量のクライアントがいると扱いが難しくなります。したがって、webhook を使用してオンライン・オフライン通知を維持し、特定のクライアントがオンラインかどうかを判断するためにインターフェースを使用するのが最も良い方法です。これにより、ほとんどのニーズを満たすことができます。
/**
* emqx APIを使用してクライアントがオンラインかどうかを確認
* @param clientId クライアントID
* @return オンラインかどうか
*/
@SaIgnore
@GetMapping("/checkClientStatus/{clientId}")
public R<Boolean> checkClientStatus(@PathVariable String clientId) {
// GETリクエストを送信
String url = "http://localhost:18083/api/v4/clients/" + clientId;
HttpRequest request = HttpUtil.createGet(url);
request.basicAuth("admin", "public");
HttpResponse response = request.execute();
ClientResponseDto dto = JSONUtil.toBean(response.body(), ClientResponseDto.class);
return R.ok(!ObjectUtil.isEmpty(dto.getData()) && dto.getData().get(0).getConnected());
}
EMQX ドキュメント
EMQX のインストールと使用および一部の問題
MQTT EMQX でクライアントのオンライン・オフラインを監視し、ビジネスで正常に使用する方法