seq2seq

PyTorchでSeq2Seqを実装してみた を試してみる。

  • 引き算する数式を文字列で認識して、結果を出力する。
  • データはランダムに数字を生成して、実際に引き算して作成できるから簡単に用意できる。
  • seq2seqは、EncoderとDecoderの2種類のネットワークを作成して結合し、それぞれを学習させる。
  • Decoderは、学習時と推論時でロジックが変わる。
    • 学習時には、結果も時系列がわかっているため、次の時系列のデータを入力することができる。
    • 但し、LSTMの出力hは、次に渡していく必要がある。
    • 推論時には、Linearした後、SoftMaxまたは、argmaxで、文字を確定させ、それを次のLSTMに渡す。
    • それをまたEmbedingし単語ベクトル化して処理する。
  • ミニバッチ学習を行うため、学習時はバッチ数の配列操作となる。
        # Decoderで使うデータはoutput_tensorを1つずらしたものを使う
        # Decoderのインプットとするデータ
        source = output_tensor[:, :-1]

        # Decoderの教師データ
        # 生成開始を表す"_"を削っている
        target = output_tensor[:, 1:]
        for j in range(decoder_output.size()[1]):
            # バッチ毎にまとめてloss計算
            # 生成する文字は4文字なので、4回ループ
            loss += criterion(decoder_output[:, j, :], target[:, j])

学習としては、以下のような感じで重みが計算されているはず。

  • LSTMの重み
    • Encorder
    • 計算式として、どの文字列の単語ベクトルを強く残すか
  • Embeding (単語ベクトル)
    • 0-9の数字の文字と、「 」、「-」、「_」の合計13種類をいかにうまく分散表現させて、計算しているように見せれるか

以下、コードを貼り付け

import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim

from sklearn.model_selection import train_test_split
import random
from sklearn.utils import shuffle

# 数字の文字をID化
char2id = {str(i): i for i in range(10)}

# 空白(10):系列の長さを揃えるようのパディング文字
# -(11):マイナスの文字
# _(12):系列生成開始を知らせる文字
char2id.update({" ": 10, "-": 11, "_": 12})

# 空白込みの3桁の数字をランダムに生成


def generate_number():
    number = [random.choice(list("0123456789"))
              for _ in range(random.randint(1, 3))]
    return int("".join(number))


# 確認
print(generate_number())
# 753

# 系列の長さを揃えるために空白パディング


def add_padding(number, is_input=True):
    number = "{: <7}".format(number) if is_input else "{: <5s}".format(number)
    return number


# 確認
num = generate_number()
print("\"" + str(add_padding(num)) + "\"")
# "636    "
# 7

# データ準備
input_data = []
output_data = []

# データを50000件準備する
while len(input_data) < 50000:
    x = generate_number()
    y = generate_number()
    z = x - y
    input_char = add_padding(str(x) + "-" + str(y))
    output_char = add_padding("_" + str(z), is_input=False)

    # データをIDにに変換
    input_data.append([char2id[c] for c in input_char])
    output_data.append([char2id[c] for c in output_char])

# 確認
print(input_data[987])
print(output_data[987])
# [1, 5, 11, 2, 6, 6, 10] (←"15-266")
# [12, 11, 2, 5, 1] (←"_-251")

# 7:3にデータをわける
train_x, test_x, train_y, test_y = train_test_split(
    input_data, output_data, train_size=0.7)


# データをバッチ化するための関数
def train2batch(input_data, output_data, batch_size=100):
    input_batch = []
    output_batch = []
    input_shuffle, output_shuffle = shuffle(input_data, output_data)
    for i in range(0, len(input_data), batch_size):
        input_batch.append(input_shuffle[i:i+batch_size])
        output_batch.append(output_shuffle[i:i+batch_size])
    return input_batch, output_batch


embedding_dim = 200  # 文字の埋め込み次元数
hidden_dim = 128  # LSTMの隠れ層のサイズ
vocab_size = len(char2id)  # 扱う文字の数。今回は13文字

# GPU使う用
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Encoderクラス


class Encoder(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super(Encoder, self).__init__()
        self.hidden_dim = hidden_dim
        self.word_embeddings = nn.Embedding(
            vocab_size, embedding_dim, padding_idx=char2id[" "])
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)

    def forward(self, sequence):
        embedding = self.word_embeddings(sequence)
        # Many to Oneなので、第2戻り値を使う
        _, state = self.lstm(embedding)
        # state = (h, c)
        return state


# Decoderクラス


class Decoder(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super(Decoder, self).__init__()
        self.hidden_dim = hidden_dim
        self.word_embeddings = nn.Embedding(
            vocab_size, embedding_dim, padding_idx=char2id[" "])
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        # LSTMの128次元の隠れ層を13次元に変換する全結合層
        self.hidden2linear = nn.Linear(hidden_dim, vocab_size)

    def forward(self, sequence, encoder_state):
        embedding = self.word_embeddings(sequence)
        # Many to Manyなので、第1戻り値を使う。
        # 第2戻り値は推論時に次の文字を生成するときに使います。
        output, state = self.lstm(embedding, encoder_state)
        output = self.hidden2linear(output)
        return output, state


# GPU使えるように。
encoder = Encoder(vocab_size, embedding_dim, hidden_dim).to(device)
decoder = Decoder(vocab_size, embedding_dim, hidden_dim).to(device)

# 損失関数
criterion = nn.CrossEntropyLoss()

# 最適化
encoder_optimizer = optim.Adam(encoder.parameters(), lr=0.001)
decoder_optimizer = optim.Adam(decoder.parameters(), lr=0.001)

BATCH_NUM = 100
EPOCH_NUM = 100

all_losses = []
print("training ...")
for epoch in range(1, EPOCH_NUM+1):
    epoch_loss = 0  # epoch毎のloss

    # データをミニバッチに分ける
    input_batch, output_batch = train2batch(
        train_x, train_y, batch_size=BATCH_NUM)

    for i in range(len(input_batch)):

        # 勾配の初期化
        encoder_optimizer.zero_grad()
        decoder_optimizer.zero_grad()

        # データをテンソルに変換
        input_tensor = torch.tensor(input_batch[i], device=device)
        output_tensor = torch.tensor(output_batch[i], device=device)

        # Encoderの順伝搬
        encoder_state = encoder(input_tensor)

        # Decoderで使うデータはoutput_tensorを1つずらしたものを使う
        # Decoderのインプットとするデータ
        source = output_tensor[:, :-1]

        # Decoderの教師データ
        # 生成開始を表す"_"を削っている
        target = output_tensor[:, 1:]

        loss = 0
        # 学習時はDecoderはこのように1回呼び出すだけでグルっと系列をループしているからこれでOK
        # sourceが4文字なので、以下でLSTMが4回再帰的な処理してる
        decoder_output, _ = decoder(source, encoder_state)
        # decoder_output.size() = (100,4,13)
        # 「13」は生成すべき対象の文字が13文字あるから。decoder_outputの3要素目は
        # [-14.6240,  -3.7612, -11.0775,  ...,  -5.7391, -15.2419,  -8.6547]
        # こんな感じの値が入っており、これの最大値に対応するインデックスを予測文字とみなす

        for j in range(decoder_output.size()[1]):
            # バッチ毎にまとめてloss計算
            # 生成する文字は4文字なので、4回ループ
            loss += criterion(decoder_output[:, j, :], target[:, j])

        epoch_loss += loss.item()

        # 誤差逆伝播
        loss.backward()

        # パラメータ更新
        # Encoder、Decoder両方学習
        encoder_optimizer.step()
        decoder_optimizer.step()

    # 損失を表示
    print("Epoch %d: %.2f" % (epoch, epoch_loss))

    all_losses.append(epoch_loss)
    if epoch_loss < 1:
        break

print("学習終了")

# 推論
# Decoderのアウトプットのテンソルから要素が最大のインデックスを返す。つまり生成文字を意味する


def get_max_index(decoder_output):
    results = []
    for h in decoder_output:
        results.append(torch.argmax(h))
    return torch.tensor(results, device=device).view(BATCH_NUM, 1)


# 評価用データ
test_input_batch, test_output_batch = train2batch(test_x, test_y)
input_tensor = torch.tensor(test_input_batch, device=device)

predicts = []
for i in range(len(test_input_batch)):
    with torch.no_grad():  # 勾配計算させない
        encoder_state = encoder(input_tensor[i])

        # Decoderにはまず文字列生成開始を表す"_"をインプットにするので、"_"のtensorをバッチサイズ分作成
        start_char_batch = [[char2id["_"]] for _ in range(BATCH_NUM)]
        decoder_input_tensor = torch.tensor(start_char_batch, device=device)

        # 変数名変換
        decoder_hidden = encoder_state

        # バッチ毎の結果を結合するための入れ物を定義
        batch_tmp = torch.zeros(100, 1, dtype=torch.long, device=device)
        # print(batch_tmp.size())
        # (100,1)

        for _ in range(5):
            decoder_output, decoder_hidden = decoder(
                decoder_input_tensor, decoder_hidden)
            # 予測文字を取得しつつ、そのまま次のdecoderのインプットとなる
            decoder_input_tensor = get_max_index(decoder_output.squeeze())
            # バッチ毎の結果を予測順に結合
            batch_tmp = torch.cat([batch_tmp, decoder_input_tensor], dim=1)

        # 最初のbatch_tmpの0要素が先頭に残ってしまっているのでスライスして削除
        predicts.append(batch_tmp[:, 1:])

# バッチ毎の予測結果がまとまって格納されてます。
print(len(predicts))
# 150
print(predicts[0].size())
# (100, 5)

# 表示
id2char = {str(i): str(i) for i in range(10)}
id2char.update({"10": "", "11": "-", "12": ""})
row = []
for i in range(len(test_input_batch)):
    batch_input = test_input_batch[i]
    batch_output = test_output_batch[i]
    batch_predict = predicts[i]
    for inp, output, predict in zip(batch_input, batch_output, batch_predict):
        x = [id2char[str(idx)] for idx in inp]
        y = [id2char[str(idx)] for idx in output]
        p = [id2char[str(idx.item())] for idx in predict]

        x_str = "".join(x)
        y_str = "".join(y)
        p_str = "".join(p)

        judge = "O" if y_str == p_str else "X"
        row.append([x_str, y_str, p_str, judge])
predict_df = pd.DataFrame(row, columns=["input", "answer", "predict", "judge"])

# 正解率を表示
print(len(predict_df.query('judge == "O"')) / len(predict_df))
# 0.8492
# 間違えたデータを一部見てみる
print(predict_df.query('judge == "X"').head(10))