この記事は GMOアドマーケティングAdvent Calendar 2020 2日目の記事です。
皆さん
こんにちは、GMOアドマーケティングのS.Rです。
GoogleのBigQuery(Google Cloud Platform)という大規模なデータ分析ツールを使ったことがありますか?
BigQueryでは1PB(ペタバイト)あるいは10億行といった膨大なデータに対して、SQLで集計・分析処理を極めて高速に実行できます。
ただBigQueryのQueryを実行するには、気を付けなければ高い料金が課せられます。
今回は、Pythonで、GCPのSDKでBigQueryの実行する状況を監視するプログラムを作る方法を、皆さんへ紹介させていただきます。
プログラムの構成
プログラムの構成は下記です。
Service account を作成&権限を設定
-
GCPの管理画面のService accountのTabでCREATE SERVICE ACCOUNTを押して新しいservice accountを作成します。
2.今回の例では”bigquery_poilce”でservice account を作成します。
3.作成されたservice accountへ bigquery resource viewの権限を付与します。
4.作成したservice accountのKey fileをダウンロードします。ファイル名”bigquery_police.json”で保存します。
Pythonのコードを書く
1. Libraryをimportします。
1 2 3 4 5 6 7 8 9 10 11 |
#!/usr/bin/env python # -*- coding: utf-8 -*- import pandas as pd from google.cloud import bigquery from google.oauth2.service_account import Credentials import requests, json from datetime import datetime,timedelta import os import io import json from pytz import timezone, utc |
2. Service accountのkey fileをロードします。
1 2 3 4 |
with io.open('./bigquery_police.json', 'r', encoding='utf-8') as json_fi: credentials_info = json.load(json_fi) list_credentials = Credentials.from_service_account_info(credentials_info) client = bigquery.Client(credentials=list_credentials) |
3. 30分前から今の時点までのBigQueryのjobをリストします。
1 2 3 4 |
def list_jobs(client): min_creation_time = datetime.now() - timedelta(minutes=30) return client.list_jobs(max_results=10000, min_creation_time=min_creation_time, all_users=True,state_filter="done") records = list_jobs(client) |
4.実際の課金金額を換算します。
実行履歴には実際の課金金額は記載されていないので手動で換算する必要があります。計算式は下記です。
- job.total_bytes_billed:利用されたバイト数です。
- BYTE2GB: バイトからGBへ変換する率です。
- RATE: 1GBの課金金額です。
1 2 3 4 |
def bytes_billed2price(job): BYTE2GB = 1073741824 * 1000 RATE = 5 * 108 return (job.total_bytes_billed /BYTE2GB) *RATE |
5.jobの実行履歴をPandasのDataFrameに保存します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
def build_records(jobs): ja = timezone('Asia/Tokyo') now = datetime.now(tz=ja) dt_string = now.strftime("%d_%m_%Y_%H_%M_%S.csv") print(dt_string) user_emails = [] jobids = [] total_bytes = [] times = [] querys = [] prices = [] project_ids = [] for job in jobs: try: if job.total_bytes_billed > 0 and job.ended: total_bytes += [job.total_bytes_billed] prices += [bytes_billed2price(job.total_bytes_billed)] user_emails += [job.user_email] jobids += [job.job_id] querys += [job.query] project_ids += [job.project] times += [job.created.strftime('%m/%d/%Y, %H:%M:%S')] except: pass df = pd.DataFrame.from_dict({"user_email": user_emails, "jobid": jobids, "total_byte": total_bytes, "time":times, "query":querys, "prices":price, "project_id":project_ids}) return df df = build_records(records) |
6. DataFrameをBigQueryへアップロードします。
BigQueryのjobを実行する履歴を格納する先を指定してアップロードします。今回の例で格納する先は下記です。
project_id | test_project |
dataset | bigquery_job_history |
table name | bigquery_cost_%Y%m%d” |
Pythonのコードは下記です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
def load_table(credentials,project_id, records, dataset_id): if len(records.jobid) == 0: return ja = timezone('Asia/Tokyo') now = datetime.now(tz=ja) dt_string = now.strftime("%Y%m%d") table_id = "bigquery_cost_%s" % dt_string records.to_gbq("%s.%s" % (dataset_id, table_id), if_exists='append', verbose=False, credentials=credentials, project_id=project_id) print("Loaded {} rows into {}:{}.".format(len(records.jobid), dataset_id, table_id)) load_table(client, "test_project", records, "bigquery_job_history") |
Queryを実行する履歴を確認する
下記のコマンドでQueryを実行する履歴を確認しましょう。
1 2 3 4 |
bq query --nouse_legacy_sql \ 'SELECT user_email,jobid,total_byte,time,project_id FROM `prediction_dataset.bigquery_cost_20200928` GROUP BY user_email,jobid,total_byte,time,project_id LIMIT 10' |
実行した結果は下記です。
まとめ
今回はPythonで、GCPのSDKでBigQueryを実行する状況を監視するプログラムを作る方法を紹介しました。いかがだったでしょうか。
弊社では100G以上の重いQueryがあった場合は、Queryの実行者、課金金額とQueryの内容をSlackで責任者へ共有しています。BigQueryの課金管理は楽になりました。
明日は、引き続き私から「PythonでBigQueryの実行情報をSlackへ共有する方法」について紹介します。
引き続き、 GMOアドマーケティングAdvent Calendar 2020 をお楽しみください!
■エンジニア採用ページ ~福利厚生や各種制度のご案内はこちら~
https://www.gmo-ap.jp/engineer/
■noteページ ~ブログや採用、イベント情報を公開中!~
https://note.gmo-ap.jp/