Chienomi

Orbitalデザインパターン用のキュー実装解説

開発::util

Orbital designは私が提唱するマルチプロセス向けのデザインパターンであり、

  • 非常に単純で守りやすい規則
  • 必然的にプロセスあたりが非常にコンパクトになる
  • スケーラビリティが極めて高い
  • プリミティブな機能で実現可能

といった特徴がある。

基本的に実装は非常に容易であるため、わざわざライブラリを導入するほどではないことがほとんどだが、小さなワーカーにキュー機能を導入書くのは面倒でもあるため、ライブラリ自体はあったほうが良い。 結果、小さなライブラリを自分で書くのがスタンダードなスタイルになる。

Orbital QueueはそんなOrbitalデザインパターンを実践するRubyのキューライブラリである。

RubyGemsでも公開されており、

gem install orbitalqueue

bundle add orbitalqueue

として導入可能。

Orbital Queue概要

特徴

Orbital QueueはOrbitalデザインパターンを意識して書かれているが、単純にRPC手段のひとつとして簡単に利用できる。 その特徴としては以下の通り

  • ファイルベース
  • Gemに依存していない
  • 軽量
  • キューアイテムが増えても時間はほぼ変わらない
  • 実質行が63行しかない (0.0.1時点)

ファイルベースの動作

キュー取得の仕組みはかなり単純

  • ディレクトリ内のファイルを一覧する
  • ファイルリストに対して#eachする。最初の成功でbreakされるのでどちらかというと#detectに近い動作
  • ファイルを.checkoutディレクトリ内に移動する。失敗した場合はnext
  • 移動したファイルを読み込み、値を返す

ファイル一覧はファイル数によって処理時間は多少変化する(ファイルシステムにも依存)が、次のeachはほとんどの場合1回で成功するため、単純ながらキュー操作は軽い。

ファイル移動は複数のプロセスで行うことができないため、これが一種のトランザクションとして機能する。

この動作がファイルベースになっていることにより

  • Gem不要
  • サーバー不要
  • SSHFSなどを用いてクラスタに展開可能

といったメリットも獲得している。

基本的な使い方

オブジェクト化

非常に単純て゛

queue = OrbitalQueue.new("/home/foo/queue/something")

のようにディレクトリへのパスを用いて指定してインスタンスを作成する。

キューディレクトリと、.checkoutサブディレクトリが存在している必要があり、ない場合はOrbitalQueue::QueueUnexisting例外を発生させる。 第二引数としてtrueを指定した場合は自動作成される。

キューオブジェクトは再利用可能で、ワーカープロセスはプロセス自体を短命にせずに自分でインターバルを持つようにすればオブジェクト生成コストを踏み倒せる。

もっとも、OrbitalQueueの初期化は基本的にディレクトリパスをインスタンス変数に格納してファイルの存在チェックをしているだけなので踏み倒すのは容易。

エンキュー

キューに入れる操作は#pushで行う。

queue.push(data)

Orbitalデザインパターンのルールとして書き込む者は単一で、競合しない形で一方的に書き込むというのがある。 このため、一度書いた内容を上書きするようなことはできない。 ただ、この制限はキューとしては割と当たり前の仕様だ。

#pushTimeオブジェクトを生成するのと、SecureRandom.hexを呼び出すことから若干重い。 ただ、エンキュー操作はデータを用意することが前提であることを考えると、それがパフォーマンスの足を引っ張ることはほとんどないだろう。

もしそこまでパフォーマンスにシビアなのであれば、自力でキューを実装することをおすすめする。 実装は容易だ。

デキュー

#popによってキューからアイテムを取得できる。

item = queue.pop

#popOrbitalQueue::QueueObjectオブジェクトを返す。 OrbitalQueue::QueueObject#dataによって元データにアクセスすることができる。

#popではアイテムは.checkoutに移された状態になっているので、一種のトランザクション保護状態にある。 完了するには#completeを呼び出す。

item = queue.pop
data = item.data

# Do something

item.complete

Rubyらしくブロック内で処理したい場合は、#popにブロックを渡すこともできる。

queue.pop do |data|
  # Do something
end

ブロックが正常に終了した場合、自動的に#completeが呼ばれる。 このブロック処理中に何かをブロックしたりしないため、おそらくもっとも使いやすい方法。

もし完了を確認することなくキューアイテムを削除して良いのであれば、#pop!を使うとcompleteされた状態で値が返ってくる。

蛇足

ちなみに、どうしても冗長な作りにしないと気がすまない人のためにcomplete?というメソッドが追加されている。

obj.complete unless obj.complete?

このメソッドはファイルの存在を確認しているのではなく、単に#completeが呼ばれたことがあるかどうかで判定しているため、存在意義はほとんどない。 そもそも、ちゃんと安全な設計になっていればシステムコールがおかしな挙動を発生させない限り#completeに迷いはないはずで、システムコールがおかしなことをしたなら確認するのも意味がない。 #complete?の存在意義は、ほぼテストで使うためである。

中断耐性

ワーカーの副作用の内容にもよるが、キュー自体は#popしたあとキューアイテムを.checkoutに保持しているため、キューの状態を気にせずシャットダウンが可能。 この場合、.checkout内のファイルをキューディレクトリに戻すことでやり直すことができる。

このキューシステムは別にサーバーは持っていないので、自動的に.checkoutから取り戻したりはしない。

defer

0.0.3からはキューアイテムのdeferも可能になった。

queue.each_item do |item|
  begin
    #...
  rescue
    item.defer(Time.now + 120)
  end
end

ただし、Orbital Designは「ゆるやかな協調」を指向しているため、#deferは指定時間後に自動再試行をするという意味ではなく、指定時間を過ぎたらデキューの対象になるというだけである。 デキューの対象に戻すのも手動。

OrbitalQueue.resume("/home/foo/queue/something")

実はこのあたりは結構細かくいろいろあるのだけど、基本的には遅延させる時間と最大試行回数を指定すれば十分。

queue.each_item do |item|
  begin
    #...
  rescue
    # 5分後以降にリトライ、最大5回まで
    item.defer((Time.now + 300), 5)
  end
end

指数バックオフとかしたい場合はブロック付きで呼べる。

queue.each_item do |item|
  begin
    #...
  rescue
    item.defer do |retry_data|
      # リトライが5回まで既に行われていれば破棄
      item.destruct if retry_data[] > 5

      # 指数バックオフを設定
      retry[] = Time.now + 10 * 2 ** retry_data[]
    end
  end
end

#destructのかわりに#archiveを呼ぶことでログとして残してから抹消も可能。 #archiveはログ保存をしたあと#destructを呼ぶ。

#destructOrbitalQueue::ItemDestruct例外を発生させる。 #deferはこの例外を捕捉してnilを返す。このため、#deferのブロックの中で#destructを呼ぶことで#deferの処理を破棄して終了することができる。

動作概要

キューディレクトリは.checkout, .defer, .retry, .archiveのディレクトリを行う持つ。 OrbitalQueueはファイルベースであり、これらにファイルを配置する。

OrbitalQueue#pushを行うと、キューディレクトリにキューファイルを配置する。 キューファイルはpushしたオブジェクトをMarshalでシリアライズしたものである。

キューファイルのファイル名は${unixtime}-$$-${randomhex}.marshal

OrbitalQueue#popOrbitalQueue#pop!でキューを取得すると、.checkoutにキューファイルを移動する。 移動は複数回呼ぶと確実に失敗するため、移動に成功したことがそのプロセスが専有していることを保証することになる。

OrbitalQueue::QueueObject#completeOrbitalQueue#completeのラッパーである。 #completeが呼ばれると.checkoutからファイルが削除される。

#defer.checkoutから.deferにキューファイルを移動するとともに、.retryにキューファイルと同じファイル名のリトライ情報を持つMarshalシリアライズファイルを配置する。

#destructはキューディレクトリ以下のすべての該当するキューファイル名を持つファイルを削除して例外を発生させる。 #archive#destructを呼ぶ前に.archive以下にファイルを生成してから#destructを呼ぶ。 .archive以下に配置されるファイルはファイル名の体系が異なるため、#destructでの削除対象にならない。

実用例

例えばウェブアプリで非同期処理をするためにジョブキューを使うとする。

def fooapi data
  @queue.push data

  204
end

非同期APIなので、キューに追加したことを以て正常応答する。

対してワーカーがこれを拾って処理を行う。

queue = OrbitalQueue.new("/var/run/webapp/fooapi")

queue.each do |data|
  #...
end

これだけの話だが、ここでOrbital Designのパワーを発揮することができる。 この同じやり方で2種類のメリットを選択できるのだ。

ひとつは、この処理が重い場合。

受け取ったリクエストに対して処理が重い場合は、リクエストを終端させるのに時間を要することになる。 これは大規模コネクションに対しても弱くなってしまうため、非同期で切り離すことで素早くレスポンスしてしまい、大規模コネクションにまつわる問題を軽減することを狙うという手法がある。

ここでジョブキューを介しているため、非対称ワーカーにすることが可能だ。 重い処理を思うためのワーカーを多数のプロセスで並列化することができ、一方で重い処理を切り離したウェブアプリのおかげでウェブサーバーワーカーを大量に用意する必要はない。

これは処理が計算的に重いだけでなく、不安定なバックエンドを持っている場合にも有効だ。

不安定なバックエンドに対してリクエストを投げるケースではウェブAPIはリトライ中レスポンスを保留するような設計にはしづらい。 そのような場合はだいたいpub/subなどを使って処理を分離するが、その機能をこのジョブキューに担わせることができる。

もうひとつは、シングルトンキューの実装。

静的ファイルを生成する場合など、競合排除の難しい処理が必要な部分だけをジョブキュー経由に切り出し、そこを単一プロセスにすることで競合を気にすることなく処理できる。 TCPサーバーで実装する場合などと比べると、非同期であるため効率がよくノンブロッキングになるため扱いやすい。