Monthly Archives: 11月 2015

2015-11-27

Apach Spark + Spark MLlib + MongoDBでwikipedia記事をクラスタリング

spark_logo  mongodb-logo
JWordのO.Yです。
ビッグデータがムーブメントとなって久しく、用途はなんであれ今はHadoopを導入している企業さんも多いことかと思います。

JWordでもHadoopを導入しており検索クエリーの集計等に使用しています。
Hadoopは簡単に分散処理環境を実現することができますが、分散させる処理単位でmapreduceアプリケーションを書かなきゃならないので複雑な処理だとプログラム量が多くなりやすい、Hadoop Streamingを使用した場合には処理スピードが遅い(JWordでは開発効率を優先してPHPかPythonでmapreduceアプリケーションを書いている)、起動デーモンが多すぎる、メモリコントールが難しい(メモリ系の設定がたくさんありバージョンによって微妙に名前が変わってたり廃止されてたりする)、HDFSを必要としない場合でもセットで付いてくる等デメリットもあります。
まあそもそもHadoopはインストール・設定するだけで結構な手間がかかります。CDHを使用したとしてもかなり手間です。

もっといい分散処理基盤はないかな・・・ということで登場したのがApache Sparkです。
ということで今回はApache Sparkに触れてみたいと思います。
(私は脳内では親しみをこめてApacheスパー子と呼んでいます。)

Apache SparkはNext Hadoopということで開発された分散処理基盤です。
Hadoopと比較すると主に以下のような利点があります。(但し、Hadoop抜きで使用する場合のものもあります。)

・構成が単純でわかりやすく、環境構築が容易。おそらく管理もHadoopに比べたら楽だと思われる
・scalaで書ける
・mapperとreducerと分けてプログラムを書く必要がなく、すべての処理を一つにまとめられる
・分散環境をあまり意識せず分散処理を書ける
・Apache Spark専用の機械学習用ライブラリが元々用意されている(Spark MLlib)
・処理データを直接メモリから読んだり書いたりするので処理スピードが速い
・Linuxファイルシステムから直接ファイルの読み込み、書き込みが可能
・mongo-hadoopコネクターを使用することによりMongoDBに対するデータの読み書きが可能

等です。

具体的事例としてwikipediaのページデータをMongoDBから読み込んでSpark MLlibが提供する1機能であるk-meansクラスタリング機能でwikipediaのページをクラスタリングしてみたいと思います。
(但し今回はあくまでApache Sparkの紹介なのでMongoDBの詳細は割愛させていただきます。またブログ掲載のプログラムのインストールは、自己責任で御願いします。インストール等の結果にかかるハードウェアの不稼働等は、 当ブログでは一切サポートしておりませんので、予め御了承下さい。)

使用したApache Spark・MongoDBのバージョンとサーバー環境は以下の通りです。

Apache Spark:1.5.1(Pre-build for Hadoop2.6 and later)
MongoDB:3.0.5
サーバー環境:Conoha(5core メモリ8G) x 5

(ちなみにGMOグループには「サバろうぜ!」というエンジニア支援制度があり、これを利用すると無料で3か月グループ内のサーバーリソースをレンタルすることができます。)

0.下準備・前提

・すべてのサーバー上でsparkユーザーを作成してください。sparkユーザーがApache Spark,Sparkアプリケーションの実行ユーザーとなります。

・今回はApache Sparkのスタンドアローンクラスタ環境を構築します。マスターノード及びワーカーノード起動の際にApache Sparkはマスターノードから各ワーカーノードへ実行ユーザー(spark)でssh通信を実施しますので予めマスターノードから各ワーカーノード及びマスターノード自身にsparkユーザーで公開鍵認証でログインできるように設定をしておいてください。

・スタンドアローンクラスタのマスター・ワーカー起動時、Sparkアプリケーション実行時に関係するプロセスが複数ポートでサーバー間通信を行います。ですので予め使用されるポートを開けておく必要があります。今回は同一ネットワークセグメントのサーバーからの通信に対しては全ポートを開けています。

・rubyとpythonを使用しますのでこちらもインストールしておいてください。バージョンはrubyは最新、pythonは2.7でいいです。
rubyはwp2txtを使用してwikipedia記事のXMLベースのアーカイブファイルをテキストファイルに変換するために、pythonはMongoDBにwikipediaのデータを登録するのに使用します。pythonでMongoDBにアクセスするためににサードパーティーライブラリのpymongoが必要になりますのでpipを使用してインストールしておいてください。
wp2txtのインストールについてはwp2txtでwikipediaのコーパスを作るまでの道のりを参考にしてください。
wikipedia記事データの最新アーカイブは以下のリンクからjawiki-latest-pages-articles.xml.bz2を選択してダウンロードしておいてください。
wikipedia最新各種アーカイブページ

・Apache Sparkはscalaで開発されているため当然scala本体とscalaで書いたユーザーアプリケーションのコンパイルにOpenJDKが必要になりますのでこちらもあらかじめインストールしておいてください。scala本体はhttp://www.scala-lang.org/download/all.htmlからダウンロード、インストールしてください。OpenJDKはyumでインストールが可能です。
今回使用したscalaは2.10.5,OpenJDKは1.8.0_65となります。

・各サーバーのホスト名をあらかじめ設定しておいてください。今回は5台の内1台をマスターノード、4台をワーカーノードとして割り当て、ホスト名はマスターノードはmaster,各ワーカーノードはnode1~node4にしてあります。また全サーバーの/etc/hostsに前述したホスト名とIPの対応関係を記述しておいてください。

・MongoDBはhttps://www.mongodb.org/downloads#productionから使用するサーバーに合うバージョン3系のディストリビュージョンをダウンロード、インストールしてください。
また今回使用したMongoDBの環境はmongos1台、mongoconfig1台(mongosと同居),プライマリ、セカンダリそれぞれ2台ずつの5台環境(Apache Sparkのマスターノード、ワーカーノードと同じサーバーにインストール・起動してあります)となっておりかつwikipediaのアーカイブデータをシャーディングにより分散配置しました。しかし今回紹介する事例に使用したアプリケーションはスタンドアローン環境、単一のプライマリ、セカンダリ構成でも動作します。

・最新バージョンのmecab、mecab用辞書(ipaidc)をインストールしておいてください。これはwikipediaの各ページデータの形態素解析に使用します。
最新バージョンのmecab本体及びipadicはMeCab: Yet Another Part-of-Speech and Morphological Analyzerからダウンロードできます。

・最新バージョンのmecab-Javaバインディングのダウンロードとビルド
上記mecabのダウンロードページと同じページにmecabをJavaで使用するためのバインディングがあるのでそれをダウンロードしてビルドしておいてください。
ビルド後にMeCab.jarとlibMeCab.soが生成されるのでMeCab.jarは後述するsbtのライブラリディレクトリに、libMeCab.soは/usr/local/libに移動してください。

・Apache Sparkの基本的な知識に関しては今回は割愛させていただきますので他の参考ページをご参照ください。

1.Apache Sparkの入手とインストール・スタンドアローンクラスター起動

Apache Sparkのソースとビルド済みバイナリは以下のページで配布されています。
http://spark.apache.org/downloads.html
現在の最新バージョンは1.5.2ですが、ダウンロード当時は1.5.1が最新で、今回も1.5.1を使用してます。

上記ページで「1.Choose a Spark release:」として1.5.1を、「2.Choose a package type:」としてPre build for Hadoop2.6 and laterを選択します。
そうすると「4.Download Spark:」の右横に選択した条件を満たすリソースのダウンロードが可能なミラーサーバーの一覧ページのリンクが表示されますのでクリックし遷移します。後は適当なミラーサーバーからspark-1.5.1-bin-hadoop2.6.tgzをマスターノード側からwgetを使用してダウンロードして解凍します。

解凍して生成されたディレクトリの中身は以下のように感じになっています。

上記を/usr/local以下にsparkをディレクトリを作成しそこへコピーします。

次にワーカーノードのホストの設定をします。
/usr/local/spark/confに移動し、slaves.templateをslavesとしてコピーします。
そして中身を「slavesの中身」のように編集します。

(上記までの作業はすべてのワーカーノード上でも実施する必要があります)
slavesファイルの編集が完了したら/usr/local/spark/sbinに移動し、Apache Sparkスタンドアローンクラスタのマスターとワーカーをsparkユーザーで起動します。

上記実行後に各サーバーでps aux を実行するとマスターノードには
org.apache.spark.deploy.master.Master
各ワーカーノードには
org.apache.spark.deploy.worker.Worker
が起動しているがわかると思います。

HadoopではCDHを使用しない場合でも最低nodemanger,secondarynodemanger,datanode,resourcemanager,nodemanagerの起動が必要となり、CDHの場合にはより多くのデーモンの起動が必要になります。それに比べ、Apache Sparkは2デーモンのみなのでシンプルであることがわかると思います。

2.wikipedia記事データの変換とMongoDBへの登録

まず前提としてwp2txtはインストール済み、wikipedia記事アーカイブファイル(jawiki-latest-pages-articles.xml.bz2)はマスターノードにダウンロード済みのものとします。

以下のようにしてjawiki-latest-pages-articles.xml.bz2をテキストに変換します。

上記はbzip2形式のwikipedia記事アーカイブを解凍し、さらにXMLベースのデータを解析してテキストに変換してファイルとして出力してくれます。
出力されるファイルは1つではなく分割されて出力されます。(1ファイルにつき3000件程度の記事データが含まれています)
アーカイブにはおおよそ1000000記事分のデータが含まれており、すべての記事データをテキスト化するのを待つととても時間がかかります。
今回使用したのはリソースとの兼ね合いで100000件程度なので解凍・テキスト化途中で100000件分くらいテキスト化が終わった時点で処理を中断してください。

これでwikipedia記事アーカイブファイルの解凍とテキスト化が終わったら次はMongoDBに登録します。
pythonで以下のようなソースコード書き、wiki_insert_to_mongo.pyの名前で保存します。

保存したら実行します。多分2~3分程度で完了するかと思います。

正常に登録されているか一応確認します。

show collections実行後に表示されるコレクション一覧にwiki_texts_littleが存在しているかを確認します。
また100000万件登録されているかをコレクションのcount()メソッドで確認します。

上記を実行して100000(多少オーバーでも想定の範囲内)と表示されていればOKです。
ついでに指定のカラムに指定のデータが登録されていることも確認しておきましょう。

limit指定をして1件だけ取得しましょう。
以下のような感じで登録されていればOKです。

ついでのついでに(最後です)コレクションwiki_texts_littleがシャーディングされていることを確認しておきます。
sh.status()を実行します。sh.から始まるコマンドはシャーディング関連のコマンドです。

以下のように表示されたらOKです。

これでwikipedia記事データのMongoDBへの登録が完了しました。

3.mongo-hadoopコネクターのダウンロードとビルド

SparkアプリケーションからMongoDBへアクセスするためにmongo-hadoopコネクターのダウンロードとビルドを行います。
mongo-hadoopコネクターはgithubで管理されておりmongodb/mongo-hadoopからマスターブランチをダウンロードし、解凍します。

解凍後にmongo-hadoop-masterというディレクトリが生成されますのでその中へ移動し、ビルドします。

ビルド処理は依存ライブラリのダウンロードも含む関係上結構時間がかかります。
ビルド完了後、後述するsbtのライブラリディレクトリに移動してください。

4.sbtのインストールと設定

scalaを用いてSparkアプリケーションを開発するためにsbtのインストールと設定を行います。
sbtはsimple build toolの略称でJava、scalaアプリケーションの開発・ビルドを支援するものです。
sbtをhttp://www.scala-sbt.org/download.htmlからwgetでマスターノードにダウンロード後、解凍します。

sbtというディレクトリが生成されるのでその中身を/usr/local/sbtというディレクトリを作成してコピーして、/usr/local/sbt/bin以下にproject、src、libディレクトリを作成します。またsrc以下に更にmain/scalaディレクトリを作成します。

projectディレクトリにはプロジェクト設定をscala形式で記述したbuild.scalaファイルと使用するsbtプラグインの情報を記述するplugins.sbtを配置します。
src/main/scalaディレクトリにはアプリケーションのソースを配置します。

次にプロジェクトの設定と使用するsbtプラグインの設定をします。
/usr/local/sbt/bin/projectに移動し、「編集後build.scala」の内容でbuild.scalaというファイル名で保存します。

build.scalaと同じディレクトリに「編集後plugins.sbt」の内容でplugins.sbtというファイル名で保存します。

sbt-assemblyプラグインはsbtでjarファイルを生成できるようにするためのプラグインです。
特段指定しなくても開発したJava,scalaアプリケーション本体及び必要なライブラリが含まれるjarファイルをまとめて一つのjarファイルを生成してくれます。

次にsbtを起動します。

上記実行後sbtのコマンドシェルが起動し、build.scalaの「libraryDependencies」に指定されたライブラリとplugins.sbtに指定されたプラグインをダウンロードされます。

最後に必要なサードパーティーライブラリのjarをsbtのライブラリディレクトに移動します。
/usr/local/sbt/bin/にlibとうディレクトリを作成してそこにサードパーティーライブラリのjarファイルを放り込んでおけばsbtで開発したアプリケーションビルド時に自動的に参照してくれます。build.scalaの「libraryDependencies」に使用したいライブラリを記述してもいいですがリポジトリにないライブラリに関しては手動でダウンロードとビルドを行い、このライブラリディレクトリに放り込む必要があります。

「0.下準備」でビルドしたmecab-JavaバインディングのMeCab.jarと「3.mongo-hadoopコネクターのダウンロードとビルド」でビルドしたmongo-hadoopコネクターのコアライブラリjarファイル(mongo-hadoop-core-1.5.0-SNAPSHOT.jar)を/usr/local/sbt/bin/libに移動してください。

これでやっとSparkアプリケーションを開発する準備が整いました。

5.wikipedia記事データクラスタリングSparkアプリケーションの開発とビルド

/usr/local/sbt/bin/src/main/scalaに移動して「sample.scala」の内容をsample.scalaのファイル名で保存します。

次にコンパイルですが、その前にソースコードを簡単に説明します。
(コメントの通りの箇所は割愛させていただきます)

上記はSparkアプリケーションの実行用設定を行っている部分です。
setMasterではApache Sparkスタンドアローンクラスターのマスターの設定「spark://[スタンドアローンクラスターのマスタ]:[使用ポート]」の形式で設定をしてます。
setMasterを使用する代わりに後述するspark-submitの引数としてマスター情報を渡すことも可能です。
setAppNameでこのSparkアプリケーションの名前を設定してます。これはなんでもOKです。

後はSparkアプリケーション実行時に参照されるいくつかの設定の変更をしています。
spark.serializerにはSparkアプリケーション内で扱うデータを別のサーバーに送信する場合のシリアライズ用クラスを指定します。デフォルトはjava.io.Serializerですが今回はテキストデータを別サーバーに送信することになるためちょっとでも送信するデータ量を減らしておきたいと思い、より小さいサイズに圧縮できるKryoシリアライザーを使用しています。このシリアライザーは特別な準備をする必要なく標準で使用することができます。

spark.kryoserializer.buffer.maxは詳細はわかりませんがシリアライズ時のバッファの最大サイズを指定するものです。今回わざわざ明示的に設定してあるのはデフォルトのままだとバッファが足りないというエラーメッセージを出力し例外を投げて異常終了してしまうためです。

後の設定はコメントのまんまです。早くなるかもとなんとなく設定してあるのでどこまでSparkアプケーションの実行スピードに影響を及ぼすかは不明です。
最後にSparkアプリケーション実行用設定クラスインスタンスを引数として実行コンテキストインスタンスを生成してます。この実行コンテキストがSparkアプリケーションの実行を制御する役割を担っているため必須となります。

sc.newAPIHadoopRDDは本来HDFSからSparkアプリケーション用入力データを読み込むためのものですがmongo-hadoopコネクターを導入し入力データ型を変更することによりMongoDBからのデータの読み込みが可能となります。戻り値はRDD[(K,V)]となります。ちなみにRDDは正直私も完全な理解を得られていませんが、透過的に分散処理されるコレクションだと考えておけばいいでしょう。

上記は基本的にコメントの通りですがcase (k , v)とTuple2(v.get(“title”).toString , parsed)の箇所のみ説明します。
case (k , v)は想像つく方もいらっしゃるかもしれませんがkはMongoDBから読み込んだデータの_id、vはMongoDBから読み込んだデータ(BSON)となります。
Tuple2(v.get(“title”).toString , parsed)はタプル要素の1番目にwikipedia記事のタイトルを、2番目は記事本文を形態素解析した後の単語リストとなります。
この形式をmapの戻り値とすると暗黙的にRDD[(K,V)]の型のRDDが生成され、読み込み時にcase (k , v)という読み出し方が可能となるためこの形式にしてあります。

上記は全wiki単語リストを走査してユニークなwiki単語リスト(厳密にはMapですが)を生成してます。
これは実のところ必要がないのですが、Sparkアプリケーション内で使用できるアキュムレーターと言われる分散環境共有変数の説明と単語リストの単語の整数IDへの変換と特長量算出のために使用するHashingTFのコンストラクタの引数として重複を除いた語彙数を渡すために用意したロジックです。語彙数を渡さなくても動作しますが、その場合には自動的に2の20乗(1048576)となります。

まずアキュムレーターですがこれはApache Spark分散環境下で共有され、かつ値を加算することのみが可能な変数です。標準のアキュムレーターは基本的に整数型、浮動小数点型などの基本データ型のみが用意されており、主に集計に使用されます。
但し、今回使用しているものは標準のアキュムレーターではなく、カスタムアキュムレーターと呼ばれるものです。
これはAccumulableParamを継承して定義された開発者独自のアキュムレータを指します。上記のWordMapを見るとわかりますが、定義されている3つのzero,addInPlace,addAccumulatorをオーバーライドして標準サポート以外のデータ型にも対応したアキュムレーターを定義することになります。
3つのメソッドのそれぞれの働きはzeroはアキュムレータの初期化、addInPlaceが各Executor毎のアキュムレーターの統合、addAcumulatorがアキュムレータへの値の追加となります。

WordMapのようにカスタムアキュムレータを定義したのち、

と言った形で初期化することにより使用することができるようになります。

上記はk-meansでwiki記事データをクラスタリングするため、Spark MLlibのKMeansオブジェクトのtrainメソッドを学習用データ、クラスタ数、各クラスタ毎の中心の再計算の回数を引数に呼び出しています。trainメソッドはクラスタ数と再計算の回数が多くなればなるほど負荷が高くなります。今回100000件のデータをクラスタするにあたって少ないように見える10を指定したのは用意したインフラリソースではこれが限界であるためです。(10以上の値も試しましたがOut of Memoryでこけるし、今回は事例を挙げるのが目的なので穏便に完了するようクラスタ数を10としました)

上記はwiki単語リストからMongoDBへの出力データを生成しています。
wiki単語リストをwiki単語ベクターに変換し、それを学習後のk-meansのモデルクラスであるKMeansModelクラスインスタンスのpredictメソッドに渡してwiki単語ベクターの属するクラスタの判定を行っています。その後、MongoDBへの出力準備としてデータ形式をBSONオブジェクトに変換しています。
ここでmapの戻り値がTuple2(new ObjectId() , bson)となっていますが、これも前出のTuple2(v.get(“title”).toString , parsed)と同じで、この形式を戻り値とすると暗黙的にRDD[(K,V)]に変換されて返却されるためです。

但し、最後の

で呼び出しているsaveAsNewApiHadoopFileはRDD[(K,V)]のメソッドではなくorg.apache.spark.rdd.PairRDDFunctionsというクラスのメソッドです。
saveAsNewApiHadoopFileメソッド呼び出し時にRDD[(K,V)]が暗黙的にorg.apache.spark.rdd.PairRDDFunctionsクラスのインスタンスに変換されるようです。
逆に言うと出力用のデータの形式をTuple2(K,V)にしておかないと、RDD[(K,V)]に変換されず、しいてはsaveAsNewApiHadoopFileを呼び出せないということになります。というかコンパイル時にエラーになります。(Javaではこの暗黙変換は適用されません。明示的な変換が必要になります。)

ちなみにsaveAsNewAPIHadoopFileは本来HadoopのHDFSへSparkアプリケーションでの処理結果を出力するためのメソッドですが、入力データ読み込み時と同じく引数をMongoDBへの出力用に設定してやるとMongoDBへ出力することができます。

ではソースコードの説明はここまでとしてコンパイルを実施します。

コンパイルがエラーなく完了したら次は必要なライブラリをひとまとめにしたjarファイルを生成します。

上記を実行後、/usr/local/sbt/bin/target/scala-2.10以下にsparksamples-assembly-0.0.1.jarが生成されます。
これでSparkアプリケーションの開発とビルドは完了です。

6.Sparkアプリケーションの実行

それでは開発したSparkアプリケーションを実行してみましょう。
ますはSparkアプリケーションの本体が含まれるjarファイルをマスターノードとワーカーノードの同じ場所にコピーします。後述するspark-submit実行時にコピー場所を指定すると自動的にその場所から所定の場所(DriverとExecutorが起動されたワーカーノードの/usr/local/spark/work以下)にコピーされます。
「5.Sparkアプリケーションの開発とビルド」で生成されたsparksamples-assembly-0.0.1.jarを/var/tmpにコピーします。

各ワーカーノードへのコピーは今回rsyncを使用しました。以下のような感じで各ワーカーノードへコピーします。

それではよいよ実行に移ります。
/usr/local/spark/binに移動し、spark-submitコマンドを実行し、Sparkアプリケーションを起動します。

spark-submitはDriverとExecutorを起動し、–class引数で渡されたクラスに含まれるSparkアプリケーションを起動します(Driverが起動してるのかspark-submitが起動しているのかは詳細は調べてません)。Driverは起動されたSparkアプリケーションの実行制御(分散対象となる処理をExecutorに渡す等)と分散対象となっていない処理の実行、Executorは分散対象となっている処理(RDDのmapメソッドの中の処理等)を実行します。
Driver、Executor共に/usr/local/spark/conf/slavesに記述されたワーカーノードのいずれかで起動されます。Executorは全てのワーカーノードで起動され、ドライバーは自動選択されたワーカーノード1台に1つだけ起動されます。Driverもそこそこメモリを必要とするため、起動するノードの選択ができないか調べましたがどうも現時点ではできないようです。(spark.driver.hostというSparkアプリケーションの実行時設定が存在するがこれを設定してもエラーになる)

引数の説明を簡単にしますと

–driver-memory:Driverの使用可能メモリ上限
–total-executor-cores:起動されるExecutorが同時に使用可能なCPUコア数上限
–executor-memory:Executor毎の使用可能メモリ上限
–executor-cores:Executor毎の使用可能なCPUコア数の上限
–master:マスター情報(今回はSparkアプリケーションの内部で設定しているので多分不要ですが一応設定しました)
–deploy-mode:Sparkアプリケーションの実行環境の指定(スタンドアローン環境・スタンドアローンクラスタ環境・Hadoop-YARN環境等)

と言った感じになります。

–superviseはSparkアプリケーションのエラーハンドリングの厳密化と耐障害性のアップのためのものかなとは思いますが、厳密な意味は不明です。今回は何となく指定してあります。
基本的にそこそこ重要なもののみしか指定してませんが、–total-executor-coresは特に重要です。ここの数値を単純にワーカーノードの台数x各ワーカーノードのCPUコア数としてしまうと各ワーカーノードに場合によっては限界以上にExecutorが起動されてしまう場合があります。何の限界かというとCPUではなく、メモリです。CPUのパワーはあまり使わないがExecutor毎メモリをかなり必要とするSparkアプリケーションを実行する場合、これは小さめにしておく方がいいです。今回はこれを知らずに大き目に設定したところ限界を超えて(100%中の180%くらい)メモリを使用しようとしてSparkアプリケーションが落ちてしまいました。

spark-submit実行後以下のような感じでメッセージが表示されれば起動は成功です。

Driverでの処理の経過はDriverが起動されたワーカーノードの/usr/local/spark/work/work-[JOBID]以下のstderrファイル、Executorでの処理の経過はワーカーノードの/usr/local/spark/work/app-[JOBID]以下のstderrファイルで確認することができます。
また、ソースコード中のprintlnの内容はすべてDriverのstderrファイルに出力されるようになっています。

7.結果の確認

完了したかどうかはログ(Driverの処理経過ログ)で確認することができます。
標準でWeb経由で処理経過を確認できるツールが付属しており(スタンドアローンクラスタマスター起動時に同時に自動的に起動されます)そこから確認できますが、今回は割愛します。

正常にwikipedia記事データのクラスタリング結果がMongoDBのclustered_resultに格納されているか確認します。

show collections実行後に表示されるコレクション一覧にclustered_resultが存在することを確認します。
また件数・内容も確認します。

count()メソッド実行後に100000以上の数値が表示されればOkです。
また中身について、実際の出力内容は以下のようになります。(内容は抜粋です。下記のようなデータが実際には100000件あります)

なんとなーく似た者同士が同じクラスタに振り分けられていることが確認できるかと思います。

最後に

こんな感じでApache Sparkを使うと簡単にビッグデータを入力とした機械学習処理(当然集計でもOK)ができることが分かってもらえたかと思います。
スタンドアローンクラスター環境を試す場合にはどうしても複数台のサーバーが必要になるため個人でためすにはちょっと敷居が高いかもしれませんがConoHa等でなら低コストで試すことも可能かと思いますので是非一度Apache Sparkに触れてみてください。

2015-11-26

Impala で unix_timestamp(now()) を実行しても現在時刻が返ってこない件について

impala-logo
(画像は Impala の Web サイト より転載)

GMOインターネット 次世代システム研究室 兼 GMOアドパートナーズ グループCTO室のM. Y.(自称DevOps担当)です。今回は、普段の業務で気付いた Impala の小ネタをご紹介します。小ネタではありますが、Impala 初見の人は結構つまづきやすいポイントだと思います。

出題編:Impala の TIMESTAMP 型の不思議な動作

最近、Impala に格納したデータを JDBC 経由で取得するコードを書いてみたところ、なぜか格納したつもりの時間よりも9時間前の時間が返される、ということがありました。

Impala には TIMESTAMP 型で格納されているデータを、java.sql.Timestamp クラスのオブジェクトとして取得しているのに、なんで時間がずれるんだろう? データを入れ間違えたのかな?と思って、impala-shell を使ってデータを調べていたら、さらに不思議な動作に気づきました。unix_timestamp(now()) が現在時刻のUNIXタイムスタンプを返していないようなのです。

Impala には now 関数があり、マニュアルの Built-in Function Support によると、これは現在時刻を TIMESTAMP 型の値として返す関数です。例えば、SELECT 文で実行すると、以下のような値を返します。以下は、11月20日の14時に実行した例です。これは確かに現在時刻を返しています。

そして、now() の返り値を unix_timestamp() に渡して、現在時刻のUNIXタイムスタンプを取得すると、以下のような結果が得られます。

この 1448029930 から日時を逆算すると、JST の 2015-11-20 23:32:10 (このSQLを実行した時刻の9時間後)になりました。本来は 1447997530 になるはずなのですが……? どういうこと?

Impala の TIMESTAMP は UTC との相対値として格納される

この異常事態にそれまでの調査はどうでもよくなってしまい、とりあえずImpalaのマニュアルに飛びつきました。Impalaのマニュアルには、TIMESTAMP 型の定義について、こう書かれています。

Time zones: Impala does not store timestamps using the local timezone to avoid undesired results from unexpected time zone issues. Timestamps are stored relative to UTC.

Impalaのマニュアル:TIMESTAMP Data Type

Impala は UTC との相対値として timestamp を格納する、とのこと。これを読んで「UNIXタイムスタンプとして格納するのではなくて、UTC??」と疑問を持つ人は多いと思います。私も思いました。ちなみに、このUTCとの相対値は、マニュアルのなかで “UTC timestamp” とも呼ばれています。「UTCタイムスタンプ」? タイムスタンプに、タイムゾーンがあるってどういうことなの……。

例えば、Impala の関係が深いデータベース製品である Hive にも TIMESTAMP 型がありますが、Hive のマニュアルには、以下のように「UNIXタイムスタンプを格納する」と明確に書かれています。これは納得できます。

Supports traditional UNIX timestamp with optional nanosecond precision.

Hiveのマニュアル:LanguageManual Types – Apache Hive – Apache Software Foundation

Impala は時刻の文字列表現を自動的に TIMESTAMP に変換する

また、Impalaのマニュアルには、もうひとつ気になることが書かれていました。

Conversions: Impala automatically converts STRING literals of the correct format into TIMESTAMP values. Timestamp values are accepted in the format YYYY-MM-DD HH:MM:SS.sssssssss, and can consist of just the date, or just the time, with or without the fractional second portion. For example, you can specify TIMESTAMP values such as ‘1966-07-30′, ’08:30:00’, or ‘1985-09-25 17:45:30.005’. You can cast an integer or floating-point value N to TIMESTAMP, producing a value that is N seconds past the start of the epoch date (January 1, 1970).

Impalaのマニュアル:TIMESTAMP Data Type

STRING が許容可能な時刻文字列であれば、Impala はそれを自動的に TIMESTAMP 型に変換する、とあります。この変換の様子を確認するために適当なテーブルを定義し、TIMESTAMP 型で定義された registration_date カラムに値を挿入してみます。

格納後のデータを SELECT 文で確認すると、正しそうに見えます。

しかし、その結果を unix_timestamp 関数に通すと、今度は9時間先のUNIXタイムスタンプ(1448029930 = 2015-11-20 23:32:10 JST)が返されてしまいます。うーん。

Impala では to_utc_timestamp 関数と from_utc_timestamp 関数が必須

結論としては、Impala に TIMESTAMP 型を読み書きする際は、ローカルタイムを UTC に変換するために、to_utc_timestamp 関数と from_utc_timestamp 関数を必ず挟まなければなりません。上で説明した member_data テーブルの例については、以下のようにto_utc_timestamp 関数を使うと、正しく時間を格納できます。

この値を取得すると、以下のように登録した時刻の9時間前のデータになってしまいます。しかし、これは時刻が「UTCタイムスタンプ」として格納されているためで、これが Impala で TIMESTAMP 型を使う場合の正しいやり方になります。

一方、時刻を取得する際は、from_utc_timestamp 関数に、データを利用する環境のローカルタイムを渡す必要があります。

今度は、意図した通りの時刻が取得できていますね。JDBC 経由でレコードを取得する際も、SQL で明示的に from_utc_timestamp 関数を呼びだす必要があります。さもないと、”2015-11-20 05:32:10.497165000″ をUNIXタイムスタンプに変換した値が返されてしまいます。これが、JDBC 経由で TIMESTAMP 型を取得した場合に、9時間前の時間が返されてしまった理由でした。

解決編:unix_timestamp(now()) の正しい呼び出し方

最初に挙げた now 関数の例については、やや状況が複雑なのですが、

  • now 関数はローカルタイムの現在時刻の文字列表現を取得する
  • unix_timestamp 関数は引数に UTC タイムスタンプを取るため、now 関数の値を UTC の文字列表現として解釈する
  • このとき now 関数の返り値(JST = UTC + 9時間)を UTC として解釈するために、結果的に9時間進む

と考えれば理解できます。従って、以下のような SQL を実行すれば、現在時刻のUNIXタイムスタンプを正しく取得できます。

個人的には、「ImpalaのTIMESTAMP型 = UTCとの相対値 = UTCにおけるその時刻の文字列表現」というモデルを頭のなかに持っておけば、TIMESTAMP 型に関する Impala の動作について予想しやすくなる気がします。

最新のImpalaでの対応状況

この unix_timestamp 関数の動作がわかりにくいと思う人は他にも居るようで、Impala の JIRA にいくつかの issue が作られていました。先頭の issue が、今回の話に特に関係しています。

これらの issue について対策が行われて、Impala 2.2 からは impalad の起動時に -use_local_tz_for_unix_timestamp_conversions を指定することで、unix_timestamp 関数の挙動を Hive と同様のものに変更できるようになりました。つまり、unix_timestamp(now()) で現在時刻のUNIXタイムスタンプが返されるようになります。

ただし、Impala 2.2でも、use_local_tz_for_unix_timestamp_conversions はデフォルトで無効です。また、このオプションを有効にしても、Impala の TIMESTAMP 型は UTC タイムスタンプのままなので、to_utc_timestamp 関数および from_utc_timestamp 関数を使い続ける必要があります。今後も、Impala で TIMESTAMP 型を使う際には、まだまだおっかなびっくりになってしまいそうです。

最後に

余談ですが、Impala のマニュアルには、以下の一文が書かれています。

Impala does not store or interpret timestamps using the local timezone, to avoid undesired results from unexpected time zone issues. Timestamps are stored and interpreted relative to UTC. This difference can produce different results for some calls to similarly named date/time functions between Impala and Hive. See Impala Date and Time Functions for details about the Impala functions.

Impalaのマニュアル:SQL Differences Between Impala and Hive

タイムゾーンにまつわる予期せぬ問題による、予期せぬ結果を避けるために(”to avoid undesired results from unexpected time zone issues”)このような設計にした、とあるのですが、余計に問題をややこしくしているように感じるのは私だけでしょうか……。

最近 Cloudera からリリースされた、HDFS とは異なる特性を持つストレージの Kudu も、SQLクエリエンジンとして Impala をサポートしています。そのため、今後も Impala を利用するユーザは徐々に増えていくと思います。そのような方が Impala を触ってみて、TIMESTAMP 型の動作に頭を抱えたときに、この記事が参考になれば幸いです。

2015-11-24

D3.jsを使ってグラフを表示する

d3js
GMOモバイルのT.Oです。
管理画面などで売上などのデータをグラフ表示したいという要望は多いのではないでしょうか?
そこでこの記事では以下のような棒グラフの実装手順をお伝えします。
グラフ描画にはJavaScriptライブラリであるD3.jsを利用します。

barchart
1.グラフデータを用意する
JSON形式にします。

2.グラフデータを描画するHTMLの用意
SVG要素を記述したHTMLを記述します。D3.jsで生成するグラフはSVG要素の部分に展開されることになります。

 

売上高

2.グラフデータ、表示する図形の設定
棒グラフを表示するのにSVGの矩形(rect)を使用します。

3.アニメーションを設定する
棒グラフが下から延びるような表示にします。
transitionメソッドなどを追加し、変化させたい属性の初期値と最終値を指定します。

4.グラフを完成させる
日付表示、凡例、スケール表示、マウスオーバーした時のツールチップなどのグラフ要素を追加して完成です。
D3.jsの場合、グラフデータに直接ひもづく要素を描画するのは容易なのですが、それ以外、例えば凡例などの要素は扱いにくいため、別途HTML要素として記述する方がお手軽です。
またグラフは期間中の傾向をざっくり把握することができるような表示内容にとどめ、詳細な数値などは表形式でグラフの下に表示するのがおすすめです。
D3.jsはもっと複雑なグラフや図形を描画するのも利用されています。ご興味のある方は以下のサイトで多くの事例が紹介されていますので是非参照してみてください。
D3.js – 日本語ドキュメント

 

 

売上高

2015-11-19

Apache Drill 1.2でJDBC Storage Pluginを試してみる

apache-drill-logo2

こんにちは。GMO NIKKOのY.Sです。

先月、Apache Drill 1.2がリリースされました。
Apache DrillはスキーマフリーのSQLエンジンで、HDFS, HBase, MongoDB,  Amazon S3, ローカルのCSV, JSONファイルなど、複数のデータソースをテーブルと見なしてJOINすることができるので、クエリを使って様々なデータを取得できます。

今回のリリースでいくつかの機能が追加されました。

  • JDBC storage plugin を介したRDBのサポート
  • 新しいwindow関数の追加
  • File/Directoryに対するDROP TABLEができるように
  • MongoDBサポートの強化

今回RDBがサポートされたとのことだったので、どんな感じになるのか試してみました。
Drill in 10 Minutesの手順でインストールします。

今回はローカルマシン上でembeddedモードで動かしてみます。

tutorialにあったparquetフォーマットのデータにqueryを投げてみたところ、同じ結果が返ってきました。

JDBC storage pluginを使うにはこちらを参考に設定していきます。

事前にJDBC driverが必要になるのでjarファイルを落としておきます。
今回MariaDBに接続したかったので、MariaDB Connector/Java 1.2のページ
からダウンロードして、jarファイルをapache-drill-1.2.0/jars/3rdparty/配下に配置します。

MariaDBにはtest.t1テーブルを作って適当なデータをいれておきます。

drill-embeddedをいったん終了して抜けます。
次に設定するプラグイン設定情報を永続化できるようするため、<installed_path>/conf/drill-override.confに以下追記します。

再度drill-embeddedを起動します。
http://localhost:8047/storage/のWeb ConsoleにアクセスしてNew Storage Pluginの追加設定をします。

今回MariaDBに接続したいので、プラグイン名をmariadbと入力して[Create]を押します。
Configurationのテキストフィールドが表示されるので、次のように指定しました。

Updateボタン押してSuccessと出たので、うまくいったようです。
show databases してみます。

show tablesの結果は表示されないものの、SQLの実行はうまく行きました。

MongoDBにもテストデータを入れて Storage Pluginを有効にします。こちらは接続先情報を変えるだけで、query結果の取得までうまくいきました。

MongoDBとMariaDBのデータソースを使ってjoinしてみます。

無事ちゃんと取れてるようです。

今回はMongoとMariaDBで試しましたが、HDFS、HBase、RDBなど多様なデータソースに対してjoinしてSQLを実行できるので、色々な組み合わせで活用できそうです。

インストールしてすぐ試すことできるので、興味がある人は是非お試しください。

2015-11-10

「APG Engineer Night」を開催しました!

_テックブログ紹介用_20151111

少し前の話になりますが、9月にGMO アドパートナーズグループ(以下APG)のエンジニアが一堂に会して、「第2回APG Engineer Night」が開催されました。

グループ各社に分散しているエンジニアのコミュニケーションを強化して、グループシナジーを高めるための施策の一つとして、半年に1度ほど定期的に開催して行こうというものです。

当日は、「火災警報」(誤報)による突然の停電で、会場が15分位、照明やマイクなど電源が全部落ちるハプニングもありましたが、各社長にもご参加いただいたAndroidウオッチやドローンが当たる「じゃんけん大会」などもあり、大いに盛り上がりました。参加いただいた80名以上の皆様にも概ね喜んでいただき、幹事一同、嬉しかったです。今後もぜひ続けていきたいですね!

次回の幹事へ引き継ぐ反省点として、一番は、

・APGのエンジニアをなめんなよ、思っているより、ソフトドリンクが売れるゾ☆

ですかね!

停電で空調ストップもあってか、コーラが無くなるのが、早い早い・・・。

ビールやサワーは残っていましたが、参加者の皆様、喉カラカラでした!(笑)