end0tknr's kipple - 新web写経開発

http://d.hatena.ne.jp/end0tknr/ から移転しました

Net::Amazon::EC2やNet::Amazon::S3 + Parallel::ForkManagerによる分散処理

多数のサーバを使用して、計算量の多い処理を行う必要が出てきたので、Net::Amazon::EC2やNet::Amazon::S3による分散処理を試してみました。

システム構成は次の通りで、map reduceのような分散処理は行わず、単純に複数のサーバを起動して、Parallel::ForkManagerによる並列処理を行います。


scriptは次のとおりで、
Net::Amazon::EC2で、ec2を複数起動し、
Parallel::ForkManagerで、分散処理、
※ただし、起動したec2へのコマンド送信は、Net::SSH::Perl を使用。
※処理後のデータは、Net::Amazon::S3でs3へ保存しています。
処理後はec2を停止し、余計な料金が発生しないようにしています。
というものです。

#!/usr/local/bin/perl
use strict;
use FindBin;
use Log::Log4perl;
use Net::Amazon::EC2;
use Net::Amazon::S3;
use Net::SSH::Perl;
use Parallel::ForkManager;
use Data::Dumper;

my $EC2_OBJ;
my $EC2_OBJ_DEF = {
    AWSAccessKeyId  => 'ないしょ',
    SecretAccessKey => 'ないしょ'
};
my $EC2_INSTANCES = {};
my $RUN_INSTANCES_DEF =

  #最新の開発PGを含んだAMIを用意しましょう
  {
    cmd_param => {
        ImageId      => 'ami-84db39ed',
        InstanceType => 'm1.small',
        MinCount     => 500,
        MaxCount     => 500
    },
    sleep => 30,
    retry => 10
  };

#最大の分散数の定義
my $GET_IDLE_INSTANCE_DEF = { retry => 30, sleep => 30 };
my $MAX_PROCESS = 500;

my $ITEMCODE_LIST_DIR    = $FindBin::Bin . '/itemcode_list';
my $ITEMCODE_LIST_REGEXP = '.txt';

my $SSH_DEF = { user => '', passwd => '' };

my $LOG_DEF = {
    'log4perl.rootLogger'                => 'INFO, LOGFILE',
    'log4perl.appender.LOGFILE'          => 'Log::Log4perl::Appender::File',
    'log4perl.appender.LOGFILE.filename' => $FindBin::Bin . "/log/tenkai.log",
    'log4perl.appender.LOGFILE.mode'     => 'append',
    'log4perl.appender.LOGFILE.layout'   => 'PatternLayout',
    'log4perl.appender.LOGFILE.layout.ConversionPattern' => '%d %F [%p] %m %n'
};
Log::Log4perl::init($LOG_DEF);
my $LOG = Log::Log4perl::get_logger("rootLogger");

main();

sub main {

    #init
    $EC2_OBJ = Net::Amazon::EC2->new(%$EC2_OBJ_DEF);
    $LOG->info('ec2_run_instances()');
    ec2_run_instances();		#ec2を複数起動

    $LOG->info('get_itemcodes()');
    my $itemcodes = get_itemcodes();	#処理対象の一覧取得

    my $pm = Parallel::ForkManager->new($MAX_PROCESS);

    for my $itemcode_info (@$itemcodes) {
        my ( $itemcode, $tei_list, $no, $itemcode_size ) =
          split( "\t", $itemcode_info );
	#処理を行っていないec2を取得
        my $ec2_ins = get_idle_ec2_instance();

        my $pid = $pm->start and next;    # do the fork

        $EC2_INSTANCES->{ $ec2_ins->{instance_id} }->{state} = 'working';
        $LOG->info(
            "do_parallel() $tei_list $itemcode $no/$itemcode_size start");

        do_parallel( $ec2_ins, $itemcode );	#ec2が行う処理

        $LOG->info("do_parallel() $tei_list $itemcode $no/$itemcode_size done");
        $EC2_INSTANCES->{ $ec2_ins->{instance_id} }->{state} = undef;

        $pm->finish;                      # do the exit in the child process
    }

    #finalize
    $LOG->info('ec2_terminate_instances()');
    ec2_terminate_instances();	#ec2の停止
    $pm->wait_all_children;
}

sub get_itemcodes {

    my $dh;
    unless ( opendir( $dh, $ITEMCODE_LIST_DIR ) ) {
        $LOG->error("can't opendir $ITEMCODE_LIST_DIR");
        die_script();
    }
    my @itemcode_lists = grep /$ITEMCODE_LIST_REGEXP/, readdir($dh);
    unless ( closedir($dh) ) {
        $LOG->error("can't closedir $ITEMCODE_LIST_DIR");
        die_script();
    }

    my @ret;
    for my $itemcode_list (@itemcode_lists) {

        my $itemcode_path = "$ITEMCODE_LIST_DIR/$itemcode_list";
        my $fh;
        unless ( open( $fh, $itemcode_path ) ) {
            $LOG->error("can't open file $itemcode_path");
            die_script();
        }
        my @itemcodes     = <$fh>;
        my $itemcode_size = @itemcodes;
        unless ( close($fh) ) {
            $LOG->error("can't close file $itemcode_path");
            die_script();
        }

        my $i = 0;
        for my $itemcode (@itemcodes) {
            $itemcode =~ s/\s+$//go;
            push( @ret,
                join( "\t", $itemcode, $itemcode_list, ++$i, $itemcode_size ) );
        }
    }
    return \@ret;
}

sub ec2_run_instances {

    #とりあえず起動commandを送って、待つ
    my $reservation_info =
      $EC2_OBJ->run_instances( %{ $RUN_INSTANCES_DEF->{cmd_param} } );

    my %not_run_instances =
      map { $_->{instance_id} => $_ } @{ $reservation_info->{instances_set} };

    #起動済instanceを収集
    my $retry = 0;
    while ( $retry <= $RUN_INSTANCES_DEF->{retry}
        and keys %not_run_instances > 0 )
    {

        sleep( $RUN_INSTANCES_DEF->{sleep} );

        for my $ins_id ( keys %not_run_instances ) {
            my $rsv_info =
              $EC2_OBJ->describe_instances( InstanceId => $ins_id );
            my $instance = $rsv_info->[0]->{instances_set}->[0];

            if ( $instance->{instance_state}->{name} eq "running" ) {
                $EC2_INSTANCES->{ $instance->{instance_id} }->{state} = undef;
                $EC2_INSTANCES->{ $instance->{instance_id} }->{instance} =
                  $instance;
                delete( $not_run_instances{$ins_id} );
            }
        }
        $retry++;
    }

    #起動を待ちきれなかったinstanceはterminate
    for my $ins_id ( keys %not_run_instances ) {
        $LOG->warn("couldn't wait for running EC2 instance, terminate:$ins_id");
        $EC2_OBJ->terminate_instances( InstanceId => $ins_id );
    }

    #1個も起動できなければdie
    if ( keys %$EC2_INSTANCES < 1 ) {
        $LOG->error("couldn't run EC2 instances");
        die_script();
    }
}

sub ec2_terminate_instances {

    for my $instance_id ( keys %$EC2_INSTANCES ) {
        $EC2_OBJ->terminate_instances( InstanceId => $instance_id );
    }
}

sub do_parallel {
    my ( $ec2_ins, $itemcode ) = @_;

    my $ssh = Net::SSH::Perl->new( $ec2_ins->{dns_name} );
    $ssh->login( $SSH_DEF->{user}, $SSH_DEF->{passwd} );

    #    ex. $ssh->cmd("/home/endo/dev/tenkai $itemcode");
    #    1)get itemdata(.t, .p) from amazon s3 (Net::Amazon::S3)
    #    2)do tenkai
    #    3)commit outdata to amazon s3 (Net::Amazon::S3)
    #    4)get old outdata(.o) from amazon s3 (Net::Amazon::S3)
    #    5)diff outdata
    #    6)commit diff to amazon s3 (Net::Amazon::S3)

    $ssh->cmd("exit");
}

sub get_idle_ec2_instance {

    my $retry = 0;
    while ( $retry <= $GET_IDLE_INSTANCE_DEF->{retry} ) {

        for my $ins_id ( keys %$EC2_INSTANCES ) {
            unless ( $EC2_INSTANCES->{$ins_id}->{state} ) {
                return $EC2_INSTANCES->{$ins_id}->{instance};
            }
        }

        $retry++;
        $LOG->info("get_idle_ec2_instance():retry",
		   " $retry/$GET_IDLE_INSTANCE_DEF->{retry}");
        sleep( $GET_IDLE_INSTANCE_DEF->{max_retry} );
    }

    $LOG->error("fail get_idle_instance()");
    die_script();
}

#die するときは、ec2のinstanceをterminateしないと料金が発生しますよ
sub die_script {
    ec2_terminate_instances();
    die;
}

何となく動作したので、本気で導入を考えてみるかも。