Orbitalデザインパターン用のキュー実装解説
開発::util
序
Orbital designは私が提唱するマルチプロセス向けのデザインパターンであり、
- 非常に単純で守りやすい規則
- 必然的にプロセスあたりが非常にコンパクトになる
- スケーラビリティが極めて高い
- プリミティブな機能で実現可能
といった特徴がある。
基本的に実装は非常に容易であるため、わざわざライブラリを導入するほどではないことがほとんどだが、小さなワーカーにキュー機能を導入書くのは面倒でもあるため、ライブラリ自体はあったほうが良い。 結果、小さなライブラリを自分で書くのがスタンダードなスタイルになる。
Orbital QueueはそんなOrbitalデザインパターンを実践するRubyのキューライブラリである。
gem install orbitalqueue
や
bundle add orbitalqueue
として導入可能。
Orbital Queue概要
特徴
Orbital QueueはOrbitalデザインパターンを意識して書かれているが、単純にRPC手段のひとつとして簡単に利用できる。 その特徴としては以下の通り
- ファイルベース
- Gemに依存していない
- 軽量
- キューアイテムが増えても時間はほぼ変わらない
- 実質行が63行しかない (0.0.1時点)
ファイルベースの動作
キュー取得の仕組みはかなり単純
- ディレクトリ内のファイルを一覧する
- ファイルリストに対して
#each
する。最初の成功でbreak
されるのでどちらかというと#detect
に近い動作 - ファイルを
.checkout
ディレクトリ内に移動する。失敗した場合はnext
- 移動したファイルを読み込み、値を返す
ファイル一覧はファイル数によって処理時間は多少変化する(ファイルシステムにも依存)が、次のeach
はほとんどの場合1回で成功するため、単純ながらキュー操作は軽い。
ファイル移動は複数のプロセスで行うことができないため、これが一種のトランザクションとして機能する。
この動作がファイルベースになっていることにより
- Gem不要
- サーバー不要
- SSHFSなどを用いてクラスタに展開可能
といったメリットも獲得している。
基本的な使い方
オブジェクト化
非常に単純て゛
= OrbitalQueue.new("/home/foo/queue/something") queue
のようにディレクトリへのパスを用いて指定してインスタンスを作成する。
キューディレクトリと、.checkout
サブディレクトリが存在している必要があり、ない場合はOrbitalQueue::QueueUnexisting
例外を発生させる。
第二引数としてtrue
を指定した場合は自動作成される。
キューオブジェクトは再利用可能で、ワーカープロセスはプロセス自体を短命にせずに自分でインターバルを持つようにすればオブジェクト生成コストを踏み倒せる。
もっとも、OrbitalQueue
の初期化は基本的にディレクトリパスをインスタンス変数に格納してファイルの存在チェックをしているだけなので踏み倒すのは容易。
エンキュー
キューに入れる操作は#push
で行う。
.push(data) queue
Orbitalデザインパターンのルールとして書き込む者は単一で、競合しない形で一方的に書き込むというのがある。 このため、一度書いた内容を上書きするようなことはできない。 ただ、この制限はキューとしては割と当たり前の仕様だ。
#push
はTime
オブジェクトを生成するのと、SecureRandom.hex
を呼び出すことから若干重い。
ただ、エンキュー操作はデータを用意することが前提であることを考えると、それがパフォーマンスの足を引っ張ることはほとんどないだろう。
もしそこまでパフォーマンスにシビアなのであれば、自力でキューを実装することをおすすめする。 実装は容易だ。
デキュー
#pop
によってキューからアイテムを取得できる。
= queue.pop item
#pop
はOrbitalQueue::QueueObject
オブジェクトを返す。
OrbitalQueue::QueueObject#data
によって元データにアクセスすることができる。
#pop
ではアイテムは.checkout
に移された状態になっているので、一種のトランザクション保護状態にある。
完了するには#complete
を呼び出す。
= queue.pop
item = item.data
data
# Do something
.complete item
Rubyらしくブロック内で処理したい場合は、#pop
にブロックを渡すこともできる。
.pop do |data|
queue# Do something
end
ブロックが正常に終了した場合、自動的に#complete
が呼ばれる。
このブロック処理中に何かをブロックしたりしないため、おそらくもっとも使いやすい方法。
もし完了を確認することなくキューアイテムを削除して良いのであれば、#pop!
を使うとcompleteされた状態で値が返ってくる。
蛇足
ちなみに、どうしても冗長な作りにしないと気がすまない人のためにcomplete?
というメソッドが追加されている。
.complete unless obj.complete? obj
このメソッドはファイルの存在を確認しているのではなく、単に#complete
が呼ばれたことがあるかどうかで判定しているため、存在意義はほとんどない。
そもそも、ちゃんと安全な設計になっていればシステムコールがおかしな挙動を発生させない限り#complete
に迷いはないはずで、システムコールがおかしなことをしたなら確認するのも意味がない。
#complete?
の存在意義は、ほぼテストで使うためである。
中断耐性
ワーカーの副作用の内容にもよるが、キュー自体は#pop
したあとキューアイテムを.checkout
に保持しているため、キューの状態を気にせずシャットダウンが可能。
この場合、.checkout
内のファイルをキューディレクトリに戻すことでやり直すことができる。
このキューシステムは別にサーバーは持っていないので、自動的に.checkout
から取り戻したりはしない。
defer
0.0.3からはキューアイテムのdeferも可能になった。
.each_item do |item|
queuebegin
#...
rescue
.defer(Time.now + 120)
itemend
end
ただし、Orbital Designは「ゆるやかな協調」を指向しているため、#defer
は指定時間後に自動再試行をするという意味ではなく、指定時間を過ぎたらデキューの対象になるというだけである。
デキューの対象に戻すのも手動。
OrbitalQueue.resume("/home/foo/queue/something")
実はこのあたりは結構細かくいろいろあるのだけど、基本的には遅延させる時間と最大試行回数を指定すれば十分。
.each_item do |item|
queuebegin
#...
rescue
# 5分後以降にリトライ、最大5回まで
.defer((Time.now + 300), 5)
itemend
end
指数バックオフとかしたい場合はブロック付きで呼べる。
.each_item do |item|
queuebegin
#...
rescue
.defer do |retry_data|
item# リトライが5回まで既に行われていれば破棄
.destruct if retry_data[:count] > 5
item
# 指数バックオフを設定
retry[:until] = Time.now + 10 * 2 ** retry_data[:count]
end
end
end
#destruct
のかわりに#archive
を呼ぶことでログとして残してから抹消も可能。
#archive
はログ保存をしたあと#destruct
を呼ぶ。
#destruct
はOrbitalQueue::ItemDestruct
例外を発生させる。
#defer
はこの例外を捕捉してnil
を返す。このため、#defer
のブロックの中で#destruct
を呼ぶことで#defer
の処理を破棄して終了することができる。
動作概要
キューディレクトリは.checkout
, .defer
, .retry
, .archive
のディレクトリを行う持つ。
OrbitalQueueはファイルベースであり、これらにファイルを配置する。
OrbitalQueue#push
を行うと、キューディレクトリにキューファイルを配置する。
キューファイルはpush
したオブジェクトをMarshal
でシリアライズしたものである。
キューファイルのファイル名は${unixtime}-$$-${randomhex}.marshal
。
OrbitalQueue#pop
やOrbitalQueue#pop!
でキューを取得すると、.checkout
にキューファイルを移動する。
移動は複数回呼ぶと確実に失敗するため、移動に成功したことがそのプロセスが専有していることを保証することになる。
OrbitalQueue::QueueObject#complete
はOrbitalQueue#complete
のラッパーである。
#complete
が呼ばれると.checkout
からファイルが削除される。
#defer
は.checkout
から.defer
にキューファイルを移動するとともに、.retry
にキューファイルと同じファイル名のリトライ情報を持つMarshalシリアライズファイルを配置する。
#destruct
はキューディレクトリ以下のすべての該当するキューファイル名を持つファイルを削除して例外を発生させる。
#archive
は#destruct
を呼ぶ前に.archive
以下にファイルを生成してから#destruct
を呼ぶ。
.archive
以下に配置されるファイルはファイル名の体系が異なるため、#destruct
での削除対象にならない。
実用例
例えばウェブアプリで非同期処理をするためにジョブキューを使うとする。
def fooapi data
@queue.push data
204
end
非同期APIなので、キューに追加したことを以て正常応答する。
対してワーカーがこれを拾って処理を行う。
= OrbitalQueue.new("/var/run/webapp/fooapi")
queue
.each do |data|
queue#...
end
これだけの話だが、ここでOrbital Designのパワーを発揮することができる。 この同じやり方で2種類のメリットを選択できるのだ。
ひとつは、この処理が重い場合。
受け取ったリクエストに対して処理が重い場合は、リクエストを終端させるのに時間を要することになる。 これは大規模コネクションに対しても弱くなってしまうため、非同期で切り離すことで素早くレスポンスしてしまい、大規模コネクションにまつわる問題を軽減することを狙うという手法がある。
ここでジョブキューを介しているため、非対称ワーカーにすることが可能だ。 重い処理を思うためのワーカーを多数のプロセスで並列化することができ、一方で重い処理を切り離したウェブアプリのおかげでウェブサーバーワーカーを大量に用意する必要はない。
これは処理が計算的に重いだけでなく、不安定なバックエンドを持っている場合にも有効だ。
不安定なバックエンドに対してリクエストを投げるケースではウェブAPIはリトライ中レスポンスを保留するような設計にはしづらい。 そのような場合はだいたいpub/subなどを使って処理を分離するが、その機能をこのジョブキューに担わせることができる。
もうひとつは、シングルトンキューの実装。
静的ファイルを生成する場合など、競合排除の難しい処理が必要な部分だけをジョブキュー経由に切り出し、そこを単一プロセスにすることで競合を気にすることなく処理できる。 TCPサーバーで実装する場合などと比べると、非同期であるため効率がよくノンブロッキングになるため扱いやすい。