이번 포스팅의 내용은 저번 포스팅 에서 다루었던 transformer 모델을 직접 구현하는 과정을 담았다.
transformer 기반의 간단한 챗봇을 만들었는데, 결론부터 말하자면 구조는 제대로 갖췄지만 학습 부족으로 인해 매우 낮은 성능이 나왔다.
그래도 구조를 공부했던 transformer을 직접 구현했다는 것에 의미를 두고 그 과정을 하나씩 끄적여보겠습니다.
전체 코드는 여기
AIFFEL_quest_rs/Exploration/Ex07/transformer_chatbot.ipynb at main · choiwonjini/AIFFEL_quest_rs
Contribute to choiwonjini/AIFFEL_quest_rs development by creating an account on GitHub.
github.com
0. transformer 아키텍처
이전 포스팅에서 다루었지만 다시 한 번 transformer의 구조를 눈에 담아보자.

transformer는 인코더, 디코더로 나뉘어 있다.
인코더는
- Vector Embedding & PE
- MHA
- Add & Norm
- FFN
- Add & Norm
디코더는
- Vector Embedding & PE
- MMHA
- Add & Norm
- Encoder-Decoder Attention
- Add & Norm
- FFN
- Add & Norm
- softmax & FC layer
순서로 시퀀스를 처리한다.
자세한 내용은 여기를 참고.
1. 패키지 및 데이터 로드
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler
import sentencepiece as spm
import math
import os
import re
import urllib.request
import zipfile
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
!wget https://github.com/songys/Chatbot_data/raw/master/ChatbotData.csv
data = pd.read_csv("/content/ChatbotData.csv")
data.head()
Q A label
0 12시 땡! 하루가 또 가네요. 0
1 1지망 학교 떨어졌어 위로해 드립니다. 0
2 3박4일 놀러가고 싶다 여행은 언제나 좋죠. 0
3 3박4일 정도 놀러가고 싶다 여행은 언제나 좋죠. 0
4 PPL 심하네 눈살이 찌푸려지죠. 0
위와 같이 입력 문장에 대해 간단한 대답을 하는 챗봇 대화 데이터임을 확인할 수 있다.
2. 전처리
# 중복, 결측치 확인
print(data.duplicated().sum())
print(data.isna().sum())
0
Q 0
A 0
label 0
dtype: int64
# label 칼럼은 필요 없으므로 제거
del data['label']
중복 데이터와 결측치는 존재하지 않았고, 세 번째 칼럼 label은 감성분석용 칼럼이므로 이번 포스팅의 주제에서는 불필요해서 제거했다.
아래는 텍스트 전처리를 진행하는 함수이다. 특수문자 제거, 반복문자 정규화 등의 과정이 포함된다.
def preprocess_sentence(text):
"""
챗봇 학습용 한국어 전처리 함수
-------------------------------------------------------
주요 처리 단계:
1) 한글, 영어, 숫자, ., ?, !, , , 등만 남기기
2) 연속 공백 정리
3) 과도한 반복 문자 정규화 (ㅋㅋㅋㅋ → ㅋㅋ)
4) 단일 자음/모음 제거 (의미 없는 ㅇ, ㅎ 등)
5) 단어와 구두점 사이 거리 만들기
6) 공백 정리
"""
# 1) 허용되지 않은 문자 제거 (한글, 영어, 숫자, 구두점, 감정 표현만 남김)
text = re.sub(r'[^a-zA-Z가-힣0-9.,?!~\s]', ' ', text)
# 2) 연속된 공백 정리
text = re.sub(r'\s+', ' ', text).strip()
# 3) 반복 문자 정규화 (3번 이상 반복 → 2번)
text = re.sub(r'(.)\1{2,}', r'\1\1', text)
# 4) 잡음 정리
text = re.sub(r'[ㅋㅎㅠㅜ]{2,}', '', text)
text = re.sub(r'\.{2,}', '.', text)
text = re.sub(r'~{2,}', '~', text)
# 5) 단어와 구두점 사이 거리 만들기
# ex: I am a student." => "I am a student .
text = re.sub(r"([?.!,])", r" \1 ", text)
text = re.sub(r'[" "]+', " ", text)
# 6) 공백 정리
text = re.sub(r'\s+', ' ', text).strip()
return text
# 샘플 문장 테스트
sample_sentence = " 안녕하세요ㅋㅋㅋㅋ? Hello! I'm a student😊, nice to meet you!"
preprocessed_sentence = preprocess_sentence(sample_sentence)
print(preprocessed_sentence)
안녕하세요 ? Hello ! I m a student , nice to meet you !
샘플 문장을 보니 전처리가 잘 진행되는 것을 알 수 있으므로 본 데이터에 적용해준다.
data["Q"] = data["Q"].apply(preprocess_sentence)
data["A"] = data["A"].apply(preprocess_sentence)
data.head()
Q A
0 12시 땡 ! 하루가 또 가네요 .
1 1지망 학교 떨어졌어 위로해 드립니다 .
2 3박4일 놀러가고 싶다 여행은 언제나 좋죠 .
3 3박4일 정도 놀러가고 싶다 여행은 언제나 좋죠 .
4 PPL 심하네 눈살이 찌푸려지죠 .
3. SentencePiece 토큰화
전처리가 끝난 text를 SentencePiece 토큰화하기 위해서, 먼저 train 데이터로 SP 모델을 학습시켜야 한다.
# DataFrame의 'Q'와 'A'를 합쳐 하나의 학습용 텍스트로 저장
with open("train_corpus.txt", "w", encoding="utf-8") as f:
for q, a in zip(data["Q"], data["A"]):
f.write(str(q).strip() + "\n")
f.write(str(a).strip() + "\n")
# SentencePiece 모델 학습
spm.SentencePieceTrainer.Train(
input="/content/train_corpus.txt",
model_prefix="spm_cornell",
vocab_size=8000,
character_coverage=1.0,
model_type="bpe",
max_sentence_length=999999,
pad_id=0, # <pad>
bos_id=1, # <s>
eos_id=2, # </s>
unk_id=3 # <unk>
)
학습이 끝났으면 sp 모델을 불러와서 예제 문장으로 테스트 해보자
sp = spm.SentencePieceProcessor()
sp.Load("spm_cornell.model")
# 예제 문장
sentence = "이번 winter에 여자친구랑 japan 갈거야. "
sentence = preprocess_sentence(sentence)
print("전처리 후의 문장:", sentence)
# 1. 토크나이징 (subword 단위로 분할)
tokens = sp.encode(sentence, out_type=str)
print("Tokenized:", tokens)
# 2. 인코딩 (서브워드를 정수 ID로 변환)
encoded = sp.encode(sentence, out_type=int)
print("Encoded:", encoded)
# 3. 디코딩 (정수 ID → 원본 문장 복원)
decoded = sp.decode(encoded)
print("Decoded:", decoded)
전처리 후의 문장: 이번 winter에 여자친구랑 japan 갈거야 .
Tokenized: ['▁이번', '▁', 'wi', 'n', 'ter', '에', '▁여자친구랑', '▁', 'j', 'a', 'p', 'a', 'n', '▁갈', '거야', '▁.']
Encoded: [1388, 6769, 3, 7460, 3, 6791, 2322, 6769, 7867, 7651, 3, 7651, 7460, 578, 1067, 4]
Decoded: 이번 ⁇ n ⁇ 에 여자친구랑 ja ⁇ an 갈거야 .
학습 데이터에 winter, japan이 없기 때문에 ?? 로 나타난다.
4. Dataset 구현
이번 단계에서는 입력 문장(question과 출력 문장(answer)을 받아서,
인코더 입력(enc_input), 디코더 입력(dec_input), 디코더 타겟(dec_target)을 만들어주는 함수를 만든다.
class ChatBotDataset(Dataset):
def __init__(self, pairs, sp, max_length=40):
super().__init__()
self.sp = sp
self.max_length = max_length
self.data = []
for q_text, a_text in pairs:
# 1) 토크나이즈
# "hi" → [53, 17]
q_ids = sp.EncodeAsIds(q_text)
a_ids = sp.EncodeAsIds(a_text)
# 2) [CLS]/[SEP] 같은 별도 스페셜 토큰을 쓸 수도 있으나,
# 여기서는 SentencePiece 기본 <s>, </s> 등 혹은 사용자 정의 토큰 활용 가능
# 간단히 <s>=sp.bos_id(), </s>=sp.eos_id()로 가정해본다면:
# sp.SetEncodeExtraOptions("bos:eos") 등으로 설정하는 방법도 있음.
# 여기서는 수동으로 bos/eos id를 붙인다고 가정
# [53, 17] → [1, 53, 17, 2]
bos_id = sp.bos_id() if sp.bos_id() >= 0 else 1 # 혹은 임의값
eos_id = sp.eos_id() if sp.eos_id() >= 0 else 2
q_tokens = [bos_id] + q_ids + [eos_id]
a_tokens = [bos_id] + a_ids + [eos_id]
# 3) 길이 제한
if len(q_tokens) > max_length or len(a_tokens) > max_length:
continue
# 4) 고정 길이 패딩
q_tokens += [0]*(max_length - len(q_tokens)) # 0 -> <pad> 가정
a_tokens += [0]*(max_length - len(a_tokens))
# 5) 디코더 입력(dec_input): a_tokens[:-1], 타겟(outputs): a_tokens[1:]
# Teacher Forcing을 위해 a_tokens을 한 칸씩 밀어서 사용
dec_input = a_tokens[:-1]
target = a_tokens[1:]
self.data.append({
"enc_input": q_tokens,
"dec_input": dec_input,
"target": target
})
def __len__(self):
return len(self.data)
# Dataset이 [] 인덱싱을 지원하게 해줌
def __getitem__(self, idx):
sample = self.data[idx]
enc_input = torch.tensor(sample["enc_input"], dtype=torch.long)
dec_input = torch.tensor(sample["dec_input"], dtype=torch.long)
target = torch.tensor(sample["target"], dtype=torch.long)
return enc_input, dec_input, target
이렇게 정의한 ChatbotDataset 클래스로 dataset을 정의하고 예시를 확인해보자
pairs = list(zip(data["Q"], data["A"]))
dataset = ChatBotDataset(pairs, sp, max_length=40)
# dataset[0] → (encoder_input_tensor, decoder_input_tensor, target_tensor) 반환
for encoder_input, decoder_input, decoder_label in dataset:
print("텐서 크기 :",encoder_input.size())
print(encoder_input)
print(sp.decode(encoder_input.tolist()))
print(decoder_input)
print(sp.decode(decoder_input.tolist()))
print(decoder_label)
print(sp.decode(decoder_label.tolist()))
break
텐서 크기 : torch.Size([40])
tensor([ 1, 5553, 6811, 3202, 109, 2, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0])
12시 땡 !
tensor([ 1, 4486, 214, 5923, 4, 2, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0])
하루가 또 가네요 .
tensor([4486, 214, 5923, 4, 2, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0])
하루가 또 가네요 .
이 dataset을 모델에 학습시키기 위해 dataloader에 담는다.
dataloader = DataLoader(dataset,batch_size=32,shuffle=True)
for encoder_input, decoder_input, decoder_label in dataloader:
print(encoder_input.size())
print(decoder_input.size())
print(decoder_label.size())
break
결과:
torch.Size([32, 40])
torch.Size([32, 39])
torch.Size([32, 39])
5. 모델 구성
각 파트의 역할은 이전 포스팅에서 모두 다루었으므로 이번에는 코드만 간단히.
(포스팅 최상단의 transformer 아키텍처를 참고하면 코드의 흐름을 이해하기 수월할 것입니다.)
5.1. Positional Encoding
class PositionalEncoding(nn.Module):
def __init__(self, position, d_model):
super(PositionalEncoding, self).__init__()
self.d_model = d_model
self.position = position
self.pos_encoding = self._build_pos_encoding(position, d_model)
def _get_angles(self, position, i, d_model):
return 1.0 / (10000.0 ** ((2.0 * (i // 2)) / d_model)) * position
def _build_pos_encoding(self, position, d_model):
pos = torch.arange(position, dtype=torch.float32).unsqueeze(1)
i = torch.arange(d_model, dtype=torch.float32).unsqueeze(0)
angle_rads = self._get_angles(pos, i, d_model)
sines = torch.sin(angle_rads[:, 0::2])
cosines = torch.cos(angle_rads[:, 1::2])
pos_encoding = torch.zeros(position, d_model)
pos_encoding[:, 0::2] = sines
pos_encoding[:, 1::2] = cosines
pos_encoding = pos_encoding.unsqueeze(0) # shape: [1, position, d_model]
return pos_encoding
def forward(self, x):
return x + self.pos_encoding[:, :x.size(1), :].to(x.device)
5.2. Attention
# Scaled Dot-Product Attention
def scaled_dot_product_attention(query, key, value, mask=None):
# 1) Q와 K의 내적을 통해 score(유사도) 계산
# key.transpose(-1, -2): (batch_size, heads, depth, seq_len)
# matmul 결과 shape: (batch_size, heads, seq_len, seq_len)
matmul_qk = torch.matmul(query, key.transpose(-1, -2))
# 2) depth에 따라 정규화
depth = key.size(-1) # depth = d_model / heads
logits = matmul_qk / math.sqrt(depth)
# 3) 마스크가 주어졌다면 -1e9(아주 작은 값)를 더해 소프트맥스에서 제외시키도록 함
if mask is not None:
logits = logits + (mask * -1e9)
# 4) 소프트맥스 계산해 attention weights 생성
attention_weights = F.softmax(logits, dim=-1)
# 5) attention weights와 value의 내적
output = torch.matmul(attention_weights, value)
return output, attention_weights
# Multi-head Attention
class MultiHeadAttention(nn.Module):
def __init__(self, d_model, num_heads, name="multi_head_attention"):
super(MultiHeadAttention, self).__init__()
self.num_heads = num_heads
self.d_model = d_model
# d_model은 num_heads로 나누어떨어져야 함
assert d_model % num_heads == 0
self.depth = d_model // num_heads
# 파이토치에서 Dense는 nn.Linear로 대응
self.query_dense = nn.Linear(d_model, d_model)
self.key_dense = nn.Linear(d_model, d_model)
self.value_dense = nn.Linear(d_model, d_model)
self.out_dense = nn.Linear(d_model, d_model)
def split_heads(self, x, batch_size):
"""
x: (batch_size, seq_len, d_model)
=> (batch_size, num_heads, seq_len, depth) 형태로 변환
"""
x = x.view(batch_size, -1, self.num_heads, self.depth)
x = x.permute(0, 2, 1, 3) # (batch_size, num_heads, seq_len, depth)
return x
def forward(self, query, key, value, mask=None):
"""
query, key, value: (batch_size, seq_len, d_model)
mask: (batch_size, 1, seq_len, seq_len) 등으로 broadcast 가능하도록 구성
"""
batch_size = query.size(0)
# Q, K, V에 각각 Linear 적용
query = self.query_dense(query)
key = self.key_dense(key)
value = self.value_dense(value)
# Head 분할
query = self.split_heads(query, batch_size)
key = self.split_heads(key, batch_size)
value = self.split_heads(value, batch_size)
# 스케일드 닷 프로덕트 어텐션
# _ : 두 번째 반환하는 변수(attention weight)는 안 쓰겠다.
scaled_attention, _ = scaled_dot_product_attention(query, key, value, mask)
# (batch_size, num_heads, seq_len, depth) -> (batch_size, seq_len, num_heads, depth)
scaled_attention = scaled_attention.permute(0, 2, 1, 3).contiguous()
# 다시 (batch_size, seq_len, d_model)로 합치기
concat_attention = scaled_attention.view(batch_size, -1, self.d_model)
# 최종 Dense
output = self.out_dense(concat_attention)
return output
5.3. Masking
5.3.1. padding masking
def create_padding_mask(x):
# x == 0 위치를 찾아 float형 1로 변환
mask = (x == 0).float()
# (batch_size, seq_len) -> (batch_size, 1, 1, seq_len)
mask = mask.unsqueeze(1).unsqueeze(2)
return mask
5.3.2. look-ahead masking
다음 단어 가리기
def create_look_ahead_mask(x):
seq_len = x.size(1)
# (seq_len, seq_len) 크기의 하삼각 행렬(tril) 생성 후 1에서 빼서
# 상삼각이 1, 하삼각(자기 자신 포함)이 0이 되도록 설정
# => 미래 토큰(자신 인덱스보다 큰 위치) 마스킹
look_ahead_mask = 1 - torch.tril(torch.ones((seq_len, seq_len)))
# 패딩 마스크 생성 (shape: (batch_size, 1, 1, seq_len))
padding_mask = create_padding_mask(x)
# look_ahead_mask: (seq_len, seq_len) -> (1, seq_len, seq_len)
look_ahead_mask = look_ahead_mask.unsqueeze(0)
# -> (1, seq_len, seq_len) -> (1, 1, seq_len, seq_len)
look_ahead_mask = look_ahead_mask.unsqueeze(1)
look_ahead_mask = look_ahead_mask.to(x.device)
# look-ahead 마스크와 패딩 마스크를 합성 (둘 중 하나라도 1이면 마스킹)
# 최종 shape은 브로드캐스팅으로 (batch_size, 1, seq_len, seq_len)
combined_mask = torch.max(look_ahead_mask, padding_mask)
return combined_mask
5.4. Encoder
5.4.1. Encoder Layer
class EncoderLayer(nn.Module):
def __init__(self, d_model, num_heads, ff_dim, dropout=0.1):
super(EncoderLayer, self).__init__()
self.mha = MultiHeadAttention(d_model, num_heads) # 이전에 구현한 MHA
self.dropout1 = nn.Dropout(dropout)
self.norm1 = nn.LayerNorm(d_model, eps=1e-6)
# 피드포워드 부분 (Dense -> ReLU -> Dense)
self.ffn = nn.Sequential(
nn.Linear(d_model, ff_dim),
nn.ReLU(),
nn.Linear(ff_dim, d_model)
)
self.dropout2 = nn.Dropout(dropout)
self.norm2 = nn.LayerNorm(d_model, eps=1e-6)
def forward(self, x, mask=None):
# (1) 멀티 헤드 어텐션 (셀프 어텐션)
attn_output = self.mha(x, x, x, mask) # (batch_size, seq_len, d_model)
attn_output = self.dropout1(attn_output)
out1 = self.norm1(x + attn_output) # 잔차 연결 + LayerNorm
# (2) 피드포워드 신경망
ffn_output = self.ffn(out1) # (batch_size, seq_len, d_model)
ffn_output = self.dropout2(ffn_output)
out2 = self.norm2(out1 + ffn_output) # 잔차 연결 + LayerNorm
return out2
5.4.2. Encoder
class Encoder(nn.Module):
def __init__(self,
vocab_size,
num_layers,
ff_dim,
d_model,
num_heads,
dropout=0.1):
super(Encoder, self).__init__()
self.d_model = d_model
# (1) 임베딩 레이어
self.embedding = nn.Embedding(vocab_size, d_model)
# (2) 포지셔널 인코딩
self.pos_encoding = PositionalEncoding(position=vocab_size, d_model=d_model)
self.dropout = nn.Dropout(dropout)
# (3) EncoderLayer 쌓기
self.enc_layers = nn.ModuleList([
EncoderLayer(d_model, num_heads, ff_dim, dropout)
for _ in range(num_layers)
])
def forward(self, x, mask=None):
# (1) 임베딩 & sqrt(d_model)로 스케일링
x = self.embedding(x) * math.sqrt(self.d_model)
# (2) 포지셔널 인코딩 적용 + 드롭아웃
x = self.pos_encoding(x) # shape: (batch_size, seq_len, d_model)
x = self.dropout(x)
# (3) num_layers만큼 쌓아올린 EncoderLayer 통과
for layer in self.enc_layers:
x = layer(x, mask)
return x
5.5. Decoder
5.5.1. Decoder Layer
class DecoderLayer(nn.Module):
def __init__(self, d_model, num_heads, ff_dim, dropout=0.1):
super(DecoderLayer, self).__init__()
# 첫 번째 서브 레이어 (디코더 내부 셀프 어텐션)
self.self_mha = MultiHeadAttention(d_model, num_heads)
self.norm1 = nn.LayerNorm(d_model, eps=1e-6)
# 두 번째 서브 레이어 (인코더-디코더 어텐션)
self.encdec_mha = MultiHeadAttention(d_model, num_heads)
self.norm2 = nn.LayerNorm(d_model, eps=1e-6)
# 세 번째 서브 레이어 (피드포워드 네트워크)
self.ffn = nn.Sequential(
nn.Linear(d_model, ff_dim), # Dense(units=ff_dim)
nn.ReLU(), # activation='relu'
nn.Linear(ff_dim, d_model) # Dense(units=d_model)
)
self.norm3 = nn.LayerNorm(d_model, eps=1e-6)
# 드롭아웃
self.dropout1 = nn.Dropout(dropout)
self.dropout2 = nn.Dropout(dropout)
self.dropout3 = nn.Dropout(dropout)
def forward(self, x, enc_outputs, look_ahead_mask=None, padding_mask=None):
# 1) 셀프 어텐션 (디코더 내부)
# x, x, x 는 Q, K, V를 의미함
self_attn_out = self.self_mha(x, x, x, mask=look_ahead_mask)
self_attn_out = self.dropout1(self_attn_out)
out1 = self.norm1(x + self_attn_out) # 잔차 연결 + LayerNorm
# 2) 인코더-디코더 어텐션
# out1 = Q, enc_outputs = K & V
encdec_attn_out = self.encdec_mha(out1, enc_outputs, enc_outputs, mask=padding_mask)
encdec_attn_out = self.dropout2(encdec_attn_out)
out2 = self.norm2(out1 + encdec_attn_out) # 잔차 연결 + LayerNorm
# 3) 피드포워드 (Dense -> ReLU -> Dense)
ffn_out = self.ffn(out2)
ffn_out = self.dropout3(ffn_out)
out3 = self.norm3(out2 + ffn_out) # 잔차 연결 + LayerNorm
return out3
5.5.2. Decoder
class Decoder(nn.Module):
def __init__(self,
vocab_size,
num_layers,
ff_dim,
d_model,
num_heads,
dropout=0.1):
super(Decoder, self).__init__()
self.d_model = d_model
# (1) 임베딩 레이어
self.embedding = nn.Embedding(vocab_size, d_model)
# (2) 포지셔널 인코딩
# 실제 학습 시에는 최대 시퀀스 길이에 맞추어 쓰기도 함
self.pos_encoding = PositionalEncoding(position=vocab_size, d_model=d_model)
self.dropout = nn.Dropout(dropout)
# (3) DecoderLayer 쌓기
self.dec_layers = nn.ModuleList([
DecoderLayer(d_model, num_heads, ff_dim, dropout)
for _ in range(num_layers)
])
def forward(self, x, enc_outputs, look_ahead_mask=None, padding_mask=None):
# (1) 임베딩 + sqrt(d_model)로 스케일링
x = self.embedding(x) * math.sqrt(self.d_model)
# (2) 포지셔널 인코딩 + 드롭아웃
x = self.pos_encoding(x) # (batch_size, tgt_seq_len, d_model)
x = self.dropout(x)
# (3) num_layers만큼 쌓인 DecoderLayer 통과
for layer in self.dec_layers:
x = layer(x, enc_outputs, look_ahead_mask, padding_mask)
return x
5.6. Transformer
class Transformer(nn.Module):
def __init__(self,
vocab_size,
num_layers, # 인코더/디코더 층 수
units, # feed-forward 네트워크의 중간 차원(ff_dim)
d_model, # 임베딩 및 내부 표현 차원
num_heads, # 멀티헤드 어텐션의 헤드 수
dropout=0.1):
super(Transformer, self).__init__()
# 인코더
self.encoder = Encoder(
vocab_size=vocab_size,
num_layers=num_layers,
ff_dim=units,
d_model=d_model,
num_heads=num_heads,
dropout=dropout
)
# 디코더
self.decoder = Decoder(
vocab_size=vocab_size,
num_layers=num_layers,
ff_dim=units,
d_model=d_model,
num_heads=num_heads,
dropout=dropout
)
# 최종 출력층: (d_model) -> (vocab_size)
self.final_linear = nn.Linear(d_model, vocab_size)
# 참고: 텐서플로우 코드의 `name="transformer"`는 파이토치에선 보통 사용 안 함
def forward(self, inputs, dec_inputs):
# 1) 인코더 패딩 마스크 생성
# 패딩 토큰 무시하기 위함
enc_padding_mask = create_padding_mask(inputs) # shape (batch_size, 1, 1, src_seq_len)
# 2) 디코더 look-ahead + 패딩 마스크
# 미래 단어 가리기
look_ahead_mask = create_look_ahead_mask(dec_inputs) # shape (batch_size, 1, tgt_seq_len, tgt_seq_len)
# 3) 디코더에서 인코더 출력 쪽을 마스킹할 때 쓸 패딩 마스크
# 패딩 토큰 무시
dec_padding_mask = create_padding_mask(inputs) # shape (batch_size, 1, 1, src_seq_len)
# 4) 인코더 수행
enc_outputs = self.encoder(
x=inputs,
mask=enc_padding_mask
) # shape: (batch_size, src_seq_len, d_model)
# 5) 디코더 수행
dec_outputs = self.decoder(
x=dec_inputs, # (batch_size, tgt_seq_len)
enc_outputs=enc_outputs,# (batch_size, src_seq_len, d_model)
look_ahead_mask=look_ahead_mask,
padding_mask=dec_padding_mask
) # shape: (batch_size, tgt_seq_len, d_model)
# 6) 최종 Dense (vocab_size)
logits = self.final_linear(dec_outputs) # (batch_size, tgt_seq_len, vocab_size)
return logits
5.6.1. 하이퍼파라미터 설정 & 모델 구조 출력
# 하이퍼파라미터 설정
NUM_LAYERS = 6 # 인코더/디코더 층 수
D_MODEL = 512 # 임베딩 및 내부 표현 차원
NUM_HEADS = 8 # 멀티헤드 어텐션에서의 헤드 수
UNITS = 2048 # 피드포워드 신경망의 은닉 차원
DROPOUT = 0.1 # 드롭아웃 비율
VOCAB_SIZE = 8000 # 단어 집합 크기
# 모델 생성
model = Transformer(
vocab_size=VOCAB_SIZE,
num_layers=NUM_LAYERS,
units=UNITS,
d_model=D_MODEL,
num_heads=NUM_HEADS,
dropout=DROPOUT
)
print(model)
Transformer(
(encoder): Encoder(
(embedding): Embedding(8000, 512)
(pos_encoding): PositionalEncoding()
(dropout): Dropout(p=0.1, inplace=False)
(enc_layers): ModuleList(
(0-5): 6 x EncoderLayer(
(mha): MultiHeadAttention(
(query_dense): Linear(in_features=512, out_features=512, bias=True)
(key_dense): Linear(in_features=512, out_features=512, bias=True)
(value_dense): Linear(in_features=512, out_features=512, bias=True)
(out_dense): Linear(in_features=512, out_features=512, bias=True)
)
(dropout1): Dropout(p=0.1, inplace=False)
(norm1): LayerNorm((512,), eps=1e-06, elementwise_affine=True)
(ffn): Sequential(
(0): Linear(in_features=512, out_features=2048, bias=True)
(1): ReLU()
(2): Linear(in_features=2048, out_features=512, bias=True)
)
(dropout2): Dropout(p=0.1, inplace=False)
(norm2): LayerNorm((512,), eps=1e-06, elementwise_affine=True)
)
)
)
(decoder): Decoder(
(embedding): Embedding(8000, 512)
(pos_encoding): PositionalEncoding()
(dropout): Dropout(p=0.1, inplace=False)
(dec_layers): ModuleList(
(0-5): 6 x DecoderLayer(
(self_mha): MultiHeadAttention(
(query_dense): Linear(in_features=512, out_features=512, bias=True)
(key_dense): Linear(in_features=512, out_features=512, bias=True)
(value_dense): Linear(in_features=512, out_features=512, bias=True)
(out_dense): Linear(in_features=512, out_features=512, bias=True)
)
(norm1): LayerNorm((512,), eps=1e-06, elementwise_affine=True)
(encdec_mha): MultiHeadAttention(
(query_dense): Linear(in_features=512, out_features=512, bias=True)
(key_dense): Linear(in_features=512, out_features=512, bias=True)
(value_dense): Linear(in_features=512, out_features=512, bias=True)
(out_dense): Linear(in_features=512, out_features=512, bias=True)
)
(norm2): LayerNorm((512,), eps=1e-06, elementwise_affine=True)
(ffn): Sequential(
(0): Linear(in_features=512, out_features=2048, bias=True)
(1): ReLU()
(2): Linear(in_features=2048, out_features=512, bias=True)
)
(norm3): LayerNorm((512,), eps=1e-06, elementwise_affine=True)
(dropout1): Dropout(p=0.1, inplace=False)
(dropout2): Dropout(p=0.1, inplace=False)
(dropout3): Dropout(p=0.1, inplace=False)
)
)
)
(final_linear): Linear(in_features=512, out_features=8000, bias=True)
)
출력된 모델 구조를 보니 위에서 정의해 놓았던 모든 layer들이 잘 포함되어 있는 것을 볼 수 있다.
6. 모델 학습
6.1. 손실함수 & 학습률
학습률은 스케쥴링 기법을 사용한다.
# 손실 함수
loss_function = nn.CrossEntropyLoss(ignore_index=sp.pad_id())
# Learning rate
## Custom Learning rate Scheduling
## 모델 학습 초기에는 lr 급격히 높였다가,
## 이후 train step에서 천천히 낮추며 안정적으로 수렴하게 하는 고급 기법
def get_lr_lambda(d_model, warmup_steps=4000):
d_model = float(d_model)
def lr_lambda(step):
# step은 0부터 시작하므로 +1로 보정
step = step + 1
return (d_model ** -0.5) * min(step ** -0.5, step * (warmup_steps ** -1.5))
return lr_lambda
# 하이퍼파라미터 설정
d_model = 512
warmup_steps = 4000
total_steps = 200000 # 총 학습 스텝
# 학습률 스케줄 시각화
steps = np.arange(1, total_steps + 1)
learning_rates = [get_lr_lambda(d_model, warmup_steps)(step) for step in steps]
# 그래프 출력
plt.figure(figsize=(10, 5))
plt.plot(steps, learning_rates, label="Learning Rate")
plt.xlabel("Training Steps")
plt.ylabel("Learning Rate")
plt.title("Transformer Learning Rate Schedule")
plt.legend()
plt.grid(True)
plt.show()

6.2. 모델 컴파일
옵티마이저와 스케쥴러를 선언하고,
Acc 측정 함수를 정의한다.
# Optimizer 정의
optimizer = optim.Adam(model.parameters(), betas=(0.9, 0.98), eps=1e-9)
# Scheduler 정의
scheduler = lr_scheduler.LambdaLR(optimizer, lr_lambda=get_lr_lambda(D_MODEL, warmup_steps=4000))
def accuracy_function(y_pred, y_true, pad_id=0):
"""
y_pred: (batch_size, seq_len, vocab_size)
y_true: (batch_size, seq_len)
"""
preds = y_pred.argmax(dim=-1) # (batch_size, seq_len)
mask = (y_true != pad_id)
correct = (preds == y_true) & mask
acc = correct.float().sum() / mask.float().sum()
return acc
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
6.3. train 함수 정의
def train_step(model, batch, optimizer, loss_function, device):
model.train()
enc_input, dec_input, target = [x.to(device) for x in batch]
optimizer.zero_grad()
# 모델 포워드 패스
logits = model(enc_input, dec_input) # (batch_size, seq_len, vocab_size)
# Loss 계산 (패딩 토큰 무시)
loss = loss_function(logits.permute(0, 2, 1), target) # (batch_size, vocab_size, seq_len) 필요
# Backpropagation
loss.backward()
optimizer.step()
return loss.item(), accuracy_function(logits, target, pad_id=sp.pad_id())
def train(model, dataloader, optimizer, loss_function, scheduler, num_epochs, device):
model.to(device)
for epoch in range(num_epochs):
total_loss, total_acc = 0, 0
for step, batch in enumerate(dataloader):
loss, acc = train_step(model, batch, optimizer, loss_function, device)
total_loss += loss
total_acc += acc
# 일정 스텝마다 로그 출력
if step % 100 == 0:
print(f"[Epoch {epoch+1}, Step {step}] Loss: {loss:.4f}, Acc: {acc:.4f}")
# 학습률 스케줄러 업데이트
scheduler.step()
avg_loss = total_loss / len(dataloader)
avg_acc = total_acc / len(dataloader)
print(f"Epoch {epoch+1} Completed - Avg Loss: {avg_loss:.4f}, Avg Acc: {avg_acc:.4f}")
6.4. 모델 학습
%%time
train(
model=model,
dataloader=dataloader,
optimizer=optimizer,
loss_function=loss_function,
scheduler=scheduler,
num_epochs=50, # 원하는 에폭 수
device=device
)
[Epoch 1, Step 0] Loss: 9.1348, Acc: 0.0000
[Epoch 1, Step 100] Loss: 9.1644, Acc: 0.0000
[Epoch 1, Step 200] Loss: 9.1202, Acc: 0.0000
[Epoch 1, Step 300] Loss: 8.9643, Acc: 0.0000
Epoch 1 Completed - Avg Loss: 9.0817, Avg Acc: 0.0002
[Epoch 2, Step 0] Loss: 8.9876, Acc: 0.0000
[Epoch 2, Step 100] Loss: 8.8341, Acc: 0.0000
[Epoch 2, Step 200] Loss: 8.6066, Acc: 0.0390
[Epoch 2, Step 300] Loss: 8.5195, Acc: 0.1598
Epoch 2 Completed - Avg Loss: 8.6568, Avg Acc: 0.0622
[Epoch 3, Step 0] Loss: 8.2930, Acc: 0.2028
[Epoch 3, Step 100] Loss: 7.9630, Acc: 0.2500
[Epoch 3, Step 200] Loss: 8.0413, Acc: 0.2383
[Epoch 3, Step 300] Loss: 7.7864, Acc: 0.2542
Epoch 3 Completed - Avg Loss: 7.9280, Avg Acc: 0.2421
.
.
.
[Epoch 48, Step 0] Loss: 5.6416, Acc: 0.3134
[Epoch 48, Step 100] Loss: 5.4186, Acc: 0.3220
[Epoch 48, Step 200] Loss: 5.6109, Acc: 0.3151
[Epoch 48, Step 300] Loss: 5.7959, Acc: 0.3122
Epoch 48 Completed - Avg Loss: 5.6558, Avg Acc: 0.3077
[Epoch 49, Step 0] Loss: 5.8584, Acc: 0.2920
[Epoch 49, Step 100] Loss: 5.5221, Acc: 0.3091
[Epoch 49, Step 200] Loss: 5.7842, Acc: 0.3063
[Epoch 49, Step 300] Loss: 5.6101, Acc: 0.3102
Epoch 49 Completed - Avg Loss: 5.6470, Avg Acc: 0.3078
[Epoch 50, Step 0] Loss: 5.7390, Acc: 0.3000
[Epoch 50, Step 100] Loss: 5.5182, Acc: 0.3242
[Epoch 50, Step 200] Loss: 5.8209, Acc: 0.2890
[Epoch 50, Step 300] Loss: 5.2307, Acc: 0.3397
Epoch 50 Completed - Avg Loss: 5.6404, Avg Acc: 0.3077
CPU times: user 41min 15s, sys: 4.12 s, total: 41min 19s
Wall time: 41min 34s
7. 챗봇 테스트
예측(inference) 단계는 기본적으로 다음과 같은 과정을 거친다.
- 새로운 입력 문장에 대해서는 훈련 때와 동일한 전처리를 거친다.
- 입력 문장을 토크나이징하고, START_TOKEN과 END_TOKEN을 추가한다.
- 패딩 마스킹과 룩 어헤드 마스킹을 계산한다.
- 디코더는 입력 시퀀스로부터 다음 단어를 예측한다.
- 디코더는 예측된 다음 단어를 기존의 입력 시퀀스에 추가하여 새로운 입력으로 사용한다.
- END_TOKEN이 예측되거나 문장의 최대 길이에 도달하면 디코더는 동작을 멈춘다.
위의 과정을 모두 담은 decoder_inference() 함수를 정의한다.
def decoder_inference(model, sentence, tokenizer, device='cpu'):
START_TOKEN = tokenizer.bos_id()
END_TOKEN = tokenizer.eos_id()
MAX_LENGTH = 40
# 전처리
sentence = preprocess_sentence(sentence)
# 인코더 입력: [START] + 인코딩 + [END]
enc_input_ids = [START_TOKEN] + tokenizer.encode(sentence) + [END_TOKEN]
# 차원 확장: (batch_size=1, seq_len)
enc_input = torch.tensor([enc_input_ids], dtype=torch.long, device=device)
# 디코더 입력(dec_input)을 START_TOKEN만 포함한 상태로 시작
dec_input = torch.tensor([[START_TOKEN]], dtype=torch.long, device=device)
model.eval() # 모델 평가 모드
with torch.no_grad():
for i in range(MAX_LENGTH):
# 모델 forward: (enc_input, dec_input) -> (batch_size=1, seq_len, vocab_size)
logits = model(enc_input, dec_input)
# 마지막 타임스텝의 예측만 추출: shape (1, 1, vocab_size)
# logits[:, -1, :] -> (1, vocab_size)
last_step_logits = logits[:, -1, :]
# argmax로 가장 높은 확률의 토큰 선택
predicted_id = torch.argmax(last_step_logits, dim=-1) # shape: (1,)
# 종료 토큰이면 중단
if predicted_id.item() == END_TOKEN:
break
# 디코더 입력(dec_input)에 예측 토큰을 이어붙임
predicted_id = predicted_id.unsqueeze(0) # shape (1,1)
dec_input = torch.cat([dec_input, predicted_id], dim=1)
# 최종 시퀀스: dec_input: (1, seq_len)에서 (seq_len,)로
output_sequence = dec_input.squeeze(0).tolist() # e.g. [START_TOKEN, ..., 토큰들...]
return output_sequence
def sentence_generation(model, sentence, tokenizer, device='cpu'):
# 디코더 인퍼런스 -> 예측된 토큰 시퀀스
output_seq = decoder_inference(model, sentence, tokenizer, device=device)
# 토크나이저로 디코딩 (패딩, START/END 토큰 등은 제외하거나 처리)
# 여기서는 단순히 tokenizer.decode() 직접 호출
predicted_sentence = tokenizer.decode(
[token for token in output_seq if token < tokenizer.GetPieceSize()]
)
print("입력 :", sentence)
print("출력 :", predicted_sentence)
return predicted_sentence
sentence = '이번 겨울에 여자친구랑 일본 여행 갈거야'
sentence_generation(model, sentence, sp, device)
결과:
입력 : 이번 겨울에 여자친구랑 일본 여행 갈거야
출력 : 좋은 더 좋은 .
흠... 출력 문장을 보니 맥락에 맞지 않는 이상한 말을 하고 있다(사실 말이라고 하기도 이상한, 그저 단어들).
이후 문제를 해결하기 위해 Ablation study를 몇 번 더 진행했지만,
1차 시도:
NUM_LAYERS = 3
UNITS = 512
Epoch 50 Completed - Avg Loss: 5.8324, Avg Acc: 0.2938
2차 시도:
NUM_LAYERS = 3 → 6
UNITS = 512 → 2048
Epoch 50 Completed - Avg Loss: 5.6404, Avg Acc: 0.3077
3차 시도:
??
왜 3차시도부터의 결과가 없냐하면? 모두 날아갔다.
코랩에서 진행하던 중 GPU 할당량을 모두 사용해서 ipynb 파일을 다운받고 다른 계정에서 열어보니..
2차 시도까지의 결과만 저장되어 있고 이후 7? 8?차 시도까지의 메모들이 모두 사라져있었다 OTL
하지만 기억나는 건 챗봇의 성능이 처음에 비해 눈에 띄게 좋아지지 않았다는 점이다ㅋㅋ
이유가 뭘까???
아마 학습량 부족이 가장 큰 원인이지 않을까 생각한다.
(추후 광석 퍼실님께 듣기로는 원래 transformer 모델은 GPU 8개로 4일 이상 학습했다고 한다...)
결과가 시원치 않지만 처음에 언급했던 대로, 배웠던 transformer의 구조를 직접 만들어보며 공부를 했다는 것에 의미를 두고,
성능 개선은 다음 Going-Deeper 단계의 NLP 과정에서 이어서 해볼 것이다. (아마 11월 중순이 될 듯)
결론은
transformer를 처음 공부할 때는 난잡하고 도통 머리에 들어오지 않았었는데, 이것저것 문서, 영상 자료 찾아보고 직접 구현까지 해보니, 처음에 비해서 훨씬 명확히 이해하고 있다고 생각한다. (퍼실님 질문이 들어오면 어버버할 것 같긴 함)
다음 포스팅은 대망의 아이펠 첫 번째 해커톤 DLthon에 대해 다루겠습니다.
'AI > NLP' 카테고리의 다른 글
| [11/6~11/11] 아이펠 리서치 15기 TIL | DLthon: 감정 분류 해커톤 (2) (0) | 2025.11.15 |
|---|---|
| [11/6~11/11] 아이펠 리서치 15기 TIL | DLthon: 감정 분류 해커톤 (1) (0) | 2025.11.15 |
| [10/31] 아이펠 리서치 15기 TIL | Transformer (0) | 2025.11.05 |
| [10/29] 아이펠 리서치 15기 TIL | Attention (0) | 2025.11.01 |
| [10/26] 아이펠 리서치 15기 TIL | RNN & LSTM (1) | 2025.10.26 |