PostgreSQL で簡易に MQ - Mi manca qualche giovedi`? を読んで、こりゃ素晴らしいと思ったので Perl モジュール化しました。
simple message queue using PostgreSQL.
http://github.com/fujiwara/perl-queue-q4pg-lite/tree/master
割と簡易な Message Queue ということで、Lite って名前に。pg_advisory_lock() を使用してるので、PostgreSQL-8.2 以降でないと動きません。
インターフェースは Queue::Q4M とほとんど同じ……というか、コード自体を半分ぐらい Queue::Q4M から頂いています。
use Queue::Q4Pg::Lite; my $q = Queue::Q4Pg::Lite->connect( connect_info => [ 'dbi:Pg:dbname=mydb', $username, $password, ], ); for (1..10) { $q->insert($table, { col1 => "foo", col2 => "bar", col3 => "baz" }); } while ($q->next($table)) { my $cols = $q->fetch_hashref() print "col1 = $cols->{col1}, col2 = $cols->{col2}, col3 = $cols->{col3}\n"; $q->ack; }
Queue::Q4M と違うのは、以下のようなところ。
- next() で取得できる行がない場合は、内部で $q->interval (default: 5) 秒 sleep して再度 SELECT している
- fetch_array, fetch_arrayref がない。fetch_hashref のみ
- 取り出した行の処理が完了したら $q->ack を実行する必要がある (実行しないと行が消えない)
依存モジュールは AnyMoose, DBI, DBD::Pg, SQL::Abstract ぐらいです。
insert() は特にこのモジュールを使う必要はなくて、普通に INSERT して構いません。なので Queue に投入するほう (Web App とか) は DBIC でも CDBI でも素の DBI でも、なんでもお好きな手段でどうぞ。
テーブルに好きなカラムを作れて、その値を条件にして SELECT できるので「優先度が 10 以下のものを、queue 投入日時の早い順に取得」というようなことも可能です。
CREATE TABLE mq ( id serial primary key, message text, priority int, created_on timestamp default now() );
while ( $q->next("mq", { priority => { "<=", 10 } }) ) { }
[追記]
ORDER BY を付けると、実際に取得しない行についても lock を掛けてしまい、困ったことになります。advisory_lock の評価をすべて試みて成功したものを並べ替えて、最初の 1行のみ取得するので。order は指定できないようにしました (version 0.03)。
CPAN にも upload したので、そのうち取得できるようになると思います。