llm-generate-and-execute-code
This notebook demonstrates generating Python code by running inference with the CodeLlama-7b-Instruct-hf model, then validating the generated code by executing it in a sandbox on Tracto.ai.
This notebook uses the docker image cr.eu-north1.nebius.cloud/e00faee7vas5hpsh3s/solutions/torch:v3, which was built from the following Dockerfile:
FROM ghcr.io/tractoai/notebook-kernel-default:2024-12-17-18-49-00-ceb64d083
USER root
RUN pip install torch transformers peft trl datasets
RUN pip install tractorun -U
USER 1000
import yt.wrapper as yt
from yt import type_info
import uuid
Prepare a working directory on YTsaurus.
username = yt.get_user_name()
if yt.exists(f"//sys/users/{username}/@user_info/home_path"):
    home = yt.get(f"//sys/users/{username}/@user_info/home_path")
    working_dir = f"{home}/{uuid.uuid4().hex}"
else:
    working_dir = f"//tmp/examples/{uuid.uuid4().hex}"
# Create the directory recursively, in case parent nodes are missing.
yt.create("map_node", working_dir, recursive=True)
print(working_dir)
yt.config["pickling"]["dynamic_libraries"]["enable_auto_collection"] = False
yt.config["pickling"]["ignore_system_modules"] = True
yt.config["pickling"]["safe_stream_mode"] = False # important to run vllm
Upload the deepmind/code_contests dataset from Hugging Face to a YTsaurus table: task descriptions, test inputs, and expected outputs.
from datasets import load_dataset
MAX_SAMPLES = 50
dataset = load_dataset("deepmind/code_contests")
dataset_path = f"{working_dir}/dataset"
table_data = (
    {
        "index": index,
        "description": record["description"],
        "input": list(record["private_tests"]["input"] + record["generated_tests"]["input"])[:MAX_SAMPLES],
        "output": list(record["private_tests"]["output"] + record["generated_tests"]["output"])[:MAX_SAMPLES],
    }
    for index, record in enumerate(dataset["train"])
)
schema = yt.schema.TableSchema(strict=True)
schema.add_column("index", type_info.Int32)
schema.add_column("description", type_info.String)
schema.add_column("input", type_info.List[type_info.String])
schema.add_column("output", type_info.List[type_info.String])
yt.create("table", dataset_path, force=True, attributes={"schema": schema.to_yson_type()})
yt.write_table(dataset_path, table_data, table_writer={"max_row_weight": 128 * 1024 * 1024})
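As a quick sanity check (not part of the original flow), you can read back one row to confirm the upload; islice keeps the read small:

from itertools import islice

# Hypothetical check: print the first row's index and the sizes of its fields.
for row in islice(yt.read_table(dataset_path), 1):
    print(row["index"], len(row["description"]), len(row["input"]), len(row["output"]))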
import os
hf_token = os.environ.get("YT_SECURE_VAULT_HF_TOKEN", "")
assert hf_token != "", "set the HF token in the kernel's secrets to use Llama"
Generate solutions with CodeLlama-7b-Instruct-hf.
from typing import Iterable
import logging
import sys
BATCH_SIZE = 150
@yt.aggregator
def bulk_inference(records: Iterable[dict[str, str]]) -> Iterable[dict[str, str]]:
    from vllm import LLM, SamplingParams

    os.environ["HF_TOKEN"] = hf_token

    # YT jobs have to write all logs to stderr.
    vllm_logger = logging.getLogger("vllm")
    vllm_logger.handlers.clear()
    vllm_logger.addHandler(logging.StreamHandler(sys.stderr))

    llm = LLM(model="meta-llama/CodeLlama-7b-Instruct-hf", tensor_parallel_size=1, trust_remote_code=True)
    sampling_params = SamplingParams(
        temperature=0.6,
        top_p=0.9,
        max_tokens=5000,
    )

    def generate(records_batch: list[dict]):
        conversations = [
            [
                {
                    "role": "system",
                    "content": "You are an AI assistant that generates Python code. Always write Python code that reads input from stdin and writes output to stdout. Ensure that all indentation is correct. Your responses must contain only ready-to-run Python code, without any explanations, comments, or extra text.",
                },
                {
                    "role": "user",
                    "content": r["description"],
                },
            ]
            for r in records_batch
        ]
        return llm.chat(
            messages=conversations,
            sampling_params=sampling_params,
        )

    def flush(batch: list[dict]):
        # Run one batch through the model and emit a row per input record.
        outputs = generate(batch)
        for r, output in zip(batch, outputs):
            yield {
                "index": r["index"],
                "description": r["description"],
                "input": r["input"],
                "output": r["output"],
                "code": output.outputs[0].text.strip(" "),
            }

    batch = []
    for record in records:
        batch.append(record)
        if len(batch) < BATCH_SIZE:
            continue
        yield from flush(batch)
        batch = []
    if batch:
        yield from flush(batch)
inference_path = f"{working_dir}/inference"
schema = yt.schema.TableSchema(strict=True)
schema.add_column("index", type_info.Int32)
schema.add_column("description", type_info.String)
schema.add_column("input", type_info.List[type_info.String])
schema.add_column("output", type_info.List[type_info.String])
schema.add_column("code", type_info.String)
yt.create("table", inference_path, force=True, attributes={"schema": schema.to_yson_type()})
yt.run_map(
    bulk_inference,
    dataset_path,
    inference_path,
    job_count=16,
    spec={
        "pool_trees": ["gpu_100"],
        "mapper": {
            "gpu_limit": 1,
            "memory_limit": 32212254720,
            "cpu_limit": 2,
        },
        "job_io": {
            "table_writer": {
                "max_row_weight": 128 * 1024 * 1024,
            },
        },
    },
)
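Despite the system prompt, instruct models sometimes wrap their answer in markdown code fences, which would break execution with python3 -c. A minimal cleanup sketch (not part of the original pipeline; the function name and the single-fence assumption are ours) that could be applied to the code column before validation:

import re

def strip_code_fences(text: str) -> str:
    # Assumption: at most one fenced block; return its body, else the text unchanged.
    match = re.search(r"```(?:python)?\n(.*?)```", text, flags=re.DOTALL)
    return match.group(1) if match else text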
Now run the generated code in parallel and validate the results, limiting the memory consumption of the tested code with prlimit. In this example we do not isolate the LLM-generated code from the controlling process at the I/O level, but we do cap its RAM consumption.
import subprocess
import sys
RUN_TIMEOUT = 10
CODE_IO_LIMIT = 2000
MEM_LIMIT = 32212254720
@yt.aggregator
def validate_code(records: Iterable[dict[str, str]]) -> Iterable[dict[str, str]]:
    for record in records:
        for inp, expected in zip(record["input"], record["output"]):
            print(f"Run code for {record['index']}", file=sys.stderr)
            try:
                # Cap the address space of the tested process via prlimit.
                process = subprocess.run(
                    ["prlimit", f"--as={int(MEM_LIMIT * 0.5)}", "python3", "-c", record["code"]],
                    input=inp,
                    text=True,
                    capture_output=True,
                    timeout=RUN_TIMEOUT,
                )
            except subprocess.TimeoutExpired:
                print("Execution failed: timeout", file=sys.stderr)
                yield {
                    "index": record["index"],
                    "description": record["description"],
                    "input": record["input"],
                    "output": record["output"],
                    "code": record["code"],
                    "result_stdout": "<TIMEOUT>",
                    "result_stderr": "<TIMEOUT>",
                    "exitcode": -1,
                    "match_expected": False,
                }
                continue
            if process.returncode:
                print(f"Execution failed: {process.stderr}", file=sys.stderr)
            # Emit a row for every test run, successful or not.
            yield {
                "index": record["index"],
                "description": record["description"],
                "input": record["input"],
                "output": record["output"],
                "code": record["code"],
                "result_stdout": process.stdout[:CODE_IO_LIMIT],
                "result_stderr": process.stderr[:CODE_IO_LIMIT],
                "exitcode": process.returncode,
                "match_expected": process.stdout == expected,
            }
code_validation_path = f"{working_dir}/code_validation"
schema = yt.schema.TableSchema(strict=True)
schema.add_column("index", type_info.Int32)
schema.add_column("description", type_info.String)
schema.add_column("input", type_info.List[type_info.String])
schema.add_column("output", type_info.List[type_info.String])
schema.add_column("code", type_info.String)
schema.add_column("result_stdout", type_info.String)
schema.add_column("result_stderr", type_info.String)
schema.add_column("exitcode", type_info.Int8)
schema.add_column("match_expected", type_info.Bool)
yt.create("table", code_validation_path, force=True, attributes={"schema": schema.to_yson_type()})
yt.run_map(
    validate_code,
    inference_path,
    code_validation_path,
    job_count=256,
    spec={
        "mapper": {
            "memory_limit": MEM_LIMIT,
            "cpu_limit": 2,
        },
        "job_io": {
            "table_writer": {
                "max_row_weight": 128 * 1024 * 1024,
            },
        },
    },
)
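To summarize the results, you can read the validation table back and count, per task, how many test runs matched the expected output. A minimal sketch (column names follow the schema above; reading the whole table into the notebook is fine at this scale but may be heavy for larger runs):

from collections import defaultdict

# Per-task pass counts over the validation rows.
passed = defaultdict(int)
total = defaultdict(int)
for row in yt.read_table(code_validation_path):
    total[row["index"]] += 1
    passed[row["index"]] += int(row["match_expected"])

solved = sum(1 for idx in total if passed[idx] == total[idx])
print(f"tasks passing all sampled tests: {solved}/{len(total)}")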