Queue::Q4Pg::Lite を書いた

PostgreSQL で簡易に MQ - Mi manca qualche giovedi`? を読んで、こりゃ素晴らしいと思ったので Perl モジュール化しました。

simple message queue using PostgreSQL.

http://github.com/fujiwara/perl-queue-q4pg-lite/tree/master

割と簡易な Message Queue ということで、Lite って名前に。pg_advisory_lock() を使用してるので、PostgreSQL-8.2 以降でないと動きません。
インターフェースは Queue::Q4M とほとんど同じ……というか、コード自体を半分ぐらい Queue::Q4M から頂いています。

use Queue::Q4Pg::Lite;

my $q = Queue::Q4Pg::Lite->connect(
    connect_info => [
        'dbi:Pg:dbname=mydb',
        $username,
        $password,
    ],
);

for (1..10) {
    $q->insert($table, { col1 => "foo", col2 => "bar", col3 => "baz" });
}
 
while ($q->next($table)) {
    my $cols = $q->fetch_hashref()
    print "col1 = $cols->{col1}, col2 = $cols->{col2}, col3 = $cols->{col3}\n";
    $q->ack;
}

Queue::Q4M と違うのは、以下のようなところ。

  • next() で取得できる行がない場合は、内部で $q->interval (default: 5) 秒 sleep して再度 SELECT している
  • fetch_array, fetch_arrayref がない。fetch_hashref のみ
  • 取り出した行の処理が完了したら $q->ack を実行する必要がある (実行しないと行が消えない)

依存モジュールは AnyMoose, DBI, DBD::Pg, SQL::Abstract ぐらいです。

insert() は特にこのモジュールを使う必要はなくて、普通に INSERT して構いません。なので Queue に投入するほう (Web App とか) は DBIC でも CDBI でも素の DBI でも、なんでもお好きな手段でどうぞ。

テーブルに好きなカラムを作れて、その値を条件にして SELECT できるので「優先度が 10 以下のものを、queue 投入日時の早い順に取得」というようなことも可能です。

CREATE TABLE mq (
    id serial primary key,
    message text,
    priority int,
    created_on timestamp default now()
);
while ( $q->next("mq", { priority => { "<=", 10 } }) ) {
}

[追記]
ORDER BY を付けると、実際に取得しない行についても lock を掛けてしまい、困ったことになります。advisory_lock の評価をすべて試みて成功したものを並べ替えて、最初の 1行のみ取得するので。order は指定できないようにしました (version 0.03)。

CPAN にも upload したので、そのうち取得できるようになると思います。