seq2seq
PyTorchでSeq2Seqを実装してみた を試してみる。
- 引き算する数式を文字列で認識して、結果を出力する。
- データはランダムに数字を生成して、実際に引き算して作成できるから簡単に用意できる。
- seq2seqは、EncoderとDecoderの2種類のネットワークを作成して結合し、それぞれを学習させる。
- Decoderは、学習時と推論時でロジックが変わる。
- 学習時には、結果も時系列がわかっているため、次の時系列のデータを入力することができる。
- 但し、LSTMの出力hは、次に渡していく必要がある。
- 推論時には、Linearした後、SoftMaxまたは、argmaxで、文字を確定させ、それを次のLSTMに渡す。
- それをまたEmbedingし単語ベクトル化して処理する。
- ミニバッチ学習を行うため、学習時はバッチ数の配列操作となる。
# Decoderで使うデータはoutput_tensorを1つずらしたものを使う # Decoderのインプットとするデータ source = output_tensor[:, :-1] # Decoderの教師データ # 生成開始を表す"_"を削っている target = output_tensor[:, 1:]
for j in range(decoder_output.size()[1]): # バッチ毎にまとめてloss計算 # 生成する文字は4文字なので、4回ループ loss += criterion(decoder_output[:, j, :], target[:, j])
- 損失関数の計算も、バッチ毎にまとめて行う。
- スライス操作になれが必要。
- 【Python】スライス操作についてまとめ
- pyTorchのTensor型とは
学習としては、以下のような感じで重みが計算されているはず。
- LSTMの重み
- Encorder
- 計算式として、どの文字列の単語ベクトルを強く残すか
- Embeding (単語ベクトル)
- 0-9の数字の文字と、「 」、「-」、「_」の合計13種類をいかにうまく分散表現させて、計算しているように見せれるか
以下、コードを貼り付け
import pandas as pd import torch import torch.nn as nn import torch.optim as optim from sklearn.model_selection import train_test_split import random from sklearn.utils import shuffle # 数字の文字をID化 char2id = {str(i): i for i in range(10)} # 空白(10):系列の長さを揃えるようのパディング文字 # -(11):マイナスの文字 # _(12):系列生成開始を知らせる文字 char2id.update({" ": 10, "-": 11, "_": 12}) # 空白込みの3桁の数字をランダムに生成 def generate_number(): number = [random.choice(list("0123456789")) for _ in range(random.randint(1, 3))] return int("".join(number)) # 確認 print(generate_number()) # 753 # 系列の長さを揃えるために空白パディング def add_padding(number, is_input=True): number = "{: <7}".format(number) if is_input else "{: <5s}".format(number) return number # 確認 num = generate_number() print("\"" + str(add_padding(num)) + "\"") # "636 " # 7 # データ準備 input_data = [] output_data = [] # データを50000件準備する while len(input_data) < 50000: x = generate_number() y = generate_number() z = x - y input_char = add_padding(str(x) + "-" + str(y)) output_char = add_padding("_" + str(z), is_input=False) # データをIDにに変換 input_data.append([char2id[c] for c in input_char]) output_data.append([char2id[c] for c in output_char]) # 確認 print(input_data[987]) print(output_data[987]) # [1, 5, 11, 2, 6, 6, 10] (←"15-266") # [12, 11, 2, 5, 1] (←"_-251") # 7:3にデータをわける train_x, test_x, train_y, test_y = train_test_split( input_data, output_data, train_size=0.7) # データをバッチ化するための関数 def train2batch(input_data, output_data, batch_size=100): input_batch = [] output_batch = [] input_shuffle, output_shuffle = shuffle(input_data, output_data) for i in range(0, len(input_data), batch_size): input_batch.append(input_shuffle[i:i+batch_size]) output_batch.append(output_shuffle[i:i+batch_size]) return input_batch, output_batch embedding_dim = 200 # 文字の埋め込み次元数 hidden_dim = 128 # LSTMの隠れ層のサイズ vocab_size = len(char2id) # 扱う文字の数。今回は13文字 # GPU使う用 device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # Encoderクラス class Encoder(nn.Module): def __init__(self, vocab_size, embedding_dim, hidden_dim): super(Encoder, self).__init__() self.hidden_dim = hidden_dim self.word_embeddings = nn.Embedding( vocab_size, embedding_dim, padding_idx=char2id[" "]) self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True) def forward(self, sequence): embedding = self.word_embeddings(sequence) # Many to Oneなので、第2戻り値を使う _, state = self.lstm(embedding) # state = (h, c) return state # Decoderクラス class Decoder(nn.Module): def __init__(self, vocab_size, embedding_dim, hidden_dim): super(Decoder, self).__init__() self.hidden_dim = hidden_dim self.word_embeddings = nn.Embedding( vocab_size, embedding_dim, padding_idx=char2id[" "]) self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True) # LSTMの128次元の隠れ層を13次元に変換する全結合層 self.hidden2linear = nn.Linear(hidden_dim, vocab_size) def forward(self, sequence, encoder_state): embedding = self.word_embeddings(sequence) # Many to Manyなので、第1戻り値を使う。 # 第2戻り値は推論時に次の文字を生成するときに使います。 output, state = self.lstm(embedding, encoder_state) output = self.hidden2linear(output) return output, state # GPU使えるように。 encoder = Encoder(vocab_size, embedding_dim, hidden_dim).to(device) decoder = Decoder(vocab_size, embedding_dim, hidden_dim).to(device) # 損失関数 criterion = nn.CrossEntropyLoss() # 最適化 encoder_optimizer = optim.Adam(encoder.parameters(), lr=0.001) decoder_optimizer = optim.Adam(decoder.parameters(), lr=0.001) BATCH_NUM = 100 EPOCH_NUM = 100 all_losses = [] print("training ...") for epoch in range(1, EPOCH_NUM+1): epoch_loss = 0 # epoch毎のloss # データをミニバッチに分ける input_batch, output_batch = train2batch( train_x, train_y, batch_size=BATCH_NUM) for i in range(len(input_batch)): # 勾配の初期化 encoder_optimizer.zero_grad() decoder_optimizer.zero_grad() # データをテンソルに変換 input_tensor = torch.tensor(input_batch[i], device=device) output_tensor = torch.tensor(output_batch[i], device=device) # Encoderの順伝搬 encoder_state = encoder(input_tensor) # Decoderで使うデータはoutput_tensorを1つずらしたものを使う # Decoderのインプットとするデータ source = output_tensor[:, :-1] # Decoderの教師データ # 生成開始を表す"_"を削っている target = output_tensor[:, 1:] loss = 0 # 学習時はDecoderはこのように1回呼び出すだけでグルっと系列をループしているからこれでOK # sourceが4文字なので、以下でLSTMが4回再帰的な処理してる decoder_output, _ = decoder(source, encoder_state) # decoder_output.size() = (100,4,13) # 「13」は生成すべき対象の文字が13文字あるから。decoder_outputの3要素目は # [-14.6240, -3.7612, -11.0775, ..., -5.7391, -15.2419, -8.6547] # こんな感じの値が入っており、これの最大値に対応するインデックスを予測文字とみなす for j in range(decoder_output.size()[1]): # バッチ毎にまとめてloss計算 # 生成する文字は4文字なので、4回ループ loss += criterion(decoder_output[:, j, :], target[:, j]) epoch_loss += loss.item() # 誤差逆伝播 loss.backward() # パラメータ更新 # Encoder、Decoder両方学習 encoder_optimizer.step() decoder_optimizer.step() # 損失を表示 print("Epoch %d: %.2f" % (epoch, epoch_loss)) all_losses.append(epoch_loss) if epoch_loss < 1: break print("学習終了") # 推論 # Decoderのアウトプットのテンソルから要素が最大のインデックスを返す。つまり生成文字を意味する def get_max_index(decoder_output): results = [] for h in decoder_output: results.append(torch.argmax(h)) return torch.tensor(results, device=device).view(BATCH_NUM, 1) # 評価用データ test_input_batch, test_output_batch = train2batch(test_x, test_y) input_tensor = torch.tensor(test_input_batch, device=device) predicts = [] for i in range(len(test_input_batch)): with torch.no_grad(): # 勾配計算させない encoder_state = encoder(input_tensor[i]) # Decoderにはまず文字列生成開始を表す"_"をインプットにするので、"_"のtensorをバッチサイズ分作成 start_char_batch = [[char2id["_"]] for _ in range(BATCH_NUM)] decoder_input_tensor = torch.tensor(start_char_batch, device=device) # 変数名変換 decoder_hidden = encoder_state # バッチ毎の結果を結合するための入れ物を定義 batch_tmp = torch.zeros(100, 1, dtype=torch.long, device=device) # print(batch_tmp.size()) # (100,1) for _ in range(5): decoder_output, decoder_hidden = decoder( decoder_input_tensor, decoder_hidden) # 予測文字を取得しつつ、そのまま次のdecoderのインプットとなる decoder_input_tensor = get_max_index(decoder_output.squeeze()) # バッチ毎の結果を予測順に結合 batch_tmp = torch.cat([batch_tmp, decoder_input_tensor], dim=1) # 最初のbatch_tmpの0要素が先頭に残ってしまっているのでスライスして削除 predicts.append(batch_tmp[:, 1:]) # バッチ毎の予測結果がまとまって格納されてます。 print(len(predicts)) # 150 print(predicts[0].size()) # (100, 5) # 表示 id2char = {str(i): str(i) for i in range(10)} id2char.update({"10": "", "11": "-", "12": ""}) row = [] for i in range(len(test_input_batch)): batch_input = test_input_batch[i] batch_output = test_output_batch[i] batch_predict = predicts[i] for inp, output, predict in zip(batch_input, batch_output, batch_predict): x = [id2char[str(idx)] for idx in inp] y = [id2char[str(idx)] for idx in output] p = [id2char[str(idx.item())] for idx in predict] x_str = "".join(x) y_str = "".join(y) p_str = "".join(p) judge = "O" if y_str == p_str else "X" row.append([x_str, y_str, p_str, judge]) predict_df = pd.DataFrame(row, columns=["input", "answer", "predict", "judge"]) # 正解率を表示 print(len(predict_df.query('judge == "O"')) / len(predict_df)) # 0.8492 # 間違えたデータを一部見てみる print(predict_df.query('judge == "X"').head(10))