Coro::Channelとtimeoutを組み合わせる

use Plack::Request;
use HTTP::Request;
use Time::HiRes;
use Coro;
use Coro::Channel;
use Coro::AnyEvent;
use Coro::LWP;
use LWP::UserAgent;

# Description tags put on worker coros via ->desc; the watchdog timer in
# build_ua_channel() compares against them to find busy and idle workers.
my $UA_WORKER_CORO_DESC_IN_WORK    = 'ua_worker_coro_desc_in_work';
my $UA_WORKER_CORO_DESC_NOT_WORK   = 'ua_worker_coro_desc_not_work';

# Status value passed to ->cancel when a worker exceeds its deadline.
my $UA_WORKER_CORO_TIMEOUT_MESSAGE = 'ua_worker_coro_timeout';

# Number of worker coros spawned per channel and kept alive by respawning.
my $MAX_UA_WORKER                  = 100;

# Arm (or re-arm) a timeout for the currently running coro.
#
# Arguments:
#   $timeout - number of seconds from now until the deadline
#   $cb      - code ref called with the status message when the coro is
#              destroyed with $UA_WORKER_CORO_TIMEOUT_MESSAGE as status
#
# The deadline is stored in $coro->{timeout_at}; this sub never cancels
# anything itself, it only records the deadline and the callback.
# The on_destroy handler is registered once per coro; later calls merely
# replace the pending callback and the deadline.
sub coro_timeout {
    my ( $timeout, $cb ) = @_;
    my $coro = $Coro::current;

    $coro->{timeout_at}      = Time::HiRes::time() + $timeout;
    $coro->{on_destroy_once} = $cb;

    # Track registration with its own flag instead of the callback slot,
    # so a false callback can never cause a second handler to be added.
    return if $coro->{on_destroy_installed};
    $coro->{on_destroy_installed} = 1;

    $coro->on_destroy( sub {
        my ($message) = @_;

        # A coro destroyed without a status passes no message at all.
        return unless defined($message)
            && $message eq $UA_WORKER_CORO_TIMEOUT_MESSAGE;

        # Take the callback out of the slot so it runs at most once.
        my $pending_cb = delete $coro->{on_destroy_once};
        return unless $pending_cb;
        $pending_cb->($message);
    } );
    return;
}

# Start one worker coro that serves jobs taken from $channel forever.
#
# Each job is an array ref: [ request, timeout in seconds, condvar ].
# The worker performs the request with LWP and sends the response to the
# condvar. While a request is running the coro is tagged as "in work";
# if it is destroyed with the timeout status, the callback handed to
# coro_timeout() sends an internal 500 "request timeout" response instead.
#
# Returns whatever async {} returns.
sub ua_worker {
    my ($channel) = @_;

    return async {
        my $me = $Coro::current;
        $me->desc($UA_WORKER_CORO_DESC_NOT_WORK);

        for (;;) {
            my $job     = $channel->get();
            my $request = $job->[0];
            my $timeout = $job->[1];
            my $done    = $job->[2];

            $me->desc($UA_WORKER_CORO_DESC_IN_WORK);

            # Original author's note: this LWP timeout is ignored.
            my $ua = LWP::UserAgent->new( timeout => $timeout );

            coro_timeout( $timeout, sub {
                my $timeout_res = LWP::UserAgent::_new_response(
                    $request,
                    HTTP::Status::RC_INTERNAL_SERVER_ERROR(),
                    "request timeout",
                );
                $done->send($timeout_res);
            } );

            my $res = $ua->request($request);

            # Leave the "in work" state before delivering the response.
            $me->desc($UA_WORKER_CORO_DESC_NOT_WORK);
            $done->send($res);
        }
    };
}

# Holds the watchdog timer created by build_ua_channel(); the timer stays
# active only while this reference exists.
my $worker_timer;

# Create a job channel, start $MAX_UA_WORKER workers on it and install a
# watchdog timer that fires every 0.5 seconds.
#
# On each tick the watchdog
#   1. cancels every "in work" worker whose deadline ($coro->{timeout_at})
#      has passed, using $UA_WORKER_CORO_TIMEOUT_MESSAGE as status, and
#   2. starts new workers until $MAX_UA_WORKER tagged workers exist again.
#
# Returns the Coro::Channel that jobs must be put on.
sub build_ua_channel {
    warn "[$$] build channel";
    my $channel = Coro::Channel->new();
    ua_worker($channel) for 1 .. $MAX_UA_WORKER;

    $worker_timer = AnyEvent->timer(
        after    => 0.5,
        interval => 0.5,
        cb       => sub {
            my $now = Time::HiRes::time();

            # Pass 1: cancel busy workers that are past their deadline.
            for my $coro ( Coro::State::list() ) {
                # Coros that never had a description report undef here.
                my $desc = $coro->desc;
                next unless defined($desc)
                    && $desc eq $UA_WORKER_CORO_DESC_IN_WORK;

                # Without a recorded deadline there is nothing to compare.
                my $deadline = $coro->{timeout_at};
                next unless defined($deadline) && $now > $deadline;

                $coro->cancel($UA_WORKER_CORO_TIMEOUT_MESSAGE);
            }

            # Pass 2: count the tagged workers that are still listed and
            # start replacements for the missing ones.
            my $alive = grep {
                my $desc = $_->desc;
                defined($desc)
                    && ( $desc eq $UA_WORKER_CORO_DESC_IN_WORK
                      || $desc eq $UA_WORKER_CORO_DESC_NOT_WORK );
            } Coro::State::list();

            for ( 1 .. $MAX_UA_WORKER - $alive ) {
                warn "[$$] respawn";
                ua_worker($channel);
            }
        },
    );
    return $channel;
}

# Shared job channel, created by the first call to async_ua_request().
my $ua_channel;

# Queue $request for the worker pool and report the response to a callback.
#
# Arguments:
#   $request - request object handed to the worker unchanged
#   timeout  - seconds before the request counts as timed out
#              (180 when missing or false)
#   cb       - code ref called with the received response
#              (a no-op when missing or false)
sub async_ua_request {
    my ( $request, %args ) = @_;

    my $timeout     = $args{timeout} ? $args{timeout} : 180;
    my $on_response = $args{cb}      ? $args{cb}      : sub {};

    # The channel and its workers are built lazily on first use.
    unless ($ua_channel) {
        $ua_channel = build_ua_channel();
    }

    my $done = AE::cv;
    $ua_channel->put( [ $request, $timeout, $done ] );
    $done->cb( sub {
        my ($ready) = @_;
        $on_response->( $ready->recv );
    } );
}

# PSGI application: forwards the incoming request to http://localhost/
# through the worker pool and answers with the upstream response using a
# delayed (callback style) PSGI response.
# NOTE: the variable name is misspelled ("hanlder"); it is kept unchanged
# because code outside this view may refer to it.
my $hanlder = sub {
    my ($env) = @_;
    my $plack_req      = Plack::Request->new($env);
    my $response_ready = AE::cv;

    # Collect the header-like entries of the PSGI environment; a leading
    # "HTTP_" or "HTTPS_" is stripped from the key to get the field name.
    my @forward_headers;
    for my $env_key ( grep { /^(?:HTTP|CONTENT|COOKIE)/i } keys %$env ) {
        ( my $field = $env_key ) =~ s/^HTTPS?_//;
        push @forward_headers, $field, $env->{$env_key};
    }

    my $target  = URI->new_abs( $env->{REQUEST_URI}, 'http://localhost/' );
    my $request = HTTP::Request->new(
        $env->{REQUEST_METHOD},
        $target,
        \@forward_headers,
        $plack_req->raw_body,
    );

    # Turn the upstream response into a PSGI triplet
    # [ status, [ header pairs ], [ body ] ] and publish it.
    my $on_response = sub {
        my ($res) = @_;
        my @header_pairs;
        $res->headers->scan( sub { push @header_pairs, @_ } );
        $response_ready->send(
            [ $res->code, \@header_pairs, [ $res->content ] ]
        );
    };
    async_ua_request( $request, timeout => 1, cb => $on_response );

    # Delayed response: the responder is called once the triplet is ready.
    return sub {
        my ($start_response) = @_;
        $response_ready->cb( sub {
            my ($ready) = @_;
            $start_response->( $ready->recv );
        } );
    };
};

TODO: global変数の部分をpackageにしてオブジェクトの中に納める

created by blog.nomadscafe.jp

コメント

コメントはありません

コメントを投稿

コメントを投稿するにはログインが必要です