【ZOZOTOWNマイクロサービス化】API Gatewayのスロットリング機能開発におけるノウハウ大公開

ogp

はじめに

こんにちは。ECプラットフォーム部のAPI基盤ブロックに所属している籏野 @gold_kou と申します。普段は、GoでAPI GatewayやID基盤(認証マイクロサービス)のバックエンド開発をしています。

ZOZOでは、API Gatewayを内製しています。これまでも以下の記事を公開し、ご好評いただいております。ありがとうございます。

今回は、API Gatewayのスロットリング機能を開発しましたので、そこで得られた知見を共有いたします。ソースコードもたくさん掲載しております。マイクロサービスに興味ある方や、API Gatewayを内製する方の参考になれば幸いです。

また、本記事に掲載されているソースコードは分かりやすさを優先するため、実際とは異なる部分がありますので、あらかじめご了承ください。

スロットリングとは

スロットリングとは、何らかのリソースに対して使用量の上限を設定し、上限を超えるものについてはその使用を制限するような処理です。本記事ではリクエストに対するスロットリングを扱うため、設定した上限を超えたリクエストを制限する機能を指します。過剰な数のリクエストを制限することで、システムが停止してしまうことや一部のクライアントがリソースを占有してしまうことを防ぎます。制限方法はいくつかあります。例えば、HTTPステータスコード429を即時に返す方法です。

要件

今回開発するスロットリング機能の要件です。

クライアントタイプ毎に同時接続数で制限する

ZOZOTOWNのAPIでは外部サービスやクライアント端末のOSなどに応じて、クライアントをいくつかの分類に分けています。その分類をこの記事では「クライアントタイプ」と呼ぶこととします。このクライアントタイプ毎に「同時接続数」でリクエストを制限します。同時接続とは、API Gatewayがリクエストを転送処理している間の接続のことです。同時接続数はマイクロサービスへのリクエスト処理時に1つ増え、マイクロサービスからのレスポンス処理時に1つ減ります。

データストアを利用しない

スロットリング機能を開発しようとすると、閾値判定するための情報をサーバ間で共有するデータストアが欲しくなります。しかし、API Gatewayはデータベースなどの外部のデータストアサービスをこれまで必要としてきませんでした。そして、今後も出来る限り、使用したくありません。なぜならば、利用しない方が可用性の低下を抑えられるからです。データストアサービスを利用するとどうしても、全体の可用性は落ちてしまいます。そこで、各サーバで動作するAPI Gatewayがそれぞれのオンメモリの情報を使ってスロットリングする要件としました。この場合、サーバ間で情報を共有しないため、スロットリングが適用されるサーバとそうでないサーバが混在してしまいますが、今回は許容できる点でした。

仮に外部のデータストアを利用する設計では、Amazon ElastiCache for Redisの導入を検討していました。導入しなかったことにより、以下の問題を避けることができました。

  • 可用性の低下
  • インフラコストの増加
    • ノード数が3、キャッシュエンジンがRedis、インスタンスタイプがr6g.xlarge、料金モデルがOnDemandとします。その条件でAWS Pricing Calculatorを参考に計算すると、年間約12,000 USDの節約となります。

Canary Deploymentsに対応する

API GatewayはAmazon Elastic Kubernetes Service上に構築されており、複数のPodにデプロイされています。また、IstioVirtualServiceにより、API GatewayをCanary Deploymentsしています。Canary Deploymentsが発生しても、変わらずにスロットリングできる必要があります。

なお、本記事ではCanary DeploymentsによりPrimaryとCanaryのそれぞれに加重がかかっている状態を「カナリアリリース状態」と呼ぶこととします。

機能の概要

overview

スロットリング機能の処理内容は、大きく分けて、閾値の計算に必要な情報を取得する処理(図の「取得処理」)とAPIリクエスト毎に閾値判定する処理(図の「APIリクエスト処理」)に分かれます。

取得処理では、初回起動時のみ「ホスト名」と「最大の同時接続数」を取得します。また、定期的にKubernetesとIstioから「Primary/CanaryのPod数」と「Primary/Canaryへの加重比率」の情報を取得します。定期的に取得するのは、これらの値が変動するからです。

APIリクエスト処理では、リクエストを処理する際に、スロットリング対象のクライアントタイプに関して同時接続数の閾値判定をします。閾値を超していれば、転送処理をせずにクライアントへHTTPステータスコード429を即時に返します。閾値は、取得処理により取得した情報を使用して、リクエストの処理ごとに計算し直されます。計算し直すのは、閾値がインフラの状態により、変動するためです。

実装

各処理の実装とその説明です。

取得処理

以下は初回起動時のみ取得する情報です。

  • 最大の同時接続数
  • ホスト名
    • Podのデプロイメント種別(Primary用あるいはCanary用)

以下は定期的に取得する情報です。

  • Primary/CanaryのPod数
  • Primary/Canaryへの加重比率

それぞれの取得方法について説明します。

初回起動時のみの取得

最大の同時接続数

全Pod合算での最大の同時接続数を max_concurrent_requests.yml というYAMLファイルに設定可能としています。設定する最大の同時接続数の単位は、単一のPodでなく全Pod合算としています。PodはHorizontal Pod Autoscaler(HPA)でスケーリングします。仮に単一のPod単位で設定できてしまうと、Podが増える分だけリクエストを制限しなくなります。これでは正しくスロットリングできません。したがって、全Pod合算での設定単位としています。

以下は設定例です。

client1: 100
clinet2: 200

アプリケーションでは起動時に max_concurrent_requests.yml を以下のように読み込み、keyがクライアントタイプでvalueが最大の同時接続数のmapで保持しています。

var maxConcurrentMap map[Client]int

func init() {
    maxConcurrentData, e := ioutil.ReadFile(constants.ProjectDir + "config/max_concurrent_requests.yml")
    if e != nil {
        panic(e)
    }
    e = yaml.Unmarshal(maxConcurrentData, &maxConcurrentMap)
    if e != nil {
        panic(e)
    }
}

Podのデプロイメント種別

PodがPrimary用かCanary用かをこの記事では「デプロイメント種別」と呼ぶこととします。API Gatewayのアプリケーションが動作しているPod上で、そのPodのデプロイメント種別を判別します。Canary用Podの場合、ホスト名は api-gateway-canary-b4dd8dc5c-88n74 のような値になります。したがって、ホスト名に文字列 "canary" が含まれていればCanary用、そうでなければPrimary用と判別しています。

var depType string

func SetDepType() error {
    hostName, e := os.Hostname()
    if e != nil {
        return e
    }
    if strings.Contains(hostName, "canary") {
        depType = "canary"
    } else {
        depType = "primary"
    }
    return nil
}

定期的な取得

定期的な「Primary/CanaryのPod数」と「Primary/Canaryへの加重比率」の取得について説明します。

独自エラー

取得処理の失敗に関して2つの独自エラーを用意しました。 fetchError は、取得失敗となった場合のエラーです。 terminatingError は、Podがterminatingになった場合のエラーです。

type terminatingError struct {
    message string
}

func (e *terminatingError) Error() string {
    return e.message
}

type fetchError struct {
    message string
}

func (e *fetchError) Error() string {
    return e.message
}

Primary/CanaryのPod数

kubernetes/client-goというライブラリを使用して、KubernetesのPod情報とDeployment情報を取得しています。

まずはAPI Gatewayが動作しているPodのPod情報を取得しています。ライブラリの実行でエラーになった場合は、 [FETCH_ERROR] というプレフィックスをメッセージに付けて fetchError を返します。Podがterminatingになれば terminatingError を返します。KubernetesのRolling Updateなどにより、Podがterminatingになると、fetch処理は失敗してしまうからです。Podがterminatingになる際は、まずPodに .metadata.deletionTimestamp が設定されます。したがって、GetObjectMeta().GetDeletionTimestamp()の返り値がnilでなければ terminatingError を返すようにしています。

次に、Deployment情報を取得しています。各Deployment情報にはPod数が含まれています。Deployment名が api-gateway のものがPrimary用で、 api-gateway-canary のものがCanary用です。

var interval = time.Second

type InstanceCounts struct {
    Primary int
    Canary  int
}

func (FetcherImpl) fetchInstanceCounts() (InstanceCounts, error) {
    timeoutContext, cancel := context.WithTimeout(context.Background(), interval)
    defer cancel()

    // Pod情報の取得
    pod, e := k8sClientSet.CoreV1().Pods(podNameSpace).Get(timeoutContext, podName, metav1.GetOptions{})
    if e != nil {
        return InstanceCounts{}, &fetchError{
            message: "[FETCH ERROR] " + e.Error(),
        }
    }
    if pod.GetObjectMeta().GetDeletionTimestamp() != nil {
        return InstanceCounts{}, &terminatingError{
            message: "this pod is terminating",
        }
    }

    // Deployment情報の取得
    var i = int64(interval)
    deployments, e := k8sClientSet.AppsV1().Deployments(podNameSpace).List(timeoutContext, metav1.ListOptions{TimeoutSeconds: &i})
    if e != nil {
        return InstanceCounts{}, &fetchError{
            message: "[FETCH ERROR] " + e.Error(),
        }
    }
    var primaryPodCount, canaryPodCount int
    for _, d := range deployments.Items {
        if d.Name == "api-gateway" {
            primaryPodCount = int(d.Status.AvailableReplicas)
        } else if d.Name == "api-gateway-canary" {
            canaryPodCount = int(d.Status.AvailableReplicas)
        }
    }
    return InstanceCounts{
        Primary: primaryPodCount,
        Canary:  canaryPodCount,
    }, nil
}

Primary/Canaryへの加重比率

istio/client-goというライブラリを使用して、IstioのVirtualService情報を取得します。VirtualServiceの各destinationにweightが設定されており、それを取得します。subsetの値に文字列 "canary" が含まれていれば、Canary用の設定です。そうでなければPrimary用です。PrimaryとCanaryのweightの合算は100です。

var interval = time.Second

type TrafficWeight struct {
    Primary int
    Canary  int
}

func (FetcherImpl) fetchTrafficWeight() (TrafficWeight, error) {
    timeoutContext, cancel := context.WithTimeout(context.Background(), interval)
    defer cancel()

    vs, e := istioClientSet.NetworkingV1beta1().VirtualServices(podNameSpace).Get(timeoutContext, virtualServiceName, metav1.GetOptions{})
    if e != nil {
        return TrafficWeight{}, &fetchError{
            message: "[FETCH ERROR] " + e.Error(),
        }
    }

    // istioによるルーティングは実施しないためGetHttpのインデックスは必ず0。Canaryがない環境も存在するためGetRoute()[1]はしない。
    httpRoute := vs.Spec.GetHttp()[0].GetRoute()[0]
    var weight = httpRoute.GetWeight()
    if strings.Contains(httpRoute.GetDestination().Subset, "canary") {
        return TrafficWeight{
            Primary: 100 - int(weight),
            Canary:  int(weight),
        }, nil
    }
    return TrafficWeight{
        Primary: int(weight),
        Canary:  100 - int(weight),
    }, nil
}

エラーハンドリング

fetch 関数の内部で fetchInstanceCounts 関数と fetchTrafficWeight 関数を実行しています。

fetchInstanceCounts 関数のエラーハンドリングでは、 fetchErrorterminatingError に対してcase文を用意しています。

fetchError であればエラー回数も含めてWARNログを出力します。ログはAmazon CloudWatch LogsによりAmazon S3へ集約されます。単位時間あたりに一定数以上の fetchError によるWARNログが出力された場合は、Slackで通知されるようにしています。また、 fetchError の回数は fetchErrCount でカウントしています。 fetchErrCount > fallbackThreshold の場合、つまり fetchError が一定数より多く発生した場合はスロットリング機能を一時的にOFFにする「機能縮退」をします。 fetchInstanceCountsfetchTrafficWeight の両方の実行が成功した場合のみ fetchErrCount を0に戻します。 terminatingError であればそのままエラーを返します。エラーを返した先で定期処理が停止します。これにより、不要にWARNログが出力されることを回避しています。

fetchTrafficWeight 関数のエラーハンドリングでは、エラー種類は1つなので、 fetchError に対してif文を用意しています。

var instanceCounts InstanceCounts
var trafficWeight TrafficWeight
var fetchErrCount int
const fallbackThreshold = 60

func fetch(f Fetcher) error {
    counts, e := f.fetchInstanceCounts()
    if e != nil {
        switch e.(type) {
        case *fetchError:
            fetchFailedLog.Warn(fmt.Sprintf("(%v/%v) ", fetchErrCount, fallbackThreshold) + e.Error())
            fetchErrCount++
            // fetchErrorが一定数発生した場合は機能縮退
            if fetchErrCount > fallbackThreshold {
                instanceCounts = InstanceCounts{
                    Primary: 0,
                    Canary:  0,
                }
            }
            return nil
        case *terminatingError:
            return e   
        }
    } else {
        instanceCounts = counts
    }

    w, e := f.fetchTrafficWeight()
    if e != nil {
        if err, ok := e.(*fetchError); ok {
            fetchFailedLog.Warn(fmt.Sprintf("(%v/%v) ", fetchErrCount, fallbackThreshold) + err.Error())
            fetchErrCount++
            // fetchErrorが一定数発生した場合は機能縮退
            if fetchErrCount > fallbackThreshold {
                trafficWeight = TrafficWeight{
                    Primary: 0,
                    Canary:  0,
                }
            }
            return nil
        }
    } else {
        trafficWeight = w
        fetchErrCount = 0
    }
    return nil
}

定期実行

Pod数と加重比率は変動する値です。例えばHPAでPod数は変化しますし、Canary Deploymentsで加重比率も変化します。そこで、Pod数と加重比率に関しては、無限forループの中でTickerにより毎秒取得するようにしています。 Poll 関数内で fetch 関数を実行しています。 fetch 関数がエラーを返すのは terminatingError の場合のみです。エラーの場合は、 ticker.Stop() して無限forループを break 文で抜けます。

func Poll(f Fetcher) {
    ticker := time.NewTicker(time.Duration(time.Second)
    defer ticker.Stop()
    for {
        <-ticker.C
        if e := fetch(f); e != nil {
            ticker.Stop()
            break
        }
    }
}

Poll 関数はmain.goにて、goroutineの中で実行されています。ソースコードは割愛します。

APIリクエスト処理

APIリクエスト処理についてです。

各クライアントタイプはそれぞれに応じたAPIクライアントトークンをヘッダーに付与してリクエストします。API Gatewayでは、このAPIクライアントトークンを利用して、APIクライアントトークン認証を行なっています。また、APIクライアントトークン認証に関する処理はミドルウェア化されています。そのミドルウェア処理の中で、同時接続数のカウントアップ・カウントダウンや閾値判定をしています。

同時接続数のカウントアップ・カウントダウン

同時接続数はリクエスト処理時にカウントアップし、レスポンス処理時にカウントダウンします。クライアントタイプと同時接続数の組み合わせを clientCount というmapで保持します。他のリクエストと処理が混じるのを防ぐため、Mutexで排他制御し、その中でインクリメントしています。一度変数 count に代入しているのは、カウントアップした値を使った処理(ここでは簡略化のためソースコードを省略しております)の完了を待たずに、素早く排他制御を終了するためです。ミドルウェアなので、 defer に実装されているカウントダウンはレスポンス処理時に実施されます。

var mutex = &sync.Mutex{}
var clientCount = map[client.Client]int{}

func ClientTokenMiddleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        ...

        mutex.Lock()
        clientCount[c]++
        count := clientCount[c]
        mutex.Unlock()
        defer func() {
            mutex.Lock()
            clientCount[c]--
            mutex.Unlock()
        }()

        ...

        next.ServeHTTP(w, r.WithContext(ctx))
    })
}

閾値の計算式

カナリアリリース状態かどうかで計算式が異なります。

カナリアリリース状態の場合

Primary用Podにおける閾値の計算式は以下です。

最大の同時接続数 × Primaryへの加重比率 / Primary用Pod数

Canary用Podにおける閾値の計算式は以下です。

最大の同時接続数 × Canaryへの加重比率 / Canary用Pod数

以下は計算の具体例です。

  • 最大の同時接続数:100
  • Primary:Canaryの加重比率:90:10
  • Primary:CanaryのPod数:9:1

Primary用Podにおける閾値計算は 100 × 0.9 / 9 = 10 となります。Canary用Podにおける閾値計算は 100 × 0.1 / 1 = 10 となります。

カナリアリリース状態でない場合

閾値の計算式は以下です。

最大の同時接続数 / Primary用Pod数

ここで「全てのPod数」でなく、「Primary用Pod数」としているのは、 カナリアリリース状態でなくても最低1台は常にCanary用Podが起動しているためです。最低1台は常にCanary用Podが起動しているのは、HPAを利用しているため、minReplicasを1より小さい値で設定できないためです。

以下は計算の具体例です。

  • 最大の同時接続数:300
  • PrimaryのPod数:20

閾値計算は 300 / 20 = 15 となります。

閾値の計算と閾値判定

IsOverLimit 関数は、取得した情報を使って閾値を算出し、閾値判定します。閾値を超していればtrueを返し、以下であればfalseを返します。上記の計算式で閾値を算出し、現在の同時接続数と比較をします。閾値の計算結果がint型の切り捨てにより0になった場合は、閾値を1に繰り上げます。これにより全てのリクエストがスロットリングされるのを防ぎます。 max_concurrent_requests.yml に設定していないクライアントタイプはスロットリング対象外のためfalseを返します。スロットリングに必要な情報を未取得の場合、あるいは機能縮退時にはfalseを返します。第一引数はクライアントタイプです。第二引数は現在の同時接続数です。第三引数はデプロイメント種別です。第四引数はPod数です。第五引数は加重比率です。

fetch 関数では fetchError が一定数より多く発生した場合には、 instanceCountstrafficWeight のそれぞれの両フィールドに0を代入しています。そして、 IsOverLimit 関数ではいずれかの両フィールドが0だった場合には、 false を返します。これにより、機能縮退をしています。

func IsOverLimit(client Client, count int, depType string, instanceCounts deployment.InstanceCounts, trafficWeight deployment.TrafficWeight) bool {
   // ymlに設定がないクライアントタイプはfalse
    maxConcurrentValue, ok := maxConcurrentMap[client]
    if !ok {
        return false
    }

   // スロットリングに必要な情報を未取得の場合、あるいは機能縮退時はfalse
    if depType == "" || instanceCounts.Primary+instanceCounts.Canary == 0 || trafficWeight.Primary+trafficWeight.Canary == 0 {
        return false
    }

    // カナリアリリース状態でない場合
    if trafficWeight.Canary == 0 {
        threshold := maxConcurrentValue / instanceCounts.Primary
        if threshold == 0 {
            threshold = 1
        }
        return count > threshold
    }

    // カナリアリリース状態の場合
    var w int
    var c int
    if depType == "primary" {
        w = trafficWeight.Primary
        c = instanceCounts.Primary
    } else {
        w = trafficWeight.Canary
        c = instanceCounts.Canary
    }
    if c == 0 {
        return false
    }
    threshold := maxConcurrentValue * w / 100 / c
    if threshold == 0 {
        threshold = 1
    }
    return count > threshold
}

APIクライアントトークン認証ミドルウェアの中で、カウントアップ処理の後に IsOverLimit 関数を実行します。trueであればHTTPステータスコード429をHeaderに書き込みます。

func ClientTokenMiddleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    ...
 
        if client.IsOverLimit(c, count, deployment.DepType(), deployment.GetInstanceCounts(), deployment.GetTrafficWeight()) {
            w.WriteHeader(http.StatusTooManyRequests)
            return
        }
    ...  

    })
}

注意点

今回開発したスロットリング機能にはいくつか注意すべき点があります。

閾値計算の繰り上げによる弊害

設定値よりも多くのリクエストが来てもスロットリングしない可能性があります。理由は、閾値の計算結果を0から1に繰り上げるケースがあるからです。

カナリアリリース状態でない場合、最大の同時接続数の設定値が「PrimaryのPod数」より小さい場合は、各Podでスロットリング判定に使用する閾値の計算結果は1に繰り上げとなります。例えば、PrimaryのPod数が100で最大の同時接続数を50で設定した場合です。

カナリアリリース状態の場合、設定値が「100*pod数/加重値」より小さい場合は、各Podでスロットリング判定に使用する閾値の計算結果は1に繰り上げとなります。例えば、CanaryのPod数が5で、加重値が20の場合に設定値が25より小さい場合です。

上述の通り、設定値やPod数などによっては、設定値通りの制限ができなくなり、本来のスロットリング機能の役割を果たせなくなる可能性があります。しかしながら、限定的なケースであることと、リクエストに対して常にHTTPステータスコード429を返してしまう可能性があるよりは良いと判断してこの仕様としています。

Dockerイメージが約1.4倍重くなった

スロットリング機能の開発前時点でAmazon Elastic Container Registryにpushしていたイメージサイズは36.6MBでしたが、開発後は51.6MBに増えました。理由は、 go.mod で以下のパッケージが追加され、それらの依存パッケージがたくさん追加されたためです。

  • istio.io/client-go
  • k8s.io/api
  • k8s.io/apimachinery
  • k8s.io/client-go

インフラ側と依存する命名規則ができてしまった

今回開発したスロットリングの仕様上、インフラのリソース名と依存するようになりました。以下の命名をインフラ側で変更する場合には、バックエンドの修正も必要となります。しかしながら、変更される見込みは基本的にはありません。

  • Deployment名
    • Deployment情報に含まれるPod数を使用しています。Deployment名が api-gateway であればそのDeploymentのPod数をPrimaryのPod数として計上しています。 api-gateway-canary であればCanaryのPod数として計上しています。
  • VirtualServiceのsubset
    • subsetの値に文字列 "canary" が含まれていれば、そのsubsetが属するdestinationのweightをCanaryの加重値としています。含まれていなければPrimaryの加重値としています。
  • ホスト名
    • ホスト名に文字列 "canary" が含まれていれば、そのPodのデプロイメント種別をCanaryとしています。そうでなければPrimaryとしています。

人気商品の対策で活用

開発時点では想定していなかったのですが、人気商品の対応にスロットリング機能を活用しています。ZOZOTOWNでは、福袋のように限定で発売されるような人気商品があります。人気商品は、発売開始のタイミングで一時的にアクセスが急増する傾向にあります。人気商品に対するPOSTリクエストに関しては、専用の特別なAPIクライアントトークンを中継システムにて付与し、スロットリング対象としています。これにより、通常商品の購入に影響がでにくくなりました。

We are hiring

ZOZOTOWNのマイクロサービス化は現在進行中です。今後も既存のマイクロサービスの追加開発や、新規のマイクロサービス立ち上げなど、やりがいのあるたくさんの仕事があります。エンジニア絶賛募集中です。ご興味のある方は、以下のリンクからぜひご応募ください。お待ちしております。

corp.zozo.com

カテゴリー