分散処理の神Rubyライブラリ Rindaを紹介する
プログラミング::technique
Rindaとは
Rubyは多彩なライブラリが標準添付されているが、いくつかは超強力なライブラリであるにも関わらず、ほとんど知られていないようなものもある。 例えばPStoreなどだが、Rindaは全私による「使わないのもったいないRubyライブラリ選手権」第一位のライブラリである。
RindaはRubyのタプルスペース実装だ。 タプルスペースそのものはJavaのLindaというライブラリで有名である。
タプルスペースとは
タプルスペースはプロセス間通信手法のひとつだ。 タプルスペースを使うことで簡単に並列実行可能なプログラム書くことができる。 しかも、タイミングによらない任意のワーカーの増減やネットワーク分散など難易度が高いものが簡単に実現できるのだ。
タプルスペースはデータを仲介し、タプルスペースに書きだしたデータを保持しタプルスペースからデータをとることができる。
タプルスペースでは次の5つの操作が可能:
- out: タプルスペースに値を書き出す
- in: タプルスペースから値を削除しプロセスに返す (bloking)
- rd: タプルスペースから値をコピーしプロセスに返す (blocking)
- inp: inのnon-blocking版
- rdp: rdのnon-brocking版
「タプルスペースに書く」「タプルスペースから取る」という操作によりプロセス間通信が実現できる。 タプルスペースから取り出す値は条件をつけることができ、これによって自身に対する応答を取り出したり、種類のことなるやりとりをすることができる。
Rindaにおけるタプルスペース
Rindaはタプルスペースとのデータの受け渡しにDRbを使っている。
DRbは分散Rubyのためのメソッドのリモート呼び出し機能である。 DRbを使うとRubyを複数のマシンをまたいで使っている。
DRbでのメソッド呼び出し、つまりネットワークやりとりはMarshalを使っている。
Marshalはライブラリではなく組み込みクラスなのだが、すごく便利なのに全然使われていないクラスである。 具体的には
= Marshal.dump(obj) str
のようにしてオブジェクトを文字列化したデータを得られる。
File.open("foo.marshal", "w") {|f| Marshal.dump(obj, f) }
のようにしてファイルに書き出すこともできるし、
= Marshal.load(File.read("foo.marshal")) obj
のようにオブジェクトに戻すこともできる。
Marshalはほとんどの組み込みクラスを扱うことができ、わざわざ書けばカスタムクラスを使うことも可能だ。
DRbはMarshalを使うため、Rindaも当然ながらMarshalを使うことになり、幅広いオブジェクトをタプルスペースを介してやりとりすることができる。このため、とてつもなく強力だ。
Rindaはrd, inp,
rdp相当の操作がなく、非常にシンプルにRinda::TupleSpaceProxy
のwrite
でout操作、take
でin操作となっている。
さらにDRb.uri
を使うことによりメッセージのやりとりを一意に特定可能だ。
また、DRbはそもそもTCPを使うようになっており、これによってRindaを使うとネットワーク分散も可能になる。
Rindaを使う設計
Rindaサーバー、つまりタプルスペースはひとつである。 非常にシンプルに
require 'drb/drb'
require 'rinda/tuplespace'
= ARGV.shift
uri DRb.start_service(uri, Rinda::TupleSpace.new)
puts DRb.uri
DRb.thread.join
のようなコードになる。
これは値を受け取り、保持し、取り出すタプルスペースそのもののコードである。
これに対して、write
やtake
するクライアントプロセスは、どのような形であってもよい。
すごく単純にはwrite
するプログラムとtake
するプログラムのふたつで受け渡しする形式だが、Aプログラムがwrite
してtake
し、Bプログラムがtake
してwrite
するようにすると、Aプログラムが処理を要請し、Bプログラムが処理して値を返し、Aプログラムが結果を受け取るというものになる。
Rindaで重要なポイントは、これがあくまでタプルスペースに対する書き込みと取り出しであるということだ。 そのため、書き込むプロセスも取り出すプロセスもいくつあっても問題はなく、非対称な、そして任意数の並列実行が可能である。
簡単なサンプル
次のプログラムは、引数に与えられたふたつのファイルパスをタプルスペースに書き出す。
require 'drb/drb'
require 'rinda/rinda'
= "drb://fooserver.local:40121"
uri
= ARGV.shift
sourcefile = ARGV.shfit
destfile
DRb.start_service
= Rinda::TupleSpaceProxy.new(DRbObject.new(nil, uri))
ts
.write(["ffmpeg", {source: sourcefile, dest: destfile}]) ts
このプログラムはタプルスペースに書くだけで一瞬で実行は終了する。 並列化する意味は乏しいが、たとえ同時に実行したとしても支障はない。 つまり、このプログラムを呼び出すプログラムは並列に実行できる。
次の2つ目のプログラムは、タプルスペースから値を取り出し、ffmpegによって変換する。
require 'drb/drb'
require 'rinda/rinda'
= "drb://fooserver.local:40121"
uri
DRb.start_service
= Rinda::TupleSpaceProxy.new(DRbObject.new(nil, uri))
ts
while arg = ts.take(["ffmpeg", nil])
system("ffmpeg", "-i", arg[1][0], "-c:v", "libvpx-vp9", "-crf", "38", "-c:a", "libopus", "-b:a", "128k", arg[1][1])
end
libvpxによる変換処理は重い処理であり、かなりの時間がかかる。 しかし並列性は最高ではないから、メニーコアのマシンであればいくつかのプロセスを並列実行できるだろう。 また、複数のマシンでの並列実行をしてもしれない。
既に述べた通り、タプルスペースを使うプログラムはいくつ並列で起動してもかまわないし、Rindaはネットワークごしにも利用できる。 プロセスを起動した数だけ分散処理されるのだ。
なお、この場合ネットワーク分散するには、同じファイルパスでファイルにアクセスできるようにする方法は別に用意しなければならないことに注意してほしい。
ここではffmpegにしたが、受け渡すものを引数と考え、system
を任意の時間のかかる処理にすることができる。
Rindaの利用
私が書いたプログラムでは、オープンソースではないがPlutoがRindaを採用している。
Plutoは内部でCairo/Pangoを用いた組版、Popplerを用いたPDF変換を行っており、いずれの処理もかなり重い。 これをそれぞれの段階ごとに受け渡すことで各フェーズが直交的に並列処理するようにできる。
Rinda採用の理由は、並列性をコントロールしやすく、サーバー増強を簡単に繁栄できる、というのが大きい。 さらに、1台で足りない場合はクラスタ化も可能だ。もっとも、その場合ファイルIOをネットワークごしにやることになり、ファイルIOがかなり重い(大きなファイルになる)ものであるから、100MbpsのConoHa VPS LANでは心もとなく、できればネットワークごしにはやりたくなかったが。
また、先の例とほとんど同じ内容で、ゲームのプレイ動画を変換するスクリプトもRindaを採用した。 例の通り、libvpxを用いたVP9への変換が重く、私は強力なマシンを複数保有するため、並列で動画処理をしたいと考えたことから、Rindaを採用した。 当初は純粋なTCPサーバーキューを実装すること考えたのだが、対象のリロードが難しいことと、Rindaを使ったほうが簡単なコードになるため、Rindaを採用した。
単純には次のようなコードで、標準入力の値をラウンドロビン式にプロセスに渡す。
COMMAND = "cat -n"
= []
ps = 5
nproc .times {|i| ps.push IO.popen(COMMAND, "w")}
nrpoc
= 0
i STDIN.each do |line|
[i].puts line
ps= ( i + 1 ) % ps.length
i end
これで並列化はできるが、個々のコマンドの実行時間にムラがあると、最適な並列化とは言い難い。 これをプル式に変えればいくらかは改善する。
require 'socket'
= ARGF.each.map {|i| i.chomp }.to_a
args = UNIXServer.new("/run/para1.sock")
serv while s = serv.accept
.puts args.shift
s.close
sbreak if args.empty?
end
.close serv
require 'socket'
= UNIXSocket.new("/run/para1.sock")
s
while arg = s.gets
system("command", arg)
end
もちろん、ZeroMQを使う方法もあり、これはネットワーク分散もできるようになる。
require 'ffi-rzmq'
= ARGF.each.map {|i| i.chomp }.to_a
args
= ZMQ::Context.new
cx = cx.socket(ZMQ::REP)
sock .bind("tcp://127.0.0.1:40000")
sock
loop do
.recv_string
sock.send_string(args.empty? ? ":exit" : args.shift)
sockend
require 'ffi-rzmq'
= ZMQ::Context.new
cx = cx.socket(ZMQ::REQ)
sock .connect("tcp://127.0.0.1:40000")
sock
while sleep 10
.send_string("PLZ")
sock= ""
s = sock.recv_string s
rc break if s == ":exit"
puts "WOIRKER RECEIVED: #{s}"
end
だが、ZeroMQに対してRindaを使うメリットとして
- Rindaは標準添付ライブラリであり、セットアップが楽
- ZeroMQにジャストフィットするモデルでない限り、多くの場合Rindaのほうが短く書ける
- Rindaのほうが柔軟で、簡単に任意数の増減できる並列処理が書ける。ZeroMQだとそれなりに設計が必要で、柔軟な増減のためにはもう一段構成を増やす必要があり、長く複雑になる
- ZeroMQの場合、シリアライズ/デシリアライズは手動でやる必要がある。Rindaならオブジェクトをそのまま渡すことができる
- 複数種類のメッセージやりとりをする場合、ZeroMQはそれだけサーバーを増やす必要がある。Rindaはひとつのタプルスペースですべて処理できる
といったことが挙げられる。
Rindaの注意点
Rindaの最大の注意点は「FIFOではない」ということだ。
Rindaを使う場合、条件に一致するどのオブジェクトが返るかというのは予期できない。 「条件に一致するいずれかのオブジェクトを取り出すのだ。
このことから並列処理において入力された順序を守る必要がある場合、かなり難しい(なおかつ並列性または柔軟性をいくらか損ねる)方法が必要になる。
「Rindaを使う」選択肢が出る場面
前提として「プロセス間通信が必要である」という状況であるときにRindaは登場する。
便利なプロセス間通信ライブラリとして挙げられるのは、やはりZeroMQだろう。 ZeroMQは便利だが、より柔軟な並列実行、特に「多対多」モデルにはあまり適していない。
複数ワーカーから複数ワーカーに受け渡すようなものであったり、より簡単に柔軟に並列実行できるようにしたい場合、Rindaを採用するとうまくいくことが多い。
結び
Rindaの強力さとしては、なんといっても受け渡しを書くだけで、柔軟な並列実行が可能になる、というのが大きい。 このような並列処理用ライブラリが標準で存在する、という発想があまりないのか、あまりにも利用されていないライブラリではあるが、実際は超強力だ。
タプルスペースによる受け渡しコストは非常に低く、自身がシンプルに「処理すべき対象を受け取り、処理し、完了したらタプルスペースに通知する」という動作をするスクリプトを書けば、それだけでプロセッサ数の増加に対してリニアな処理性能増加を果たすスクリプトを書くことができる。 これは、非常に簡単だ。
このようにそれぞれの段階が直交し、待ち合わせをしない設計は並列処理における高パフォーマンスが発揮できるだけでなく、Orbital designとの相性も良い。
プログラムを小さく保つことができるのもメリットだ。 さらに小さなスクリプトを必要に応じてSystemdで起動する、というLinuxらしいアプローチにも適している。
なにもこれはRubyプログラマだけに利益があるわけではない。
例においてffmpeg
を出したように、system
,
IO.popen
、あるいはUNIXドメインソケットなどを使うことによって他の言語で書かれたプログラムを並列化するのにも役に立つ。
このように強力なライブラリであるRinda、ぜひ使ってみてほしい。