
# orc-features

## Trigger types

### Cron

Run the workflow every 10 minutes:

```yaml
trigger_type: cron
params:
  cron_expression: "*/10 * * * *"
```

### YT node update

Run the workflow when `//some/cypress/path` is modified:

```yaml
trigger_type: node_update
params:
  node_path: "//some/cypress/path"
```

## Step params

### `step_id` (required)
Defines a unique (within the workflow) identifier of the step. It must start with a letter and may contain alphanumeric characters and `_`.

### `task_type` (required)
Determines what kind of processor will be used to execute the step. See the list of available task types below.

### `task_params`
A map of parameters passed to the chosen task processor. String parameters can be rendered with `args`:
```json
"depends_on": ["prev_step"],
"args": [
  {
    "name": "image",
    "src_type": "step_output",
    "src_ref": "prev_step.docker_image"
  }
],
"task_params": {
  "docker_image": "{{ args.image }}",
  "command": "echo hello >&2"
}
```

### `depends_on`

A list of steps (identified by `step_id`) that must complete successfully before the current step is executed.
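A minimal sketch of two steps chained with `depends_on` (the step ids and commands are hypothetical):

```yaml
- step_id: build
  task_type: docker
  task_params:
    docker_image: docker.io/library/python:3.11
    command: "echo built >&2"
- step_id: deploy
  task_type: docker
  task_params:
    docker_image: docker.io/library/python:3.11
    command: "echo deployed >&2"
  depends_on:
    - build  # deploy starts only after build completes successfully
```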

### `secrets`

A list of objects with the following structure:

```yaml
key: <string>
value_src_type: secret_store | secret_store_docker_creds | predefined
value_ref: <string>
```

- `key` — the key used to access the secret in the step
- `value_src_type` — one of:
  - `secret_store` — take the value from a secret store; `value_ref` must have the form `<cypress_path>:<key_in_secret_store>`, for example `//some/workflow:my_token`
  - `secret_store_docker_creds` — used to authenticate in a docker registry for the `docker` task type; the secret store referenced in `value_ref` must contain `username` + `password` or `auth` keys
  - `predefined` — if `value_ref` is set to `YT_TOKEN`, the default user's token will be available in the step (under the predefined key `YT_TOKEN`)
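A sketch combining the variants above (the cypress path and keys are hypothetical; see the `docker` task type below for how secrets appear in the environment):

```yaml
secrets:
  - key: my_secret
    value_src_type: secret_store
    value_ref: "//some/workflow:my_token"  # <cypress_path>:<key_in_secret_store>
  - key: YT_TOKEN
    value_src_type: predefined
    value_ref: YT_TOKEN                    # default user's token
```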

### `outputs`

Steps can have output params, which must be defined in the `outputs` section of the step. They can be used as input arguments of child steps. Outputs must be printed to stderr as a JSON object on the last line of the program's output.

```json
{
  "step_id": "assert_foo_bar",
  "task_type": "docker",
  "task_params": {
    "env": {
      "FOO": "BAR"
    },
    "command": "python3 -c 'import json, os; assert os.environ[\"FOO\"] == \"BAR\"; print(json.dumps({\"key1\": \"value1\"}))' >&2",
    "docker_image": "docker.io/library/python:3.11"
  },
  "max_retries": 3,
  "min_retry_interval_seconds": 10,
  "outputs": [
    {
      "name": "key1"
    }
  ]
},
{
  "step_id": "get_prev_step_out",
  "task_type": "docker",
  "task_params": {
    "command": "python3 -c 'import os; the_arg = os.environ[\"ORC_PARAM_the_arg\"]; print(f\"Hello, {the_arg}\")' >&2",
    "docker_image": "docker.io/library/python:3.11"
  },
  "args": [
    {
      "name": "the_arg",
      "src_type": "step_output",
      "src_ref": "assert_foo_bar.key1"
    }
  ],
  "secrets": [
    {
      "key": "YT_TOKEN",
      "value_ref": "YT_TOKEN"
    }
  ],
  "depends_on": [
    "assert_foo_bar"
  ]
}
```

### `max_retries`

Defines how many times the step will be retried on failure. The default is 0.

### `min_retry_interval_seconds`

The minimum number of seconds that must pass before the next retry.
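A sketch of both retry parameters on a single step (the step itself is hypothetical; the values mirror the example in the `outputs` section above):

```yaml
- step_id: flaky_step
  task_type: docker
  task_params:
    docker_image: docker.io/library/python:3.11
    command: "python3 -c 'raise SystemExit(1)'"  # always fails, to illustrate retries
  max_retries: 3                   # up to 3 additional attempts
  min_retry_interval_seconds: 10   # wait at least 10 s between attempts
```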

### `cache`

Enables step caching. If the step's key is found in the cache, its outputs are taken from there and the step is not executed. The cache key consists of `task_type`, `task_params`, `args`, and `cache_version`.

```
"cache": {
  "enable": true,         # same as enabling both `enable_write` and `enable_read`
  "enable_write": null,   # only write to the cache, do not take results from it
  "enable_read": null,    # take results from the cache, but do not update it
  "cache_version": "v1"   # change it to invalidate the cache record
}
```

### `for_each`

Given a list input argument `LST_ARG`, you can run N instances of the step (where N is the length of `LST_ARG`), and each of these substeps will receive its own element of `LST_ARG`. The outputs of the substeps are concatenated into a list.

```yaml
- step_id: st1
  task_type: docker
  task_params:
    docker_image: docker.io/library/python:3.11
    command: "python3 -c 'import json, sys; print(json.dumps({\"key_list\": [\"litem1\", \"litem2\"]}), file=sys.stderr)'"
  outputs:
    - name: key_list
- step_id: st2
  for_each:
    loop_arg_name: for_each_arg
  task_type: docker
  task_params:
    docker_image: docker.io/library/python:3.11
    command: "python3 -c 'import json, os, sys; for_each = os.environ[\"ORC_PARAM_for_each_arg\"]; print(json.dumps({\"some_output\": f\"{for_each}_42\"}), file=sys.stderr)'"
  args:
    - name: for_each_arg
      src_type: step_output
      src_ref: st1.key_list
  outputs:
    - name: some_output
  depends_on:
    - st1
```

In this example, `some_output` will contain `["litem1_42", "litem2_42"]`.

### `yt_pool_settings`

Defines which YT pool will be used to execute the workflow and its tasks.

```json
"yt_pool_settings": {
  "pool": "my_pool",
  "secret_ref": "//path/to/secret/store:key_with_token",
  "pool_tree": null
}
```

The token in `secret_ref` must have access to the specified pool. If `pool_tree` is not set, the default pool tree is used. If you specify `pool_tree`, `pool` must also be set.

## Task types

### `docker`

Runs a command in a container. Available task params:

- `docker_image` — the image the code will be run in
- `command` — the command to run
- `env` — a map of additional environment variables

Nuances:

- secrets are available in the environment with the `YT_SECURE_VAULT_` prefix, so a secret with key `my_secret` will be available in env as `YT_SECURE_VAULT_my_secret`
- use only stderr for your code's output; the easiest way is to append `>&2` to the command, for example:

```yaml
command: python3 -c 'print("hello world")' >&2
```
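Putting the two nuances together, a sketch of a `docker` step that reads a secret from the environment (the step id and secret path are hypothetical):

```yaml
- step_id: show_secret
  task_type: docker
  task_params:
    docker_image: docker.io/library/python:3.11
    # the secret with key my_secret is exposed as YT_SECURE_VAULT_my_secret
    command: "python3 -c 'import os; print(\"secret is set:\", \"YT_SECURE_VAULT_my_secret\" in os.environ)' >&2"
  secrets:
    - key: my_secret
      value_src_type: secret_store
      value_ref: "//some/workflow:my_token"
```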

### `notebook`

Runs a Jupyter notebook stored in YT. Available task params:

- `yt_jupyter_kernel` — name of the Jupyter kernel to run the notebook on
- `notebook_path` — path to the notebook
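A sketch of a `notebook` step (the kernel name and notebook path are hypothetical):

```yaml
- step_id: run_report
  task_type: notebook
  task_params:
    yt_jupyter_kernel: my_kernel              # hypothetical kernel name
    notebook_path: "//home/notebooks/report"  # hypothetical cypress path
```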

### `python_code`

Similar to `docker`, but runs Python code (defined in the `code` parameter) with the default python3 interpreter of the specified container.
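A sketch of a `python_code` step, assuming it accepts the same `docker_image` parameter as the `docker` task type (the step id and code are hypothetical):

```yaml
- step_id: hello_py
  task_type: python_code
  task_params:
    docker_image: docker.io/library/python:3.11
    code: |
      import sys
      print("hello from python_code", file=sys.stderr)  # output goes to stderr
```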

## Workflow-level parameters

Workflow-level parameters can be used in different steps. Values can be set when creating a new run; if a value is not set, the default value is used (`null` if not specified in the workflow config).

```yaml
workflow_params:
  - name: my_param_1
    default_value: 123
  - name: my_param_2  # => default_value: null
```

Press "Run with params" to set required values.

These params now can be used in step args:

args:
- name: my_arg
src_type: workflow_param
src_ref: my_param_1

## Command line interface

### Installation

```bash
pip install orchestracto-client
```

### Usage

The `YT_PROXY` and `YT_TOKEN` env vars must be set.
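For example (the values are placeholders):

```bash
export YT_PROXY=<your-cluster-proxy>
export YT_TOKEN=<your-token>
```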

Creating or updating workflows:

```bash
$ orc workflow update --wf-path //path/to/workflow --from-file /local/path/to/workflow/config.yaml

$ orc workflow update --wf-path //path/to/workflow --from-file /local/path/to/workflow/config.json --input-format json
```

Creating runs:

```bash
$ orc run create --wf-path //path/to/workflow
run_id: dc8f2600-972441b2-82331579-5654fcb1

$ orc run create --wf-path //path/to/workflow --label mylbl
run_id: 2c8fe31b-62ad446d-a4716d54-861ca02c

$ orc run create --wf-path //path/to/workflow --label mylbl --label anotherlbl
run_id: f11b8da4-75974a2c-86a594e9-5a4e481a
```

Getting runs:

```bash
$ orc workflow get-runs --wf-path //path/to/workflow --label mylbl --label anotherlbl
- created_at: '2025-01-22T11:36:58Z'
  finished_at: null
  labels:
    - mylbl
    - anotherlbl
  run_id: f11b8da4-75974a2c-86a594e9-5a4e481a
  stage: to_do
  trigger_type: one_time_run
  workflow_path: //path/to/workflow

$ orc --format json workflow get-runs --wf-path //path/to/workflow --start-dt 2025-01-21T18:40:00Z --end-dt 2025-01-21T18:50:00Z | jq -r '.[] | "\(.stage) \(.created_at)"'
done 2025-01-21T18:46:05Z

$ orc --format json run get --wf-path //path/to/workflow --run-id 9ae01f21-16784aea-8d48bba3-33404adf | jq -r .yt_operation_id
3c17e72-fa9c3a2-270703e8-4007351d
```

Getting run logs:

```bash
$ orc run get-logs --wf-path //path/to/workflow --run-id ab3b0734-da174153-8bc6c6be-ced658db | tail -n 5
ts=2025-01-15 16:21:55.226 name=orc.wf_executor.executor level=INFO run_id=ab3b0734-da174153-8bc6c6be-ced658db msg=Handling running steps
ts=2025-01-15 16:21:55.238 name=orc.wf_executor.executor level=INFO run_id=ab3b0734-da174153-8bc6c6be-ced658db msg=Checking running step step_2_2
ts=2025-01-15 16:21:55.491 name=orc.wf_executor.executor level=INFO run_id=ab3b0734-da174153-8bc6c6be-ced658db msg=step step_2_2 finished: failed=False
ts=2025-01-15 16:21:58.518 name=orc.wf_executor.executor level=INFO run_id=ab3b0734-da174153-8bc6c6be-ced658db msg=All steps have been done
```

Restart failed steps of a finished run:

```bash
$ orc run restart --wf-path //path/to/workflow --run-id 95a81a56-186f420d-a037d6ad-81a54fd8
```

Restart all steps:

```bash
$ orc run restart --wf-path //path/to/workflow --run-id 95a81a56-186f420d-a037d6ad-81a54fd8 --restart-successful-steps
```

Restart failed and specified steps (and their descendants):

```bash
$ orc run restart --wf-path //path/to/workflow --run-id 95a81a56-186f420d-a037d6ad-81a54fd8 --restart-step st_3-1 --restart-step st_3-2
```

## Workflow cancellation

Press "Cancel" on a running workflow to stop its execution. All running steps will also be stopped. SDK tasks (built with sdk >= 0.0.15) and docker tasks (whose command is invoked via `exec`) can handle the SIGTERM signal; they are given 30 seconds, after which the process receives SIGKILL.

## Python SDK

Available on PyPI.

Some examples are available on GitHub.

Run the following command; it will create a workflow config with the required docker images and upload them to Cypress. Create the required secret stores (in this case, `//home/some_map_node/secrets`) in advance.

```bash
WF_BASE_PATH=//home/some_map_node YT_PROXY=... YT_TOKEN=... orc sdk process ./orchestracto/example_yt/example.py
```