こんにちは。iQONのバックエンドエンジニアを担当しているjoeと申します。 最近、iQONのお知らせ機能のDBをMySQLからDynamoDBへ移行しました。 移行する際に発生した問題点である並列処理によるデータ欠損とProvisioning超過の対策を書きます。 間違っているところや改善点があればご指摘ください。よろしくお願い致します。
お知らせ機能とは
お知らせ機能とは、facebookで言うところの「◯◯さんがあなたの投稿に「いいね!」といっています」のような、ユーザーに対するアクションがあったことを通知したり、アイテムが値引きされた、アイテムの在庫が少なくなった等のlikeしたアイテム情報をユーザーに通知する機能です。
既存のお知らせ機能の問題点
既存の構成における問題点は以下の二点です。
- データの肥大化
- レスポンスが遅い
お知らせのデータ構造は、下記のようになっています。
- MySQL : マスターデータ
- redis : キャッシュ
図 お知らせのデータ保持の構成図
redisにはユーザーごとのMySQLのお知らせidを直近100件分保持しています。 お知らせはユーザーの行動やアイテムのステータスの変化の分レコード数が存在するので、MySQLのレコード数が尋常ではない量になります。また、ユーザー分のindexをredisに保持しているので、これではユーザーの人数が増えれば増えるほどredisの容量を食う事になります。MySQLのデータが大きくなればなるほどレスポンスが遅くなっていきます。
この問題を解決するために、awsのDynamoDBに移行することにしました。
なぜDynamoDB?
DynamoDBは
- スキーマレスなkey, valueストレージ
- 10ms未満のレイテンシー
- 大規模なデータにも柔軟に対応
- 自動で冗長化してくれるのでメンテが楽
のような特徴があり、今回解決したい問題の2つである速度改善とデータの肥大化への対策が期待できます。 また、DynamoDBのデメリットである柔軟な検索や集計処理が不得意という点に関しては、今回のケースで影響は少ないと考えられます。
DynamoDBでのデータ保持
ユーザーごとにkeyを持たせて、お知らせはJSONを要素とする配列を格納するようにしました。元々redisで100件分しかデータを保持していなかったので、配列の長さは100件までとしました。
update_countという項の存在意義は後ほど説明します。 event_typeというのは発生したeventごとに割り振られているユニークな数値です。これによってお知らせの種類を見分けています。
# ユーザー1人分のお知らせデータ feedback_user_id(primary_key): "feedback:user:40" feedbacks: [ {"event_type":11,"set_id":1079944,"create_time":"2016-04-30 12:43:56 +0900","feedback_id":500000566}, {"event_type":11,"set_id":1080144,"create_time":"2016-04-30 12:43:58 +0900","feedback_id":500000568}, {"event_type":11,"set_id":1079456,"create_time":"2016-04-30 12:44:00 +0900","feedback_id":500000570}, {"event_type":11,"set_id":1073230,"create_time":"2016-04-30 12:44:01 +0900","feedback_id":500000572}, {"event_type":11,"set_id":1068334,"create_time":"2016-04-30 12:44:03 +0900","feedback_id":500000574}, {"event_type":11,"set_id":1064505,"create_time":"2016-04-30 12:44:05 +0900","feedback_id":500000576}, {"event_type":11,"set_id":1065469,"create_time":"2016-04-30 12:44:06 +0900","feedback_id":500000578}, {"event_type":11,"set_id":1055427,"create_time":"2016-04-30 12:44:07 +0900","feedback_id":500000580} ], update_count: 766
お知らせ機能のdynamo移行における問題点
お知らせ機能のdynamo移行における問題点は2つです。
- provisioning量超過
- 並列処理実行によるデータの欠損
お知らせの発行において予測できない書き込みの増加が発生することがあるので、provisioning量超過時のエラーの対処をする必要がありました。 また、iQONのお知らせデータの発行はworkerによる遅延処理のため、複数のworkerが同時にdbにアクセスしてデータを書き換えるという事例が発生し、データが欠損する恐れがありました。 それぞれの問題についての今回の対応を書いていきます。
provisioning量の調節
いきなりですが、provisioning量を自動調節する方法は弊社の別のブログで公開しているのでそちらを参照してください! DynamoDBの導入とDynamic DynamoDBを用いたプロビジョニング量自動調整
ただ、上記の場合だと急激なthroughput(読み込み・書き込み量)の変化に追いつけずに書き込みに失敗することがあるので、今回はprovisioningを超過して失敗した場合、お知らせの発行処理をenqueueしてworkerに処理させるようにしました。
実装例
お知らせをDynamoDBにinsertする処理をwith_retryメソッドで囲み、insertが失敗した場合にworkerにenqueueしています。
# retry処理 def with_retry begin yield rescue Aws::DynamoDB::Errors::ProvisionedThroughputExceededException, Aws::DynamoDB::Errors::ConditionalCheckFailedException => e if get_event_value[:retry_enqueue_limit] != 0 @retry_enqueue_limit -= 1 Sidekiq::Client.enqueue(FeedbackDynamoFailedInsertWorker, get_event_value) else ::Rails.logger.info "[FEEDBACK ENQUEUE RETRY MAX]#{e.message}" raise e end end end
# 失敗のqueueを処理するworker class FeedbackDynamoFailedInsertWorker < Worker sidekiq_options :queue => :feedback_failed_insert, :retry => false, :backtrace => true def perform(params) sleep(0.005) begin FeedbackUserIndexDynamo.new(params["user_id"]).update_feedback!(params["event_value"], params["retry_enqueue_limit"]) rescue => e ::Rails.logger.info "[ERROR][FeedbackDynamoInsertWorker]" + e.message end end end
上記を実装することで完全にthroughputによるエラーをなくせました!
並列処理実行によるデータの欠損
iQONのお知らせデータの発行はworkerによる遅延処理のため、複数のworkerが同時にdbにアクセスしてデータを書き換えるという事例が発生します。そこで起こりえるのが、同時データ取得・書き込みによるデータの欠損です。
例えばworker1がDynamoDBにアクセスしてお知らせのデータを取得し、お知らせの配列にデータを追加してupdateをする。その間にworker2がDynamoDBにアクセスして値を追加、worker1の動作を上書いてしまうといった不整合が発生します。
そこで、 DynamoDBの機能としてある"Conditional Update(条件付き書き込み)"という楽観的ロックを用いてデータの欠損を防ぎました。
"Conditional Update"とは、DynamoDBのupdate処理のoptionの一つであり、こちら側が指定した条件を満たす時のみデータをupdateするようにする処理です。
実際の構成
Conditional Updateの条件にするため、上記でも説明したupdate_countというcounterをお知らせの更新のたびにincrementします。 Conditional Updateの条件を下記のように設定して同時書き込みを防ぎました。
条件: update_count == previous_update_count
図 お知らせ更新の構成図
※ iQONのお知らせは、過去のお知らせ情報を元に丸め処理(同じコーデに対するLikeのお知らせは1個に丸める等)をするため、DynamoDBからお知らせの配列ごと取得し、お知らせを書き換えて配列をDynamoDBに上書きするというフローになっています。
実装例
insert
def add!(feedbacks, update_count, previous_update_count) options = { table_name: "feedbacks", key: "feedback:user:1", update_expression: "SET update_count = :update_count, feedback = :feedbacks", condition_expression: "update_count <= :previous_update_count", expression_attribute_values: { ":feedbacks" => feedbacks, ":update_count" => update_count, ":previous_update_count" => previous_update_count } } @@dynamodb.update_item(options) end
update処理を行うときは、update_item
を使用しますが、 update_item
のオプションはAttributeUpdates
でなく、UpdateExpression
を使用することが推奨されています。
また、update_expressionでは下記の4つのupdateのオプションがあります。
- SET
- REMOVE
- ADD
- DELETE
今回はADD
より柔軟なSET
を使用しています。(ADDは数値とset型の配列しか受け付けない)
また、SET
を使えば、数値をincrement/decrementすることもできるので、update_countのincrementをDynamoDB側で行う事もできます。詳しくは参考資料を御覧ください。
(今回は自前でincrementしています。)
update_expressionの参考資料 Modifying Items and Attributes with Update Expressions
その他の注意点としては、
- 数値の型がintではなくBigDecimal
- Time型がないので時間は文字列で値を入れる
等があります。
Conditional Updateの失敗数の監視と失敗時の対応
Conditional Updateのfail、つまり、同時書き込みによる失敗数は下記の図のようになっています。 このように、DynamoDBではthroughtputの量やConditional Updateのfail等、awsコンソールで表示してくれます。
Conditional Updateの失敗をした場合は、上記で説明した用にworkerでenqueueして処理を再度行うようにしています。
結論
各対処法によってデータ欠損、Provisioning量超過によるデータの書き込みエラーで飛んでくるsentryのエラーを0にできました!
DynamoDBを触ってみて、配列の上限がx個に達したら自動的に古い要素を削除してくれるような機能があればいいなと思いました。 このような機能がある、又はDynamoDBの◯◯を使えばコレが実現できそう等がありましたら教えて下さい!
終わりに
VASILYではDynamoDBをごりごりに使ってみたい仲間を募集しています。 新規事業の開発も始まりましたし、ご興味がある方はぜひこちらからご応募ください。