허깅 페이스 아르길라 뉴스 요약 데이터세트 사용

#뉴스 요약 데이터세트 불러오기
import numpy as np
from datasets import load_dataset

# "argilla/news-summary" 데이터셋 로드
news = load_dataset("argilla/news-summary", split="test")

# pandas 데이터프레임으로 변환한 후 5000개의 샘플을 랜덤하게 추출하여 샘플링
df = news.to_pandas().sample(5000, random_state=42)[["text", "prediction"]]

# 요약을 위한 입력 텍스트에 "summarize: " 문자열 추가
df["text"] = "summarize: " + df["text"]

# 정답 요약문 컬럼을 라벨로 변경
df["prediction"] = df["prediction"].map(lambda x: x[0]["text"])

# 전체 데이터셋을 훈련, 검증, 테스트 세트로 나눔
train, valid, test = np.split(
    df.sample(frac=1, random_state=42), [int(0.6 * len(df)), int(0.8 * len(df))]
)

# 훈련, 검증, 테스트 데이터셋 크기 출력
print(f"Source News : {train.text.iloc[0][:200]}")
print(f"Summarization : {train.prediction.iloc[0][:50]}")
print(f"Training Data Size : {len(train)}")
print(f"Validation Data Size : {len(valid)}")
print(f"Testing Data Size : {len(test)}")
#뉴스 요약 데이터세트 전처리
import torch
from transformers import T5Tokenizer
from torch.utils.data import TensorDataset, DataLoader
from torch.utils.data import RandomSampler, SequentialSampler
from torch.nn.utils.rnn import pad_sequence

# 데이터셋 생성 함수 정의
def make_dataset(data, tokenizer, device):
    # 소스 문장을 토큰화하고 텐서로 변환
    source = tokenizer(
        text=data.text.tolist(),
        padding="max_length",
        max_length=128,
        pad_to_max_length=True,
        truncation=True,
        return_tensors="pt"
    )

    # 대상 요약 문장을 토큰화하고 텐서로 변환
    target = tokenizer(
        text=data.prediction.tolist(),
        padding="max_length", 
        max_length=128,
        pad_to_max_length=True,
        truncation=True,
        return_tensors="pt"
    )
    
    # 소스 및 대상의 입력 토큰 및 어텐션 마스크를 가져옴
    source_ids = source["input_ids"].squeeze().to(device)
    source_mask = source["attention_mask"].squeeze().to(device)
    target_ids = target["input_ids"].squeeze().to(device)
    target_mask = target["attention_mask"].squeeze().to(device)
    
    # 소스 및 대상의 입력 토큰 및 어텐션 마스크를 텐서 데이터셋으로 변환하여 반환
    return TensorDataset(source_ids, source_mask, target_ids, target_mask)

# 데이터로더 생성 함수 정의
def get_datalodader(dataset, sampler, batch_size):
    data_sampler = sampler(dataset)
    dataloader = DataLoader(dataset, sampler=data_sampler, batch_size=batch_size)
    return dataloader

# 하이퍼파라미터 설정
epochs = 5
batch_size = 8
device = "cuda" if torch.cuda.is_available() else "cpu"
tokenizer = T5Tokenizer.from_pretrained(
    pretrained_model_name_or_path="t5-small"
)

# 훈련 데이터셋 전처리 및 데이터로더 생성
train_dataset = make_dataset(train, tokenizer, device)
train_dataloader = get_datalodader(train_dataset, RandomSampler, batch_size)

# 검증 데이터셋 전처리 및 데이터로더 생성
valid_dataset = make_dataset(valid, tokenizer, device)
valid_dataloader = get_datalodader(valid_dataset, SequentialSampler, batch_size)

# 테스트 데이터셋 전처리 및 데이터로더 생성
test_dataset = make_dataset(test, tokenizer, device)
test_dataloader = get_datalodader(test_dataset, SequentialSampler, batch_size)

# 데이터로더에서 첫 번째 미니배치 출력하여 확인
print(next(iter(train_dataloader)))

# 토크나이저의 특정 토큰 인덱스를 토큰으로 변환하여 출력 (예: 시작 토큰, 종료 토큰)
print(tokenizer.convert_ids_to_tokens(21603))
print(tokenizer.convert_ids_to_tokens(10))
#T5 모델 선언
from torch import optim
from transformers import T5ForConditionalGeneration

model = T5ForConditionalGeneration.from_pretrained(
    pretrained_model_name_or_path="t5-small",  # 사전 학습된 T5-small 모델 로드
).to(device)  # 모델을 GPU 또는 CPU로 이동

# 옵티마이저 설정 (AdamW 사용)
optimizer = optim.AdamW(
    model.parameters(),  # 모델의 파라미터를 최적화 대상으로 지정
    lr=1e-5,  # 학습률 설정
    eps=1e-8  # AdamW 옵티마이저의 epsilon 값 설정
)
#T5 모델 학습 및 평가
import numpy as np
from torch import nn

# 정확도 계산 함수 정의
def calc_accuracy(preds, labels):
    pred_flat = np.argmax(preds, axis=1).flatten()  # 예측값에서 가장 높은 확률을 가진 인덱스 추출
    labels_flat = labels.flatten()  # 라벨을 1차원 배열로 평평하게 펴기
    return np.sum(pred_flat == labels_flat) / len(labels_flat)  # 정확하게 예측한 비율 계산

# 모델 학습 함수 정의
def train(model, optimizer, dataloader):
    model.train()  # 모델을 학습 모드로 설정
    train_loss = 0.0

    for source_ids, source_mask, target_ids, target_mask in dataloader:
        decoder_input_ids = target_ids[:, :-1].contiguous()  # 디코더 입력 토큰 제거
        labels = target_ids[:, 1:].clone().detach()  # 라벨 생성
        labels[target_ids[:, 1:] == tokenizer.pad_token_id] = -100  # 패딩 토큰을 무시하기 위해 -100으로 설정

        outputs = model(
            input_ids=source_ids,
            attention_mask=source_mask,
            decoder_input_ids=decoder_input_ids,
            labels=labels,
        )

        loss = outputs.loss
        train_loss += loss.item()  # 학습 손실 누적

        optimizer.zero_grad()  # 기울기 초기화
        loss.backward()  # 역전파 진행
        optimizer.step()  # 옵티마이저로 파라미터 업데이트

    train_loss = train_loss / len(dataloader)  # 전체 손실을 미니배치 수로 나누어 평균 손실 계산
    return train_loss

# 모델 평가 함수 정의
def evaluation(model, dataloader):
    with torch.no_grad():
        model.eval()  # 모델을 평가 모드로 설정
        val_loss = 0.0

        for source_ids, source_mask, target_ids, target_mask in dataloader:
            decoder_input_ids = target_ids[:, :-1].contiguous()  # 디코더 입력 토큰 제거
            labels = target_ids[:, 1:].clone().detach()  # 라벨 생성
            labels[target_ids[:, 1:] == tokenizer.pad_token_id] = -100  # 패딩 토큰을 무시하기 위해 -100으로 설정

            outputs = model(
                input_ids=source_ids,
                attention_mask=source_mask,
                decoder_input_ids=decoder_input_ids,
                labels=labels,
            )

            loss = outputs.loss
            val_loss += loss

    val_loss = val_loss / len(dataloader)  # 전체 손실을 미니배치 수로 나누어 평균 손실 계산
    return val_loss

best_loss = 10000
for epoch in range(epochs):
    train_loss = train(model, optimizer, train_dataloader)  # 학습 진행
    val_loss = evaluation(model, valid_dataloader)  # 검증 진행
    print(f"Epoch {epoch + 1}: Train Loss: {train_loss:.4f} Val Loss: {val_loss:.4f}")

    if val_loss < best_loss:  # 현재 검증 손실이 최고 손실보다 낮은 경우
        best_loss = val_loss  # 최고 손실 갱신
        torch.save(model.state_dict(), "./models/T5ForConditionalGeneration.pt")  # 모델 가중치 저장
        print("Saved the model weights")  # 모델 가중치 저장 완료 메시지 출력
#T5 생성 모델 테스트
model.eval()  # 모델을 평가 모드로 설정
with torch.no_grad():  # 그래디언트 계산 비활성 모드
    for source_ids, source_mask, target_ids, target_mask in test_dataloader:  # 테스트 데이터로더 순회
        generated_ids = model.generate(  # T5 모델을 사용하여 요약 생성
            input_ids=source_ids,  # 입력 문장 토큰
            attention_mask=source_mask,  # 입력 어텐션 마스크
            max_length=128,  # 생성된 문장의 최대 길이
            num_beams=3,  # 빔 검색을 위한 빔 수
            repetition_penalty=2.5,  # 중복 패널티
            length_penalty=1.0,  # 길이 패널티
            early_stopping=True,  # 조기 중단 설정
        )

        for generated, target in zip(generated_ids, target_ids):  # 생성된 문장과 실제 요약 문장을 반복하며 비교
            pred = tokenizer.decode(  # 생성된 요약 문장을 텍스트로 디코딩
                generated, skip_special_tokens=True, clean_up_tokenization_spaces=True
            )
            actual = tokenizer.decode(  # 실제 요약 문장을 텍스트로 디코딩
                target, skip_special_tokens=True, clean_up_tokenization_spaces=True
            )
            print("Generated Headline Text:", pred)  # 생성된 요약 문장 출력
            print("Actual Headline Text   :", actual)  # 실제 요약 문장 출력
        break  # 첫 번째 미니배치만 처리하고 반복문 종료