読者です 読者をやめる 読者になる 読者になる

end0tknr's kipple - 新web写経開発

http://d.hatena.ne.jp/end0tknr/ から移転します

aws+perlによるcron jobの冗長構成を考える

クラウドデザインパターンでは、バッチ処理のパターンとして
 Queuing Chain , Priority Queue , Job Observer , Scheduled Autoscaling
が、ありますが、オンプレミス構成にも使える冗長構成を考えてみる

構成 - APサーバから、高稼働率のRDS, S3 を共有

┌AP1(worker) ┐    ┌─────────────────┐
│Perl        ├─┬┤DB(AWS RDS, mysql):JOB QUEUEも保持│
└──────┘  │└─────────────────┘
┌AP2(worker) ┐  │┌─────────────────┐
│Perl        ├─┴┤file sys(s3fsによるファイル共有)  │
└──────┘    └─────────────────┘

JOB QUEUE テーブル

CREATE TABLE job_queue (
job_name    varchar(100) comment 'add queue時に登録されるjob名.',
planed_time timestamp DEFAULT '0000-00-00 00:00:00'
                         comment 'add queue時に登録される開始予定.',
start_time  timestamp DEFAULT '0000-00-00 00:00:00'
                         comment '実行時に登録される開始日時',
job_worker  varchar(50)  comment '実行時に登録されるworker名. 例えば、host名',
primary key (job_name, planed_time)
) COMMENT='分散されたサーバが実行するcron jobを管理.';

perlコントローラ

package SampleApp::CtrlBatch::Batch1;
use strict;
use utf8;
use base qw(SampleApp::CtrlBatch); #継承
use Date::Calc qw/Today/;
use Data::Dumper;

sub new {
    my ($class,$c) = @_;
    my $self = {c=>$c};
    $self =  bless $self, $class;
    return $self;
}

sub execute_main {
    my ($self) = @_;

    my $job_queue_rs = {};

    #JOB QUEUEにより分散処理する場合
    if( $self->{c}->config()->{batch}->{use_job_queue} ){
        my $now_str;
        #実行可能な状態にあるJOB QUEUEを取得
        ($job_queue_rs,$now_str) = $self->get_planed_job($now_str);

        if( $job_queue_rs ){
            #JOB QUEUEにworkerを登録し、他workerによる処理を禁止(排他制御)
            $self->set_job_worker($job_queue_rs,$now_str);
        } else {
            $self->info(ref($self), " - skip! no jobs in queue");
            return;
        }
    }

    $self->info(ref($self), " - updating process starts");

    #### TODO この辺りで実際のバッチ処理

    $self->info(ref($self), " - updating process completed");
}

↑こちらの親classが↓こちら

package SampleApp::CtrlBatc;
use strict;
use utf8;
use base qw(SampleApp); #継承
use Date::Calc qw/Today_and_Now/;
use Sys::Hostname;
use Data::Dumper;

#実行可能にあるjobをqueueから取得
sub get_planed_job {
    my ($self) = @_;

    my $job_name;
    if( ref($self) =~ /([^:]+)$/o ){
        $job_name = $1;
    }
    my $now_str = sprintf("%04d-%02d-%02d %02d:%02d:%02d",
                          Date::Calc::Today_and_Now() );

    my $search_cond ={job_name=> $job_name,
                      planed_time=>{'<'=> $now_str },
                      job_worker=>undef };

    my $schema = $self->db;
    my $result_set = $schema->resultset('JobQueue')
        ->search($search_cond,
                 {order_by=>[{-asc =>'planed_time'}]})
            ->slice(0,1);   #offset, limit
    my $tbl_row = $result_set->next();
    return ($tbl_row, $now_str);
}

#JOB QUEUEにworkerを登録し、他workerによる処理を禁止(排他制御)
sub set_job_worker {
    my ($self,$job_queue_rs,$now_str) = @_;
    $job_queue_rs->job_worker( Sys::Hostname::hostname() );
    $job_queue_rs->start_time($now_str);
    $job_queue_rs->update;
}

1;
__END__