GCPデビューしました

毎月大量のCSVが発生する事態になりました。
CSVは100個くらいに分かれる想定で、データが重複している可能性がある、という状況。
今のところの想定容量は合計100MB/月行くかどうか。
そこでクラウドで管理しよう、ということになりました。
社内の別プロジェクトで使用していたGCPに私も挑戦することになってしまいました。

レベル感としては…
SQL全く書けない、Pythonほぼ初心者(あまり成長していない)、GCPはアカウント作っただけ、コマンド叩く画面ってなんか黒くて怖いし使ったことない、というような初心者がお送りします。

達成したいこと

  • 大量のCSVをGCSに置いて、必要であればローデータにアクセスできるようにする
  • ローデータはファイル内部で重複している可能性があるので無駄な分は削る
  • ローデータとマスターを突合させてinner-joinのデータだけをBQに置く
  • ローデータはファイルを跨いでも重複しているので、BQに置いたデータも重複を除く必要がある
  • 新しいデータが追加されたら読み込み用の最終テーブルが自動で更新される(でも重複は許さない)ようにする
  • ちゃんとデータが更新されてるか不安な(のだがSQLはわからない)のでTableauに接続して確認してみる

まずは準備から

  • Google Cloud SDKをインストールする
    ローカル環境からcloud functionにデプロイしたり、GCSの操作をしたりするのに使いました。
    https://cloud.google.com/sdk/docs/install?hl=ja
  • Visual Studio Codeをインストールする
    テキストファイルで書こうとしてお手上げだったところを救われました。
    https://www.python.jp/python_vscode/windows/setup/install_vscode.html
  • pandasでdataframe処理するための練習をjupyter notebookで動かしてみて確認
    (これはすぐ終わったけど、その後が長かった)
  • 作業用フォルダとその中にファイルを用意する
    最低限必要なのはmain.py とrequirements.txtの2つです
  • 今回使う用のBQのデータセットとその中のテーブル、GCSのbucketを作成しておく

全体の流れ

  1. コードをmain.pyに書く
  2. トリガーを設定して関数をデプロイする
  3. トリガーに設定したGCSのbucketにてイベント実行する(私の場合はファイルをアップロード)
  4. 関数が実行される
  5. エラーが発生したら直す
  6. 実現したいことが全部できるまで頑張る+結果確認

1. コードを書く

ファイル名がmain.pyである必要がありますが、その中に関数がいくつ書いてあっても問題無いみたいです。
(いろいろ試行錯誤しすぎてMAX6個同居していてめちゃくちゃ長くなったりしました)
まずは必要なライブラリをインポートする部分を書きます。

from google.cloud import bigquery
import os
from google.cloud import storage as gcs
import pandas as pd
from google.cloud.storage.blob import Blob
import codecs

※ライブラリのインストールは別途ローカルでpipやcondaで行う必要があります。

必要なライブラリのバージョンをrequirements.txtに記載します。
よくわからなくてどんどん追記していったけど、結局この辺りしか関係ないような気がしています。
でも大は小を兼ねます。以下例です。

pandas>=1.0.0
pyarrow>=1.0.0
google-api-core==1.21.0
google-api-python-client==1.10.0
google-auth==1.19.2
google-auth-httplib2==0.0.4
google-cloud-bigquery==1.24.0
google-cloud-core==1.3.0
google-cloud-storage==1.28.1
google-cloud-trace==0.23.0
googleapis-common-protos==1.52.0

関数に名前を付けます。
以下の例ではaddという名前にしてみました。

def add(event, context):

変数の定義と例外処理を入れます。
同じbucketでマスターファイル(更新頻度が低い)を管理したいので、master.csvの時は関数が動かないようにします。
イベント発生時にeventの情報が取れます。そこからファイル名やbucket名を取得して使用しました。

def add(event, context):
    client = bigquery.Client()

    project_id = "{your_project}"
    dataset_id = '{your_dataset}'
    table_name = '{your_table}'
    table_id = '{}.{}.{}'.format(project_id, dataset_id, table_name)
    bucket_name = event['bucket']
    file_name = event['name']
    uri = 'gs://{}/{}'.format(bucket_name, file_name)

    # not for master
    if event['name'] == 'master.csv':
        print('Not supported file name: {}'.format(event['name']))
        return

GCSに追加されたファイルを一時書き込み用のBQテーブルに流し込みます。
先にテーブルを新しく作成します。
試行錯誤した結果、schemaは先回りして作成しました。
次回もやることがあればこの辺りをスマートに動的にやってみたいです。

    # create table in bq
    schema = [
        bigquery.SchemaField("string_field_0", "STRING"),
        bigquery.SchemaField("string_field_1", "STRING"),
        bigquery.SchemaField("string_field_2", "STRING"),
        bigquery.SchemaField("int64_field_3", "INTEGER"),
        bigquery.SchemaField("string_field_4", "STRING"),
        bigquery.SchemaField("int64_field_5", "INTEGER"),
    ]

    table = bigquery.Table(table_id, schema = schema)
    table = client.create_table(table)
    print("temp_table created.")

    # read from bq
    dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
    table_ref = dataset_ref.table(table_name)
    table_pd = client.get_table(table_ref)

    # load data to temp table
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.write_disposition = 'WRITE_TRUNCATE'
    job_config.autodetect = True
    table = bigquery.Table(table_id)
    load_job = client.load_table_from_uri(
        uri, table_id, job_config=job_config
    )
    print("\tStarting job {}".format(load_job.job_id))
    load_job.result()
    print("table: {} Loaded from uri.".format(table_id))

print()をつけてどこまで処理が成功して、エラーがどの辺りで発生したのか何回も確認しました。
printの内容はcloud functionのログから確認できます。

pandasで重複を除いて、マスターと突合します。
マスターは別途BQの同じプロジェクト内の別テーブルとして読み込めるようにしておきます。

    # pandas
    df = client.list_rows(table_pd).to_dataframe()
    df = df.drop_duplicates(subset = ['string_field_0', 'string_field_1', 'string_field_2', 'int64_field_3', 'string_field_4'])
    df = df.astype(str)

    # master
    table_ref_master = dataset_ref.table("{your_master_file}")
    master = client.get_table(table_ref_master)
    df_master = client.list_rows(master).to_dataframe()
    df_master = df_master.astype(str)

    # merge
    new = pd.merge(df, df_master, left_on = '{left_column}', right_on = '{right_column}')
    new = new.drop(['{your_field}'], axis = 1)

GCSに追加されてマスターと突合した結果のデータのみを上書きで貯めておくテーブルに流し込みます。
outputのテーブルの作成は1度きり(のはず)なので関数実行前に手動で作成しておきました。
(実際は何度も失敗して作り直しました)
どんどん蓄積したいので、WRITE_APPENDにしました。

    job_config2 = bigquery.LoadJobConfig()
    job_config2.autodetect = True
    job_config2.source_format = bigquery.SourceFormat.CSV
    job_config2.write_disposition = 'WRITE_APPEND'

    output_table_name = "output"
    output_table_id = '{}.{}.{}'.format(project_id, dataset_id, output_table_name)

    job = client.load_table_from_dataframe(
        new, output_table_id, job_config = job_config2
        )

一時書き込み用テーブルの役目が終わったので、削除します。

    client.delete_table(table_id, not_found_ok = True)
    print("temp_table deleted.")

outputテーブルを読み込んで重複を除いて最終テーブルに移します。
WRITE_TRUNCATEモードで上書きします。

   # drop_duplicate for output table
    output_table_ref = dataset_ref.table(output_table_name)
    output_table_pd = client.get_table(output_table_ref)
    output_df = client.list_rows(output_table_pd).to_dataframe()
    output_df = output_df.drop_duplicates(subset = ['string_field_0', 'string_field_1', 'string_field_2'])

    job_config3 = bigquery.LoadJobConfig()
    job_config3.autodetect = True
    job_config3.source_format = bigquery.SourceFormat.CSV
    job_config3.write_disposition = 'WRITE_TRUNCATE'

    last_table_name = "for_load"
    last_table_id = '{}.{}.{}'.format(project_id, dataset_id, last_table_name)

    job = client.load_table_from_dataframe(
        output_df, last_table_id, job_config = job_config3
        )
    print("output overwritten.")

2. トリガーを設定して関数をデプロイする

Google Cloud SDKのShellを開きます。
まずmain.pyがあるディレクトリに移動します。
cd C:\Users\your_folder
※初回は認証を通す必要があります。
参考:https://cloud.google.com/sdk/docs/initializing?hl=ja

先ほどのadd関数をデプロイします。
gcloud functions deploy add --runtime python39 --trigger-bucket xxxx
2分で成功することもあれば、20分かかって失敗することも、25分かかって成功することもあったので毎回ひやひや待ちました。
shellで完了のメッセージが出たらブラウザでCloud Functionsのログを確認します。

3. トリガーに設定したGCSのbucketにてイベントを実行する

今回のコードはbucketのfinalizeがトリガーになっています。
(デフォルトがfinalizeなのでデプロイ時には省略しています)
ブラウザでGCSを開いてファイルをアップロードするか、SDKのshellから行います。
私は毎回同じファイルでテストすることを決めて、以下のコマンドをメモ帳からコピペしたりしていました。
gsutil cp test.csv gs://{your_bucket}
※アップロードするファイルのディレクトリに移動していることを確認してください。main.pyと同じ階層に置いておいても可。

エラーが発生するなどしてGCSから消したいときはこちら↓
gsutil rm gs://{your_bucket}/test.csv

4. 関数が実行される

イベントを実行したらCloud Functionsのログを確認します。
Function execution started と出たら無事に実行開始です。

5. エラーが発生したら直す

試行錯誤する中で個人的に出たエラーの例です。

  • schema error
    予め列名などを指定しておいてもうまく読み込んでくれませんでした。job.configでautodetect = Trueにして回避しました。
  • pyarrow error
    コードに直接は出てきていないが、pandasの処理を追記したら出てくるようになりました。予めインストールしてrequirements.txtにも追記して解決。
  • pandasのデータフレームにしようとすると渡すテーブルのリファレンスの形式が変わる
    client.load_table_from_uriはproject.dataset.tableをリファレンスとして渡すのに、client.list_rows(table_pd).to_dataframe()で見ているtable_pdは分解すると以下の通り

    dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
    table_ref = dataset_ref.table(table_name)
    table_pd = client.get_table(table_ref)
    

    table_refにproject.dataset.tableの形式のまま入れるとproject.dataset.table.project.dataset.tableみたいになってしまうみたいでした。

  • table already exists
    一時書き込み用テーブルを作成したけど削除するまでの間でエラーが発生して、BQが汚いままだった。BQのテーブルを消したら解決。
  • table does not exist
    消してからテーブルを用意しなおすのを忘れてもエラーになります。
  • CSVのカラムがずれてた
    データ確認してみたら1列ずれてたのでインプットを修正しました。
  • pandasのrenameができない(1列だけならなぜかできた)
  • pandasのset_axisもできない
    動くことを優先して諦めました。。

    6. 実現したいことが全部できるまで頑張る+結果確認

    頑張りました。いろんなエラーを潰し続けました。
    エラーはでなくなりましたが、ちゃんと動いているのかBQの最終アウトプットから確認します。
    しかし、SQLが書けない!!そんな時はTableauから接続してしまえ、という発想に至りました。


    Google BigQueryを選択します。

    ブラウザからGoogleの認証をします。
    プロジェクト、データセット、テーブルを選択して可視化してみましょう。

    ん…?なんか明らかに黒枠の部分足りてなくないか…?
    BQのレコードを確認したら、黒枠に来る部分のCSVをGCSに上げ忘れてました…早く気づけて良かったです。

    足りなかったCSVをGCSに追加して、関数の実行を見届けて、Tableauでも確認しました。

    BQの接続を更新したら色がずれてしまった…&他の新しいのもついでに投入してしまったのでわかりにくいかもしれませんが、
    ちゃんと入ったぞ、と確認ができてめでたしです。

まとめ

クラウドで管理すると(お金はかかりますが)ストレージのスケーリングができたり、他の作業者と共有しやすい、などのメリットを作りながら再確認しました。
出来上がった関数はファイルのアップロードをするだけで自動でデータ処理を行ってくれるので、大量のCSVを抱えても毎月煩わしい手作業が発生する心配もなし!
次回は今回諦めた列名の指定などの細かいところや、長期的な保守も検討したいと思います。