イベントドリブンなアーキテクチャのパターン
最近 microservices architectureのパターンを知った。マイクロサービスに関するパターンが網羅されていてこういう名前がついているんだ、ということが知れた。
RDBMSとCloud Tasksを組み合わせて使うことが多いので
あたりは初めて知ったので、活用できそうだなと思ったのでメモ。まぁ考えれば結局この結論になりそうだが…。
Transactional outbox
一般的にRDBMSに何らかのデータを作成し、メッセージブローカー(例えばCloud TasksやPub/Sub、SQSとか)にメッセージを送信するのがマイクロサービスの基本(マイクロサービス自体は詳しくないが)。めっちゃ簡潔に書くなら下記だろう。
await db.transaction(async (tx) => {
const order = await tx.createOrder({...});
});
await sendMessage(order);
sendMessageはトランザクションがコミットされた後、つまりDBに更新がかかった後にメッセージを送信する。しかし、sendMessageが失敗した場合、トランザクションはロールバックされない。つまり、DBにはデータが残っているのにメッセージは送信されていない状態になる。これを解消するためにトランザクションの中でメッセージを送信するケースを考える。
await db.transaction(async (tx) => {
const order = await tx.createOrder({...});
await sendMessage(order);
});
sendMessageはトランザクションの中で実行されるため、sendMessage終了後、今回の例ではトランザクションのコミットに失敗した場合、メッセージが送信されたのにも関わらずDBにはデータが残っていない状態になる。より複雑なトランザクションであればメッセージ送信後に処理を行う必要があるかも知れない。そうなるとメッセージを送信したのにROLLBACKされている可能性はある。
これを解消するのがTransactional outboxパターンである。トランザクションの中でメッセージを送信するのではなく、トランザクションの中でメッセージをDBに保存する。トランザクションがコミットされた後、別のプロセスがメッセージを送信する。
await db.transaction(async (tx) => {
const order = await tx.createOrder({...});
await tx.saveMessage(order);
});
別サービスでは下記のような処理が何らかのスケジューラー等で実行される。
const messages = await db.getMessages();
for (const message of messages) {
await sendMessage(message);
await db.deleteMessage(message);
}
このようにすることで、DBのデータとメッセージングをアトミックにできる。
Idempotent consumer
Idempotent consumerは、メッセージを受信した際に同じメッセージを何度も受信しても問題ないようにするパターンである。特にCloud TasksやPub/Sub、SQSのサービスは"least once"の保証をしている(Pub/SubやSQSはexactly onceもあるらしいが)。逆に言えば同じリクエストがに2回発行されることもあるわけで、これを考慮する必要がある。
解決方法はシンプルで、処理開始時にDBに何らかのIDを保存しておく。処理が終わった後、DBからIDを削除する。次回同じメッセージが来た場合、DBにIDが存在するか確認し、存在した場合は処理をスキップする。
const id = requestBody.id;
const rowsAffected = await db.execQuery(
'INSERT INTO processed_messages (id) VALUES (?) ON CONFLICT DO NOTHING',
[id],
);
if(rowsAffected === 0) {
// メッセージはすでに処理済みなのでスキップ
return;
}
// 処理を行う
await db.execQuery('DELETE FROM processed_messages WHERE id = ?', [id]);
ここではDBを用いているが、RedisのNX
オプションを使うなりすることでも解決可能だ。
色々な記事を漁ると、最後にDELETEを入れない場合が多いが、Cloud Tasksのようにメッセージを再送信する場合は消すか、もしくはCloud Tasksから渡されるリトライカウントなどもキーに入れて保存するなどが必要になるかも知れない。