Coro::Channelとtimeoutを組み合わせる
use Plack::Request; use HTTP::Request; use Time::HiRes; use Coro; use Coro::Channel; use Coro::AnyEvent; use Coro::LWP; use LWP::UserAgent; my $UA_WORKER_CORO_DESC_IN_WORK = "ua_worker_coro_desc_in_work"; my $UA_WORKER_CORO_DESC_NOT_WORK = "ua_worker_coro_desc_not_work"; my $UA_WORKER_CORO_TIMEOUT_MESSAGE = "ua_worker_coro_timeout"; my $MAX_UA_WORKER = 100; sub coro_timeout { my $timeout = shift; my $cb = shift; my $coro = $Coro::current; $coro->{timeout_at} = Time::HiRes::time() + $timeout; my $installed_destroy = $coro->{on_destroy_once} ? 1 : 0; $coro->{on_destroy_once} = $cb; if ( !$installed_destroy ) { $coro->on_destroy( sub { my $message = shift; return if $message ne $UA_WORKER_CORO_TIMEOUT_MESSAGE; my $cb = delete $coro->{on_destroy_once}; return unless $cb; $cb->($message); }); } } sub ua_worker { my $channel = shift; async { $Coro::current->desc($UA_WORKER_CORO_DESC_NOT_WORK); while(1) { my $req = $channel->get(); $Coro::current->desc($UA_WORKER_CORO_DESC_IN_WORK); my $ua = LWP::UserAgent->new( timeout => $req->[1] ); #timeout is ignored coro_timeout( $req->[1], sub { $req->[2]->send( LWP::UserAgent::_new_response( $req->[0], &HTTP::Status::RC_INTERNAL_SERVER_ERROR, "request timeout" ) ); }); my $res = $ua->request($req->[0]); $Coro::current->desc($UA_WORKER_CORO_DESC_NOT_WORK); $req->[2]->send($res); } } } # build worker threads my $worker_timer; sub build_ua_channel { warn "[$$] build channel"; my $channel = Coro::Channel->new(); for ( my $i=0; $i < $MAX_UA_WORKER; $i++ ) { ua_worker($channel); } $worker_timer = AnyEvent->timer( after => 0.5, interval => 0.5, cb => sub { #timeout my $now = Time::HiRes::time; my @lwp_coro = grep { $_->desc eq $UA_WORKER_CORO_DESC_IN_WORK } Coro::State::list; for my $coro (@lwp_coro) { if ($now > $coro->{timeout_at}) { $coro->cancel($UA_WORKER_CORO_TIMEOUT_MESSAGE); } } #spawn worker my $dead_worker = $MAX_UA_WORKER - scalar( grep { $_->desc eq $UA_WORKER_CORO_DESC_IN_WORK || $_->desc eq $UA_WORKER_CORO_DESC_NOT_WORK } Coro::State::list ); for ( my $i=0; $i < $dead_worker; $i++ ) { warn "[$$] respwan"; ua_worker($channel); } } ); return $channel; } my $ua_channel; sub async_ua_request { my $request = shift; my %args = @_; my $timeout = $args{timeout} || 180; my $cb = $args{cb} || sub {}; $ua_channel ||= build_ua_channel(); my $cv = AE::cv; $ua_channel->put( [$request, $timeout, $cv ] ); $cv->cb( sub { $cb->(shift->recv) } ); } my $hanlder = sub { my $env = shift; my $req = Plack::Request->new($env); my $cv = AE::cv; my $request = HTTP::Request->new( $env->{REQUEST_METHOD}, URI->new_abs($env->{REQUEST_URI},'http://localhost/'), [ map { (my $field = $_) =~ s/^HTTPS?_//; ( $field => $env->{$_} ); } grep { /^(?:HTTP|CONTENT|COOKIE)/i } keys %$env ], $req->raw_body ); async_ua_request( $request, timeout => 1, cb => sub { my $res = shift; my @res_header; $res->headers->scan(sub{ push @res_header, @_; }); $cv->send([ $res->code, \@res_header, [$res->content] ]); }); return sub { my $start_response = shift; $cv->cb( sub { $start_response->( shift->recv ); } ); }; }
TODO: global変数の部分をpackageにしてオブジェクトの中に納める
created by blog.nomadscafe.jp
コメント
コメントはありません
コメントを投稿
コメントを投稿するにはログインが必要です