こんにちは。 Hadoop 周辺をよく触っている T.O. です。
Hadoop 周辺をよく触っているので、最近 Hadoop 周辺を触ってきて得た話などを書いていきます。
ということで今回は、数あるストリーム処理エンジンの中のひとつ Apache Storm を少々触ってストリーム処理を書いてみよう、という話を。
Apache Storm とは
ひとことで言えば、いわゆるストリーム処理エンジン。
以前、別のブログで Apache Apex について書きましたが、おおまかにはそれと同じカテゴリーに属するツールです。
例のごとく、詳しいことは 公式ドキュメントを熟読すればだいたいわかります(リンク先は 1.1.0 のもの)。
書籍は Amazon を調べれば多少見つかりますが、良書かどうかは不明です。このあたりの分野についてはウェブで英語で書かれたドキュメントを読むのが一番良いように思います。
日本語の資料だと、少し古いですが、 NTT データの方による「ストリームデータ分散処理基盤Storm」が概要をつかむにはよいかもしれません。
用語
ところで、以降の話を読み進めていただく上で、ある程度、用語への理解が必要になるので、その説明をごくかんたんにしておきます。
なお、今回は、 Storm を構成するプロセスについての説明は含みません(ので、 Nimbus などの話もしません)。
Topology
Topology (トポロジー)は、 Storm によるストリーム処理の流れ・構成を表したもので、 Spout (スパウト)と Bolt (ボルト)をつなげたものです。
Spout
Spout (スパウト)は、 Topology におけるデータの生成元になるノードです。データの生成、というのは、 Spout のコード中で生成することだけでなく、外部のデータソースから取り出すことも含みます。
Bolt
Bolt (ボルト)は、 Spout あるいは Bolt からデータを受け取り、なんらか処理をするものです。
Tuple
Tuple (タプル)は、 Storm の Topology において処理されるデータの単位です。一般的な Tuple と意味合いとしては同じです。 Storm におけるストリームというのは連続して流れてくる Tuple にほかなりません。
実装に使える言語
Storm の Topology, Spout そして Bolt の実装は基本的には Java でやることになります。もちろん Scala でも書けますし、さらには、 Clojure で書くこともできます。
そしてさらに、非 JVM 言語でも書けます。
Apache Storm を使ってストリーム処理をやってみよう
環境
今回も例のごとくGMOアプリクラウドを利用して環境を構築しています。
Storm については、 HDP 2.6.1 に含まれているもの( 1.1.0 ベース)を利用します。
本来、ストリーム処理などのいわゆる “Data in motion” 領域では同じ Hortonworks のディストリビューションを使うのであれば HDF(Hortonworks DataFlow) を使うのが筋といえば筋なのですが、たまたま勉強用に HDP 2.6.1 を使ったクラスタを用意してあり、 HDP 2.6.1 は Storm を含んでいるので、それを利用することにしました。
なお HDF については、以前、別のブログで NiFi を利用した処理の実装についてで書いていますので興味がある方は参考までに。
内容
今回は、実装したのは、端的に言えば Twitter でバースト検出です。 Twitter 公式で「トレンド」として提供されているアレです。
バースト検出というと他の例では、 Yahoo!JAPAN のサービスで言うところの「急上昇ワード」もそれにあたるだろうと思います。
しかし、まじめに実装するとそれなりに大変になるところだろう、ということと、今回はあくまで主眼は Storm でストリーム処理をしてみよう!というところなので、バースト検出そのものについては極めて雑にかんたんにやっています。
あくまで「何らかの条件にパスした語を取り出すフィルタ」ぐらいに思っていただいていて OK です。
さて、そんな Twitter バースト検出ですが、やることを一段階具体化すると、以下のようになります。
- Twitter のツイートを取り出し
- ツイートを形態素解析にかけて語に分割し
- 語の出現頻度等を見て、いわゆるバースト検出を行なう
では、それぞれのステップを具体的にしていきます。
まず最初の「Twitter のツイートを取り出し」です。
ツイートを取り出すことそのものは、 Twitter Streaming APIs の public stream を使えばできます。
今回は Java を使って実装することにしましたが、 Java で Twitter の API をたたくとなれば Twitter4J を使えばかんたんにできます。
そして、 Storm の Topology としては、これが Spout になります。
次に「ツイートを形態素解析にかけて語に分割し」ですが、今回は日本語のツイートを対象としたいので、いわゆる日本語形態素解析エンジンを使います。
日本語形態素解析エンジンはいろいろありますが、今回は kuromoji を使います。
kuromoji がサポートしている辞書にはいくつかありますが、 Twitter というサービスの性質上、新語にも対応していてくれた方がよいので、 kuromoji-ipadic-neologd を利用します。
こうして、ツイートを構成している語を出力するような Bolt を実装することができます。
Spout にしても Bolt にしても、それらを実装するにあたっては、指定のインターフェイスを実装したクラスを書くことになります。
それぞれ、おおもとをたどると ISpout インターフェイス、 IBolt インターフェイスですが、それを実装している BaseRichSpout クラスや BaseRichBolt クラスを extends してつくる方がお手軽といえばお手軽になるようです。
最後に「語の出現頻度等を見て、いわゆるバースト検出を行なう」を行います。
これも Bolt として実装します。
バースト検出のアルゴリズムは、本来であれば、 Bursty and Hierarchical Structure in Streams を実装するのが良いのですが、今回はそれ自体のアルゴリズムはほぼどうでもいい、としているので、「ある語が、一定時間の中で今までの平均よりも X 倍多く出現したらバーストしたとみなす」というシンプルなロジックにしています。
この「一定時間の中で」というのを実現するのはいわゆるウィンドウです。
Storm では、 Sliding Window と Tumbling Window に対応しています。
今回は Tumbling Window を使ったのですが、これは実現したい処理次第でしょう。
詳細は公式ドキュメントの解説をご覧ください。
さて、このようにして Spout と Bolt を用意するわけですが、さらにこれらを利用した Topology をつくる必要があります。
Topology をつくる、というのは、つまり Spout と Bolt を接続する、ということですが、例えば下記のコードのように書きます。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
TopologyBuilder tb = new TopologyBuilder(); tb.setSpout("spout", new SampleSpout(), 1); tb.setBolt("bolt1", new SampleBolt1(), 1).globalGrouping("spout"); tb.setBolt("bolt2", new SampleBolt2(), 1).globalGrouping("bolt1"); StormTopology st = tb.createTopology(); Config conf = new Config(); conf.setDebug(true); conf.setNumWorkers(3); StormRunner.runTopologyRemotely(st, "topology", conf); |
このようなコードを適当にクラスの main に実装します。 Topology については、 implements しないといけないインターフェイスはないようです。
このようにして実装した Topology, Spout, Bolt と依存しているライブラリをまとめた jar をつくり(この際には Apache Maven Assembly Plugin が役に立つでしょう)、下記のように storm
コマンドを実行すれば、 Topology の開始、となります。
1 |
$ storm jar sample-topology-0.0.1-SNAPSHOT-jar-with-dependencies.jar my.package.storm.sample.Topology |
ところで、コードこそ出しませんが、今回、実装するにあたっては、最終的な結果、つまり「バーストした」として検出した語を HDFS に書き出していました。これは Storm にビルトインの HdfsBolt クラスを使えばかんたんに実現できます。が、特にバースト検出のようなアプリケーションの場合、 HDFS 上のファイルとして書き出す、というのは、利用する側から考えるとあまり適切とは言えないでしょう。こういうケースでは、例えば WebUI からのアクセスするものとして MySQL など RDB や HBase に書き出すような Bolt を用意するとよいかもしれません。
実行状況のモニタリング
ここまでで Topology を実装し、動かすことができたわけですが、実行の状況は専用の WebUI から確認することができます。これにより今まで処理したデータの量や、個々の Spout や Bolt でエラーが発生している場合はそのエラーメッセージを確認することができます。
まとめ
今回は Apache Storm を使ってかんたんなストリーム処理を実装してみました。
デモをお見せしないと実感しづらい話ではありますが、割りとあっけなく実装でき、動いてしまうので、みなさんもお試しください。
ちなみに、以前に試した Apache Apex と比べてみると、正直なところ、ビルトインの処理( Storm で言うところの Spout と Bolt, Apex で言うところの Operator )の充実度や、独自に書く場合の書きやすさ、運用のしやすさ( Apex は on YARN だが、 Storm の Nimbus や Supervisor など独自のコンポーネントがある)を考えると Apex の方がかんたんだったな、という感想になっています。利用実績としては恐らく Storm の方が上で、世界的にはノウハウも溜まっているとは思いますが…。
Hadoop おじさん業を営んでいるが、元々は Java と Perl のウェブアプリケーションエンジニア。 実はまだウェブアプリケーションエンジニアとしてのキャリアの方がギリギリ長い。冬は毎週末スキーに行くため、夏はおとなしく過ごす。