前言#
最近、ユーザーがアップロードしたデータが異常か正常かを判断する必要があるビジネス要件があり、異常な場合にはリアルタイムでユーザーに警報メッセージをプッシュする必要があります。
ほとんどの場合、通常はクライアント(ブラウザ)がサーバー(サーバー)にリクエストを送信し、必要なデータを通知します。
しかし、一部の状況では、サーバーがクライアントにメッセージをプッシュする必要があります。例えば、上記の状況や、ウェブページで一般的なQR コードログインなどです。
技術選択#
サーバーが積極的にプッシュする方法はいくつかあり、ビジネスに応じて選択する必要があります。
シンプルで一般的な方法は、長いポーリングと短いポーリングです。名前からしてシンプルで粗暴です。
さらに進んだ方法として、SSE や WebSocket があります。
さらに深いレベルでは、さまざまなメッセージキュー(MQTT など)があります。
他のプッシュ方法としては、SMS プッシュやアプリ通知(例えば、極光 Push)があります。
実現方式 | 実現 | 原理 | モード |
---|---|---|---|
短輪詢 | クライアント | 特定の時間間隔(例えば 1 秒ごと)で、クライアントがサーバーにリクエストを送信し、サーバーが最新のデータをクライアントのブラウザに返します。 | / |
長輪詢 | サーバー | クライアントがサーバーにリクエストを送信し、サーバーがリクエストを受け取った後、接続を保持 し、新しいメッセージがあるまで応答情報を返さず、クライアントが応答情報を処理した後に新しいリクエストをサーバーに送信します。 | / |
SSE(Server-sent Events) | サーバー | クライアントがサーバーに長接続を登録し、サーバーがイベントを取得した後、登録されたクライアントにメッセージを送信できます。 | サーバーからのみプッシュ |
Websocket | クライアント / サーバー | WebSocket を使用するには新しい依存関係を導入する必要があり、一度クライアントとサーバーのハンドシェイクが成功すると、同じチャンネルにいてリアルタイムでメッセージを送受信できます。 | フルデュプレックス、サーバーとクライアントの両方がプッシュと受信が可能 |
MQTT | クライアント / サーバー | MQTT は IoT ビジネスで一般的に使用される、パブリッシュ / サブスクライブモデルに基づく軽量通信プロトコルで、TCP/IP プロトコルの上に構築されています。MQTT の最大の利点は、非常に少ないコードと限られた帯域幅で、リモートデバイスにリアルタイムで信頼性の高いメッセージサービスを提供できることです。 | パブリッシュ / サブスクライブモデル、サーバーとクライアントの両方がプッシュと受信が可能 |
長 / 短ポーリングについては繰り返しませんが、今回はSSEとMQTTの方法について主に説明します。
SSE#
Spring Boot では SSE がネイティブにサポートされており、他の依存関係をインポートする必要はありません。これが利点ですが、欠点も明らかで、サーバーからの一方向のプッシュのみをサポートし、高度なブラウザ(Chrome、Firefox など)のみをサポートします。ブラウザの制限により、各ウェブページは最大 6 つの長接続を保持できます。
最初に SSE ユーティリティクラスを作成します。
@Component
@Slf4j
public class SseEmitterUtils {
/**
* 現在の接続数
*/
private static AtomicInteger count = new AtomicInteger(0);
/**
* SseEmitter情報を格納
*/
private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
/**
* ユーザー接続を作成し、SseEmitterを返す
* @param key userId
* @return SseEmitter
*/
public static SseEmitter connect(String key) {
if (sseEmitterMap.containsKey(key)) {
return sseEmitterMap.get(key);
}
try {
// タイムアウト時間を設定、0は期限なし。デフォルトは30秒
SseEmitter sseEmitter = new SseEmitter(0L);
// コールバックを登録
sseEmitter.onCompletion(completionCallBack(key));
sseEmitter.onError(errorCallBack(key));
sseEmitter.onTimeout(timeoutCallBack(key));
sseEmitterMap.put(key, sseEmitter);
// 数量+1
count.getAndIncrement();
return sseEmitter;
} catch (Exception e) {
log.info("新しいSSE接続の作成に失敗しました。現在の接続キーは:{}", key);
}
return null;
}
/**
* 指定されたユーザーにメッセージを送信
* @param key userId
* @param message メッセージ内容
*/
public static void sendMessage(String key, String message) {
if (sseEmitterMap.containsKey(key)) {
try {
sseEmitterMap.get(key).send(message);
} catch (IOException e) {
log.error("ユーザー[{}]のプッシュに失敗しました:{}", key, e.getMessage());
remove(key);
}
}
}
/**
* 同じグループの人にメッセージを公開、条件:key + groupId
* @param groupId グループID
* @param message メッセージ内容
*/
public static void groupSendMessage(String groupId, String message) {
if (!CollectionUtils.isEmpty(sseEmitterMap)) {
sseEmitterMap.forEach((k, v) -> {
try {
if (k.startsWith(groupId)) {
v.send(message, MediaType.APPLICATION_JSON);
}
} catch (IOException e) {
log.error("ユーザー[{}]のプッシュに失敗しました:{}", k, e.getMessage());
remove(k);
}
});
}
}
/**
* ブロードキャストメッセージを送信
* @param message メッセージ内容
*/
public static void batchSendMessage(String message) {
sseEmitterMap.forEach((k, v) -> {
try {
v.send(message, MediaType.APPLICATION_JSON);
} catch (IOException e) {
log.error("ユーザー[{}]のプッシュに失敗しました:{}", k, e.getMessage());
remove(k);
}
});
}
/**
* メッセージをグループ送信
* @param message メッセージ内容
* @param ids ユーザーIDの集合
*/
public static void batchSendMessage(String message, Set<String> ids) {
ids.forEach(userId -> sendMessage(userId, message));
}
/**
* 接続を削除
* @param key userId
*/
public static void remove(String key) {
sseEmitterMap.remove(key);
// 数量-1
count.getAndDecrement();
log.info("接続を削除:{}", key);
}
/**
* 現在の接続情報を取得
* @return Map
*/
public static List<String> getIds() {
return new ArrayList<>(sseEmitterMap.keySet());
}
/**
* 現在の接続数を取得
* @return int
*/
public static int getCount() {
return count.intValue();
}
private static Runnable completionCallBack(String key) {
return () -> {
log.info("接続終了:{}", key);
remove(key);
};
}
private static Runnable timeoutCallBack(String key) {
return () -> {
log.info("接続タイムアウト:{}", key);
remove(key);
};
}
private static Consumer<Throwable> errorCallBack(String key) {
return throwable -> {
log.info("接続異常:{}", key);
remove(key);
};
}
}
次に、ユーティリティクラスを使用していくつかのインターフェースを実装し、クライアントが購読し、サーバーが積極的にメッセージをプッシュできるようにします。
@RequestMapping("/sse")
@RestController
@Slf4j
@CrossOrigin
public class SSEEmitterController {
/**
* 接続を作成
* @param id ユーザーID
* @return SseEmitter
*/
@GetMapping(path = "/subscribe/{id}", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
public SseEmitter subscribe(@PathVariable String id) {
return SseEmitterUtils.connect(id);
}
/**
* 指定されたユーザーにメッセージをプッシュ
* @param id ユーザーID
* @param content プッシュ内容
*/
@PostMapping(path = "/push")
public void push(String id, String content) {
SseEmitterUtils.sendMessage(id, content);
}
/**
* 指定されたグループにメッセージをプッシュ
* @param groupId グループID
* @param content プッシュ内容
*/
@PostMapping(path = "/groupPush")
public void groupPush(String groupId, String content) {
SseEmitterUtils.groupSendMessage(groupId, content);
}
/**
* ブロードキャストメッセージ
* @param content プッシュ内容
*/
@PostMapping(path = "/pushAll")
public void pushAll(String content) {
SseEmitterUtils.batchSendMessage(content);
}
/**
* 接続を閉じる
* @param id ユーザーID
* @param request リクエスト
*/
@DeleteMapping(path = "/close/{id}")
public void close(@PathVariable String id, HttpServletRequest request) {
request.startAsync();
SseEmitterUtils.remove(id);
}
}
最後に、以下の HTML ページを使用してテストできます。
<!DOCTYPE html>
<html lang="en">
<head>
<title>SSE</title>
<meta charset="UTF-8">
<script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js" type="text/javascript"></script>
<script>
if (window.EventSource) {
let sources = [];
// 接続を作成
for (let i = 1; i < 10; i++) {
let id = "id_" + i;
sources[i] = new EventSource('http://localhost:8008/sse/subscribe/' + id);
}
/**
* 接続が確立されると、openイベントがトリガーされます
* 別の書き方:source.onopen = function (event) {}
*/
// 接続オープンイベント
sources.forEach(source => {
let id = source.url.split('/').pop();
source.addEventListener('open', function (e) {
setMessageInnerHTML(id + "接続オープン")
console.log(id + "接続オープン");
});
});
/**
* クライアントがサーバーから送信されたデータを受信
* 別の書き方:source.onmessage = function (event) {}
*/
// メッセージイベント
sources.forEach(source => {
let id = source.url.split('/').pop();
source.addEventListener('message', function (e) {
setMessageInnerHTML(id + "メッセージを受信:" + e.data)
console.log(id + "メッセージを受信:" + e.data);
});
});
/**
* 通信エラーが発生した場合(接続が中断された場合など)、errorイベントがトリガーされます
* 別の書き方:source.onerror = function (event) {}
*/
// エラーハンドリング
sources.forEach(source => {
let id = source.url.split('/').pop();
source.addEventListener('error', function (e) {
if (e.readyState === EventSource.CLOSED) {
setMessageInnerHTML(id + "接続閉鎖")
console.log(id + "接続閉鎖");
} else {
setMessageInnerHTML(id + "接続エラー:", e)
console.log(id + "接続エラー:", e);
}
});
});
} else {
setMessageInnerHTML("ブラウザはSSEをサポートしていません");
}
// ウィンドウ閉鎖イベントをリッスンし、SSE接続を手動で閉じます。サーバーが期限なしに設定されている場合、ブラウザが閉じられた後にサーバーデータを手動でクリーンアップします。
window.onbeforeunload = function () {
source.close();
const httpRequest = new XMLHttpRequest();
httpRequest.open('GET', 'http://localhost:8008/sse/close/' + id, true);
httpRequest.send();
console.log("close");
};
// メッセージをウェブページに表示
function setMessageInnerHTML(innerHTML) {
$("#contentDiv").append("<br/>" + innerHTML);
}
</script>
</head>
<body>
<div>
<div>
<div id="contentDiv" style="height:800px; width:1000px; overflow:scroll; background:#ccc;">
</div>
</div>
</div>
</body>
</html>
次に、インターフェースを呼び出してテストします。
ブラウザは最終的に 6 つの接続のみを保持し、他の接続はすべて破棄されることがわかります。
MQTT#
MQTT を実現するには、サーバーに以下の依存関係を追加する必要があります。
<!--mqtt依存パッケージ-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
次に、ミドルウェアをインストールする必要があります。例えば、EMQXや、直接 RabbitMq を使用することもできます。RabbitMq は QOS2 レベルのメッセージ品質をサポートしていないことに注意してください。
MQTT の QoS メッセージ品質レベルについては、以下の表の説明を参照してください。
QoS レベル | 説明 | 適用シーン |
---|---|---|
0 | 最多 1 回の配信、メッセージは確認されず再送信されない | 重要でないデータ転送に適している、例えばセンサーのデータなど |
1 | 最低 1 回の配信、メッセージが少なくとも 1 回配信されることを保証しますが、重複する可能性があります | メッセージの配信を保証する必要があるが、重複を許可するシーンに適しています |
2 | 1 回のみの配信、メッセージが 1 回のみ配信されることを保証し、重複を許可しません | メッセージの正確な配信を保証する必要があり、重複を許可しないシーンに適しています |
以下は RabbitMQ ミドルウェアを使用する例です。
MQTT プロトコル情報を設定します。
server:
port: 8008
spring:
application:
name: mqttテストプロジェクト
mqtt:
url: tcp://127.0.0.1:1883
username: guest
password: guest
clientId: serverClientId
# 発行するトピック--MQTTのデフォルトのメッセージプッシュトピック、実際には呼び出しインターフェースで指定できます
pubTopic: testTopic
# 購読するトピック
subTopic: gps-topic,oil-topic,broadcast-topic,fault-topic
completionTimeout: 3000
次に、設定ファイルのエンティティクラスマッピングを作成します。
@Data
@EqualsAndHashCode(callSuper = false)
@Component
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttProperties {
/**
* RabbitMQ接続ユーザー名
*/
private String username;
/**
* RabbitMQ接続パスワード
*/
private String password;
/**
* プッシュトピック
*/
private String pubTopic;
/**
* RabbitMQのMQTT接続アドレス
*/
private String url;
/**
* RabbitMQのMQTT接続クライアントID
*/
private String clientId;
/**
* 購読トピック
*/
private String subTopic;
/**
* タイムアウト時間
*/
private Integer completionTimeout;
}
次に、コンシューマーを作成します。
@Slf4j
@Configuration
@RequiredArgsConstructor
public class MqttConsumer {
private final MqttProperties mqttProperties;
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getUrl(), mqttProperties.getClientId(),
mqttProperties.getSubTopic().split(","));
adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout());
adapter.setConverter(new DefaultPahoMessageConverter());
// メッセージ品質を設定:0->最多1回;1->最低1回;2->1回のみ
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
MessageHeaders headers = message.getHeaders();
log.info("headers: {}", headers);
String topic = Objects.requireNonNull(headers.get(MqttHeaders.RECEIVED_TOPIC)).toString();
log.info("購読トピックは: {}", topic);
String[] topics = mqttProperties.getSubTopic().split(",");
for (String t : topics) {
if (t.equals(topic)) {
log.info("購読トピックは:{};このトピックのメッセージを受信しました:{}",topic, message.getPayload());
}
}
}
};
}
}
次に、プロデューサーを作成します。
@Configuration
@Slf4j
@RequiredArgsConstructor
public class MqttProvider {
private final MqttProperties mqttProperties;
@Bean
@SneakyThrows
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { mqttProperties.getUrl()});
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
// セッションをクリアするかどうかを設定します。ここでfalseに設定すると、サーバーはクライアントの接続記録を保持します。
// cleanSessionをfalseに設定すると、クライアントがオフラインになった後、サーバーはセッションをクリアしません。
// 再接続後、以前に購読したトピックのメッセージを受信できます。クライアントがオンラインになると、オフラインの間のメッセージを受信します。
options.setCleanSession(false);
// タイムアウト時間を設定、単位は秒
options.setConnectionTimeout(10);
// セッションのハートビート時間を設定、単位は秒。サーバーは1.5*20秒ごとにクライアントにメッセージを送信し、クライアントがオンラインかどうかを確認しますが、この方法には再接続のメカニズムはありません。
options.setKeepAliveInterval(20);
// 切断後に再接続しますが、この方法には再購読のメカニズムはありません。
// 再接続を試みる前に、最初に1秒待機し、各失敗した再接続試行の遅延が倍増し、2分に達すると遅延が2分に固定されます。
options.setAutomaticReconnect(true);
factory.setConnectionOptions(options);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("publisherClient", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(mqttProperties.getPubTopic());
// プッシュ時のメッセージ品質を設定:0->最多1回;1->最低1回;2->1回のみ
// RabbitMQではQOS2はQOS1にダウングレードされます。
messageHandler.setDefaultQos(1);
// メッセージを保持するかどうかを設定します。trueに設定すると、メッセージは再接続時に送信されます。
// 新しい空の保持メッセージを送信することでのみクリアできます。
messageHandler.setDefaultRetained(false);
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}
次に、メッセージを送信するためのゲートウェイインターフェースを作成します。
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
/**
* デフォルトトピックにメッセージを送信
*/
void sendToMqtt(String payload);
/**
* 指定されたトピックにメッセージを送信
*/
void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
/**
* 指定されたトピックにメッセージを送信し、QOSを設定
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
最後に、メッセージを送信するためのインターフェースを作成します。
@RestController
@RequiredArgsConstructor
@RequestMapping("/mqtt")
public class MqttController {
private final MqttGateway mqttGateway;
@PostMapping("/sendToDefaultTopic")
public void sendToDefaultTopic(String payload) {
mqttGateway.sendToMqtt(payload);
}
@PostMapping("/sendToTopic")
public void sendToTopic(String payload, String topic) {
mqttGateway.sendToMqtt(payload, topic);
}
}
サードパーティの MQTT ソフトウェアを使用してテストできます。例えば、mqttxを使用します。
インストールが完了したら、ソフトウェアを開いて接続を作成します。
注意が必要です:
- 各接続の ClientId は異なり、一意である必要があります。同じ ClientId は接続が競合し、メッセージが失われる可能性があります。
- サーバー接続プロトコルは TCP でも MQTT でもかまいません。
- RabbitMQ を使用する場合は、ユーザーの権限を設定する必要があります。
- RabbitMQ の場合、バージョンは 3.1.1 を選択してください。
- 消費者がオフラインの後もプロデューサーのメッセージを受信したい場合は、Clean Session をオフにしてください。
接続が成功したら、インターフェースを呼び出してメッセージを送信すると、購読者が受信できます。