Fine-tuning LLM (4) Load & Download data

2025. 6. 10. 21:41·IT, Digital

썸네일

서론

지난 포스팅에서는 데이터를 정제하는 코드를 작성했다. 이번 포스팅에서는 데이터를 로드하고 로드된 데이터를 저장하는 방법에 대해서 적도록 하겠다. 이전 포스팅을 보려면 아래 포스팅을 참고하길 바란다. 여기서 만든 클래스를 활용해서 정제된 데이터를 로딩하는 것이기에 꼭 필요한 과정이다.

 

Fine-tuning LLM (3) Item Parsing Class

서론지난 포스팅에서는 데이터셋을 어떻게 수집하고, 데이터를 어떻게 분석하는 지 그 과정에 대해서 알아보았다. 이번 포스팅에서는 이렇게 분석한 데이터를 어떻게 프롬프트에 반영하는지에

quiseol.com


init  함수와 load 함수

우선 init함수를 통해 name과 dataset을 초기화한다. name은 카테고리명을, dataset은 데이터셋을 나중에 로드할때 사용된다. 이후 실제 데이터를 로드하기 위해 load_dataset 함수를 만든다. 그러면 처음에 loader = ItemLoader("toys")라고 하면 toy라는 카테고리가 name에 들어가고 이후 items = loader.load()를 선언 하면 아래 함수가 작동되는 것이다. 아래 load를 할때 metadata에 self.name이 들어가는데 이거는 이 데이터셋의 이름이 저렇게 되어있기 때문이다.

def load(self, workers=8):
        print(f"Loading dataset {self.name}", flush=True)
        self.dataset = load_dataset("McAuley-Lab/Amazon-Reviews-2023", f"raw_meta_{self.name}", split="full", trust_remote_code=True)
        results = self.load_in_parallel(workers)
        return results

 

load_in_parallel(workers)는 loading하는 것을 병렬로 처리하게 만드는 함수다. workers는 본인이 쓰는 컴퓨터가 몇 코어인지에 따라 달라짐. 아무튼 이어서 load_in_parallel 함수를 적겠다.

    def load_in_parallel(self, workers):
        results = []
        chunk_count = (len(self.dataset) // CHUNK_SIZE) + 1
        with ProcessPoolExecutor(max_workers=workers) as pool:
            for batch in tqdm(pool.map(self.from_chunk, self.chunk_generator()), total=chunk_count):
                results.extend(batch)
        for result in results:
            result.category = self.name
        return results

 

그럼 workers의 개수에 따라 ProcessPoolExecutor가 병렬처리를 시작한다. 이 과정을 통해 프로세스가 빨라지게 된다. 그리고 이때 dataset을 병렬처리하기 위해 chunk단위로 나눠야되는데 이걸 chunk_generator함수를 통해 구현한다. chunk size로 분할해서 읽는 것이다. 해당 함수는 아래와 같다. 전체 사이즈를 chunk 단위로 나눠서 yield하는 구조임. 참고로 chunk_size는 1000으로 줬다. 전체는 40만개가 넘는 데이터지만, 평균적으로 한 카테코리당 5만개정도 있으니까 1000개가 적당하다고 판단한 것이다.

def chunk_generator(self):
        size = len(self.dataset)
        for i in range(0, size, CHUNK_SIZE):
            yield self.dataset.select(range(i, min(i + CHUNK_SIZE, size)))

이렇게 chunk_generator로부터 받은 것들을 필터링하기 위해서는 필터링 뒤 tem 객체 리스트를 리턴하는 함수를 만들어야 된다. 그것은 아래와 같다.

 def from_chunk(self, chunk):
        batch = []
        for datapoint in chunk:
            result = self.from_datapoint(datapoint)
            if result:
                batch.append(result)
        return batch

chunk list를 받아서 from_datapoint(=필터링 함수)에서 필터링을 한 뒤 batch에 넣어주는 함수인 것이다. 필터링하는 from_dataset함수는 아래와 같다.

    def from_datapoint(self, datapoint):
        try:
            price_str = datapoint['price']
            if price_str:
                price = float(price_str)
                if MIN_PRICE <= price <= MAX_PRICE:
                    item = Item(datapoint, price)
                    return item if item.include else None
        except ValueError:
            return None

여기서 MIN_PRICE, MAX_PRICE가 있는 이유는 outlier 제거용이다. 좀 더 데이터 분석적으로 접근할 수도 있지만 그게 목적은 아니기에 1000미만에서 끊었다.


Loading 함수 최종

이번 포스팅은 입력을 받으면 함수가 작동되는 순서로 포스팅을 했다. 하지만 a라는 함수가 b라는 함수를 사용한다면 b함수는 a함수 위에서 선언되어야 함은 기본적인 상식이다. 그래서 본 포스팅의 역순으로 class를 구성하면 된다. 아무튼 전체 코드는 아래와 같다.

from datetime import datetime
from tqdm import tqdm
from datasets import load_dataset
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from items import Item

CHUNK_SIZE = 1000
MIN_PRICE = 0.5
MAX_PRICE = 999.9
class ItemLoader:

    def __init__(self, name):
        self.name = name
        self.dataset = None

    def from_datapoint(self, datapoint):
        try:
            price_str = datapoint['price']
            if price_str:
                price = float(price_str)
                if MIN_PRICE <= price <= MAX_PRICE:
                    item = Item(datapoint, price)
                    return item if item.include else None
        except ValueError:
            return None

    def from_chunk(self, chunk): 
        batch = []
        for datapoint in chunk:
            result = self.from_datapoint(datapoint)
            if result:
                batch.append(result)
        return batch

    def chunk_generator(self):
        size = len(self.dataset)
        for i in range(0, size, CHUNK_SIZE):
            yield self.dataset.select(range(i, min(i + CHUNK_SIZE, size)))

    def load_in_parallel(self, workers):
        results = []
        chunk_count = (len(self.dataset) // CHUNK_SIZE) + 1
        with ProcessPoolExecutor(max_workers=workers) as pool:
            for batch in tqdm(pool.map(self.from_chunk, self.chunk_generator()), total=chunk_count):
                results.extend(batch)
        for result in results:
            result.category = self.name
        return results
            
    def load(self, workers=8):
        start = datetime.now()
        print(f"Loading dataset {self.name}", flush=True)
        self.dataset = load_dataset("McAuley-Lab/Amazon-Reviews-2023", f"raw_meta_{self.name}", split="full", trust_remote_code=True)
        results = self.load_in_parallel(workers)
        finish = datetime.now()
        print(f"Completed {self.name} with {len(results):,} datapoints in {(finish-start).total_seconds()/60:.1f} mins", flush=True)
        return results

데이터 다운로드

이렇게 작성한 이후 dataset cateogory명을 담은 list를 만들고, 하나하나 불러가며 items 리스트에 넣게되면 전체 데이터셋을 로딩할 수 있게된다.

dataset_names = [
    "Automotive",
    "Electronics",
    "Office_Products",
    "Tools_and_Home_Improvement",
    "Cell_Phones_and_Accessories",
    "Toys_and_Games",
    "Appliances",
    "Musical_Instruments",
]

items = []
for dataset_name in dataset_names:
    loader = ItemLoader(dataset_name)
    items.extend(loader.load())

www.quiseol.com/entry/Fine-tuning-llm-2에서는 처음에 시작하자마자 로딩을 했지만 이렇게 로딩한 걸 위와 같이 investigation을 한 뒤 전체적인 정제 과정을 거쳐야된다. 이렇게 다 진행이 되고 다음번엔 이짓을 하지 않기 위해서는 이렇게 만들어진 데이터셋을 저장해야된다. 보통 전체 데이터 중 5-10%를 test data로 남겨두지만, 이번은 전체 데이터를 사용하지 않을 것이기에 train을 40만개, test를 2,000개로 나눠서 샘플을 만들 것이다. 그리고 이후  활용을 위해 pkl파일로 저장한다. 이렇게 만들어진 pkl파일은 나중에 학습시에 사용할 것이다. 참고로 sample은 www.quiseol.com/entry/Fine-tuning-llm-2에서 data investigation에서 weight을 준 후의 items가 담긴 리스트다.

random.seed(42)
random.shuffle(sample)
train = sample[:400_000]
test = sample[400_000:402_000]

with open('train_set.pkl', 'wb') as file:
    pickle.dump(train, file)
with open('test_set.pkl', 'wb') as file:
    pickle.dump(test, file)

끝으로

다음 포스팅에서는 이렇게 다운받은 pkl파일을 불러오는 것을 시작으로 진짜 파인튜닝을 진행하는 내용에 대해서 포스팅하고자한다. 이번까지는 데이터를 만든것임. 무려 4개의 포스팅동안 말이다. 근데 요약하면 1. 데이터를 다운 받음 2. 파인튜닝에 적합하게 프롬프트 정제를 함 3. 해당 파일을 train, test 데이터로 나누고 pkl파일로 저장 4. 파인튜닝 진행이다. 그리고 다음 포스팅은 4번을 한다는 의미임. 다음 포스팅에서 봅시다.

'IT, Digital' 카테고리의 다른 글

Fine-tuning LLM (5) fine-tuning  (1) 2025.06.11
Fine-tuning LLM (3) Item Parsing Class  (0) 2025.06.06
Fine-tuning LLM (2) Dataset investigation  (0) 2025.06.05
Fine-tuning LLM (1) Data Curation과 데이터 수집 및 로드  (1) 2025.06.03
[RAG] Chroma와 FAISS 차이, 장단점 간단 정리  (0) 2025.05.28
'IT, Digital' 카테고리의 다른 글
  • Fine-tuning LLM (5) fine-tuning
  • Fine-tuning LLM (3) Item Parsing Class
  • Fine-tuning LLM (2) Dataset investigation
  • Fine-tuning LLM (1) Data Curation과 데이터 수집 및 로드
QUISEOL
QUISEOL
제품 사용기, 프로그래밍 언어 공부 블로그 입니다.
  • QUISEOL
    QUISEOL
    QUISEOL
    • 분류 전체보기 (119)
      • IT, Digital (69)
      • 여행 (12)
      • 쇼핑 (21)
      • 그 외 경험기 (17)
  • 블로그 메뉴

    • 링크

      • insta
    • 공지사항

    • 인기 글

    • 최근 글

    • hELLO· Designed By정상우.v4.10.3
    QUISEOL
    Fine-tuning LLM (4) Load & Download data
    상단으로

    티스토리툴바