Spread Toolkit でセッションレプリケーション

ログ以外に、何か面白い使い方はないかなあと思って。

複数アプリケーションサーバ間でセッションデータを共有したい場合、まあ普通は DB に入れるか Memcached に入れるかするわけですが、これを Spread を使って出来ないか。

要は、Tomcat やらのセッションレプリケーションを真似したいと。

 +----- host1 --------------+               +----- host2 --------------+               
 |  +------------+          |               |  +----------------+      | 
 |  | App Server |          |               |  |     App Server |      |
 |  +------------+          |               |  +----------------+      |  
 |   ↑        ↓           |               |     ↓           ↑      |  
 |   ↑    [ spread ] <-------- message -------> [ spread ]    ↑      |
 |   ↑        ↓           |               |     ↓           ↑      |
 |   ↑   [ writer daemon ] |               |[ writer daemon ] ↑      |
 |   ↑        ↓           |               |     ↓           ↑      |
 |  [   storage   ]         |               |[      storage     ]      |
 +--------------------------+               +--------------------------+

図にするとこんな感じで、下向きの矢印が書き込み、上向きの矢印が読み出し。

  • セッションデータの書き込みはアプリケーションから spread に渡され、複数ホスト間の spread に通知される
  • writer daemon は書き込みメッセージを受けて storage に書き込み
  • セッションデータの読み出しは spread を介さず、直接ローカルのストレージから読み出す

書き込みに遅延があるとどうなるんだ、とか、まあそういう問題はあるけれども。

ということで実証実験として、Catalystプラグインとして Catalyst::Plugin::Session::Store::Spread てのを書いてみた。

書き込み系のメソッドだけ実装しているので、読み込みには他の C::P::Session::Store::* を組み合わせて使う。そのため、プラグインの Load 順は Store::Spread を先にする必要がある。

package Catalyst::Plugin::Session::Store::Spread;

use warnings;
use strict;
use base qw/Class::Data::Inheritable
            Class::Accessor::Fast
            Catalyst::Plugin::Session::Store/;
use NEXT;
use Catalyst::Utils ();
use Spread::Session ();
use Storable qw/ nfreeze thaw /;

eval { Log::Channel::disable('Spread::Session') };

__PACKAGE__->mk_classdata(qw/_session_spread_obj/);

our $VERSION = '0.1';

sub store_session_data {
    my ( $c, $sid, $data ) = @_;
    $c->_session_spread_obj->publish(
        $c->config->{session}->{spread_group} => $sid. "\n". nfreeze( \$data )
    );
}

sub delete_session_data {
    my ( $c, $sid ) = @_;
    $c->_session_spread_obj->publish(
        $c->config->{session}->{spread_group} => $sid. "\n"
    );
}

sub setup_session {
    my $c = shift;
    $c->NEXT::setup_session(@_);
    my $config = $c->config->{session};
    my $s = Spread::Session->new(
        spread_name => $config->{spread_name} || '4803',
    );
    $c->_session_spread_obj( $s );
    return;
}
1;

writer daemon は組み合わせる Session::Store::* によって実際の書き込み対象が切り替えられるようにするといいのだが、とりあえず面倒なので FastMmap 限定。YAMLCatalyst のを読めるようにする。

#!/usr/bin/perl
# session_writer.pl

use strict;
use Spread::Session;
use Event;
use Data::Dumper;
use Storable qw/ nfreeze thaw /;
use YAML::Syck;
use Cache::FastMmap;

eval { Log::Channel::disable('Spread::Session') };

my $cfg = LoadFile( $ARGV[0] )->{session};
print Dump $cfg;

my $storage =
    Cache::FastMmap->new(
        share_file  => $cfg->{storage},
        ( map { $_ => $cfg->{$_} }
              grep { exists $cfg->{$_} } qw/init_file cache_size/ ),
    );

my $session = Spread::Session->new(
    spread_name      => $cfg->{spread_name} || '4803',
    MESSAGE_CALLBACK => \&write_session,
);

$session->subscribe( $cfg->{spread_group} );

Event->io(
    fd => $session->{MAILBOX},
    cb => sub { $session->receive(0) },
);
Event::loop;

sub write_session {
    my $msg = shift;
    my ( $sid, $body ) = split /\n/, $msg->{BODY}, 2;
    my $data = defined $body ? eval { ${ thaw $body } } : undef;
    print "got sid=$sid\n";
    print "    data=". Dumper( $data ), "\n";

    if ( !defined $data ) {
        $storage->remove( $sid );
    }
    else {
        $storage->set( $sid, \$data );
    }
}

app.yml で spread 関係の定義もする。

---
name: App
session:
  spread_group: test
  spread_name: 4803@localhost
  expires: 86400
  storage: /var/tmp/app_session

これでとりあえず。

 $ sessoin_writer.pl /path/to/app.yml

として起動して、Catalyst アプリからセッションを更新すると、一応ちゃんと動いているみたい。

今後の課題とか。

  • アプリケーションサーバを追加したり、落ちたのが復活したりした際には自動で storage を同期したい
  • 普通の構成よりも daemon が増えるので、安定性がどうかな
  • 書き込み遅延はどの程度だろう?