I Don't Really Understand What AI Actually Is, So I'm Just Studying It for 100 Days: Day 86
For the background of this series, please see here.
■Today's Progress
- Understood the analogy task
■Introduction
This time, as always, I am learning from "ゼロから作るDeep Learning② 自然言語処理編" (O'Reilly Japan).
In this installment, I combine the Analogy class implemented last time with the other training algorithms to build the complete analogy pipeline.
■Implementing Analogy
Using the Analogy class from before, I train on the simplified PTB dataset and try solving some actual analogy problems.
```python
import sys
import os
sys.path.append('..')
import collections
import pickle

import numpy as np
import matplotlib.pyplot as plt
from sklearn.utils.extmath import randomized_svd

GPU = False

key_file = {
    'train': 'ptb.train.txt',
    'test': 'ptb.test.txt',
    'valid': 'ptb.valid.txt'
}
save_file = {
    'train': 'ptb.train.npy',
    'test': 'ptb.test.npy',
    'valid': 'ptb.valid.npy'
}
vocab_file = 'ptb.vocab.pkl'

dataset_dir = os.path.dirname(os.path.abspath(__file__))
mid_path = r'..\..\Download_Dataset\lstm-master\data'


def load_vocab():
    vocab_path = os.path.join(dataset_dir, vocab_file)
    print(vocab_path)
    if os.path.exists(vocab_path):
        with open(vocab_path, 'rb') as f:
            word_to_id, id_to_word = pickle.load(f)
        return word_to_id, id_to_word

    word_to_id = {}
    id_to_word = {}
    data_type = 'train'
    file_name = key_file[data_type]
    file_path = os.path.join(dataset_dir, mid_path, file_name)

    words = open(file_path).read().replace('\n', '<eos>').strip().split()
    for i, word in enumerate(words):
        if word not in word_to_id:
            tmp_id = len(word_to_id)
            word_to_id[word] = tmp_id
            id_to_word[tmp_id] = word

    with open(vocab_path, 'wb') as f:
        pickle.dump((word_to_id, id_to_word), f)
    return word_to_id, id_to_word


def load_data(data_type='train'):
    if data_type == 'val':
        data_type = 'valid'
    save_path = dataset_dir + '\\' + save_file[data_type]
    print('save_path:', save_path)

    word_to_id, id_to_word = load_vocab()

    if os.path.exists(save_path):
        corpus = np.load(save_path)
        return corpus, word_to_id, id_to_word

    file_name = key_file[data_type]
    file_path = os.path.join(dataset_dir, mid_path, file_name)
    words = open(file_path).read().replace('\n', '<eos>').strip().split()
    corpus = np.array([word_to_id[w] for w in words])

    np.save(save_path, corpus)
    return corpus, word_to_id, id_to_word


class MatMul:
    def __init__(self, W):
        self.params = [W]
        self.grads = [np.zeros_like(W)]
        self.x = None

    def forward(self, x):
        W, = self.params
        out = np.dot(x, W)
        self.x = x
        return out

    def backward(self, dout):
        W, = self.params
        dx = np.dot(dout, W.T)
        dW = np.dot(self.x.T, dout)
        self.grads[0][...] = dW
        return dx


class Embedding:
    def __init__(self, W):
        self.params = [W]
        self.grads = [np.zeros_like(W)]
        self.idx = None

    def forward(self, idx):
        W, = self.params
        self.idx = idx
        out = W[idx]
        return out

    def backward(self, dout):
        dW, = self.grads
        dW[...] = 0
        for i, word_id in enumerate(self.idx):
            dW[word_id] += dout[i]
        return None


class SoftmaxCrossEntropy:
    def __init__(self):
        self.output = None
        self.y_true = None
        self.loss = None

    def forward(self, logits, y_true):
        exp_values = np.exp(logits - np.max(logits, axis=1, keepdims=True))
        self.output = exp_values / np.sum(exp_values, axis=1, keepdims=True)
        self.y_true = y_true
        self.loss = -np.sum(y_true * np.log(self.output + 1e-7)) / y_true.shape[0]
        return self.loss

    def backward(self):
        return (self.output - self.y_true) / self.y_true.shape[0]


def cross_entropy_error(y, t):
    delta = 1e-7
    return -np.sum(t * np.log(y + delta)) / y.shape[0]


class Adam:
    def __init__(self, lr=0.001, beta1=0.9, beta2=0.999):
        self.lr = lr
        self.beta1 = beta1
        self.beta2 = beta2
        self.iter = 0
        self.m = None
        self.v = None

    def update(self, params, grads):
        if self.m is None:
            self.m, self.v = {}, {}
            for key, val in params.items():
                self.m[key] = np.zeros_like(val)
                self.v[key] = np.zeros_like(val)

        self.iter += 1
        lr_t = self.lr * np.sqrt(1.0 - self.beta2 ** self.iter) / (1.0 - self.beta1 ** self.iter)

        for key in params.keys():
            self.m[key] = self.beta1 * self.m[key] + (1 - self.beta1) * grads[key]
            self.v[key] = self.beta2 * self.v[key] + (1 - self.beta2) * (grads[key] ** 2)
            params[key] -= lr_t * self.m[key] / (np.sqrt(self.v[key]) + 1e-7)


def preprocess(text):
    text = text.lower()
    text = text.replace('.', ' .')
    words = text.split(' ')

    word_to_id = {}
    id_to_word = {}
    for word in words:
        if word not in word_to_id:
            new_id = len(word_to_id)
            word_to_id[word] = new_id
            id_to_word[new_id] = word

    corpus = np.array([word_to_id[w] for w in words])
    return corpus, word_to_id, id_to_word


def convert_one_hot(corpus, vocab_size):
    N = corpus.shape[0]
    if corpus.ndim == 1:
        one_hot = np.zeros((N, vocab_size), dtype=np.int32)
        for idx, word_id in enumerate(corpus):
            one_hot[idx, word_id] = 1
    elif corpus.ndim == 2:
        C = corpus.shape[1]
        one_hot = np.zeros((N, C, vocab_size), dtype=np.int32)
        for idx_0, word_ids in enumerate(corpus):
            for idx_1, word_id in enumerate(word_ids):
                one_hot[idx_0, idx_1, word_id] = 1
    return one_hot


def create_contexts_target(corpus, window_size=1):
    target = corpus[window_size:-window_size]
    contexts = []
    for idx in range(window_size, len(corpus) - window_size):
        cs = []
        for t in range(-window_size, window_size + 1):
            if t == 0:
                continue
            cs.append(corpus[idx + t])
        contexts.append(cs)
    return np.array(contexts), np.array(target)


def normalize(x):
    if x.ndim == 2:
        s = np.sqrt((x * x).sum(1))
        x /= s.reshape((s.shape[0], 1))
    elif x.ndim == 1:
        s = np.sqrt((x * x).sum())
        x /= s
    return x


def analogy(a, b, c, word_to_id, id_to_word, word_matrix, top=5, answer=None):
    for word in (a, b, c):
        if word not in word_to_id:
            print('%s is not found' % word)
            return

    print('\n[analogy] ' + a + ':' + b + ' = ' + c + ':?')
    a_vec, b_vec, c_vec = word_matrix[word_to_id[a]], word_matrix[word_to_id[b]], word_matrix[word_to_id[c]]
    query_vec = b_vec - a_vec + c_vec
    query_vec = normalize(query_vec)

    similarity = np.dot(word_matrix, query_vec)

    if answer is not None:
        print("==>" + answer + ":" + str(np.dot(word_matrix[word_to_id[answer]], query_vec)))

    count = 0
    for i in (-1 * similarity).argsort():
        if np.isnan(similarity[i]):
            continue
        if id_to_word[i] in (a, b, c):
            continue
        print(' {0}: {1}'.format(id_to_word[i], similarity[i]))

        count += 1
        if count >= top:
            return


class EmbeddingDot:
    def __init__(self, W, embedding_dim):
        self.W = W
        self.embedding_dim = embedding_dim
        self.embed = Embedding(self.W)
        # Share params/grads with the inner Embedding so the gradient written
        # in backward() is the one the optimizer actually sees.
        self.params = self.embed.params
        self.grads = self.embed.grads
        self.cache = None

    def forward(self, h, idx):
        target_W = self.embed.forward(idx)
        out = np.sum(target_W * h, axis=1)
        self.cache = (h, target_W)
        return out

    def backward(self, dout):
        h, target_W = self.cache
        dout = dout.reshape(dout.shape[0], 1)
        dtarget_W = dout * h
        self.embed.backward(dtarget_W)
        dh = dout * target_W
        return dh


class UnigramSampler:
    def __init__(self, corpus, power, sample_size):
        self.sample_size = sample_size

        # Count word frequencies (the counter, not the corpus, must be incremented).
        counts = collections.Counter()
        for word_id in corpus:
            counts[word_id] += 1

        vocab_size = len(counts)
        self.vocab_size = vocab_size

        self.word_p = np.zeros(vocab_size)
        for i in range(vocab_size):
            self.word_p[i] = counts[i]

        # Smooth with the 0.75 power and normalize to a valid probability distribution.
        self.word_p = np.power(self.word_p, power)
        self.word_p /= np.sum(self.word_p)

    def get_negative_sample(self, target):
        batch_size = target.shape[0]

        if not GPU:
            negative_sample = np.zeros((batch_size, self.sample_size), dtype=np.int32)
            for i in range(batch_size):
                p = self.word_p.copy()
                target_idx = target[i]
                p[target_idx] = 0          # never sample the positive word itself
                p /= p.sum()
                negative_sample[i, :] = np.random.choice(self.vocab_size, size=self.sample_size,
                                                         replace=False, p=p)
        else:
            negative_sample = np.random.choice(self.vocab_size, size=(batch_size, self.sample_size),
                                               replace=True, p=self.word_p)
        return negative_sample


class SigmoidWithLoss:
    def __init__(self):
        self.params, self.grads = [], []
        self.loss = None
        self.y = None
        self.t = None

    def forward(self, h, t):
        self.y = 1 / (1 + np.exp(-h))      # sigmoid
        self.t = t
        # Binary cross-entropy averaged over the batch.
        eps = 1e-7
        self.loss = -np.mean(self.t * np.log(self.y + eps) +
                             (1 - self.t) * np.log(1 - self.y + eps))
        return self.loss

    def backward(self, dout=1):
        batch_size = self.t.shape[0]
        dx = (self.y - self.t) / batch_size
        dx *= dout
        return dx


class NegativeSamplingLoss:
    def __init__(self, W, corpus, power=0.75, sample_size=5):
        self.sample_size = sample_size
        self.sampler = UnigramSampler(corpus, power, sample_size)
        embedding_dim = W.shape[1]
        self.loss_layers = [SigmoidWithLoss() for _ in range(sample_size + 1)]
        self.embed_dot_layers = [EmbeddingDot(W, embedding_dim) for _ in range(sample_size + 1)]

        # Collect params and grads in matching order (every layer shares W).
        self.params, self.grads = [], []
        for layer in self.embed_dot_layers:
            self.params += layer.params
            self.grads += layer.grads

    def forward(self, h, target):
        batch_size = target.shape[0]
        negative_sample = self.sampler.get_negative_sample(target)

        # Positive example
        score = self.embed_dot_layers[0].forward(h, target)
        correct_label = np.ones(batch_size, dtype=np.int32)
        loss = self.loss_layers[0].forward(score, correct_label)

        # Negative examples
        negative_label = np.zeros(batch_size, dtype=np.int32)
        for i in range(self.sample_size):
            negative_target = negative_sample[:, i]
            negative_target = np.clip(negative_target, 0, vocab_size - 1)  # safety clamp
            score = self.embed_dot_layers[1 + i].forward(h, negative_target)
            loss += self.loss_layers[1 + i].forward(score, negative_label)
        return loss

    def backward(self, dout=1):
        dh = 0
        for l0, l1 in zip(self.loss_layers, self.embed_dot_layers):
            dscore = l0.backward(dout)
            dh += l1.backward(dscore)
        return dh


class SimpleCBoW:
    def __init__(self, vocab_size, hidden_size):
        V, H = vocab_size, hidden_size
        W_in = 0.01 * np.random.randn(V, H).astype('f')
        W_out = 0.01 * np.random.randn(H, V).astype('f')

        self.layers = [
            MatMul(W_in),
            MatMul(W_out)
        ]
        self.loss_layer = SoftmaxCrossEntropy()

        self.params, self.grads = [], []
        for layer in self.layers:
            self.params += layer.params
            self.grads += layer.grads

        self.word_vecs = W_in

    def forward(self, contexts, target):
        h0 = self.layers[0].forward(contexts[:, 0])
        h1 = self.layers[0].forward(contexts[:, 1])
        h = (h0 + h1) * 0.5
        score = self.layers[1].forward(h)
        loss = self.loss_layer.forward(score, target)
        return loss

    def backward(self, dout=1):
        ds = self.loss_layer.backward()
        da = self.layers[1].backward(ds)
        da *= 0.5
        self.layers[0].backward(da)
        return None


class CBOW:
    def __init__(self, vocab_size, hidden_size, window_size, corpus):
        V, H = vocab_size, hidden_size

        W_in = 0.01 * np.random.randn(V, H).astype('f')
        W_out = 0.01 * np.random.randn(V, H).astype('f')

        self.in_layers = []
        for i in range(2 * window_size):
            layer = Embedding(W_in)
            self.in_layers.append(layer)
        self.ns_loss = NegativeSamplingLoss(W_out, corpus, power=0.75, sample_size=5)

        layers = self.in_layers + [self.ns_loss]
        self.params, self.grads = [], []
        for layer in layers:
            self.params += layer.params
            self.grads += layer.grads

        self.word_vecs = W_in

    def forward(self, contexts, target):
        h = 0
        for i, layer in enumerate(self.in_layers):
            h += layer.forward(contexts[:, i])
        h *= 1 / len(self.in_layers)
        loss = self.ns_loss.forward(h, target)
        return loss

    def backward(self, dout=1):
        dout = self.ns_loss.backward(dout)
        dout *= 1 / len(self.in_layers)
        for layer in self.in_layers:
            layer.backward(dout)
        return None


window_size = 1
hidden_size = 5
batch_size = 3
max_epoch = 10
wordvec_size = 100

corpus, word_to_id, id_to_word = load_data('train')
vocab_size = len(word_to_id)

contexts, target = create_contexts_target(corpus, window_size)

model = CBOW(vocab_size, hidden_size, window_size, corpus)
optimizer = Adam()

# Pack the model's parameter/gradient lists into dicts so this Adam
# implementation (which iterates over dict keys) updates the actual weights.
# Shared weights appear several times in the lists and therefore receive one
# partial update per occurrence each step.
params = {i: p for i, p in enumerate(model.params)}
grads = {i: g for i, g in enumerate(model.grads)}

losses = []
for epoch in range(max_epoch):
    epoch_loss = 0
    for i in range(0, len(contexts), batch_size):
        contexts_batch = contexts[i:i + batch_size]
        target_batch = target[i:i + batch_size]
        loss = model.forward(contexts_batch, target_batch)
        model.backward()
        optimizer.update(params, grads)
        epoch_loss += loss
    avg_loss = epoch_loss / (len(contexts) / batch_size)
    losses.append(avg_loss)
    print(f'Epoch {epoch} completed, Avg Loss: {avg_loss}')

plt.plot(range(max_epoch), losses, marker='o')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('NeuralNetBased Analogy learning')
plt.show()

word_vecs = model.word_vecs
analogy('king', 'queen', 'man', word_to_id, id_to_word, word_vecs)
analogy('take', 'took', 'go', word_to_id, id_to_word, word_vecs)
analogy('car', 'cars', 'child', word_to_id, id_to_word, word_vecs)
analogy('good', 'better', 'bad', word_to_id, id_to_word, word_vecs)
# Additional queries whose results are discussed below
analogy('have', 'has', 'do', word_to_id, id_to_word, word_vecs)
analogy('toyota', 'car', 'honda', word_to_id, id_to_word, word_vecs)
analogy('microsoft', 'windows', 'apple', word_to_id, id_to_word, word_vecs)
```
```
[analogy] king:queen = man:?
 woman: 5.16015625
 veto: 4.9296875
 ounce: 4.69140625
 earthquake: 4.6328125
 successor: 4.609375

[analogy] take:took = go:?
 went: 4.55078125
 points: 4.25
 began: 4.09375
 comes: 3.98046875
 oct.: 3.90625

[analogy] car:cars = child:?
 children: 5.21875
 average: 4.7265625
 yield: 4.20703125
 cattle: 4.1875
 priced: 4.1796875

[analogy] good:better = bad:?
 more: 6.6484375
 less: 6.0625
 rather: 5.21875
 slower: 4.734375
 greater: 4.671875

[analogy] have:has = do:?
 does: 8.984375
 ca: 7.47265625
 wo: 6.61328125
 did: 6.6015625
 is: 6.0625

[analogy] toyota:car = honda:?
 auto: 5.0
 state: 4.421875
 moody: 4.234375
 like: 4.06640625
 interview: 3.89453125

[analogy] microsoft:windows = apple:?
 priced: 4.8828125
 san: 4.515625
 plant: 4.234375
 amsterdam: 4.16796875
 maker: 4.109375
```
For the kind of analogy task I originally had in mind, "king is to queen as man is to ?", the model answers "woman", which is intuitively the correct answer.
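For reference, the core of the analogy() helper above boils down to the following vector arithmetic. This is a minimal sketch that reuses word_vecs, word_to_id, and id_to_word from the script; as in analogy(), the ranking uses raw dot products against a normalized query vector rather than pure cosine similarity.

```python
import numpy as np

# king:queen = man:?  ->  query = vec(queen) - vec(king) + vec(man), L2-normalized
a, b, c = word_to_id['king'], word_to_id['queen'], word_to_id['man']
query = word_vecs[b] - word_vecs[a] + word_vecs[c]
query = query / np.sqrt((query * query).sum())

# Score every vocabulary word by its dot product with the query vector
scores = np.dot(word_vecs, query)

# Show the five best candidates, skipping the three query words themselves
shown = 0
for i in (-scores).argsort():
    word = id_to_word[int(i)]
    if word in ('king', 'queen', 'man'):
        continue
    print(f'{word}: {scores[i]}')
    shown += 1
    if shown >= 5:
        break
```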
It is also remarkable that, grammatically, the model has learned past tense, plurals, comparatives, third-person singular verbs, and so on almost perfectly; roughly junior high school English (is that the right level?).
Training takes a long time on a single CPU, but it would not take nearly as long on a GPU, and even if it did take a whole day on one CPU, there can hardly be anyone, even in England, who could encounter English (or any language) for the very first time and learn and understand this much grammar and vocabulary within a single day.
For the last two analogies I decided to be a little mean.
For "Toyota is to car as Honda is to ?", I was hoping for "motorcycle" or "airplane", but the model probably wanted to answer "car" (thanks for that, in its own way). Since this model is not allowed to return any of the words given in the query, it presumably fell back on "auto". In a sense, more than half of the people in Japan or the US would probably also answer "Toyota is to car as Honda is to car", so while it was not the answer I expected, it is a correct one.
The "Microsoft is to Windows as Apple is to ?" analogy is, I admit, genuinely mean of me (laughs). I of course wanted an operating system, and "macOS" would have earned full marks, but judging from the candidates the model was probably pulled toward the literal words "windows" and "apple". To 2channel users those may be practically synonyms, but in English this changes the meaning entirely. Still, this wrong answer turned out to be an interesting way to see the limitations of models like this.
■Closing
This time, I used the results of training the CBOW model implemented over the previous entries to tackle the analogy task.
In fact, after eight hours this code had only gotten through a few epochs, so I simply confirmed that the loss was decreasing (i.e., that it was learning) and gave up. (Maybe there is a flaw somewhere? Ha.)
The results shown above therefore actually use pre-trained weights. If I let the training keep running, the daily updates I have kept up for 85 days would grind to a halt, so, given that the training code itself is finished, I hope you will forgive me for not presenting the complete output of my own run.
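For completeness, here is a rough sketch of how pre-trained weights can be fed into the analogy() helper. The file name and dictionary keys below are assumptions based on the pickle file (cbow_params.pkl) that the book's sample code produces, not necessarily the exact file I used.

```python
import pickle

# Hypothetical path: the pickle written by the book's CBOW training script
pkl_file = 'cbow_params.pkl'

with open(pkl_file, 'rb') as f:
    params = pickle.load(f)

# Assumed keys, matching the book's sample code
word_vecs = params['word_vecs']
word_to_id = params['word_to_id']
id_to_word = params['id_to_word']

# Reuse the analogy() helper defined earlier
analogy('king', 'queen', 'man', word_to_id, id_to_word, word_vecs)
analogy('toyota', 'car', 'honda', word_to_id, id_to_word, word_vecs)
```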
■References
- Andreas C. Müller, Sarah Guido. Pythonではじめる機械学習. Translated by Hidemoto Nakada. O'Reilly Japan, 2017. 392p.
- 斎藤 康毅. ゼロから作るDeep Learning Pythonで学ぶディープラーニングの理論と実装. O'Reilly Japan, 2016. 320p.
- 斎藤 康毅. ゼロから作るDeep Learning② 自然言語処理編. O'Reilly Japan, 2018. 432p.
- ChatGPT (GPT-4o mini). OpenAI, 2024. https://chatgpt.com/
- API Reference. scikit-learn.org. https://scikit-learn.org/stable/api/index.html
- PyTorch documentation. pytorch.org. https://pytorch.org/docs/stable/index.html
- Keiron O'Shea, Ryan Nash. An Introduction to Convolutional Neural Networks. 2015. https://ar5iv.labs.arxiv.org/html/1511.08458
- API Reference. scipy.org. 2024. https://docs.scipy.org/doc/scipy/reference/index.html