AIって結局何なのかよく分からないので、とりあえず100日間勉強してみた Day73
経緯についてはこちらをご参照ください。
■本日の進捗
畳み込みニューラルネットワークを理解
■はじめに
今回も「ゼロから作るDeep Learning Pythonで学ぶディープラーニングの理論と実装(オライリー・ジャパン)」で、深層学習を学んでいきます。
今回はこれまで構築してきた畳み込み層やプーリング層のモジュールを用いて畳み込みニューラルネットワークを実装していきたいと思います。
■CNNの実装
今回はCNNの実装に四苦八苦しました…
結論から言うとデバッグしきれていません。という訳で今回は下記の参考文献のコードをほぼ丸パクリしています。そのせいか不明ですがこれまでのニューラルネットワークと比較して学習にめちゃめちゃ時間がかかります。そのうちちゃんと自力で実装するのでご容赦ください…
"""A small convolutional neural network for MNIST implemented with NumPy.

Architecture: conv - relu - pool - affine - relu - affine - softmax.
Running this file as a script trains the network, saves its parameters to
``params.pkl`` and plots the per-epoch train/test accuracy.
"""
import pickle
import sys
from collections import OrderedDict

import numpy as np


def conv_output_size(input_size, filter_size, stride=1, pad=0):
    """Return the spatial output size of a convolution along one axis.

    True division is used, so the result is a float.
    """
    return (input_size + 2*pad - filter_size) / stride + 1


def im2col(input_data, filter_h, filter_w, stride=1, pad=0):
    """Unfold sliding filter windows of a 4-D batch into a 2-D matrix.

    Parameters
    ----------
    input_data : 4-D array of shape (N, C, H, W)
    filter_h : filter height
    filter_w : filter width
    stride : stride
    pad : zero padding added on each side of H and W

    Returns
    -------
    col : 2-D array of shape (N*out_h*out_w, C*filter_h*filter_w)
    """
    N, C, H, W = input_data.shape
    out_h = (H + 2*pad - filter_h)//stride + 1
    out_w = (W + 2*pad - filter_w)//stride + 1

    img = np.pad(input_data, [(0, 0), (0, 0), (pad, pad), (pad, pad)], 'constant')
    col = np.zeros((N, C, filter_h, filter_w, out_h, out_w))

    # For every offset inside the filter, copy the strided view of the image
    # that this offset sees across all output positions.
    for y in range(filter_h):
        y_max = y + stride*out_h
        for x in range(filter_w):
            x_max = x + stride*out_w
            col[:, :, y, x, :, :] = img[:, :, y:y_max:stride, x:x_max:stride]

    col = col.transpose(0, 4, 5, 1, 2, 3).reshape(N*out_h*out_w, -1)
    return col


def col2im(col, input_shape, filter_h, filter_w, stride=1, pad=0):
    """Fold an ``im2col`` matrix back into an image batch.

    Values belonging to overlapping windows are accumulated (summed).

    Parameters
    ----------
    col : 2-D array laid out as produced by ``im2col``
    input_shape : shape of the original input, e.g. (10, 1, 28, 28)
    filter_h : filter height
    filter_w : filter width
    stride : stride
    pad : padding that was used when unfolding

    Returns
    -------
    img : 4-D array of shape ``input_shape``
    """
    N, C, H, W = input_shape
    out_h = (H + 2*pad - filter_h)//stride + 1
    out_w = (W + 2*pad - filter_w)//stride + 1
    col = col.reshape(N, out_h, out_w, C, filter_h, filter_w).transpose(0, 3, 4, 5, 1, 2)

    # The buffer is slightly larger than the padded image so that the strided
    # slices below never run past its edge.
    img = np.zeros((N, C, H + 2*pad + stride - 1, W + 2*pad + stride - 1))
    for y in range(filter_h):
        y_max = y + stride*out_h
        for x in range(filter_w):
            x_max = x + stride*out_w
            img[:, :, y:y_max:stride, x:x_max:stride] += col[:, :, y, x, :, :]

    # Strip the padding again.
    return img[:, :, pad:H + pad, pad:W + pad]


def numerical_gradient(f, x):
    """Estimate the gradient of ``f`` at ``x`` by central differences.

    ``x`` is perturbed in place one element at a time and restored afterwards.
    """
    h = 1e-4  # 0.0001
    grad = np.zeros_like(x)

    it = np.nditer(x, flags=['multi_index'], op_flags=['readwrite'])
    while not it.finished:
        idx = it.multi_index
        tmp_val = x[idx]

        x[idx] = tmp_val + h
        fxh1 = f(x)  # f(x+h)

        x[idx] = tmp_val - h
        fxh2 = f(x)  # f(x-h)

        grad[idx] = (fxh1 - fxh2) / (2*h)

        x[idx] = tmp_val  # restore the original value
        it.iternext()

    return grad


class Relu:
    """ReLU activation layer."""

    def __init__(self):
        self.mask = None  # True where the forward input was <= 0

    def forward(self, x):
        """Return a copy of ``x`` with non-positive entries set to 0."""
        self.mask = (x <= 0)
        out = x.copy()
        out[self.mask] = 0
        return out

    def backward(self, dout):
        """Return ``dout`` with the gradient blocked where the input was <= 0.

        Works on a copy so the caller's upstream gradient is left untouched.
        """
        dx = dout.copy()
        dx[self.mask] = 0
        return dx


def sigmoid(x):
    """Element-wise logistic sigmoid."""
    return 1 / (1 + np.exp(-x))


class Sigmoid:
    """Sigmoid activation layer."""

    def __init__(self):
        self.out = None  # forward output, reused by backward

    def forward(self, x):
        """Apply the sigmoid and remember the output."""
        out = sigmoid(x)
        self.out = out
        return out

    def backward(self, dout):
        """Backpropagate using y * (1 - y) of the stored output."""
        dx = dout * (1.0 - self.out) * self.out
        return dx


class Affine:
    """Fully connected layer computing ``x @ W + b``."""

    def __init__(self, W, b):
        self.W = W
        self.b = b

        self.x = None
        self.original_x_shape = None
        # Gradients of the weight and bias parameters.
        self.dW = None
        self.db = None

    def forward(self, x):
        """Flatten ``x`` to (batch, features) and apply the affine map."""
        # Tensor support: remember the shape so backward can restore it.
        self.original_x_shape = x.shape
        x = x.reshape(x.shape[0], -1)
        self.x = x

        out = np.dot(self.x, self.W) + self.b
        return out

    def backward(self, dout):
        """Store ``dW``/``db`` and return the gradient w.r.t. the input."""
        dx = np.dot(dout, self.W.T)
        self.dW = np.dot(self.x.T, dout)
        self.db = np.sum(dout, axis=0)

        # Restore the shape of the original input (tensor support).
        dx = dx.reshape(*self.original_x_shape)
        return dx


class Convolution:
    """2-D convolution layer implemented with ``im2col``."""

    def __init__(self, W, b, stride=1, pad=0):
        self.W = W  # filters, shape (FN, C, FH, FW)
        self.b = b
        self.stride = stride
        self.pad = pad

        # Intermediate values kept for backward.
        self.x = None
        self.col = None
        self.col_W = None

        # Gradients of the weight and bias parameters.
        self.dW = None
        self.db = None

    def forward(self, x):
        """Convolve a batch of shape (N, C, H, W); returns (N, FN, out_h, out_w)."""
        FN, C, FH, FW = self.W.shape
        N, C, H, W = x.shape
        # Floor division, matching the output size im2col computes.
        out_h = (H + 2*self.pad - FH)//self.stride + 1
        out_w = (W + 2*self.pad - FW)//self.stride + 1

        col = im2col(x, FH, FW, self.stride, self.pad)
        col_W = self.W.reshape(FN, -1).T

        out = np.dot(col, col_W) + self.b
        out = out.reshape(N, out_h, out_w, -1).transpose(0, 3, 1, 2)

        self.x = x
        self.col = col
        self.col_W = col_W

        return out

    def backward(self, dout):
        """Store ``dW``/``db`` and return the gradient w.r.t. the input."""
        FN, C, FH, FW = self.W.shape
        dout = dout.transpose(0, 2, 3, 1).reshape(-1, FN)

        self.db = np.sum(dout, axis=0)
        self.dW = np.dot(self.col.T, dout)
        self.dW = self.dW.transpose(1, 0).reshape(FN, C, FH, FW)

        dcol = np.dot(dout, self.col_W.T)
        dx = col2im(dcol, self.x.shape, FH, FW, self.stride, self.pad)

        return dx


class Pooling:
    """Max pooling layer implemented with ``im2col``."""

    def __init__(self, pool_h, pool_w, stride=2, pad=0):
        self.pool_h = pool_h
        self.pool_w = pool_w
        self.stride = stride
        self.pad = pad

        self.x = None
        self.arg_max = None  # index of the maximum inside each window

    def forward(self, x):
        """Max-pool a batch of shape (N, C, H, W)."""
        N, C, H, W = x.shape
        # Include the padding so the size agrees with what im2col produces.
        out_h = (H + 2*self.pad - self.pool_h)//self.stride + 1
        out_w = (W + 2*self.pad - self.pool_w)//self.stride + 1

        col = im2col(x, self.pool_h, self.pool_w, self.stride, self.pad)
        # One row per (sample, position, channel) window.
        col = col.reshape(-1, self.pool_h * self.pool_w)

        arg_max = np.argmax(col, axis=1)
        out = np.max(col, axis=1)
        out = out.reshape(N, out_h, out_w, C).transpose(0, 3, 1, 2)

        self.x = x
        self.arg_max = arg_max

        return out

    def backward(self, dout):
        """Route each upstream gradient to the position that held the maximum."""
        dout = dout.transpose(0, 2, 3, 1)

        pool_size = self.pool_h * self.pool_w
        dmax = np.zeros((dout.size, pool_size))
        dmax[np.arange(self.arg_max.size), self.arg_max.flatten()] = dout.flatten()
        dmax = dmax.reshape(dout.shape + (pool_size,))

        dcol = dmax.reshape(dmax.shape[0] * dmax.shape[1] * dmax.shape[2], -1)
        dx = col2im(dcol, self.x.shape, self.pool_h, self.pool_w, self.stride, self.pad)

        return dx


def softmax(x):
    """Softmax over the last axis."""
    x = x - np.max(x, axis=-1, keepdims=True)  # guard against overflow
    return np.exp(x) / np.sum(np.exp(x), axis=-1, keepdims=True)


def cross_entropy_error(y, t):
    """Mean cross-entropy between predictions ``y`` and targets ``t``.

    ``t`` may be one-hot encoded or a 1-D array of class indices.
    """
    delta = 1e-7  # keeps log() away from zero

    # Convert class indices to a one-hot encoding.
    if t.ndim == 1:
        t = np.eye(y.shape[1])[t]

    return -np.sum(t * np.log(y + delta)) / y.shape[0]


class SoftmaxWithLoss:
    """Softmax followed by cross-entropy loss."""

    def __init__(self):
        self.loss = None
        self.y = None  # softmax output
        self.t = None  # teacher labels

    def forward(self, x, t):
        """Return the loss for scores ``x`` and labels ``t``."""
        self.t = t
        self.y = softmax(x)
        self.loss = cross_entropy_error(self.y, self.t)

        return self.loss

    def backward(self, dout=1):
        """Return the gradient of the loss w.r.t. the scores."""
        batch_size = self.t.shape[0]
        if self.t.size == self.y.size:  # labels are one-hot vectors
            dx = (self.y - self.t) / batch_size
        else:  # labels are class indices
            dx = self.y.copy()
            dx[np.arange(batch_size), self.t] -= 1
            dx = dx / batch_size

        return dx


class SimpleConvNet:
    """Simple ConvNet: conv - relu - pool - affine - relu - affine - softmax.

    Parameters
    ----------
    input_dim : (channels, height, width) of one input sample; only the
        height is used to size the layers, so a square input is assumed
    conv_param : dict with the keys 'filter_num', 'filter_size', 'pad' and
        'stride'; defaults to 30 filters of size 5, pad 0, stride 1
    hidden_size : number of units in the hidden affine layer
    output_size : number of output classes
    weight_init_std : standard deviation of the initial weights
    """

    def __init__(self, input_dim=(1, 28, 28), conv_param=None,
                 hidden_size=100, output_size=10, weight_init_std=0.01):
        if conv_param is None:
            conv_param = {'filter_num': 30, 'filter_size': 5, 'pad': 0, 'stride': 1}

        filter_num = conv_param['filter_num']
        filter_size = conv_param['filter_size']
        filter_pad = conv_param['pad']
        filter_stride = conv_param['stride']
        input_size = input_dim[1]

        # Integer arithmetic that mirrors the conv layer and the 2x2/stride-2
        # pooling layer, so W2 has the right number of rows even when the
        # convolution output size is odd.
        conv_out = (input_size - filter_size + 2*filter_pad)//filter_stride + 1
        pool_out = (conv_out - 2)//2 + 1
        pool_output_size = filter_num * pool_out * pool_out

        # Weight initialisation.
        self.params = {}
        self.params['W1'] = weight_init_std * \
            np.random.randn(filter_num, input_dim[0], filter_size, filter_size)
        self.params['b1'] = np.zeros(filter_num)
        self.params['W2'] = weight_init_std * \
            np.random.randn(pool_output_size, hidden_size)
        self.params['b2'] = np.zeros(hidden_size)
        self.params['W3'] = weight_init_std * \
            np.random.randn(hidden_size, output_size)
        self.params['b3'] = np.zeros(output_size)

        # Layer construction. The layers hold references to the arrays in
        # self.params, so in-place optimizer updates reach them directly.
        self.layers = OrderedDict()
        self.layers['Conv1'] = Convolution(self.params['W1'], self.params['b1'],
                                           conv_param['stride'], conv_param['pad'])
        self.layers['Relu1'] = Relu()
        self.layers['Pool1'] = Pooling(pool_h=2, pool_w=2, stride=2)
        self.layers['Affine1'] = Affine(self.params['W2'], self.params['b2'])
        self.layers['Relu2'] = Relu()
        self.layers['Affine2'] = Affine(self.params['W3'], self.params['b3'])

        self.last_layer = SoftmaxWithLoss()

    def predict(self, x):
        """Run ``x`` through every layer except the loss layer."""
        for layer in self.layers.values():
            x = layer.forward(x)

        return x

    def loss(self, x, t):
        """Return the loss for input data ``x`` and teacher labels ``t``."""
        y = self.predict(x)
        return self.last_layer.forward(y, t)

    def accuracy(self, x, t, batch_size=100):
        """Return the fraction of samples in ``x`` classified as ``t``.

        Prediction is done in chunks of ``batch_size``; a trailing partial
        chunk is evaluated too, so every sample is counted.
        """
        if t.ndim != 1:
            t = np.argmax(t, axis=1)

        acc = 0.0

        for start in range(0, x.shape[0], batch_size):
            tx = x[start:start + batch_size]
            tt = t[start:start + batch_size]
            y = self.predict(tx)
            y = np.argmax(y, axis=1)
            acc += np.sum(y == tt)

        return acc / x.shape[0]

    def numerical_gradient(self, x, t):
        """Compute the gradients by numerical differentiation.

        Parameters
        ----------
        x : input data
        t : teacher labels

        Returns
        -------
        Dictionary holding the gradient of every parameter:
        grads['W1'], grads['W2'], ... are the weight gradients,
        grads['b1'], grads['b2'], ... are the bias gradients.
        """
        def loss_w(w):
            # The argument is unused: the parameter arrays are perturbed in
            # place and the layers read those same arrays.
            return self.loss(x, t)

        grads = {}
        for idx in (1, 2, 3):
            grads['W' + str(idx)] = numerical_gradient(loss_w, self.params['W' + str(idx)])
            grads['b' + str(idx)] = numerical_gradient(loss_w, self.params['b' + str(idx)])

        return grads

    def gradient(self, x, t):
        """Compute the gradients by backpropagation.

        Parameters
        ----------
        x : input data
        t : teacher labels

        Returns
        -------
        Dictionary holding the gradient of every parameter:
        grads['W1'], grads['W2'], ... are the weight gradients,
        grads['b1'], grads['b2'], ... are the bias gradients.
        """
        # forward
        self.loss(x, t)

        # backward
        dout = 1
        dout = self.last_layer.backward(dout)

        for layer in reversed(list(self.layers.values())):
            dout = layer.backward(dout)

        # Collect the gradients stored by the parameterised layers.
        grads = {}
        grads['W1'], grads['b1'] = self.layers['Conv1'].dW, self.layers['Conv1'].db
        grads['W2'], grads['b2'] = self.layers['Affine1'].dW, self.layers['Affine1'].db
        grads['W3'], grads['b3'] = self.layers['Affine2'].dW, self.layers['Affine2'].db

        return grads

    def save_params(self, file_name="params.pkl"):
        """Pickle the parameter dictionary to ``file_name``."""
        params = dict(self.params)
        with open(file_name, 'wb') as f:
            pickle.dump(params, f)

    def load_params(self, file_name="params.pkl"):
        """Load parameters from ``file_name`` and attach them to the layers.

        NOTE: unpickling can execute arbitrary code; only load files you trust.
        """
        with open(file_name, 'rb') as f:
            params = pickle.load(f)
        for key, val in params.items():
            self.params[key] = val

        for i, key in enumerate(['Conv1', 'Affine1', 'Affine2']):
            self.layers[key].W = self.params['W' + str(i+1)]
            self.layers[key].b = self.params['b' + str(i+1)]


class SGD:
    """Stochastic Gradient Descent."""

    def __init__(self, lr=0.01):
        self.lr = lr

    def update(self, params, grads):
        """Update every parameter in place."""
        for key in params:
            params[key] -= self.lr * grads[key]


class Adam:
    """Adam (http://arxiv.org/abs/1412.6980v8)"""

    def __init__(self, lr=0.001, beta1=0.9, beta2=0.999):
        self.lr = lr
        self.beta1 = beta1
        self.beta2 = beta2
        self.iter = 0
        self.m = None  # first-moment estimates, created on the first update
        self.v = None  # second-moment estimates, created on the first update

    def update(self, params, grads):
        """Update every parameter in place."""
        if self.m is None:
            self.m, self.v = {}, {}
            for key, val in params.items():
                self.m[key] = np.zeros_like(val)
                self.v[key] = np.zeros_like(val)

        self.iter += 1
        # Bias correction folded into the step size.
        lr_t = self.lr * np.sqrt(1.0 - self.beta2**self.iter) / (1.0 - self.beta1**self.iter)

        for key in params:
            self.m[key] += (1 - self.beta1) * (grads[key] - self.m[key])
            self.v[key] += (1 - self.beta2) * (grads[key]**2 - self.v[key])

            params[key] -= lr_t * self.m[key] / (np.sqrt(self.v[key]) + 1e-7)


class Trainer:
    """Runs mini-batch training of a network and records loss and accuracy.

    ``optimizer`` is looked up case-insensitively and must be 'sgd' or
    'adam'; ``optimizer_param`` is passed to its constructor as keyword
    arguments and defaults to ``{'lr': 0.01}``.
    """

    def __init__(self, network, x_train, t_train, x_test, t_test,
                 epochs=20, mini_batch_size=100,
                 optimizer='SGD', optimizer_param=None,
                 evaluate_sample_num_per_epoch=None, verbose=True):
        if optimizer_param is None:
            optimizer_param = {'lr': 0.01}

        self.network = network
        self.verbose = verbose
        self.x_train = x_train
        self.t_train = t_train
        self.x_test = x_test
        self.t_test = t_test
        self.epochs = epochs
        self.batch_size = mini_batch_size
        self.evaluate_sample_num_per_epoch = evaluate_sample_num_per_epoch

        # optimizer
        optimizer_class_dict = {'sgd': SGD, 'adam': Adam}
        self.optimizer = optimizer_class_dict[optimizer.lower()](**optimizer_param)

        self.train_size = x_train.shape[0]
        # Kept as an integer so the per-epoch check in train_step fires
        # exactly once per epoch even when train_size is not a multiple of
        # the batch size.
        self.iter_per_epoch = max(self.train_size // mini_batch_size, 1)
        self.max_iter = int(epochs * self.iter_per_epoch)
        self.current_iter = 0
        self.current_epoch = 0

        self.train_loss_list = []
        self.train_acc_list = []
        self.test_acc_list = []

    def train_step(self):
        """Run one mini-batch update; evaluate accuracy at each epoch start."""
        batch_mask = np.random.choice(self.train_size, self.batch_size)
        x_batch = self.x_train[batch_mask]
        t_batch = self.t_train[batch_mask]

        grads = self.network.gradient(x_batch, t_batch)
        self.optimizer.update(self.network.params, grads)

        # Loss of the same batch, evaluated after the parameter update.
        loss = self.network.loss(x_batch, t_batch)
        self.train_loss_list.append(loss)
        if self.verbose:
            print("train loss:" + str(loss))

        if self.current_iter % self.iter_per_epoch == 0:
            self.current_epoch += 1

            x_train_sample, t_train_sample = self.x_train, self.t_train
            x_test_sample, t_test_sample = self.x_test, self.t_test
            if self.evaluate_sample_num_per_epoch is not None:
                # Evaluate on the first t samples only to save time.
                t = self.evaluate_sample_num_per_epoch
                x_train_sample, t_train_sample = self.x_train[:t], self.t_train[:t]
                x_test_sample, t_test_sample = self.x_test[:t], self.t_test[:t]

            train_acc = self.network.accuracy(x_train_sample, t_train_sample)
            test_acc = self.network.accuracy(x_test_sample, t_test_sample)
            self.train_acc_list.append(train_acc)
            self.test_acc_list.append(test_acc)

            if self.verbose:
                print("=== epoch:" + str(self.current_epoch) + ", train acc:" + str(train_acc) + ", test acc:" + str(test_acc) + " ===")
        self.current_iter += 1

    def train(self):
        """Run all iterations, then report accuracy on the full test set."""
        for _ in range(self.max_iter):
            self.train_step()

        test_acc = self.network.accuracy(self.x_test, self.t_test)

        if self.verbose:
            print("=============== Final Test Accuracy ===============")
            print("test acc:" + str(test_acc))


def main():
    """Train SimpleConvNet on MNIST, save its parameters and plot accuracy."""
    # Imported here so that the layer and network classes above can be
    # imported without the dataset module or a plotting backend.
    import matplotlib.pyplot as plt
    sys.path.append("./")
    from work.mnist import load_mnist

    # Load the data.
    (x_train, t_train), (x_test, t_test) = load_mnist(flatten=False)

    # If training takes too long, reduce the data:
    # x_train, t_train = x_train[:5000], t_train[:5000]
    # x_test, t_test = x_test[:1000], t_test[:1000]

    max_epochs = 20

    network = SimpleConvNet(input_dim=(1, 28, 28),
                            conv_param={'filter_num': 30, 'filter_size': 5, 'pad': 0, 'stride': 1},
                            hidden_size=100, output_size=10, weight_init_std=0.01)

    trainer = Trainer(network, x_train, t_train, x_test, t_test,
                      epochs=max_epochs, mini_batch_size=100,
                      optimizer='Adam', optimizer_param={'lr': 0.001},
                      evaluate_sample_num_per_epoch=1000)
    trainer.train()

    # Save the parameters.
    network.save_params("params.pkl")
    print("Saved Network Parameters!")

    # Draw the graph; the x axis follows the number of recorded epochs.
    markers = {'train': 'o', 'test': 's'}
    x = np.arange(len(trainer.train_acc_list))
    plt.plot(x, trainer.train_acc_list, marker=markers['train'], label='train', markevery=2)
    plt.plot(x, trainer.test_acc_list, marker=markers['test'], label='test', markevery=2)
    plt.xlabel("epochs")
    plt.ylabel("accuracy")
    plt.ylim(0, 1.0)
    plt.legend(loc='lower right')
    plt.show()


if __name__ == "__main__":
    main()
■おわりに
必ずリベンジします!
■参考文献
- Andreas C. Muller, Sarah Guido. Pythonではじめる機械学習. 中田 秀基 訳. オライリー・ジャパン. 2017. 392p.
- 斎藤 康毅. ゼロから作るDeep Learning Pythonで学ぶディープラーニングの理論と実装. オライリー・ジャパン. 2016. 320p.
- ChatGPT. 4o mini. OpenAI. 2024. https://chatgpt.com/
- API Reference. scikit-learn.org. https://scikit-learn.org/stable/api/index.html
- PyTorch documentation. pytorch.org. https://pytorch.org/docs/stable/index.html
- Keiron O’Shea, Ryan Nash. An Introduction to Convolutional Neural Networks. https://ar5iv.labs.arxiv.org/html/1511.08458