類推問題(analogy) 後編


AIって結局何なのかよく分からないので、とりあえず100日間勉強してみた Day86


経緯についてはこちらをご参照ください。



■本日の進捗

  • 類推問題を理解


■はじめに

今回も「ゼロから作るDeep Learning② 自然言語処理編(オライリー・ジャパン)」から学んでいきます。

今回は、前回実装したAnalogyクラスやその他の学習アルゴリズムを組み合わせてAnalogy全体を実装していきます。

■Analogyの実装

先ほどのAnalogyクラスを用いて実際に簡易版PTBデータセットを学習させて類推問題も解いてみます。

import sys
import os
sys.path.append('..')
import numpy as np
import matplotlib.pyplot as plt

import pickle
from sklearn.utils.extmath import randomized_svd
import collections

GPU = False

key_file = {
    'train':'ptb.train.txt',
    'test':'ptb.test.txt',
    'valid':'ptb.valid.txt'
}
save_file = {
    'train':'ptb.train.npy',
    'test':'ptb.test.npy',
    'valid':'ptb.valid.npy'
}
vocab_file = 'ptb.vocab.pkl'
dataset_dir = os.path.dirname(os.path.abspath(__file__))
mid_path = '..\..\Download_Dataset\lstm-master\data'

def load_vocab():
    vocab_path = os.path.join(dataset_dir, vocab_file)
    print(vocab_path)
    if os.path.exists(vocab_path):
        with open(vocab_path, 'rb') as f:
            word_to_id, id_to_word = pickle.load(f)
        return word_to_id, id_to_word

    word_to_id = {}
    id_to_word = {}
    data_type = 'train'
    file_name = key_file[data_type]
    file_path = os.path.join(dataset_dir, mid_path, file_name)

    words = open(file_path).read().replace('\n', '<eos>').strip().split()

    for i, word in enumerate(words):
        if word not in word_to_id:
            tmp_id = len(word_to_id)
            word_to_id[word] = tmp_id
            id_to_word[tmp_id] = word

    with open(vocab_path, 'wb') as f:
        pickle.dump((word_to_id, id_to_word), f)

    return word_to_id, id_to_word

def load_data(data_type='train'):
    if data_type == 'val': data_type = 'valid'
    save_path = dataset_dir + '\\' + save_file[data_type]
    print('save_path:', save_path)

    word_to_id, id_to_word = load_vocab()

    if os.path.exists(save_path):
        corpus = np.load(save_path)
        return corpus, word_to_id, id_to_word
    
    file_name = key_file[data_type]
    file_path = os.path.join(dataset_dir, mid_path, file_name)
    words = open(file_path).read().replace('\n', '<eos>').strip().split()
    corpus = np.array([word_to_id[w] for w in words])

    np.save(save_path, corpus)
    return corpus, word_to_id, id_to_word

class MatMul:
    def __init__(self, W):
        self.params = [W]
        self.grads = [np.zeros_like(W)]
        self.x = None

    def forward(self, x):
        W, = self.params
        out = np.dot(x, W)
        self.x = x
        return out
    
    def backward(self, dout):
        W, = self.params
        dx = np.dot(dout, W.T)
        dW = np.dot(self.x.T, dout)
        self.grads[0][...] = dW
        return dx
    
class Embedding:
    def __init__(self, W):
        self.params = [W]
        self.grads = [np.zeros_like(W)]
        self.idx = None

    def forward(self, idx):
        W, = self.params
        self.idx = idx

        out = W[idx]
        return out
    
    def backward(self, dout):
        dW, = self.grads
        dW[...] = 0
        
        for i, word_id in enumerate(self.idx):
            dW[word_id] += dout[i]
        
        return None
    
class SoftmaxCrossEntropy:
    def __init__(self):
        self.output = None
        self.y_true = None
        self.loss = None

    def forward(self, logits, y_true):
        exp_values = np.exp(logits - np.max(logits, axis=1, keepdims=True))
        self.output = exp_values / np.sum(exp_values, axis=1, keepdims=True)
        self.y_true = y_true
        self.loss = -np.sum(y_true * np.log(self.output + 1e-7)) / y_true.shape[0]
        return self.loss

    def backward(self):
        return (self.output - self.y_true) / self.y_true.shape[0]

def cross_entropy_error(y, t):
    delta = 1e-7
    return -np.sum(t * np.log(y + delta)) / y.shape[0]

class Adam:
    def __init__(self, lr=0.001, beta1=0.9, beta2=0.999):
        self.lr = lr
        self.beta1 = beta1
        self.beta2 = beta2
        self.iter = 0
        self.m = None
        self.v = None

    def update(self, params, grads):
        if self.m is None:
            self.m, self.v = {}, {}
            for key, val in params.items():
                self.m[key] = np.zeros_like(val)
                self.v[key] = np.zeros_like(val)

        self.iter += 1
        lr_t = self.lr * np.sqrt(1.0 - self.beta2 ** self.iter) / (1.0 - self.beta1 ** self.iter)

        for key in params.keys():
            self.m[key] = self.beta1 * self.m[key] + (1 - self.beta1) * grads[key]
            self.v[key] = self.beta2 * self.v[key] + (1 - self.beta2) * (grads[key] ** 2)

            params[key] -= lr_t * self.m[key] / (np.sqrt(self.v[key]) + 1e-7)

def preprocess(text):
    text = text.lower()
    text = text.replace('.', ' .')
    words = text.split(' ')

    word_to_id = {}
    id_to_word = {}
    for word in words:
        if word not in word_to_id:
            new_id = len(word_to_id)
            word_to_id[word] = new_id
            id_to_word[new_id] = word

    corpus = np.array([word_to_id[w] for w in words])
    return corpus, word_to_id, id_to_word
    
def convert_one_hot(corpus, vocab_size):
    N = corpus.shape[0]

    if corpus.ndim == 1:
        one_hot = np.zeros((N, vocab_size), dtype=np.int32)
        for idx, word_id in enumerate(corpus):
            one_hot[idx, word_id] = 1

    elif corpus.ndim == 2:
        C = corpus.shape[1]
        one_hot = np.zeros((N, C, vocab_size), dtype=np.int32)
        for idx_0, word_id in enumerate(corpus):
            for idx_1, word_id in enumerate(word_id):
                one_hot[idx_0, idx_1, word_id] = 1

    return one_hot

def create_contexts_target(corpus, window_size=1):
    target = corpus[window_size:-window_size]
    contexts = []

    for idx in range(window_size, len(corpus)-window_size):
        cs = []
        for t in range(-window_size, window_size + 1):
            if t == 0:
                continue
            cs.append(corpus[idx + t])
        contexts.append(cs)

    return np.array(contexts), np.array(target)

def normalize(x):
    if x.ndim == 2:
        s = np.sqrt((x * x).sum(1))
        x /= s.reshape((s.shape[0], 1))
    elif x.ndim == 1:
        s = np.sqrt((x * x).sum())
        x /= s
    return x

def analogy(a, b, c, word_to_id, id_to_word, word_matrix, top=5, answer=None):
    for word in (a, b, c):
        if word not in word_to_id:
            print('%s is not found' % word)
            return
        
    print('\n[analogy] ' + a + ':' + b + ' = ' + c + ':?')
    a_vec, b_vec, c_vec = word_matrix[word_to_id[a]], word_matrix[word_to_id[b]], word_matrix[word_to_id[c]]
    query_vec = b_vec - a_vec + c_vec
    query_vec = normalize(query_vec)

    similarity = np.dot(word_matrix, query_vec)

    if answer is not None:
        print("==>" + answer + ":" + str(np.dot(word_matrix[word_to_id[answer]], query_vec)))

    count = 0
    for i in (-1 * similarity).argsort():
        if np.isnan(similarity[i]):
            continue
        if id_to_word[i] in (a, b, c):
            continue
        print(' {0}: {1}'.format(id_to_word[i], similarity[i]))

        count += 1
        if count >= top:
            return

class EmbeddingDot:
    def __init__(self, W, embedding_dim):
        self.W = W
        self.embedding_dim = embedding_dim
        self.embed = Embedding(self.W)
        self.params = [self.W]
        self.grads = [np.zeros_like(self.W)]

        self.cache = None

    def forward(self, h, idx):
        target_W = self.embed.forward(idx)

        out = np.sum(target_W * h, axis=1)

        self.cache = (h, target_W)
        return out
    
    def backward(self, dout):
        h, target_W = self.cache
        dout = dout.reshape(dout.shape[0], 1)

        dtarget_W = dout * h
        self.embed.backward(dtarget_W)
        dh = dout * target_W
        return dh
    
class UnigramSampler:
    def __init__(self, corpus, power, sample_size):
        self.sample_size = sample_size
        self.vocab_size = None
        self.word_p = None

        counts = collections.Counter()
        for word_id in corpus:
            corpus[word_id] += 1
        
        vocab_size = len(corpus)
        self.vocab_size = vocab_size

        self.word_p = np.zeros(vocab_size)
        for i in range(vocab_size):
            self.word_p[i] = counts[i]
        
        self.word_p = np.power(self.word_p, power)

    def get_negative_sample(self, target):
        batch_size = target.shape[0]

        if not GPU:
            negative_sample = np.zeros((batch_size, self.sample_size), dtype=np.int32)

            for i in range(batch_size):
                p = self.word_p.copy()
                target_idx = target[i]
                p[target_idx] = 0
                p /= p.sum()
                negative_sample[i, :] = np.random.choice(self.vocab_size, size=self.sample_size, replace=False, p=p)
        else:
            negative_sample = np.random.choice(self.vocab_size, size=(batch_size, self.sample_size), replace=True, p=self.word_p)
        return negative_sample

class SigmoidWithLoss:
    def __init__(self):
        self.params, self.grads = [], []
        self.loss = None
        self.y = None
        self.t = None

    def forward(self, h, t):
        self.y =  1 / (1 + np.exp(-h))

        if t.ndim == 1:
            if self.y.ndim == 1:
                t = t[:, np.newaxis]
            else:
                t = np.eye(self.y.shape[1])[t]
        self.t = t

        self.loss = cross_entropy_error(self.y, self.t)
        return self.loss

    def backward(self, dout=1):
        batch_size = self.t.shape[0]
        t = self.t.squeeze()
        batch_size = self.t.shape[0]
        dx = (self.y - t) / batch_size
        dx *= dout
        return dx
    
class NegativeSamplingLoss:
    def __init__(self, W, corpus, power=0.75, sample_size=5):
        self.sample_size = sample_size
        self.sampler = UnigramSampler(corpus, power, sample_size)
        embedding_dim = W.shape[1]
        self.loss_layers = [SigmoidWithLoss() for _ in range(sample_size + 1)]
        self.embed_dot_layers = [EmbeddingDot(W, embedding_dim) for _ in range(sample_size + 1)]
        self.params, self.grads = [], []
        self.params += [W]
        for layer in self.embed_dot_layers:
            self.grads += layer.grads

        
    def forward(self, h, target):
        batch_size = target.shape[0]
        negative_sample = self.sampler.get_negative_sample(target)

        score = self.embed_dot_layers[0].forward(h, target)
        correct_label = np.ones(batch_size, dtype=np.int32)

        loss = self.loss_layers[0].forward(score, correct_label)

        negative_label = np.zeros(batch_size, dtype=np.int32)
        for i in range(self.sample_size):
            negative_target = negative_sample[:, i]
            negative_target = np.clip(negative_target, 0, vocab_size - 1)
            score = self.embed_dot_layers[1 + i].forward(h, negative_target)
            loss += self.loss_layers[1 + i].forward(score, negative_label)
        
        return loss
    
    def backward(self, dout=1):
        dh = 0
        for l0, l1 in zip(self.loss_layers, self.embed_dot_layers):
            dscore = l0.backward(dout)
            dh += l1.backward(dscore)

        return dh

class SimpleCBoW:
    def __init__(self, vocab_size, hidden_size):
        V, H = vocab_size, hidden_size

        W_in = 0.01 * np.random.randn(V, H).astype('f')
        W_out = 0.01 * np.random.randn(H, V).astype('f')

        self.layers = [
            MatMul(W_in),
            MatMul(W_out)
        ]
        self.loss_layer = SoftmaxCrossEntropy()

        self.params, self.grads = [], []
        for layer in self.layers:
            self.params += layer.params
            self.grads += layer.grads

        self.word_vecs = W_in

    def forward(self, contexts, target):
        h0 = self.layers[0].forward(contexts[:, 0])
        h1 = self.layers[0].forward(contexts[:, 1])
        h = (h0 + h1) * 0.5
        score = self.layers[1].forward(h)
        loss = self.loss_layer.forward(score, target)
        return loss
    
    def backward(self, dout=1):
        ds = self.loss_layer.backward()
        da = self.layers[1].backward(ds)
        da *= 0.5
        self.layers[0].backward(da)
        return None
    
class CBOW:
    def __init__(self, vocab_size, hidden_size, window_size, corpus):
        V, H = vocab_size, hidden_size

        W_in = 0.01 * np.random.randn(V, H).astype('f')
        W_out = 0.01 * np.random.randn(V, H).astype('f')

        self.in_layers = []
        for i in range(2 * window_size):
            layer = Embedding(W_in)
            self.in_layers.append(layer)
        self.ns_loss = NegativeSamplingLoss(W_out, corpus, power=0.75, sample_size=5)


        layers = self.in_layers + [self.ns_loss]
        self.params, self.grads = [], []
        for layer in layers:
            self.params += layer.params
            self.grads += layer.grads

        self.word_vecs = W_in

    def forward(self, contexts, target):
        h = 0
        for i, layer in enumerate(self.in_layers):
            h += layer.forward(contexts[:, i])
        h *= 1 / len(self.in_layers)

        loss = self.ns_loss.forward(h, target)
        return loss

    def backward(self, dout=1):
        dout = self.ns_loss.backward(dout)
        dout *= 1 / len(self.in_layers)
        for layer in self.in_layers:
            layer.backward(dout)
        return None


window_size = 1
hidden_size = 5
batch_size = 3
max_epoch = 10

wordvec_size = 100

corpus, word_to_id, id_to_word = load_data('train')

vocab_size = len(word_to_id)
contexts, target = create_contexts_target(corpus, window_size)

model = CBOW(vocab_size, hidden_size, window_size, corpus)

optimizer = Adam()

params = {}
grads = {}

losses = []

for epoch in range(max_epoch):
    epoch_loss = 0
    for i in range(0, len(contexts), batch_size):
        contexts_batch = contexts[i:i+batch_size]
        target_batch = target[i:i+batch_size]

        loss = model.forward(contexts_batch, target_batch)
        
        model.backward()

        optimizer.update(params, grads)
        
        epoch_loss += loss

    avg_loss = epoch_loss / (len(contexts) / batch_size)
    losses.append(avg_loss)
    print(f'Epoch {epoch} completed, Avg Loss: {avg_loss}')

plt.plot(range(max_epoch), losses, marker='o')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('NeuralNetBased Analogy learning')
plt.show()

word_vecs = model.word_vecs

analogy('king', 'queen', 'man', word_to_id, id_to_word, word_vecs)
analogy('take', 'took', 'go', word_to_id, id_to_word, word_vecs)
analogy('car', 'cars', 'child', word_to_id, id_to_word, word_vecs)
analogy('good', 'better', 'bad', word_to_id, id_to_word, word_vecs) 

当初想定していた、「王と王女ならば男と?」といった類推タスクには「女」と直観的にも正しい回答ができています。

文法的にも過去形、複数形、比較級、三人称単数動詞など中学英語(でいいんだっけ?)はほぼ完璧に学習していることは驚くべきことです。

1CPUでは学習に時間がかかるものの、GPU化すればそれほど時間がかかる訳ではないことや、例え1CPUで1日かかったとしても人生で初めて英語(言語)に触れて1日でこれだけの文法単元や語彙を学習して理解できる人間はイギリスにもそうそういないでしょう。

最後の2つはちょっと意地悪してみました。

「トヨタと車ならばホンダと?」という類推問題では「バイク」や「飛行機」という回答を期待しましたが、恐らく「車」と言いたかったのでしょう(それはそれでありがとう)。しかしこのモデルでは提示した単語と同じ単語は禁止されているので、仕方なく「auto」を返してきたのだと思います。ある意味では、というより日本や米国ならば半数以上は「トヨタと車ならばホンダと車」と答えるでしょう。期待した答えではなかったですが正解です。

「マイクロソフトとウインドウズならばアップルと?」という類推問題は我ながら実に意地が悪いです笑。もちろんOSを答えてほしかったので、「macOS」と答えてくれると100点満点だったのですが、回答を見る限り恐らく「窓」と「林檎」に引っ張られています。ねらーにとっては同義語でしょうが、英語的にはこれでは意味が変わってしまいます。ただ、この誤答はこれらのモデルの課題を見る上で面白い結果になりました。

■おわりに

今回はこれまで実装してきたCBoWで学習した結果を用いて類推問題タスクを行いました。

実はこのコードは8時間経って数エポックしか進まなかったので、損失が減っている(学習はできている)ことだけを確認して諦めました。(欠陥があるのかも?笑)

そのため実際には学習済みの重みデータを用いていますが、このまま学習させておくと折角85日続いた毎日更新が止まってしまうので学習コードの完成を鑑みて完全な出力データの提示はご勘弁いただければと思います。

■参考文献

  1. Andreas C. Muller, Sarah Guido. Pythonではじめる機械学習. 中田 秀基 訳. オライリー・ジャパン. 2017. 392p.
  2. 斎藤 康毅. ゼロから作るDeep Learning Pythonで学ぶディープラーニングの理論と実装. オライリー・ジャパン. 2016. 320p.
  3. 斎藤 康毅. ゼロから作るDeep Learning② 自然言語処理編. オライリー・ジャパン. 2018. 432p.
  4. ChatGPT. 4o mini. OpenAI. 2024. https://chatgpt.com/
  5. API Reference. scikit-learn.org. https://scikit-learn.org/stable/api/index.html
  6. PyTorch documentation. pytorch.org. https://pytorch.org/docs/stable/index.html
  7. Keiron O’Shea, Ryan Nash. An Introduction to Convolutional Neural Networks. https://ar5iv.labs.arxiv.org/html/1511.08458
  8. API Reference. scipy.org. 2024. https://docs.scipy.org/doc/scipy/reference/index.html


コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です