Coro と Go で並列実行数の制御

http://d.hatena.ne.jp/tokuhirom/20090710/1247196134 この Coro の例を Go でやってみます。

まず特定数 (ここでは 3) の worker thread を立ち上げておいて、そこにメッセージを送って仕事させるモデル。メッセージのやりとりに Coro::Channel を使います。

use Coro;
use Coro::Channel;
use Coro::Timer;

my $ch = Coro::Channel->new;

sub worker {
    while (1) {
        my $url = $ch->get;
        printf("worker(%d): %s\n", $_[0], $url);
        Coro::Timer::sleep(1);
    }
}

my @coros = (
    async { worker(1) },
    async { worker(2) },
    async { worker(3) },
);
for my $i ( 0 .. 9 ) {
    $ch->put( sprintf("http://www%d.example.com/", $i) );
}
$_->join for @coros;

Go で書くとこうなる。ほとんど同じですね!

package main
import "fmt"
import "time"

var ch = make(chan string);

func Worker(num int) {
    for {  // 無限ループして channel からの入力を待ち受ける
        url := <- ch;
        fmt.Printf("worker(%d): %s\n", num, url);
        time.Sleep(int64(1) * 1e9);
    }
}

func main() {
    go Worker(1);
    go Worker(2);
    go Worker(3);
    for i := 0; i < 10; i++ {
        ch <- fmt.Sprintf("http://www%d.example.com/", i);
    }
    for { time.Sleep(int64(1) * 1e9) }
}

Go で main() の最後、loop して Sleep しているのは、こうしないと Worker の実行を待たないで終了してしまうため。$coro->join に相当するのはどうすればいいのかな。

次に、とりあえず worker thread を立ち上げまくっておいて、セマフォで同時実行数を制御するモデル。Coro::Semaphore を使います。

use Coro;
use Coro::Timer;
use Coro::Semaphore;

my $sem = Coro::Semaphore->new(3);

sub worker {
    my ($n, $url) = @_;
    my $guard = $sem->guard;
    printf("worker(%d): %s\n", $n, $url);
    Coro::Timer::sleep(1);
}

my @coros;
for my $i ( 0 .. 9 ) {
    my $url = sprintf("http://www%d.exmaple.com/", $i);
    push @coros, async { worker($i, $url) };
}
$_->join for @coros;

Go では、make(chan TYPE, X) とすると X 個のメッセージしか入らない (それ以上入れようとするとブロックする) channel が作れるのでそれがセマフォの代わりになる。参考 http://golang.org/doc/effective_go.html#channels

これも、Coro で書いたのとほとんど同じ構造のコードになります。

package main
import "fmt"
import "time"

var sem = make(chan int, 3);

func Worker(n int, url string) {
    sem <- 1;    // セマフォに値をセット(できなければblock)
    fmt.Printf("worker(%d): %s\n", n, url);
    time.Sleep( int64(1) * 1e9 );
    <-sem;       // 完了したので消す
}

func main() {
    for i := 0; i < 10; i++ {
        url := fmt.Sprintf("http://www%d.exmaple.com/", i);
        go Worker(i, url);
    }
    for { time.Sleep( int64(1) * 1e9 ) }
}

結論。Goroutine と Coro はよく似ている。
# はてな記法シンタックスハイライト、早く .go に対応しないかなあ……