Cloud StorageからBigQueryにCSVを自動で取り込む方法について

こんにちは。GMO NIKKO エンジニアのN.I.です。

今回はCloud Storage(GCS)から自動でBigQueryにCSVデータを取り込む仕組みを作成したので紹介します。

BigQueryにCSVデータを取り込む場合は、Cloud Storageにデータをアップしてbq loadコマンドを実行してロードするのが一般的だと思います。
また、CloudFunctionsにはCloud StorageにCSVデータをアップするとアップされたことを検知する機能があります。
この2つの機能をうまく組み合わせれば、自動でデータを取り込めるのではないかというのが今回の内容です。

簡単な図にすると以下の様になります。

事前準備

  1. GCSにバケットを作成し、作成したバケットにフォルダを作成する。
  2. bigquery.confを作成する。
  3. bigquery.jsonを作成する。
  4. 作成したbigquery.conf、bigquery.jsonをバケットのフォルダにアップする。
  5. Cloud Functionsを作成する。

処理手順

  1. CSVファイルをGCSにアップする。
  2. Cloud FunctionsがCSVファイルをアップされたことを検知する。
  3. Cloud FunctionsがGCSにアップされたconfファイル(bigquery.conf)、jsonファイル(bigquery.json)を読み込む
  4. Cloud FunctionsがBigQueryを確認し、confに記載された表が無い場合はBigQueryに表を作成する。
  5. Cloud Functionsでbq loadコマンドを実行しGCSにアップしたcsvファイルをBigQueryにロードする。
  6. 完了したcsvファイルをGCSのold/フォルダに移動する。

では実際に事前準備をしてcsvの自動取り込みの手順について説明します。

今回は下記の様なcsvファイルをGCSにアップしてBigqueryのテーブルにloadする例で記載します。

 

事前準備を実施します。

1.バケットの作成、フォルダの作成

バケットの作成で「データの保存場所の選択」時にBigQueryのDataSetのリージョンと同じリージョンに作成してください。バケット名は何でも大丈夫です。
※bq loadコマンドはGCSのバケットとBigqueryのデータセット同一リージョンでないと動作しないようです。別リージョンだと失敗してハマリます。

バケットを作成したら、バケットにフォルダを作成して、bigquery.conf、bigquery.jsonをフォルダに配置します。
bigquery.conf、bigquery.jsonはbq loadコマンドを実行する時に必要な情報を記載します。
ファイルの中身は下記説明を参考に作成してください。

2.bigquery.confの作成

下記の内容を記載します。

SkipReadingRows : CSVのヘッダーを読み飛ばす行数の設定します。BigQueryに1行目をloadしない場合は1、1~3行目をloadしないなら3を記載します。
ProjectId : GCPのプロジェクトIDを記載してください。BigQueryの表を特定するために使用します。
DatasetName : BigQueryのDataSet名を記載してください。BigQueryの表を特定するために使用します。
TableName : BigQueryの表名を記載してください。BigQueryの表を特定するために使用します。

bigquery.conf 記載例 (記載情報はダミーです。実際の弊社で使用している環境の情報とは違います。)

3.bigquery.jsonの作成

jsonファイルの内容はcsvに準拠したBigQueryのテーブルのスキーマ情報になります。

このjson情報がテーブルの構造の情報になります。このファイルの作成方法が分からない場合はGCPの公式ドキュメントをご確認ください。
下記ページ「テーブルスキーマの操作」-「スキーマの指定」-「JSON スキーマ ファイルの作成」の項目参照
https://cloud.google.com/bigquery/docs/schemas?hl=ja#specifying_a_json_schema_file

今回のcsvに合わせてjsonファイルを作成すると下記の様な内容になります。

bigquery.json 記載例

4.bigquery.conf、bigquery.jsonのアップ

ファイルを作成したらGCSのフォルダにアップします。

5.Cloud Functionsの作成

次にCloud Functionsの設定をします。

CloudFunctionで関数作成をクリックして下記情報を設定します。

関数名 : 任意
リージョン : BigqueryのDataSetのリージョンと合わせる
トリガータイプ : Cloud Storage
Event type : ファイナライズ/作成
バケット : 作成したバケットを選択

設定例

設定が完了したら[保存]をクリックし[次へ]をクリックします。
ランタイムでPython3.9を選択します。

requirements.txtを記載します。

cloud storageAPIと、bigqueryAPIのパッケージを追加します。

main.pyにプログラムを記載します。

設定はbigquey.comfの方に記載済みなのでそのまま貼り付ければ動くはずです。

完了したら、[デプロイ]をクリックします。

これで準備完了です。

実際に処理手順を実行する為にフォルダにcsvファイルをアップしてみましょう。
csvファイルをアップしてから3分以内には読み込みが始まります。
csvファイルがフォルダから消えたら作業完了です。BigQueryを確認しましょう。

BigQUeryを確認すると新規テーブルが作成され、csvデータが登録されたことを確認出来ます。

以上で今回のご紹介はこれで完了です。

今回のご紹介からはソースが複雑になってしまう為削除させて頂きましたが、実稼働させる場合は弊社の場合下記のような調整が必要となりましたので、参考の為に一応記載いたします。

  • 日付データをアップする場合はYYYY-MM-DD形式ではないとbq loadが動かないので、csvの日付データをYYYY-MM-DD形式に加工する機能。
  • csvの文字コードがutf-8じゃないとBigqueryに取り込めない為csvをutf-8に変換する機能。
  • 日付データが重複するデータを取り込む場合にcsvの日付のMAX値と、MIN値を把握してBigQuery上からwhere句でdeleteする機能。
  • BigQueryに主キーは無いけど主キーに相当する列でcsvデータと重複するデータをBigQuery上からwhere句でdeleteする機能。

あまりGCPでの構築例を紹介している記事が少ないので参考になれば幸いです。