Chienomi

シェルスクリプトで並列処理

zsh

なんだか検索件数が多いので、シェルスクリプトによるコンカレンシーのお話をしよう。

ただし、bad design (変数を書き換えるとか、相互にやりとりするとか)は除外する。

また、Zshを前提とする。

投げっぱなし

まず基本。投げっぱなしはとても簡単。

subflow() {
    somecommand
}

repeat 3
do
  ( subflow ) &
done

シェルスクリプトではジョブコントロールは無効になっているので、SIGHUPの送信はなされないので、さっさと終了してしまっても大丈夫。

処理の終了を待ちたい場合はwait

subflow() {
    somecommand
}

repeat 3
do
  ( subflow ) &
done

wait

flock

flockを使う方法は簡単でシンプル。 ロックファイルを使ってファイルデスクリプタを開きっぱなしにし、そのファイルデスクリプタを指定してロックする。

まずファイルデスクリプタ9.lockファイルをライトモードでオープンする場合

exec 9> .lock

そしてファイルデスクリプタ9を閉じる場合

exec 9>&-

これを利用するとこんな感じ。

subflow() {
    exec 9>| .lock
    flock -x 9
    get_some_param
    exec 9>&-
    somecommands
}

repeat 3
do
  ( subflow ) &
done

wait
rm .lock

ロックしている間に共有しているリソースの読み込み/変更を行い、ファイルデスクリプタを閉じる。

リソースを読むより簡単な方法は、ひとつのストリームを共有したファイルデスクリプタとして開き、 ロックを中に読むことである。

subflow() {
    while true
    do
        exec 9>| .lock
        flock -x 9
        read item
        if [[ -z "$item" ]]
        then
            exit
        fi

        exec 9>&-
        somecommands
    done
}

(
    repeat 3
    do
        ( subflow ) &
    done
) < queue

wait
rm .lock

ワーカーを生成するサブシェルの標準入力はqueueファイルにリダイレクトされている。 そのため、ファイルデスクリプタ0queueファイルなのだが、そのサブシェルの子プロセスであるワーカープロセスはリダイレクトしていないため、このファイルデスクリプタが共有される。 結果、全てのワーカーはqueueファイルを標準入力とするのだが、ストリーム自体を共有しているため、どのワーカーが一行読んだとしてもストリームの位置が変更され、次に読み込む位置は他のワーカーにとっても変更される。実際

(
    typeset -i x=1
    repeat 3
    do
        read
        ( print "WORKER $x: $REPLY")
        (( x++ ))
    done
) <<EOF
foo
bar
baz
EOF
WORKER 1: foo
WORKER 2: bar
WORKER 3: baz

となる。

producer-consumer キュー

もっと凝ったことがしたいのであればUNIXドメインソケットを使ってproducer部分をシングルスレッド化することができる。

# consumerの定義
zmodload 'zsh/net/socket'
subflow() {
    while true
    do
        zsocket .queue.sock
        typeset -i fd=$REPLY
        item=$(cat <&$fd)
        if [[ -z "$item" ]]
        then
            exit
        fi
        somecommands
    done
}

# producerを作る
(
    zsocket -l .queue.sock
    typeset -i sock_fd=$REPLY
    while zsocket -a $sock_fd
    do
        fd=$REPLY
        read -e >&$fd
        exec $fd>&-
    done
) < queue
integer producer=$!

# consumerを作る
typeset -a consumers=()
repeat 3
do
  ( subflow ) &
  consumers+=($!)
done

# consumerを全部待つ
wait $consumers

# producerを終了してソケットを消す
kill $producer
rm .queue.sock

zsocket -lのタイミングで接続を受け付けているのだが、zsocket -lしていないタイミングでは接続しようとするプロセスをブロックするため、producer側の処理は直列に行われる。

双方向性があるときはproducerと直接やりとりできるのはメリット。

Orbit designの場合

私が採用しているOrbit designはレギュレーターはZshスクリプトなので、基本的にこのような並列化手順をとっている。 とはいえ、ものによっては直列(serial)になっていたりする。

queue_generator > /tmp/orbit.type1.$$.queue

while read
do
  orbit.type1.script.rb "$REPLY"
done < /tmp/orbit.type1.$$.queue

rm /tmp/orbit.type1.$$.queue

ただ、並列化されているものが多い。ワーカープロセスは最も多いもので5。

worker() {
    if [[ -z "$1" ]]
    then
        exit
    fi
    orbit.type2.script.rb "$1"
}

queue_generator > /tmp/orbit.type2.$$.queue
typeset -a workers

(
    repeat 5
    do
        ( worker "$REPLY" ) &
        workers+=($!)
    done
) < /tmp/orbit.type2.$$.queue

wait $workers
rm /tmp/orbit.type2.$$.queue

基本的にワーカースクリプトが受け取るのは処理対象ID(ほとんどの場合ファイルパス)だけである。 それ以外の情報はスクリプト側で生成するか、別途取得するかする。