end0tknr's kipple - web写経開発

太宰府天満宮の狛犬って、妙にカワイイ

IISサーバのログを自力で集計

iisサーバのログ書式 - end0tknr's kipple - 新web写経開発

「今どき、ログファイルベースのアクセス分析って、古いよねぇ」と思いつつ、 上記エントリの関連で、以下の perl script & ddl (sql)を書いてみた

#!/usr/local/bin/perl
use utf8;
use strict;
use warnings;
use DBI;
use Encode;
use MIME::Types;
use Time::Piece;
use Data::Dumper;

my $DBH;
my $UNZIP_CMD = '/usr/bin/unzip';  # win環境の場合、cygwinのexeを使うかな?
my $DB_CONF = {host=>'localhost',
               port=> 3306,
               name=>'analyze_iis_log',
               user=>'ないしょ',
               pass=>'ないしょ',
               opt=>{AutoCommit=>0, RaiseError=>1, mysql_enable_utf8=>1} };
my $DEL_LIMIT = {daily=>93, monthly=>24};
my $SAME_VISIT_MIN_LIMIT = 60;
my $BULK_INSERT_LIMIT = 50;


main(@ARGV);


sub main {
    my (@iis_log_zips) = @_;

    if(not @iis_log_zips){ # 引数 check
        return disp_usage_this_script();
    }

    $DBH = connect_db();

    del_old_parsed_log();  # 古いrecordが大量にあると、遅くなる為、削除
    del_old_analyzed();

    #### 単に? IISログをparseし、db tableへ保存 (GMT->JST程度は行います)
    my @update_dates; # 日別集計対象の日付群
    
    for my $iis_log_zip_path ( @iis_log_zips ){
        print "PARSE LOG - $iis_log_zip_path\n";
        # iis logのあるpathから filenameのみ抽出.
        my ($iis_log_zip) = $iis_log_zip_path =~ /([^\/\\]+)$/go;
        # 既に同じlogを集計している可能性がある為、一旦、削除
        del_parsed_log_by_log_file($iis_log_zip);
        # IISログをparseし、db tableへ保存
        push(@update_dates, parse_and_insert_log($iis_log_zip));
    }

    #### 日別集計
    my $update_months = {}; # 月別集計対象の年月群
    for my $update_date ( @update_dates ){
        print "ANALYZE DAILY- $update_date\n";
        
        my ($update_month) = $update_date =~ /^(\d+\D\d+)/go;
        $update_months->{$update_month} = 1;
        # 集計
        my ($new_analyzed,$new_analyzed_user) = analyze_daily($update_date);
        # 重複登録しないように削除
        del_analyzed("daily_analyzed",     $update_date);
        del_analyzed("daily_analyzed_user",$update_date);
        # 集計結果保存
        insert_daily_analyzed($new_analyzed);
        insert_daily_analyzed_user($new_analyzed_user);
    }

    #### 月別集計
    for my $update_month (keys %$update_months){
        print "ANALYZE MONTHLY - $update_month\n";
        # 集計
        my $new_analyzed = analyze_monthly($update_month);
        my $new_analyzed_user = analyze_monthly_user($update_month);
        # 重複登録しないように削除
        del_analyzed("monthly_analyzed",       $update_month ."-01");
        del_analyzed("monthly_analyzed_user",  $update_month ."-01");
        # 集計結果保存
        insert_monthly_analyzed($update_month ."-01",      $new_analyzed);
        insert_monthly_analyzed_user($update_month ."-01", $new_analyzed_user);
    }
    $DBH->commit;
    $DBH->disconnect;
}

sub disp_usage_this_script {
    print "Usage: $0  IIS_LOG_ZIP_1  [ IIS_LOG_ZIP_2 ... ]\n";
}


sub parse_and_insert_log {
    my ($iis_log_zip) = @_;
    
    my $cmd = "$UNZIP_CMD -p $iis_log_zip";
    open my $fh ,"-|", $cmd  or die "fail open $cmd $!";

    my @vals_groups;
    my $update_days = {};
    
    while(my $line = <$fh>){
        $line = Encode::decode('utf8',$line);
        
        next if $line =~ /^\#/o;  # IISログの先頭数行は「#」で始まるコメント
        
        my $log_cols = parse_log_line($line);

        # mime_type や 404 等の処理結果等で、分析対象を絞り込み
        next unless is_target_for_analyze($log_cols);

        # GMT->JST
        my $time_piece_gmt =
            Time::Piece->strptime("$log_cols->{date} $log_cols->{time}",
                                  '%Y-%m-%d %H:%M:%S');
        my $time_piece_jst = $time_piece_gmt + (3600 * 9);
        $update_days->{$time_piece_jst->strftime('%Y-%m-%d')} = 1;
        
        push(@vals_groups,
             [$iis_log_zip,
              $time_piece_jst->strftime('%Y-%m-%d %H:%M:%S'),
              lc($log_cols->{cs_uri_stem}),  # winは、大文字小文字を区別しない為
              $log_cols->{cs_username} ]);
        if( scalar(@vals_groups) > $BULK_INSERT_LIMIT ){
            bulk_insert_sql("parsed_log",
                            [qw/log_file datetime_jst url user/],
                            \@vals_groups );
            @vals_groups = ();
        }
    }

    if( scalar(@vals_groups) > 0 ){
        bulk_insert_sql("parsed_log",
                        [qw/log_file datetime_jst url user/],
                        \@vals_groups );
    }

    close($fh) or die "fail close $cmd $!";

    return keys %$update_days;
}

sub del_parsed_log_by_log_file {
    my ($log_file)  = @_;

    my $sql = "delete from parsed_log where log_file = ?";
    my $sth = $DBH->prepare($sql);
    unless( $sth->execute($log_file) ){
        die $sth->errstr,"$!";
    }
    return 1;
}

sub del_old_parsed_log {

    my $tm = Time::Piece::localtime;

    my $limit_daily =   $tm - (3600 * 24 * $DEL_LIMIT->{daily});
    my $limit_daily_str =   $limit_daily->ymd("-");

    my $sql = "delete from parsed_log where datetime_jst < ?";
    my $sth = $DBH->prepare($sql);
    unless( $sth->execute($limit_daily_str) ){
        die $sth->errstr,"$!";
    }
    return 1;
}

sub del_old_analyzed {

    my $tm = Time::Piece::localtime;

    my $limit_daily =   $tm - (3600 * 24 * $DEL_LIMIT->{daily});
    my $limit_monthly = $tm - (3600 * 24 * 30 * $DEL_LIMIT->{monthly});
    my $limit_daily_str =   $limit_daily->ymd("-");
    my $limit_monthly_str = $limit_daily->ymd("-");

    for my $tbl_and_limit (["daily_analyzed",          $limit_daily_str],
                           ["daily_analyzed_user",     $limit_daily_str],
                           ["monthly_analyzed_user",   $limit_monthly_str],
                           ["monthly_analyzed_user",   $limit_monthly_str]) {

        my $sql = "delete from $tbl_and_limit->[0] where date_jst < ?";
        my $sth = $DBH->prepare($sql);
        unless( $sth->execute($tbl_and_limit->[1]) ){
            die $sth->errstr,"$!";
        }
    }
    return 1;
}

sub del_analyzed {
    my ($tbl_name,$date) = @_;

    my $sql = "delete from $tbl_name where date_jst = ?";
    my $sth = $DBH->prepare($sql);
    unless( $sth->execute($date) ){
        die $sth->errstr,"$!";
    }
    return 1;
}

sub analyze_monthly {
    my ($month)  = @_;
    my $sql =<<EOF;
select
  url, sum(views) as views, sum(visits) as visits, sum(users) as users
from daily_analyzed
where date_jst between ? and ?
group by url
EOF
    my $sth = $DBH->prepare($sql);
    my $month_1st_day = Time::Piece->strptime($month."-01",'%Y-%m-%d');
    my @vals = ($month_1st_day->ymd("-"), join('-',$month,$month_1st_day->month_last_day));

    unless( $sth->execute(@vals) ){
        die $sth->errstr,"$sql $!";
    }

    my $ret = {};
    while(my $row = $sth->fetchrow_hashref){
        $ret->{$row->{url}}->{views} =  $row->{views};
        $ret->{$row->{url}}->{visits} = $row->{visits};
        $ret->{$row->{url}}->{users} =  $row->{users};
    }
    return $ret;
}

sub analyze_monthly_user {
    my ($month)  = @_;
    my $sql =<<EOF;
select
  url, user, sum(views) as views, sum(visits) as visits
from daily_analyzed_user
where date_jst between ? and ?
group by url,user
EOF
    my $sth = $DBH->prepare($sql);

    my $month_1st_day = Time::Piece->strptime($month."-01",'%Y-%m-%d');
    my @vals = ($month_1st_day->ymd("-"), join('-',$month,$month_1st_day->month_last_day));
    
    unless( $sth->execute(@vals) ){
        die $sth->errstr,"$sql $!";
    }

    my $ret = {};
    while(my $row = $sth->fetchrow_hashref){
        $ret->{"$row->{url}\t$row->{user}"}->{views} =  $row->{views};
        $ret->{"$row->{url}\t$row->{user}"}->{visits} = $row->{visits};
    }
    return $ret;
}


sub insert_monthly_analyzed {
    my ($date, $analyzed) = @_;

    my @insert_vals_groups;
    for my $url (sort keys %$analyzed){
        push(@insert_vals_groups,
             [$date, $url,
              $analyzed->{$url}->{views},
              $analyzed->{$url}->{visits},
              $analyzed->{$url}->{users}]);
            
        if(scalar(@insert_vals_groups) > $BULK_INSERT_LIMIT ){
            bulk_insert_sql("monthly_analyzed",
                            [qw/date_jst url views visits users/],
                            \@insert_vals_groups );
            @insert_vals_groups = ();
        }
    }
    if(scalar(@insert_vals_groups) > 0 ){
        bulk_insert_sql("monthly_analyzed",
                        [qw/date_jst url views visits users/],
                        \@insert_vals_groups );
    }
    
    return 1;
}

sub insert_monthly_analyzed_user {
    my ($date, $analyzed) = @_;

    my @insert_vals_groups;
    for my $url_user (keys %$analyzed){
        my ($url,$user) = split(/\t/,$url_user);
        
        push(@insert_vals_groups,
             [$date, $url, $user,
              $analyzed->{$url_user}->{views},
              $analyzed->{$url_user}->{visits}]);
        
        if(scalar(@insert_vals_groups) > $BULK_INSERT_LIMIT ){
            bulk_insert_sql("monthly_analyzed_user",
                            [qw/date_jst url user views visits/],
                            \@insert_vals_groups );
            @insert_vals_groups = ();
        }
    }
    if(scalar(@insert_vals_groups) > 0 ){
        bulk_insert_sql("monthly_analyzed_user",
                        [qw/date_jst url user views visits/],
                        \@insert_vals_groups );
    }
    return 1;
}


sub insert_daily_analyzed {
    my ($analyzed) = @_;

    my @insert_vals_groups;
    for my $date (sort keys %$analyzed ){

        for my $url (sort keys %{$analyzed->{$date}}){
            push(@insert_vals_groups,
                 [$date, $url,
                  $analyzed->{$date}->{$url}->{views},
                  $analyzed->{$date}->{$url}->{visits},
                  $analyzed->{$date}->{$url}->{users}]);
            
            if(scalar(@insert_vals_groups) > $BULK_INSERT_LIMIT ){
                bulk_insert_sql("daily_analyzed",
                                [qw/date_jst url views visits users/],
                                \@insert_vals_groups );
                @insert_vals_groups = ();
            }
        }
    }
    if(scalar(@insert_vals_groups) > 0 ){
        bulk_insert_sql("daily_analyzed",
                        [qw/date_jst url views visits users/],
                        \@insert_vals_groups );
    }
    
    return 1;
}

sub insert_daily_analyzed_user {
    my ($analyzed) = @_;

    my @insert_vals_groups;
    for my $date (sort keys %$analyzed ){

        for my $url_user (keys %{$analyzed->{$date}}){
            my ($url,$user) = split(/\t/,$url_user);
            
            push(@insert_vals_groups,
                 [$date, $url, $user,
                  $analyzed->{$date}->{$url_user}->{views},
                  $analyzed->{$date}->{$url_user}->{visits}]);
            
            if(scalar(@insert_vals_groups) > $BULK_INSERT_LIMIT ){
                bulk_insert_sql("daily_analyzed_user",
                                [qw/date_jst url user views visits/],
                                \@insert_vals_groups );
                @insert_vals_groups = ();
            }
        }
    }
    if(scalar(@insert_vals_groups) > 0 ){
        bulk_insert_sql("daily_analyzed_user",
                        [qw/date_jst url user views visits/],
                        \@insert_vals_groups );
    }
    return 1;
}

sub bulk_insert_sql {
    my ($tbl_name, $col_names, $vals_groups) = @_;

    my $col_names_str = join(',',@$col_names);
    my $place_holders = '('. '?,'x scalar(@$col_names);
    chop($place_holders);
    $place_holders .= ')';
    
    my @place_holders_groups;
    my @vals_all;
    for my $vals (@$vals_groups){
        push(@place_holders_groups, $place_holders);
        push(@vals_all, @$vals);
    }
    my $place_holders_groups_str = join(',', @place_holders_groups);
    
    my $sql =<<EOF;
insert into $tbl_name ($col_names_str)
values $place_holders_groups_str
EOF

    my $sth = $DBH->prepare($sql);

    unless ( $sth->execute(@vals_all) ){
        die $sth->errstr , $sql;
    }
    return 1;
}

sub analyze_daily {
    my ($update_date) = @_;

    my $sql =<<EOF;
select * from parsed_log
where datetime_jst between ? and ?
order by datetime_jst
EOF
    my $sth = $DBH->prepare($sql);
    my @vals = ("$update_date 00:00:00","$update_date 23:59:59");

    unless( $sth->execute(@vals) ){
        die $sth->errstr,"$!";
    }

    my $count_daily_user = {};  # url & user別分析
    my $count_daily = {};       # url別分析

    
    while(my $log_cols = $sth->fetchrow_hashref ){
        my ($date,$time) = split(/ /,$log_cols->{datetime_jst});
        my $url  = $log_cols->{url};
        my $user = $log_cols->{user};
        
        #### url & user別分析

        my $url_user = join("\t",$url, $user);
        # page view
        $count_daily_user->{$date}->{$url_user}->{views} += 1;
        $count_daily->{$date}->{$url}->{views} += 1;

        # 訪問数 : 最終accessからの経過時間で判断
        my $last_date_time = $count_daily_user->{$date}->{$url_user}->{last_time};
        my $new_date_time =  "$date $time";
        
        if(not is_same_visit($last_date_time, $new_date_time) ){
            $count_daily_user->{$date}->{$url_user}->{visits} += 1;
            $count_daily->{$date}->{$url}->{visits} += 1;
        }
        $count_daily_user->{$date}->{$url_user}->{last_time} = $new_date_time;

        $count_daily->{$date}->{$url}->{users}->{$user} = 1;    # user数
    }


    # 先程のwhile loopでは、{users} に user id 一覧を
    # hash mapで登録した為、user数に変換
    $count_daily = conv_user_hashes_to_size($count_daily);
    
    return $count_daily,$count_daily_user;
}

# 最終閲覧からの経過時間で、同一訪問 or notを判定
sub is_same_visit {
    my ($last_date_time_str, $new_date_time_str) = @_;

    return unless $last_date_time_str;

    my $last_date_time =
        Time::Piece->strptime($last_date_time_str,'%Y-%m-%d %H:%M:%S');
    my $new_date_time =
        Time::Piece->strptime($new_date_time_str,'%Y-%m-%d %H:%M:%S');

    my $time_seconds_obj = $new_date_time - $last_date_time;
    if($time_seconds_obj->minutes < $SAME_VISIT_MIN_LIMIT ){
        return 1;
    }
    return;
}


sub conv_user_hashes_to_size {
    my ($count_daily) = @_;
    
    for my $date (keys %$count_daily ){
        for my $url (keys %{$count_daily->{$date}} ){
            my $users_size =
                scalar(keys %{$count_daily->{$date}->{$url}->{users}});
            $count_daily->{$date}->{$url}->{users} = $users_size;
        }
    }
    return $count_daily;
}

sub is_target_for_analyze {
    my ($log_cols) = @_;

    if($log_cols->{cs_username} eq "-"){ #「-」はSmile未loginユーザによるアクセス
        return;
    }

    if($log_cols->{sc_status} eq "301" or       # MOVED_PERMANENTLY
       $log_cols->{sc_status} eq "307" or       # TEMPORARY_REDIRECT
       $log_cols->{sc_status} eq "401" or       # UNAUTHORIZED
       $log_cols->{sc_status} eq "404" or       # NOT FOUNT
       $log_cols->{sc_status} eq "500"){        # INTERNAL_SERVER_ERROR
        return;
    }

    my $mime_type = calc_mime_type($log_cols->{cs_uri_stem});
    if( $mime_type =~ /^image/o or
        $mime_type eq "text/css" or
        $mime_type eq "application/javascript"){
        return;
    }
    if( $log_cols->{cs_uri_stem} =~ /\/counter\.exe$/o ){
        return;
    }
    return 1;
}

sub parse_log_line {
    my ($line) = @_;
    my $log_cols = {};
    ($log_cols->{date},      $log_cols->{time},         $log_cols->{s_ip},
     $log_cols->{cs_method}, $log_cols->{cs_uri_stem},  $log_cols->{cs_uri_query},
     $log_cols->{s_port},    $log_cols->{cs_username},  $log_cols->{c_ip},
     $log_cols->{user_agent},$log_cols->{sc_status},    $log_cols->{sc_substatus},
     $log_cols->{sc_win32_status},                      $log_cols->{time_taken}) =
         split(/ /,$line);
    return $log_cols;
}

sub calc_mime_type {
    my ($cs_uri_stem) = @_;

    my $mt = MIME::Types->new();

    my $extention = "";
    if($cs_uri_stem =~ /\.([a-z]+)$/io){
        $extention = lc($1);
        my $mime_type  = $mt->mimeTypeOf($extention);
        return $mime_type  if $mime_type;
        return "unknown/$extention";
    }
    return "unknown";
}

sub connect_db {
    my $db = join(";",
                  "DBI:mysql:database=$DB_CONF->{name}",
                  "host=$DB_CONF->{host}",
                  "port=$DB_CONF->{port}");
    my $dbh = DBI->connect($db,
                           $DB_CONF->{user},
                           $DB_CONF->{pass},
                           $DB_CONF->{opt});
    return $dbh;
}
CREATE DATABASE analyze_iis_log CHARACTER SET utf8;

CREATE TABLE parsed_log (
log_file        varchar(40),
datetime_jst    datetime,
url             varchar(200),
user            varchar(40)
) comment='日別/月別集計の手間を軽減する為、まずIISのlogをこのtableに保存';
create index parsed_log_log_file on parsed_log(log_file);
create index parsed_log_log_file on parsed_log(datetime_jst);


CREATE TABLE daily_analyzed_user (
date_jst        date,
url             varchar(200),
user            varchar(40),
views           int,
visits          int,
primary key(date_jst,url,user)
) comment='日別 x URL x USER別 集計';

CREATE TABLE daily_analyzed (
date_jst        date,
url             varchar(200),
views           int,
visits          int,
users           int,
primary key(date_jst,url)
) comment='日別 x URL 別 集計';

CREATE TABLE monthly_analyzed_user (
date_jst        date,
url             varchar(200),
user            varchar(40),
views           int,
visits          int,
primary key(date_jst,url,user)
) comment='月別 x URL x USER別 集計';

CREATE TABLE monthly_analyzed (
date_jst        date,
url             varchar(200),
views           int,
visits          int,
users           int,
primary key(date_jst,url)
) comment='月別 x URL 別 集計';