RNN(LSTM)を使ってみる
今日は、以下を参考にLSTMを動かしてみる。
PyTorchを使って、「livedoorニュースコーパス」のニュース記事のタイトルの分類を行う。
データの読み込みには、Pandasを使い、形態素解析には、Mecabを使う。
PyTorchのデータ読み込みは、DataLoaderやDatasetがあった気がするがまぁいいだろう。
なので、まず以下のセットアップを行う。
pip install pandas
pip install torch
pip install mecab
pip install sklearn
LSTMとしては、PyTorchを使うので特に難しいことはない。使うだけなら。。
最終的に分類するので、LSTMを通した結果の文章ベクトルをLinearに通して、Softmaxで確率に落とせばよい。
単語の分散表現(単語ベクトル)は、以下でランダムに初期化して、学習させる。
nn.Embedding(vocab_size, embedding_dim)
今回のネットワークでは、この単語ベクトルとLSTMの重みが学習されていくはず。
単語ベクトルの分散具合と分類の分散具合が同じ方向になるのと、LSTMで隠れ層を伝わっていく時にどんな分散を強く伝えていけばよいかが学習されているのであろう。
使ったコードをコピペ
import os from glob import glob import pandas as pd import linecache import MeCab import re import torch import torch.nn as nn from sklearn.model_selection import train_test_split import torch.optim as optim # カテゴリを配列で取得 categories = [name for name in os.listdir( "text") if os.path.isdir("text/" + name)] # print(categories) # ['movie-enter', 'it-life-hack', 'kaden-channel', 'topic-news', 'livedoor-homme', 'peachy', 'sports-watch', 'dokujo-tsushin', 'smax'] datasets = pd.DataFrame(columns=["title", "category"]) for cat in categories: path = "text/" + cat + "/*.txt" files = glob(path) for text_name in files: title = linecache.getline(text_name, 3) s = pd.Series([title, cat], index=datasets.columns) datasets = datasets.append(s, ignore_index=True) # データフレームシャッフル datasets = datasets.sample(frac=1).reset_index(drop=True) # print(datasets.head()) tagger = MeCab.Tagger("-Owakati") def make_wakati(sentence): # MeCabで分かち書き sentence = tagger.parse(sentence) # 半角全角英数字除去 sentence = re.sub(r'[0-90-9a-zA-Za-zA-Z]+', " ", sentence) # 記号もろもろ除去 sentence = re.sub( r'[\._-―─!@#$%^&\-‐|\\*\“()_■×+α※÷⇒—●★☆〇◎◆▼◇△□(:〜~+=)/*&^%$#@!~`){}[]…\[\]\"\'\”\’:;<>?<>〔〕〈〉?、。・,\./『』【】「」→←○《》≪≫\n\u3000]+', "", sentence) # スペースで区切って形態素の配列へ wakati = sentence.split(" ") # 空の要素は削除 wakati = list(filter(("").__ne__, wakati)) return wakati # 単語ID辞書を作成する word2index = {} for title in datasets["title"]: wakati = make_wakati(title) for word in wakati: if word in word2index: continue word2index[word] = len(word2index) print("vocab size : ", len(word2index)) def sentence2index(sentence): wakati = make_wakati(sentence) return torch.tensor([word2index[w] for w in wakati], dtype=torch.long) category2index = {} for cat in categories: if cat in category2index: continue category2index[cat] = len(category2index) print(category2index) #{'movie-enter': 0, 'it-life-hack': 1, 'kaden-channel': 2, 'topic-news': 3, 'livedoor-homme': 4, 'peachy': 5, 'sports-watch': 6, 'dokujo-tsushin': 7, 'smax': 8} def category2tensor(cat): return torch.tensor([category2index[cat]], dtype=torch.long) class LSTMClassifier(nn.Module): # モデルで使う各ネットワークをコンストラクタで定義 def __init__(self, embedding_dim, hidden_dim, vocab_size, tagset_size): # 親クラスのコンストラクタ。決まり文句 super(LSTMClassifier, self).__init__() # 隠れ層の次元数。これは好きな値に設定しても行列計算の過程で出力には出てこないので。 self.hidden_dim = hidden_dim # インプットの単語をベクトル化するために使う self.word_embeddings = nn.Embedding(vocab_size, embedding_dim) # LSTMの隠れ層。これ1つでOK。超便利。 self.lstm = nn.LSTM(embedding_dim, hidden_dim) # LSTMの出力を受け取って全結合してsoftmaxに食わせるための1層のネットワーク self.hidden2tag = nn.Linear(hidden_dim, tagset_size) # softmaxのLog版。dim=0で列、dim=1で行方向を確率変換。 self.softmax = nn.LogSoftmax(dim=1) # 順伝播処理はforward関数に記載 def forward(self, sentence): # 文章内の各単語をベクトル化して出力。2次元のテンソル embeds = self.word_embeddings(sentence) # 2次元テンソルをLSTMに食わせられる様にviewで3次元テンソルにした上でLSTMへ流す。 # 上記で説明した様にmany to oneのタスクを解きたいので、第二戻り値だけ使う。 _, lstm_out = self.lstm(embeds.view(len(sentence), 1, -1)) # lstm_out[0]は3次元テンソルになってしまっているので2次元に調整して全結合。 tag_space = self.hidden2tag(lstm_out[0].view(-1, self.hidden_dim)) # softmaxに食わせて、確率として表現 tag_scores = self.softmax(tag_space) return tag_scores # 元データを7:3に分ける(7->学習、3->テスト) traindata, testdata = train_test_split(datasets, train_size=0.7) # 単語のベクトル次元数 EMBEDDING_DIM = 10 # 隠れ層の次元数 HIDDEN_DIM = 128 # データ全体の単語数 VOCAB_SIZE = len(word2index) # 分類先のカテゴリの数 TAG_SIZE = len(categories) # モデル宣言 model = LSTMClassifier(EMBEDDING_DIM, HIDDEN_DIM, VOCAB_SIZE, TAG_SIZE) # 損失関数はNLLLoss()を使う。LogSoftmaxを使う時はこれを使うらしい。 loss_function = nn.NLLLoss() # 最適化の手法はSGDで。lossの減りに時間かかるけど、一旦はこれを使う。 optimizer = optim.SGD(model.parameters(), lr=0.01) # 各エポックの合計loss値を格納する losses = [] # 100ループ回してみる。(バッチ化とかGPU使ってないので結構時間かかる...) for epoch in range(100): all_loss = 0 for title, cat in zip(traindata["title"], traindata["category"]): # モデルが持ってる勾配の情報をリセット model.zero_grad() # 文章を単語IDの系列に変換(modelに食わせられる形に変換) inputs = sentence2index(title) # 順伝播の結果を受け取る out = model(inputs) # 正解カテゴリをテンソル化 answer = category2tensor(cat) # 正解とのlossを計算 loss = loss_function(out, answer) # 勾配をセット loss.backward() # 逆伝播でパラメータ更新 optimizer.step() # lossを集計 all_loss += loss.item() losses.append(all_loss) print("epoch", epoch, "\t", "loss", all_loss) print("done.")