한국어 띄어쓰기: 실전 예제로 배워보는 딥러닝 자연어 처리

한국어 띄어쓰기: 실전 예제로 배워보는 딥러닝 자연어 처리

아름다운별지구에오신여러분을환영합니다

딥러닝도 좀 배웠고, 자연어처리도 좀 배웠고, pytorch도 좀 배웠는데 막상 뭔가를 만들어 보려면 막막할 수 있습니다. 오늘은 pytorch를 이용해서 한국어 띄어쓰기 모듈바닥부터 만들어보고, 이 과정에서 실제로 어떻게 간단하 자연어처리기를 개발하는지 살펴보려고 합니다.

"아름다운별지구에오신여러분을환영합니다"를 입력으로 주면 "아름다운 별 지구에 오신 여러분을 환영합니다"로 바꿔주는 것이 오늘의 목표입니다.

딥러닝과 Pytorch에 대해서 기본적인 부분을 습득하고 있다고 가정하겠습니다. 특히 Pytorch에 대한 사용법은 설명하지 않으니 참고 부탁드립니다.

문제 정의, 발상의 전환

머신러닝 문제를 풀 때 가장 먼저 결정해야하는 것은 Unsupervised Learning 기법을 쓸지 Supervised Learning 기법을 쓸지입니다 . Supervised Learning의 성능이 좋기 때문에 꼭 필요한 경우가 아니라면 대체적으로는 Supervised Learning을 선택합니다. Supervised Learning을 선택했다면 이제 Classification 기법을 쓸지 Regression 기법을 쓰지 결정해야합니다. 예측하려는 값이 정해진 값(Class) 중 하나라면 Classification을 쓰면 되고, 연속값이라면 Regression을 쓰면 됩니다.

그렇다면 한국어 띄어쓰기는 어떻게 하면 될까요? 이때 바로 풀려는 문제를 머신러닝의 사고방식으로 재정의해야 합니다. 지금 선택이 앞으로 설계와 구현에 근본적인 영향을 미치게 됩니다. 그럼 오늘 사용할 문제 정의를 해보겠습니다.

Classification 문제로 바꾸자

문제를 정의할 때 사람이 어떻게 그 문제를 푸는지 곰곰히 생각해보면 도움이 될 때가 많습니다. 우리는 띄어쓰기를 어떻게 할까요? "아름다운별지구에오신여러분을환영합니다"라는 문장을 살펴보겠습니다. 문장을 앞에서부터 주~~욱 보다가 "'운'과 '별'사이를 띄어야겠군". 그리고 조금 더 보다가 "'별'과 '지' 사이도 띄어야겠군". 아마 이런식으로 보통 생각을 하실 겁니다. "'아'와 '름' 사이는 띄우지 않아도 되겠군"이라는 결정도 숨겨져있습니다. 이 경우에 이 문제를 아래와 같이 정의할 수 있습니다.

두 글자가 주어졌을 때 그 사이를 띄울지 말지 결정하는 문제

띄워야 한다면 1을 출력하고, 아니라면 (붙여야한다면) 0을 출력한다고 해보면 어엿한 Classification 문제가 됩니다.

(아, 름) -> 0
(름, 다) -> 0
(다, 운) -> 0
(운, 별) -> 1

입력과 출력이 결정됐습니다.

좋은 방법이지만 문제를 더 단순화 시킬 수 없을까요? 띄어쓰기를 두 글자 사이의 공백이라고 표현할 수도 있지만 한 글자 앞의 공백이라고 표현할 수도 있습니다 (비슷하게 한 글자 뒤의 공백이라고 할 수도 있고요). 이 경우에 0은 이 글자 앞에 공백을 넣지 말아라, 1은 이 글자 앞에 공백을 넣어라는 의미가 됩니다.

아 -> 0
름 -> 0
다 -> 0
운 -> 0
별 -> 1
지 -> 1
구 -> 0

입력과 출력이 이렇게 됩니다. 오늘은 표현이 더 간단한 두번째 방법을 택해서 진행해보겠습니다.

훈련 데이터 마련하기

정답 붙이기

훈련 데이터를 준비할 때 가장 어려운 부분은 아마도 Classification을 위한 정답을 붙이는 일일 것입니다. 품사 태깅이라면 각 단어의 품사를 다 붙여야하고, 동물 사진 인식이라면 각 사진에 동물의 종류를 다 붙여야합니다. 하지만 다행히도 우리는 문제 정의 과정에서 정답을 자동으로 생성하는 방법까지 찾았습니다. 이미 띄어쓰기가 되있는 텍스트가 있다면 각 글자 앞에 띄어쓰기를 해야하는지 아닌지 아주 쉽게 알 수 있기 때문입니다. 예를 들어 "아름다운 별 지구에 오신 여러분을 환영합니다"라는 텍스트가 있다면, 이 문장을 아래와 같은 데이터로 바꾸는 것은 어렵지 않은 일입니다.

아 -> 0
름 -> 0
다 -> 0
운 -> 0
별 -> 1
지 -> 1
구 -> 0
...

변환코드는 뒤에서 살펴보겠습니다.

대용량 데이터 구하기

딥러닝 모델을 훈련하기 위해서는 많은 데이터가 필요합니다. 다행히도 인터넷에서는 우리 문제를 해결하기 위한 대량의 텍스트가 데이터가 많습니다. 가장 대표적인 곳은 위키피디아입니다. https://dumps.wikimedia.org/kowiki/latest/에 가면 한국어 위키피디아 파일을 받을 수 있습니다. 이곳에서 덤프 파일을 받고 https://github.com/attardi/wikiextractor와 같은 툴을 이용해서 텍스트만 추출할 수 있습니다. 이 툴을 사용하면 1차적으로 텍스트가 추출되기는 하지만 몇가지 깔끔하지 않을 것들이 있습니다. 추출된 텍스트가 여러 파일로 나눠서 저장되기 때문에 나중에 합쳐야하고, 추출될 파일 안에 XML tag가 남는 경우가 있어서 약간의 후처리를 해주어야 합니다.

하지만 감사하게도 roboreport님께서 https://github.com/roboreport/doc2vec-api/에 모든 작업을 거친 순수 텍스트만 올려두셨습니다. 저 페이지에서 "Korean Wikipedia / space tokenizer (467MB)"을 받으면 됩니다. wc -l 명령으로 확인해보면 1,806,747 줄에 달하는 상당히 큰 텍스트 데이터입니다.

현재 디렉토리에 corpus라는 디렉토리를 만들고, 받은 텍스트 파일을 그 안에 raw.txt라는 이름으로 저장합니다.

평가 데이터 떼어두기

딥러닝 모델을 만들 때는 반드시 훈련에 사용하지 않고 평가 시에만 쓸 데이터를 따로 떼어둬야합니다. 따로 떼어둔 평가 데이터는 모델이 훈련 중에 보지 못한 데이터입니다. 따라서 모델이 overfitting 되지 않고 얼마나 잘 generalization 되었는지를 평가할 때 쓸 수 있습니다. 실제로 서비스를 한다고 하더라도, 서비스 때 들어오는 데이터는 훈련 데이터에 없을 가능성이 크기 때문에 별도의 평가 데이터는 아주 중요합니다. 아니, 필수입니다.

예전에 작은 데이터를 다루던 시절에는 훈련 데이터와 평가 데이터 사이의 암묵적인 비율이 있었습니다. 대체적으로 훈련 데이터 8, 평가 데이터 2의 비율으로 데이터를 나누곤했습니다. 하지만 요즘은 데이터가 아주 아주 커지고 있기 때문에 굳이 이 비율을 지키지 않는 경우도 많습니다. 8:2의 비율을 따르면 평가 데이터로 너무 많은 수가 가기 때문입니다. 전체가 10,000일 때 2와 전체가 1,000,000일 때 2는 엄청난 차이니까요. 우리는 평가 데이터로 2,000 줄을 사용하겠습니다.
(문장이 아닌 줄이라고 표현한 이유는 우리가 쓸 데이터가 마침표가 아닌 개행문자로 구분이 되있기 때문입니다)

평가 데이터를 떼어둘 때 또다른 중요한 고려 사항이 있습니다. 약 180만 줄 중에서 2천 줄을 어떻게 뽑아낼까요? 가장 직관적이고 쉬운 방법은 데이터의 가장 앞 2천 문장을 취하는 것입니다. 하지만 이 방법에는 문제가 있습니다. 위키피디아의 특성상 연속된 줄들은 같은 페이지에 속할 가능성이 높습니다. 다르게 말하면 어딘가 모르게 문장의 특성이 비슷할 수 있습니다. 즉, 비슷한 문장들로 평가를 하기 때문에 편항된 결과가 나올 수 있습니다.

이 문제를 해결하기 위한 가장 간단한 방법은 전체 데이터에서 무작위로 2천 문장을 뽑는 것입니다. 여러가지 방법이 있겠지만 이번에는 데이터 전체를 읽어서 shuffle 후에 다시 저장하는 간단한 방법을 쓰겠습니다.

(이 방법은 모든 데이터를 메모리에 올리기 때문에 엄청나게 큰 데이터를 다룰 때는 사용할 수 없습니다)

shuffle.py라는 파일을 만들고 아래와 같이 작성합니다.

import sys
import random

lines = []

for line in sys.stdin:
    lines.append(line.strip())

random.shuffle(lines)

for line in lines:
    print(line)

그리고 아래와 같이 실행합니다.

python3 shuffle.py < corpus/raw.txt > corpus/raw_shuffled.txt

컴퓨터 사용에 따라 처리되는 시간이 다를 수 있습니다.

명령이 완료되고 나서 아래와 같이 실행해보면 데이터가 잘 섞인 것을 알 수 있습니다.

head corpus/raw.txt corpus/raw_shuffled.txt

이번에는 평가 데이터 2천 문장을 떼어보겠습니다.

head -2000 corpus/raw_shuffled.txt > corpus/raw_shuffled_test.txt

head 명령은 파일의 앞에서부터 - 뒤에 숫자만큼을 출력합니다. 즉, 이 경우에는 raw.txt의 처음 2000문장을 출력합니다.

이제 나머지를 훈련 데이터로 저장합니다. raw.txt의 2001번째 줄부터 끝까지 저장하면 됩니다.

tail +2001 corpus/raw_shuffled.txt > corpus/raw_shuffled_train.txt

tail 명령은 + 뒤에 숫자번째 줄부터 파일의 끝까지를 출력합니다. 즉, 이 경우에는 2001번째 줄부터 파일의 끝까지를 출력합니다.

데이터 변환하기

지금 데이터는 (당연히) "아름다운 별 지구에 오신 여러분을 환영합니다"와 같이 저장되어있습니다. 이제 이 텍스트들을 앞에서 우리가 문제 정의한 것처럼 변환해주어야 합니다.

아 -> 0
름 -> 0
다 -> 0
운 -> 0
별 -> 1
지 -> 1
구 -> 0
...
def generate_labels(line):
    result = []

    if line:
        result.append('{}\t0'.format(line[0]))
        was_space = False

        for c in line[1:]:
            if c.isspace():
                was_space = True
            elif was_space:
                result.append('{}\t1'.format(c))
                was_space = False
            else:
                result.append('{}\t0'.format(c))
                was_space = False
    return result  

많은 텍스트 전처리 코드들이 그렇듯이 야생(?)의 데이터들을 다루다보면 코드가 깔끔하지 않은 경우가 있습니다. 먼저 이 코드가 하는 기본적인 일은 다음과 같습니다.

  • 현재 글자가 공백이라면 was_spaceTrue를 저장하고 다음으로 넘어갑니다.
  • 현재 글자가 공백이 아니라면 앞에 글자가 공백이었는지를 확인합니다. was_space를 보면 되겠죠. 공백이었다면 출력으로 1을 저장하고, 아니라면 0을 저장합니다. 그리고 다음 글자를 위해서 was_spaceFalse로 저장합니다. 다음 글자 입장에서 앞 글자 (=현재 글자)는 공백이 아니니까요.

저 둘에 들어가지 않은 edge case가 있습니다.

  • 우리의 문제 정의에서는 현재 글자의 앞이 공백인지 아닌지를 보기 때문에 가장 앞글자는 이 방법으로 처리할 수가 없습니다. 그래서 예외적으로 첫글자의 출력은 무조건 0으로 합니다. 실제로 첫글자 앞에 띄어쓰기를 하지는 않으니까요.
  • 가끔 가다가 공백이 두번 이상 연속으로 나오는 경우가 있습니다. 공백이 여러개라고 하더라도 무시하고 하나로 간주힙니다.

코드를 보시면 c == " "가 아니고 c.isspace()를 사용한 것을 알 수 있습니다. 우리 눈에는 공백처럼 보이지만 유니코드 특성상 실제로는 단순히 " "가 아닌 글자들이 많습니다. isspace()는 이런 것들을 고려해서 일반적으로 공백으로 간주되는 모든 유니코드 값들을 판단해주십니다.

전체 코드입니다.

import sys

def generate_labels(line):
    result = []

    if line:
        result.append('{}\t0'.format(line[0]))
        was_space = False

        for c in line[1:]:
            if c.isspace():
                was_space = True
            elif was_space:
                result.append('{}\t1'.format(c))
                was_space = False
            else:
                result.append('{}\t0'.format(c))
                was_space = False
    return result  

if __name__ == '__main__':
    is_first = True
    for line in sys.stdin:
        line = line.strip()
        result = generate_labels(line)

        if is_first:
            is_first = False
        elif not is_first:
            print('')

        print('\n'.join(result))

전체 코드를 generate_corpus.py로 저장하고 아래와 같이 실행해서 실제 사용할 데이터를 만듭니다.

python3 generate_corpus.py < corpus/raw_shuffled_train.txt > corpus/train.txt
python3 generate_corpus.py < corpus/raw_shuffled_test.txt > corpus/test.txt

훈련 데이터는 처리에 시간이 꽤 걸릴 수 있습니다.

훈련 모듈 만들기

Dataset

먼저 앞에서 만든 훈련 데이터를 읽어들일 Dataset을 만들어보겠습니다. Pytorch의 Dataset은 크게 map style와 iterable style이 있습니다. 둘의 가장 큰 차이는 배열처럼(a[i]) 인덱스를 써서 데이터의 일부에 접근이 가능하냐, 아니면 순차적으로만 접근이 가능하냐입니다. map style dataset은 인덱스로 접근이 가능해야합니다.

또 다른 관점은 데이터 전체를 메모리에 올릴 크기가 되느냐 아니냐입니다. 항상 그런 것은 아니지만 대체적으로 map style으로 구현하기 위해서는 메모리에 데이터 전체를 올리는 것이 편리합니다 (물론 디스크에 데이터를 두고도 구현을 할 수 있기 때문에 항상 그런 것을 아닙니다). 데이터가 아주 커서 메모리에 로딩이 어렵다면 필요할 때 하나씩 데이터를 메모리로 읽어들이는 iterable style dataset이 효과적입니다. 이번에도 IterableDataset을 상속받아서 iterable style dataset을 만들겠습니다.

__init__()

Dataset에 필요한 기본 정보들을 지정할 __init__()부터 만들어보겠습니다.

class MyDataset(IterableDataset):
    def __init__(self, corpus_filename, max_characters=0, character_to_index=None):
        super(MyDataset).__init__()

        self.corpus_filename = corpus_filename

        self.max_characters = max_characters

        if character_to_index:
            self.character_to_index = character_to_index
        else:
            print('Building a character to index')
            self.character_to_index = self.build_character_to_index()
            print(f'Done. {len(self.character_to_index) - 4} characters were found')

corpus_filename은 읽어들일 데이터 파일의 경로입니다. 앞에서 generate_corpus.py로 만든 파일입니다.

max_characters는 한 줄을 처리할 때 얼마나 많은 글자까지 허용할지 수치입니다. 기본적으로 RNN은 가변길이의 입력을 처리할 수 있습니다. 하지만 훈련시에 입력이 너무 길어지면 Out Of Memory와 같은 문제가 생길 수 있습니다. 우리 코드는 실제로 훈련 데이터의 한 줄이 수천글자로 이루어져있다고 하더라도, max_characters 글자 만큼만 훈련에 사용합니다. max_characters가 어떻게 쓰이는지는 뒤에서 살펴보겠습니다.

다음은 글자를 숫자로 변환해주는 부분입니다. 딥러닝은 수치를 이해하는 모델이기 때문에 우리의 입력인 글자를 이해하지는 못 합니다. 그렇기 때문에 글자를 숫자 형태로 변환해서 넣어주어야합니다. 글자를 숫자로 표현하는 방법이 여러가지 있지만 우리는 one-hot 벡터를 사용하겠습니다.

예를 들어 우리 시스템이 처리할 수 있는 글자가 ['a', 'b', 'c', 'd'] 이렇게 총 4글자라고 가정해보겠습니다. a[1, 0, 0, 0]과 같이 표현합니다. c[0, 0, 1, 0]과 같이 표현합니다. 보시는 것처럼 전체 글자 수 크기 만큼의 zero tensor를 만들고, 표현하려는 글자가 해당하는 곳만 1으로 설정합니다. 나머지는 다 0으로 채우고요. 이렇게 하려면 각 글자의 자리가 어디인지를 알 수 있어야합니다. 이 역할하는 데이터가 character_to_index입니다. character_to_index['c']와 같이 하면 'c'의 자리 (이 경우에는 2)를 구할 수 있습니다.

이 코드에서는 __init__()이 호출될 때 인자로 이 데이터를 받았다면 그대로 사용하지만, 아니라면 훈련 데이터로부터 character_to_index를 만듭니다. 이 역할을 하는 것이 바로 build_character_to_index()입니다.

character_to_index

    def build_character_to_index(self):
        characters = set()
        character_list = ['PAD', 'UNK', 'BOL', 'EOL']

        for line in open(self.corpus_filename):
            line = line.strip()
            if line:
                character, _ = line.split('\t')
                characters.add(character)

        character_list.extend(list(sorted(characters)))

        return {c:i for i, c in enumerate(character_list)}   

이 코드의 초반부가 하는 일은 훈련 데이터를 한줄 한줄 읽으면서 모든 글자를 characters라는 set에 저장하는 일입니다. set의 특성상 중복된 값은 저장되지 않기 때문에 이 과정을 마치고 나면 characters에는 훈련 데이터에 존재하는 모든 글자가 중복없이 유일한 값으로 저장되어있습니다.

그 전에 한가지 짚고 가야할 중요한 점이 있습니다. 훈련 데이터의 모든 글자를 characters에 저장하고 난 다음에, character_list에 이 값을 옮기는데, 이 때 이미 character_list = ['PAD', 'UNK', 'BOL', 'EOL']와 같이 character_list에는 'PAD', 'UNK', 'BOL', 'EOL'가 저장되있습니다. 이 네 가지 심볼은 훈련데이터에는 없지만, 훈련 과정에서 특별히 사용되는 심볼들입니다. 어디에 사용하는지는 뒤에서 살펴보겠습니다.

이렇게 특수 심볼을 포함한 모든 글자가 모아지면 dictionary comprehension 문법을 사용해서 최종 결과를 리턴합니다.

결과는 예를 들어 다음과 같을 것입니다

{'PAD':0, 'UNK':1, 'BOL':2, 'EOL':3, '가': 4, '나':5, ...}

__iter__()

다음으로는 IterableDataset에서 가장 중요하다고 할 수 있는 __iter__()입니다. __iter__()의 역할은 데이터에서 항목을 하나씩 꺼내주는 것입니다.

    def __iter__(self):
        x = ['BOL',]
        y = [0.0,]
        for line in open(self.corpus_filename):
            if line == '\n':
                x.append('EOL')
                y.append(0.0)
                yield torch.tensor([self.character_to_index.get(c, self.character_to_index['UNK']) for c in x]), torch.tensor(y)

                x = ['BOL',]
                y = [0.0,]
            else:
                if self.max_characters == 0 or len(x) <= self.max_characters:
                    character, label = line.rstrip().split('\t')
                    x.append(character)
                    y.append(float(label))

        yield torch.tensor([self.character_to_index.get(c, self.character_to_index['UNK']) for c in x]), torch.tensor(y)

이 코드가 하는 일을 간단히 요약해보겠습니다.

  • 데이터 파일의 각 줄을 돌면서 글자와 그 글자에 해당하는 출력 Label(0 또는 1)을 저장합니다.
  • 이 때 max_characters가 지정되어있다면 (0이 아니라면), max_characters 이상의 글자는 무시합니다.
  • 저장을 하다가 빈 줄을 만나면 데이터상의 한 줄이 끝났다는 의미이고, 지금까지 모은 글자를 tensor로 바꾸고 반환(yield)합니다. 변환된 tensor에는 각 글자에 해당하는 Index(character_to_idnex로 찾은 값)가 할당딥니다.

입력값의 처음과 끝에 각각 BOL(Begin-Of-Line)EOL(End-Of-Line)을 넣어주고 있습니다. 문장의 첫 글자와 마지막 글자는 띄어쓰기 입장에서 특수한 정보를 가지기 때문에 (예를 들어 첫글자 앞은 대부분 띄어쓰기를 하지 않습니다) 이런 정보를 추가해주면 성능 향상에 조금이나마 도움이 될 수 있습니다.

torch.tensor([self.character_to_index.get(c, self.character_to_index['UNK']) for c in x])를 보면 UNK을 사용할 것을 알 수 있습니다. 훈련 데이터에서는 그럴 일이 없지만, 평가 데이터나 서비스시에 데이터를 다루다보면 훈련 데이터에는 없던 글자가 있을 수 있습니다. 이 경우에 UNK이라는 값으로 대체하겠다는 의미입니다.

Helper 함수

마지막으로 character_to_index에 접근하기 위한 간단한 Helper 함수들입니다.

    def get_vocab_size(self):
        return len(self.character_to_index)

    def get_character_to_index(self):
        return self.character_to_index

Model

이번 글에서는 아주 간단한 Model을 사용하려고 합니다. 아주 간단하게 LSTM -> Linear로 이어지는 구성입니다.

class SpaceModel(nn.Module):
    def __init__(self, vocab_size, hidden_dim, embedding_dim):
        super(SpaceModel, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=2, bidirectional=True, batch_first=True)
        self.fc = nn.Linear(hidden_dim * 2, 1)

    def forward(self, x):
        out = self.embedding(x)
        out, _ = self.lstm(out)
        prediction = self.fc(out)

        return prediction

먼저 각 글자를 Embedding하기 위한 Embedding Layer를 만듭니다. 숫자가 아닌 값을 입력으로 사용하기 위해서는 보통 one-hot vector나 Embedding을 사용하는데 여기에서는 Embedding을 사용합니다.

LSTM은 내부적으로 Layer를 두개로 하고, Bidirectional으로 했습니다. LSTM이 Bidirectional인 경우 Output이 두 배가 되기 때문에 Linear 레이어의 입력은 hidden_dim * 2로 했습니다. LSTM에 batch_first 인자가 True인 것을 볼 수 있습니다. 기본값은 False인데 이 경우에 LSTM의 입력값은 (Sequence 길이, Batch 크기, Vocab 크기)가 되야합니다. 우리는 (Batch 크기, Sequence 길이, Vocab 크기)로 입력을 만들 것이기 때문에 batch_firstTrue로 했습니다.

forward() 함수를 보면 self.lstm(x)의 두번째 반환값은 무시하고 있습니다. 두번째 반환값은 내부 상태를 알려주는 값들인데 우리에게는 필요없습니다.

LSTM의 Output을 그대로 출력이 하나인 Linear 레이어로 넣습니다. 최종 레이어의 출력을 구성할 때 몇가지 고려사항이 있습니다.

  • 우리가 풀려는 문제는 Label이 0 또는 1인 Binary Classification이기 때문에 최종 노드를 하나로만 했습니다. 또다른 방법으로는 최종 노드를 2개로 하고 각 노드가 0과 1의 각각 확률을 표현하게 할 수도 있습니다.
  • 보시는 것처럼 최종 레이어에 activation function이 없습니다. 보통은 Sigmoid나 Softmax를 사용하는데요. 우리는 나중에 loss fuction으로 BCEWithLogitsLoss를 사용하려고 합니다. BCEWithLogitsLoss는 내부적으로 Sigmoid 후에 CrossEntropy를 취하기 때문에 따로 Sigmoid 레이어가 필요없습니다. 만약에 최종 노드를 0과 1을 표현하는 두 개로 했다면 Softmax를 썼을 수도 있습니다.

이렇게 설계했기 때문에 아래와 같은 해석이 가능합니다.

  • 훈련 시에는 모델의 예측 값이 0 또는 1 이 되는 방향으로 파라미터들을 훈련시킵니다. 훈련 데이터의 Label은 0 또는 1 이고 Sigmoid 후의 값도 0에서 1사이이기 때문에 잘 맞는 느낌이죠?
  • 하지만 실제 문장을 해석할 때 (Inference time) 예측값은 0 또는 1이 아닌 0과 1사이의 어떤 값이 나옵니다. 많이 쓰는 기법은 값이 0.5 이상이면 1으로 간주하고, 0.5 미만이면 0으로 간주하는 방법입니다.

훈련 실행 부분

Parameter 준비

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-t', '--train', type=str)
    parser.add_argument('-b', '--batch', type=int)
    parser.add_argument('-e', '--epoch', type=int)
    parser.add_argument('-m', '--max-characters', type=int)
    parser.add_argument('-hd', '--hidden-dim', type=int)
    parser.add_argument('-bp', '--batch-print', type=int)
    parser.add_argument('-o', '--output', type=str)
    parser.add_argument('-ed', '--embedding-dim', type=int)
    parser.add_argument('-l', '--loss-file', type=str, required=False)
    args = parser.parse_args()

명령행에서 몇가지 파라미터를 설정할 수 있도록 설정합니다.

  • --train: 훈련 데이터의 파일 경로
  • --batch: mini batch 크기
  • --epoch: 총 epoch 수
  • --max-characters: 앞에서 설명한 max_characters 제한
  • --hidden-dim: LSTM의 Hidden Dimension
  • --batch-print: 한 Epoch 내에서 얼마나 자주 로그를 출력할지. 만약에 100이면 100번째 Batch 마다 정보를 출력합니다.
  • --output: 최종 모델과 Epoch 별 Checkpoints가 저장될 디렉토리
  • --embedding-dim: Embedding Layer를 위한 Embedding Dimension
  • --loss-file: 매 Epoch의 loss를 저장할 파일 경로

모델과 데이터 준비

    BATCH_SIZE = args.batch

    train_dataset = MyDataset(args.train, args.max_characters)
    train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, collate_fn=make_batch)

    model = SpaceModel(train_dataset.get_vocab_size(), args.hidden_dim, args.embedding_dim)
    model.cuda()

    optimizer = optim.Adam(model.parameters())
    loss_function = nn.BCEWithLogitsLoss().cuda()

train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, collate_fn=make_batch) 이 부분의 LSTM과 같은 RNN을 사용할 때 중요한 부분입니다. 특히 collate_fn=make_batch이 중요합니다. LSTM도 다른 pytorch.nn 모듈처럼 입력은 mini batch 형태입니다. 한 mini batch 내에서 Sequence의 길이(글자수)는 같아야 한다는 제약이 있습니다(mini match 간에는 달라도 상관없습니다). 하지만 우리 훈련 데이터에서 보듯이 각 줄을 구성하는 글자수가 다 다릅니다. 이 때 필요한 것이 Padding입니다. 즉 그 Batch 내에서 가장 긴 줄에 길이에 맞춰서 다른 줄들에 빈 값을 추가해줍니다. 이 때 채워주는 값으로 우리는 PAD를 사용합니다. 드디어 앞에서 특수 심볼이라고 했던 네 가지가 모두 나왔습니다.

DataLoader는 기본적으로 Padding을 지원하지 않기 때문에 Padding을 위해서는 각 데이터를 어떻게 묶어서 Batch를 만들지 알려주어야 합니다. 이때 사용하는 인자가 collate_fn입니다. DataLoader는 (내부적으로 Dataset의 __iter__()를 호출해서) 한 Batch에 필요한 데이터를 다 모았으면, 데이터를 반환하기 전에 이 데이터 묶음을 collate_fn으로 보냅니다. Batch 데이터를 반환하기 전에 무언가 조작할 수 있는 기회를 주는 셈이죠. 물론 collate_fn에 값을 지정하지 않으면 아무것도 하지 않습니다. 그럼 우리가 collate_fn에 지정한 make_batch가 하는 일을 살펴보겠습니다.

def make_batch(samples):
    inputs = []
    labels = []

    for sample in samples:
        inputs.append(sample[0])
        labels.append(sample[1])

    inputs = pad_sequence(inputs, batch_first=True)
    labels = pad_sequence(labels, batch_first=True)

    return [inputs, labels]

sampels에는 DataLoader가 Dataset으로부터 한땀한땀 모은 데이터들이 들어있습니다. 우리가 만든 Dataset은 (x, y) 쌍을 반환하기 때문에 (x, y)가 목록으로 주~욱 들어있습니다.

코드 초반에는 x는 x끼리 모아서 inputs에 저장하고, y는 y끼리 모아서 labels에 저장합니다. 이렇게 모은 inputslabelstorch.nn.utils.rnn.pad_sequence을 적용해서 가장 긴 길이에 맞춰서 Padding을 해줍니다. Padding할 값을 특별히 지정해주지 않으면 0.0으로 채웁니다.

(사실 이 코드는 잠재적인 문제가 있습니다. character_to_index['PAD']가 0이고 pad_sequence()의 기본 Padding 값이 0이어서 맞지만, 다른 Padding 값을 사용한다면 pad_sequence()에 이를 알려주서야합니다.)

훈련 준비

    N_EPOCHES = args.epoch

    PRINT_BATCHES = args.batch_print

    if args.loss_file:
        open(args.loss_file, 'w').write('')

훈련 루프에 들어가지 전에 몇가지 값을 설정하고 loss 파일이 있었다면 내용을 지웁니다.

훈련 루프

    for epoch in range(1, N_EPOCHES + 1): 
        total_loss_per_epoch = 0
        total_batch = 0      

        begin_time = datetime.datetime.now()

        for x, y in train_dataloader:
            total_batch += 1

            x = x.cuda()
            y = y.cuda()

            optimizer.zero_grad()

            output = model(x)            

            loss = loss_function(output.view(-1), y.view(-1))
            total_loss_per_epoch += loss.item()

            loss.backward()
            optimizer.step()

            if total_batch % PRINT_BATCHES == 0:
                print(f'Epoch {epoch:>2d},\tBatch {total_batch:>6d},\tloss: {total_loss_per_epoch / total_batch:.10f}')

        torch.save({
            'epoch': epoch,
            'character_to_index': train_dataset.get_character_to_index(),
            'vocab_size': train_dataset.get_vocab_size(),
            'hidden_dim': args.hidden_dim,
            'embedding_dim': args.embedding_dim,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': total_loss_per_epoch
        }, os.path.join(args.output, f'model_{epoch:02d}.pth'))

        end_time = datetime.datetime.now()

        print(f'Epoch {epoch:>2d},\tloss: {total_loss_per_epoch / total_batch:>.10f},\tTook {str(end_time - begin_time)}')

        if args.loss_file:
            open(args.loss_file, 'a').write(f'{total_loss_per_epoch / total_batch:>.10f}\n')

전반적으로 평이한 Pytorch 훈련 코드입니다. 몇가지 특이 사항만 짚어보겠습니다.

  • loss = loss_function(output.view(-1), y.view(-1)): Model의 Output과 y의 Shape을 BCEWithLogitsLoss가 받을 수 있도록 만들어줍니다.
  • 나중에 훈련 재개를 위해서 Epoch이 끝날 때마다 필요한 정보를 저장합니다.
  • torch.save({ 'epoch': epoch, 'character_to_index': train_dataset.get_character_to_index(), 'vocab_size': train_dataset.get_vocab_size(), 'hidden_dim': args.hidden_dim, 'embedding_dim': args.embedding_dim, 'model_state_dict': model.state_dict(), 'optimizer_state_dict': optimizer.state_dict(), 'loss': total_loss_per_epoch }, os.path.join(args.output, f'model_{epoch:02d}.pth'))

최종 모델 저장

    torch.save({
        'character_to_index': train_dataset.get_character_to_index(),
        'vocab_size': train_dataset.get_vocab_size(),
        'hidden_dim': args.hidden_dim,
        'embedding_dim': args.embedding_dim,
        'model_state_dict': model.state_dict(),

    }, os.path.join(args.output, 'model_final.pth'))

마지막으로 훈련이 끝나면 결과를 저장합니다.

전체 코드

훈련 부분 전체 코드입니다. 전체 코드를 train.py라고 저장한 후 아래와 같은 명령으로 실행합니다.

mkdir models
python3 train.py -t corpus/train.txt -b 128 -e 10 -m 300 -hd 128 -ed 512 -bp 100 -o models/ -l models/loss.txt

만약에 Out-of-memory 오류가 난다면 batch (-b)를 줄여보세요. 훈련 속도가 조금 느려질 수는 있습니다. 8, 4 정도까지 줄였는데도 Out-of-memory 오류가 난다면 성능 감소를 어느 정도 감수하고, max_characters (-m)이나 hidden dimension (-hd), embedding dimesion (-ed)을 줄여야합니다.

이 코드는 GPU 사용을 가정했기 때문에 GPU 사용 환경이 안되있다면 문제가 오류가 나면서 실행이 되지 않을 수도 있습니다.

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import IterableDataset
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence

import argparse
import datetime
import os

class MyDataset(IterableDataset):
    def __init__(self, corpus_filename, max_characters=0, character_to_index=None):
        super(MyDataset).__init__()

        self.corpus_filename = corpus_filename

        self.max_characters = max_characters

        if character_to_index:
            self.character_to_index = character_to_index
        else:
            print('Building a character to index')
            self.character_to_index = self.build_character_to_index()
            print(f'Done. {len(self.character_to_index) - 4} characters were found')

    def __iter__(self):
        x = ['BOL',]
        y = [0.0,]
        for line in open(self.corpus_filename):
            if line == '\n':
                x.append('EOL')
                y.append(0.0)
                yield torch.tensor([self.character_to_index.get(c, self.character_to_index['UNK']) for c in x]), torch.tensor(y)

                x = ['BOL',]
                y = [0.0,]
            else:
                if self.max_characters == 0 or len(x) <= self.max_characters:
                    character, label = line.rstrip().split('\t')
                    x.append(character)
                    y.append(float(label))

        yield torch.tensor([self.character_to_index.get(c, self.character_to_index['UNK']) for c in x]), torch.tensor(y)

    def build_character_to_index(self):
        characters = set()
        character_list = ['PAD', 'UNK', 'BOL', 'EOL']

        for line in open(self.corpus_filename):
            line = line.strip()
            if line:
                character, _ = line.split('\t')
                characters.add(character)

        character_list.extend(list(sorted(characters)))

        return {c:i for i, c in enumerate(character_list)}   

    def get_vocab_size(self):
        return len(self.character_to_index)

    def get_character_to_index(self):
        return self.character_to_index

class SpaceModel(nn.Module):
    def __init__(self, vocab_size, hidden_dim, embedding_dim):
        super(SpaceModel, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=2, bidirectional=True, batch_first=True)
        self.fc = nn.Linear(hidden_dim * 2, 1)

    def forward(self, x):
        out = self.embedding(x)
        out, _ = self.lstm(out)
        prediction = self.fc(out)

        return prediction

def make_batch(samples):
    inputs = []
    labels = []

    for sample in samples:
        inputs.append(sample[0])
        labels.append(sample[1])

    inputs = pad_sequence(inputs, batch_first=True)
    labels = pad_sequence(labels, batch_first=True)

    return [inputs, labels]

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-t', '--train', type=str)
    parser.add_argument('-b', '--batch', type=int)
    parser.add_argument('-e', '--epoch', type=int)
    parser.add_argument('-m', '--max-characters', type=int)
    parser.add_argument('-hd', '--hidden-dim', type=int)
    parser.add_argument('-bp', '--batch-print', type=int)
    parser.add_argument('-o', '--output', type=str)
    parser.add_argument('-ed', '--embedding-dim', type=int)
    parser.add_argument('-l', '--loss-file', type=str, required=False)
    args = parser.parse_args()

    BATCH_SIZE = args.batch

    train_dataset = MyDataset(args.train, args.max_characters)
    train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, collate_fn=make_batch)

    model = SpaceModel(train_dataset.get_vocab_size(), args.hidden_dim, args.embedding_dim)
    model.cuda()

    optimizer = optim.Adam(model.parameters())
    loss_function = nn.BCEWithLogitsLoss().cuda()

    N_EPOCHES = args.epoch

    PRINT_BATCHES = args.batch_print

    if args.loss_file:
        open(args.loss_file, 'w').write('')

    for epoch in range(1, N_EPOCHES + 1): 
        total_loss_per_epoch = 0
        total_batch = 0      

        begin_time = datetime.datetime.now()

        for x, y in train_dataloader:
            total_batch += 1

            x = x.cuda()
            y = y.cuda()

            optimizer.zero_grad()

            output = model(x)            

            loss = loss_function(output.view(-1), y.view(-1))
            total_loss_per_epoch += loss.item()

            loss.backward()
            optimizer.step()

            if total_batch % PRINT_BATCHES == 0:
                print(f'Epoch {epoch:>2d},\tBatch {total_batch:>6d},\tloss: {total_loss_per_epoch / total_batch:.10f}')

        torch.save({
            'epoch': epoch,
            'character_to_index': train_dataset.get_character_to_index(),
            'vocab_size': train_dataset.get_vocab_size(),
            'hidden_dim': args.hidden_dim,
            'embedding_dim': args.embedding_dim,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': total_loss_per_epoch
        }, os.path.join(args.output, f'model_{epoch:02d}.pth'))

        end_time = datetime.datetime.now()

        print(f'Epoch {epoch:>2d},\tloss: {total_loss_per_epoch / total_batch:>.10f},\tTook {str(end_time - begin_time)}')

        if args.loss_file:
            open(args.loss_file, 'a').write(f'{total_loss_per_epoch / total_batch:>.10f}\n')

    torch.save({
        'character_to_index': train_dataset.get_character_to_index(),
        'vocab_size': train_dataset.get_vocab_size(),
        'hidden_dim': args.hidden_dim,
        'embedding_dim': args.embedding_dim,
        'model_state_dict': model.state_dict(),

    }, os.path.join(args.output, 'model_final.pth'))

성능 평가하기

그럼 훈련된 모델의 성능을 평가하는 코드를 만들어보겠습니다. Dataset은 훈련 모듈과 같기 때문에 설명은 생략하겠습니다.

Model

class SpaceModel(nn.Module):
    def __init__(self, vocab_size, hidden_dim, embedding_dim):
        super(SpaceModel, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=2, bidirectional=True, batch_first=True)
        self.fc = nn.Linear(hidden_dim * 2, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        out = self.embedding(x)
        out, _ = self.lstm(out)
        out = self.fc(out)
        prediction = self.sigmoid(out)

        return prediction

훈련 모듈 코드와 똑같지만 마지막에 Sigmoid 레이어가 추가된 것을 볼 수 있습니다. 훈련 시에는 BCEWithLogitsLoss가 내부적으로 Sigmoid를 취해주었지만 평가시에는 없기 때문에 레이어를 추가했습니다.

실행 준비

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-t', '--test', type=str, help='path to a training corpus file')
    parser.add_argument('-b', '--batch', type=int, help='the number of batches')
    parser.add_argument('-m', '--max-characters', type=int)
    parser.add_argument('-md', '--model')
    args = parser.parse_args()

    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    print(f'Using {device} for testing')   

이번에는 필요한 경우 CPU 머신에서도 돌릴 수 있도록 런타임 때 CPU/GPU를 선택할 수 있도록 했습니다.

모델과 평가 데이터 로딩

    model_data = torch.load(args.model)

    character_to_index = model_data['character_to_index']    

    model = SpaceModel(len(character_to_index), model_data['hidden_dim'], model_data['embedding_dim'])    
    model.load_state_dict(model_data['model_state_dict'])
    model.to(device)
    model.eval()

    character_to_index = model_data['character_to_index']    

    BATCH_SIZE = args.batch

    test_dataset = MyDataset(args.test, max_characters=args.max_characters, character_to_index=character_to_index)
    test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, collate_fn=make_batch)    

    total_space = 0
    correct_space = 0
    total_nonspace = 0
    correct_nonspace = 0

Model을 사용한 평가시에는 model.eval()을 호출해주는 것이 중요합니다. 평가시에는 훈련 때와 다르게 Dropout이나 Normalization을 적용하지 않아야하는 경우가 있는데, model.eval()으로 평가 모드로 변경을 해주어야 이 부분이 정상 작동합니다.

훈련 모듈에서 저장했던 모델을 불러옵니다. character_to_index도 같이 저장했었습니다. 훈련시와 평가시에 같은 character_to_index 데이터를 사용하는 것이 중요합니다.

평가하기

with torch.no_grad():
        for batch_index, (x, y) in enumerate(test_dataloader, 1):
            x = x.to(device)
            y = y.to(device)

            output = model(x)

            for characters, answers, predictions in zip(x, y, output):
                for character, answer, prediction in zip(characters, answers.view(-1), predictions.view(-1)):
                    if character.item() in (character_to_index['PAD'], character_to_index['BOL'], character_to_index['EOL']):
                        continue

                    answer_label = int(answer)
                    predicted_label = 1 if prediction > 0.5 else 0                    

                    if answer_label == 0:
                        total_nonspace += 1
                        if predicted_label == 0:
                            correct_nonspace += 1
                    elif answer_label == 1:
                        total_space += 1
                        if predicted_label == 1:
                            correct_space += 1

        print(f'Space accuracy: {correct_space / total_space * 100:.2f}% ({correct_space}/{total_space})')
        print(f'Non-Space accuracy: {correct_nonspace / total_nonspace * 100:.2f}% ({correct_nonspace}/{total_nonspace})')
        print(f'Accuracy: {(correct_space + correct_nonspace) / (total_space + total_nonspace) * 100:.2f}% ({correct_space + correct_nonspace}/{total_space + total_nonspace})')

세가지 지표를 평가합니다.

  1. 띄어써야할 곳을 잘 맞췄는지
  2. 띄어써야하지 말 곳을 잘 맞췄는지
  3. 두 가지를 종합한 성능

이렇게 나누어서 평가하는 이유는 이렇습니다. 만약에 모든 글자에 대해서 띄어쓴다고 예측하면 1번은 100% 겠지만, 2번은 0%입니다. 모든 글자에 대해서 띄어쓰지 않는다고 예측하면 1번은 0%겠지만, 2번은 100%이구요. 이 두가지를 모두 만족해야 좋은 띄어쓰기 모듈입니다.

평가 시에는 Gradient를 추적할 필요가 없기 때문에 with torch.no_grad():와 같이 해주어야 합니다.

평가 부분은 Padding의 존재 때문에 코드가 좀 난잡합니다.

            for characters, answers, predictions in zip(x, y, output):
                for character, answer, prediction in zip(characters, answers.view(-1), predictions.view(-1)):
                    if character.item() in (character_to_index['PAD'], character_to_index['BOL'], character_to_index['EOL']):
                        continue

평가 부분 역시 mini batch 형태로 입력이 들어가야하기 때문에 훈련 때와 마찬가지로 Padding을 해주고 있습니다. 문제는 Padding된 부분은 평가시에 제외를 해야한다는 점입니다 (BOL, EOL 도 마찬가지입니다). 그렇지 않으면 띄어써야하지 말 곳을 잘 맞췄는지가 과장되게 집계됩니다. 이를 위해서 해당 글자가 PAD, BOL, EOL인 경우 통계에서 제외합니다.

predicted_label = 1 if prediction > 0.5 else 0   

위 코드를 보면 Sigmoid 후 0.5를 기준으로 0과 1으로 구분하는 것을 알 수 있습니다.

전체 코드

평가 부분 전체 코드입니다. 전체 코드를 eval.py라고 저장하고 아래와 같이 실행합니다. 훈련 결과 성능은 아래와 같습니다. 간단한 모델치고는 나쁘지 않네요 :)

python3 eval.py -md models/model_final.pth -m 300 -t corpus/test.txt -b 128
Using cuda for testing
Space accuracy: 96.69% (44547/46072)
Non-Space accuracy: 98.74% (127842/129469)
Accuracy: 98.20% (172389/175541)

띄어쓰기를 하지 말아야하는데 띄어쓰는 경우(잘못 띄어쓴 경우)보다 띄어쓰기를 해야 하는데 하지 않은 경우(잘못 붙여 쓴 경우)가 더 많은 것을 알 수 있습니다. 그렇게 생각해보면 사람도 띄어쓰기를 과도하게 하는 문제보다는 띄어쓸 곳을 안 띄우는 문제가 더 많은 것 같기도 합니다.

import torch
import torch.nn as nn
from torch.utils.data import IterableDataset
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence

import argparse

class MyDataset(IterableDataset):
    def __init__(self, corpus_filename, max_characters=0, character_to_index=None):
        super(MyDataset).__init__()

        self.corpus_filename = corpus_filename

        self.max_characters = max_characters

        if character_to_index:
            self.character_to_index = character_to_index
        else:
            print('Building a character to index')
            self.character_to_index = self.build_character_to_index()
            print(f'Done. {len(self.character_to_index) - 4} characters were found')

    def __iter__(self):
        x = ['BOL',]
        y = [0.0,]
        for line in open(self.corpus_filename):
            if line == '\n':
                x.append('EOL')
                y.append(0.0)
                yield torch.tensor([self.character_to_index.get(c, self.character_to_index['UNK']) for c in x]), torch.tensor(y)

                x = ['BOL',]
                y = [0.0,]
            else:
                if self.max_characters == 0 or len(x) <= self.max_characters:
                    character, label = line.rstrip().split('\t')
                    x.append(character)
                    y.append(float(label))

        yield torch.tensor([self.character_to_index.get(c, self.character_to_index['UNK']) for c in x]), torch.tensor(y)

    def build_character_to_index(self):
        characters = set()
        character_list = ['PAD', 'UNK', 'BOL', 'EOL']

        for line in open(self.corpus_filename):
            line = line.strip()
            if line:
                character, _ = line.split('\t')
                characters.add(character)

        character_list.extend(list(sorted(characters)))

        return {c:i for i, c in enumerate(character_list)}    

    def line_to_tensor(self, characters):
        result = torch.zeros(len(characters), len(self.character_to_index), device=torch.device('cuda'))
        for i, character in enumerate(characters):
            result[i][self.character_to_index.get(character, self.character_to_index['UNK'])] = 1

        return result

    def get_vocab_size(self):
        return len(self.character_to_index)

    def get_character_to_index(self):
        return self.character_to_index

class SpaceModel(nn.Module):
    def __init__(self, vocab_size, hidden_dim, embedding_dim):
        super(SpaceModel, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=2, bidirectional=True, batch_first=True)
        self.fc = nn.Linear(hidden_dim * 2, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        out = self.embedding(x)
        out, _ = self.lstm(out)
        out = self.fc(out)
        prediction = self.sigmoid(out)

        return prediction

def make_batch(samples):
    inputs = []
    labels = []

    for sample in samples:
        inputs.append(sample[0])
        labels.append(sample[1])

    inputs = pad_sequence(inputs, batch_first=True)
    labels = pad_sequence(labels, batch_first=True)

    return [inputs, labels]


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-t', '--test', type=str, help='path to a training corpus file')
    parser.add_argument('-b', '--batch', type=int, help='the number of batches')
    parser.add_argument('-m', '--max-characters', type=int)
    parser.add_argument('-md', '--model')
    args = parser.parse_args()

    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    print(f'Using {device} for testing')    

    model_data = torch.load(args.model)

    character_to_index = model_data['character_to_index']    

    model = SpaceModel(len(character_to_index), model_data['hidden_dim'], model_data['embedding_dim'])    
    model.load_state_dict(model_data['model_state_dict'])
    model.to(device)
    model.eval()

    character_to_index = model_data['character_to_index']    

    BATCH_SIZE = args.batch

    test_dataset = MyDataset(args.test, max_characters=args.max_characters, character_to_index=character_to_index)
    test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, collate_fn=make_batch)    

    total_space = 0
    correct_space = 0
    total_nonspace = 0
    correct_nonspace = 0

    with torch.no_grad():
        for batch_index, (x, y) in enumerate(test_dataloader, 1):
            x = x.to(device)
            y = y.to(device)

            output = model(x)

            for characters, answers, predictions in zip(x, y, output):
                for character, answer, prediction in zip(characters, answers.view(-1), predictions.view(-1)):
                    if character.item() in (character_to_index['PAD'], character_to_index['BOL'], character_to_index['EOL']):
                        continue

                    answer_label = int(answer)
                    predicted_label = 1 if prediction > 0.5 else 0                    

                    if answer_label == 0:
                        total_nonspace += 1
                        if predicted_label == 0:
                            correct_nonspace += 1
                    elif answer_label == 1:
                        total_space += 1
                        if predicted_label == 1:
                            correct_space += 1

        print(f'Space accuracy: {correct_space / total_space * 100:.2f}% ({correct_space}/{total_space})')
        print(f'Non-Space accuracy: {correct_nonspace / total_nonspace * 100:.2f}% ({correct_nonspace}/{total_nonspace})')
        print(f'Accuracy: {(correct_space + correct_nonspace) / (total_space + total_nonspace) * 100:.2f}% ({correct_space + correct_nonspace}/{total_space + total_nonspace})')

테스트해보기

이번에는 띄어쓰기 없는 문장을 입력해서 띄어쓰기를 잘 하는지 보겠습니다. 이번에는 전체 코드부터 보겠습니다.

import torch
import torch.nn as nn

import argparse

def line_to_tensor(line, character_to_index):
    characters = ['BOL',]
    characters.extend(list(line))
    characters.append('EOL')

    return torch.tensor([character_to_index.get(c, character_to_index['UNK']) for c in characters])

def to_result(characters, labels):
    result = []
    for c, label in zip(characters, labels):
        if label == 1:
            result.append(' ')
        result.append(c)

    return ''.join(result)

class SpaceModel(nn.Module):
    def __init__(self, vocab_size, hidden_dim, embedding_dim):
        super(SpaceModel, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=2, bidirectional=True, batch_first=True)
        self.fc = nn.Linear(hidden_dim * 2, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        out = self.embedding(x)
        out, _ = self.lstm(out)
        out = self.fc(out)
        prediction = self.sigmoid(out)

        return prediction

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-m', '--model')
    parser.add_argument('-s', '--sentence')
    args = parser.parse_args()

    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    model_data = torch.load(args.model)
    character_to_index = model_data['character_to_index']

    model = SpaceModel(len(character_to_index), model_data['hidden_dim'], model_data['embedding_dim'])    
    model.load_state_dict(model_data['model_state_dict'])
    model.to(device)
    model.eval()


    character_to_index = model_data['character_to_index']

    with torch.no_grad():
        sentence = ''.join(c for c in args.sentence if not c.isspace())

        input = line_to_tensor(sentence, character_to_index).to(device)
        input = torch.unsqueeze(input, 0)

        output = model(input)

        labels = [1 if y > 0.5 else 0 for y in output.view(-1)[1:-1]]

        print(to_result(sentence, labels))

많은 부분을 앞에서 다루었기 때문에 특징적인 부분만 보겠습니다.

def to_result(characters, labels):
    result = []
    for c, label in zip(characters, labels):
        if label == 1:
            result.append(' ')
        result.append(c)

    return ''.join(result)

글자와 글자 앞의 띄어쓰기 여부를 알려주면 실제로 띄어쓰기를 넣어서 최종 결과를 만들어주는 함수입니다.

    with torch.no_grad():
        sentence = ''.join(c for c in args.sentence if not c.isspace())

        input = line_to_tensor(sentence, character_to_index).to(device)
        input = torch.unsqueeze(input, 0)

        output = model(input)

        labels = [1 if y > 0.5 else 0 for y in output.view(-1)[1:-1]]

        print(to_result(sentence, labels))

테스트를 하는 부분인데요. 평가 시와 마찬가지로 with torch.no_grad():으로 Gradient 추적을 하지 않습니다. 앞에 언급한 것처럼 torch.nn을 mini batch를 입력으로 받기 때문에 입력이 하나라고 하더라도 input = torch.unsqueeze(input, 0)와 같이 mini batch 형태로 만들어주어야합니다. (0번째, 즉 batch dimension을 추가합니다.)

labels = [1 if y > 0.5 else 0 for y in output.view(-1)[1:-1]]

우리 Dataset은 내부적으로 BOL, EOL을 붙이기 때문에 [1:-1]으로 그 부분을 제거합니다.

아래 보는 것처럼 BOL, EOLline_to_tensor에서 붙여줍니다. Dataset을 쓰지 않기 때문에 line_to_tensor에서 이 작업을 수행합니다.

def line_to_tensor(line, character_to_index):
    characters = ['BOL',]
    characters.extend(list(line))
    characters.append('EOL')

    return torch.tensor([character_to_index.get(c, character_to_index['UNK']) for c in characters])

몇가지 실행 예시

잘 되는지 한번 해볼까요?

python3 test.py -m models/model_final.pth -s "아름다운별지구에오신여러분을환영합니다"
아름다운 별 지구에 오신 여러분을 환영합니다

python3 test.py -m models/model_final.pth -s "내일또일요일이오는군요.벌써부터 출근하기가싫어요."
내일 또 일요일이 오는 군요. 벌써부터 출근하기가 싫어요.

python3 test.py -m models/model_final.pth -s "딥러닝을배우고싶은데어디에서부터시작해야할까요?조금이라도힌트가있으면좋겠어요."
딥 러닝을 배우고 싶은데 어디에서부터 시작해야 할까요? 조금이라도 힌트가 있으면 좋겠어요.

python3 test.py -m models/all/model_final.pth -s "인싸력이부족해서혼밥은아직어렵다는"
인싸력이 부족해서 혼밥은 아직 어렵다는

python3 test.py -m models/all/model_final.pth -s "그키스는심문관의가슴속에서불타고있었지만,그래도여전히자기사상에빠져있었지."
그 키스는 심문관의 가슴 속에서 불타고 있었지만, 그래도 여전히 자기 사상에 빠져 있었지.

나쁘지 않네요 :)

마치며

딥러닝과 자연어처리의 기본적인 공부를 마치고 나서 실제 개발을 해보려고 하면 어디에서부터 어떻게 시작해야할지 막막할 때가 많습니다. 이 글이 그런 상황에서 조금이나마 도움이 되었으면 합니다.

"아름다운 별 지구에 오신 여러분을 환영합니다"