Tractorun
This notebook demonstrates the Tractorun library and CLI tool for running distributed machine learning tasks on YTsaurus. Tractorun provides convenient tools that integrate with the YTsaurus distributed data processing system, enabling you to run and manage machine learning training jobs.
Tractorun:
- Manages the configuration and coordination of distributed training.
- Provides tools for working with `YtDataset` (which lets you use data stored on YTsaurus as a dataset), checkpoints, model saving, interaction with `tensorproxy`, and more.
- Ensures integration with the YTsaurus ecosystem.
In this notebook, we cover the following steps:
- Uploading a PyTorch dataset to YTsaurus.
- Training a model using MNIST. We perform model training on the MNIST dataset directly from a Jupyter Notebook, leveraging YTsaurus as the computation platform.
- Running the same training with Tractorun CLI. We'll demonstrate how to run the same training job via the command line using the Tractorun CLI.
We use the official PyTorch MNIST training example as a reference and show how to modify it with minimal changes to run using Tractorun.
How to run this notebook:
- Use a kernel with `torch` and `torchvision` installed. You can create a new kernel using the image `cr.eu-north1.nebius.cloud/e00faee7vas5hpsh3s/solutions/torch:v3`.
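If your kernel image does not already include Tractorun, you can install it from PyPI. This is a minimal sketch, assuming pip is available in the kernel environment:
%pip install tractorun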
from yt import wrapper as yt
from yt import type_info
import uuid
import sys
import io
Create a base directory for examples
username = yt.get_user_name()
if yt.exists(f"//sys/users/{username}/@user_info/home_path"):
home = yt.get(f"//sys/users/{username}/@user_info/home_path")
working_dir = f"{home}/{uuid.uuid4().hex}"
else:
working_dir = f"//tmp/examples/{uuid.uuid4().hex}"
yt.create("map_node", working_dir)
print(working_dir)
Ensure torch and torchvision exist
Let's make sure that `torch` and `torchvision` are installed on the system.
import torch
import torchvision
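Optionally, print the installed versions to confirm that the kernel environment matches the expected image:
print(torch.__version__)
print(torchvision.__version__)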
Upload dataset to YTsaurus
For this demonstration, we will use the MNIST dataset from the torchvision library and upload it to YTsaurus. Some rows in the dataset exceed the default row weight limit, so we will set `table_writer={"max_row_weight": 50 * 1024 * 1024}` when writing the tables.
from torchvision import datasets, transforms
# https://github.com/pytorch/examples/blob/26de41904319c7094afc53a3ee809de47112d387/mnist/main.py#L119
transform = transforms.Compose(
[
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,)),
],
)
dataset_train_local = datasets.MNIST("./mnist", train=True, download=True)
dataset_test_local = datasets.MNIST("./mnist", train=False, download=True)
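As a quick sanity check before uploading (a minimal sketch): without a transform, `datasets.MNIST` yields a PIL image and an integer label for each sample.
sample_image, sample_label = dataset_train_local[0]
print(sample_label, sample_image.size)  # e.g. a digit label and (28, 28)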
Let's upload the MNIST dataset to YTsaurus, both as tensors and as simple types. There are 4 columns:
- `image`: raw PNG image. This column carries the tag "image/png", which allows the YTsaurus UI to render the images directly.
- `number`: human-readable label.
- `data` and `labels`: serialized tensor form of the sample data and label.
It is more efficient to store ready-to-use tensors in YTsaurus right away, to save time and resources during model training. In the following examples, we will work only with the columns containing tensors.
schema = yt.schema.TableSchema()
schema.add_column("image", type_info.Tagged[type_info.String, "image/png"])
schema.add_column("number", type_info.Int8)
schema.add_column("data", type_info.String)
schema.add_column("labels", type_info.String)
from tractorun.backend.tractorch import TensorSerializer
dataset_train_path = f"{working_dir}/dataset_train"
dataset_test_path = f"{working_dir}/dataset_test"
print(dataset_train_path)
print(dataset_test_path)
yt.create("table", dataset_train_path, force=True, attributes={"schema": schema.to_yson_type()})
yt.create("table", dataset_test_path, force=True, attributes={"schema": schema.to_yson_type()})
def pil_to_png(image):
r = io.BytesIO()
image.save(r, format="PNG")
return r.getvalue()
ts = TensorSerializer()
yt_train_data = [
{
"image": pil_to_png(data),
"number": labels,
"labels": ts.serialize(labels),
"data": ts.serialize(transform(data)),
}
for data, labels in dataset_train_local
]
yt.write_table(dataset_train_path, yt_train_data, table_writer={"max_row_weight": 50 * 1024 * 1024})
yt_test_data = [
{
"image": pil_to_png(data),
"number": labels,
"labels": ts.serialize(labels),
"data": ts.serialize(transform(data)),
}
for data, labels in dataset_test_local
]
yt.write_table(dataset_test_path, yt_test_data, table_writer={"max_row_weight": 50 * 1024 * 1024})
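To verify the upload, read the row counts back via the standard `@row_count` attribute; the train and test tables should contain 60000 and 10000 rows respectively.
print(yt.get(f"{dataset_train_path}/@row_count"))
print(yt.get(f"{dataset_test_path}/@row_count"))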
Run training
Tractorun stores some data in the training directory:
- Checkpoints.
- Metadata about each training run.
- Models.
- Some locks.
- And so on.
Let's create the training directory, recreating it if it already exists.
training_dir = f"{working_dir}/tractorun"
yt.create("map_node", training_dir, force=True)
print(training_dir)
The model training process runs in a Docker container. When launching from a Jupyter notebook, it is important to ensure that the job uses the same image as the notebook kernel.
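Tractorun can also be given an explicit Docker image for the job instead of relying on the environment default. The snippet below is a hypothetical sketch, not code from this notebook: the parameter name (assumed here to be `docker_image`) may differ in your Tractorun version, so check the Tractorun documentation before using it.
# Hypothetical sketch: pin the job image to the same one used by the notebook kernel.
# The parameter name docker_image is an assumption; verify it for your Tractorun version.
# run(
#     main,
#     backend=Tractorch(),
#     yt_path=training_dir,
#     docker_image="cr.eu-north1.nebius.cloud/e00faee7vas5hpsh3s/solutions/torch:v3",
# )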
We use the official PyTorch MNIST training example as a reference and show how to modify it with minimal changes to run using Tractorun:
- Add `toolbox: Toolbox` to the main function. The Toolbox object provides useful utilities for training, such as a checkpoint manager, coordination metadata, an initialized YTsaurus client, and more.
- Add `file=sys.stderr` to each print.
- Use `YtTensorDataset` instead of the default `torch.Dataset`.
- Call the magic function `tractorun.run.run`.
<details> <summary>Show the full diff</summary>
@@ -6,6 +6,13 @@
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR
+from tractorun.backend.tractorch import YtTensorDataset, Tractorch
+from tractorun.toolbox import Toolbox
+from tractorun.run import run
+from tractorun.mesh import Mesh
+from tractorun.resources import Resources
+from tractorun.stderr_reader import StderrMode
+from tractorun.backend.tractorch.serializer import TensorSerializer
class Net(nn.Module):
def __init__(self):
@@ -45,7 +52,7 @@
if batch_idx % args.log_interval == 0:
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch, batch_idx * len(data), len(train_loader.dataset),
- 100. * batch_idx / len(train_loader), loss.item()))
+ 100. * batch_idx / len(train_loader), loss.item()), file=sys.stderr)
if args.dry_run:
break
@@ -66,10 +73,10 @@
print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
test_loss, correct, len(test_loader.dataset),
- 100. * correct / len(test_loader.dataset)))
+ 100. * correct / len(test_loader.dataset)), file=sys.stderr)
-def main():
+def main(toolbox: Toolbox):
# Training settings
parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
parser.add_argument('--batch-size', type=int, default=64, metavar='N',
@@ -94,7 +101,7 @@
help='how many batches to wait before logging training status')
parser.add_argument('--save-model', action='store_true', default=False,
help='For Saving the current Model')
- args = parser.parse_args()
+ args = parser.parse_args([])
use_cuda = not args.no_cuda and torch.cuda.is_available()
use_mps = not args.no_mps and torch.backends.mps.is_available()
@@ -120,10 +127,9 @@
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])
- dataset1 = datasets.MNIST('../data', train=True, download=True,
- transform=transform)
- dataset2 = datasets.MNIST('../data', train=False,
- transform=transform)
+ dataset1 = YtTensorDataset(path=dataset_train_path, yt_client=toolbox.yt_client, columns=['data', 'labels'])
+ dataset2 = YtTensorDataset(path=dataset_test_path, yt_client=toolbox.yt_client, columns=['data', 'labels'])
+
train_loader = torch.utils.data.DataLoader(dataset1,**train_kwargs)
test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)
@@ -137,9 +143,20 @@
scheduler.step()
if args.save_model:
- torch.save(model.state_dict(), "mnist_cnn.pt")
+ ts = TensorSerializer()
+ toolbox.save_model(ts.serialize(model.state_dict()), dataset_train_path, metadata={})
-if __name__ == '__main__':
- main()
+run(
+ main,
+ backend=Tractorch(),
+ yt_path=training_dir,
+ mesh=Mesh(node_count=1, process_per_node=1, gpu_per_process=0),
+ resources=Resources(
+ cpu_limit=8,
+ memory_limit=105899345920,
+ ),
+ proxy_stderr_mode=StderrMode.primary,
+)
</details>
<font color="red">IMPORTANT NOTE</font> In this example we are running tractorun directly from the Jupyter notebook.
This is a convenient method for experiments and demonstrations: tractorun uses pickle to serialize the entire notebook state and transfer it to the cluster. This means that all variables will be available in the model training function, and tractorun will attempt to transfer all Python modules from the local environment to the cluster.
However, this method does not ensure reproducibility of the training run. For production processes, run training via the tractorun CLI, as described below.
import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR
from tractorun.backend.tractorch import YtTensorDataset, Tractorch
from tractorun.toolbox import Toolbox
from tractorun.run import run
from tractorun.mesh import Mesh
from tractorun.resources import Resources
from tractorun.stderr_reader import StderrMode
from tractorun.backend.tractorch.serializer import TensorSerializer
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 32, 3, 1)
self.conv2 = nn.Conv2d(32, 64, 3, 1)
self.dropout1 = nn.Dropout(0.25)
self.dropout2 = nn.Dropout(0.5)
self.fc1 = nn.Linear(9216, 128)
self.fc2 = nn.Linear(128, 10)
def forward(self, x):
x = self.conv1(x)
x = F.relu(x)
x = self.conv2(x)
x = F.relu(x)
x = F.max_pool2d(x, 2)
x = self.dropout1(x)
x = torch.flatten(x, 1)
x = self.fc1(x)
x = F.relu(x)
x = self.dropout2(x)
x = self.fc2(x)
output = F.log_softmax(x, dim=1)
return output
def train(args, model, device, train_loader, optimizer, epoch):
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % args.log_interval == 0:
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch, batch_idx * len(data), len(train_loader.dataset),
100. * batch_idx / len(train_loader), loss.item()), file=sys.stderr)
if args.dry_run:
break
def test(model, device, test_loader):
model.eval()
test_loss = 0
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
test_loss += F.nll_loss(output, target, reduction='sum').item() # sum up batch loss
pred = output.argmax(dim=1, keepdim=True) # get the index of the max log-probability
correct += pred.eq(target.view_as(pred)).sum().item()
test_loss /= len(test_loader.dataset)
print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
test_loss, correct, len(test_loader.dataset),
100. * correct / len(test_loader.dataset)), file=sys.stderr)
def main(toolbox: Toolbox):
# Training settings
parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
parser.add_argument('--batch-size', type=int, default=64, metavar='N',
help='input batch size for training (default: 64)')
parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
help='input batch size for testing (default: 1000)')
parser.add_argument('--epochs', type=int, default=14, metavar='N',
help='number of epochs to train (default: 14)')
parser.add_argument('--lr', type=float, default=1.0, metavar='LR',
help='learning rate (default: 1.0)')
parser.add_argument('--gamma', type=float, default=0.7, metavar='M',
help='Learning rate step gamma (default: 0.7)')
parser.add_argument('--no-cuda', action='store_true', default=False,
help='disables CUDA training')
parser.add_argument('--no-mps', action='store_true', default=False,
help='disables macOS GPU training')
parser.add_argument('--dry-run', action='store_true', default=False,
help='quickly check a single pass')
parser.add_argument('--seed', type=int, default=1, metavar='S',
help='random seed (default: 1)')
parser.add_argument('--log-interval', type=int, default=10, metavar='N',
help='how many batches to wait before logging training status')
parser.add_argument('--save-model', action='store_true', default=False,
help='For Saving the current Model')
args = parser.parse_args([])
use_cuda = not args.no_cuda and torch.cuda.is_available()
use_mps = not args.no_mps and torch.backends.mps.is_available()
torch.manual_seed(args.seed)
if use_cuda:
device = torch.device("cuda")
elif use_mps:
device = torch.device("mps")
else:
device = torch.device("cpu")
train_kwargs = {'batch_size': args.batch_size}
test_kwargs = {'batch_size': args.test_batch_size}
if use_cuda:
cuda_kwargs = {'num_workers': 1,
'pin_memory': True,
'shuffle': True}
train_kwargs.update(cuda_kwargs)
test_kwargs.update(cuda_kwargs)
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])
dataset1 = YtTensorDataset(path=dataset_train_path, yt_client=toolbox.yt_client, columns=['data', 'labels'])
dataset2 = YtTensorDataset(path=dataset_test_path, yt_client=toolbox.yt_client, columns=['data', 'labels'])
train_loader = torch.utils.data.DataLoader(dataset1,**train_kwargs)
test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)
model = Net().to(device)
optimizer = optim.Adadelta(model.parameters(), lr=args.lr)
scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)
for epoch in range(1, args.epochs + 1):
train(args, model, device, train_loader, optimizer, epoch)
test(model, device, test_loader)
scheduler.step()
if args.save_model:
ts = TensorSerializer()
toolbox.save_model(ts.serialize(model.state_dict()), dataset_train_path, metadata={})
run(
main,
backend=Tractorch(),
yt_path=training_dir,
mesh=Mesh(node_count=1, process_per_node=1, gpu_per_process=0),
resources=Resources(
cpu_limit=8,
memory_limit=105899345920,
),
proxy_stderr_mode=StderrMode.primary,
)
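After the run finishes, you can inspect what Tractorun stored under the training directory (run metadata, incarnation data, and so on; the exact node layout depends on the Tractorun version):
print(yt.list(training_dir))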
Tractorun CLI
Let's consider a production-like scenario for running model training through the Tractorun CLI.
The Tractorun CLI allows you to:
- Make model training reproducible.
- Separate the model-training code from the training run parameters: the Tractorun CLI lets you configure the training process via a configuration file and CLI options.
- Run the training module from any host where Python and Tractorun are installed.
We will use the official PyTorch MNIST training example again and show how to modify it with minimal changes to run using Tractorun:
- Add `toolbox = prepare_and_get_toolbox(backend=Tractorch())` to the main function. The Toolbox object provides useful utilities for training, such as a checkpoint manager, coordination metadata, an initialized YTsaurus client, and more.
- Add `file=sys.stderr` to each print.
- Use `YtTensorDataset` instead of the default `torch.Dataset`.
<details> <summary>Show the full diff</summary>
@@ -6,6 +6,15 @@
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR
+import sys
+from tractorun.backend.tractorch import YtTensorDataset, Tractorch
+from tractorun.toolbox import Toolbox
+from tractorun.run import run
+from tractorun.mesh import Mesh
+from tractorun.resources import Resources
+from tractorun.stderr_reader import StderrMode
+from tractorun.backend.tractorch.serializer import TensorSerializer
+from tractorun.run import prepare_and_get_toolbox
class Net(nn.Module):
def __init__(self):
@@ -45,7 +54,7 @@
if batch_idx % args.log_interval == 0:
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch, batch_idx * len(data), len(train_loader.dataset),
- 100. * batch_idx / len(train_loader), loss.item()))
+ 100. * batch_idx / len(train_loader), loss.item()), file=sys.stderr)
if args.dry_run:
break
@@ -66,10 +75,13 @@
print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
test_loss, correct, len(test_loader.dataset),
- 100. * correct / len(test_loader.dataset)))
+ 100. * correct / len(test_loader.dataset)), file=sys.stderr)
def main():
+ toolbox = prepare_and_get_toolbox(backend=Tractorch())
+ user_config = toolbox.get_user_config()
+
# Training settings
parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
parser.add_argument('--batch-size', type=int, default=64, metavar='N',
@@ -94,7 +106,7 @@
help='how many batches to wait before logging training status')
parser.add_argument('--save-model', action='store_true', default=False,
help='For Saving the current Model')
- args = parser.parse_args()
+ args = parser.parse_args([])
use_cuda = not args.no_cuda and torch.cuda.is_available()
use_mps = not args.no_mps and torch.backends.mps.is_available()
@@ -120,10 +132,9 @@
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])
- dataset1 = datasets.MNIST('../data', train=True, download=True,
- transform=transform)
- dataset2 = datasets.MNIST('../data', train=False,
- transform=transform)
+ dataset1 = YtTensorDataset(yt_client=toolbox.yt_client, path=user_config["dataset_train_path"], columns=['data', 'labels'])
+ dataset2 = YtTensorDataset(yt_client=toolbox.yt_client, path=user_config["dataset_test_path"], columns=['data', 'labels'])
+
train_loader = torch.utils.data.DataLoader(dataset1,**train_kwargs)
test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)
@@ -137,9 +148,9 @@
scheduler.step()
if args.save_model:
- torch.save(model.state_dict(), "mnist_cnn.pt")
+ ts = TensorSerializer()
+ toolbox.save_model(ts.serialize(model.state_dict()), user_config["dataset_train_path"], metadata={})
-if __name__ == '__main__':
+if __name__ == "__main__":
main()
</details>
Let's create two files:
- `main.py` with our model-training code.
- `run_config.yaml` with the training configuration.
code = r"""
import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR
import sys
from tractorun.backend.tractorch import YtTensorDataset, Tractorch
from tractorun.toolbox import Toolbox
from tractorun.run import run
from tractorun.mesh import Mesh
from tractorun.resources import Resources
from tractorun.stderr_reader import StderrMode
from tractorun.backend.tractorch.serializer import TensorSerializer
from tractorun.run import prepare_and_get_toolbox
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 32, 3, 1)
self.conv2 = nn.Conv2d(32, 64, 3, 1)
self.dropout1 = nn.Dropout(0.25)
self.dropout2 = nn.Dropout(0.5)
self.fc1 = nn.Linear(9216, 128)
self.fc2 = nn.Linear(128, 10)
def forward(self, x):
x = self.conv1(x)
x = F.relu(x)
x = self.conv2(x)
x = F.relu(x)
x = F.max_pool2d(x, 2)
x = self.dropout1(x)
x = torch.flatten(x, 1)
x = self.fc1(x)
x = F.relu(x)
x = self.dropout2(x)
x = self.fc2(x)
output = F.log_softmax(x, dim=1)
return output
def train(args, model, device, train_loader, optimizer, epoch):
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % args.log_interval == 0:
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch, batch_idx * len(data), len(train_loader.dataset),
100. * batch_idx / len(train_loader), loss.item()), file=sys.stderr)
if args.dry_run:
break
def test(model, device, test_loader):
model.eval()
test_loss = 0
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
test_loss += F.nll_loss(output, target, reduction='sum').item() # sum up batch loss
pred = output.argmax(dim=1, keepdim=True) # get the index of the max log-probability
correct += pred.eq(target.view_as(pred)).sum().item()
test_loss /= len(test_loader.dataset)
print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
test_loss, correct, len(test_loader.dataset),
100. * correct / len(test_loader.dataset)), file=sys.stderr)
def main():
toolbox = prepare_and_get_toolbox(backend=Tractorch())
user_config = toolbox.get_user_config()
# Training settings
parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
parser.add_argument('--batch-size', type=int, default=64, metavar='N',
help='input batch size for training (default: 64)')
parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
help='input batch size for testing (default: 1000)')
parser.add_argument('--epochs', type=int, default=14, metavar='N',
help='number of epochs to train (default: 14)')
parser.add_argument('--lr', type=float, default=1.0, metavar='LR',
help='learning rate (default: 1.0)')
parser.add_argument('--gamma', type=float, default=0.7, metavar='M',
help='Learning rate step gamma (default: 0.7)')
parser.add_argument('--no-cuda', action='store_true', default=False,
help='disables CUDA training')
parser.add_argument('--no-mps', action='store_true', default=False,
help='disables macOS GPU training')
parser.add_argument('--dry-run', action='store_true', default=False,
help='quickly check a single pass')
parser.add_argument('--seed', type=int, default=1, metavar='S',
help='random seed (default: 1)')
parser.add_argument('--log-interval', type=int, default=10, metavar='N',
help='how many batches to wait before logging training status')
parser.add_argument('--save-model', action='store_true', default=False,
help='For Saving the current Model')
args = parser.parse_args([])
use_cuda = not args.no_cuda and torch.cuda.is_available()
use_mps = not args.no_mps and torch.backends.mps.is_available()
torch.manual_seed(args.seed)
if use_cuda:
device = torch.device("cuda")
elif use_mps:
device = torch.device("mps")
else:
device = torch.device("cpu")
train_kwargs = {'batch_size': args.batch_size}
test_kwargs = {'batch_size': args.test_batch_size}
if use_cuda:
cuda_kwargs = {'num_workers': 1,
'pin_memory': True,
'shuffle': True}
train_kwargs.update(cuda_kwargs)
test_kwargs.update(cuda_kwargs)
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])
dataset1 = YtTensorDataset(yt_client=toolbox.yt_client, path=user_config["dataset_train_path"], columns=['data', 'labels'])
dataset2 = YtTensorDataset(yt_client=toolbox.yt_client, path=user_config["dataset_test_path"], columns=['data', 'labels'])
train_loader = torch.utils.data.DataLoader(dataset1,**train_kwargs)
test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)
model = Net().to(device)
optimizer = optim.Adadelta(model.parameters(), lr=args.lr)
scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)
for epoch in range(1, args.epochs + 1):
train(args, model, device, train_loader, optimizer, epoch)
test(model, device, test_loader)
scheduler.step()
if args.save_model:
ts = TensorSerializer()
toolbox.save_model(ts.serialize(model.state_dict()), user_config["dataset_train_path"], metadata={})
if __name__ == "__main__":
main()
"""
with open("main.py", "w") as f:
f.write(code)
import yaml
config = {
"yt_path": training_dir,
"mesh": {
"node_count": 1,
"process_per_node": 1,
"gpu_per_process": 0,
},
"user_config": {
"dataset_train_path": dataset_train_path,
"dataset_test_path": dataset_test_path,
},
"resources": {
"cpu_limit": 8,
"memory_limit": 105899345920,
},
"bind_local": ["./main.py:/tractorun/main.py"],
"command": ["python3", "/tractorun/main.py"],
"proxy_stderr_mode": "primary",
}
with open("run_config.yaml", "w") as f:
yaml.dump(config, f)
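Before launching, you can print the generated config to double-check the dataset and training paths:
with open("run_config.yaml") as f:
    print(f.read())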
!tractorun --run-config-path run_config.yaml