多数のサーバを使用して、計算量の多い処理を行う必要が出てきたので、Net::Amazon::EC2やNet::Amazon::S3による分散処理を試してみました。
システム構成は次の通りで、map reduceのような分散処理は行わず、単純に複数のサーバを起動して、Parallel::ForkManagerによる並列処理を行います。
scriptは次のとおりで、
Net::Amazon::EC2で、ec2を複数起動し、
Parallel::ForkManagerで、分散処理、
※ただし、起動したec2へのコマンド送信は、Net::SSH::Perl を使用。
※処理後のデータは、Net::Amazon::S3でs3へ保存しています。
処理後はec2を停止し、余計な料金が発生しないようにしています。
というものです。
#!/usr/local/bin/perl use strict; use FindBin; use Log::Log4perl; use Net::Amazon::EC2; use Net::Amazon::S3; use Net::SSH::Perl; use Parallel::ForkManager; use Data::Dumper; my $EC2_OBJ; my $EC2_OBJ_DEF = { AWSAccessKeyId => 'ないしょ', SecretAccessKey => 'ないしょ' }; my $EC2_INSTANCES = {}; my $RUN_INSTANCES_DEF = #最新の開発PGを含んだAMIを用意しましょう { cmd_param => { ImageId => 'ami-84db39ed', InstanceType => 'm1.small', MinCount => 500, MaxCount => 500 }, sleep => 30, retry => 10 }; #最大の分散数の定義 my $GET_IDLE_INSTANCE_DEF = { retry => 30, sleep => 30 }; my $MAX_PROCESS = 500; my $ITEMCODE_LIST_DIR = $FindBin::Bin . '/itemcode_list'; my $ITEMCODE_LIST_REGEXP = '.txt'; my $SSH_DEF = { user => '', passwd => '' }; my $LOG_DEF = { 'log4perl.rootLogger' => 'INFO, LOGFILE', 'log4perl.appender.LOGFILE' => 'Log::Log4perl::Appender::File', 'log4perl.appender.LOGFILE.filename' => $FindBin::Bin . "/log/tenkai.log", 'log4perl.appender.LOGFILE.mode' => 'append', 'log4perl.appender.LOGFILE.layout' => 'PatternLayout', 'log4perl.appender.LOGFILE.layout.ConversionPattern' => '%d %F [%p] %m %n' }; Log::Log4perl::init($LOG_DEF); my $LOG = Log::Log4perl::get_logger("rootLogger"); main(); sub main { #init $EC2_OBJ = Net::Amazon::EC2->new(%$EC2_OBJ_DEF); $LOG->info('ec2_run_instances()'); ec2_run_instances(); #ec2を複数起動 $LOG->info('get_itemcodes()'); my $itemcodes = get_itemcodes(); #処理対象の一覧取得 my $pm = Parallel::ForkManager->new($MAX_PROCESS); for my $itemcode_info (@$itemcodes) { my ( $itemcode, $tei_list, $no, $itemcode_size ) = split( "\t", $itemcode_info ); #処理を行っていないec2を取得 my $ec2_ins = get_idle_ec2_instance(); my $pid = $pm->start and next; # do the fork $EC2_INSTANCES->{ $ec2_ins->{instance_id} }->{state} = 'working'; $LOG->info( "do_parallel() $tei_list $itemcode $no/$itemcode_size start"); do_parallel( $ec2_ins, $itemcode ); #ec2が行う処理 $LOG->info("do_parallel() $tei_list $itemcode $no/$itemcode_size done"); $EC2_INSTANCES->{ $ec2_ins->{instance_id} }->{state} = undef; $pm->finish; # do the exit in the child process } #finalize $LOG->info('ec2_terminate_instances()'); ec2_terminate_instances(); #ec2の停止 $pm->wait_all_children; } sub get_itemcodes { my $dh; unless ( opendir( $dh, $ITEMCODE_LIST_DIR ) ) { $LOG->error("can't opendir $ITEMCODE_LIST_DIR"); die_script(); } my @itemcode_lists = grep /$ITEMCODE_LIST_REGEXP/, readdir($dh); unless ( closedir($dh) ) { $LOG->error("can't closedir $ITEMCODE_LIST_DIR"); die_script(); } my @ret; for my $itemcode_list (@itemcode_lists) { my $itemcode_path = "$ITEMCODE_LIST_DIR/$itemcode_list"; my $fh; unless ( open( $fh, $itemcode_path ) ) { $LOG->error("can't open file $itemcode_path"); die_script(); } my @itemcodes = <$fh>; my $itemcode_size = @itemcodes; unless ( close($fh) ) { $LOG->error("can't close file $itemcode_path"); die_script(); } my $i = 0; for my $itemcode (@itemcodes) { $itemcode =~ s/\s+$//go; push( @ret, join( "\t", $itemcode, $itemcode_list, ++$i, $itemcode_size ) ); } } return \@ret; } sub ec2_run_instances { #とりあえず起動commandを送って、待つ my $reservation_info = $EC2_OBJ->run_instances( %{ $RUN_INSTANCES_DEF->{cmd_param} } ); my %not_run_instances = map { $_->{instance_id} => $_ } @{ $reservation_info->{instances_set} }; #起動済instanceを収集 my $retry = 0; while ( $retry <= $RUN_INSTANCES_DEF->{retry} and keys %not_run_instances > 0 ) { sleep( $RUN_INSTANCES_DEF->{sleep} ); for my $ins_id ( keys %not_run_instances ) { my $rsv_info = $EC2_OBJ->describe_instances( InstanceId => $ins_id ); my $instance = $rsv_info->[0]->{instances_set}->[0]; if ( $instance->{instance_state}->{name} eq "running" ) { $EC2_INSTANCES->{ $instance->{instance_id} }->{state} = undef; $EC2_INSTANCES->{ $instance->{instance_id} }->{instance} = $instance; delete( $not_run_instances{$ins_id} ); } } $retry++; } #起動を待ちきれなかったinstanceはterminate for my $ins_id ( keys %not_run_instances ) { $LOG->warn("couldn't wait for running EC2 instance, terminate:$ins_id"); $EC2_OBJ->terminate_instances( InstanceId => $ins_id ); } #1個も起動できなければdie if ( keys %$EC2_INSTANCES < 1 ) { $LOG->error("couldn't run EC2 instances"); die_script(); } } sub ec2_terminate_instances { for my $instance_id ( keys %$EC2_INSTANCES ) { $EC2_OBJ->terminate_instances( InstanceId => $instance_id ); } } sub do_parallel { my ( $ec2_ins, $itemcode ) = @_; my $ssh = Net::SSH::Perl->new( $ec2_ins->{dns_name} ); $ssh->login( $SSH_DEF->{user}, $SSH_DEF->{passwd} ); # ex. $ssh->cmd("/home/endo/dev/tenkai $itemcode"); # 1)get itemdata(.t, .p) from amazon s3 (Net::Amazon::S3) # 2)do tenkai # 3)commit outdata to amazon s3 (Net::Amazon::S3) # 4)get old outdata(.o) from amazon s3 (Net::Amazon::S3) # 5)diff outdata # 6)commit diff to amazon s3 (Net::Amazon::S3) $ssh->cmd("exit"); } sub get_idle_ec2_instance { my $retry = 0; while ( $retry <= $GET_IDLE_INSTANCE_DEF->{retry} ) { for my $ins_id ( keys %$EC2_INSTANCES ) { unless ( $EC2_INSTANCES->{$ins_id}->{state} ) { return $EC2_INSTANCES->{$ins_id}->{instance}; } } $retry++; $LOG->info("get_idle_ec2_instance():retry", " $retry/$GET_IDLE_INSTANCE_DEF->{retry}"); sleep( $GET_IDLE_INSTANCE_DEF->{max_retry} ); } $LOG->error("fail get_idle_instance()"); die_script(); } #die するときは、ec2のinstanceをterminateしないと料金が発生しますよ sub die_script { ec2_terminate_instances(); die; }
何となく動作したので、本気で導入を考えてみるかも。