end0tknr's kipple - web写経開発

太宰府天満宮の狛犬って、妙にカワイイ

Batch Normalizationによるアクティベーション分布の広がり調整

先程のentryでは、 重み初期値が広がりが持つようXavierやHeを使用していますが Batch Normalizationという手法を追加することでも、 アクティベーション分布の広がりを調整できるようです。

尚、アクティベーションとは、Affine層による処理を指します。

Affine ReLU Affine Softmax input CrossEntropyError L BatchNormalization

Batch Normalization の利点

以下

  • 学習を速く進行 (学習係数を大きくできる)
  • 初期値にそれほど依存しない
  • 過学習抑制 (Dropout 等の必要性を減らす)

Batch Normalization による正規化式

 \Large{
x_i ← \frac{x_i - μ_B}{ \sqrt{ σ_B^2 + ε } }
}

ただし、[tex: \large{μB、 σB2、ε}]は以下

 \large{
平均:μ_B = \frac {1}{m} \sum ^m_{i=1} x_i }
 \large{
分散:σ_B^2 = \frac {1}{m} \sum ^m_{i=1} (x_i - μ_B)^2 }

ε:ゼロ除算を避ける為の小さな値。例 10e-7

上記の  \Large{ x_i } を以下のように変換。

 \large{ y_i ← γ x_i + β }

Batch Normalization の計算グラフ

先程の数式は、以下の計算グラフで表すことがことができます。

x1N1xiNx+^2N1xiNx+ε××+γβxdx(N,D)γ(D,)(D,)β(N,D)outdout

Batch Normalization の python 実装

# coding: utf-8
import sys, os
import numpy as np
import matplotlib.pyplot as plt
import urllib.request
import gzip
from collections import OrderedDict

max_epochs    = 20
batch_size    = 100
learning_rate = 0.01
x_train = []
t_train = []
train_size    = 0
    
def main():
    
    # MNISTデータのdownload
    global x_train
    global t_train
    mymnist = MyMnist()
    (x_train, t_train, x_test, t_test) = mymnist.load_mnist()

    # 学習data削減
    x_train = x_train[:1000]
    t_train = t_train[:1000]
    global train_size
    train_size    = x_train.shape[0]

    # 3.グラフの描画==========
    
    # 対数スケールの配列生成
    weight_scale_list = np.logspace(0, -4, num=16)
    # 等差数列の生成
    x = np.arange(max_epochs)

    for i, w in enumerate(weight_scale_list):
        print(str(i+1), "/ 16" )
        train_acc_list, bn_train_acc_list = __train(w)

        plt.subplot(4,4,i+1)
        plt.title("W:" + str(w))
        if i == 15:
            plt.plot(x,
                     bn_train_acc_list,
                     label='Batch Normalization',
                     markevery=2)
            plt.plot(x,
                     train_acc_list,
                     linestyle = "--",
                     label='without BatchNorm',
                     markevery=2)
        else:
            plt.plot(x,
                     bn_train_acc_list,
                     markevery=2)
            plt.plot(x,
                     train_acc_list,
                     linestyle="--",
                     markevery=2)

        plt.ylim(0, 1.0)
        if i % 4:
            plt.yticks([])
        else:
            plt.ylabel("accuracy")
        if i < 12:
            plt.xticks([])
        else:
            plt.xlabel("epochs")
        plt.legend(loc='lower right')

    plt.show()

def __train(weight_init_std):
    bn_network = MultiLayerNetExtend(
        input_size=784,
        hidden_size_list=[100, 100, 100, 100, 100],
        output_size=10, 
        weight_init_std=weight_init_std,
        use_batchnorm=True)
    
    network = MultiLayerNetExtend(
        input_size=784,
        hidden_size_list=[100, 100, 100, 100, 100],
        output_size=10,
        weight_init_std=weight_init_std )
    
    optimizer = SGD(lr=learning_rate)
    
    train_acc_list    = []
    bn_train_acc_list = []
    
    iter_per_epoch = max(train_size / batch_size, 1)
    epoch_cnt = 0
    
    for i in range(1000000000):
        batch_mask = np.random.choice(train_size, batch_size)
        x_batch = x_train[batch_mask]
        t_batch = t_train[batch_mask]
    
        for _network in (bn_network, network):
            grads = _network.gradient(x_batch, t_batch)
            optimizer.update(_network.params, grads)
    
        if i % iter_per_epoch == 0:
            train_acc = network.accuracy(x_train, t_train)
            bn_train_acc = bn_network.accuracy(x_train, t_train)
            train_acc_list.append(train_acc)
            bn_train_acc_list.append(bn_train_acc)
    
            print("epoch:" + str(epoch_cnt) + " | " + str(train_acc) + " - " + str(bn_train_acc))
    
            epoch_cnt += 1
            if epoch_cnt >= max_epochs:
                break
                
    return train_acc_list, bn_train_acc_list

# 拡張版の全結合による多層ニューラルネットワーク
# ( Weiht Decay、Dropout、Batch Normalizationの機能を持つ )
class MultiLayerNetExtend:
    
    def __init__(
            self,
            input_size,         # 入力size(MNISTの場合784)
            hidden_size_list,   # 隠れ層のneuron数list(例[100, 100, 100])
            output_size,        # 出力size (MNISTの場合は10)
            activation='relu',  # 活性化関数 relu sigmoid
            weight_init_std='relu', # ※
            weight_decay_lambda=0,  # Weight Decay(L2ノルム)の強さ
            use_dropout = False,    
            dropout_ration = 0.5,   # Dropoutの割り合い
            use_batchnorm=False):
        # ※ weight_init_std : 重みの標準偏差を指定(e.g. 0.01)
        #       relu or he の「Heの初期値」、
        #       sigmoid or xavierの場合「Xavierの初期値」

        
        self.input_size         = input_size
        self.output_size        = output_size
        self.hidden_size_list   = hidden_size_list
        self.hidden_layer_num   = len(hidden_size_list)
        self.use_dropout        = use_dropout
        self.weight_decay_lambda= weight_decay_lambda
        self.use_batchnorm      = use_batchnorm
        self.params = {}

        # 重みの初期化
        self.__init_weight(weight_init_std)

        # レイヤの生成
        activation_layer = {'sigmoid': Sigmoid, 'relu': Relu}
        self.layers = OrderedDict()
        for idx in range(1, self.hidden_layer_num+1):
            self.layers['Affine' + str(idx)] = Affine(self.params['W' + str(idx)],
                                                      self.params['b' + str(idx)])
            if self.use_batchnorm:
                self.params['gamma' + str(idx)] = np.ones(hidden_size_list[idx-1])
                self.params['beta' + str(idx)]  = np.zeros(hidden_size_list[idx-1])
                self.layers['BatchNorm' + str(idx)] = \
                    BatchNormalization( self.params['gamma'+ str(idx)],
                                        self.params['beta' + str(idx)] )
                
            self.layers['Activation_function' + str(idx)] =\
                activation_layer[activation]()
            
            if self.use_dropout:
                self.layers['Dropout' + str(idx)] = Dropout(dropout_ration)

        idx = self.hidden_layer_num + 1
        self.layers['Affine' + str(idx)] = Affine(self.params['W' + str(idx)],
                                                  self.params['b' + str(idx)])
        self.last_layer = SoftmaxWithLoss()
        
    # 重みの初期値設定
    def __init_weight(self, weight_init_std):
        # weight_init_std : 重みの標準偏差を指定(e.g. 0.01)
        #    'relu'または'he'を指定した場合は「Heの初期値」を設定
        #    'sigmoid'または'xavier'を指定した場合は「Xavierの初期値」を設定

        all_size_list = \
            [self.input_size] + self.hidden_size_list + [self.output_size]
        for idx in range(1, len(all_size_list)):
            scale = weight_init_std
            
            # ReLUを使う場合に推奨される初期値
            if str(weight_init_std).lower() in ('relu', 'he'):
                scale = np.sqrt(2.0 / all_size_list[idx - 1])
            # sigmoidを使う場合に推奨される初期値
            elif str(weight_init_std).lower() in ('sigmoid', 'xavier'):
                scale = np.sqrt(1.0 / all_size_list[idx - 1])
                
            self.params['W' + str(idx)] = \
                scale * np.random.randn(all_size_list[idx-1], all_size_list[idx])
            self.params['b' + str(idx)] = np.zeros(all_size_list[idx])

    def predict(self, x, train_flg=False):
        for key, layer in self.layers.items():
            if "Dropout" in key or "BatchNorm" in key:
                x = layer.forward(x, train_flg)
            else:
                x = layer.forward(x)

        return x

    # x: 入力data、t:教師ラベル
    def loss(self, x, t, train_flg=False):
        y = self.predict(x, train_flg)

        weight_decay = 0
        for idx in range(1, self.hidden_layer_num + 2):
            W = self.params['W' + str(idx)]
            weight_decay += 0.5 * self.weight_decay_lambda * np.sum(W**2)

        return self.last_layer.forward(y, t) + weight_decay

    def accuracy(self, x, t):
        y = self.predict(x, train_flg=False)
        y = np.argmax(y, axis=1)
        if t.ndim != 1 : t = np.argmax(t, axis=1)

        accuracy = np.sum(y == t) / float(x.shape[0])
        return accuracy

    # 勾配を求める (数値微分)。x: 入力data、t:教師ラベル
    def numerical_gradient(self, x, t):
        loss_W = lambda W: self.loss(x, t, train_flg=True)

        grads = {}
        for idx in range(1, self.hidden_layer_num+2):
            grads['W' + str(idx)] = \
                self._numerical_gradient(loss_W, self.params['W' + str(idx)])
            grads['b' + str(idx)] = \
                self._numerical_gradient(loss_W, self.params['b' + str(idx)])
            
            if self.use_batchnorm and idx != self.hidden_layer_num+1:
                grads['gamma' + str(idx)] = \
                    self._numerical_gradient(loss_W,
                                             self.params['gamma' + str(idx)])
                grads['beta' + str(idx)] = \
                    self._numerical_gradient(loss_W,
                                             self.params['beta' + str(idx)])

        return grads

    def _numerical_gradient(self, f, x):
        h = 1e-4 # 0.0001
        grad = np.zeros_like(x)

        it = np.nditer(x, flags=['multi_index'], op_flags=['readwrite'])
        while not it.finished:
            idx = it.multi_index
            tmp_val = x[idx]
            x[idx] = tmp_val + h
            fxh1 = f(x) # f(x+h)

            x[idx] = tmp_val - h 
            fxh2 = f(x) # f(x-h)
            grad[idx] = (fxh1 - fxh2) / (2*h)

            x[idx] = tmp_val # 値を元に戻す
            it.iternext()
        return grad
    
        
    def gradient(self, x, t):
        # forward
        self.loss(x, t, train_flg=True)

        # backward
        dout = 1
        dout = self.last_layer.backward(dout)

        layers = list(self.layers.values())
        layers.reverse()
        for layer in layers:
            dout = layer.backward(dout)

        # 設定
        grads = {}
        for idx in range(1, self.hidden_layer_num+2):
            grads['W' + str(idx)] = \
                self.layers['Affine' + str(idx)].dW + \
                self.weight_decay_lambda * self.params['W' + str(idx)]
            
            grads['b' + str(idx)] = self.layers['Affine' + str(idx)].db

            if self.use_batchnorm and idx != self.hidden_layer_num+1:
                grads['gamma' + str(idx)]= \
                    self.layers['BatchNorm' + str(idx)].dgamma
                grads['beta' + str(idx)] = \
                    self.layers['BatchNorm' + str(idx)].dbeta

        return grads
    
class Affine:
    def __init__(self, W, b):
        self.W =W
        self.b = b
        
        self.x = None
        self.original_x_shape = None
        # 重み・バイアスパラメータの微分
        self.dW = None
        self.db = None

    def forward(self, x):
        # テンソル対応
        self.original_x_shape = x.shape
        x = x.reshape(x.shape[0], -1)
        self.x = x

        out = np.dot(self.x, self.W) + self.b
        return out

    def backward(self, dout):
        dx = np.dot(dout, self.W.T)
        self.dW = np.dot(self.x.T, dout)
        self.db = np.sum(dout, axis=0)
        dx = dx.reshape(*self.original_x_shape)
        return dx
    
class Sigmoid:
    def __init__(self):
        self.out = None

    def forward(self, x):
        out = sigmoid(x)
        self.out = out
        return out

    def backward(self, dout):
        dx = dout * (1.0 - self.out) * self.out
        return dx
    
class Relu:
    def __init__(self):
        self.mask = None

    def forward(self, x):
        self.mask = (x <= 0)
        out = x.copy()
        out[self.mask] = 0
        return out

    def backward(self, dout):
        dout[self.mask] = 0
        dx = dout
        return dx

# http://arxiv.org/abs/1502.03167
class BatchNormalization:
    
    def __init__(self,
                 gamma,
                 beta,
                 momentum=0.9,
                 running_mean=None,
                 running_var=None):
        
        self.gamma    = gamma
        self.beta     = beta
        self.momentum = momentum
        self.input_shape = None # Conv層の場合は4次元、全結合層の場合は2次元

        # テスト時に使用する平均と分散
        self.running_mean = running_mean
        self.running_var  = running_var
        
        # backward時に使用する中間データ
        self.batch_size = None
        self.xc         = None
        self.std        = None
        self.dgamma     = None
        self.dbeta      = None

    def forward(self, x, train_flg=True):
        self.input_shape = x.shape
        if x.ndim != 2:
            N, C, H, W = x.shape
            x = x.reshape(N, -1)

        out = self.__forward(x, train_flg)
        
        return out.reshape(*self.input_shape)
            
    def __forward(self, x, train_flg):
        if self.running_mean is None:
            N, D = x.shape
            self.running_mean = np.zeros(D)
            self.running_var  = np.zeros(D)
                        
        if train_flg:
            mu = x.mean(axis=0)
            xc = x - mu
            var = np.mean(xc**2, axis=0)
            std = np.sqrt(var + 10e-7)
            xn = xc / std
            
            self.batch_size = x.shape[0]
            self.xc = xc
            self.xn = xn
            self.std = std
            self.running_mean = \
                self.momentum * self.running_mean + (1-self.momentum) * mu
            self.running_var = \
                self.momentum * self.running_var + (1-self.momentum) * var
        else:
            xc = x - self.running_mean
            xn = xc / ((np.sqrt(self.running_var + 10e-7)))
            
        out = self.gamma * xn + self.beta 
        return out

    def backward(self, dout):
        if dout.ndim != 2:
            N, C, H, W = dout.shape
            dout = dout.reshape(N, -1)

        dx = self.__backward(dout)

        dx = dx.reshape(*self.input_shape)
        return dx

    def __backward(self, dout):
        dbeta  = dout.sum(axis=0)
        dgamma = np.sum(self.xn * dout, axis=0)
        dxn  = self.gamma * dout
        dxc  = dxn / self.std
        dstd = -np.sum((dxn * self.xc) / (self.std * self.std), axis=0)
        dvar = 0.5 * dstd / self.std
        dxc  += (2.0 / self.batch_size) * self.xc * dvar
        dmu  = np.sum(dxc, axis=0)
        dx   = dxc - dmu / self.batch_size
        
        self.dgamma = dgamma
        self.dbeta = dbeta
        
        return dx

# http://arxiv.org/abs/1207.0580
class Dropout:
    def __init__(self, dropout_ratio=0.5):
        self.dropout_ratio = dropout_ratio
        self.mask = None

    def forward(self, x, train_flg=True):
        if train_flg:
            self.mask = np.random.rand(*x.shape) > self.dropout_ratio
            return x * self.mask
        else:
            return x * (1.0 - self.dropout_ratio)

    def backward(self, dout):
        return dout * self.mask

class SoftmaxWithLoss:
    def __init__(self):
        self.loss = None
        self.y = None # softmaxの出力
        self.t = None # 教師データ

    def forward(self, x, t):
        self.t = t
        self.y = self.softmax(x)
        self.loss = self.cross_entropy_error(self.y, self.t)
        
        return self.loss

    def cross_entropy_error(self, y, t):
        if y.ndim == 1:
            t = t.reshape(1, t.size)
            y = y.reshape(1, y.size)

        # 教師データがone-hot-vectorの場合、正解ラベルのインデックスに変換
        if t.size == y.size:
            t = t.argmax(axis=1)

        batch_size = y.shape[0]
        return -np.sum(np.log(y[np.arange(batch_size), t] + 1e-7)) / batch_size

    def softmax(self,x):
        x = x - np.max(x, axis=-1, keepdims=True)   # オーバーフロー対策
        return np.exp(x) / np.sum(np.exp(x), axis=-1, keepdims=True)

    def backward(self, dout=1):
        batch_size = self.t.shape[0]
        if self.t.size == self.y.size: # 教師データがone-hot-vectorの場合
            dx = (self.y - self.t) / batch_size
        else:
            dx = self.y.copy()
            dx[np.arange(batch_size), self.t] -= 1
            dx = dx / batch_size
        
        return dx
    

class MyMnist:
    def __init__(self):
        pass

    def load_mnist(self):
        data_files = self.download_mnist()
        # convert numpy
        dataset = {}
        dataset['train_img']   = self.load_img(  data_files['train_img'] )
        dataset['train_label'] = self.load_label(data_files['train_label'])
        dataset['test_img']    = self.load_img(  data_files['test_img']  )
        dataset['test_label']  = self.load_label(data_files['test_label'])

        for key in ('train_img', 'test_img'):
            dataset[key] = dataset[key].astype(np.float32)
            dataset[key] /= 255.0

        for key in ('train_label','test_label'):
            dataset[key]=self.change_one_hot_label( dataset[key] )

        return (dataset['train_img'],
                dataset['train_label'],
                dataset['test_img'],
                dataset['test_label'] )

    def change_one_hot_label(self,X):
        T = np.zeros((X.size, 10))
        for idx, row in enumerate(T):
            row[X[idx]] = 1
        return T
    
    def download_mnist(self):
        url_base = 'http://yann.lecun.com/exdb/mnist/'
        key_file = {'train_img'  :'train-images-idx3-ubyte.gz',
                    'train_label':'train-labels-idx1-ubyte.gz',
                    'test_img'   :'t10k-images-idx3-ubyte.gz',
                    'test_label' :'t10k-labels-idx1-ubyte.gz' }
        data_files = {}
        dataset_dir = os.path.dirname(os.path.abspath(__file__))
        
        for data_name, file_name in key_file.items():
            req_url   = url_base+file_name
            file_path = dataset_dir + "/" + file_name

            request  = urllib.request.Request( req_url )
            response = urllib.request.urlopen(request).read()
            with open(file_path, mode='wb') as f:
                f.write(response)
                
            data_files[data_name] = file_path
        return data_files

    def load_img( self,file_path):
        img_size    = 784 # = 28*28
        
        with gzip.open(file_path, 'rb') as f:
            data = np.frombuffer(f.read(), np.uint8, offset=16)
        data = data.reshape(-1, img_size)
        return data
    
    def load_label(self,file_path):
        with gzip.open(file_path, 'rb') as f:
            labels = np.frombuffer(f.read(), np.uint8, offset=8)
        return labels


# 確率的勾配降下法(Stochastic Gradient Descent)
class SGD:
    def __init__(self, lr=0.01):
        self.lr = lr
        
    def update(self, params, grads):
        for key in params.keys():
            params[key] -= self.lr * grads[key] 


if __name__ == '__main__':
    main()

上記を実行すると、以下のように表示されます。

グラフの青線が Batch Normalization に該当しますが、 こちらの方が、学習が速いことが分かります。

(dl_scratch) C:\Users\end0t\tmp\deep-learning-from-scratch\ch06>python foo4.py
1 / 16
epoch:0 | 0.097 - 0.101
epoch:1 | 0.116 - 0.116
epoch:2 | 0.116 - 0.128
epoch:3 | 0.116 - 0.144
epoch:4 | 0.116 - 0.146
epoch:5 | 0.116 - 0.156
epoch:6 | 0.116 - 0.189
epoch:7 | 0.116 - 0.194
epoch:8 | 0.116 - 0.219
epoch:9 | 0.116 - 0.239
epoch:10 | 0.116 - 0.259
epoch:11 | 0.116 - 0.272
epoch:12 | 0.116 - 0.288
epoch:13 | 0.116 - 0.302
epoch:14 | 0.116 - 0.318
epoch:15 | 0.116 - 0.331
epoch:16 | 0.116 - 0.336
epoch:17 | 0.116 - 0.352
epoch:18 | 0.116 - 0.372
epoch:19 | 0.116 - 0.38
No artists with labels found to put in legend.  Note that artists whose label start with an underscore are ignored when legend() is called with no argument.
2 / 16
epoch:0 | 0.094 - 0.081
foo4.py:220: RuntimeWarning: overflow encountered in square
  weight_decay += 0.5 * self.weight_decay_lambda * np.sum(W**2)
foo4.py:220: RuntimeWarning: invalid value encountered in double_scalars
  weight_decay += 0.5 * self.weight_decay_lambda * np.sum(W**2)
C:\Users\end0t\Anaconda2\envs\dl_scratch\lib\site-packages\numpy\core\fromnumeric.py:86: RuntimeWarning: overflow encountered in reduce
  return ufunc.reduce(obj, axis, dtype, out, **passkwargs)

<略>

No artists with labels found to put in legend.  Note that artists whose label start with an underscore are ignored when legend() is called with no argument.
16 / 16
epoch:0 | 0.1 - 0.121
epoch:1 | 0.117 - 0.341
epoch:2 | 0.117 - 0.43
epoch:3 | 0.117 - 0.416
epoch:4 | 0.117 - 0.4
epoch:5 | 0.117 - 0.462
epoch:6 | 0.117 - 0.446
epoch:7 | 0.117 - 0.454
epoch:8 | 0.117 - 0.52
epoch:9 | 0.117 - 0.533
epoch:10 | 0.117 - 0.52
epoch:11 | 0.117 - 0.527
epoch:12 | 0.117 - 0.482
epoch:13 | 0.117 - 0.523
epoch:14 | 0.117 - 0.52
epoch:15 | 0.117 - 0.522
epoch:16 | 0.117 - 0.516
epoch:17 | 0.117 - 0.523
epoch:18 | 0.117 - 0.544
epoch:19 | 0.117 - 0.572