クラウドデザインパターンでは、バッチ処理のパターンとして
Queuing Chain , Priority Queue , Job Observer , Scheduled Autoscaling
が、ありますが、オンプレミス構成にも使える冗長構成を考えてみる
構成 - APサーバから、高稼働率のRDS, S3 を共有
┌AP1(worker) ┐ ┌─────────────────┐ │Perl ├─┬┤DB(AWS RDS, mysql):JOB QUEUEも保持│ └──────┘ │└─────────────────┘ ┌AP2(worker) ┐ │┌─────────────────┐ │Perl ├─┴┤file sys(s3fsによるファイル共有) │ └──────┘ └─────────────────┘
JOB QUEUE テーブル
CREATE TABLE job_queue ( job_name varchar(100) comment 'add queue時に登録されるjob名.', planed_time timestamp DEFAULT '0000-00-00 00:00:00' comment 'add queue時に登録される開始予定.', start_time timestamp DEFAULT '0000-00-00 00:00:00' comment '実行時に登録される開始日時', job_worker varchar(50) comment '実行時に登録されるworker名. 例えば、host名', primary key (job_name, planed_time) ) COMMENT='分散されたサーバが実行するcron jobを管理.';
perlコントローラ
package SampleApp::CtrlBatch::Batch1; use strict; use utf8; use base qw(SampleApp::CtrlBatch); #継承 use Date::Calc qw/Today/; use Data::Dumper; sub new { my ($class,$c) = @_; my $self = {c=>$c}; $self = bless $self, $class; return $self; } sub execute_main { my ($self) = @_; my $job_queue_rs = {}; #JOB QUEUEにより分散処理する場合 if( $self->{c}->config()->{batch}->{use_job_queue} ){ my $now_str; #実行可能な状態にあるJOB QUEUEを取得 ($job_queue_rs,$now_str) = $self->get_planed_job($now_str); if( $job_queue_rs ){ #JOB QUEUEにworkerを登録し、他workerによる処理を禁止(排他制御) $self->set_job_worker($job_queue_rs,$now_str); } else { $self->info(ref($self), " - skip! no jobs in queue"); return; } } $self->info(ref($self), " - updating process starts"); #### TODO この辺りで実際のバッチ処理 $self->info(ref($self), " - updating process completed"); }
↑こちらの親classが↓こちら
package SampleApp::CtrlBatc; use strict; use utf8; use base qw(SampleApp); #継承 use Date::Calc qw/Today_and_Now/; use Sys::Hostname; use Data::Dumper; #実行可能にあるjobをqueueから取得 sub get_planed_job { my ($self) = @_; my $job_name; if( ref($self) =~ /([^:]+)$/o ){ $job_name = $1; } my $now_str = sprintf("%04d-%02d-%02d %02d:%02d:%02d", Date::Calc::Today_and_Now() ); my $search_cond ={job_name=> $job_name, planed_time=>{'<'=> $now_str }, job_worker=>undef }; my $schema = $self->db; my $result_set = $schema->resultset('JobQueue') ->search($search_cond, {order_by=>[{-asc =>'planed_time'}]}) ->slice(0,1); #offset, limit my $tbl_row = $result_set->next(); return ($tbl_row, $now_str); } #JOB QUEUEにworkerを登録し、他workerによる処理を禁止(排他制御) sub set_job_worker { my ($self,$job_queue_rs,$now_str) = @_; $job_queue_rs->job_worker( Sys::Hostname::hostname() ); $job_queue_rs->start_time($now_str); $job_queue_rs->update; } 1; __END__