In the previous entry, we used Xavier or He initialization so that the initial weight values have a suitable spread, but it seems that adding a technique called Batch Normalization can also control the spread of the activation distributions.
Note that "activation" here refers to the output produced by the Affine layer.
Benefits of Batch Normalization

They are as follows:

- Learning progresses faster (a larger learning rate can be used)
- Less dependence on the initial weight values
- Suppresses overfitting (reduces the need for Dropout and the like)
Normalization formulas of Batch Normalization

For each mini-batch of m inputs, Batch Normalization normalizes the activations as follows.

[tex: \large{\hat{x}_i = \frac{x_i - \mu_B}{\sqrt{\sigma_B^2 + \epsilon}}}]

Here, [tex: \large{\mu_B}], [tex: \large{\sigma_B^2}] and [tex: \large{\epsilon}] are defined as follows.

[tex: \large{\mu_B = \frac{1}{m}\sum_{i=1}^{m} x_i}]

[tex: \large{\sigma_B^2 = \frac{1}{m}\sum_{i=1}^{m} (x_i - \mu_B)^2}]

[tex: \large{\epsilon}]: a small value that avoids division by zero, e.g. 10e-7.

The normalized values [tex: \large{\hat{x}_i}] above are then scaled and shifted with the learnable parameters [tex: \large{\gamma}] and [tex: \large{\beta}].

[tex: \large{y_i = \gamma \hat{x}_i + \beta}]
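As a quick sanity check of these formulas, here is a minimal NumPy sketch (my own addition, not part of the script later in this entry) that normalizes a toy mini-batch and then applies the scale and shift; the names mu_B, sigma_B2, gamma and beta simply mirror the symbols above.

# coding: utf-8
import numpy as np

# Toy mini-batch: 4 samples x 3 features (values chosen arbitrarily)
x = np.array([[1.0, 2.0, 3.0],
              [2.0, 0.0, 1.0],
              [0.0, 1.0, 2.0],
              [3.0, 3.0, 0.0]])

eps = 10e-7                      # small constant to avoid division by zero
mu_B = x.mean(axis=0)            # per-feature mean of the mini-batch
sigma_B2 = x.var(axis=0)         # per-feature variance of the mini-batch
x_hat = (x - mu_B) / np.sqrt(sigma_B2 + eps)   # normalized activations

gamma, beta = 1.0, 0.0           # scale and shift (learned parameters in practice)
y = gamma * x_hat + beta

print(x_hat.mean(axis=0))        # approximately 0 for every feature
print(x_hat.std(axis=0))         # approximately 1 for every feature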
Computational graph of Batch Normalization
The formulas above can be represented as a computational graph built from simple nodes (mean, subtraction, squaring, square root, division, and the final scale and shift); the backward pass is obtained by propagating the gradient through this graph in reverse.
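To confirm that walking the graph in reverse really yields correct gradients, the following is a small self-contained sketch (the names bn_forward and bn_backward are my own; the backward logic mirrors BatchNormalization.__backward in the listing below) that checks the analytic gradient against numerical differentiation.

# coding: utf-8
import numpy as np

# Forward pass written node by node, as in the computational graph.
def bn_forward(x, gamma, beta, eps=10e-7):
    mu = x.mean(axis=0)                 # mean node
    xc = x - mu                         # subtraction node
    var = np.mean(xc**2, axis=0)        # variance node
    std = np.sqrt(var + eps)            # square-root node
    xn = xc / std                       # division node
    out = gamma * xn + beta             # scale-and-shift node
    cache = (xc, xn, std, x.shape[0])
    return out, cache

# Backward pass obtained by propagating dout through the same graph in reverse.
def bn_backward(dout, gamma, cache):
    xc, xn, std, m = cache
    dbeta = dout.sum(axis=0)
    dgamma = np.sum(xn * dout, axis=0)
    dxn = gamma * dout
    dxc = dxn / std
    dstd = -np.sum((dxn * xc) / (std * std), axis=0)
    dvar = 0.5 * dstd / std
    dxc += (2.0 / m) * xc * dvar
    dmu = np.sum(dxc, axis=0)
    dx = dxc - dmu / m
    return dx, dgamma, dbeta

# Gradient check: compare the analytic dx with a numerical gradient of sum(out).
x = np.random.randn(4, 3)
gamma = np.ones(3)
beta = np.zeros(3)

out, cache = bn_forward(x, gamma, beta)
dx, _, _ = bn_backward(np.ones_like(out), gamma, cache)

h = 1e-5
dx_num = np.zeros_like(x)
for i in range(x.shape[0]):
    for j in range(x.shape[1]):
        x1 = x.copy()
        x1[i, j] += h
        x2 = x.copy()
        x2[i, j] -= h
        dx_num[i, j] = (bn_forward(x1, gamma, beta)[0].sum()
                        - bn_forward(x2, gamma, beta)[0].sum()) / (2 * h)

print(np.max(np.abs(dx - dx_num)))   # should be on the order of 1e-8 or smaller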
Python implementation of Batch Normalization
# coding: utf-8
import sys, os
import numpy as np
import matplotlib.pyplot as plt
import urllib.request
import gzip
from collections import OrderedDict

max_epochs = 20
batch_size = 100
learning_rate = 0.01

x_train = []
t_train = []
train_size = 0

def main():
    # Download the MNIST data
    global x_train
    global t_train
    mymnist = MyMnist()
    (x_train, t_train, x_test, t_test) = mymnist.load_mnist()

    # Reduce the training data
    x_train = x_train[:1000]
    t_train = t_train[:1000]

    global train_size
    train_size = x_train.shape[0]

    # 3. Draw the graphs ==========
    # Weight scales on a logarithmic scale
    weight_scale_list = np.logspace(0, -4, num=16)
    # Evenly spaced sequence of epochs
    x = np.arange(max_epochs)

    for i, w in enumerate(weight_scale_list):
        print(str(i+1), "/ 16")
        train_acc_list, bn_train_acc_list = __train(w)

        plt.subplot(4, 4, i+1)
        plt.title("W:" + str(w))
        if i == 15:
            plt.plot(x, bn_train_acc_list,
                     label='Batch Normalization', markevery=2)
            plt.plot(x, train_acc_list, linestyle="--",
                     label='without BatchNorm', markevery=2)
        else:
            plt.plot(x, bn_train_acc_list, markevery=2)
            plt.plot(x, train_acc_list, linestyle="--", markevery=2)

        plt.ylim(0, 1.0)
        if i % 4:
            plt.yticks([])
        else:
            plt.ylabel("accuracy")
        if i < 12:
            plt.xticks([])
        else:
            plt.xlabel("epochs")
        plt.legend(loc='lower right')

    plt.show()

def __train(weight_init_std):
    bn_network = MultiLayerNetExtend(
        input_size=784,
        hidden_size_list=[100, 100, 100, 100, 100],
        output_size=10,
        weight_init_std=weight_init_std,
        use_batchnorm=True)
    network = MultiLayerNetExtend(
        input_size=784,
        hidden_size_list=[100, 100, 100, 100, 100],
        output_size=10,
        weight_init_std=weight_init_std)
    optimizer = SGD(lr=learning_rate)

    train_acc_list = []
    bn_train_acc_list = []

    iter_per_epoch = max(train_size / batch_size, 1)
    epoch_cnt = 0

    for i in range(1000000000):
        batch_mask = np.random.choice(train_size, batch_size)
        x_batch = x_train[batch_mask]
        t_batch = t_train[batch_mask]

        for _network in (bn_network, network):
            grads = _network.gradient(x_batch, t_batch)
            optimizer.update(_network.params, grads)

        if i % iter_per_epoch == 0:
            train_acc = network.accuracy(x_train, t_train)
            bn_train_acc = bn_network.accuracy(x_train, t_train)
            train_acc_list.append(train_acc)
            bn_train_acc_list.append(bn_train_acc)
            print("epoch:" + str(epoch_cnt) + " | "
                  + str(train_acc) + " - " + str(bn_train_acc))
            epoch_cnt += 1
            if epoch_cnt >= max_epochs:
                break

    return train_acc_list, bn_train_acc_list

# Extended fully connected multi-layer neural network
# (supports Weight Decay, Dropout and Batch Normalization)
class MultiLayerNetExtend:
    def __init__(
            self,
            input_size,             # input size (784 for MNIST)
            hidden_size_list,       # list of hidden-layer sizes (e.g. [100, 100, 100])
            output_size,            # output size (10 for MNIST)
            activation='relu',      # activation function: 'relu' or 'sigmoid'
            weight_init_std='relu', # see note (*) below
            weight_decay_lambda=0,  # strength of Weight Decay (L2 norm)
            use_dropout=False,
            dropout_ration=0.5,     # Dropout ratio
            use_batchnorm=False):
        # (*) weight_init_std: standard deviation of the weights (e.g. 0.01);
        #     'relu' or 'he'      -> "He initial values"
        #     'sigmoid' or 'xavier' -> "Xavier initial values"
        self.input_size = input_size
        self.output_size = output_size
        self.hidden_size_list = hidden_size_list
        self.hidden_layer_num = len(hidden_size_list)
        self.use_dropout = use_dropout
        self.weight_decay_lambda = weight_decay_lambda
        self.use_batchnorm = use_batchnorm
        self.params = {}

        # Initialize the weights
        self.__init_weight(weight_init_std)

        # Build the layers
        activation_layer = {'sigmoid': Sigmoid, 'relu': Relu}
        self.layers = OrderedDict()
        for idx in range(1, self.hidden_layer_num+1):
            self.layers['Affine' + str(idx)] = Affine(self.params['W' + str(idx)],
                                                      self.params['b' + str(idx)])
            if self.use_batchnorm:
                self.params['gamma' + str(idx)] = np.ones(hidden_size_list[idx-1])
                self.params['beta' + str(idx)] = np.zeros(hidden_size_list[idx-1])
                self.layers['BatchNorm' + str(idx)] = \
                    BatchNormalization(self.params['gamma' + str(idx)],
                                       self.params['beta' + str(idx)])
            self.layers['Activation_function' + str(idx)] = \
                activation_layer[activation]()
            if self.use_dropout:
                self.layers['Dropout' + str(idx)] = Dropout(dropout_ration)

        idx = self.hidden_layer_num + 1
        self.layers['Affine' + str(idx)] = Affine(self.params['W' + str(idx)],
                                                  self.params['b' + str(idx)])
        self.last_layer = SoftmaxWithLoss()

    # Set the initial weight values
    def __init_weight(self, weight_init_std):
        # weight_init_std : standard deviation of the weights (e.g. 0.01)
        # 'relu' or 'he'        -> "He initial values"
        # 'sigmoid' or 'xavier' -> "Xavier initial values"
        all_size_list = \
            [self.input_size] + self.hidden_size_list + [self.output_size]
        for idx in range(1, len(all_size_list)):
            scale = weight_init_std
            # Recommended initial values when using ReLU
            if str(weight_init_std).lower() in ('relu', 'he'):
                scale = np.sqrt(2.0 / all_size_list[idx - 1])
            # Recommended initial values when using sigmoid
            elif str(weight_init_std).lower() in ('sigmoid', 'xavier'):
                scale = np.sqrt(1.0 / all_size_list[idx - 1])
            self.params['W' + str(idx)] = \
                scale * np.random.randn(all_size_list[idx-1], all_size_list[idx])
            self.params['b' + str(idx)] = np.zeros(all_size_list[idx])

    def predict(self, x, train_flg=False):
        for key, layer in self.layers.items():
            if "Dropout" in key or "BatchNorm" in key:
                x = layer.forward(x, train_flg)
            else:
                x = layer.forward(x)
        return x

    # x: input data, t: teacher labels
    def loss(self, x, t, train_flg=False):
        y = self.predict(x, train_flg)
        weight_decay = 0
        for idx in range(1, self.hidden_layer_num + 2):
            W = self.params['W' + str(idx)]
            weight_decay += 0.5 * self.weight_decay_lambda * np.sum(W**2)
        return self.last_layer.forward(y, t) + weight_decay

    def accuracy(self, x, t):
        y = self.predict(x, train_flg=False)
        y = np.argmax(y, axis=1)
        if t.ndim != 1:
            t = np.argmax(t, axis=1)
        accuracy = np.sum(y == t) / float(x.shape[0])
        return accuracy

    # Compute gradients by numerical differentiation. x: input data, t: teacher labels
    def numerical_gradient(self, x, t):
        loss_W = lambda W: self.loss(x, t, train_flg=True)
        grads = {}
        for idx in range(1, self.hidden_layer_num+2):
            grads['W' + str(idx)] = \
                self._numerical_gradient(loss_W, self.params['W' + str(idx)])
            grads['b' + str(idx)] = \
                self._numerical_gradient(loss_W, self.params['b' + str(idx)])
            if self.use_batchnorm and idx != self.hidden_layer_num+1:
                grads['gamma' + str(idx)] = \
                    self._numerical_gradient(loss_W, self.params['gamma' + str(idx)])
                grads['beta' + str(idx)] = \
                    self._numerical_gradient(loss_W, self.params['beta' + str(idx)])
        return grads

    def _numerical_gradient(self, f, x):
        h = 1e-4  # 0.0001
        grad = np.zeros_like(x)
        it = np.nditer(x, flags=['multi_index'], op_flags=['readwrite'])
        while not it.finished:
            idx = it.multi_index
            tmp_val = x[idx]
            x[idx] = tmp_val + h
            fxh1 = f(x)  # f(x+h)
            x[idx] = tmp_val - h
            fxh2 = f(x)  # f(x-h)
            grad[idx] = (fxh1 - fxh2) / (2*h)
            x[idx] = tmp_val  # restore the original value
            it.iternext()
        return grad

    def gradient(self, x, t):
        # forward
        self.loss(x, t, train_flg=True)

        # backward
        dout = 1
        dout = self.last_layer.backward(dout)

        layers = list(self.layers.values())
        layers.reverse()
        for layer in layers:
            dout = layer.backward(dout)

        # Collect the gradients from each layer
        grads = {}
        for idx in range(1, self.hidden_layer_num+2):
            grads['W' + str(idx)] = \
                self.layers['Affine' + str(idx)].dW + \
                self.weight_decay_lambda * self.params['W' + str(idx)]
            grads['b' + str(idx)] = self.layers['Affine' + str(idx)].db
            if self.use_batchnorm and idx != self.hidden_layer_num+1:
                grads['gamma' + str(idx)] = \
                    self.layers['BatchNorm' + str(idx)].dgamma
                grads['beta' + str(idx)] = \
                    self.layers['BatchNorm' + str(idx)].dbeta
        return grads

class Affine:
    def __init__(self, W, b):
        self.W = W
        self.b = b
        self.x = None
        self.original_x_shape = None
        # Gradients of the weight and bias parameters
        self.dW = None
        self.db = None

    def forward(self, x):
        # Tensor support (flatten all but the batch dimension)
        self.original_x_shape = x.shape
        x = x.reshape(x.shape[0], -1)
        self.x = x
        out = np.dot(self.x, self.W) + self.b
        return out

    def backward(self, dout):
        dx = np.dot(dout, self.W.T)
        self.dW = np.dot(self.x.T, dout)
        self.db = np.sum(dout, axis=0)
        dx = dx.reshape(*self.original_x_shape)
        return dx

class Sigmoid:
    def __init__(self):
        self.out = None

    def forward(self, x):
        out = 1.0 / (1.0 + np.exp(-x))  # sigmoid function
        self.out = out
        return out

    def backward(self, dout):
        dx = dout * (1.0 - self.out) * self.out
        return dx

class Relu:
    def __init__(self):
        self.mask = None

    def forward(self, x):
        self.mask = (x <= 0)
        out = x.copy()
        out[self.mask] = 0
        return out

    def backward(self, dout):
        dout[self.mask] = 0
        dx = dout
        return dx

# http://arxiv.org/abs/1502.03167
class BatchNormalization:
    def __init__(self, gamma, beta, momentum=0.9,
                 running_mean=None, running_var=None):
        self.gamma = gamma
        self.beta = beta
        self.momentum = momentum
        self.input_shape = None  # 4-dim for conv layers, 2-dim for fully connected layers

        # Mean and variance used at test time
        self.running_mean = running_mean
        self.running_var = running_var

        # Intermediate data used in backward
        self.batch_size = None
        self.xc = None
        self.std = None
        self.dgamma = None
        self.dbeta = None

    def forward(self, x, train_flg=True):
        self.input_shape = x.shape
        if x.ndim != 2:
            N, C, H, W = x.shape
            x = x.reshape(N, -1)
        out = self.__forward(x, train_flg)
        return out.reshape(*self.input_shape)

    def __forward(self, x, train_flg):
        if self.running_mean is None:
            N, D = x.shape
            self.running_mean = np.zeros(D)
            self.running_var = np.zeros(D)

        if train_flg:
            mu = x.mean(axis=0)
            xc = x - mu
            var = np.mean(xc**2, axis=0)
            std = np.sqrt(var + 10e-7)
            xn = xc / std

            self.batch_size = x.shape[0]
            self.xc = xc
            self.xn = xn
            self.std = std
            self.running_mean = \
                self.momentum * self.running_mean + (1-self.momentum) * mu
            self.running_var = \
                self.momentum * self.running_var + (1-self.momentum) * var
        else:
            xc = x - self.running_mean
            xn = xc / np.sqrt(self.running_var + 10e-7)

        out = self.gamma * xn + self.beta
        return out

    def backward(self, dout):
        if dout.ndim != 2:
            N, C, H, W = dout.shape
            dout = dout.reshape(N, -1)
        dx = self.__backward(dout)
        dx = dx.reshape(*self.input_shape)
        return dx

    def __backward(self, dout):
        dbeta = dout.sum(axis=0)
        dgamma = np.sum(self.xn * dout, axis=0)
        dxn = self.gamma * dout
        dxc = dxn / self.std
        dstd = -np.sum((dxn * self.xc) / (self.std * self.std), axis=0)
        dvar = 0.5 * dstd / self.std
        dxc += (2.0 / self.batch_size) * self.xc * dvar
        dmu = np.sum(dxc, axis=0)
        dx = dxc - dmu / self.batch_size

        self.dgamma = dgamma
        self.dbeta = dbeta
        return dx

# http://arxiv.org/abs/1207.0580
class Dropout:
    def __init__(self, dropout_ratio=0.5):
        self.dropout_ratio = dropout_ratio
        self.mask = None

    def forward(self, x, train_flg=True):
        if train_flg:
            self.mask = np.random.rand(*x.shape) > self.dropout_ratio
            return x * self.mask
        else:
            return x * (1.0 - self.dropout_ratio)

    def backward(self, dout):
        return dout * self.mask

class SoftmaxWithLoss:
    def __init__(self):
        self.loss = None
        self.y = None  # output of softmax
        self.t = None  # teacher data

    def forward(self, x, t):
        self.t = t
        self.y = self.softmax(x)
        self.loss = self.cross_entropy_error(self.y, self.t)
        return self.loss

    def cross_entropy_error(self, y, t):
        if y.ndim == 1:
            t = t.reshape(1, t.size)
            y = y.reshape(1, y.size)
        # If the teacher data is one-hot, convert it to class-label indices
        if t.size == y.size:
            t = t.argmax(axis=1)
        batch_size = y.shape[0]
        return -np.sum(np.log(y[np.arange(batch_size), t] + 1e-7)) / batch_size

    def softmax(self, x):
        x = x - np.max(x, axis=-1, keepdims=True)  # prevent overflow
        return np.exp(x) / np.sum(np.exp(x), axis=-1, keepdims=True)

    def backward(self, dout=1):
        batch_size = self.t.shape[0]
        if self.t.size == self.y.size:  # teacher data is one-hot
            dx = (self.y - self.t) / batch_size
        else:
            dx = self.y.copy()
            dx[np.arange(batch_size), self.t] -= 1
            dx = dx / batch_size
        return dx

class MyMnist:
    def __init__(self):
        pass

    def load_mnist(self):
        data_files = self.download_mnist()
        # convert to numpy
        dataset = {}
        dataset['train_img'] = self.load_img(data_files['train_img'])
        dataset['train_label'] = self.load_label(data_files['train_label'])
        dataset['test_img'] = self.load_img(data_files['test_img'])
        dataset['test_label'] = self.load_label(data_files['test_label'])

        for key in ('train_img', 'test_img'):
            dataset[key] = dataset[key].astype(np.float32)
            dataset[key] /= 255.0
        for key in ('train_label', 'test_label'):
            dataset[key] = self.change_one_hot_label(dataset[key])

        return (dataset['train_img'], dataset['train_label'],
                dataset['test_img'], dataset['test_label'])

    def change_one_hot_label(self, X):
        T = np.zeros((X.size, 10))
        for idx, row in enumerate(T):
            row[X[idx]] = 1
        return T

    def download_mnist(self):
        url_base = 'http://yann.lecun.com/exdb/mnist/'
        key_file = {'train_img':   'train-images-idx3-ubyte.gz',
                    'train_label': 'train-labels-idx1-ubyte.gz',
                    'test_img':    't10k-images-idx3-ubyte.gz',
                    'test_label':  't10k-labels-idx1-ubyte.gz'}
        data_files = {}
        dataset_dir = os.path.dirname(os.path.abspath(__file__))
        for data_name, file_name in key_file.items():
            req_url = url_base + file_name
            file_path = dataset_dir + "/" + file_name
            request = urllib.request.Request(req_url)
            response = urllib.request.urlopen(request).read()
            with open(file_path, mode='wb') as f:
                f.write(response)
            data_files[data_name] = file_path
        return data_files

    def load_img(self, file_path):
        img_size = 784  # = 28*28
        with gzip.open(file_path, 'rb') as f:
            data = np.frombuffer(f.read(), np.uint8, offset=16)
        data = data.reshape(-1, img_size)
        return data

    def load_label(self, file_path):
        with gzip.open(file_path, 'rb') as f:
            labels = np.frombuffer(f.read(), np.uint8, offset=8)
        return labels

# Stochastic Gradient Descent
class SGD:
    def __init__(self, lr=0.01):
        self.lr = lr

    def update(self, params, grads):
        for key in params.keys():
            params[key] -= self.lr * grads[key]

if __name__ == '__main__':
    main()
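As a supplementary usage sketch (my own addition, not part of foo4.py), the snippet below shows how MultiLayerNetExtend with use_batchnorm=True is driven for a single training step; it assumes it is pasted below the class definitions above, and uses dummy data instead of MNIST.

# Minimal usage sketch: one SGD step through the BatchNorm layers on dummy data.
net = MultiLayerNetExtend(input_size=784,
                          hidden_size_list=[100, 100],
                          output_size=10,
                          weight_init_std='he',
                          use_batchnorm=True)
opt = SGD(lr=0.01)

x_dummy = np.random.randn(32, 784)                        # 32 fake "images"
t_dummy = np.zeros((32, 10))
t_dummy[np.arange(32), np.random.randint(0, 10, 32)] = 1  # one-hot labels

grads = net.gradient(x_dummy, t_dummy)         # backprop through the BatchNorm layers
opt.update(net.params, grads)                  # one SGD update
print(net.loss(x_dummy, t_dummy, train_flg=True))   # loss using batch statistics
print(net.accuracy(x_dummy, t_dummy))               # inference uses running mean/variance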
Running the full script produces output like the following.
In each graph, the blue line corresponds to Batch Normalization; you can see that learning progresses faster with it.
(dl_scratch) C:\Users\end0t\tmp\deep-learning-from-scratch\ch06>python foo4.py
1 / 16
epoch:0 | 0.097 - 0.101
epoch:1 | 0.116 - 0.116
epoch:2 | 0.116 - 0.128
epoch:3 | 0.116 - 0.144
epoch:4 | 0.116 - 0.146
epoch:5 | 0.116 - 0.156
epoch:6 | 0.116 - 0.189
epoch:7 | 0.116 - 0.194
epoch:8 | 0.116 - 0.219
epoch:9 | 0.116 - 0.239
epoch:10 | 0.116 - 0.259
epoch:11 | 0.116 - 0.272
epoch:12 | 0.116 - 0.288
epoch:13 | 0.116 - 0.302
epoch:14 | 0.116 - 0.318
epoch:15 | 0.116 - 0.331
epoch:16 | 0.116 - 0.336
epoch:17 | 0.116 - 0.352
epoch:18 | 0.116 - 0.372
epoch:19 | 0.116 - 0.38
No artists with labels found to put in legend.  Note that artists whose label start with an underscore are ignored when legend() is called with no argument.
2 / 16
epoch:0 | 0.094 - 0.081
foo4.py:220: RuntimeWarning: overflow encountered in square
  weight_decay += 0.5 * self.weight_decay_lambda * np.sum(W**2)
foo4.py:220: RuntimeWarning: invalid value encountered in double_scalars
  weight_decay += 0.5 * self.weight_decay_lambda * np.sum(W**2)
C:\Users\end0t\Anaconda2\envs\dl_scratch\lib\site-packages\numpy\core\fromnumeric.py:86: RuntimeWarning: overflow encountered in reduce
  return ufunc.reduce(obj, axis, dtype, out, **passkwargs)
<omitted>
No artists with labels found to put in legend.  Note that artists whose label start with an underscore are ignored when legend() is called with no argument.
16 / 16
epoch:0 | 0.1 - 0.121
epoch:1 | 0.117 - 0.341
epoch:2 | 0.117 - 0.43
epoch:3 | 0.117 - 0.416
epoch:4 | 0.117 - 0.4
epoch:5 | 0.117 - 0.462
epoch:6 | 0.117 - 0.446
epoch:7 | 0.117 - 0.454
epoch:8 | 0.117 - 0.52
epoch:9 | 0.117 - 0.533
epoch:10 | 0.117 - 0.52
epoch:11 | 0.117 - 0.527
epoch:12 | 0.117 - 0.482
epoch:13 | 0.117 - 0.523
epoch:14 | 0.117 - 0.52
epoch:15 | 0.117 - 0.522
epoch:16 | 0.117 - 0.516
epoch:17 | 0.117 - 0.523
epoch:18 | 0.117 - 0.544
epoch:19 | 0.117 - 0.572