2008年6月アーカイブ

Kazuho@Cybozu Labs: フレンド・タイムライン処理の原理と実践

奥さん本人の中でブームが去った感もあるRDBMSで実現するフレンド・タイムライン処理ですが、そういえばDBICで使ってみたのを思い出したので晒してみます。

要はDBICからストアドプロシージャの叩き方を知りたかっただけなんですけどね。

パッケージ名はWebインターフェースはどーせCatalystで作るでしょってことでCatalyst + Twitter = Catatter…って安直なネーミングですね。

記事中ではプッシュ型とプル型が紹介されているのですが、データ量やfollow, removeの際のコストとか考えたらプル型の方が好みかなってことでプル型を採用してみました。

また、基本的にスキーマやストアドプロシージャはオリジナルと同じですが、DBICでPKをマルチカラムにするとめんどっちーのでサロゲートキーを追加したりしてます。

CREATE TABLE user (
    id int(10) unsigned NOT NULL AUTO_INCREMENT,
    screen_name varchar(255) NOT NULL,
    PRIMARY KEY (id)
) ENGINE=innodb;

CREATE TABLE message (
    id int(10) unsigned NOT NULL AUTO_INCREMENT,
    user_id int (10) unsigned NOT NULL,
    body varchar(255) NOT NULL,
    PRIMARY KEY (id),
    KEY userid_id_id(user_id, id)
) ENGINE=innodb;

CREATE TABLE follower (
    id int(10) unsigned NOT NULL AUTO_INCREMENT,
    user_id int (10) unsigned NOT NULL,
    follower_id int (10) unsigned NOT NULL,
    PRIMARY KEY (id),
    UNIQUE (user_id, follower_id),
    KEY user_id_follower_id (user_id, follower_id),
    KEY follower_id(follower_id)
) ENGINE=innodb;

DELIMITER //
CREATE PROCEDURE fetch_timeline_ids (IN uid int unsigned)
BEGIN
  DECLARE fid,maxid int unsigned;
  DECLARE done int DEFAULT 0;
  DECLARE fid_maxid_cur CURSOR FOR
    SELECT follower_id,(
      SELECT id FROM message WHERE user_id=follower.follower_id
      ORDER BY user_id DESC,id DESC LIMIT 1) AS max_id
    FROM follower WHERE user_id=uid ORDER BY max_id DESC LIMIT 20;
  DECLARE CONTINUE HANDLER FOR NOT FOUND SET done=1;
  CREATE TEMPORARY TABLE IF NOT EXISTS fetch_timeline_tt (
    id int unsigned NOT NULL PRIMARY KEY
  ) ENGINE=heap DEFAULT CHARSET=utf8;
  DELETE FROM fetch_timeline_tt;
  OPEN fid_maxid_cur;
  REPEAT
    FETCH fid_maxid_cur INTO fid,maxid;
    IF NOT done THEN
      INSERT INTO fetch_timeline_tt
        SELECT id FROM message WHERE user_id=fid
        ORDER BY id DESC LIMIT 20;
    END IF;
  UNTIL done END REPEAT;
  CLOSE fid_maxid_cur;
END;

CREATE PROCEDURE build_max_ids_of_followers (IN uid int unsigned,IN max_id int unsigned)
BEGIN
  DECLARE fid int unsigned;
  DECLARE done int DEFAULT 0;
  DECLARE fcur CURSOR FOR
    SELECT follower_id FROM follower WHERE user_id=uid;
  DECLARE CONTINUE HANDLER FOR NOT FOUND SET done=1;
  CREATE TEMPORARY TABLE IF NOT EXISTS max_ids_of_followers (
    user_id int unsigned NOT NULL,
    max_id int unsigned NOT NULL
  ) ENGINE=heap DEFAULT CHARSET=utf8;
  DELETE FROM max_ids_of_followers;
  OPEN fcur;
  REPEAT
    FETCH fcur INTO fid;
    IF NOT done THEN
      INSERT INTO max_ids_of_followers SELECT fid,max(id) AS m FROM message
        WHERE user_id=fid AND id<max_id HAVING NOT ISNULL(m);
    END IF;
  UNTIL done END REPEAT;
  CLOSE fcur;
END;

CREATE PROCEDURE fetch_timeline_ids2 (IN uid int unsigned,IN maxid int unsigned)
BEGIN
  DECLARE fid,fmaxid int unsigned;
  DECLARE done int DEFAULT 0;
  DECLARE fid_maxid_cur CURSOR FOR
    SELECT user_id,max_id FROM max_ids_of_followers
    ORDER BY max_id DESC LIMIT 20;
  DECLARE CONTINUE HANDLER FOR NOT FOUND SET done=1;
  CREATE TEMPORARY TABLE IF NOT EXISTS fetch_timeline_tt (
    id int unsigned NOT NULL PRIMARY KEY
  ) ENGINE=heap DEFAULT CHARSET=utf8;
  DELETE FROM fetch_timeline_tt;
  CALL build_max_ids_of_followers(uid,maxid);
  OPEN fid_maxid_cur;
  REPEAT
    FETCH fid_maxid_cur INTO fid,fmaxid;
    IF NOT done THEN
      INSERT INTO fetch_timeline_tt
        SELECT id FROM message
        WHERE user_id=fid AND id<=fmaxid
        ORDER BY id DESC LIMIT 20;
    END IF;
  UNTIL done END REPEAT;
  CLOSE fid_maxid_cur;
END;
//
Catatter::Schema
package Catatter::Schema;

use strict;
use warnings;

use base 'DBIx::Class::Schema';

__PACKAGE__->load_classes;

1;
Catatter::Schema::User
package Catatter::Schema::User;

use strict;
use warnings;

use base 'DBIx::Class';

__PACKAGE__->load_components(
    "Core",
);
__PACKAGE__->table("user");
__PACKAGE__->add_columns(
    "id",
    {   data_type     => "INT",
        default_value => undef,
        is_nullable   => 0,
        size          => 10
    },
    "screen_name",
    {   data_type     => "VARCHAR",
        default_value => "",
        is_nullable   => 0,
        size          => 255
    },
);
__PACKAGE__->set_primary_key("id");
__PACKAGE__->has_many(
    messages => 'Catatter::Schema::Message',
    { 'foreign.user_id' => 'self.id' }
);

sub following_timeline {
    my $self   = shift;
    my $max_id = shift;
    my $cond   = shift;
    my $attrs  = shift || {};

    my $source  = $self->result_source;
    my $storage = $source->storage;
    $storage->dbh_do(
        sub {
            my ( $storage, $dbh, @cols ) = @_;
            if ($max_id) {
                my $sth = $dbh->prepare('Call fetch_timeline_ids2(?, ?)');
                $sth->execute( $self->id, $max_id );
            } else {
                my $sth = $dbh->prepare('Call fetch_timeline_ids(?)');
                $sth->execute( $self->id );
            }
        }
    );

    $source->schema->resultset('Message')->search(
        {%$cond},
        {   join => [qw/messages/],
            %$attrs
        }
    );
}
1;
Catatter::Schema::Message
package Catatter::Schema::Message;

use strict;
use warnings;

use base 'DBIx::Class';

__PACKAGE__->load_components(
    "Core",
);
__PACKAGE__->table("message");
__PACKAGE__->add_columns(
  "id",
  { data_type => "INT", default_value => undef, is_nullable => 0, size => 10 },
  "user_id",
  { data_type => "INT", default_value => "", is_nullable => 0, size => 10 },
  "body",
  { data_type => "VARCHAR", default_value => "", is_nullable => 0, size => 255 },
);
__PACKAGE__->set_primary_key("id");
__PACKAGE__->belongs_to(user => 'Catatter::Schema::User', {'foreign.id' => 'self.user_id'});
__PACKAGE__->belongs_to(timeline => 'Catatter::Schema::FetchTimelineTt', {'foreign.id' => 'self.id'});

1;
Catatter::Schema::Follower
package Catatter::Schema::Follower;

use strict;
use warnings;

use base 'DBIx::Class';

__PACKAGE__->load_components(
    "Core",
);
__PACKAGE__->table("follower");
__PACKAGE__->add_columns(
  "id",
  { data_type => "INT", default_value => undef, is_nullable => 0, size => 10 },
  "user_id",
  { data_type => "INT", default_value => "", is_nullable => 0, size => 10 },
  "follower_id",
  { data_type => "INT", default_value => "", is_nullable => 0, size => 10 },
);
__PACKAGE__->set_primary_key("id");
__PACKAGE__->add_unique_constraint("user_id", ["user_id", "follower_id"]);
__PACKAGE__->belongs_to(user => 'Catatter::Schema::User', {'foreign.id' => 'self.user_id'});
__PACKAGE__->belongs_to(follower => 'Catatter::Schema::Userr', {'foreign.id' => 'self.follower_id'});

1;
Catatter::Schema::FetchTimelineTt
package Catatter::Schema::FetchTimelineTt;

use strict;
use warnings;

use base 'DBIx::Class';

__PACKAGE__->load_components(
    "Core",
);
__PACKAGE__->table("fetch_timeline_tt");
__PACKAGE__->add_columns(
  "id",
  { data_type => "INT", default_value => undef, is_nullable => 0, size => 10 },
);
__PACKAGE__->set_primary_key("id");
__PACKAGE__->belongs_to(message => 'Catatter::Schema::Message', {'foreign.id' => 'self.id'});

1;
で使い方は
use Catatter::Schema;

my $schema = Catatter::Schema->connect('dbi:mysql:catetter', 'user', 'pass');
my $user = $schema->resultset('User')->find(1);
my $timeline = $user->following_timeline(
    undef, 
    {}, 
    {order_by => 'me.id DESC', rows => 20}
);
my $minid;
while (my $message = $timeline->next) {
    print $message->user->screen_name . ': ' . $message->body . "\n";
    $minid = $message->id;
}
こんな感じ。 ただ、pageは効かないので次のページを取るには現在のresultsetの一番小さいidを渡してやる必要があります。
my $timeline = $user->following_timeline(
    $minid, 
    {}, 
    {order_by => 'me.id DESC', rows => 20}
);
ちなみに自分のタイムラインは
my $timeline = $user->messages(
    {}, 
    {order_by => 'me.id DESC', rows => 20}
);
で取れます。

結論から言うと、DBICから綺麗にストアドプロシージャを叩く方法がわからず生のDBHを叩くというどーなんだ?的な解決方法になってます。

全体的に無理やり感がありますが、まーこんな感じでもできるよって感じで。

あちこちで叫ばれている、むやみやたらにCatalystのPluginを作るべきではない運動。(この辺はトクヒロムさんがキャンペーンを張ってるので一読して方がいいです。)
まー言われていることはもっともで、ComponentでできることはComponentにすべきなわけです。
で、拙作のCatalyst::Plugin::RequestTokenなる代物も一部finilizeを使っているものの、明らかにControllerにするべきなので、C::P::RequestTokenをdeprecateさせてCatalyst::Controller::ReqeustTokenってのをとりあえず作ってCPANでリリースしました。

使い方は
package MyApp::Controller::Foo;
use base qw(Catalyst::Controller::RequestToken);

sub form :Local {
    my ($self, $c) = @_;
    $c->stash->{template} = 'form.tt';
    $c->forward($c->view('TT'));
}

sub confirm :Local :CreateToken {
    my ($self, $c) = @_;
    $c->stash->{template} = 'confirm.tt';
    $c->forward($c->view('TT'));
}

sub complete :Local :ValidateToken {
    my ($self, $c) = @_;
    if ($self->validate_token) {
        $c->response->body('complete.');
    } eles {
        $c->response->body('invalid operation.');
    }    
}
confirm.tt
<html>
<body>
<form action="complete" method="post">
<input type="hidden" name="_token" values="[% c.req.param('_token') %]"/>
<input type="submit" name="submit" value="complete"/>
</form>
</body>
</html>

とかすると、トークンにより/foo/confirm -> /foo/complete のトランザクションが保証され、/foo/completeの二重サブミットやCSRF攻撃を防ぐことができます。

Controllerにすることによってfinilizeのフックが出来なくなってprepare_tokenが再現できないのですが、その他はほぼ同じ動きをするし、見た目こっちの方がわかりやすいと思います。

これに伴いCatalyst::Plugin::RequestTokenはdeprecateされたのでご注意ください。

先日、奥さんと知り合う機会があって一緒に飲ませていただいたのですが、その際に家がご近所ということが発覚。
奥さんは赤坂までチャリンコ通勤してるそうで(この日も渋谷までチャリで来てました)僕が麹町まで電車通勤してるって言ったらドン引きされました。

ってことで、15年くらい乗り続けてたママチャリはさすがにあちこちへたってきてたので、ドンキで新車を購入して早速チャリ通勤始めたよ。

新車

全速力でダッシュしたら確かに電車よりは早いかも。


プロフィール

このアーカイブについて

このページには、2008年6月に書かれたブログ記事が新しい順に公開されています。

前のアーカイブは2008年5月です。

次のアーカイブは2008年7月です。

最近のコンテンツはインデックスページで見られます。過去に書かれたものはアーカイブのページで見られます。