Chienomi

Rindaを用いた分散計算設計

プログラミング::technique

ここのところ立て続けにRindaの記事を2本書いたが、今回はより実用的にRindaを使った分散システムを書く方法について解説する。

先にRindaについてのアップデートを記載する。

  • タプルスペースに書いた順序が保たれるようになった(っぽい)
  • Ruby 3.0よりRindaが標準ライブラリから外され、Gemになった!!!!!

なお、Rindaの基本的な書き方はゲームプレイ動画の動画変換用に分散処理できるスクリプトを書いた, 分散処理の神Rubyライブラリ Rindaを紹介するを読んで欲しい。

前提として次を理解しておくこと。

  • タプルスペースを提供するサーバーはひとつだけ起動している状態になる。このサーバーにはTCPを介してアクセスできる
  • 対称になるような設計にしない限り、タプルスペースに書く、タプルスペースから取るプログラムは、タイミングや数に関係なく実行できる

単純なキュー

もっともシンプルなプログラム。

これは “分散処理の神Rubyライブラリ Rindaを紹介する” で紹介したものだ。

タプルスペースに引数を書き込むプログラム。

require 'drb/drb'
require 'rinda/rinda'

uri = "druby://fooserver.local:40121"

sourcefile = ARGV.shift
destfile = ARGV.shfit

DRb.start_service
ts = Rinda::TupleSpaceProxy.new(DRbObject.new(nil, uri))

ts.write(["ffmpeg", {sourcefile, destfile}])

それを受け取ってffmpegを実行するプログラム。

require 'drb/drb'
require 'rinda/rinda'

uri = "druby://fooserver.local:40121"

DRb.start_service
ts = Rinda::TupleSpaceProxy.new(DRbObject.new(nil, uri))

while arg = ts.take(["ffmpeg", nil])
  system("ffmpeg", "-i", arg[1][0], "-c:v", "libvpx-vp9", "-crf", "38", "-c:a", "libopus", "-b:a", "128k", arg[1][1])
end

ファイル共有をすればネットワーク越しに分散することもできる。

応答する

簡単なキューは一方向に受け取るだけだったが、今度は応答してみよう。

応答するとなると、処理を実行するプログラムは「取り出す」「書き込む」の2つの動作をすることになり、処理を実行するプログラムが取り出す値と、処理を実行したプログラムが書く値の2種類がある。

ここでは、ファイル共有なしに、rsyncを使って互いに転送する形式をとってみよう。なお、処理効率はファイル共有するのよりも悪いので、実用的に先のものより良いわけではない。だが、値を受け渡すという意味でのサンプルになる。

値を書き込むプログラムは同じだ。

受け取ってffmpegを実行するプログラムは、値を取り、rsyncし、ffmpegで処理し、値を書き込む。

require 'drb/drb'
require 'rinda/rinda'

uri = "druby://fooserver.local:40121"

MY_HOST = `hostname`.chomp
DATA_HOST = "fooserver.local"

DRb.start_service
ts = Rinda::TupleSpaceProxy.new(DRbObject.new(nil, uri))

while arg = ts.take(["source", nil])
  system("rsync", "#{DATA_HOST}:#{arg[1]}", "/var/tmp/dist/source/#{File.basename(arg[1])}")
  system("ffmpeg", "-i", "/var/tmp/dist/source/#{File.basename(arg[1])}", "-c:v", "libvpx-vp9", "-crf", "38", "-c:a", "libopus", "-b:a", "128k", "/var/tmp/dist/conved/#{File.basename(arg[1], ".*")}.webm")
  ts.write(["conved", "/var/tmp/dist/conved/#{File.basename(arg[1], ".*")}.webm", arg[1], MY_HOST])
end

データを保存するサーバー上で、ffmpegで処理するスクリプトが書いた値を受け取り、rsyncで取得する。 とりあえず、ここでは失敗することは考えない。

require 'drb/drb'
require 'rinda/rinda'

uri = "druby://fooserver.local:40121"

DRb.start_service
ts = Rinda::TupleSpaceProxy.new(DRbObject.new(nil, uri))

while arg = ts.take(["conved", nil, nil, nil])
  File.delete(arg[2])
  system("rsync", "#{arg[3]}:#{arg[1]}", "#{arg[2].sub(/\.[^./]+/, '')}.webm")
  system("ssh", arg[3], "rm", arg[1])
end

これは非同期のメソッド呼び出しだし考えて良い。 つまり、

ts.write(メソッド名, 引数)

である。

実際の非同期メソッド呼び出しと違う点として、「手が空いたワーカーがプルする形で起動される」ということがある。 もし本当に非同期に呼び出すと、プロデューサー側の都合でカスタマーが起動されることになり、プロデューサーはその供給量を計算リソースを踏まえた量にコントロールする必要がある。

しかし、Rindaを使う場合、使用するワーカーリソースは明示的に調整可能だ。 ワーカーを起動する数を増減させれば良い。

今回の設計のようにプロデューサーがカスタマーの振る舞いを気にしなくて良いのであれば、カスタマーがプロデューサーとなる呼び出しもまた、メソッド呼び出しのようなものだと考えることができる。 一方、待ち合わせることもできる。その場合、

ts.write(["req", DRB.uri, arg])
result = ts.take(["rep", DRb.uri, arg])

のようにできる。これは、非同期メソッドに対するwaitと同じような処理である。

ただし、私はプロセスは

  • タプルスペースに書く
  • タプルスペースから取る
  • タプルスペースから取り、タプルスペースに書く

のいずれかであることを強く推奨する。そのほうが柔軟だからだ。 この場合、「タプルスペースに書き、タプルスペースから取る」という動作になるから、適切ではないと考える。

大きなデータと多くのフェーズ

今回の場合応答するモデルはよくないと言った。理由は、動画データという大きなデータを対象にしているからだ。

rsyncを使う場合、データ全体を転送する。 もちろん、ffmpegが使うのはデータ全体であるが、ffmpegの処理速度はデータ転送速度よりもずっと遅いので、ffmpegの処理速度に合わせてデータを転送してもらうほうが回線負荷が小さく、回線が詰まりにくいだろう。だから、SSHFSを使うなどしてデータをもっと柔軟に扱えるようにしたほうがいい。

では、そもそもデータベースとやりとりする1レコードが巨大であり、なおかつ何度も読み書きをする必要がある場合はどうだろう?

大きなデータを扱うとき、基本的にはその大きなデータそのものはネットワーク上を流れないようにしたほうが良い。だが、ffmpegのように1データが大きく、処理時間も長いものの場合、同等の計算力を持つマシンで分散したほうが良いかもしれない。

これは各マシンの処理時間中において

全マシンの転送データ総量 < ネットワーク容量

であるならば分散して良い、と言える。

例えば、1つのマシンは1GBのデータを1分で処理し、1GBのデータを応答する、とする。

ネットワークがGbEで、本当に1Gbps出るとすると、1分間の転送量は上下合わせて16Gbであり、ネットワークの容量は60Gb/mである。

2台で分散した場合、32Gb/mを必要とするが、足りているので分散したほうが速い。3台なら48Gb/m。まだ速い。

しかし、4台になると64Gb/mを必要とし、実際の容量は60Gb/mだから「転送待ち」が発生する。

ネットワークはあまり容量が大きくないため、データが大きくなると分散したために遅い、ということが普通にある。 上記の例では3台がベストであるかのように見えるが、これを1台のホストで解決するとそもそも「上下16秒の転送時間」が消えるため、44秒で処理できるようになる。 2分散すると1データ平均30秒で処理されることになるから、この場合はまだ分散したほうが速いが、もう少しデータが大きくなると分散が裏目に出る。

こうした場合の有効な方法として、「最初と最後だけ転送する」という方法である。 例えば次の段階があるとする。

  1. データをパースし、構造化する
  2. 構造化したデータを検証する
  3. 検証されたデータを計算し、resultデータを生成する
  4. resultデータを元に、出力ファイルを編成する

この場合、それぞれの処理の重さによって、それぞれ分散の重みが2,1,4,4だとする。 これをそれぞれ転送すると5回転送が必要で、かなりのネットワークトラフィックが発生するだろう。

そこで、ネットワーク分散はソースファイルレベルでしか行わない。 つまり、一段目の前に「データを要求に応じて渡す」というものを追加する。 ここからデータをパースするためにデータを受け取るのはネットワーク分散する。 しかし、そこから出力するまでの間はlocalhost上のタプルスペースにアクセスするようにして、ネットワーク伝送を行わない。

最終的には集約する必要があるとしても、ネットワークによるデータ伝送は2回に減るのでネットワーク分散の効率が上がる。 さらに、ネットワークトラフィックが減ることにより、マシンによって要求タイミングが分散し、待ち時間は減る可能性もある。

計算時間が短い処理は分散すること自体が微妙だろう。 もちろん、取り回しのために分割することは考えられるが。

様々な処理を混在させる

書き込む側は「誰が処理すべきものか」を考えず、指示だけを書く。 処理する者が複数いれば分散になる。これは、「非同期のリモートのメソッド呼び出し」に似ている。

まず、app Aは「更新情報」を要求する。 これは複数のアプリが提供してくれるかもしれないので、全部から15秒間受け取る。 この15秒間は「最後に更新を受け取ってから15秒」である。

require 'drb/drb'
require 'rinda/rinda'

uri = "druby://fooserver.local:40000"

#...

DRb.start_service
ts = Rinda::TupleSpaceProxy.new(DRbObject.new(nil, uri))

ts.write(["req", "news", "2020-11-05", DRb.uri])

begin
  while val = ts.read(["res", "news", nil, DRb.uri], 30)
    add_news(val)
  end
rescue Rinda::RequestExpiredError
  ts.take(["call", "news", nil, "2020-11-05", DRb.uri])
end

これは「ブロードキャスト型リクエスト」の基本形である。 リクエストを書き込んだら、リプライをもらうのだが、いくつリプライをもらうかわからない。 そこで、15秒待ちでリプライを受け取り、受け取ったら更新処理を走らせる。

タイムアウトした場合、もう受け取るべきものはないと判断し、タプルスペースからリクエストを削除する。

readはコピーするが削除しない。takeはコピーして削除する。 Rindaは削除だけのメソッドがないのでtakeしている。

ここでタプルスペースから削除しているのが、リクエスト時のものとは異なるのは、リスナー側が同じリクエストを処理してしまわないようにするために間に唯一のクライアントがはさまるためだ。

require 'drb/drb'
require 'rinda/rinda'

uri = "druby://fooserver.local:40000"

DRb.start_service
ts = Rinda::TupleSpaceProxy.new(DRbObject.new(nil, uri))

request_counter = Hash.new(0)

while val = ts.take(["req", nil, nil, nil])
  request_counter[val[1]] += 1
  ts.write(["call", val[1], val[2], request_counter[val[1]], val[3]])
end

リクエストを処理する側は次のようになる。

require 'drb/drb'
require 'rinda/rinda'

uri = "druby://fooserver.local:40000"

#...

DRb.start_service
ts = Rinda::TupleSpaceProxy.new(DRbObject.new(nil, uri))

#...

limiter = 0

while val = ts.read(["call", "news", (limiter..), nil, nil])
  limiter = val[2] + 1
  ts.write(["res", "news", return_news(val[3]), val[4])
end

最新のRindaはタプルスペースに入れた順序を守るようになったので、limiterにより「最低いくつ」とすればリクエストした順序を守って処理してくれる。このため、受け取ったものプラス1を最低値だとみなして良い。

もしこの前提が正しくない場合(検証した限りだと順序が保たれるようだったが、確証はない)、read_allを用いて全部読み、その中で最大のものに1を足したものをlimiterに設定すれば良い。

もちろん、1:1でやりとりするのであれば、

ts.write([command, target, arg, DRb.uri])

のようにして、

val = ts.take([COMMAND, IAM, nil, nil])
ts.write([val[3], proc_command(val[2]))

として処理し、リクエストした者は

val = ts.take([DRb.uri, nil])

とすれば良い。