llm-generate-and-execute-code

This notebook demonstrates how to generate Python code with the CodeLlama-7b-Instruct-hf model and then validate the generated code by executing it in a sandbox on Tracto.ai.

This notebook uses the docker image cr.eu-north1.nebius.cloud/e00faee7vas5hpsh3s/solutions/torch:v3, which was built from the following Dockerfile:

FROM ghcr.io/tractoai/notebook-kernel-default:2024-12-17-18-49-00-ceb64d083

USER root

RUN pip install torch transformers peft trl datasets
RUN pip install tractorun -U

USER 1000
import yt.wrapper as yt
from yt import type_info
import uuid

Prepare a working directory on YTsaurus.

username = yt.get_user_name()
if yt.exists(f"//sys/users/{username}/@user_info/home_path"):
    home = yt.get(f"//sys/users/{username}/@user_info/home_path")
    working_dir = f"{home}/{uuid.uuid4().hex}"
else:
    working_dir = f"//tmp/examples/{uuid.uuid4().hex}"
yt.create("map_node", working_dir)
print(working_dir)
yt.config["pickling"]["dynamic_libraries"]["enable_auto_collection"] = False
yt.config["pickling"]["ignore_system_modules"] = True
yt.config["pickling"]["safe_stream_mode"] = False # important to run vllm

Upload the dataset from Hugging Face to a YTsaurus table: task descriptions, test inputs, and expected outputs.

from datasets import load_dataset

MAX_SAMPLES = 50

dataset = load_dataset("deepmind/code_contests")

dataset_path = f"{working_dir}/dataset"

table_data = (
    {
        "index": index,
        "description": record["description"],
        "input": list(record["private_tests"]["input"] + record["generated_tests"]["input"])[:MAX_SAMPLES],
        "output": list(record["private_tests"]["output"] + record["generated_tests"]["output"])[:MAX_SAMPLES],
    }
    for index, record in enumerate(dataset["train"])
)

schema = yt.schema.TableSchema(strict=True)
schema.add_column("index", type_info.Int32)
schema.add_column("description", type_info.String)
schema.add_column("input", type_info.List[type_info.String])
schema.add_column("output", type_info.List[type_info.String])

yt.create("table", dataset_path, force=True, attributes={"schema": schema.to_yson_type()})
yt.write_table(dataset_path, table_data, table_writer={"max_row_weight": 128 * 1024 * 1024})
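As an optional sanity check (not part of the original pipeline), you can read the first row back from the uploaded table:

# Optional sanity check: read the first row back and count its test cases.
first_row = next(iter(yt.read_table(dataset_path)))
print(first_row["index"], "-", len(first_row["input"]), "test cases")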
import os

# Secrets from the kernel's vault are exposed to the process as YT_SECURE_VAULT_* environment variables.
hf_token = os.environ.get("YT_SECURE_VAULT_HF_TOKEN", "")
assert hf_token != "", "set the HF token in the kernel's secrets to use llama"

Generate solutions with CodeLlama-7b-Instruct-hf. The mapper below is marked @yt.aggregator, so each job receives a stream of input rows and can batch them for GPU inference.

from typing import Iterable
import logging
import sys


BATCH_SIZE = 150


@yt.aggregator
def bulk_inference(records: Iterable[dict[str, str]]) -> Iterable[dict[str, str]]:
    from vllm import LLM, SamplingParams

    os.environ["HF_TOKEN"] = hf_token

    # YT jobs have to write all logs to stderr.
    vllm_logger = logging.getLogger("vllm")
    vllm_logger.handlers.clear()
    vllm_logger.addHandler(logging.StreamHandler(sys.stderr))

    llm = LLM(model="meta-llama/CodeLlama-7b-Instruct-hf", tensor_parallel_size=1, trust_remote_code=True)
    sampling_params = SamplingParams(
        temperature=0.6,
        top_p=0.9,
        max_tokens=5000,
    )

    def generate(records_batch: list[dict]):
        conversations = [
            [
                {
                    "role": "system",
                    "content": "You are an AI assistant that generates Python code. Always write Python code that reads input from stdin and writes output to stdout. Ensure that all indentation is correct. Your responses must contain only ready-to-run Python code, without any explanations, comments, or extra text.",
                },
                {
                    "role": "user",
                    "content": r["description"],
                },
            ]
            for r in records_batch
        ]
        outputs = llm.chat(
            messages=conversations,
            sampling_params=sampling_params,
        )
        return outputs

    # Accumulate rows into batches for efficient GPU inference.
    batch = []
    for record in records:
        batch.append(record)
        if len(batch) < BATCH_SIZE:
            continue
        outputs = generate(batch)
        for r, output in zip(batch, outputs):
            yield {
                "index": r["index"],
                "description": r["description"],
                "input": r["input"],
                "output": r["output"],
                "code": output.outputs[0].text.strip(" "),
            }
        batch = []
    # Flush the last partial batch.
    if batch:
        outputs = generate(batch)
        for r, output in zip(batch, outputs):
            yield {
                "index": r["index"],
                "description": r["description"],
                "input": r["input"],
                "output": r["output"],
                "code": output.outputs[0].text.strip(" "),
            }
inference_path = f"{working_dir}/inference"

schema = yt.schema.TableSchema(strict=True)
schema.add_column("index", type_info.Int32)
schema.add_column("description", type_info.String)
schema.add_column("input", type_info.List[type_info.String])
schema.add_column("output", type_info.List[type_info.String])
schema.add_column("code", type_info.String)

yt.create("table", inference_path, force=True, attributes={"schema": schema.to_yson_type()})


yt.run_map(
    bulk_inference,
    dataset_path,
    inference_path,
    job_count=16,
    spec={
        "pool_trees": ["gpu_100"],
        "mapper": {
            "gpu_limit": 1,
            "memory_limit": 32212254720,
            "cpu_limit": 2,
        },
        "job_io": {
            "table_writer": {
                "max_row_weight": 128 * 1024 * 1024,
            },
        },
    },
)
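Once the operation finishes, you can spot-check the output (optional, illustrative):

# Optional spot check: print the code generated for the first problem.
row = next(iter(yt.read_table(inference_path)))
print(row["code"])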

Now let's run the generated code in parallel and validate the results. We cap memory consumption of the tested code using prlimit. In this example we do not isolate the LLM-generated code from the controlling job at the I/O level; we only limit its RAM consumption.
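To see what this cap does in isolation, here is a minimal standalone sketch (the 1 GiB limit and the 2 GiB allocation are illustrative values, not part of the pipeline): under a 1 GiB address-space limit, the allocation fails inside the child process instead of exhausting the host.

import subprocess

# prlimit --as caps the address space (RLIMIT_AS) of the child process, in bytes.
demo = subprocess.run(
    ["prlimit", "--as=1073741824", "python3", "-c", "x = bytearray(2 * 1024**3)"],
    capture_output=True,
    text=True,
)
print(demo.returncode)      # non-zero: the allocation was refused
print(demo.stderr.strip())  # MemoryError raised inside the limited process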

import subprocess
import sys


RUN_TIMEOUT = 10         # seconds per test case
CODE_IO_LIMIT = 2000     # max characters of stdout/stderr to store per run
MEM_LIMIT = 32212254720  # 30 GiB job memory limit; the tested code gets half


@yt.aggregator
def validate_code(records: Iterable[dict[str, str]]) -> Iterable[dict[str, str]]:
    for record in records:
        # Run the generated program once per test case, feeding the test input via stdin.
        for inp, expected in zip(record["input"], record["output"]):
            print(f"Run code for {record['index']}", file=sys.stderr)
            try:
                process = subprocess.run(
                    ["prlimit", f"--as={int(MEM_LIMIT * 0.5)}", "python3", "-c", record["code"]],
                    input=inp,
                    text=True,
                    capture_output=True,
                    timeout=RUN_TIMEOUT,
                )
            except subprocess.TimeoutExpired:
                print("Execution failed: timeout", file=sys.stderr)
                yield {
                    "index": record["index"],
                    "description": record["description"],
                    "input": record["input"],
                    "output": record["output"],
                    "code": record["code"],
                    "result_stdout": "<TIMEOUT>",
                    "result_stderr": "<TIMEOUT>",
                    "exitcode": -1,
                    "match_expected": False,
                }
                continue
            if process.returncode:
                print(f"Execution failed: {process.stderr}", file=sys.stderr)

            yield {
                "index": record["index"],
                "description": record["description"],
                "input": record["input"],
                "output": record["output"],
                "code": record["code"],
                "result_stdout": process.stdout[:CODE_IO_LIMIT],
                "result_stderr": process.stderr[:CODE_IO_LIMIT],
                "exitcode": process.returncode,
                "match_expected": process.stdout == expected,
            }
code_validation_path = f"{working_dir}/code_validation"

schema = yt.schema.TableSchema(strict=True)
schema.add_column("index", type_info.Int32)
schema.add_column("description", type_info.String)
schema.add_column("input", type_info.List[type_info.String])
schema.add_column("output", type_info.List[type_info.String])
schema.add_column("code", type_info.String)
schema.add_column("result_stdout", type_info.String)
schema.add_column("result_stderr", type_info.String)
schema.add_column("exitcode", type_info.Int8)
schema.add_column("match_expected", type_info.Bool)

yt.create("table", code_validation_path, force=True, attributes={"schema": schema.to_yson_type()})

yt.run_map(
    validate_code,
    inference_path,
    code_validation_path,
    job_count=256,
    spec={
        "mapper": {
            "memory_limit": MEM_LIMIT,
            "cpu_limit": 2,
        },
        "job_io": {
            "table_writer": {
                "max_row_weight": 128 * 1024 * 1024,
            },
        },
    },
)
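Finally, you can summarize the results client-side (a simple illustrative aggregate; for large tables you would compute this with another operation instead):

# Illustrative summary: fraction of test runs whose stdout matched the expected output.
total = passed = 0
for row in yt.read_table(code_validation_path):
    total += 1
    passed += bool(row["match_expected"])
print(f"{passed}/{total} test runs matched the expected output")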