Skip to main content

tractorun-llama-finetune

This notebook uses the Docker image cr.eu-north1.nebius.cloud/e00faee7vas5hpsh3s/solutions/torch:v3, which was built from the following Dockerfile:

FROM ghcr.io/tractoai/notebook-kernel-default:2024-12-17-18-49-00-ceb64d083

USER root

RUN pip install torch transformers peft trl datasets
RUN pip install tractorun -U

USER 1000
import yt.wrapper as yt
import uuid

# Choose a working directory on the cluster: the user's configured home path
# if one exists, otherwise a unique directory under //tmp/examples.
username = yt.get_user_name()
if yt.exists(f"//sys/users/{username}/@user_info/home_path"):
    home = yt.get(f"//sys/users/{username}/@user_info/home_path")
    working_dir = f"{home}/{uuid.uuid4().hex}"
else:
    working_dir = f"//tmp/examples/{uuid.uuid4().hex}"
# recursive=True so intermediate nodes (e.g. //tmp/examples) are created
# if they do not exist yet; without it the create call fails on a fresh cluster.
yt.create("map_node", working_dir, recursive=True)
print(working_dir)
# Standard library.
import os

# Third-party.
import torch
from datasets import Dataset, load_dataset
from peft import LoraConfig
# NOTE: AutoTokenizer was previously imported twice on the same line; deduplicated.
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    TrainingArguments,
)
from trl import SFTTrainer

# Local / platform.
from tractorun.toolbox import Toolbox

def train(toolbox: Toolbox):
    """Fine-tune Llama-2-7b-chat with LoRA on a Shakespeare dataset from Cypress.

    Executed by every tractorun peer; the standard torch distributed
    environment variables (RANK, WORLD_SIZE) are expected to be set.
    After training, rank 0 uploads the resulting files to Cypress.
    """
    ytc = toolbox.yt_client

    # Load the dataset. Since the dataset is small we just read it into memory.
    dataset = Dataset.from_list(list(ytc.read_table("//home/samples/shakespeare")))

    # Load the model + tokenizer. Here we just download the weights of the model from HF.
    # To avoid downloading weights every time they can be uploaded to Cypress
    # and passed as cypress_binds.
    model_name = "NousResearch/Llama-2-7b-chat-hf"
    tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
    # Llama-2 ships without a pad token; reuse EOS so padded batching works.
    tokenizer.pad_token = tokenizer.eos_token
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        trust_remote_code=True,
        use_cache=False,  # KV-cache is useless during training and wastes memory.
    )

    # PEFT (LoRA) config.
    lora_alpha = 16
    lora_dropout = 0.1
    lora_r = 64
    peft_config = LoraConfig(
        lora_alpha=lora_alpha,
        lora_dropout=lora_dropout,
        r=lora_r,
        bias="none",
        task_type="CAUSAL_LM",
        target_modules=["k_proj", "q_proj", "v_proj", "up_proj", "down_proj", "gate_proj"],
    )

    # Training arguments.
    max_seq_length = 512
    output_dir = "./results"
    per_device_train_batch_size = 8
    gradient_accumulation_steps = 2
    optim = "adamw_hf"
    logging_steps = 1
    learning_rate = 2e-4
    max_grad_norm = 0.3
    # Roughly one pass over the data given the global batch size
    # (per-device batch * grad accumulation * number of processes).
    max_steps = len(dataset) // (
        per_device_train_batch_size
        * gradient_accumulation_steps
        * int(os.environ["WORLD_SIZE"])
    )
    warmup_ratio = 0.1
    lr_scheduler_type = "cosine"
    training_arguments = TrainingArguments(
        output_dir=output_dir,
        per_device_train_batch_size=per_device_train_batch_size,
        gradient_accumulation_steps=gradient_accumulation_steps,
        optim=optim,
        save_strategy="steps",
        save_steps=0.0,  # Do not save intermediate checkpoints, save only result.
        logging_steps=logging_steps,
        learning_rate=learning_rate,
        fp16=True,
        max_grad_norm=max_grad_norm,
        max_steps=max_steps,
        warmup_ratio=warmup_ratio,
        group_by_length=True,
        lr_scheduler_type=lr_scheduler_type,
        gradient_checkpointing=True,
        gradient_checkpointing_kwargs={"use_reentrant": False},
    )

    # Trainer.
    trainer = SFTTrainer(
        model=model,
        train_dataset=dataset,
        peft_config=peft_config,
        dataset_text_field="text",
        max_seq_length=max_seq_length,
        tokenizer=tokenizer,
        args=training_arguments,
    )

    trainer.train()

    # Upload the result from one peer only.
    if os.environ["RANK"] == "0":
        # Now the result of the fine-tuning is stored on a local filesystem
        # in a job's sandbox which will be removed after the job is completed.
        # We will move it to Cypress.

        local_result_path = output_dir
        # NOTE(review): working_dir is captured from the notebook's global
        # scope and serialized together with this function.
        tracto_result_path = f"{working_dir}/result"

        # BUGFIX: this message (and the one at the end) was missing the
        # f-prefix and printed the literal "{tracto_result_path}".
        print(f"Uploading result to {tracto_result_path}")

        # Cypress does not like paths with trailing slashes,
        # so we join paths carefully.
        def join_paths(l, r):
            if l and r:
                return f"{l}/{r}"
            else:
                return l + r

        # Recursively mirror the local results directory into Cypress.
        def dfs(path):
            local_path = join_paths(local_result_path, path)
            tracto_path = join_paths(tracto_result_path, path)

            if os.path.isdir(local_path):
                print(f"Creating directory {tracto_path}")
                ytc.create("map_node", tracto_path, ignore_existing=True, recursive=True)
                for f in os.listdir(local_path):
                    dfs(join_paths(path, f))
            else:
                print(f"Uploading file {tracto_path}")
                with open(local_path, "rb") as f:
                    ytc.write_file(tracto_path, f)

        dfs("")

        print(f"Results uploaded to {tracto_result_path}")

from tractorun.backend.tractorch import YtTensorDataset, Tractorch
from tractorun.toolbox import Toolbox
from tractorun.run import run
from tractorun.mesh import Mesh
from tractorun.resources import Resources
from tractorun.stderr_reader import StderrMode
from tractorun.env import EnvVariable

# Launch the distributed fine-tuning job on the cluster.
run(
    train,
    backend=Tractorch(),
    yt_path=f"{working_dir}/working_dir",
    # Let's run training with dp=16: two nodes with 8 GPUs each.
    # Since PyTorch does not like multi-GPU processes we launch 8 processes at each node.

    # WARNING: on playground you have only 1 host with 1 gpu, please set
    # mesh=Mesh(node_count=1, process_per_node=1, gpu_per_process=1, pool_trees=["gpu_h100"]),

    # BUGFIX: node_count was 16, i.e. 128 processes — contradicting the
    # stated intent of dp=16 (two nodes x 8 processes).
    mesh=Mesh(node_count=2, process_per_node=8, gpu_per_process=1, pool_trees=["gpu_h100"]),
    resources=Resources(
        cpu_limit=20,
        memory_limit=858993459200,  # 800G per node.
    ),
    proxy_stderr_mode=StderrMode.primary,
    # NCCL tuning/diagnostics for the cluster's InfiniBand fabric.
    env=[
        EnvVariable(name="NCCL_SOCKET_IFNAME", value="eth0"),
        EnvVariable(name="NCCL_IB_SL", value="1"),
        EnvVariable(name="NCCL_DEBUG", value="INFO"),
        EnvVariable(name="NCCL_DEBUG_SUBSYS", value="INIT"),
        EnvVariable(name="NCCL_IB_HCA", value="mlx5"),
    ],
)