Cloud Dataflowで複数リソースを読み込む方法

投稿者: | 2018年7月10日

こんにちは。GMOアドマーケティングのN.Sです。

弊社ではビッグデータを入出力する際に一部Cloud Dataflow(以下Dataflow)を使っています。
読み込むファイルが2つあった場合、それらをインプットする方法が分かるまで結構苦戦したため、備忘録的に記載します。

環境

CloudFunctions version: 1.0.0
Dataflow SDK version: 2.4.0

ドキュメント

Dataflowバージョン2.X以上はApacheBeamを基にしているため、そちらのドキュメントを読む。
apache beamの副入力に関するドキュメント
Side Inputsの項目を読むと、副入力をPCollectionViewに変換し、withSideInputsメソッドを使って同一パイプラインにマージするようです。

処理の流れ

今回はサンプルとして、生徒のテスト結果から、合格者のみ出力するプログラムを作ります。
①Google Cloud Strage(以下GCS)にテスト結果ファイルを置く。
②CloudFunctionsが①をトリガーにして起動。Dataflowを呼び出す。
③Dataflowにてファイル読み込み・フィルター処理・書き込みを行う。

GCS→CloudFunctions→Dataflow→GCS

Dataflowで読み込むファイルは2種類(主入力と副入力)

※上記二枚の画像は当社が作成したもの

用意するファイル

dataflow_test_20180309.csv(生徒のテスト結果)

sub.txt(合格ライン定義)

Cloud Functions

ここではGCSに置かれたファイル名を取得し、Dataflowにパラメータとして渡して実行している。
index.js

Dataflow

CloudFunctionsから指定されたファイルと副入力ファイル(sub.txt)を読み込み、
両ファイルをマージした結果をGCSに出力する(output.csv)。
Main.java

ジョブ詳細

Dataflowジョブの詳細を見ると、2つのリソースを読み込んでで実行していることが分かります。

結果

結果ファイルを見ると、期待通り合格ライン(40点)より高い生徒だけ出力されています。

まとめ

以上、かなりシンプルな例ではありますが、副入力を使用したパイプライン作成方法でした。
実際にビッグデータを使う際などは、PCollection<String>をPCollection<TableRow>に置き換えたりする必要があると思いますが、基本的な作りはそんなに変わらないはずです。
Dataflowはまだ情報が少ない(Version2.0以降は特に)状況ため、見てくださった方にわずかでも役立てれば幸いです。

The following two tabs change content below.

N.S

最新記事 by N.S (全て見る)