Cloud FunctionsとCloud Workflowsを使ってDataprocを動かしてみよう

みなさんこんにちは、GMOアドマーケティングのM.H.です。

日に日に溜まっていく膨大なデータを解析したり、機械学習モデルに投げるデータの前処理をしたりするための大規模データの分散処理フレームワークは幾つかありますが、その技術を活用するためにクラウド上のリソースを使ってビッグデータを処理したいケースがあるかと思います。

今回はGoogle Cloud Platformの各種サービスを複合的に活用し、特にCloud WorkflowsとCloud Functionsを組み合わせることで、Dataprocのクラスタを動的に作成・削除しながらジョブを実行する方法について書いていきます。

やりたいこと

Dataprocは、Apache SparkやApach Hadoopなどの分散処理のフレームワークをGoogle Cloud Platform(以下 GCP)マネージドで動かすためのサービスです。簡単な流れとしては、GCPでインスタンスを起動し、そこに実行用のクラスタを作成した上でジョブを投げて任意の処理を実行します。 各種処理はブラウザ上からGCPのコンソール画面上からでも、gcloudコマンドを用いてターミナル上からでも実行可能ですし、幾つかの言語にクライアントライブラリが存在しているためプログラムから実行できます。

今回のユースケースでは、処理の内容を条件分岐で変更したり、Dataprocによる処理の前後に簡単な別処理を入れ込んだりするワークフローを考慮しつつ、GCPのStorageやBigQueryで管理している大規模なデータを処理させたい場合を考えます。さらに、この一連の処理を定期的なバッチによって実施したいと仮定します。

使用するGCPサービス

今回使うGCPのサービスは以下の通りです。
  • Dataproc
    • 大規模データの分散処理用のサービス
    • 「クラスタ作成」と「ジョブ実行」によってデータが処理される
  • Cloud Storage (以下Storage)
    • Dataprocのクラスタ作成時の初期化ファイルおよびジョブ実行のプログラムファイルを格納する
  • Cloud Workflows (以下Workflows)
    • サーバレスでGCPの各種サービスを組み合わせてワークフローを定義する
    • ワークフローには組み込みメソッドも使える
  • Cloud Functions (以下Functions)
    • サーバレスに実行できる関数を定義する
    • Workflowsから実行できる
    • 本記事では第1世代の使用を想定しています
  • Cloud Scheduler (以下Scheduler)
    • cronと同様の形式で実行日時や頻度を設定できるジョブスケジューラー
    • 定期バッチ実行のトリガー用として使える

実装

Workflows

本記事の肝です。GCPのブラウザコンソールを開いて「workflows」で検索し、適宜APIを有効にして新規のワークフローを作成します。今回は「test_workflow」という名前でワークフローを作成しました。説明文やリージョン、サービスアカウントの設定は各環境に合わせて適宜設定してください。 「新しいトリガーを追加」をクリックし、「Cloud Scheduler」を選択して定期実行用のトリガーを作成します。Schedulerの作成画面が出てくるので、名前やリージョン、頻度などお好みの設定で作成してください。 続いてワークフローを定義します。Workflowsではyamlかjsonを記述することでワークフローを定義できます。ワークフロー中の1つ1つの実行処理単位は steps として定義でき、その中で call などのメソッドによってstepの中身で実行する処理を決めることができます。ちなみにstepの名前は日本語にも対応しています。

今回は以下のようにコーディングしてみました。Schedulerから送られてきたメッセージを params: [input] として受け取り、Dataprocのジョブに投げるデータ引数を data 、条件分岐のためのパラメータを mode としています。また、Workflowsには組み込み関数も多数用意されており、以下の例ではtime、sysライブラリのメソッドを使用しています。 具体的な処理内容や書き方に関しては情報が多くなってしまうので今回の記事では取り上げませんが、公式レファレンスなど参考に処理を追加できると思います。

Workflowsでは定義ファイルを記述するとリアルタイムで隣にワークフローを可視化したグラフを表示してくれます。処理の流れを確認しながらソースコードを書けるため非常に便利です。

Functions

次にWorkflowsから実行する関数を定義します。今回の例では
  • create_cluster_test – Dataprocのクラスタを作成するための関数
  • submit_job_test – 作成したクラスタに投げる通常ジョブの関数
  • submit_job_test_ex – 作成したクラスタに投げる別のジョブの関数
の3つを作成することになります。

GCPのブラウザコンソールを開いて「functions」で検索し、適宜APIを有効にして新規の関数をそれぞれ作成します。以下は create_cluster_test 関数を作成する例です。ここでトリガーはHTTPにしてください。WorkflowsからFunctionへはHTTPエンドポイントの呼び出しのみ対応しているためです。

また、 create_cluster_test 関数ではランタイム内のタイムアウト設定値を大きい値にしておきます。デフォルト値は60秒ですが、Dataprocのクラスタ作成にはおおよそ1分半程度かかるため、通常設定のままだとタイムアウトしてWorkflowsがエラー終了してしまうのを防ぐためです。最大値で540秒(9分)まで設定できるため、今回はこの値で設定しています。また、ジョブ実行関数に関してはクラスタにジョブを正常に投げられた時点で関数が終了するため、基本的にタイムアウト時間を別途設定する必要はありません。

create_cluster_test および submit_job_test 関数のソースコードの例は以下の通りです。今回はPython3.9を選択して関数を作成しています。

submit_job_test_ex 関数ではジョブ設定中の main_python_file_uri のPythonファイル名を execute_ex.py に変えてデプロイしています。エントリポイント名もそれぞれ定義した関数名に変更します。 さらに、 requirements.txt は以下のようにしました。 google-cloud-dataproc はそのままFunctionからDataprocへアクセスして操作するためのクライアント用ライブラリです。 protobuf はクラスタのTTLを設定するために使用しています。

Scheduler

先ほどWorkflowsを作成する時に併せて作成したSchedulerについて、設定を追加します。GCPのブラウザコンソールを開いて「scheduler」で検索し、先ほど作成したジョブスケジューラーの編集画面に進んでください。その中に「本文」を設定する欄があると思います。

この本文中の argument の中身を以下のように書き換えます。keyとvalueの値共にダブルクォーテーションで囲う必要があるため、その中に記述する変数は全てエスケープする必要があることに注意してください。 mode はWorkflowsで設定していた条件分岐用の値となっており、今回の例では「normal」か「ex」のどちらかを指定します。 data は作成するDataprocについての情報と、実行するジョブに引き渡す変数が格納されています。

Workflowsのyamlファイル中の params で受け取り、Functionsを呼び出す際に body で引き渡して、Functions内では request 引数で受け取っています。複雑ですが、このようにユーザーが設定したパラメータをWorkflowsやFunctionsで引き回すことも可能です。(実際は外部からパラメータを読み込んで使ったほうが良いとは思います)

Storage

Dataproc内で実行するプログラムファイルなどはStorageに保存しておき、実行時に呼び出します。Functionsで設定したGCPのブラウザコンソールを開いて「storage」で検索し、適宜APIを有効にしておきます。

必要となるのはバケットとその中に入れるファイル群です。バケット名は先述のSchedulerの設定内で bucket_name: dataproc-test としているため、この名前で作成します。一意の名前で作成してくださいと言われる場合は他の名前を適宜設定しましょう。

次に、作成したバケット内に保存しておくプログラムを作成します。今回はDataprocの中でPysparkを用いてソースコードを作成し実行します。Dataprocのクラスタを作成する時に実行する initialize.sh は以下の通りです。 次に作成されたクラスタに投げるジョブのmain関数である execute.pyを作成します。さらに execute_ex.py ではmain中でprintしている開始時間と終了時間の前を「NORMAL」から「EX」に変更して保存します。 最終的に、 dataproc-test バケット内のファイルは以下のようになっています。

実行

ここまで実装できたら早速実行してみましょう。Schedulerでは定期実行のスケジュールが組まれていると思いますが、即時実行も可能です。ブラウザからGCPコンソールを開き、Schedulerの一覧画面から「今すぐ実行」ボタンを押してジョブを実行します。

SchedulerからWorkflowsが発火し、まずDataprocのクラスタがプロビジョニング状態に入ると思います。クラスタ名は create_cluster_test 関数で定義した通り、 data["cluster_name"]+"-"+execute_datetime となっているのがわかると思います。クラスタ作成が終わって少し経つとジョブが実行されます。Dataprocのクラスタ詳細をブラウザ上で確認すると下のような状態になっているはずです。
クラスタの詳細からジョブ一覧を確認して、今回実行されたジョブの詳細を見てみましょう。 mode はnormalを指定し、 tip_criterion には2を指定しているため、 execute.py の方が実行され、チップ額が2より大きいものだけが出力されています。期待した通りの結果となりました。

さらに、Schedulerで設定した本文の内容を以下のように変更してもう一度実行してみましょう。 mode をexに変更し、 tip_criterion を3に増やしてみます。 結果は以下のようになります。ちゃんと execute_ex.py の方が実行されており、チップ額が3より大きいデータだけが出力されていることがわかります。 最後に、クラスタの詳細画面から「構成」タグを開いてみましょう。今回与えた変数の中にはSparkのワーカー数やクラスタの生存期間(TTL)などが含まれており、 create_cluster_test 関数ではこの値に基づいてクラスタを作成していました。その設定が正しく設定されていることがわかります。特に、今回はDataprocのクラスタ削除を明示的に行なっていないため、そのまま起動させておくとその分コストが掛かります。そのため、TTLを設定してジョブ完了後クラスタが自動的に削除することでほぼ必要分のコストだけが掛かる状態になります。
なお、今回はSchedulerからWorkflowsを実行して一連のワークフローを実行していますが、WorkflowsのトリガーURLとargumentを設定してあげれば実行できます。与える値も必要に応じて変更できるため、Workflowsの定義と併せて柔軟にワークフローを設定・実行できます。

まとめ

今回は、GCPのサービスであるCloud FunctionsとCloud Workflowsを中心に、Cloud SchedulerをトリガーとしてDataprocで大規模データの分散処理をするための柔軟なワークフローを作成する方法について説明しました。Workflowsは比較的最近ローンチされたサービスで、組み込み関数の対応もどんどん増えているため、使い方によってはかなり強力なツールとなるのではないかと思います。

実装例では最終的に出力は普通にprintしているだけでしたが、もちろんBigQueryからデータを読み、複雑な処理を施した後に出力用のBigQueryのテーブルに書き出すということもできます。

他にもワークフローを構築して大規模データの処理をさせる方法は幾つかあるかと思いますが、今回使用したGCPのサービスはどれもコストがかなり安価であり、Dataprocのクラスタも必要な時にのみ稼働する設定になっているため、柔軟なワークフローを構築しつつ全体的に安価に済ませられる一つのケースとして紹介してみました。