Transformer로 한국어 품사 태거 만들기

한국어 품사 태거를 만들어 봅시다

품사 태거(Part-of-Speech Tagger)는 NLP에서 기본이 되는 태스크이고, 그 때문에 NLP를 배울 때 많이 만들어보는 기능이기도 합니다. 품사 태거가 NLP에서 나름 인기있는 이유는 몇가지가 있을 겁니다.

  • 다른 NLP Task의 입력 또는 전처리 역할을 합니다. 요즘은 트렌드가 언어학적인 요소를 많이 배제하는 추세지만 전통적으로 파싱, 번역 등의 NLP 파이프라인에 많이 쓰였습니다.
  • 한국어에서 BPE와 같은 통계적인 Tokenizer를 대신하는 방법으로 쓰이기도 합니다. 한국어의 특성상 통계적인 Subword보다는 형태소 단위를 토근으로 쓸 때 성능이 더 좋긴 경우들이 있습니다. (참조: 한국어 토큰의 단위는 뭐가 좋을까?)
  • 품사 태깅은 Sequential Labeling 문제이기 때문에 CRF, RNN 같은 NLP에 적합한 ML 기법을 배우고 활용하기 좋습니다.

한국어 품사 태거를 만드는 방법은 오랫동안 연구되었고 수많은 방법이 개발되었습니다. 오늘은 Transformer를 사용해서 한국어 품사 태거를 만들어보려고 합니다.

먼저 문제를 정의해봅시다

품사 태깅은 문장을 이루는 각 단어의 품사를 찾는 문제입니다. 영어 문장 예를 하나 들어보겠습니다.

I am a boy. -> I/대명사 + am/동사 + a/관사 + boy/명사 + ./마침표

하지만 한국어는 이렇게 간단치가 않습니다. 한국어가 형태소들이 붙어서 만들어지는 교착어이기 때문입니다. https://konlpy-ko.readthedocs.io/ko/v0.4.3/morph/에 있는 예를 가져와보겠습니다.

가방에 들어가신다 -> 가방/NNG + 에/JKM + 들어가/VV + 시/EPH + ㄴ다/EFN

NNG, JKM 등은 명사, 조사 등을 나타내는 품사 기호입니다. 적절한 품사를 붙이는 것 외에도 다른 문제들이 있습니다.

  1. 가방에가방 + 에로 쪼개집니다. 하지만 가 + 방에가 될 수도 있고, 가방에가 하나의 형태소가 될 수 있습니다. 어떻게 가방에가방 두개의 형태소가 합쳐졌다는 것을 알 수 있을까요?
  2. 들어가신다들어가 + 시 + ㄴ다가 됐습니다. 형태소 단위로 쪼개야한다는 것은 앞에서 말했지만 이번에는 원래 어절에는 없던 ㄴ다가 등장했습니다. 즉, 단순히 글자 사이의 경계를 찾는 문제가 아니라 어떤 때는 자음, 모음 단위까지 쪼개야하고 심지어는 원래 어절에는 없던 글자가 나타나기도 합니다. 예를 들어 위해위하 + 아로 쪼개집니다.

이렇게 각 어절을 형태소 단위로 나누고 숨겨진 형태소를 찾아내는 과정을 형태소 분석이라고 합니다. 즉, 한국어는 형태소 분석과 품사 태깅을 모두 해야 비로서 진정한 품사 태깅을 할 수 있습니다.

(논외로 이를 해결하기 위한 다른 접근법들도 있습니다. 형태소에 품사 태깅을 하는 대신에 어절을 태깅의 대상으로 보고 어절에 NNG+JKM과 같은 복합 태그를 붙이는 방법이 한 예입니다.)

우리는 품사 태깅을 두가지 문제로 나누어서 풀어보려고 합니다.

  1. 먼저 형태소 분석/복원을 담당하는 모듈을 만듭니다.
  2. 이렇게 복원된 형태소에 품사를 붙이는 모듈을 만듭니다.
  3. 이 두가지를 붙이면 온전한 한국어 품사 태깅 모듈이 됩니다.

형태소 복원 문제 정의

형태소 복원 단계에서는 품사는 신경쓰지 않습니다.

가방에 들어가신다 -> 가방 에 들어가 시 ㄴ다

즉, 위와 같은 문제를 풉니다. 한번 더 문제를 들여다보면 가방, 들어가, , ㄴ다는 다른 특성이 있습니다. 가방, 들어가는 어절의 첫번째 형태소이고 다른 것들은 그렇지 않습니다. 어쩌면 이런 특징이 형태소 복원에 도움을 줄지도 모릅니다. 그래서 문제를 약간 다르게 정의해보겠습니다.

가방에 들어가신다 -> 가방 ▁에 들어가 ▁시 ▁ㄴ다

어절의 처음이 아닌 형태소 앞에서는 을 붙여주었습니다. 주의할 점은 는 우리가 흔히 쓰는 _(언더바)가 아닙니다. 비슷하게 생겼지만요.

그럼 어떻게 가방에 들어가신다에 들어가신다를 가지고 가방 ▁에 들어가 ▁시 ▁ㄴ다를 만들 수 있을까요? 많은 방법이 있겠지만 우리는 Transformer Encoder와 Decoder를 써서 Seq2Seq 문제로 풀어보겠습니다. 마치 한국어를 넣어서 영어 문장을 번역하는 것처럼 가방에 들어가신다가방 ▁에 들어가 ▁시 ▁ㄴ다으로 번역하는 셈입니다.

품사 태깅 문제 정의

다음으로 복원된 형태소에 품사를 붙여보겠습니다.

가방 ▁에 들어가 ▁시 ▁ㄴ다 -> 가방/NNG + ▁에/JKM + 들어가/VV + ▁시/EPH + ▁ㄴ다/EFN

예를 들어보면 이런 문제가 될 겁니다. 각 형태소의 품사는 정해진 몇가지 품사 중에 하나가 될 것이기 때문에 Multi Classification 문제입니다. 그리고 앞뒤의 품사 태깅 결과에 영향을 받을 것이기 때문에 Sequential Labeling 문제입니다. LSTM과 같은 RNN을 쓸 수도 있지만 오늘은 Transformer Encoder를 써서 이 문제를 풀어보겠습니다.

코퍼스 준비

이제 문제를 정의했으니 데이터를 준비해보겠습니다. 이번 구현에서는 모두의 말뭉치 중 형태 분석 말뭉치를 사용합니다. 모두의 말뭉치는 무료지만 회원가입 후 신청서를 작성해야 다운로드를 받을 수 있습니다. 신청서 검토 후 승인에 하루에서 수일이 걸릴 수도 있습니다. 승인을 기다리는 동안 https://rlkujwkk7.toastcdn.net/7/NIKL_MP(v1.0).pdf에서 샘플을 볼 수 있습니다.

전처리

https://rlkujwkk7.toastcdn.net/7/NIKL_MP(v1.0).pdf에서 보신 것처럼 모두의 말뭉치는 다음과 같은 구조로 되있습니다. (우리에게 불필요한 node는 생략했습니다.)

{
    "document": {
        "sentence": [
            {
                "form": "<문장>",
                "word": [
                    {
                        "id": 1,
                        "form": "<어절1>",
                    },
                    {

                    }
                ],
                "morpheme": [
                    {

                    },
                    {
                        "id": 1,
                        "form": "<형태소1>",
                        "label": "<품사>",
                        "word_id": 1 // 이 형태소가 속한 어절의 id

                    }
                ]
            }
        ]
    }
}

앞에서 정의한 문제에 맞춰서 데이터를 변환하겠습니다. 그리고 나중에 한 문장씩 처리할 수 있도록 한 번에 하나의 문장이 위치하도록 데이터를 구성하겠습니다.

{"sentence": "가방에 들어가신다", "morphemes": "가방 ▁에 들어가 ▁시 ▁ㄴ다", "labels": ["NNG", "JKM", "VV", "EPH", "EFN"]}
{"sentence": "지구는 행성입니다.", "morphemes": "지구▁는 행성▁이▁ㅂ니다▁.", "labels": ["NNG", "JX", "NNG", "VCP", "EF", "SF"]}

전처리를 거치고 나면 이런 모습이 될 겁니다.

전처리 코드를 보겠습니다.

import json
import argparse
import tqdm

MORPHEME_SEPARATOR = "▁"
EOJEOL_SEPARATOR = " "

parser = argparse.ArgumentParser()
parser.add_argument('-i', '--input')
parser.add_argument('-o', '--output')

args = parser.parse_args()

total_sentences = 0
total_eojeols = 0

with open(args.output, 'w') as f:
    input_data = json.loads(open(args.input).read())

    for document_item in tqdm.tqdm(input_data['document']):
        for sentence_item in document_item['sentence']:
            total_sentences += len(sentence_item)

            if sentence_item['word'] and sentence_item['morpheme']:
                word_string = EOJEOL_SEPARATOR.join(word_item['form'] for word_item in sentence_item['word'])

                labels = []
                old_word_id = 0
                morphemes_per_eojeol = []
                morphemes = []
                for morpheme_item in sentence_item['morpheme']:
                    # 어절의 처음이 아닌 형태소 앞에 ▁를 붙여줍니다.
                    if morpheme_item['word_id'] != 1 and morpheme_item['word_id'] != old_word_id:
                        morphemes_per_eojeol.append(MORPHEME_SEPARATOR.join(morphemes))
                        morphemes = []
                        old_word_id = morpheme_item['word_id']

                    morphemes.append(morpheme_item['form'])

                    labels.append(morpheme_item['label'])

                morphemes_per_eojeol.append(MORPHEME_SEPARATOR.join(morphemes))

                morpheme_string = EOJEOL_SEPARATOR.join(morphemes_per_eojeol)

                total_eojeols += len(sentence_item['word'])

                f.write(json.dumps({'sentence': word_string, 'morphemes': morpheme_string, 'labels': labels}) + '\n')

print(f'Total sentences: {total_sentences}')
print(f'Total eojeols: {total_eojeols}')

위 코드를 preprocess.py로 저정합니다.

모두의 말뭉치를 받으면 NXMP1902008040.jsonSXMP1902008031.json 파일이 있습니다. 처음 파일은 문어체이고 두번째 파일은 구어체입니다. 우리는 두가지를 모두 사용하겠습니다. 이 두 파일은 현재 디렉토리의 corpus 디렉토리에 저장해두었습니다.

python3 preprocess.py -i corpus/NXMP1902008040.json -o corpus/NXMP_pre.json
python3 preprocess.py -i corpus/SXMP1902008031.json -o corpus/SXMP_pre.json

대략 문어체 200만 어절, 구어체 100만 어절임을 알 수 있습니다. 문장수로는 문어체가 15만, 구어체가 22만 쯤 됩니다. 구어체 문장들이 짧다는 것을 알 수 있습니다.

아래와 같이 두 파일을 합쳐줍니다.

cat corpus/NXMP_pre.json corpus/SXMP_pre.json > corpus/corpus.json

평가 데이터 분리하기

꼭 필요하지는 않지만 훈련 데이터를 shuffle 해주는 것이 좋습니다. 텍스트 데이터의 경우 문장 간에 일종의 패턴이나 흐름이 있기 때문에 데이터를 섞어 주지 않으면 모델이 이런 정보에 의지에서 학습을 할 가능성이 있습니다.

import sys
import random

lines = []

for line in sys.stdin:
    lines.append(line.strip())

random.shuffle(lines)

for line in lines:
    print(line)

위 파일을 shuffle.py로 저장하고 아래와 같이 실행합니다.

python3 shuffle.py < corpus/corpus.json > corpus/corpus.shuffled.json

이제 데이터가 잘 섞였으니 훈련 데이터와 평가 데이터를 분리합니다. 이번에는 간단히 평가 데이터로 1,000 문장만 사용하겠습니다.

head -1000 corpus/corpus.shuffled.json > corpus/test.json
tail +1001 corpus/corpus.shuffled.json > corpus/train.json

형태소 복원 모듈 만들기 (훈련기)

앞에서 정의했던 형태소 복원 문제를 다시 한번 떠올려보겠습니다.

가방에 들어가신다 -> 가방 ▁에 들어가 ▁시 ▁ㄴ다

Transformer를 사용할 예정이니, Transformer Encoder에 "가방에 들어가신다"를 입력으로 넣고 Transformer Decoder가 "가방 ▁에 들어가 ▁시 ▁ㄴ다"를 출력하도록 만들면 됩니다.

(오늘 사용할 코드의 많은 부분은 https://pytorch.org/tutorials/beginner/translation_transformer.html에서 차용했습니다.)

import와 device 준비

import torch
import torch.nn as nn
from torch.utils.data import IterableDataset
from torch.utils.data import DataLoader
import torch.optim as optim
from torch.nn.utils.rnn import pad_sequence
from torchtext.vocab import Vocab
from torch.nn import TransformerEncoder, TransformerEncoderLayer, TransformerDecoder, TransformerDecoderLayer
from torch.utils.tensorboard import SummaryWriter

import json
import argparse
import datetime
import os
from collections import Counter
import math

torch.manual_seed(0)
DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

사용할 모듈들을 import하고 cpu 또는 gpu를 결정합니다.

Dataset

class RestoreDataset(IterableDataset):
    UNK_INDEX = 0
    PAD_INDEX = 1
    BOS_INDEX = 2
    EOS_INDEX = 3

    def __init__(self, corpus_filename, max_characters, source_vocab=None, target_vocab=None):
        super(RestoreDataset, self).__init__()

        self.corpus_filename = corpus_filename
        self.max_characters = max_characters

        # 이미 만들어진 vocab을 사용하는 경우는 corpus에서 vocab을 만들지 않습니다.
        # 예를 들어 이미 저장된 checkpoint부터 훈련을 다시 사용할 경우가 그렇습니다.
        if source_vocab:
            self.source_vocab = source_vocab
            self.target_vocab = target_vocab
        else:
            print('Building a vocab')
            self.source_vocab, self.target_vocab = self.build_vocab()
            print(f'Found {len(self.source_vocab)} source tokens')        
            print(f'Found {len(self.target_vocab)} target tokens')        

    def build_vocab(self):
        source_counter = Counter()
        target_counter = Counter()

        for line in open(self.corpus_filename):            
            data = json.loads(line)
            # Tokenizer를 사용하지 않고 글자가 각 Vocab이 됩니다.
            source_counter.update(data['sentence'])
            # Output의 Vocab도 글자 단위지만 자음, 모음과 같이 입력에는 없는 Vocab이 있을 수도 있습니다.
            target_counter.update(data['morphemes'])

        # 특수 Vocab들을 추가합니다.
        return Vocab(source_counter, specials=['<unk>', '<pad>', '<bos>', '<eos>']), Vocab(target_counter, specials=['<unk>', '<pad>', '<bos>', '<eos>'])

    # Corpus에서 한 문장씩 읽은 후에 tensor 형태로 만들어서 돌려줍니다.
    def __iter__(self):
        for line in open(self.corpus_filename):
            data = json.loads(line)

            # 너무 긴 문장은 생략
            if len(data['sentence']) > self.max_characters:
                continue

            x = torch.tensor([self.source_vocab[c] for c in data['sentence'].strip()], dtype=torch.long)
            y = torch.tensor([self.target_vocab[c] for c in data['morphemes'].strip()], dtype=torch.long)

            yield x, y

우리는 Tokenzier를 사용하지 않고 입력과 출력에 글자를 단위로 사용할 예정입니다. 하지만 출력은 형태소이기 때문에 입력에는 없는 자음이나 모음들이 나타날 수도 있습니다. 입력 출력 모두 글자이지만 각자 다른 Vocab을 사용하는 이유입니다.

나중에 모델에서 Embedding을 사용할 예정이기 때문에 각 토큰(글자)를 Index로 변환했습니다.

Model

class MorphemeRestoreModel(nn.Module):
    def __init__(self, num_encoder_layers, num_decoder_layers, embedding_dim, source_vocab_size, target_vocab_size, num_heads, feedforward_dim=512, dropout=0.1):
        super(MorphemeRestoreModel, self).__init__()

        encoder_layer = TransformerEncoderLayer(d_model=embedding_dim, nhead=num_heads, dim_feedforward=feedforward_dim)
        self.transformer_encoder = TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)
        decoder_layer = TransformerDecoderLayer(d_model=embedding_dim, nhead=num_heads, dim_feedforward=feedforward_dim)
        self.transformer_decoder = TransformerDecoder(decoder_layer, num_layers=num_decoder_layers)

        self.generator = nn.Linear(embedding_dim, target_vocab_size)
        self.source_embedding = TokenEmbedding(source_vocab_size, embedding_dim)
        self.target_embedding = TokenEmbedding(target_vocab_size, embedding_dim)
        self.positional_encoding = PositionalEncoding(embedding_dim, dropout=dropout)

    def forward(self, source, target, source_mask, target_mask, source_padding_mask, target_padding_mask, memory_key_padding_mask):
        source_embedding = self.positional_encoding(self.source_embedding(source))
        target_embedding = self.positional_encoding(self.target_embedding(target))
        memory = self.transformer_encoder(source_embedding, source_mask, source_padding_mask)
        outs = self.transformer_decoder(target_embedding, memory, target_mask, None, target_padding_mask, memory_key_padding_mask)

        return self.generator(outs)

    def encode(self, source, source_mask):
        return self.transformer_encoder(self.positional_encoding(self.source_embedding(source)), source_mask)

    def decode(self, target, memory, target_mask):
        return self.transformer_decoder(self.positional_encoding(self.target_embedding(target)), memory, target_mask)

class PositionalEncoding(nn.Module):
    def __init__(self, emb_size: int, dropout, maxlen: int = 5000):
        super(PositionalEncoding, self).__init__()
        den = torch.exp(- torch.arange(0, emb_size, 2) * math.log(10000) / emb_size)
        pos = torch.arange(0, maxlen).reshape(maxlen, 1)
        pos_embedding = torch.zeros((maxlen, emb_size))
        pos_embedding[:, 0::2] = torch.sin(pos * den)
        pos_embedding[:, 1::2] = torch.cos(pos * den)
        pos_embedding = pos_embedding.unsqueeze(-2)

        self.dropout = nn.Dropout(dropout)
        self.register_buffer('pos_embedding', pos_embedding)

    def forward(self, token_embedding):
        return self.dropout(token_embedding +
                            self.pos_embedding[:token_embedding.size(0),:])

class TokenEmbedding(nn.Module):
    def __init__(self, vocab_size, embedding_dim):
        super(TokenEmbedding, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.embedding_dim = embedding_dim

    def forward(self, tokens):
        return self.embedding(tokens.long()) * math.sqrt(self.embedding_dim)

def generate_square_subsequent_mask(sz):
    mask = (torch.triu(torch.ones((sz, sz), device=DEVICE)) == 1).transpose(0, 1)
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))

    return mask

def create_mask(src, tgt):
    src_seq_len = src.shape[0]
    tgt_seq_len = tgt.shape[0]

    tgt_mask = generate_square_subsequent_mask(tgt_seq_len)
    src_mask = torch.zeros((src_seq_len, src_seq_len), device=DEVICE).type(torch.bool)

    src_padding_mask = (src == RestoreDataset.PAD_INDEX).transpose(0, 1)
    tgt_padding_mask = (tgt == RestoreDataset.PAD_INDEX).transpose(0, 1)

    return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask

이 코드는 https://pytorch.org/tutorials/beginner/translation_transformer.html 그대로입니다. Pytorch가 제공하는 Transformer Layer를 잘 쓰려면 개발 문서를 잘 읽어볼 필요가 있습니다. 몇가지 주의 사항을 남겨봅니다.

  • Transformer Encoder는 입력으로 [seq_len, batch_len, embedding_dim]을 기대합니다. v1.8 기준으로 아직 다른 Layer들과는 달리 batch_first 옵션을 지원하지 않기 때문에 주의해야합니다. pytorh 1.9에는 Transformer에서 batch_first가 추가될 예정입니다.
  • 하지만 padding mask(src_key_padding_mask, tgt_key_padding_mask)는 [batch_len, seq_len] 입니다. 헷갈릴 수 있는 부분이죠.

Transformer를 쓰다보면 Mask가 헷갈릴 수 있는데요. 크게 두가지 Mask가 있습니다. (참조: https://pytorch.org/docs/stable/generated/torch.nn.Transformer.html)

  • src_mask, tgt_mask: 우리 코드에서는 src_mask, tgt_mask라는 이름입니다. Self-attention을 훈련시킬 때 Attend 하지 말아야할 토큰을 가리는 역할입니다. 우리의 경우 입력 토큰에 대해서는 가릴 필요가 없습니다. 출력 토근은 현재보다 오른쪽 (미래) 토큰은 Attend 하지 못하게 해야합니다. generate_square_subsequent_mask() 함수가 삼각 행렬 형태로 이런 Mask를 만들어 줍니다.
  • src_key_padding_mask, tgt_key_padding_mask, memory_key_padding_mask: 우리 코드에서는 src_padding_mask, tgt_padding_mask, memory_key_padding_mask라는 이름입니다. 이 Mask는 실제 Token이 아닌 Padding Token을 무시하기 위해서 쓰입니다. 위 코드를 보시면 PAD_INDEX인 경우 True로 설정하는 것을 볼 수 있습니다. 앞에 설명한대로 이 마스크는 [batch_len, seq_len]이어야하기 때문에 transpose()를 해줍니다.
  • `

encode()decode()는 훈련 때는 사용하지 않습니다. 나중에 문장을 실제로 해석할 때 (Inference) 사용하지만 코드의 완결성을 위해서 포함해두었습니다.

Batch 내에서 Padding 넣어주기

def make_batch(data_batch):
    inputs = []
    labels = []

    for input_item, label_item in data_batch:
        inputs.append(torch.cat([torch.tensor([RestoreDataset.BOS_INDEX]), input_item, torch.tensor([RestoreDataset.EOS_INDEX])], dim=0))
        labels.append(torch.cat([torch.tensor([RestoreDataset.BOS_INDEX]), label_item, torch.tensor([RestoreDataset.EOS_INDEX])], dim=0))

    input_batch = pad_sequence(inputs, padding_value=RestoreDataset.PAD_INDEX)
    label_batch = pad_sequence(labels, padding_value=RestoreDataset.PAD_INDEX)

    return input_batch, label_batch

Transformer는 입력으로 mini-batch를 받고, 한 Batch 내에서 Sequence의 길이는 모두 같아야합니다. 이를 위해서 Dataloader의 collate_fn에 Padding 기능을 하는 함수를 지정해주어야합니다. 우리 코드에서는 make_batch()가 이 역할을 합니다.

주의해서 볼 점은 입력 문장의 앞에 <bos>, 끝에 <eos>를 붙여준다는 점입니다. 문장의 처음과 끝이라는 중요한 정보이기도 하고, 나중에 Decoder가 Token을 생성해 낼 때 <eos>를 만들면 생성이 끝났다고 판단하기 위해서입니다.

pad_sequence()는 첫번째 인자로 받은 List 내의 아이템 길이가 모두 같도록 padding_value로 Padding을 넣어줍니다. 인자로 batch_first를 주지 않으면 결과는 [seq_len, batch_len, embedding_dim]이 됩니다. Transformer 기대하는 형태이기 때문에 이대로 사용을 하겠습니다.

훈련 코드

def train(epoch, model, train_dataloader, source_vocab, target_vocab, loss_fn, optimizer):
    model.train()

    total_loss = 0.0

    begin_time_batch_print = datetime.datetime.now()

    for num_batches, (x, y) in enumerate(train_dataloader, 1):
        x = x.to(DEVICE)    
        y = y.to(DEVICE)

        # Teacher Enforcing을 위해서 Decoder 입력으로 마지막 토근(<eos>)을 제외한 값을 사용합니다.
        y_input = y[:-1, :]

        source_mask, target_mask, source_padding_mask, target_padding_mask = create_mask(x, y_input)
        source_mask = source_mask.cuda()
        target_mask = target_mask.cuda()
        source_padding_mask = source_padding_mask.cuda()
        target_padding_mask = target_padding_mask.cuda()

        logits = model(x, y_input, source_mask, target_mask, source_padding_mask, target_padding_mask, source_padding_mask)

        optimizer.zero_grad()

        # Decoder의 출력은 두번째 토큰부터 <eos>까지여야 합니다.
        y_out = y[1:, :]

        loss = loss_fn(logits.reshape(-1, logits.shape[-1]), y_out.reshape(-1))
        loss.backward()

        optimizer.step()

        total_loss += loss.item()

        if num_batches % args.batch_print == 0:
            end_time_batch_print = datetime.datetime.now()   
            print(f'Epoch {epoch:>2d},\tBatch {num_batches:>6d},\tloss: {total_loss / num_batches:.10f}\tTook {str(end_time_batch_print - begin_time_batch_print)}')

            begin_time_batch_print = datetime.datetime.now()

    # 나중에 Checkpoint 부터 훈련이 재개할 때를 대비해서 필요한 정보를 저장합니다.
    print('Saving checkpoint')
    torch.save({
        'epoch': epoch,
        'source_vocab_size': len(source_vocab),
        'target_vocab_size': len(target_vocab),
        'embedding_dim': args.embedding_dim,
        'num_heads': args.num_heads,
        'feedforward_dim': args.feedforward_dim,
        'num_encoder_layers': args.num_encoder_layers,
        'num_decoder_layers': args.num_decoder_layers,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'loss': total_loss,
        'source_vocab': source_vocab,
        'target_vocab': target_vocab
    }, os.path.join(args.output, f'model_{epoch:02d}.pth'))
    print('Done')

    return total_loss / num_batches

평범한 Pytorch 훈련 코드입니다. 나중에 훈련 재개를 위해서 Checkpoint를 저장합니다. (참조: Pytorch로 훈련 이어서하기)

Decoder의 입력과 출력으로 Teacher Enforcing을 쓰기 위해서 y_input = y[:-1, :], y_out = y[1:, :]과 같이 Input, Output을 만듭니다. Seq2Seq 모델에서는 이전에 생성된 토큰을 다음에 생성할 토큰의 입력으로 사용하곤합니다 (이를 autoregressive model이라고 부릅니다). 하지만 훈련 때는 생성된 토큰이 아직 정확하지 않고, 또 우리는 모델을 정답을 기반으로 훈련시켜야하기 때문에 모델이 생성한 값을 사용하지 않고 강제로 정답의 토큰을 사용합니다. 이런 방식을 Teacher Enforcing이라고 부릅니다.

main 실행 부분

if __name__ == '__main__':
    print(f'Using {DEVICE}')

    # 각종 hyperprameter를 명령행으로 받을 수 있도록 준비합니다.
    parser = argparse.ArgumentParser()
    parser.add_argument('-t', '--train', required=True)
    parser.add_argument('-b', '--batch', type=int, required=True)
    parser.add_argument('-e', '--epoch', type=int, required=True)
    parser.add_argument('-ed', '--embedding-dim', type=int)
    parser.add_argument('-nel', '--num-encoder-layers', type=int)
    parser.add_argument('-ndl', '--num-decoder-layers', type=int)
    parser.add_argument('-nh', '--num-heads', type=int)
    parser.add_argument('-fd', '--feedforward-dim', type=int)
    parser.add_argument('-bp', '--batch-print', type=int, required=True)
    parser.add_argument('-o', '--output', required=True)
    parser.add_argument('-r', '--resume', )
    parser.add_argument('-m', '--max-characters', type=int)

    args = parser.parse_args()

    # tensorboard를 통해서 loss를 추적합니다.
    writer = SummaryWriter()

    os.makedirs(args.output, exist_ok=True)

    # loss를 계산할 때 Padding 값은 무시합니다.
    loss_fn = torch.nn.CrossEntropyLoss(ignore_index=RestoreDataset.PAD_INDEX)

    # 처음부터 훈력을 시작하지 않고 Checkpoint부터 시작하는 경우입니다.
    # 주요 파라미터들과 Vocab을 Checkpoint에서 가져옵니다.
    if args.resume:
        model_data = torch.load(args.resume)

        source_vocab = model_data['source_vocab']
        source_vocab_size = len(source_vocab)

        target_vocab = model_data['target_vocab']
        target_vocab_size = len(target_vocab)

        model = MorphemeRestoreModel(
            num_encoder_layers=model_data['num_encoder_layers'],
            num_decoder_layers=model_data['num_decoder_layers'],
            embedding_dim=model_data['embedding_dim'],
            source_vocab_size=model_data['source_vocab_size'],
            target_vocab_size=model_data['target_vocab_size'],
            num_heads=model_data['num_heads'],
            feedforward_dim=model_data['feedforward_dim'])

        model = model.to(DEVICE)
        model.load_state_dict(model_data['model_state_dict'])

        optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)
        optimizer.load_state_dict(model_data['optimizer_state_dict'])

        start_epoch = model_data['epoch'] + 1

        train_dataset = RestoreDataset(args.train, args.max_characters, source_vocab=source_vocab, target_vocab=target_vocab)
    # 처음부터 훈련을 시작하는 경우입니다.
    else:
        train_dataset = RestoreDataset(args.train, args.max_characters)

        source_vocab = train_dataset.source_vocab
        source_vocab_size = len(source_vocab)

        target_vocab = train_dataset.target_vocab
        target_vocab_size = len(target_vocab)

        model = MorphemeRestoreModel(
            num_encoder_layers=args.num_encoder_layers,
            num_decoder_layers=args.num_decoder_layers,
            embedding_dim=args.embedding_dim,
            source_vocab_size=source_vocab_size,
            target_vocab_size=target_vocab_size,
            num_heads=args.num_heads,
            feedforward_dim=args.feedforward_dim)

        for p in model.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)

        model = model.to(DEVICE)

        optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

        start_epoch = 1

    # Padding을 하기 위해서 collate_fn에 make_batch 함수를 지정해줍니다.
    train_dataloader = DataLoader(train_dataset, batch_size=args.batch, collate_fn=make_batch)

    for epoch in range(start_epoch, args.epoch + 1):
        begin_time = datetime.datetime.now()
        loss = train(epoch, model, train_dataloader, source_vocab, target_vocab, loss_fn, optimizer)
        end_time = datetime.datetime.now()

        writer.add_scalar("Loss/train", loss, epoch)

        print(f'Epoch {epoch:>2d},\tloss: {loss:>.10f},\tTook {str(end_time - begin_time)}')

        writer.flush()

    print('Saving the final model')
    torch.save({
        'source_vocab_size': len(source_vocab),
        'target_vocab_size': len(target_vocab),
        'embedding_dim': args.embedding_dim,
        'num_heads': args.num_heads,
        'feedforward_dim': args.feedforward_dim,
        'num_encoder_layers': args.num_encoder_layers,
        'num_decoder_layers': args.num_decoder_layers,
        'model_state_dict': model.state_dict(),
        'source_vocab': source_vocab,
        'target_vocab': target_vocab
    }, os.path.join(args.output, 'model_final.pth'))
    print('Done')

훈련을 처음부터 시작하지 않고 Checkpoint 부터 시작하는 경우를 처리하기 위해서 코드가 좀 길어졌습니다. 하지만 그 부분을 제외하고는 일반적은 Pytorch 코드입니다.

위 코드를 train_restore.py라고 저장합니다.

훈련 시작!

python3 train_restore.py -t corpus/train.json -b 16 -e 20 -ed 512 -nel 4 -ndl 4 -nh 8 -fd 512 -bp 100 -m 500 -o models.restore

위와 같이 명령을 실행합니다. 위 명령에서 볼 수 있듯이 hyperparameter는 다음과 같습니다.

  • batch size: 16
  • epoch: 20
  • embedding dimension: 512
  • encoder layer: 4
  • decoder layer: 4
  • multi-head self attention: 8
  • fully connected layer dimension: 512

추가로 문장 길이가 너무 긴 경우 out of memory가 날 수 있어서 최대 입력 토큰이 args.max_characters (이 경우에는 500)보다 긴 문장은 사용하지 않습니다.

이 코드를 실행해보면 상당히 많은 메모리를 사용할 수 있는 것을 볼 수 있습니다. 실행하는 환경에 따라서 hyperparamer들을 조정해보시길 바랍니다. 참고로 위와 같은 세팅에서는 약 9GB 정도의 메모리를 사용합니다.

형태소 복원 모듈 만들기 (테스트)

생성된 모델이 잘 작동하는지 살펴보겠습니다.

import, 모델

import torch
import torch.nn as nn
from torch.nn import TransformerEncoder, TransformerEncoderLayer, TransformerDecoder, TransformerDecoderLayer

import argparse
import math

torch.manual_seed(0)
DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

UNK_INDEX = 0
PAD_INDEX = 1
BOS_INDEX = 2
EOS_INDEX = 3

class MorphemeRestoreModel(nn.Module):
    def __init__(self, num_encoder_layers, num_decoder_layers, embedding_dim, source_vocab_size, target_vocab_size, num_heads, feedforward_dim=512, dropout=0.1):
        super(MorphemeRestoreModel, self).__init__()

        encoder_layer = TransformerEncoderLayer(d_model=embedding_dim, nhead=num_heads, dim_feedforward=feedforward_dim)
        self.transformer_encoder = TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)
        decoder_layer = TransformerDecoderLayer(d_model=embedding_dim, nhead=num_heads, dim_feedforward=feedforward_dim)
        self.transformer_decoder = TransformerDecoder(decoder_layer, num_layers=num_decoder_layers)

        self.generator = nn.Linear(embedding_dim, target_vocab_size)
        self.source_embedding = TokenEmbedding(source_vocab_size, embedding_dim)
        self.target_embedding = TokenEmbedding(target_vocab_size, embedding_dim)
        self.positional_encoding = PositionalEncoding(embedding_dim, dropout=dropout)

    def forward(self, source, target, source_mask, target_mask, source_padding_mask, target_padding_mask, memory_key_padding_mask):
        source_embedding = self.positional_encoding(self.source_embedding(source))
        target_embedding = self.positional_encoding(self.target_embedding(target))
        memory = self.transformer_encoder(source_embedding, source_mask, source_padding_mask)
        outs = self.transformer_decoder(target_embedding, memory, target_mask, None, target_padding_mask, memory_key_padding_mask)

        return self.generator(outs)

    def encode(self, source, source_mask):
        return self.transformer_encoder(self.positional_encoding(self.source_embedding(source)), source_mask)

    def decode(self, target, memory, target_mask):
        return self.transformer_decoder(self.positional_encoding(self.target_embedding(target)), memory, target_mask)

def generate_square_subsequent_mask(sz):
    mask = (torch.triu(torch.ones((sz, sz), device=DEVICE)) == 1).transpose(0, 1)
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))

    return mask

class PositionalEncoding(nn.Module):
    def __init__(self, emb_size: int, dropout, maxlen: int = 5000):
        super(PositionalEncoding, self).__init__()
        den = torch.exp(- torch.arange(0, emb_size, 2) * math.log(10000) / emb_size)
        pos = torch.arange(0, maxlen).reshape(maxlen, 1)
        pos_embedding = torch.zeros((maxlen, emb_size))
        pos_embedding[:, 0::2] = torch.sin(pos * den)
        pos_embedding[:, 1::2] = torch.cos(pos * den)
        pos_embedding = pos_embedding.unsqueeze(-2)

        self.dropout = nn.Dropout(dropout)
        self.register_buffer('pos_embedding', pos_embedding)

    def forward(self, token_embedding):
        return self.dropout(token_embedding +
                            self.pos_embedding[:token_embedding.size(0),:])

class TokenEmbedding(nn.Module):
    def __init__(self, vocab_size, embedding_dim):
        super(TokenEmbedding, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.embedding_dim = embedding_dim

    def forward(self, tokens):
        return self.embedding(tokens.long()) * math.sqrt(self.embedding_dim)

이 부분은 훈련 코드와 거의 같습니다.

Decoder 결과 만들기

# 매 단계에서 가장 확률이 높은 토큰을 취합니다. 대체 방법으로는 beam search 같은 방법도 있습니다.
def greedy_decode(model, source, source_mask, max_len, start_symbol):
    source = source.to(DEVICE)
    source_mask = source_mask.to(DEVICE)

    memory = model.encode(source, source_mask)
    ys = torch.ones(1, 1).fill_(start_symbol).type(torch.long).to(DEVICE)

    for i in range(max_len - 1):
        memory = memory.to(DEVICE)
        # 미래의 토큰은 보지 않도록 Mask를 해줍니다.
        target_mask = (generate_square_subsequent_mask(ys.size(0)).type(torch.bool)).to(DEVICE)
        out = model.decode(ys, memory, target_mask)
        out = out.transpose(0, 1)
        prob = model.generator(out[:, -1])
        # Greedy하게 각 단계에서 가장 높은 토큰을 취합니다.
        _, next_word = torch.max(prob, dim=1)
        next_word = next_word.item()

        # autoregressive 하도록 지금까지 생성된 토큰을 다음 입력으로 계속 붙여줍니다.
        ys = torch.cat([ys, torch.ones(1, 1).type_as(source.data).fill_(next_word)], dim=0)

        # <eos>를 만나면 생성을 멈춥니다.
        if next_word == EOS_INDEX:
            break

    return ys

def restore_morphemes(model, source_vocab, target_vocab, source):
    model.eval()

    # 훈련 때처럼 문장의 앞 뒤에 <bos>, <eos>를 붙여줍니다.
    tokens = [BOS_INDEX] + [source_vocab[token] for token in source] + [EOS_INDEX]
    num_tokens = len(tokens)
    source = torch.LongTensor(tokens).reshape(num_tokens, 1)
    source_mask = (torch.zeros(num_tokens, num_tokens)).type(torch.bool)
    # 출력을 최대 300 토큰으로 제한하고 있습니다.
    target_tokens = greedy_decode(model, source, source_mask, max_len=300, start_symbol=BOS_INDEX).flatten()

    return ''.join([target_vocab.itos[token] for token in target_tokens]).replace('<bos>', '').replace('<eos>', '')

(훈련부와 마찬가지로 https://pytorch.org/tutorials/beginner/translation_transformer.html에서 많은 부분을 차용했습니다.)

Decoding을 위해서 Greedy 알고리즘을 쓰고 있는데 Beam Search 등을 적용하면 성능이 더 나아질 가능성도 있습니다.

main 실행 부분

if __name__ == '__main__':
    print(f'Using {DEVICE}')

    parser = argparse.ArgumentParser()
    parser.add_argument('-md', '--model')

    args = parser.parse_args()

    model_data = torch.load(args.model)

    source_vocab_size = model_data['source_vocab_size']
    source_vocab = model_data['source_vocab']
    target_vocab_size = model_data['target_vocab_size']
    target_vocab = model_data['target_vocab']

    model = MorphemeRestoreModel(
            num_encoder_layers=model_data['num_encoder_layers'],
            num_decoder_layers=model_data['num_decoder_layers'],
            embedding_dim=model_data['embedding_dim'],
            source_vocab_size=model_data['source_vocab_size'],
            target_vocab_size=model_data['target_vocab_size'],
            num_heads=model_data['num_heads'],
            feedforward_dim=model_data['feedforward_dim'])

    model.load_state_dict(model_data['model_state_dict'])
    model = model.to(DEVICE)

    model.eval()

    with torch.no_grad():
        while True:
            source = input('Description: ')
            if source:
                print('Answer:', restore_morphemes(model, source_vocab, target_vocab, source))
            else:
                break

저장된 모델을 읽어서 모델을 세팅하고 문장을 받아서 형태소 복원을 하는 부분입니다.

지금까지 전체 코드를 test_restore.py로 저장합니다.

Live Test

python3 test_restore.py -md models.restore/model_final.pth

과 같이 실행하고 몇가지를 테스트해보겠습니다.

Description: 가방에 들어가신다
Answer: 가방▁에 들어가▁시▁ㄴ다
Description: 아름다운 별 지구에 오신 것을 환영합니다.
Answer: 아름답▁ㄴ 별 지구▁에 오▁시▁ㄴ 것▁을 환영▁하▁ㅂ니다▁.

나쁘지 않게 결과가 나오는군요 :)

품사 태깅 모듈 만들기 (훈련기)

이제 절반(형태소 복원)을 만들었으니 나머지 절반인 품사 태깅 부분을 만들어 보겠습니다. 문제 정의를 다시 한번 살펴볼까요?

가방 ▁에 들어가 ▁시 ▁ㄴ다 -> 가방/NNG + ▁에/JKM + 들어가/VV + ▁시/EPH + ▁ㄴ다/EFN

실제 출력에 /를 사용하면 문장에 쓰이는 /와 혼동이 있을 수 있으니 우리는 내부적으로 (morpeheme, label) 형태로 결과를 저장하고 출력할 때는 morpheme\tlabel과 같이 출력해보겠습니다.

import, device 준비

import torch
import torch.nn as nn
from torch.utils.data import IterableDataset
from torch.utils.data import DataLoader
import torch.optim as optim
from torch.nn.utils.rnn import pad_sequence
from torchtext.vocab import Vocab
from torch.nn import TransformerEncoder, TransformerEncoderLayer, TransformerDecoder, TransformerDecoderLayer
from torch.utils.tensorboard import SummaryWriter

import json
import argparse
import datetime
import os
from collections import Counter
import math

MORPHEME_SEPARATOR = "▁"
EOJEOL_SEPARATOR = " "

torch.manual_seed(0)
DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

필요한 모듈들을 import하고 cpu, gpu를 결정합니다.

Dataset

class TaggingDataset(IterableDataset):
    UNK_INDEX = 0
    PAD_INDEX = 1

    def __init__(self, corpus_filename, max_characters, source_vocab=None, labels=None):
        super(TaggingDataset, self).__init__()

        self.corpus_filename = corpus_filename
        self.max_characters = max_characters

        if source_vocab:
            self.source_vocab = source_vocab
            self.labels = labels
        else:
            print('Building a vocab')
            self.source_vocab, self.labels = self.build_vocab()
            print(f'Found {len(self.source_vocab)} source tokens')        
            print(f'Found {len(self.labels)} target tokens')        

    def build_vocab(self):
        source_counter = Counter()
        labels = Counter()

        for line in open(self.corpus_filename):            
            data = json.loads(line)

            source_counter.update(self.tokenize_sentence(data['morphemes']))
            labels.update(data['labels'])

        return Vocab(source_counter, specials=['<unk>', '<pad>']), Vocab(labels)

    def __iter__(self):
        for line in open(self.corpus_filename):
            data = json.loads(line)

            # Skip too long sentences
            if len(data['sentence']) > self.max_characters:
                continue

            morphemes = [self.source_vocab[morpheme] for morpheme in self.tokenize_sentence(data['morphemes'])]
            x = torch.tensor(morphemes, dtype=torch.long)
            y = torch.tensor([self.labels[label] for label in data['labels']], dtype=torch.long)

            yield x, y

    # 형태소 복원 형태인 입력 문장에서 어절과 형태소들을 분리합니다.
    # 어절의 처음이 아닌 형태소 맨 앞에는 ▁를 살려줍니다.
    def tokenize_sentence(self, s):
        morphemes = []
        for eojeol in s.split():
            for i, morpheme in enumerate(eojeol.split(MORPHEME_SEPARATOR)):
                if i == 0:
                    morphemes.append(morpheme)
                else:
                    morphemes.append(MORPHEME_SEPARATOR + morpheme)

        return morphemes

품사 태깅을 위한 Dataset입니다. 전체적으로 형태소 복원 코드와 유사합니다. 입력 x는 형태소가 복원된 입력 문장의 각 형태소, 출력 y는 각 형태소에 대응되는 품사 태그입니다.

Model

class MorphemeTaggingModel(nn.Module):
    def __init__(self, num_encoder_layers, embedding_dim, source_vocab_size, labels_size, num_heads, feedforward_dim=512, dropout=0.1):
        super(MorphemeTaggingModel, self).__init__()

        self.embedding_dim = embedding_dim
        self.num_heads = num_heads
        self.feedforward_dim = feedforward_dim

        # 이번에는 Decoding이 필요없기 때문에 Transformer Encoder만 사용합니다.
        encoder_layer = TransformerEncoderLayer(d_model=embedding_dim, nhead=num_heads, dim_feedforward=feedforward_dim)
        self.transformer_encoder = TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)

        # Encoder의 출력이 Fully connected layer로 들어가서 최종 품사 태그가 결정됩니다.
        self.fc = nn.Linear(embedding_dim, labels_size)
        self.source_embedding = TokenEmbedding(source_vocab_size, embedding_dim)
        self.positional_encoding = PositionalEncoding(embedding_dim, dropout=dropout)

    def forward(self, source, source_mask, source_padding_mask=None):
        source_embedding = self.positional_encoding(self.source_embedding(source))
        outs = self.transformer_encoder(source_embedding, source_mask, source_padding_mask)

        return self.fc(outs)

class PositionalEncoding(nn.Module):
    def __init__(self, emb_size: int, dropout, maxlen: int = 5000):
        super(PositionalEncoding, self).__init__()
        den = torch.exp(- torch.arange(0, emb_size, 2) * math.log(10000) / emb_size)
        pos = torch.arange(0, maxlen).reshape(maxlen, 1)
        pos_embedding = torch.zeros((maxlen, emb_size))
        pos_embedding[:, 0::2] = torch.sin(pos * den)
        pos_embedding[:, 1::2] = torch.cos(pos * den)
        pos_embedding = pos_embedding.unsqueeze(-2)

        self.dropout = nn.Dropout(dropout)
        self.register_buffer('pos_embedding', pos_embedding)

    def forward(self, token_embedding):
        return self.dropout(token_embedding +
                            self.pos_embedding[:token_embedding.size(0),:])

class TokenEmbedding(nn.Module):
    def __init__(self, vocab_size, embedding_dim):
        super(TokenEmbedding, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.embedding_dim = embedding_dim

    def forward(self, tokens):
        return self.embedding(tokens.long()) * math.sqrt(self.embedding_dim)

def create_mask(src):
    src_seq_len = src.shape[0]
    src_mask = torch.zeros((src_seq_len, src_seq_len), device=DEVICE).type(torch.bool)
    src_padding_mask = (src == TaggingDataset.PAD_INDEX).transpose(0, 1)

    return src_mask, src_padding_mask

def make_batch(data_batch):
    inputs = []
    labels = []

    for input_item, label_item in data_batch:
        inputs.append(input_item)
        labels.append(label_item)

    input_batch = pad_sequence(inputs, padding_value=TaggingDataset.PAD_INDEX)
    label_batch = pad_sequence(labels, padding_value=TaggingDataset.PAD_INDEX)

    return input_batch, label_batch

이번에는 Decoding이 필요없고 Transformer Encoder만 사용합니다. Transformer Encoder의 Encoding된 값을 Fully connected layer에 넣어서 품사 태그를 결정합니다. Mask도 Decoder 관련 부분은 제거했습니다.

나머지 부분은 형태소 복원 훈련부와 같습니다.

훈련

def train(epoch, model, train_dataloader, source_vocab, labels, loss_fn, optimizer):
    model.train()

    total_loss = 0.0

    begin_time_batch_print = datetime.datetime.now()

    for num_batches, (x, y) in enumerate(train_dataloader, 1):
        x = x.to(DEVICE)    
        y = y.to(DEVICE)

        source_mask, source_padding_mask = create_mask(x)
        source_mask = source_mask.cuda()
        source_padding_mask = source_padding_mask.cuda()

        logits = model(x, source_mask, source_padding_mask)

        optimizer.zero_grad()

        loss = loss_fn(logits.reshape(-1, logits.shape[-1]), y.reshape(-1))
        loss.backward()

        optimizer.step()

        total_loss += loss.item()

        if num_batches % args.batch_print == 0:
            end_time_batch_print = datetime.datetime.now()   
            print(f'Epoch {epoch:>2d},\tBatch {num_batches:>6d},\tloss: {total_loss / num_batches:.10f}\tTook {str(end_time_batch_print - begin_time_batch_print)}')

            begin_time_batch_print = datetime.datetime.now()

    print('Saving checkpoint')
    torch.save({
        'epoch': epoch,
        'source_vocab_size': len(source_vocab),
        'labels_size': len(labels),
        'embedding_dim': args.embedding_dim,
        'num_heads': args.num_heads,
        'feedforward_dim': args.feedforward_dim,
        'num_encoder_layers': args.num_encoder_layers,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'loss': total_loss,
        'source_vocab': source_vocab,
        'labels': labels
    }, os.path.join(args.output, f'model_{epoch:02d}.pth'))
    print('Done')

    return total_loss / num_batches

훈련 부분도 매우 흡사합니다. 다만 출력이 품사 태그이기 때문에 loss를 계산할 때 y (품사 태그 인덱스)를 사용합니다.

main 실행 부분

if __name__ == '__main__':
    print(f'Using {DEVICE}')

    parser = argparse.ArgumentParser()
    parser.add_argument('-t', '--train', required=True)
    parser.add_argument('-b', '--batch', type=int, required=True)
    parser.add_argument('-e', '--epoch', type=int, required=True)
    parser.add_argument('-ed', '--embedding-dim', type=int)
    parser.add_argument('-nel', '--num-encoder-layers', type=int)
    parser.add_argument('-nh', '--num-heads', type=int)
    parser.add_argument('-fd', '--feedforward-dim', type=int)
    parser.add_argument('-bp', '--batch-print', type=int, required=True)
    parser.add_argument('-o', '--output', required=True)
    parser.add_argument('-r', '--resume', )
    parser.add_argument('-m', '--max-characters', type=int)

    args = parser.parse_args()

    writer = SummaryWriter()

    os.makedirs(args.output, exist_ok=True)

    loss_fn = torch.nn.CrossEntropyLoss(ignore_index=TaggingDataset.PAD_INDEX)

    if args.resume:
        model_data = torch.load(args.resume)

        source_vocab = model_data['source_vocab']
        source_vocab_size = len(source_vocab)

        labels = model_data['labels']
        labels_size = len(labels)

        model = MorphemeTaggingModel(
            num_encoder_layers=model_data['num_encoder_layers'],
            embedding_dim=model_data['embedding_dim'],
            source_vocab_size=model_data['source_vocab_size'],
            labels_size=model_data['labels_size'],
            num_heads=model_data['num_heads'],
            feedforward_dim=model_data['feedforward_dim'])

        model = model.to(DEVICE)
        model.load_state_dict(model_data['model_state_dict'])

        optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)
        optimizer.load_state_dict(model_data['optimizer_state_dict'])

        start_epoch = model_data['epoch'] + 1

        train_dataset = TaggingDataset(args.train, args.max_characters, source_vocab=source_vocab, labels=labels)
    else:
        train_dataset = TaggingDataset(args.train, args.max_characters)

        source_vocab = train_dataset.source_vocab
        source_vocab_size = len(source_vocab)

        labels = train_dataset.labels
        labels_size = len(labels)

        model = MorphemeTaggingModel(
            num_encoder_layers=args.num_encoder_layers,
            embedding_dim=args.embedding_dim,
            source_vocab_size=source_vocab_size,
            labels_size=labels_size,
            num_heads=args.num_heads,
            feedforward_dim=args.feedforward_dim)

        for p in model.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)

        model = model.to(DEVICE)

        optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

        start_epoch = 1

    train_dataloader = DataLoader(train_dataset, batch_size=args.batch, collate_fn=make_batch)

    for epoch in range(start_epoch, args.epoch + 1):
        begin_time = datetime.datetime.now()
        loss = train(epoch, model, train_dataloader, source_vocab, labels, loss_fn, optimizer)
        end_time = datetime.datetime.now()

        writer.add_scalar("Tagging/Loss/train", loss, epoch)

        print(f'Epoch {epoch:>2d},\tloss: {loss:>.10f},\tTook {str(end_time - begin_time)}')

        writer.flush()

    print('Saving the final model')
    torch.save({
        'source_vocab_size': len(source_vocab),
        'labels_size': len(labels),
        'embedding_dim': args.embedding_dim,
        'num_heads': args.num_heads,
        'feedforward_dim': args.feedforward_dim,
        'num_encoder_layers': args.num_encoder_layers,
        'model_state_dict': model.state_dict(),
        'source_vocab': source_vocab,
        'labels': labels
    }, os.path.join(args.output, 'model_final.pth'))
    print('Done')

역시 형태소 복원 부분과 유사합니다.

이 코드들을 train_tagging.py로 저정합니다.

훈련 실행하기

python3 train_tagging.py -t corpus/train.json -b 128 -e 20 -ed 512 -nel 4 -nh 8 -fd 512 -bp 100 -m 500 -o models.tagging

위와 같이 실행합니다. Hyperparameter들은 형태소 복원 때와 유사하게 설정했습니다. Decoder 관련 값들이 사라졌고, Decoder를 쓰지 않기 때문에 메모리에 여유가 있어서 batch size를 늘렸습니다.

품사 태깅 모듈 만들기 (테스트)

import torch
import torch.nn as nn
from torch.nn import TransformerEncoder, TransformerEncoderLayer

import argparse
import math

MORPHEME_SEPARATOR = "▁"
EOJEOL_SEPARATOR = " "

torch.manual_seed(0)
DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

UNK_INDEX = 0
PAD_INDEX = 1

class MorphemeTaggingModel(nn.Module):
    def __init__(self, num_encoder_layers, embedding_dim, source_vocab_size, labels_size, num_heads, feedforward_dim=512, dropout=0.1):
        super(MorphemeTaggingModel, self).__init__()

        self.embedding_dim = embedding_dim

        encoder_layer = TransformerEncoderLayer(d_model=embedding_dim, nhead=num_heads, dim_feedforward=feedforward_dim)
        self.transformer_encoder = TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)

        self.fc = nn.Linear(embedding_dim, labels_size)
        self.source_embedding = TokenEmbedding(source_vocab_size, embedding_dim)
        self.positional_encoding = PositionalEncoding(embedding_dim, dropout=dropout)

    def forward(self, source, source_mask, source_padding_mask=None):
        source_embedding = self.positional_encoding(self.source_embedding(source))
        outs = self.transformer_encoder(source_embedding, source_mask, source_padding_mask)

        return self.fc(outs)

class PositionalEncoding(nn.Module):
    def __init__(self, emb_size: int, dropout, maxlen: int = 5000):
        super(PositionalEncoding, self).__init__()
        den = torch.exp(- torch.arange(0, emb_size, 2) * math.log(10000) / emb_size)
        pos = torch.arange(0, maxlen).reshape(maxlen, 1)
        pos_embedding = torch.zeros((maxlen, emb_size))
        pos_embedding[:, 0::2] = torch.sin(pos * den)
        pos_embedding[:, 1::2] = torch.cos(pos * den)
        pos_embedding = pos_embedding.unsqueeze(-2)

        self.dropout = nn.Dropout(dropout)
        self.register_buffer('pos_embedding', pos_embedding)

    def forward(self, token_embedding):
        return self.dropout(token_embedding +
                            self.pos_embedding[:token_embedding.size(0),:])

class TokenEmbedding(nn.Module):
    def __init__(self, vocab_size, embedding_dim):
        super(TokenEmbedding, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.embedding_dim = embedding_dim

    def forward(self, tokens):
        return self.embedding(tokens.long()) * math.sqrt(self.embedding_dim)

def tokenize_sentence(s):
    morphemes = []
    for eojeol in s.split():
        for i, morpheme in enumerate(eojeol.split(MORPHEME_SEPARATOR)):
            if i == 0:
                morphemes.append(morpheme)
            else:
                morphemes.append(MORPHEME_SEPARATOR + morpheme)

    return morphemes

def predict_labels(model, source_vocab, labels, source):
    model.eval()

    tokens = tokenize_sentence(source)
    token_index = [source_vocab[token] for token in tokens]
    num_tokens = len(tokens)
    source = torch.LongTensor(token_index).reshape(num_tokens, 1).cuda()
    source_mask = (torch.zeros(num_tokens, num_tokens)).type(torch.bool).cuda()

    logits = model(source, source_mask)
    label_index = torch.argmax(logits, dim=-1)
    label_index = label_index.reshape(-1)

    tags = [labels.itos[label] for label in label_index]

    result = []
    for morpheme, label in zip(tokens, tags):
        result.append((morpheme, label))

    return result

if __name__ == '__main__':
    print(f'Using {DEVICE}')

    parser = argparse.ArgumentParser()
    parser.add_argument('-md', '--model')

    args = parser.parse_args()

    model_data = torch.load(args.model)

    source_vocab_size = model_data['source_vocab_size']
    source_vocab = model_data['source_vocab']
    labels = model_data['labels']
    labels_size = model_data['labels_size']

    model = MorphemeTaggingModel(
            num_encoder_layers=model_data['num_encoder_layers'],
            embedding_dim=model_data['embedding_dim'],
            source_vocab_size=model_data['source_vocab_size'],
            labels_size=model_data['labels_size'],
            num_heads=model_data['num_heads'],
            feedforward_dim=model_data['feedforward_dim'])

    model.load_state_dict(model_data['model_state_dict'])
    model = model.to(DEVICE)

    model.eval()

    with torch.no_grad():
        while True:
            source = input('Description: ')
            if source:
                print('Answer:', predict_labels(model, source_vocab, labels, source))
            else:
                break

위 코드를 test_tagging.py로 저장하고 몇가지 문장을 시험해보겠습니다.

python3 test_tagging.py -md models.tagging/model_final.pth
Description: 가방▁에 들어가▁시▁ㄴ다
Answer: [('가방', 'NNG'), ('▁에', 'JKB'), ('들어가', 'VV'), ('▁시', 'EP'), ('▁ㄴ다', 'EF')]
Description: 아름답▁ㄴ 별 지구▁에 오▁시▁ㄴ 것▁을 환영▁하▁ㅂ니다▁.
Answer: [('아름답', 'VA'), ('▁ㄴ', 'ETM'), ('별', 'NNG'), ('지구', 'NNG'), ('▁에', 'JKB'), ('오', 'VV'), ('▁시', 'EP'), ('▁ㄴ', 'ETM'), ('것', 'NNB'), ('▁을', 'JKO'), ('환영', 'NNG'), ('▁하', 'XSV'), ('▁ㅂ니다', 'EF'), ('▁.', 'SF')]

괜찮아보이는군요 :)

전체 테스트 엮기

이제 형태소 복원과 품사 태깅 모두가 만들어졌습니다. 이 둘을 묶어서 잘 작동하는지 살펴보겠습니다. 코드는 길지만 실제로는 앞에 test_restore.pytest_tagging.py를 기계적으로 합쳤을 뿐입니다.

import torch
import torch.nn as nn
from torch.nn import TransformerEncoder, TransformerEncoderLayer, TransformerDecoder, TransformerDecoderLayer

import argparse
import math

MORPHEME_SEPARATOR = "▁"
EOJEOL_SEPARATOR = " "

torch.manual_seed(0)
DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

UNK_INDEX = 0
PAD_INDEX = 1
BOS_INDEX = 2
EOS_INDEX = 3

class MorphemeRestoreModel(nn.Module):
    def __init__(self, num_encoder_layers, num_decoder_layers, embedding_dim, source_vocab_size, target_vocab_size, num_heads, feedforward_dim=512, dropout=0.1):
        super(MorphemeRestoreModel, self).__init__()

        encoder_layer = TransformerEncoderLayer(d_model=embedding_dim, nhead=num_heads, dim_feedforward=feedforward_dim)
        self.transformer_encoder = TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)
        decoder_layer = TransformerDecoderLayer(d_model=embedding_dim, nhead=num_heads, dim_feedforward=feedforward_dim)
        self.transformer_decoder = TransformerDecoder(decoder_layer, num_layers=num_decoder_layers)

        self.generator = nn.Linear(embedding_dim, target_vocab_size)
        self.source_embedding = TokenEmbedding(source_vocab_size, embedding_dim)
        self.target_embedding = TokenEmbedding(target_vocab_size, embedding_dim)
        self.positional_encoding = PositionalEncoding(embedding_dim, dropout=dropout)

    def forward(self, source, target, source_mask, target_mask, source_padding_mask, target_padding_mask, memory_key_padding_mask):
        source_embedding = self.positional_encoding(self.source_embedding(source))
        target_embedding = self.positional_encoding(self.target_embedding(target))
        memory = self.transformer_encoder(source_embedding, source_mask, source_padding_mask)
        outs = self.transformer_decoder(target_embedding, memory, target_mask, None, target_padding_mask, memory_key_padding_mask)

        return self.generator(outs)

    def encode(self, source, source_mask):
        return self.transformer_encoder(self.positional_encoding(self.source_embedding(source)), source_mask)

    def decode(self, target, memory, target_mask):
        return self.transformer_decoder(self.positional_encoding(self.target_embedding(target)), memory, target_mask)

class MorphemeTaggingModel(nn.Module):
    def __init__(self, num_encoder_layers, embedding_dim, source_vocab_size, labels_size, num_heads, feedforward_dim=512, dropout=0.1):
        super(MorphemeTaggingModel, self).__init__()

        self.embedding_dim = embedding_dim

        encoder_layer = TransformerEncoderLayer(d_model=embedding_dim, nhead=num_heads, dim_feedforward=feedforward_dim)
        self.transformer_encoder = TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)

        self.fc = nn.Linear(embedding_dim, labels_size)
        self.source_embedding = TokenEmbedding(source_vocab_size, embedding_dim)
        self.positional_encoding = PositionalEncoding(embedding_dim, dropout=dropout)

    def forward(self, source, source_mask, source_padding_mask=None):
        source_embedding = self.positional_encoding(self.source_embedding(source))
        outs = self.transformer_encoder(source_embedding, source_mask, source_padding_mask)

        return self.fc(outs)

def generate_square_subsequent_mask(sz):
    mask = (torch.triu(torch.ones((sz, sz), device=DEVICE)) == 1).transpose(0, 1)
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))

    return mask

def create_mask(src, tgt):
    src_seq_len = src.shape[0]
    tgt_seq_len = tgt.shape[0]

    tgt_mask = generate_square_subsequent_mask(tgt_seq_len)
    src_mask = torch.zeros((src_seq_len, src_seq_len), device=DEVICE).type(torch.bool)

    src_padding_mask = (src == PAD_INDEX).transpose(0, 1)
    tgt_padding_mask = (tgt == PAD_INDEX).transpose(0, 1)

    return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask

class PositionalEncoding(nn.Module):
    def __init__(self, emb_size: int, dropout, maxlen: int = 5000):
        super(PositionalEncoding, self).__init__()
        den = torch.exp(- torch.arange(0, emb_size, 2) * math.log(10000) / emb_size)
        pos = torch.arange(0, maxlen).reshape(maxlen, 1)
        pos_embedding = torch.zeros((maxlen, emb_size))
        pos_embedding[:, 0::2] = torch.sin(pos * den)
        pos_embedding[:, 1::2] = torch.cos(pos * den)
        pos_embedding = pos_embedding.unsqueeze(-2)

        self.dropout = nn.Dropout(dropout)
        self.register_buffer('pos_embedding', pos_embedding)

    def forward(self, token_embedding):
        return self.dropout(token_embedding +
                            self.pos_embedding[:token_embedding.size(0),:])

class TokenEmbedding(nn.Module):
    def __init__(self, vocab_size, embedding_dim):
        super(TokenEmbedding, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.embedding_dim = embedding_dim

    def forward(self, tokens):
        return self.embedding(tokens.long()) * math.sqrt(self.embedding_dim)

def greedy_decode(model, source, source_mask, max_len, start_symbol):
    source = source.to(DEVICE)
    source_mask = source_mask.to(DEVICE)

    memory = model.encode(source, source_mask)
    ys = torch.ones(1, 1).fill_(start_symbol).type(torch.long).to(DEVICE)

    for i in range(max_len - 1):
        memory = memory.to(DEVICE)
        target_mask = (generate_square_subsequent_mask(ys.size(0)).type(torch.bool)).to(DEVICE)
        out = model.decode(ys, memory, target_mask)
        out = out.transpose(0, 1)
        prob = model.generator(out[:, -1])
        _, next_word = torch.max(prob, dim=1)
        next_word = next_word.item()

        ys = torch.cat([ys, torch.ones(1, 1).type_as(source.data).fill_(next_word)], dim=0)
        if next_word == EOS_INDEX:
            break

    return ys

def restore_morphemes(model, source_vocab, target_vocab, source):
    model.eval()

    tokens = [BOS_INDEX] + [source_vocab[token] for token in source] + [EOS_INDEX]
    num_tokens = len(tokens)
    source = torch.LongTensor(tokens).reshape(num_tokens, 1)
    source_mask = (torch.zeros(num_tokens, num_tokens)).type(torch.bool)
    target_tokens = greedy_decode(model, source, source_mask, max_len=300, start_symbol=BOS_INDEX).flatten()

    return ''.join([target_vocab.itos[token] for token in target_tokens]).replace('<bos>', '').replace('<eos>', '')

def tokenize_sentence(s):
    morphemes = []
    for eojeol in s.split():
        for i, morpheme in enumerate(eojeol.split(MORPHEME_SEPARATOR)):
            if i == 0:
                morphemes.append(morpheme)
            else:
                morphemes.append(MORPHEME_SEPARATOR + morpheme)

    return morphemes

def predict_labels(model, source_vocab, labels, source):
    model.eval()

    tokens = tokenize_sentence(source)
    token_index = [source_vocab[token] for token in tokens]
    num_tokens = len(tokens)
    source = torch.LongTensor(token_index).reshape(num_tokens, 1).cuda()
    source_mask = (torch.zeros(num_tokens, num_tokens)).type(torch.bool).cuda()

    logits = model(source, source_mask)
    label_index = torch.argmax(logits, dim=-1)
    label_index = label_index.reshape(-1)

    tags = [labels.itos[label] for label in label_index]

    result = []
    for morpheme, label in zip(tokens, tags):
        result.append((morpheme, label))

    return result

if __name__ == '__main__':
    print(f'Using {DEVICE}')

    parser = argparse.ArgumentParser()
    parser.add_argument('-mdr', '--model-restore')
    parser.add_argument('-mdt', '--model-tagging')

    args = parser.parse_args()

    restore_model_data = torch.load(args.model_restore)

    restore_source_vocab_size = restore_model_data['source_vocab_size']
    restore_source_vocab = restore_model_data['source_vocab']
    restore_target_vocab_size = restore_model_data['target_vocab_size']
    restore_target_vocab = restore_model_data['target_vocab']

    restore_model = MorphemeRestoreModel(
            num_encoder_layers=restore_model_data['num_encoder_layers'],
            num_decoder_layers=restore_model_data['num_decoder_layers'],
            embedding_dim=restore_model_data['embedding_dim'],
            source_vocab_size=restore_model_data['source_vocab_size'],
            target_vocab_size=restore_model_data['target_vocab_size'],
            num_heads=restore_model_data['num_heads'],
            feedforward_dim=restore_model_data['feedforward_dim'])

    restore_model.load_state_dict(restore_model_data['model_state_dict'])
    restore_model = restore_model.to(DEVICE)

    tagging_model_data = torch.load(args.model_tagging)

    tagging_source_vocab_size = tagging_model_data['source_vocab_size']
    tagging_source_vocab = tagging_model_data['source_vocab']
    tagging_labels = tagging_model_data['labels']
    tagging_labels_size = tagging_model_data['labels_size']

    tagging_model = MorphemeTaggingModel(
            num_encoder_layers=tagging_model_data['num_encoder_layers'],
            embedding_dim=tagging_model_data['embedding_dim'],
            source_vocab_size=tagging_model_data['source_vocab_size'],
            labels_size=tagging_model_data['labels_size'],
            num_heads=tagging_model_data['num_heads'],
            feedforward_dim=tagging_model_data['feedforward_dim'])

    tagging_model.load_state_dict(tagging_model_data['model_state_dict'])
    tagging_model = tagging_model.to(DEVICE)

    restore_model.eval()
    tagging_model.eval()

    with torch.no_grad():
        while True:
            source = input('Description: ')
            if source:
                restored_sentence = restore_morphemes(restore_model, restore_source_vocab, restore_target_vocab, source)
                print('Restored form:', restored_sentence)
                result = predict_labels(tagging_model, tagging_source_vocab, tagging_labels, restored_sentence)

                output = []
                for i, (morpheme, label) in enumerate(result):
                    if morpheme.startswith(MORPHEME_SEPARATOR):
                        output.append(morpheme[1:] + '\t' + label)
                    elif i == 0:
                        output.append(morpheme + '\t' + label)
                    else:
                        output.append('\n' + morpheme + '\t' + label)


                print('\n'.join(output))
            else:
                break

위 파일을 test.py로 저장하고 실행해봅니다.

python3 test.py -mdr models.restore/model_final.pth -mdt models.tagging/model_final.pth
Description: 아름다운 별 지구에 오신 것을 환영합니다.
Restored form: 아름답▁ㄴ 별 지구▁에 오▁시▁ㄴ 것▁을 환영▁하▁ㅂ니다▁.
아름답  VA
ㄴ      ETM

별      NNG

지구    NNG
에      JKB

오      VV
시      EP
ㄴ      ETM

것      NNB
을      JKO

환영    NNG
하      XSV
ㅂ니다  EF
.       SF
Description: Transformer를 이용해서 한국어 품사 태거를 만들어보았습니다.
Restored form: Transformer▁를 이용▁하▁아서 한국어 품사 태거▁를 만들▁어▁보▁았▁습니다▁.
Transformer     NA
를      JKO

이용    NNG
하      XSV
아서    EC

한국어  NNP

품사    NNG

태거    NNG
를      JKO

만들    VV
어      EC
보      VX
았      EP
습니다  EF
.       SF

어절간 경계를 잘 보기 위해서 약간 출력 형태를 조정해봤습니다.

성능 평가

간단한 테스트에서는 잘 작동하는 것 같은데 정량적인 평가에서는 어떨까요? 앞에서 떼어놓았던 1,000개의 테스트 문장으로 평가를 해보겠습니다.

품사 태깅은 비교적 평가가 쉽습니다. 각 형태소에 정확한 태그가 붙었는지 안 붙었는지를 세어보면 되니까요. 하지만 형태소 복원은 좀 까다롭습니다. 정답과 예측 값 사이에 형태소 갯수가 같으리라는 보장이 없기 때문입니다. 형태소 복원 성능을 평가하기 위해서 많이 사용하는 지표는 F-score입니다. F-score는 precision과 recall의 조화평균인데요. precision을 통해서 예측된 값 중에 오류가 얼마나 있는지를 알 수 있고, recall을 통해서 정답 중에 얼마나 많은 형태소를 찾아냈는지 알 수 있습니다. 이 두 값은 trade-off 관계이기 때문에 둘을 조합한 F-score를 최종 점수로 사용합니다. Precision, Recall, F-score에 대한 자세한 설명은 좋은 글(https://velog.io/@vanang7/%EB%B6%84%EB%A5%98%EA%B8%B0-%ED%8F%89%EA%B0%80-%EC%A7%80%ED%91%9C-%EA%B0%84%EB%8B%A8-%EC%A0%95%EB%A6%AC)로 대신합니다.

앞의 test.py를 살짝 변형해서 evlaution 코드를 만들어보겠습니다.

import torch
import torch.nn as nn
from torch.nn import TransformerEncoder, TransformerEncoderLayer, TransformerDecoder, TransformerDecoderLayer

import argparse
import math
import json

MORPHEME_SEPARATOR = "▁"
EOJEOL_SEPARATOR = " "

torch.manual_seed(0)
DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

UNK_INDEX = 0
PAD_INDEX = 1
BOS_INDEX = 2
EOS_INDEX = 3

class MorphemeRestoreModel(nn.Module):
    def __init__(self, num_encoder_layers, num_decoder_layers, embedding_dim, source_vocab_size, target_vocab_size, num_heads, feedforward_dim=512, dropout=0.1):
        super(MorphemeRestoreModel, self).__init__()

        encoder_layer = TransformerEncoderLayer(d_model=embedding_dim, nhead=num_heads, dim_feedforward=feedforward_dim)
        self.transformer_encoder = TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)
        decoder_layer = TransformerDecoderLayer(d_model=embedding_dim, nhead=num_heads, dim_feedforward=feedforward_dim)
        self.transformer_decoder = TransformerDecoder(decoder_layer, num_layers=num_decoder_layers)

        self.generator = nn.Linear(embedding_dim, target_vocab_size)
        self.source_embedding = TokenEmbedding(source_vocab_size, embedding_dim)
        self.target_embedding = TokenEmbedding(target_vocab_size, embedding_dim)
        self.positional_encoding = PositionalEncoding(embedding_dim, dropout=dropout)

    def forward(self, source, target, source_mask, target_mask, source_padding_mask, target_padding_mask, memory_key_padding_mask):
        source_embedding = self.positional_encoding(self.source_embedding(source))
        target_embedding = self.positional_encoding(self.target_embedding(target))
        memory = self.transformer_encoder(source_embedding, source_mask, source_padding_mask)
        outs = self.transformer_decoder(target_embedding, memory, target_mask, None, target_padding_mask, memory_key_padding_mask)

        return self.generator(outs)

    def encode(self, source, source_mask):
        return self.transformer_encoder(self.positional_encoding(self.source_embedding(source)), source_mask)

    def decode(self, target, memory, target_mask):
        return self.transformer_decoder(self.positional_encoding(self.target_embedding(target)), memory, target_mask)

class MorphemeTaggingModel(nn.Module):
    def __init__(self, num_encoder_layers, embedding_dim, source_vocab_size, labels_size, num_heads, feedforward_dim=512, dropout=0.1):
        super(MorphemeTaggingModel, self).__init__()

        self.embedding_dim = embedding_dim

        encoder_layer = TransformerEncoderLayer(d_model=embedding_dim, nhead=num_heads, dim_feedforward=feedforward_dim)
        self.transformer_encoder = TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)

        self.fc = nn.Linear(embedding_dim, labels_size)
        self.source_embedding = TokenEmbedding(source_vocab_size, embedding_dim)
        self.positional_encoding = PositionalEncoding(embedding_dim, dropout=dropout)

    def forward(self, source, source_mask, source_padding_mask=None):
        source_embedding = self.positional_encoding(self.source_embedding(source))
        outs = self.transformer_encoder(source_embedding, source_mask, source_padding_mask)

        return self.fc(outs)

def generate_square_subsequent_mask(sz):
    mask = (torch.triu(torch.ones((sz, sz), device=DEVICE)) == 1).transpose(0, 1)
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))

    return mask

def create_mask(src, tgt):
    src_seq_len = src.shape[0]
    tgt_seq_len = tgt.shape[0]

    tgt_mask = generate_square_subsequent_mask(tgt_seq_len)
    src_mask = torch.zeros((src_seq_len, src_seq_len), device=DEVICE).type(torch.bool)

    src_padding_mask = (src == PAD_INDEX).transpose(0, 1)
    tgt_padding_mask = (tgt == PAD_INDEX).transpose(0, 1)

    return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask

class PositionalEncoding(nn.Module):
    def __init__(self, emb_size: int, dropout, maxlen: int = 5000):
        super(PositionalEncoding, self).__init__()
        den = torch.exp(- torch.arange(0, emb_size, 2) * math.log(10000) / emb_size)
        pos = torch.arange(0, maxlen).reshape(maxlen, 1)
        pos_embedding = torch.zeros((maxlen, emb_size))
        pos_embedding[:, 0::2] = torch.sin(pos * den)
        pos_embedding[:, 1::2] = torch.cos(pos * den)
        pos_embedding = pos_embedding.unsqueeze(-2)

        self.dropout = nn.Dropout(dropout)
        self.register_buffer('pos_embedding', pos_embedding)

    def forward(self, token_embedding):
        return self.dropout(token_embedding +
                            self.pos_embedding[:token_embedding.size(0),:])

class TokenEmbedding(nn.Module):
    def __init__(self, vocab_size, embedding_dim):
        super(TokenEmbedding, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.embedding_dim = embedding_dim

    def forward(self, tokens):
        return self.embedding(tokens.long()) * math.sqrt(self.embedding_dim)

def greedy_decode(model, source, source_mask, max_len, start_symbol):
    source = source.to(DEVICE)
    source_mask = source_mask.to(DEVICE)

    memory = model.encode(source, source_mask)
    ys = torch.ones(1, 1).fill_(start_symbol).type(torch.long).to(DEVICE)

    for i in range(max_len - 1):
        memory = memory.to(DEVICE)
        target_mask = (generate_square_subsequent_mask(ys.size(0)).type(torch.bool)).to(DEVICE)
        out = model.decode(ys, memory, target_mask)
        out = out.transpose(0, 1)
        prob = model.generator(out[:, -1])
        _, next_word = torch.max(prob, dim=1)
        next_word = next_word.item()

        ys = torch.cat([ys, torch.ones(1, 1).type_as(source.data).fill_(next_word)], dim=0)
        if next_word == EOS_INDEX:
            break

    return ys

def restore_morphemes(model, source_vocab, target_vocab, source):
    model.eval()

    tokens = [BOS_INDEX] + [source_vocab[token] for token in source] + [EOS_INDEX]
    num_tokens = len(tokens)
    source = torch.LongTensor(tokens).reshape(num_tokens, 1)
    source_mask = (torch.zeros(num_tokens, num_tokens)).type(torch.bool)
    target_tokens = greedy_decode(model, source, source_mask, max_len=300, start_symbol=BOS_INDEX).flatten()

    return ''.join([target_vocab.itos[token] for token in target_tokens]).replace('<bos>', '').replace('<eos>', '')

def tokenize_sentence(s):
    morphemes = []
    for eojeol in s.split():
        for i, morpheme in enumerate(eojeol.split(MORPHEME_SEPARATOR)):
            if i == 0:
                morphemes.append(morpheme)
            else:
                morphemes.append(MORPHEME_SEPARATOR + morpheme)

    return morphemes

def predict_labels(model, source_vocab, labels, source):
    model.eval()

    tokens = tokenize_sentence(source)
    token_index = [source_vocab[token] for token in tokens]
    num_tokens = len(tokens)
    source = torch.LongTensor(token_index).reshape(num_tokens, 1).cuda()
    source_mask = (torch.zeros(num_tokens, num_tokens)).type(torch.bool).cuda()

    logits = model(source, source_mask)
    label_index = torch.argmax(logits, dim=-1)
    label_index = label_index.reshape(-1)

    tags = [labels.itos[label] for label in label_index]

    result = []
    for morpheme, label in zip(tokens, tags):
        result.append((morpheme, label))

    return result

def calculate(answers, predictions, morpheme_only=False):
    # morpheme_only가 True이면 품사 태깅은 무시하고 형태소 복원만 측정합니다.
    if morpheme_only:
        answer_items = [m for m, l in answers]
        prediction_items = [m for m, l in predictions]
    else:
        answer_items = [f'{m}\t{l}' for m, l in answers]
        prediction_items = [f'{m}\t{l}' for m, l in predictions]

    tp = tn = fp = 0

    for prediction_item in prediction_items:
        if prediction_item in answer_items:
            tp += 1
            # 한번 True Positive로 카운트되었으면 중복 계산을 위해 제거합니다.
            answer_items.remove(prediction_item)
        else:
            fp += 1

    # 아직도 answer에 남아있다는 것은 찾아야하는데 못 찾은 형태소(True Negative)라는 의미입니다.
    tn = len(answer_items)

    return {'tp': tp, 'tn': tn, 'fp': fp}

if __name__ == '__main__':
    print(f'Using {DEVICE}')

    parser = argparse.ArgumentParser()
    parser.add_argument('-mdr', '--model-restore')
    parser.add_argument('-mdt', '--model-tagging')
    parser.add_argument('-i', '--input')

    args = parser.parse_args()

    restore_model_data = torch.load(args.model_restore)

    restore_source_vocab_size = restore_model_data['source_vocab_size']
    restore_source_vocab = restore_model_data['source_vocab']
    restore_target_vocab_size = restore_model_data['target_vocab_size']
    restore_target_vocab = restore_model_data['target_vocab']

    restore_model = MorphemeRestoreModel(
            num_encoder_layers=restore_model_data['num_encoder_layers'],
            num_decoder_layers=restore_model_data['num_decoder_layers'],
            embedding_dim=restore_model_data['embedding_dim'],
            source_vocab_size=restore_model_data['source_vocab_size'],
            target_vocab_size=restore_model_data['target_vocab_size'],
            num_heads=restore_model_data['num_heads'],
            feedforward_dim=restore_model_data['feedforward_dim'])

    restore_model.load_state_dict(restore_model_data['model_state_dict'])
    restore_model = restore_model.to(DEVICE)

    tagging_model_data = torch.load(args.model_tagging)

    tagging_source_vocab_size = tagging_model_data['source_vocab_size']
    tagging_source_vocab = tagging_model_data['source_vocab']
    tagging_labels = tagging_model_data['labels']
    tagging_labels_size = tagging_model_data['labels_size']

    tagging_model = MorphemeTaggingModel(
            num_encoder_layers=tagging_model_data['num_encoder_layers'],
            embedding_dim=tagging_model_data['embedding_dim'],
            source_vocab_size=tagging_model_data['source_vocab_size'],
            labels_size=tagging_model_data['labels_size'],
            num_heads=tagging_model_data['num_heads'],
            feedforward_dim=tagging_model_data['feedforward_dim'])

    tagging_model.load_state_dict(tagging_model_data['model_state_dict'])
    tagging_model = tagging_model.to(DEVICE)

    restore_model.eval()
    tagging_model.eval()

    tp = tn = fp = 0

    with torch.no_grad():
        for line in open(args.input):
            data = json.loads(line)
            source = data['sentence']

            morphemes = []
            for eojeol in data['morphemes'].split():
                for i, morpheme in enumerate(eojeol.split(MORPHEME_SEPARATOR)):
                    if i == 0:
                        morphemes.append(morpheme)
                    else:
                        morphemes.append(MORPHEME_SEPARATOR + morpheme)

            answers = [(m, l) for m, l in zip(morphemes, data['labels'])]

            restored_sentence = restore_morphemes(restore_model, restore_source_vocab, restore_target_vocab, source)
            result = predict_labels(tagging_model, tagging_source_vocab, tagging_labels, restored_sentence)

            output = []
            for i, (morpheme, label) in enumerate(result):
                if morpheme.startswith(MORPHEME_SEPARATOR):
                    output.append(morpheme[1:] + '\t' + label)
                elif i == 0:
                    output.append(morpheme + '\t' + label)
                else:
                    output.append('\n' + morpheme + '\t' + label)

            calculation_result = calculate(answers, result, True)

            tp += calculation_result['tp']
            tn += calculation_result['tn']
            fp += calculation_result['fp']

    precision = tp / (tp + fp)
    recall = tp / (tp + tn)

    print(f'Precision: {precision} ({tp:,}/{tp + fp:,})')
    print(f'Recall: {recall} ({tp:,}/{tp + tn:,})')
    print(f'F-score: {2 * (precision * recall) / (precision + recall)}')

test.py의 거의 모든 코드가 같고 성능 계산에 필요한 calculate() 함수를 추가했습니다. calculate()을 정답과 예측 값을 비교하면서 true positive, true negative, false positive 를 계산합니다.

이 코드를 evaluate.py로 저장하고 아래와 같이 실행합니다.

python3 evaluate.py -mdr models.restore/model_final.pth -mdt models.tagging/model_final.pth -i corpus/test.json
분류 Precison Recall F-Score
형태소 복원만 98.93% 98.89% 98.91%
품사 태그까지 고려 98.81% 98.76% 98.79%

참고로 형태소 복원은 모두 맞다고 가정하고 품사 태깅 만을 측정하면 성능은 99.94%가 나왔습니다. 특별한 고오급(?) 기법을 쓰지 않고 아직 Hyperparameter 튜닝들을 하지 않은 점을 감안하면 나쁘지 않은 성능입니다.

마무리하며

오늘은 Pytorch의 Transformer를 사용해서 한국어 형태소 복원과 품사 태깅을 만드는 법을 알아보았습니다.
이 코드들은 어디까지나 학습용이고 실전에 그대로 사용하기는 어렵습니다. 느리고, 모델도 크고, 메모리도 많이 차지합니다. 실전에서는 KoNLPy 등을 통해서 빠르게 작동하는 품사 태거들을 사용하시길 추천드립니다.