GMOアドマーケティングのM.Hです。
近年のソフトウェアは、従来のモノリシックなアーキテクチャからマイクロサービスアーキテクチャへと大きくシフトしています。この変化に伴いトランザクションはより複雑となり、分散トランザクションが一つの大きな課題となってきます。この記事では、この課題に対処するためのSagaパターンに焦点を当て、その設計、利点、欠点、および他のパターンとの比較について詳しく説明します。
マイクロサービスと分散トランザクション
マイクロサービスアーキテクチャの基本概念
マイクロサービスアーキテクチャは、各サービスが独立して動作し、それぞれが異なるデータソースにアクセスできるという特性を持っています。このため、従来の単一のデータベースを使用するモノリシックアプローチとは異なり、分散トランザクションが一般的です。このような分散環境では、トランザクションの整合性を保つための新しいアプローチが求められます。
分散トランザクションが難しい理由
独立したマイクロサービス間でのデータ整合性を保つためには、複数のサービスにまたがるトランザクションが必要になる場合があります。これを「分散トランザクション」と呼びます。しかし、ネットワーク遅延やサービス障害、さらにはCAP定理などの要因によりこのタスクは簡単ではありません。そもそも、不具合の温床になりがちなので基本的に推奨されていません。
Sagaパターンとは
Sagaパターンの基本的な説明
Sagaはマイクロサービスアーキテクチャで分散トランザクションを使わずにデータ整合性を維持するためのメカニズムです。Sagaパターンでは、非同期メッセージングで構成された一連のローカルトランザクションを使って、複数のサービスにまたがるサービスのデータ整合性を維持します。各トランザクションは独立してコミットまたはロールバックが可能で、仮に何かしらの問題が途中で発生した際には「補償トランザクション」と呼ばれる仕組みを用いて、すでに行われた操作を打ち消します。
以下の例ですと、サービスAから始まってサービスCまでトランザクションが①、②の順で実行された後、サービスCにおいて何らかのエラーか不整合が発生したケースです。今までに実行されたトランザクションを打ち消すための補償トランザクション❸、❹が実行されます。
ACID特性とSagaパターン
トランザクション処理において4つの重要な特性であるACID(Atomicity, Consistency, Isolation, Durability)がありますが、各マイクロサービスは独立しそれぞれが異なるDBを持っていることから、SagaパターンはこのうちのConsistency、つまり一貫性を犠牲にしてしまいそうな感じがします。しかし、先述の補償トランザクションの考え方を導入し実行することで、”結果として”全体の一貫性を維持することができます。伝統的なACID特性とはアプローチが異なりますが、一連のローカルトランザクションにおける結果整合性(更新してから時間が経過した後には整合性保証)の考え方を使ったアーキテクチャパターンが、Sagaということになります。
Sagaパターンの利点と欠点
Sagaパターンは、高い可用性とスケーラビリティを求めるシステムに特に適しているとされています。
利点
- 柔軟性: 独立したサービス間で容易にトランザクションを行うことができます。これにより、各サービスが自身のビジネスロジックに集中することができ、拡張や変更が容易になります。
- スケーラビリティ: 各サービスのローカルトランザクションが独立しているため、システム全体のスケーラビリティが向上します。一部のサービスに負荷が集中しても他のサービスに影響を与えにくいです。
- 障害回復: 補償トランザクションにより、障害が発生した場合でもシステムの一貫性を保つことができます。障害の影響範囲を最小限に抑えることができます。
欠点
- 複雑性: 補償トランザクションを設計・管理する必要があり、実装が複雑になる場合があります。開発や運用のコストが増大する可能性があります。
- レイテンシの増加: 複数のローカルトランザクションを管理するため、ネットワーク通信が増加することが考えられます。これによりレイテンシが増加する可能性があります。
課題とトレードオフ
イベントの順序付けや、同時実行の制御、失敗時の対処など、Sagaパターンを採用する際には様々な課題が生じます。特に、すべてのトランザクションが完了するまでの間に他のトランザクションが干渉するリスクがあり、これにより一貫性とのトレードオフが生じることが考えられます。
Sagaパターンのコーディネート
Sagaパターンにおいて、複数のサービス間でのトランザクションをどのように調整するかは、大きく分けてコレオグラフィとオーケストレーションの2つのアプローチが存在します。
コレオグラフィ (Choreography)
実装概要:
- 各サービスが独自に他のサービスのイベントを購読・発行する形で、トランザクションの流れを制御します。
- サービス間のトランザクションは明示的な中央のコーディネーターを持たず、各サービスが独立して動作します。
特徴:
- 分散型: 各サービスが独立してトランザクションの一部を担当し、イベントの発行・購読によって相互に連携します。
- 自動的な調整: サービスは他のサービスの状態変更をイベントとして捉え、それに応じて自身の動作を変更します。
オーケストレーション (Orchestration)
実装概要:
- 中央のコーディネーターサービス(オーケストレーター)が、各サービスに対して何をいつ実行するかを明示的に指示します。
- オーケストレーターは全体のトランザクションフローを管理し、各サービスを呼び出すことでトランザクションを制御します。
特徴:
- 中央集権型: 一つのオーケストレーターサービスがトランザクションの全体の流れを管理します。
- 明示的な調整: オーケストレーターが各サービスに対してトランザクションの一部を実行するよう指示します。
なお、Sagaパータンの実装において単純なケースでない限りは基本的にオーケストレーションで実装することが推奨されます。
実装例
Sagaパターンと補償トランザクションの実装例:GCPを用いたケーススタディ
GCP(Google Cloud Platform)は、その多機能性とスケーラビリティによって、Sagaパターンと補償トランザクションの実装に非常に適しています。以下に、GCPの各種サービスを使って簡単なSagaパターンの実装例を示します。前項では基本的にオーケストレーションでの実装が推奨されると言いましたが、ここでは簡単のためコレオグラフィでの実装例を示します。
シナリオ設定
- オンラインショッピングプラットフォームでの注文処理を考えます。
- 以下の3つのマイクロサービスが存在すると仮定します:
- 注文サービス
- 在庫サービス
- 決済サービス
使用するGCPサービス
- Pub/Sub: イベントの通知と分散トランザクションの調整
- Cloud Functions: 各マイクロサービスのロジックを実行
- Cloud Spanner: 高い一貫性を持つデータベース
実装の流れ
手順1: Cloud Spannerデータベースの設定
1. Google Cloud Consoleから、Cloud Spannerインスタンスとデータベースを作成します。
2. Orders
, Inventory
, Payments
という名前の3つのテーブルを作成します。スキーマは以下のように設定します。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
CREATE TABLE Inventory ( item_id INT64, count INT64, ) PRIMARY KEY(item_id); CREATE TABLE Orders ( order_id INT64, status STRING(20), ) PRIMARY KEY(order_id); CREATE TABLE Payments ( status STRING(20), user_id INT64, balance INT64, ) PRIMARY KEY(user_id); |
- 最終的に、各テーブルは次のカラムがあるはずです。
Orders
:order_id
,status
Inventory
:item_id
,count
Payments
:user_id
,status
,balance
3.Inventory
, Payments
テーブルにあらかじめ値を挿入しておきます。各テーブルの詳細画面を開いて「データ」欄から挿入できます。
-
Inventory
テーブル
-
Payments
テーブル
手順2: Google Cloud Pub/Subの設定
Google Cloud Consoleで新しいPub/Subトピックを3つ作成します:OrderEvents
, InventoryEvents
, PaymentEvents
手順3: Cloud Functionsの設定
1. 注文サービス(OrderService)
このサービスにはOrderEvents
をトリガーに設定します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
const {PubSub} = require('@google-cloud/pubsub'); const {Spanner} = require('@google-cloud/spanner'); const spanner = new Spanner({projectId: 'xxxxxxxx'}); const instance = spanner.instance('saga-test'); const database = instance.database('test'); const table = database.table('Orders'); const pubSubClient = new PubSub(); exports.orderService = async (req, res) => { const event = JSON.parse(Buffer.from(message.data, 'base64').toString()); if(event.event === "order_create") { const timestamp = new Date().getTime(); await table.insert({order_id: timestamp, status: 'created'}); const message = JSON.stringify({orderId: timestamp, itemId: event.item_id, event: "order_created"}); const dataBuffer = Buffer.from(message); await pubSubClient.topic('InventoryEvents').publish(dataBuffer); } else if(event.event === "inventory_failed") { const event = JSON.parse(Buffer.from(req.data, 'base64').toString()) await table.update({order_id: event.orderId, status: 'denied'}); } }; |
2. 在庫サービス(InventoryService)
このサービスにはInventoryEvents
をトリガーに設定します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
const {PubSub} = require('@google-cloud/pubsub'); const {Spanner} = require('@google-cloud/spanner'); const spanner = new Spanner({projectId: 'xxxxxxxx'}); const instance = spanner.instance('saga-test'); const database = instance.database('test'); const table = database.table('Inventory'); const pubSubClient = new PubSub(); exports.inventoryService = async (message, context) => { const event = JSON.parse(Buffer.from(message.data, 'base64').toString()); if(event.event === "order_created") { const [itemRows] = await table.read({ columns: ['count'], keys: event.itemId }); count = itemRows[0].toJSON().count; let isInventoryAvailable = count > 0; const newEvent = isInventoryAvailable ? "inventory_confirmed" : "inventory_failed"; const publishMessage = JSON.stringify({orderId: event.orderId, itemId: event.itemId, event: newEvent}); const dataBuffer = Buffer.from(publishMessage); if(isInventoryAvailable) { await table.update({item_id: event.itemId, count: count - 1}); await pubSubClient.topic('PaymentEvents').publish(dataBuffer); } else { await pubSubClient.topic('OrderEvents').publish(dataBuffer); } } else if(event.event === "payment_failed") { await table.update({item_id: event.itemId, count: count + 1}); const publishMessage = JSON.stringify({orderId: event.orderId, event: "inventory_failed"}); const dataBuffer = Buffer.from(publishMessage); await pubSubClient.topic('OrderEvents').publish(dataBuffer); } }; |
3. 決済サービス(PaymentService)
このサービスにはPaymentEvents
をトリガーに設定します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
const {PubSub} = require('@google-cloud/pubsub'); const {Spanner} = require('@google-cloud/spanner'); const spanner = new Spanner({projectId: 'xxxxxxxx'}); const instance = spanner.instance('saga-test'); const database = instance.database('test'); const paymentsTable = database.table('Payments'); const pubSubClient = new PubSub(); exports.paymentService = async (message, context) => { const event = JSON.parse(Buffer.from(message.data, 'base64').toString()); if(event.event === "inventory_confirmed") { const userId = 1; // 今回はユーザID1で固定 const orderAmount = 100; // 今回は100円で固定 const [userRows] = await paymentsTable.read({ columns: ['balance'], keys: userId }); const userBalance= userRows[0].toJSON().balance; const isPaymentSuccessful = userBalance >= orderAmount; const newEvent = isPaymentSuccessful ? "payment_successful" : "payment_failed"; if (isPaymentSuccessful) { await paymentsTable.update({user_id: userId, balance: userBalance - orderAmount, status: 'successful'}); } else { await paymentsTable.update({user_id: userId, balance: userBalance, status: 'failed'}); const publishMessage = JSON.stringify({orderId: event.orderId, item_id: event.itemId, event: newEvent}); const dataBuffer = Buffer.from(publishMessage); await pubSubClient.topic('PaymentEvents').publish(dataBuffer); } } }; |
システムフロー
- ユーザが注文した時など、外部から
OrderEvents
トピックが送られてOrderService
が実行され、注文データを作成した後InventoryEvents
トピックにメッセージが送信されます。 InventoryService
がこのイベントを購読し、在庫確認後に結果をPaymentEvents
トピックに送ります。PaymentService
が在庫確認の結果を購読し、決済処理を行います。
補償トランザクション
-
在庫サービス(InventoryService)内の
inventory_failed
イベント: 在庫が不足している場合、このイベントが発行され、注文サービスがそれを受けて注文をキャンセルするなどの補償処理を行います。 -
決済サービス(PaymentService)内の
payment_failed
イベント: 支払いが失敗した場合、このイベントが発行され、それに応じて在庫を元に戻すなどの補償処理を行います。
例
1. {"item_id": 1, "event": "order_create"} として叩く
Orderテーブルに新たに注文データが作成され、statusがcreatedになっていることがわかります。また、Inventryテーブルでは指定したアイテムの個数が1つ減っています。Paymentテーブルではユーザのbalanceが100だけ減らされ、statusがsuccessfulに変更されています。
つまり、この注文は成功しています。
2. {"item_id": 2, "event": "order_create"} として叩く
Orderテーブルに新たに注文データが作成されますが、statusがdeniedになっていることがわかります。また、Inventryテーブルでは指定したアイテムの個数は減っていません。元々の個数が0ですから、在庫サービスのロジックにより注文が受け付けられませんでした。よって、補償トランザクションが走り、注文サービスにおいて該当注文IDのstatusが変更されたわけです。
3. もう一度 {"item_id": 1, "event": "order_create"} として叩く
Orderテーブルに新たに注文データが作成されますが、statusがdeniedになっていることがわかります。また、Paymentテーブルではユーザのbalanceには変更はありませんが、statusがfailedになっています。さらに、Inventryテーブルでは注文したはずのアイテムの個数に変化はありません。
ユーザの所持金が20であるのに対し、注文代金は100なので支払うことができなかったため、決済サービス内のロジックにより支払いが拒否されていることがわかります。よって、補償トランザクションが走り、Inventryテーブルの該当アイテムの個数は戻され、注文IDのstatusも変更されたわけです。
このようにして、GCPのCloud Functions, Cloud Spanner, そしてPub/Subを使ってSagaパターンと補償トランザクションを実装することができます。
まとめ
Sagaパターンは、マイクロサービスアーキテクチャで分散トランザクションを使わずにデータ整合性を維持するための強力な回答となります。その設計の柔軟性とスケーラビリティは多くの場面で有益ですが、補償トランザクションの複雑性を考慮する必要があります。Sagaの特性とトレードオフを十分に理解し、適切な設計と実装を行うことが重要です。
以上がSagaパターンに関する解説となります。この知識を通じて、分散トランザクションの課題への新しい視点を得る手助けとなれば幸いです。