Amazon Kinesis Data Streamsの事例紹介とAWS SDKのバージョンアップについて

OGP

はじめに

こんにちは。ECプラットフォーム部カート決済ブロックの曽根です。

ZOZOTOWNでは、リプレイスの一環として、2021年4月からカート決済機能のマイクロサービス化を開始しました。

ZOZOTOWN カート決済機能リプレイス Phase1 〜 キャパシティコントロールの実現 - ZOZO TECH BLOG

本記事では、上記で紹介しているCart Queuing SystemのAmazon Kinesis Data Streams(以下、KDS)にフォーカスし、Javaの実装を交えて事例をご紹介します。また、開発中にAWS SDK for Javaをv1からv2へバージョンアップしたため、合わせて変更点もお伝えします。

KDSとは

KDSは、ログやイベントデータの収集、リアルタイム分析などで活用可能なストリーミングデータサービスです。

KDSに格納されるデータの単位は、レコードです。レコードは、以下で構成されています。

  • シーケンス番号
  • パーティションキー
  • データBLOB

パーティションキーはKDSにデータを組み込む時に使用され、レコードをストリームのシャードにルーティングします。

シャードとはKDS内で識別されたレコードのシーケンスです。シャードへのルーティングは以下のルールで行われます。

  1. パーティションキーをMD5ハッシュ関数でハッシュ化して、128bitの整数値にマッピングを行う
  2. ハッシュ化された整数値が割り当てられたシャードにデータを送る

128bitなので、シャードには0から340282366920938463463374607431768211455の値が割り振られています。この値をシャードの数に応じて分割します。

具体的には、シャードが1つの場合は以下のようになります。

シャード名
シャード1 0 - 340282366920938463463374607431768211455

シャードが2つの場合は340282366920938463463374607431768211455を2で割り、以下のようになります。

シャード名
シャード1 0 - 170141183460469231731687303715884105727
シャード2 170141183460469231731687303715884105728 - 340282366920938463463374607431768211455

本来、各シャードへの振り分けはKDSが自動で行います。しかし、開発時に確認したところ想定以上の偏りが出てしまいました。そこで、意図的にシャードを振り分ける検証をしました。

以下は、レコードを100個に分割したシャードに対してランダムかつ均等に振り分ける例です。最大値をシャードの数で割り、それにシャードの数を最大とした乱数を掛けた値をハッシュ値にしています。意図的にシャードを指定するためには、explicitHashKeyにハッシュ値を設定する必要があります。partitionKeyとexplicitHashKeyの両方が設定されていた場合、explicitHashKeyが優先されます。

BigDecimal sortingShard =
  new BigDecimal(new BigInteger("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", 16))
    .divide(
      BigDecimal.valueOf(100),
      4,
      RoundingMode.HALF_UP);
String explicitHashKey = sortingShard
  .multiply(BigDecimal.valueOf(new SecureRandom().nextInt(100)))
  .setScale(0, RoundingMode.UP)
  .toString();
PutRecordRequest putRecordRequest =
  PutRecordRequest.builder()
    .streamName(streamName)
    .data(SdkBytes.fromByteArray(json.getBytes()))
    .explicitHashKey(explicitHashKey)
    .partitionKey(partitionKey)
    .build();

各シャードは、読み取りに対して最大5トランザクション/秒をサポートします。また、最大合計データ読み取りレートは2MB/秒、最大合計データ読み取りレートは1,000レコード、最大合計データ書き込みレートは1秒あたり1MB(パーティションキーを含む)までサポートできます。より詳しい説明は公式ドキュメントをご参照ください。

アーキテクチャ設計

ZOZOTOWNでは、カート投入時に商品の在庫引き当てを行う仕様があります。そのため、複数のユーザーが同じ商品をカート投入しようとした場合、FIFO(First-In First-Out)で処理を行い、投入順を維持する必要があります。KDSはシャードごとに順序保証がされているため、同一のシャードにリクエストを振り分ける必要がありました。そこでパーティションキーを商品ごとに振られている商品IDにしました。より詳しい説明は以下の記事をご参照ください。

ZOZOTOWN カート投入の分散キューイングシステム 〜 プロダクションレディまでの歩み - ZOZO TECH BLOG

過熱商品への対応

福袋や限定品など、ユーザーから大量のアクセスが来る商品のことを弊社では「過熱商品」と呼んでいます。

過熱商品を先程までのアーキテクチャで処理してしまうと、下図のように過熱商品ではない商品を購入したいユーザーが巻き込まれて商品をカートに追加できなくなってしまいます。

image

そこで、この問題を解決するために、下図のようにストリームを分けるようにしました。

image

過熱商品になりそうな商品をあらかじめDBに登録しておき、カートに商品を追加する際にそのDBを参照します。これによりデータが取得できた場合は、過熱商品用のストリームにデータを流します。

このようにストリームを分けることにより、過熱商品による大量のアクセスが他の商品を購入したいユーザに影響を及ぼすことはなくなりました。

AWS SDKとは

ここからはAWS SDKのバージョンアップについて説明します。AWS SDKはAWSのサービスをプログラムなどから操作できるようにするための開発キットです。AWS SDKを使用することでWebアプリケーションを介さずに直接AWSサービスとやり取りできるアプリケーションを開発できます。AWS SDKは各種言語に対応しており、様々なAWSのサービスに対応しています。今回のプロジェクトはJavaで開発しているため、AWS SDK for Javaを使用しています。

SDKのバージョンアップ

2018年の11月にAWS SDKの2.xがリリースされました。プロダクトでは最初に1.x系のライブラリを使用していましたが、途中から2.x系にバージョンアップを行いました。変更点をいくつかご紹介します。

クライアントの生成

1.xではコンストラクタによる生成だったのが、2.xではbuilderによる生成になりました。

1.x

AmazonKinesis kinesisClient = AmazonKinesis.defaultClient();
AmazonKinesisClient kinesisClient = new AmazonKinesisClient();

2.x

KinesisClient kinesisClient = KinesisClient.create();
KinesisClient kinesisClient = KinesisClient.builder().build();

クライアントの設定方法

1.xではClientConfigurationですべて設定していましたが、2.xでは別々の設定クラスに分割されています。

設定内容 1.x メソッド - 2.x クラス 2.x メソッド
接続タイムアウトまでの時間 withConnectionTimeout ApacheHttpClient connectionTimeout
クライアントがAPI呼び出しの実行を完了するのにかかる時間 withClientExecutionTimeout ClientOverrideConfiguration apiCallTimeout
リクエストをタイムアウトするまでの時間 withRequestTimeout ClientOverrideConfiguration apiCallAttemptTimeout
ソケット通信をタイムアウトするまでの時間 withSocketTimeout ApacheHttpClient socketTimeout
HTTP接続の最大数 withMaxConnections ApacheHttpClient maxConnections
最大リトライ数 withMaxErrorRetry ClientOverrideConfiguration retryPolicy

認証情報の設定

2.xでは環境変数名やメソッドなどが変更になっています。また一部のメソッドがサポート外になりました。

環境変数名の変更

1.x 2.x
AWS_ACCESS_KEY AWS_ACCESS_KEY_ID
AWS_SECRET_KEY AWS_SECRET_ACCESS_KEY
AWS_CREDENTIAL_PROFILES_FILE AWS_SHARED_CREDENTIALS_FILE

メソッド名の変更

1.x 2.x
AWSCredentialsProvider.getCredentials AwsCredentialsProvider.resolveCredentials
DefaultAWSCredentialsProviderChain.getInstance サポート外
AWSCredentialsProvider.getInstance サポート外
AWSCredentialsProvider.refresh サポート外

システムプロパティ名の変更

1.x 2.x
aws.secretKey aws.secretAccessKey
com.amazonaws.sdk.disableEc2Metadata aws.disableEc2Metadata
com.amazonaws.sdk.ec2MetadataServiceEndpointOverride aws.ec2MetadataServiceEndpoint

DynamoDBへのアクセス方法

メソッドチェーンでより直感的に記載できるようになりました。

1.x

public void register(Id id) {
  CartRequests cartRequests = new CartRequests(id.getValue());
  DynamoDBSaveExpression dynamoDBSaveExpression =
    new DynamoDBSaveExpression()
      .withExpectedEntry(ID, new ExpectedAttributeValue().withExists(false));
    dynamoDBMapper.save(
      cartRequests,
      dynamoDBSaveExpression
  );
}

2.x

public void register(Id id) {
  CartRequests cartRequests = new CartRequests(id.getValue());
  Expression expression =
    Expression.builder()
      .expression("attribute_not_exists(#id)")
      .expressionNames(Map.of("#id", ID))
      .build();
    PutItemEnhancedRequest<CartRequests> putItemEnhancedRequest =
      PutItemEnhancedRequest.builder(CartRequests.class)
        .item(cartRequests)
        .conditionExpression(expression)
        .build();
  getCartRequestsTable().putItem(putItemEnhancedRequest);
}

例外クラス名の変更

1.x 2.x
com.amazonaws.SdkBaseException
com.amazonaws.AmazonClientException
software.amazon.awssdk.core.exception.SdkException
com.amazonaws.SdkClientException software.amazon.awssdk.core.exception.SdkClientException
com.amazonaws.AmazonServiceException software.amazon.awssdk.awscore.exception.AwsServiceException

クライアントや例外クラスの変更などがあるため、アップデートする際にある程度コードの修正が発生してしまいます。時間に余裕を持って行いましょう。また、現状ではまだ1.x系のみにしかない機能がいくつかあります。その機能を使う場合は1.x系と2.x系を両方同時に使用して、処理によってライブラリを使い分けましょう。

ほかにも様々な変更があるので、詳しくは公式ドキュメントchangelogをご参照ください。

まとめ

今回はZOZOTOWNのカート決済機能のリプレイスで使用したKDSの事例とAWS SDKのバージョンアップについて紹介しました。

KDSやAWS SDKを使用することで、ユーザーに安定したカート投入を提供できるようになりました。今後もサービス向上のため、さらなる改善を進めていきます。

最後に

ZOZOでは、一緒にサービスを作り上げてくれる仲間を募集中です。ご興味のある方は以下のリンクからぜひご応募ください。

hrmos.co

hrmos.co

カテゴリー