
tractorun-llama-inference-tiny-stories-finetune

import yt.wrapper as yt
import uuid
import os
yt.config["pickling"]["safe_stream_mode"] = False
username = yt.get_user_name()
if yt.exists(f"//sys/users/{username}/@user_info/home_path"):
    home = yt.get(f"//sys/users/{username}/@user_info/home_path")
    working_dir = f"{home}/{uuid.uuid4().hex}"
else:
    working_dir = f"//tmp/examples/{uuid.uuid4().hex}"
yt.create("map_node", working_dir)
print(working_dir)
from tractorun.toolbox import Toolbox
from tractorun.run import run
from tractorun.mesh import Mesh
from tractorun.resources import Resources
from tractorun.backend.generic import GenericBackend
from tractorun.backend.tractorch import Tractorch
from tractorun.stderr_reader import StderrMode
from yt import type_info

TALES_PER_JOB = 500

hf_token = os.environ.get("YT_SECURE_VAULT_HF_TOKEN", "")
assert hf_token != "", "set the HF token in the kernel's secrets to use Llama"

schema = yt.schema.TableSchema(strict=False)
schema.add_column("text", type_info.String)

datasets_path = f"{working_dir}/datasets"
yt.create("map_node", datasets_path)


MAX_TOKENS = 1000


def prepare_dataset(toolbox: Toolbox):
    from vllm import LLM, SamplingParams
    import os

    os.environ["HF_TOKEN"] = hf_token
    # pin each process to its own GPU
    os.environ["CUDA_VISIBLE_DEVICES"] = str(toolbox.coordinator.get_process_index())

    self_index = toolbox.coordinator.get_self_index()
    table_path = f"{datasets_path}/dataset_{self_index}"

    llm = LLM(model="meta-llama/Llama-3.2-3B-Instruct", seed=self_index)

    sampling_params = SamplingParams(
        temperature=0.9,
        top_p=0.85,
        max_tokens=MAX_TOKENS,
    )
    conversations = [
        [
            {
                "role": "system",
                "content": "You are a professional storyteller. Write the story in one paragraph, without line breaks. A user will now ask you to tell a fairy tale, and you must create a story featuring Tracto.ai. Tracto.ai is an AI startup that provides infrastructure for machine learning and big data processing.",
            },
            {
                "role": "user",
                "content": f"Write the {index}th fairy tale about some animal please.",
            },
        ]
        for index in range(TALES_PER_JOB)
    ]

    results = llm.chat(
        messages=conversations,
        sampling_params=sampling_params,
    )

    tales = ({"text": result.outputs[0].text} for result in results)

    toolbox.yt_client.create("table", table_path, attributes={"schema": schema.to_yson_type()}, force=True)
    toolbox.yt_client.write_table(table_path, tales)


run(
    prepare_dataset,
    backend=GenericBackend(),
    proxy_stderr_mode=StderrMode.primary,
    yt_path=f"{working_dir}/tractorun_inference",
    # WARNING: on the playground you have only 1 host with 1 GPU, so we use a
    # single-process mesh; on a bigger cluster you can raise node_count and
    # process_per_node (and adjust the dataset part count below accordingly)
    mesh=Mesh(node_count=1, gpu_per_process=1, process_per_node=1, pool_trees=["gpu_h100"]),
    resources=Resources(
        cpu_limit=64,
        memory_limit=322122547200,  # 300 GiB
    ),
)

# one dataset part is written per inference process: node_count * process_per_node
dataset_parts = [f"{datasets_path}/dataset_{i}" for i in range(1 * 1)]
dataset_path = f"{datasets_path}/dataset"

yt.run_merge(
    dataset_parts,
    dataset_path,
)
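# Optional sanity check (not part of the pipeline): peek at the first few
# generated tales to confirm the merged table contains text.
for i, row in enumerate(yt.read_table(dataset_path)):
    print(row["text"][:120], "...")
    if i >= 2:
        break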
from tractorun.backend.tractorch import YtDataset
from tractorun.backend.tractorch.serializer import TensorSerializer

from transformers import AutoModelForCausalLM, AutoTokenizer, GenerationConfig


class YTTransform:
    def __init__(self, tokenizer: AutoTokenizer):
        self._tokenizer = tokenizer

    def __call__(self, columns: list[str], row: dict) -> dict:
        assert columns == ["text"]
        # pad every tale to a fixed length of MAX_TOKENS token ids
        input_ids = self._tokenizer(
            yt.yson.get_bytes(row["text"]).decode(),
            padding="max_length",
            max_length=MAX_TOKENS,
        )["input_ids"]
        return {
            "input_ids": input_ids,
        }
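# A minimal illustration of the transform (an assumption-laden sketch, not part
# of the original notebook): every sample comes out padded to MAX_TOKENS ids.
_tok = AutoTokenizer.from_pretrained("EleutherAI/gpt-neo-125M")
_tok.pad_token = _tok.eos_token
_sample = YTTransform(_tok)(["text"], {"text": b"Once upon a time..."})
print(len(_sample["input_ids"]))  # expected: MAX_TOKENS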


def get_dataset(
    path: str,
    tokenizer: AutoTokenizer,
    yt_client: yt.YtClient,
) -> tuple[YtDataset, YtDataset]:
    start = 0
    end = yt_client.get(path + "/@row_count")

    # 80/20 split by row ranges: [0, train_end) for training, [train_end, end) for eval
    train_end = int(end * 0.8)
    eval_start = train_end

    train_dataset = YtDataset(path=path, yt_client=yt_client, transform=YTTransform(tokenizer), start=start, end=train_end, columns=["text"])
    eval_dataset = YtDataset(path=path, yt_client=yt_client, transform=YTTransform(tokenizer), start=eval_start, end=end, columns=["text"])
    return train_dataset, eval_dataset
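# Quick check of the split arithmetic (illustrative only):
_rows = yt.get(dataset_path + "/@row_count")
print("train rows:", int(_rows * 0.8), "eval rows:", _rows - int(_rows * 0.8))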
from transformers import (
    DataCollatorForLanguageModeling,
    Trainer,
    TrainingArguments,
    AutoModelForCausalLM,
    GenerationConfig,
)
from transformers.trainer_pt_utils import AcceleratorConfig


def training(toolbox: Toolbox):
    model = AutoModelForCausalLM.from_pretrained(
        "roneneldan/TinyStories-3M",
        trust_remote_code=True,
        use_cache=False,
    )
    tokenizer = AutoTokenizer.from_pretrained("EleutherAI/gpt-neo-125M")
    tokenizer.pad_token = tokenizer.eos_token
    data_collator = DataCollatorForLanguageModeling(tokenizer, pad_to_multiple_of=2, mlm=False)
    train_dataset, eval_dataset = get_dataset(
        path=dataset_path,
        tokenizer=tokenizer,
        yt_client=toolbox.yt_client,
    )
    args = TrainingArguments(
        output_dir="/tmp/results",
        per_device_train_batch_size=2,
        gradient_accumulation_steps=1,
        eval_on_start=True,
        eval_strategy="epoch",
        num_train_epochs=8,
        weight_decay=0.1,
        lr_scheduler_type="constant",
        learning_rate=5e-5,
        save_strategy="no",  # don't save intermediate checkpoints
        logging_dir=None,
        logging_strategy="epoch",
        fp16=True,
        push_to_hub=False,
        batch_eval_metrics=False,
        accelerator_config=AcceleratorConfig(
            split_batches=True,
            dispatch_batches=True,
        ),
    )
    args = args.set_dataloader(train_batch_size=16, drop_last=True)
    trainer = Trainer(
        model=model,
        processing_class=tokenizer,
        args=args,
        data_collator=data_collator,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
    )
    trainer.train()
    # only the primary process uploads the trained model to YT
    if toolbox.coordinator.is_primary():
        toolbox.save_model(TensorSerializer().serialize(trainer.model))


run(
    training,
    backend=Tractorch(),
    yt_path=f"{working_dir}/tractorun_training",
    # NOTE: adjust the mesh to your cluster; the playground has a single GPU host
    mesh=Mesh(node_count=1, gpu_per_process=1, process_per_node=8, pool="fifo", pool_trees=["gpu_h200"]),
    resources=Resources(
        cpu_limit=64,
        memory_limit=322122547200,  # 300 GiB
    ),
)
from transformers import AutoModelForCausalLM, AutoTokenizer, GenerationConfig
import torch
import io

# pick the newest saved model incarnation
incarnation = sorted(yt.list(f"{working_dir}/tractorun_training/models"), key=lambda x: int(x), reverse=True)[0]

raw_model = io.BytesIO(yt.read_file(f"{working_dir}/tractorun_training/models/{incarnation}").read())

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = torch.load(raw_model).to(device)

tokenizer = AutoTokenizer.from_pretrained("EleutherAI/gpt-neo-125M")
tokenizer.pad_token = tokenizer.eos_token
prompt = "Long time ago"
input_ids = tokenizer.encode(prompt, return_tensors="pt").to(device)
output = model.generate(input_ids, max_length=200, num_beams=1, temperature=0.7, do_sample=True)
output_text = tokenizer.decode(output[0], skip_special_tokens=True)

print("\n\n", output_text)