• architecture

    イベントドリブンなアーキテクチャのパターン

    最近 microservices architectureのパターンを知った。マイクロサービスに関するパターンが網羅されていてこういう名前がついているんだ、ということが知れた。

    RDBMSとCloud Tasksを組み合わせて使うことが多いので

    あたりは初めて知ったので、活用できそうだなと思ったのでメモ。まぁ考えれば結局この結論になりそうだが…。

    Transactional outbox

    一般的にRDBMSに何らかのデータを作成し、メッセージブローカー(例えばCloud TasksやPub/Sub、SQSとか)にメッセージを送信するのがマイクロサービスの基本(マイクロサービス自体は詳しくないが)。めっちゃ簡潔に書くなら下記だろう。

    await db.transaction(async (tx) => { const order = await tx.createOrder({...}); }); await sendMessage(order);

    sendMessageはトランザクションがコミットされた後、つまりDBに更新がかかった後にメッセージを送信する。しかし、sendMessageが失敗した場合、トランザクションはロールバックされない。つまり、DBにはデータが残っているのにメッセージは送信されていない状態になる。これを解消するためにトランザクションの中でメッセージを送信するケースを考える。

    await db.transaction(async (tx) => { const order = await tx.createOrder({...}); await sendMessage(order); });

    sendMessageはトランザクションの中で実行されるため、sendMessage終了後、今回の例ではトランザクションのコミットに失敗した場合、メッセージが送信されたのにも関わらずDBにはデータが残っていない状態になる。より複雑なトランザクションであればメッセージ送信後に処理を行う必要があるかも知れない。そうなるとメッセージを送信したのにROLLBACKされている可能性はある。

    これを解消するのがTransactional outboxパターンである。トランザクションの中でメッセージを送信するのではなく、トランザクションの中でメッセージをDBに保存する。トランザクションがコミットされた後、別のプロセスがメッセージを送信する。

    await db.transaction(async (tx) => { const order = await tx.createOrder({...}); await tx.saveMessage(order); });

    別サービスでは下記のような処理が何らかのスケジューラー等で実行される。

    const messages = await db.getMessages(); for (const message of messages) { await sendMessage(message); await db.deleteMessage(message); }

    このようにすることで、DBのデータとメッセージングをアトミックにできる。

    Idempotent consumer

    Idempotent consumerは、メッセージを受信した際に同じメッセージを何度も受信しても問題ないようにするパターンである。特にCloud TasksやPub/Sub、SQSのサービスは"least once"の保証をしている(Pub/SubやSQSはexactly onceもあるらしいが)。逆に言えば同じリクエストがに2回発行されることもあるわけで、これを考慮する必要がある。

    解決方法はシンプルで、処理開始時にDBに何らかのIDを保存しておく。処理が終わった後、DBからIDを削除する。次回同じメッセージが来た場合、DBにIDが存在するか確認し、存在した場合は処理をスキップする。

    const id = requestBody.id; const rowsAffected = await db.execQuery( 'INSERT INTO processed_messages (id) VALUES (?) ON CONFLICT DO NOTHING', [id], ); if(rowsAffected === 0) { // メッセージはすでに処理済みなのでスキップ return; } // 処理を行う await db.execQuery('DELETE FROM processed_messages WHERE id = ?', [id]);

    ここではDBを用いているが、RedisのNXオプションを使うなりすることでも解決可能だ。

    色々な記事を漁ると、最後にDELETEを入れない場合が多いが、Cloud Tasksのようにメッセージを再送信する場合は消すか、もしくはCloud Tasksから渡されるリトライカウントなどもキーに入れて保存するなどが必要になるかも知れない。