AIって結局何なのかよく分からないので、とりあえず100日間勉強してみた Day86
経緯についてはこちらをご参照ください。
■本日の進捗
- 類推問題を理解
■はじめに
今回も「ゼロから作るDeep Learning② 自然言語処理編(オライリー・ジャパン)」から学んでいきます。
今回は、前回実装したAnalogyクラスやその他の学習アルゴリズムを組み合わせてAnalogy全体を実装していきます。
■Analogyの実装
先ほどのAnalogyクラスを用いて実際に簡易版PTBデータセットを学習させて類推問題も解いてみます。
import sys
import os
sys.path.append('..')
import numpy as np
import matplotlib.pyplot as plt
import pickle
from sklearn.utils.extmath import randomized_svd
import collections
GPU = False
key_file = {
'train':'ptb.train.txt',
'test':'ptb.test.txt',
'valid':'ptb.valid.txt'
}
save_file = {
'train':'ptb.train.npy',
'test':'ptb.test.npy',
'valid':'ptb.valid.npy'
}
vocab_file = 'ptb.vocab.pkl'
dataset_dir = os.path.dirname(os.path.abspath(__file__))
mid_path = '..\..\Download_Dataset\lstm-master\data'
def load_vocab():
vocab_path = os.path.join(dataset_dir, vocab_file)
print(vocab_path)
if os.path.exists(vocab_path):
with open(vocab_path, 'rb') as f:
word_to_id, id_to_word = pickle.load(f)
return word_to_id, id_to_word
word_to_id = {}
id_to_word = {}
data_type = 'train'
file_name = key_file[data_type]
file_path = os.path.join(dataset_dir, mid_path, file_name)
words = open(file_path).read().replace('\n', '<eos>').strip().split()
for i, word in enumerate(words):
if word not in word_to_id:
tmp_id = len(word_to_id)
word_to_id[word] = tmp_id
id_to_word[tmp_id] = word
with open(vocab_path, 'wb') as f:
pickle.dump((word_to_id, id_to_word), f)
return word_to_id, id_to_word
def load_data(data_type='train'):
if data_type == 'val': data_type = 'valid'
save_path = dataset_dir + '\\' + save_file[data_type]
print('save_path:', save_path)
word_to_id, id_to_word = load_vocab()
if os.path.exists(save_path):
corpus = np.load(save_path)
return corpus, word_to_id, id_to_word
file_name = key_file[data_type]
file_path = os.path.join(dataset_dir, mid_path, file_name)
words = open(file_path).read().replace('\n', '<eos>').strip().split()
corpus = np.array([word_to_id[w] for w in words])
np.save(save_path, corpus)
return corpus, word_to_id, id_to_word
class MatMul:
def __init__(self, W):
self.params = [W]
self.grads = [np.zeros_like(W)]
self.x = None
def forward(self, x):
W, = self.params
out = np.dot(x, W)
self.x = x
return out
def backward(self, dout):
W, = self.params
dx = np.dot(dout, W.T)
dW = np.dot(self.x.T, dout)
self.grads[0][...] = dW
return dx
class Embedding:
def __init__(self, W):
self.params = [W]
self.grads = [np.zeros_like(W)]
self.idx = None
def forward(self, idx):
W, = self.params
self.idx = idx
out = W[idx]
return out
def backward(self, dout):
dW, = self.grads
dW[...] = 0
for i, word_id in enumerate(self.idx):
dW[word_id] += dout[i]
return None
class SoftmaxCrossEntropy:
def __init__(self):
self.output = None
self.y_true = None
self.loss = None
def forward(self, logits, y_true):
exp_values = np.exp(logits - np.max(logits, axis=1, keepdims=True))
self.output = exp_values / np.sum(exp_values, axis=1, keepdims=True)
self.y_true = y_true
self.loss = -np.sum(y_true * np.log(self.output + 1e-7)) / y_true.shape[0]
return self.loss
def backward(self):
return (self.output - self.y_true) / self.y_true.shape[0]
def cross_entropy_error(y, t):
delta = 1e-7
return -np.sum(t * np.log(y + delta)) / y.shape[0]
class Adam:
def __init__(self, lr=0.001, beta1=0.9, beta2=0.999):
self.lr = lr
self.beta1 = beta1
self.beta2 = beta2
self.iter = 0
self.m = None
self.v = None
def update(self, params, grads):
if self.m is None:
self.m, self.v = {}, {}
for key, val in params.items():
self.m[key] = np.zeros_like(val)
self.v[key] = np.zeros_like(val)
self.iter += 1
lr_t = self.lr * np.sqrt(1.0 - self.beta2 ** self.iter) / (1.0 - self.beta1 ** self.iter)
for key in params.keys():
self.m[key] = self.beta1 * self.m[key] + (1 - self.beta1) * grads[key]
self.v[key] = self.beta2 * self.v[key] + (1 - self.beta2) * (grads[key] ** 2)
params[key] -= lr_t * self.m[key] / (np.sqrt(self.v[key]) + 1e-7)
def preprocess(text):
text = text.lower()
text = text.replace('.', ' .')
words = text.split(' ')
word_to_id = {}
id_to_word = {}
for word in words:
if word not in word_to_id:
new_id = len(word_to_id)
word_to_id[word] = new_id
id_to_word[new_id] = word
corpus = np.array([word_to_id[w] for w in words])
return corpus, word_to_id, id_to_word
def convert_one_hot(corpus, vocab_size):
N = corpus.shape[0]
if corpus.ndim == 1:
one_hot = np.zeros((N, vocab_size), dtype=np.int32)
for idx, word_id in enumerate(corpus):
one_hot[idx, word_id] = 1
elif corpus.ndim == 2:
C = corpus.shape[1]
one_hot = np.zeros((N, C, vocab_size), dtype=np.int32)
for idx_0, word_id in enumerate(corpus):
for idx_1, word_id in enumerate(word_id):
one_hot[idx_0, idx_1, word_id] = 1
return one_hot
def create_contexts_target(corpus, window_size=1):
target = corpus[window_size:-window_size]
contexts = []
for idx in range(window_size, len(corpus)-window_size):
cs = []
for t in range(-window_size, window_size + 1):
if t == 0:
continue
cs.append(corpus[idx + t])
contexts.append(cs)
return np.array(contexts), np.array(target)
def normalize(x):
if x.ndim == 2:
s = np.sqrt((x * x).sum(1))
x /= s.reshape((s.shape[0], 1))
elif x.ndim == 1:
s = np.sqrt((x * x).sum())
x /= s
return x
def analogy(a, b, c, word_to_id, id_to_word, word_matrix, top=5, answer=None):
for word in (a, b, c):
if word not in word_to_id:
print('%s is not found' % word)
return
print('\n[analogy] ' + a + ':' + b + ' = ' + c + ':?')
a_vec, b_vec, c_vec = word_matrix[word_to_id[a]], word_matrix[word_to_id[b]], word_matrix[word_to_id[c]]
query_vec = b_vec - a_vec + c_vec
query_vec = normalize(query_vec)
similarity = np.dot(word_matrix, query_vec)
if answer is not None:
print("==>" + answer + ":" + str(np.dot(word_matrix[word_to_id[answer]], query_vec)))
count = 0
for i in (-1 * similarity).argsort():
if np.isnan(similarity[i]):
continue
if id_to_word[i] in (a, b, c):
continue
print(' {0}: {1}'.format(id_to_word[i], similarity[i]))
count += 1
if count >= top:
return
class EmbeddingDot:
def __init__(self, W, embedding_dim):
self.W = W
self.embedding_dim = embedding_dim
self.embed = Embedding(self.W)
self.params = [self.W]
self.grads = [np.zeros_like(self.W)]
self.cache = None
def forward(self, h, idx):
target_W = self.embed.forward(idx)
out = np.sum(target_W * h, axis=1)
self.cache = (h, target_W)
return out
def backward(self, dout):
h, target_W = self.cache
dout = dout.reshape(dout.shape[0], 1)
dtarget_W = dout * h
self.embed.backward(dtarget_W)
dh = dout * target_W
return dh
class UnigramSampler:
def __init__(self, corpus, power, sample_size):
self.sample_size = sample_size
self.vocab_size = None
self.word_p = None
counts = collections.Counter()
for word_id in corpus:
corpus[word_id] += 1
vocab_size = len(corpus)
self.vocab_size = vocab_size
self.word_p = np.zeros(vocab_size)
for i in range(vocab_size):
self.word_p[i] = counts[i]
self.word_p = np.power(self.word_p, power)
def get_negative_sample(self, target):
batch_size = target.shape[0]
if not GPU:
negative_sample = np.zeros((batch_size, self.sample_size), dtype=np.int32)
for i in range(batch_size):
p = self.word_p.copy()
target_idx = target[i]
p[target_idx] = 0
p /= p.sum()
negative_sample[i, :] = np.random.choice(self.vocab_size, size=self.sample_size, replace=False, p=p)
else:
negative_sample = np.random.choice(self.vocab_size, size=(batch_size, self.sample_size), replace=True, p=self.word_p)
return negative_sample
class SigmoidWithLoss:
def __init__(self):
self.params, self.grads = [], []
self.loss = None
self.y = None
self.t = None
def forward(self, h, t):
self.y = 1 / (1 + np.exp(-h))
if t.ndim == 1:
if self.y.ndim == 1:
t = t[:, np.newaxis]
else:
t = np.eye(self.y.shape[1])[t]
self.t = t
self.loss = cross_entropy_error(self.y, self.t)
return self.loss
def backward(self, dout=1):
batch_size = self.t.shape[0]
t = self.t.squeeze()
batch_size = self.t.shape[0]
dx = (self.y - t) / batch_size
dx *= dout
return dx
class NegativeSamplingLoss:
def __init__(self, W, corpus, power=0.75, sample_size=5):
self.sample_size = sample_size
self.sampler = UnigramSampler(corpus, power, sample_size)
embedding_dim = W.shape[1]
self.loss_layers = [SigmoidWithLoss() for _ in range(sample_size + 1)]
self.embed_dot_layers = [EmbeddingDot(W, embedding_dim) for _ in range(sample_size + 1)]
self.params, self.grads = [], []
self.params += [W]
for layer in self.embed_dot_layers:
self.grads += layer.grads
def forward(self, h, target):
batch_size = target.shape[0]
negative_sample = self.sampler.get_negative_sample(target)
score = self.embed_dot_layers[0].forward(h, target)
correct_label = np.ones(batch_size, dtype=np.int32)
loss = self.loss_layers[0].forward(score, correct_label)
negative_label = np.zeros(batch_size, dtype=np.int32)
for i in range(self.sample_size):
negative_target = negative_sample[:, i]
negative_target = np.clip(negative_target, 0, vocab_size - 1)
score = self.embed_dot_layers[1 + i].forward(h, negative_target)
loss += self.loss_layers[1 + i].forward(score, negative_label)
return loss
def backward(self, dout=1):
dh = 0
for l0, l1 in zip(self.loss_layers, self.embed_dot_layers):
dscore = l0.backward(dout)
dh += l1.backward(dscore)
return dh
class SimpleCBoW:
def __init__(self, vocab_size, hidden_size):
V, H = vocab_size, hidden_size
W_in = 0.01 * np.random.randn(V, H).astype('f')
W_out = 0.01 * np.random.randn(H, V).astype('f')
self.layers = [
MatMul(W_in),
MatMul(W_out)
]
self.loss_layer = SoftmaxCrossEntropy()
self.params, self.grads = [], []
for layer in self.layers:
self.params += layer.params
self.grads += layer.grads
self.word_vecs = W_in
def forward(self, contexts, target):
h0 = self.layers[0].forward(contexts[:, 0])
h1 = self.layers[0].forward(contexts[:, 1])
h = (h0 + h1) * 0.5
score = self.layers[1].forward(h)
loss = self.loss_layer.forward(score, target)
return loss
def backward(self, dout=1):
ds = self.loss_layer.backward()
da = self.layers[1].backward(ds)
da *= 0.5
self.layers[0].backward(da)
return None
class CBOW:
def __init__(self, vocab_size, hidden_size, window_size, corpus):
V, H = vocab_size, hidden_size
W_in = 0.01 * np.random.randn(V, H).astype('f')
W_out = 0.01 * np.random.randn(V, H).astype('f')
self.in_layers = []
for i in range(2 * window_size):
layer = Embedding(W_in)
self.in_layers.append(layer)
self.ns_loss = NegativeSamplingLoss(W_out, corpus, power=0.75, sample_size=5)
layers = self.in_layers + [self.ns_loss]
self.params, self.grads = [], []
for layer in layers:
self.params += layer.params
self.grads += layer.grads
self.word_vecs = W_in
def forward(self, contexts, target):
h = 0
for i, layer in enumerate(self.in_layers):
h += layer.forward(contexts[:, i])
h *= 1 / len(self.in_layers)
loss = self.ns_loss.forward(h, target)
return loss
def backward(self, dout=1):
dout = self.ns_loss.backward(dout)
dout *= 1 / len(self.in_layers)
for layer in self.in_layers:
layer.backward(dout)
return None
window_size = 1
hidden_size = 5
batch_size = 3
max_epoch = 10
wordvec_size = 100
corpus, word_to_id, id_to_word = load_data('train')
vocab_size = len(word_to_id)
contexts, target = create_contexts_target(corpus, window_size)
model = CBOW(vocab_size, hidden_size, window_size, corpus)
optimizer = Adam()
params = {}
grads = {}
losses = []
for epoch in range(max_epoch):
epoch_loss = 0
for i in range(0, len(contexts), batch_size):
contexts_batch = contexts[i:i+batch_size]
target_batch = target[i:i+batch_size]
loss = model.forward(contexts_batch, target_batch)
model.backward()
optimizer.update(params, grads)
epoch_loss += loss
avg_loss = epoch_loss / (len(contexts) / batch_size)
losses.append(avg_loss)
print(f'Epoch {epoch} completed, Avg Loss: {avg_loss}')
plt.plot(range(max_epoch), losses, marker='o')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('NeuralNetBased Analogy learning')
plt.show()
word_vecs = model.word_vecs
analogy('king', 'queen', 'man', word_to_id, id_to_word, word_vecs)
analogy('take', 'took', 'go', word_to_id, id_to_word, word_vecs)
analogy('car', 'cars', 'child', word_to_id, id_to_word, word_vecs)
analogy('good', 'better', 'bad', word_to_id, id_to_word, word_vecs)
[analogy] king:queen = man:?
woman: 5.16015625
veto: 4.9296875
ounce: 4.69140625
earthquake: 4.6328125
successor: 4.609375
[analogy] take:took = go:?
went: 4.55078125
points: 4.25
began: 4.09375
comes: 3.98046875
oct.: 3.90625
[analogy] car:cars = child:?
children: 5.21875
average: 4.7265625
yield: 4.20703125
cattle: 4.1875
priced: 4.1796875
[analogy] good:better = bad:?
more: 6.6484375
less: 6.0625
rather: 5.21875
slower: 4.734375
greater: 4.671875
[analogy] have:has = do:?
does: 8.984375
ca: 7.47265625
wo: 6.61328125
did: 6.6015625
is: 6.0625
[analogy] toyota:car = honda:?
auto: 5.0
state: 4.421875
moody: 4.234375
like: 4.06640625
interview: 3.89453125
[analogy] microsoft:windows = apple:?
priced: 4.8828125
san: 4.515625
plant: 4.234375
amsterdam: 4.16796875
maker: 4.109375
当初想定していた、「王と王女ならば男と?」といった類推タスクには「女」と直観的にも正しい回答ができています。
文法的にも過去形、複数形、比較級、三人称単数動詞など中学英語(でいいんだっけ?)はほぼ完璧に学習していることは驚くべきことです。
1CPUでは学習に時間がかかるものの、GPU化すればそれほど時間がかかる訳ではないことや、例え1CPUで1日かかったとしても人生で初めて英語(言語)に触れて1日でこれだけの文法単元や語彙を学習して理解できる人間はイギリスにもそうそういないでしょう。
最後の2つはちょっと意地悪してみました。
「トヨタと車ならばホンダと?」という類推問題では「バイク」や「飛行機」という回答を期待しましたが、恐らく「車」と言いたかったのでしょう(それはそれでありがとう)。しかしこのモデルでは提示した単語と同じ単語は禁止されているので、仕方なく「auto」を返してきたのだと思います。ある意味では、というより日本や米国ならば半数以上は「トヨタと車ならばホンダと車」と答えるでしょう。期待した答えではなかったですが正解です。
「マイクロソフトとウインドウズならばアップルと?」という類推問題は我ながら実に意地が悪いです笑。もちろんOSを答えてほしかったので、「macOS」と答えてくれると100点満点だったのですが、回答を見る限り恐らく「窓」と「林檎」に引っ張られています。ねらーにとっては同義語でしょうが、英語的にはこれでは意味が変わってしまいます。ただ、この誤答はこれらのモデルの課題を見る上で面白い結果になりました。
■おわりに
今回はこれまで実装してきたCBoWで学習した結果を用いて類推問題タスクを行いました。
実はこのコードは8時間経って数エポックしか進まなかったので、損失が減っている(学習はできている)ことだけを確認して諦めました。(欠陥があるのかも?笑)
そのため実際には学習済みの重みデータを用いていますが、このまま学習させておくと折角85日続いた毎日更新が止まってしまうので学習コードの完成を鑑みて完全な出力データの提示はご勘弁いただければと思います。
■参考文献
- Andreas C. Muller, Sarah Guido. Pythonではじめる機械学習. 中田 秀基 訳. オライリー・ジャパン. 2017. 392p.
- 斎藤 康毅. ゼロから作るDeep Learning Pythonで学ぶディープラーニングの理論と実装. オライリー・ジャパン. 2016. 320p.
- 斎藤 康毅. ゼロから作るDeep Learning② 自然言語処理編. オライリー・ジャパン. 2018. 432p.
- ChatGPT. 4o mini. OpenAI. 2024. https://chatgpt.com/
- API Reference. scikit-learn.org. https://scikit-learn.org/stable/api/index.html
- PyTorch documentation. pytorch.org. https://pytorch.org/docs/stable/index.html
- Keiron O’Shea, Ryan Nash. An Introduction to Convolutional Neural Networks. https://ar5iv.labs.arxiv.org/html/1511.08458
- API Reference. scipy.org. 2024. https://docs.scipy.org/doc/scipy/reference/index.html