Cloud Dataflowでデータ基盤作り

毎日3半荘打ってますか。

プロジェクトでGCPを利用したデータ基盤を構築することになったので、勉強がてら自分でもなにか作ってみようと思います。

やりたいこと

オンライン麻雀サイト「天鳳」に日々蓄積される牌譜データを収集して、東風戦と東南戦(半荘戦)のリーチ率や副露率などのスタッツを比較しようと思います。一般に東風戦は東南戦に比べて副露率が上がると言われています。どれくらい上がるのか確認してみよう!

具体的な手順は以下のようになりました。

  1. 一日一回、天鳳のサイトから最近行われたゲームの牌譜のファイルインデックスを取得する
  2. インデックス名によって昨日行われた段位戦のファイルインデックスに絞る
  3. ファイルインデックスからmjlogと呼ばれる牌譜データ(mjlogの例)の一覧を取得する
  4. 一覧から鳳凰卓・四人麻雀・喰赤ありの東南戦と東風戦に絞って牌譜データを取得する
  5. 取得した牌譜をデータベースに保存する
  6. 取得した牌譜を解析し、1ゲームごとに4人のスタッツを集計し、そのデータをデータベースに保存する
  7. Looker Studio(旧称データポータル)で、東風戦と東南戦のそれぞれについて、指定した期間のスタッツの平均値をダッシュボードに表示する

ダッシュボードで期間を指定できる意味はそこまでないのですが、実用性ではなく勉強することを重視で作ろうと思います。

データウェアハウス相当のデータベースも欲しかったのですが、解析の目的がダッシュボード1つしかない今回のデータ基盤にはデータウェアハウスは不要そうなので用意しませんでした。

使うGCPのサービスは、データレイクとしてはCloud Storage、スクレイピング・クローリングをする際のGoogle App Engine、スケジューラーとしてCloud Composerなどを検討したのですが、考えている構成は全てをCloud Dataflowで解決することができてしまいました。

データレイクからデータマートにデータが流れていないアーキテクチャ図となってしまいましたが、dataflowで牌譜を取得・保存し、そのままdataflowの中で同じデータのスタッツを計算するという流れになっています。
このシンプルすぎるアーキテクチャ図はGoogle Cloud Architecture DiagrammingToolで書きました。

Cloud Dataflowとは

データ基盤の中でもELTやETLを担うツールです。Javaやpythonで記述します。

スケジュールを設定して定期的にELTを実行することもできます。今回の取り組みとは違いますが、大規模なデータ処理やストリーミング処理を必要とするときに有用なサービスです。

pythonで書いたコードをColud Dataflow向けにする方法は簡単です。

入力

データを処理する単位がPCollectionで、pythonでいうところのイテレーションです。例えばBig Queryに入っているデータを一行ずつ処理したいなら、各行のデータを次々送り出します。また、pythonのイテレーションをPCollectionに変換することもできます。今回はその方法を使いました。

以下のCreatePseudoColと名付けている部分で作っています。


with beam.Pipeline(options=options) as p:
        mjlog_pcollection = (p
               | 'CreatePseudoCol' >> beam.Create(["element"])
               | 'AddTimestamp' >> beam.ParDo(AddTimestamp())
               | 'CreateMjlogCol' >> beam.ParDo(CreateMjlog())
            )

コードの途中で現在時刻を取得し、取得するデータを昨日の牌譜のみにする必要があるのですが、普通にPcollectionを作る前にdatetimeモジュールで現在時刻を取得すると想定する動作になりません。ジョブでは上手くいっていたのですが、パイプライン作成まで進むと、テンプレートを作った時間が現在時刻になってしまいました。
そこで要素が1個の仮のPcollectionを作って、Beam.ParDoの処理の中でdatetimeモジュールを呼び出すことで正しい現在時刻を呼び出せるようにしています。

スクレイピングは別のサービスに移すべきだったかも……。

処理

PCollectionに与える処理はTransformと呼ばれます。apache_beamモジュールのクラスをオーバーライドして実装します。

以下は、牌譜データから4人分のスタッツを出力する処理です。


class CalcStats(beam.DoFn):
    def process(self, mjlog_dict) -> list[dict[str, Union[str, float, datetime.datetime]]]:
        game_data = GameData()
        game_data.calc_game_data(mjlog_dict["mjlog"])
        gamestart_date = mjlog_dict["gamestart_date"]
        game_unit = mjlog_dict["game_unit"]
        game_data_array = np.array([
            np.tile(game_data.kyoku_num_array, 4),
            game_data.agari_rate_array,
            game_data.tsumo_rate_array,
            game_data.reach_rate_array,
            game_data.furo_rate_array,
            game_data.furo_num_array,
            game_data.furo_num_per_kyoku_array,
            game_data.agari_point_array,
            np.tile(game_data.ryukyoku_rate_array, 4)
            ]).T
        output = []
        for game_data_person in game_data_array:
            stats_dict = {"gamestart_date": gamestart_date,
                            "game_unit": game_unit,
                            "kyoku_num": game_data_person[0],
                            "agari_rate": game_data_person[1],
                            "tsumo_rate": game_data_person[2],
                            "reach_rate": game_data_person[3],
                            "furo_rate": game_data_person[4],
                            "furo_num": game_data_person[5],
                            "furo_num_per_kyoku": game_data_person[6],
                            "agari_point": game_data_person[7],
                            "ryukyoku_rate": game_data_person[8]}
            output.append(stats_dict)
        return output

ジョブの実行

ここで、Cloud Dataflowの3つの概念を紹介します。

  • ジョブ:一回限りの処理
  • テンプレート:ジョブを作るためのテンプレート
  • パイプライン:定期的に実行されるジョブ。テンプレートから作ることができる

ジョブで動作を確認してテンプレートを作成し、そのテンプレートを使ってパイプラインを作るという順番で実装していこうと思います。

まずはCloud Dataflowのチュートリアルを一通り試します。ここで仮想環境の構築や権限の設定を行いました。

利用するつもりの外部モジュールの大半はappach-beeam[gcp]をインストールした際に一緒に入ってきましたが、pandasだけ仮想環境内で手動でインストールしました。

Cloud shellからpyファイルを作ります(pyファイル全体)。

各種オプションをつけて実行します。


python3 /home/kae_takahashi/tenhou/main.py \
    --region asia-northeast1 \
    --runner DataflowRunner \
    --project poc-api-test \
    --job_name tenhou-job \
    --temp_location gs://tenhou/tmp/ \
    --save_main_session \

外部モジュールを使う際などはsave_main_sessionオプションが必要です。

参考:Dataflow エラーのトラブルシューティング

無事実行できていることをdataflowのジョブ画面やBig Queryから確認します。

オールグリーン!

テンプレートの作成

ジョブが完成したら、オプションのtemplate_locationを追加して実行し、テンプレートを作成します。指定したgoogle storageの場所にテンプレートファイルができています。


python3 /home/kae_takahashi/tenhou/main.py \
    --region asia-northeast1 \
    --runner DataflowRunner \
    --project poc-api-test \
    --job_name tenhou-job \
    --temp_location gs://tenhou/tmp/ \
    --save_main_session \
    --template_location gs://tenhou/custom_template \

中身はJSONファイルです。冒頭を抜粋します。
Dataflowドキュメントによると「従来のテンプレート」と呼ばれています。


{"clientRequestId": "20230823090615905198-5953", "environment": {"clusterManagerApiService": "compute.googleapis.com", "dataset": "bigquery.googleapis.com/cloud_dataflow", "sdkPipelineOptions": {"display_data": [{"key": "runner", "namespace": "apache_beam.options.pipeline_options.PipelineOptions", "type": "STRING", "value": "DataflowRunner"}, 

パイプラインの作成

作成したテンプレートからパイプラインを作成します。ここでスケジュールなどを設定します。

ダッシュボードの作成

あとはLooker studioでダッシュボードを作ります。

Big Queryに集められたデータを可視化するLooker studioについては以前記事も書いているので見てみてください。

GoogleデータポータルでBigQueryを可視化

東風戦は東南戦に比べて6%ほど副露率が低いことがわかりました。とはいえ東風戦は平場の東南戦南1局とも言われますし、東南戦の中でも局が進むごとに副露率の平均値は上がるのかもしれません。データ基盤があれば、思いついた解析もすぐ試せますね!