#!head -n 3 /proc/meminfo # 코랩 확인용 리눅스 메모리 확인
import torch
torch.cuda.is_available() # 코랩에서 내가 GPU 사용 중인지 확인
from google.colab import drive
drive.mount('/content/drive') # 구글 드라이브의 파일을 추가 학습하기 위하여 구글드라이브 연동
!pip install ratsnlp # 대표적 모델인 이거 다운..
###########################################################
import torch # 토치 임포트.
from ratsnlp.nlpbook.qa import QATrainArguments
args = QATrainArguments( # 보미 코리아 QA모델 사용 기본 모델 셋팅
pretrained_model_name="beomi/kcbert-base",
downstream_corpus_name="korquad-v1",
downstream_model_dir="/content/drive/MyDrive/notebook",
max_seq_length=256,#학습최대
max_query_length=32,#질문최대길이.
doc_stride=64,
batch_size=32 if torch.cuda.is_available() else 4,
learning_rate=0.000001,
epochs=7,
tpu_cores=0 if torch.cuda.is_available() else 8,
seed=7,
)
주석을 써두었으니 대강 읽어보면서 사전 학습된 한국어QA 모델인 korquad-v1 사용
from ratsnlp import nlpbook
nlpbook.set_seed(args)
nlpbook.set_logger(args)
nlpbook.download_downstream_dataset(args) # 데이터 다운
########토크나이저 정의 #####
from transformers import BertTokenizer
tokenizer = BertTokenizer.from_pretrained( # 사전학습 모델의 토크나이저가 제일 적당하다
args.pretrained_model_name,
do_lower_case=False,
)
당연한 말아지만 사전학습된 모델의 최적 토크나이저는 사전학습 시 사용한 토크나이저 기본 옵션이 제일 적당하다.
# 학습데이터 구축
from ratsnlp.nlpbook.qa import KorQuADV1Corpus, QADataset
from torch.utils.data import DataLoader, SequentialSampler, RandomSampler
corpus = KorQuADV1Corpus()
train_dataset = QADataset(
args=args,
corpus=corpus,
tokenizer=tokenizer,
mode="train",
)
train_dataloader = DataLoader(
train_dataset,
batch_size=args.batch_size,
sampler=RandomSampler(train_dataset, replacement=False),
collate_fn=nlpbook.data_collator,
drop_last=False,
num_workers=args.cpu_workers,
)
학습데이터를 구축하고 (여기서 내가 json파일을 가공하였다면 형식을 여기서 사용하는 학습 파일과 맞춘뒤 파일명 그대로 해서 바꾸면 추가학습이 가능하다.)
#테스트 데이터 구축
val_dataset = QADataset(
args=args,
corpus=corpus,
tokenizer=tokenizer,
mode="val",
)
val_dataloader = DataLoader(
val_dataset,
batch_size=args.batch_size,
sampler=SequentialSampler(val_dataset),
collate_fn=nlpbook.data_collator,
drop_last=False,
num_workers=args.cpu_workers,
)
테스트 데이터는 그대로 사용하겠다.
# 모델
from transformers import BertForQuestionAnswering
model = BertForQuestionAnswering.from_pretrained(
args.pretrained_model_name,
# config = config
config="beomi/kcbert-base",
)
# pretrained_model_config = BertConfig.from_pretrained("beomi/kcbert-base",max_position_embeddings = 512) # BertForQuestionAnswering.from_pretrained(pretrained_model_name_or_path = "beomi/kcbert-base").config_class(max_position_embeddings = 1024)# .config_class
# model = BertForQuestionAnswering.from_pretrained(
# args.pretrained_model_name,
# # config = config
# config=pretrained_model_config,)
# from transformers import BertForQuestionAnswering
# model = BertForQuestionAnswering.from_pretrained(
# args.pretrained_model_name,
# config=pretrained_model_config,
# )
# 최초 학습 아닐시 안해도 됨
from ratsnlp.nlpbook.qa import QATask
task = QATask(model, args)
trainer = nlpbook.get_trainer(args)
# 학습시작
trainer.fit(
task,
train_dataloaders=train_dataloader,
val_dataloaders=val_dataloader,
)
학습을 시작한다.
# model.to(device) # 모델을 GPU에 얹어줌,.
torch.save(model, f'./BertForQuestionAnswering_batch_32_ep7_v1.model.pt')
모델을 저장하고
# model= torch.load("/content/BertForQuestionAnswering.model.pt", map_location=device)
# model= torch.load("/content/BertForQuestionAnswering.model.pt") # 모델 로드
model= torch.load("/content/drive/MyDrive/notebook/BertForQuestionAnswering_batch_32_ep7_v1.model.pt") # 모델 로드
때에 맞게 저장된 모델을 불러온다.
from typing import List, Tuple, Dict, Any
import json
import random
class KoMRC:
def __init__(self, data, indices: List[Tuple[int, int, int]]):
self._data = data
self._indices = indices
# Json을 불러오는 메소드
@classmethod
def load(cls, file_path: str):
with open(file_path, 'r', encoding='utf-8') as fd:
data = json.load(fd)
indices = []
for d_id, document in enumerate(data['data']):
for p_id, paragraph in enumerate(document['paragraphs']):
for q_id, _ in enumerate(paragraph['qas']):
indices.append((d_id, p_id, q_id))
return cls(data, indices)
# 데이터 셋을 잘라내는 메소드
@classmethod
def split(cls, dataset, eval_ratio: float=.1, seed=42):
indices = list(dataset._indices)
random.seed(seed)
random.shuffle(indices)
train_indices = indices[int(len(indices) * eval_ratio):]
eval_indices = indices[:int(len(indices) * eval_ratio)]
return cls(dataset._data, train_indices), cls(dataset._data, eval_indices)
def __getitem__(self, index: int) -> Dict[str, Any]: # 슬라이싱 사용가능
d_id, p_id, q_id = self._indices[index]
paragraph = self._data['data'][d_id]['paragraphs'][p_id]
context = paragraph['context']
qa = paragraph['qas'][q_id]
guid = qa['id']
question = qa['question']
answers = qa['answers']
return {
'id': guid,
'context': context,
'question': question,
'answers': answers
}
def __len__(self) -> int: # len() 가능
return len(self._indices)
dataset = KoMRC.load('/content/Korpora/korquad-v1/KorQuAD_v1.0_test.json')
print("Number of Samples:", len(dataset))
print(dataset[0])
모델을 로드하고 샘플 갯수를 세서 하나만 확인하는 식으로 구성해두었다.
question = dataset[0]["question"]
context = dataset[0]["context"]
truncated_query = tokenizer.encode(
question,
add_special_tokens=False,
truncation=True,
max_length=args.max_query_length
)
inputs = tokenizer.encode_plus(
text=truncated_query,
text_pair=context,
truncation="only_second",
padding="max_length",
max_length=args.max_seq_length,
return_token_type_ids=True,
)
outputs = model(**{k: torch.tensor([v]) for k, v in inputs.items()})
질문과 응답을 변수에 저장하고 아웃풋들을 뽑아낸다.
start_pred = outputs.start_logits.argmax(dim=-1).item()
end_pred = outputs.end_logits.argmax(dim=-1).item()
pred_text = tokenizer.decode(inputs['input_ids'][start_pred:end_pred+1])
{'start_pred' : start_pred,
'end_pred' : end_pred,
'question': question,
'context': context,
'answer': pred_text,}
해당 부분은 5가지 필요한 부분만 좀 잘라냈다. 왜냐하면 여기서 내가 원하는 정답은 문장 수준이 아니기 때문이다
import numpy as np
def get_top_answers(possible_starts,possible_ends,input_ids):
answers = []
for start,end in zip(possible_starts,possible_ends):
#+1 for end
answer = tokenizer.convert_tokens_to_string(tokenizer.convert_ids_to_tokens(input_ids[start:end+1]))
answers.append( answer )
return answers
가장 높은 수준의 답들을 가져오게 하였다. 답이 빈칸인 경우도 존재한다. (문장이거나 단어 수준도 존재)
import numpy as np
def answer_question(question,context,topN):
truncated_query = tokenizer.encode(
question,
add_special_tokens=False,
truncation=True,
max_length=args.max_query_length
)
inputs = tokenizer.encode_plus(
text=truncated_query,
text_pair=context,
truncation="only_second",
padding="max_length",
max_length=args.max_seq_length,
return_token_type_ids=True,)
input_ids = inputs["input_ids"]
outputs = model(**{k: torch.tensor([v]) for k, v in inputs.items()})
answer_start_scores = outputs["start_logits"]
answer_end_scores = outputs["end_logits"]
possible_starts = np.argsort(answer_start_scores.cpu().detach().numpy()).flatten()[::-1][:topN]
possible_ends = np.argsort(answer_end_scores.cpu().detach().numpy()).flatten()[::-1][:topN]
answer_start = torch.argmax(answer_start_scores)
# Get the most likely end of answer with the argmax of the score
answer_end = torch.argmax(answer_end_scores) + 1
answer = tokenizer.convert_tokens_to_string(tokenizer.convert_ids_to_tokens(input_ids[answer_start:answer_end]))
answers = get_top_answers(possible_starts,possible_ends,input_ids )
answer_info = {"answer":answer,"answer_start":answer_start,"answer_end":answer_end,"input_ids":input_ids,
"answer_start_scores":answer_start_scores,"answer_end_scores":answer_end_scores,"inputs":inputs,"answers":answers,
"possible_starts":possible_starts,"possible_ends":possible_ends}
for i in range(len(answers)):
answers[i] = answers[i].replace("[CLS]","")
answers[i] = answers[i].replace("[SEP]","")
answer = answers[0]
while True:
for i in range(len(answers)):
if (len(answer)>10) or (len(answer) == 0):
answer = answers[i]
break
return answer
문장이거나 단어 수준을 제외하기 위해서 답의 길이가 10개가 넘는것을 if 문을 통해 제거하였고, cls , sep등을 포함하는 단어(5개씩 뽑다보니 같이 나와버림)들은 문장중에서 해당 부분을 제거하였다.
question = dataset[0]["question"]
context = dataset[0]["context"]
topN = 5
truncated_query = tokenizer.encode(
question,
add_special_tokens=False,
truncation=True,
max_length=args.max_query_length
)
inputs = tokenizer.encode_plus(
text=truncated_query,
text_pair=context,
truncation="only_second",
padding="max_length",
max_length=args.max_seq_length,
return_token_type_ids=True,)
input_ids = inputs["input_ids"]
outputs = model(**{k: torch.tensor([v]) for k, v in inputs.items()})
answer_start_scores = outputs["start_logits"]
answer_end_scores = outputs["end_logits"]
possible_starts = np.argsort(answer_start_scores.cpu().detach().numpy()).flatten()[::-1][:topN]
possible_ends = np.argsort(answer_end_scores.cpu().detach().numpy()).flatten()[::-1][:topN]
answer_start = torch.argmax(answer_start_scores)
# Get the most likely end of answer with the argmax of the score
answer_end = torch.argmax(answer_end_scores) + 1
answer = tokenizer.convert_tokens_to_string(tokenizer.convert_ids_to_tokens(input_ids[answer_start:answer_end]))
answers = get_top_answers(possible_starts,possible_ends,input_ids )
answer_info = {"answer":answer,"answer_start":answer_start,"answer_end":answer_end,"input_ids":input_ids,
"answer_start_scores":answer_start_scores,"answer_end_scores":answer_end_scores,"inputs":inputs,"answers":answers,
"possible_starts":possible_starts,"possible_ends":possible_ends}
for i in range(len(answers)):
answers[i] = answers[i].replace("[CLS]","")
answers[i] = answers[i].replace("[SEP]","")
answer = answers[0]
while True:
for i in range(len(answers)):
if (len(answer)>10) or (len(answer) == 0):
answer = answers[i]
break
해당 부분에서 전부 걸러져서 나오게 된다. 위에 두가지 펑션을 사용하였다.
결과적으로 5개중에서 단어 수준인 정답일 확율 가장 높은 5개중 1개가 들어간다.
파일은 두개 첨부하였는데 하나는 코랩에서 GPU사용했던 파일이고 하나는 컴퓨터 내부에서 전처리용으로만 썻던 파일이다.
'IT - 코딩 > AI, 예측모델' 카테고리의 다른 글
chat GPT를 활용한 한국어 댓글 긍부정 판단 vs 공개 사전학습 모델 긍부정 판단 비교 (with python) (0) | 2023.06.23 |
---|---|
자연어 처리 _ 키워드 추출 key bert ) with python & pytorch (2) | 2023.01.03 |
자연어처리 긍부정 판단 with python & pytorch (bert) (0) | 2022.11.19 |
딥러닝을 응용한 환율예측으로 가상화폐 차익거래 기회 백테스팅 (2) 수익율 측정 (0) | 2022.09.08 |
딥러닝을 응용한 환율예측으로 가상화폐 차익거래 기회 백테스팅 (1) (0) | 2022.09.08 |