kintoneとBigQueryをリアルタイム連携してみた

OGP

こんにちは。ML・データ部 データ基盤ブロックの塩崎です。最近はつちのこフェスタが4年ぶりに開催されたというニュース1でアフターコロナの訪れを感じています。

さて、データ基盤のためのデータ転送パイプライン構築といいますと、多くの方はMySQLなどのデータベースからのデータ連携を思い浮かべるかと思います。実際にシステムの保有する多くのデータはデータベースに保存されており、データベースからのデータ連携は大きな部分を占めます。当ブログでも数々の事例を紹介してきました。

しかし、それ以外にもデータを保有しているソースは数多く、それらからのデータ連携を作成する必要もあります。今回は日本の多くの企業で導入されているクラウドサービスであるkintoneからBigQueryへリアルタイムにデータ連携する事例を紹介します。

従来手法について

既にkintoneからBigQueryにデータ連携するソリューションは数多く、Googleで「kintone BigQuery」などのクエリで検索すると多くの記事が見つかります。しかし、それら既存の手法には以下の欠点があったため、今回は一から自作してみました。

  • 日次などの頻度でのバッチ連携をしているので、データの反映にタイムラグがある
  • 有料のパッケージソフトもしくはSaaSが必要になる

提案手法

今回構築したシステムのアーキテクチャ図を以下に示します。

アーキテクチャ図

まず、BigQueryのRemote Functions機能を使い、Cloud Functionsを外部関数として登録します。そして、Cloud FunctionsからkintoneのWeb APIを呼び出すことでデータを取得します。

Remote Functionsについて

Remote Functions機能はCloud FunctionsもしくはCloud RunをBigQueryから呼び出すことができる機能です。

cloud.google.com

BigQueryにはもともとユーザー定義関数(UDF)という似たような機能があり、SQLやJavaScriptで関数を定義できました。しかし、この機能は制約が多く、一部の処理は記述できないという問題点がありました。例えばXMLHttpRequestやfetchなどのネットワーク通信をともなう関数を呼び出すことは不可能でした。そのため、従来のUDFでkintoneのWeb APIを呼び出すことはできません。

cloud.google.com

一方でこのRemote Functions機能はCloud FunctionsやCloud Runの機能を活用できるので柔軟性が非常に高いです。Web APIの呼び出しができるのはもちろんのことですが、プログラミング言語やライブラリも柔軟に選択できます。今回の件ではランタイムのDockerイメージをカスタマイズできる柔軟性は不要でしたので、Cloud Functionsを使うことにしました。また、Cloud Functions上で動かす処理はシンプルなものなのでどの言語でも問題なく実装できますが、今回はビッグデータ系での利用者が多いPythonを使います。

kintone APIについて

kintoneに登録されたデータはWeb画面から閲覧・更新できるだけでなく、Web APIを通して閲覧・更新を行えます。

cybozu.dev

このREST APIを直接呼び出しても良いですが、有志が様々な言語でSDKを作成しているので今回はそれを使います。cybozu developer networkでも紹介されているpykintoneというライブラリを使用します。

github.com

実際に作ってみる

では、ここからは実際にkintoneとBigQueryを連携するためのシステムを構築していきます。

Cloud Function

まずは、以下のPythonコードでkintoneからデータを取得して、その結果を返却する関数を作成します。

import os
import re
import logging

import yaml
import json

import pykintone
import functions_framework
import flask

def read_yaml(path):
    envvar_matcher = re.compile(r'\${([^{^}]+)}')
    envvar_tag = '!envvar'


    def envvar_constructor(loader, node):
        value = loader.construct_scalar(node)

        matched = envvar_matcher.match(value)
        if matched is None:
            return value

        envvar_name = matched.group(1)
        return os.environ[envvar_name]

    yaml.add_implicit_resolver(envvar_tag, envvar_matcher, None, yaml.SafeLoader)
    yaml.add_constructor(envvar_tag, envvar_constructor, yaml.SafeLoader)

    with open(path, 'rb') as f:
        return yaml.safe_load(f)

def read_kintone_records(kintone_app, batch_size=500):
    raw_records = []

    offset = 0
    while True:
        query = f"order by $id asc limit {batch_size} offset {offset}"
        logging.info(f"executing: {query}")
        result = kintone_app.select(query)

        if result.ok:
            raw_records.extend(result.records)
            logging.info(f"total count is {result.total_count}")
            logging.info(f"{len(raw_records)} rows fetched")

            offset += batch_size
            if result.total_count == len(raw_records):
                break
        else:
            logging.error(result.error)
            logging.error(result.detail)
            raise RuntimeError(f"Error while reading kintone records: {result.error}")
            break

    return [
        {key:value['value'] for key, value in raw_record.items()}
            for raw_record in raw_records
    ]

@functions_framework.http
def read_kintone(request):
    try:
        request_json = request.get_json()
        calls = request_json['calls']
        if len(calls) != 1: # 後述
            raise RuntimeError("this function must be call in scalar subquery!")

        app_id = calls[0][0]
        kintone_app = pykintone.account.Account.loads(read_yaml("account.yaml")).app(app_id)
        kintone_records = read_kintone_records(kintone_app)
        return_value = [kintone_records]

        return flask.make_response(flask.jsonify({"replies": return_value}))
    except Exception as e:
        return flask.make_response(flask.jsonify( {"errorMessage": str(e)}), 400)

kintoneにアクセスするためにはAPIキーが必要ですので、以下のようなYAMLファイルも用意します。kintoneはアプリ毎にAPIキーが独立しているため、必要に応じてBigQueryと連携したいアプリのAPIキーをYAMLファイルに記載します。

domain: <kintoneのドメインから.cybozu.comを除いたサブドメ>
apps:
    hoge_master:
        id: <アプリID>
        token: ${KINTONE_API_KEY_HOGE_MASTER}
    fuga_master:
        id: <アプリID>
        token: ${KINTONE_API_KEY_FUGA_MASTER}

BigQueryとの間のデータの入出力の形式は以下のページを参考にしました。

cloud.google.com

Remote Functionsの返り値の型は構造体型や配列型をとることはできず、スカラー型である必要があるという制約があります。そのため、JSON型を返却することで擬似的に複合型を返したかのような振る舞いをさせています。

また、以下のようなSQL呼び出しをすると、kintone APIをテーブルの行数と同じ数だけ呼び出してしまいkintoneのAPIレート制限に一瞬で達してしまいます。そのため、そのような呼び出しをした場合にはkintone APIを呼び出す前に関数を失敗させています。ソースコードの if len(calls) != 1: 部分でその条件分岐をしています。

SELECT read_kintone() FROM <大きなテーブル>

その後、以下のシェルスクリプトでCloud Functionsにデプロイをします。Cloud Functionsには第一世代と第二世代の2種類がありますが、第二世代の方はリソース制限が緩和されており第一世代を選ぶモチベーションは少ないので第二世代を使います。なお、このコマンドを実行する前に以下の操作が必要です。

  • Cloud Functions用のサービスアカウントの作成
  • kintoneのAPIキーを格納するシークレットをSecret Managerで作成
  • サービスアカウントにシークレットの読み出しロール(roles/secretmanager.secretAccessor)の付与
PROJECT_ID=<プロジェクトID>
PROJECT_NUMBER=$(gcloud projects list --filter="PROJECT_ID:$PROJECT_ID" --format="value(projectNumber)")

gcloud functions deploy read_kintone \
  --project=$PROJECT_ID \
  --region=us-central1 \
  --gen2 \
  --runtime python39 \
  --entry-point read_kintone \
  --trigger-http \
  --no-allow-unauthenticated \
  --run-service-account read-kintone@$PROJECT_ID.iam.gserviceaccount.com \
  --set-secrets=KINTONE_API_KEY_HOGE_MASTER=projects/$PROJECT_NUMBER/secrets/kintone_api_key_hoge_master:latest \
  --set-secrets=KINTONE_API_KEY_FUGA_MASTER=projects/$PROJECT_NUMBER/secrets/kintone_api_key_fuga_master:latest

BQから読み出すための設定

次に先程の関数をBigQueryから呼び出すための設定をします。以下のterraformを反映するとBigQueryとCloud Functionsが接続されます。

resource "google_bigquery_connection" "cloud_resource" {
  connection_id = "cloud_resource"
  location      = "US"
  description   = "Connection for Cloud Resource"
  cloud_resource {}
}

data "google_cloud_run_service" "read_kintone" {
  name     = "read-kintone"
  location = "us-central1"
}

resource "google_cloud_run_service_iam_member" "read_kintone" {
  location = data.google_cloud_run_service.read_kintone.location
  service  = data.google_cloud_run_service.read_kintone.name
  role     = "roles/run.invoker"
  member   = "serviceAccount:${google_bigquery_connection.cloud_resource.cloud_resource[0].service_account_id}"
}

一番上で作成している google_bigquery_connection はCloud Resource Connectionです。これはBigQueryとCloud Function・Cloud Runなどを繋ぐためのリソースです。このConnectionはサービスアカウントを持ち、BigQueryからCloud Functionsを呼び出す時にはそのサービスアカウントを使います。

cloud.google.com

そのため、Connectionのサービスアカウントに対してCloud Functionsを呼び出すロールを割り当てます。Cloud Functionsの第二世代は裏側でCloud Runが動いているため、functions.invoker ロールではなく run.invoker ロールを割り当てる必要があります。

cloud.google.com

最後にBigQueryでCREATE FUNCTION文を実行してBigQuery上で関数を作成します。 ここまでの準備は最初に1回だけ行えば十分で、2回目以降は不要です。

CREATE FUNCTION `<プロジェクトID>.<データセットID>.`.remote_kintone() RETURNS JSON
REMOTE WITH CONNECTION `<プロジェクトID>.US.<コネクション名>`
OPTIONS (
  endpoint = '<Cloud FunctionsのエンドポイントURL>'
)

BQから呼んでみる

この関数を呼び出すと以下のような非常に巨大なJSONが返されます。このままの形式ですと非常に扱いづらいため JSON_* 系の関数を使って扱い易い形式に変換します。

関数を実行した結果

以下のSQLでJSONの中の各要素を取り出して通常のテーブルの列のように変換できます。巨大なJSONは構造体の配列という型をとっているので、まずJSON_QUERY_ARRAYで配列を分解し、JSON_VALUEで構造体の各要素を抜き出しています。

SELECT
  JSON_VALUE(row.company_code) AS company_code,
  JSON_VALUE(row.employee_code) AS employee_code,
  (省略)
FROM UNNEST((
  SELECT JSON_QUERY_ARRAY(`<プロジェクトID>.<データセットID>.read_kintone`(1), "$")
)) AS row

なお、このときにUNNEST関数の引数は必ず二重括弧で囲む必要があります。外側の括弧は関数呼び出し、内側の括弧はスカラーサブクエリという別々の役割を持つために一重括弧では不十分です。

cloud.google.com

実際にこの機能を運用に乗せるときには、一々 JSON_* 系関数を使うのではなく、上記のようなSELECT文をVIEWとして保存すると複雑な処理が隠蔽されて使いやすくなります。

まとめ

BigQuery Remote Functions機能を使いCloud Functionsを呼び出すことで、kintoneのデータをBigQueryから取得できるようになりました。既に知られている手法と比較するとリアルタイムかつ安価であるというのが利点です。また、この方法を応用することでkintone以外のWeb APIとBigQueryを繋ぐことも可能です。

ZOZOでは、一緒に楽しく働く仲間を募集中です。ご興味のある方は下記採用ページをご覧ください!

corp.zozo.com

カテゴリー