Kazuho@Cybozu Labs: フレンド・タイムライン処理の原理と実践
奥さん本人の中でブームが去った感もあるRDBMSで実現するフレンド・タイムライン処理ですが、そういえばDBICで使ってみたのを思い出したので晒してみます。
要はDBICからストアドプロシージャの叩き方を知りたかっただけなんですけどね。
パッケージ名はWebインターフェースはどーせCatalystで作るでしょってことでCatalyst + Twitter = Catatter…って安直なネーミングですね。
記事中ではプッシュ型とプル型が紹介されているのですが、データ量やfollow, removeの際のコストとか考えたらプル型の方が好みかなってことでプル型を採用してみました。
また、基本的にスキーマやストアドプロシージャはオリジナルと同じですが、DBICでPKをマルチカラムにするとめんどっちーのでサロゲートキーを追加したりしてます。
-- Registered accounts. `id` is the surrogate key that message.user_id and
-- both columns of the follower table are matched against.
CREATE TABLE user (
    id          INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,
    screen_name VARCHAR(255)     NOT NULL,
    PRIMARY KEY (id)
) ENGINE=InnoDB;
-- One row per posted message; `user_id` is the author.
CREATE TABLE message (
    id      INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,
    user_id INT(10) UNSIGNED NOT NULL,
    body    VARCHAR(255)     NOT NULL,
    PRIMARY KEY (id),
    -- Used by the timeline procedures, which look up one author's
    -- messages ordered by id descending.
    KEY userid_id_id (user_id, id)
) ENGINE=InnoDB;
-- Follow relationships. The timeline procedures select follower_id
-- WHERE user_id = <viewer>, i.e. follower_id is the account whose messages
-- show up in user_id's timeline.
-- `id` is a surrogate key added so the ORM side has a single-column
-- primary key; the real identity of a row is (user_id, follower_id).
CREATE TABLE follower (
    id          INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,
    user_id     INT(10) UNSIGNED NOT NULL,
    follower_id INT(10) UNSIGNED NOT NULL,
    PRIMARY KEY (id),
    -- The UNIQUE constraint already creates an index on
    -- (user_id, follower_id), so no separate plain KEY on the same
    -- columns is declared: it would only add write cost and storage.
    UNIQUE (user_id, follower_id),
    KEY follower_id (follower_id)
) ENGINE=InnoDB;
DELIMITER //
-- fetch_timeline_ids(uid)
-- Refills the temporary table fetch_timeline_tt with candidate message ids
-- for the first page of uid's timeline.
--   1. Pick at most 20 follower rows of uid, preferring the accounts whose
--      newest message id is highest.
--   2. For each picked account, copy its 20 newest message ids.
-- The caller is expected to join message against fetch_timeline_tt and
-- apply its own ORDER BY / LIMIT.
CREATE PROCEDURE fetch_timeline_ids (IN uid int unsigned)
BEGIN
    DECLARE src_uid, src_newest INT UNSIGNED;
    DECLARE exhausted INT DEFAULT 0;
    DECLARE src_cur CURSOR FOR
        SELECT follower_id, (
            SELECT id FROM message WHERE user_id = follower.follower_id
            ORDER BY user_id DESC, id DESC LIMIT 1) AS max_id
        FROM follower WHERE user_id = uid ORDER BY max_id DESC LIMIT 20;
    -- FETCH past the last row raises NOT FOUND; turn that into a flag.
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET exhausted = 1;

    -- The table lives for the session, so empty whatever an earlier call left.
    CREATE TEMPORARY TABLE IF NOT EXISTS fetch_timeline_tt (
        id INT UNSIGNED NOT NULL PRIMARY KEY
    ) ENGINE=heap DEFAULT CHARSET=utf8;
    DELETE FROM fetch_timeline_tt;

    OPEN src_cur;
    copy_ids: LOOP
        -- src_newest is only needed as a FETCH target; the copy below
        -- re-reads the newest ids itself.
        FETCH src_cur INTO src_uid, src_newest;
        IF exhausted THEN
            LEAVE copy_ids;
        END IF;
        INSERT INTO fetch_timeline_tt
            SELECT id FROM message WHERE user_id = src_uid
            ORDER BY id DESC LIMIT 20;
    END LOOP copy_ids;
    CLOSE src_cur;
END;
-- build_max_ids_of_followers(uid, max_id)
-- Refills the temporary table max_ids_of_followers with one row per
-- follower_id of uid: (that account, its largest message id below max_id).
-- Accounts that have no message below max_id produce no row.
CREATE PROCEDURE build_max_ids_of_followers (IN uid int unsigned,IN max_id int unsigned)
BEGIN
    DECLARE src_uid INT UNSIGNED;
    DECLARE exhausted INT DEFAULT 0;
    DECLARE src_cur CURSOR FOR
        SELECT follower_id FROM follower WHERE user_id = uid;
    -- FETCH past the last row raises NOT FOUND; turn that into a flag.
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET exhausted = 1;

    -- The table lives for the session, so empty whatever an earlier call left.
    CREATE TEMPORARY TABLE IF NOT EXISTS max_ids_of_followers (
        user_id INT UNSIGNED NOT NULL,
        max_id  INT UNSIGNED NOT NULL
    ) ENGINE=heap DEFAULT CHARSET=utf8;
    DELETE FROM max_ids_of_followers;

    OPEN src_cur;
    collect_max: LOOP
        FETCH src_cur INTO src_uid;
        IF exhausted THEN
            LEAVE collect_max;
        END IF;
        -- max() over zero rows yields NULL; HAVING drops that row so the
        -- NOT NULL column is never fed a NULL. In the WHERE clause max_id
        -- is the procedure parameter (message has no such column).
        INSERT INTO max_ids_of_followers
            SELECT src_uid, max(id) AS m FROM message
            WHERE user_id = src_uid AND id < max_id
            HAVING m IS NOT NULL;
    END LOOP collect_max;
    CLOSE src_cur;
END;
-- fetch_timeline_ids2(uid, maxid)
-- Paging variant of fetch_timeline_ids: refills fetch_timeline_tt with
-- candidate message ids that are older than maxid.
--   1. build_max_ids_of_followers(uid, maxid) records, per followed
--      account, the largest message id below maxid.
--   2. The 20 accounts with the highest such id are picked.
--   3. For each, up to 20 message ids not above that id are copied.
CREATE PROCEDURE fetch_timeline_ids2 (IN uid int unsigned,IN maxid int unsigned)
BEGIN
    DECLARE src_uid, src_max INT UNSIGNED;
    DECLARE exhausted INT DEFAULT 0;
    -- Reads the table that build_max_ids_of_followers fills below; the
    -- query is not run until OPEN.
    DECLARE src_cur CURSOR FOR
        SELECT user_id, max_id FROM max_ids_of_followers
        ORDER BY max_id DESC LIMIT 20;
    -- FETCH past the last row raises NOT FOUND; turn that into a flag.
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET exhausted = 1;

    -- The table lives for the session, so empty whatever an earlier call left.
    CREATE TEMPORARY TABLE IF NOT EXISTS fetch_timeline_tt (
        id INT UNSIGNED NOT NULL PRIMARY KEY
    ) ENGINE=heap DEFAULT CHARSET=utf8;
    DELETE FROM fetch_timeline_tt;

    CALL build_max_ids_of_followers(uid, maxid);

    OPEN src_cur;
    copy_ids: LOOP
        FETCH src_cur INTO src_uid, src_max;
        IF exhausted THEN
            LEAVE copy_ids;
        END IF;
        INSERT INTO fetch_timeline_tt
            SELECT id FROM message
            WHERE user_id = src_uid AND id <= src_max
            ORDER BY id DESC LIMIT 20;
    END LOOP copy_ids;
    CLOSE src_cur;
END;
//
Catatter::Schema
package Catatter::Schema;

# Schema root class. load_classes is called without arguments, leaving it
# to DBIx::Class to discover the result classes of this schema.
use strict;
use warnings;
use base 'DBIx::Class::Schema';

__PACKAGE__->load_classes;

1;
Catatter::Schema::User
package Catatter::Schema::User;

# Result class for the `user` table, plus the entry point that builds a
# user's friend timeline through the MySQL stored procedures.
use strict;
use warnings;
use base 'DBIx::Class';

__PACKAGE__->load_components(
    "Core",
);
__PACKAGE__->table("user");
__PACKAGE__->add_columns(
    "id",
    {   data_type     => "INT",
        default_value => undef,
        is_nullable   => 0,
        size          => 10
    },
    "screen_name",
    {   data_type     => "VARCHAR",
        default_value => "",
        is_nullable   => 0,
        size          => 255
    },
);
__PACKAGE__->set_primary_key("id");

# Messages written by this user (message.user_id = user.id).
__PACKAGE__->has_many(
    messages => 'Catatter::Schema::Message',
    { 'foreign.user_id' => 'self.id' }
);

# following_timeline( $max_id, \%cond, \%attrs )
#
# Returns a Message resultset restricted to the ids that the stored
# procedure has just written into the temporary table fetch_timeline_tt.
#
#   $max_id  false  -> CALL fetch_timeline_ids(user id)       (first page)
#            true   -> CALL fetch_timeline_ids2(user id, $max_id)
#                      (only messages older than $max_id)
#   \%cond   extra search conditions; optional, defaults to {}
#   \%attrs  extra search attributes (order_by, rows, ...); optional,
#            defaults to {}. Keys given here override the built-in join.
#
# The procedure call and the later SELECT must run on the same database
# session, because the temporary table is only visible to the session
# that created it.
sub following_timeline {
    my ( $self, $max_id, $cond, $attrs ) = @_;

    # Both hashes are optional; dereferencing undef would die under strict.
    $cond  ||= {};
    $attrs ||= {};

    my $source = $self->result_source;

    # DBIx::Class offers no wrapper for CALL here, so go through the raw
    # handle that dbh_do hands to the callback.
    $source->storage->dbh_do(
        sub {
            my ( $storage, $dbh ) = @_;
            my ( $sql, @bind )
                = $max_id
                ? ( 'Call fetch_timeline_ids2(?, ?)', $self->id, $max_id )
                : ( 'Call fetch_timeline_ids(?)', $self->id );
            $dbh->do( $sql, undef, @bind );
            return;
        }
    );

    # Join through Message's `timeline` relationship (fetch_timeline_tt);
    # Message has no relationship called `messages`.
    return $source->schema->resultset('Message')->search(
        {%$cond},
        {   join => [qw/timeline/],
            %$attrs
        }
    );
}
1;
Catatter::Schema::Message
package Catatter::Schema::Message;

# Result class for the `message` table.
use strict;
use warnings;
use base 'DBIx::Class';

__PACKAGE__->load_components('Core');
__PACKAGE__->table('message');

__PACKAGE__->add_columns(
    id => {
        data_type     => 'INT',
        default_value => undef,
        is_nullable   => 0,
        size          => 10,
    },
    user_id => {
        data_type     => 'INT',
        default_value => '',
        is_nullable   => 0,
        size          => 10,
    },
    body => {
        data_type     => 'VARCHAR',
        default_value => '',
        is_nullable   => 0,
        size          => 255,
    },
);
__PACKAGE__->set_primary_key('id');

# Author of the message (user.id = message.user_id).
__PACKAGE__->belongs_to(
    user => 'Catatter::Schema::User',
    { 'foreign.id' => 'self.user_id' },
);

# Matching row of the temporary table fetch_timeline_tt, keyed by the
# message id itself.
__PACKAGE__->belongs_to(
    timeline => 'Catatter::Schema::FetchTimelineTt',
    { 'foreign.id' => 'self.id' },
);

1;
Catatter::Schema::Follower
package Catatter::Schema::Follower;

# Result class for the `follower` table: one row per (user_id, follower_id)
# pair, with `id` as a single-column surrogate primary key.
use strict;
use warnings;
use base 'DBIx::Class';

__PACKAGE__->load_components(
    "Core",
);
__PACKAGE__->table("follower");
__PACKAGE__->add_columns(
    "id",
    { data_type => "INT", default_value => undef, is_nullable => 0, size => 10 },
    "user_id",
    { data_type => "INT", default_value => "", is_nullable => 0, size => 10 },
    "follower_id",
    { data_type => "INT", default_value => "", is_nullable => 0, size => 10 },
);
__PACKAGE__->set_primary_key("id");

# Mirrors UNIQUE (user_id, follower_id) in the table definition.
__PACKAGE__->add_unique_constraint("user_id", ["user_id", "follower_id"]);

# Both columns point at rows of the user table.
__PACKAGE__->belongs_to(user => 'Catatter::Schema::User', {'foreign.id' => 'self.user_id'});

# The related class is Catatter::Schema::User; no class with the name
# `Userr` is defined in this schema, so that spelling could never resolve.
__PACKAGE__->belongs_to(follower => 'Catatter::Schema::User', {'foreign.id' => 'self.follower_id'});
1;
Catatter::Schema::FetchTimelineTt
package Catatter::Schema::FetchTimelineTt;

# Result class mapped onto fetch_timeline_tt, the temporary table of
# message ids that the fetch_timeline_ids procedures create and fill.
use strict;
use warnings;
use base 'DBIx::Class';

__PACKAGE__->load_components('Core');
__PACKAGE__->table('fetch_timeline_tt');

__PACKAGE__->add_columns(
    id => {
        data_type     => 'INT',
        default_value => undef,
        is_nullable   => 0,
        size          => 10,
    },
);
__PACKAGE__->set_primary_key('id');

# Each stored id is the id of a message row.
__PACKAGE__->belongs_to(
    message => 'Catatter::Schema::Message',
    { 'foreign.id' => 'self.id' },
);

1;
で使い方は
use Catatter::Schema;

# NOTE(review): the database in the DSN is spelled "catetter" while the
# package is Catatter; confirm which spelling the real database uses.
my $schema = Catatter::Schema->connect(
    'dbi:mysql:catetter',
    'user',
    'pass',
);

# Viewer whose timeline is printed: the user row with primary key 1.
my $user = $schema->resultset('User')->find(1);

# First page: no max id, no extra conditions, newest 20 messages.
my %page_attrs = ( order_by => 'me.id DESC', rows => 20 );
my $timeline   = $user->following_timeline( undef, {}, {%page_attrs} );

# Rows arrive newest first, so after the loop $minid holds the smallest id
# that was printed.
my $minid;
while ( my $message = $timeline->next ) {
    my $author = $message->user->screen_name;
    print $author . ': ' . $message->body . "\n";
    $minid = $message->id;
}
こんな感じ。
ただ、resultsetのpage属性は効かないので、次のページを取るには、現在のresultsetの中で一番小さいid(上の例の$minid)をfollowing_timelineの第1引数に渡してやる必要があります。
# Next page: pass the smallest id of the previous page as the first
# argument. A true value switches following_timeline to
# fetch_timeline_ids2, which only considers messages with an id below it.
# $user and $minid come from the previous snippet.
my $timeline = $user->following_timeline(
$minid,
{},
{order_by => 'me.id DESC', rows => 20}
);
ちなみに自分のタイムラインは
# The user's own messages: uses the `messages` has_many relationship
# declared on Catatter::Schema::User, so no stored procedure is involved.
my $timeline = $user->messages(
{},
{order_by => 'me.id DESC', rows => 20}
);
で取れます。
結論から言うと、DBICから綺麗にストアドプロシージャを叩く方法がわからず生のDBHを叩くというどーなんだ?的な解決方法になってます。
全体的に無理やり感がありますが、まーこんな感じでもできるよって感じで。

