# orc-features
## Trigger types
### Cron

Run the workflow every 10 minutes:

```yaml
trigger_type: cron
params:
  cron_expression: "*/10 * * * *"
```
### YT node update

Run the workflow when `//some/cypress/path` is modified:

```yaml
trigger_type: node_update
params:
  node_path: "//some/cypress/path"
```
## Step params
### `step_id` (required)
Defines a unique (within the workflow) identifier of the step. It must start with a letter and may contain alphanumeric characters and `_`.
### `task_type` (required)
Determines what kind of processor will be used to execute the step. See the list of available task types below.
### `task_params`
A map of parameters to be passed to the chosen task processor. String parameters can be rendered with `args`:

```json
"depends_on": ["prev_step"],
"args": [
    {
        "name": "image",
        "src_type": "step_output",
        "src_ref": "prev_step.docker_image"
    }
],
"task_params": {
    "docker_image": "{{ args.image }}",
    "command": "echo hello >&2"
}
```
### `depends_on`

List of steps (identified by `step_id`) that must complete successfully before the current step is executed.
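For example, a step that waits for two earlier steps (the step ids are illustrative):

```yaml
depends_on:
  - prepare_data
  - build_image
```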
### `secrets`

List of objects of the following structure:

```
key: <string>
value_src_type: secret_store | secret_store_docker_creds | predefined
value_ref: <string>
```

- `key` - the key used to obtain the secret in the step
- `value_src_type` - one of:
  - `secret_store` - take the value from a secret store. `value_ref` must have the following structure: `<cypress_path>:<key_in_secret_store>`. For example, `//some/workflow:my_token`
  - `secret_store_docker_creds` - use it to authenticate in a docker registry for the `docker` task type. The secret store in `value_ref` must contain `username` + `password` or `auth` keys
  - `predefined` - if `value_ref` is set to `YT_TOKEN`, the default user's token will be available in the step (under the predefined key `YT_TOKEN`)
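A minimal sketch of attaching a secret from a secret store (the store path and key are illustrative):

```yaml
secrets:
  - key: MY_TOKEN
    value_src_type: secret_store
    value_ref: "//some/workflow:my_token"
```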
### `outputs`

Steps can have output params. These must be defined in the `outputs` section of the step and can be used as input arguments for child steps. Outputs must be printed to stderr as JSON on the last line of the program's output. For example:
```json
{
    "step_id": "assert_foo_bar",
    "task_type": "docker",
    "task_params": {
        "env": {
            "FOO": "BAR"
        },
        "command": "python3 -c 'import json, os; assert os.environ[\"FOO\"] == \"BAR\"; print(json.dumps({\"key1\": \"value1\"}))' >&2",
        "docker_image": "docker.io/library/python:3.11"
    },
    "max_retries": 3,
    "min_retry_interval_seconds": 10,
    "outputs": [
        {
            "name": "key1"
        }
    ]
},
{
    "step_id": "get_prev_step_out",
    "task_type": "docker",
    "task_params": {
        "command": "python3 -c 'import os; the_arg = os.environ[\"ORC_PARAM_the_arg\"]; print(f\"Hello, {the_arg}\")' >&2",
        "docker_image": "docker.io/library/python:3.11"
    },
    "args": [
        {
            "name": "the_arg",
            "src_type": "step_output",
            "src_ref": "assert_foo_bar.key1"
        }
    ],
    "secrets": [
        {
            "key": "YT_TOKEN",
            "value_ref": "YT_TOKEN"
        }
    ],
    "depends_on": [
        "assert_foo_bar"
    ]
}
```
### `max_retries`

Defines how many times the step will be retried in case of failure. Default is 0.

### `min_retry_interval_seconds`

How many seconds (at least) must pass before the next retry happens.
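For example, a step retried up to 3 times with at least 30 seconds between attempts (the step itself is illustrative):

```yaml
- step_id: flaky_step
  task_type: docker
  task_params:
    docker_image: docker.io/library/python:3.11
    command: "python3 -c 'print(\"might fail\")' >&2"
  max_retries: 3
  min_retry_interval_seconds: 30
```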
### `cache`

Enables step caching. If the step key is found in the cache, its output is taken from there and the step is not executed. The cache key consists of `task_type`, `task_params`, `args` and `cache_version`.

```
"cache": {
    "enable": true,         # "write result to cache and take result from cache" - the same as turning on both `enable_write` and `enable_read`
    "enable_write": null,   # only write to cache, do not take the result from there
    "enable_read": null,    # take the result from cache, but do not update the cache
    "cache_version": "v1"   # change it if you need to invalidate the cache record
}
```
### `for_each`

Given a list input argument LST_ARG, you can run N instances of the step (where N = length of LST_ARG), and each of these substeps will receive its own element of LST_ARG. Outputs of the substeps are concatenated into a list.

```yaml
- step_id: st1
  task_type: docker
  task_params:
    docker_image: docker.io/library/python:3.11
    command: "python3 -c 'import json, sys; print(json.dumps({\"key_list\": [\"litem1\", \"litem2\"]}), file=sys.stderr)'"
  outputs:
    - name: key_list
- step_id: st2
  for_each:
    loop_arg_name: for_each_arg
  task_type: docker
  task_params:
    docker_image: docker.io/library/python:3.11
    command: "python3 -c 'import json, os, sys; for_each = os.environ[\"ORC_PARAM_for_each_arg\"]; print(json.dumps({\"some_output\": f\"{for_each}_42\"}), file=sys.stderr)'"
  args:
    - name: for_each_arg
      src_type: step_output
      src_ref: st1.key_list
  outputs:
    - name: some_output
  depends_on:
    - st1
```

In this example `some_output` will contain `["litem1_42", "litem2_42"]`.
### `yt_pool_settings`

Defines which YT pool will be used to execute the workflow and its tasks.

```json
"yt_pool_settings": {
    "pool": "my_pool",
    "secret_ref": "//path/to/secret/store:key_with_token",
    "pool_tree": null
}
```

The token in `secret_ref` must have access to the specified pool. If `pool_tree` is not set, the default one is used. If you specify `pool_tree`, `pool` must also be set.
## Task types

### docker

Runs a command in a container. Available task params:

- `docker_image` - the image the code will be run in
- `command` - the command to run
- `env` - map of additional environment variables

Nuances:

- secrets are available in the environment with the prefix `YT_SECURE_VAULT_`. So if you specify a secret with the key `my_secret`, it will be available in env as `YT_SECURE_VAULT_my_secret`
- use only stderr for your code output. The easiest way is to add `>&2` at the end of the command, for example: `command: python3 -c 'print("hello world")' >&2`
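For instance, a sketch of a step reading an attached secret (assuming a secret with the key `my_secret` is listed in the step's `secrets`):

```yaml
task_params:
  docker_image: docker.io/library/python:3.11
  # the secret is exposed in the environment as YT_SECURE_VAULT_my_secret
  command: "python3 -c 'import os; assert os.environ[\"YT_SECURE_VAULT_my_secret\"]' >&2"
```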
### notebook

Runs a jupyter notebook stored in YT. Available task params:

- `yt_jupyter_kernel` - name of the jupyter kernel to run the notebook on
- `notebook_path` - path to the notebook
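A minimal sketch (the kernel name and notebook path are illustrative):

```yaml
- step_id: run_report
  task_type: notebook
  task_params:
    yt_jupyter_kernel: python3
    notebook_path: "//home/project/notebooks/report"
```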
### python_code

Similar to `docker`, but runs python code (defined in the `code` parameter) with the default python3 interpreter from the specified container.
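A minimal sketch, assuming `docker_image` is specified the same way as for the `docker` task type:

```yaml
- step_id: hello_code
  task_type: python_code
  task_params:
    docker_image: docker.io/library/python:3.11
    code: |
      import json, sys
      # as with the docker task type, outputs go to stderr
      print(json.dumps({"greeting": "hello"}), file=sys.stderr)
```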
## Workflow-level parameters

Workflow-level parameters can be used in different steps. Values can be set when creating a new run; if a value is not set, a default value is used (`null`, if not specified in the workflow config).

```yaml
workflow_params:
  - name: my_param_1
    default_value: 123
  - name: my_param_2  # => default_value: null
```
Press "Run with params" to set required values.
These params now can be used in step args:
args:
- name: my_arg
src_type: workflow_param
src_ref: my_param_1
## Command line interface

### Installation

```
pip install orchestracto-client
```

### Usage

The `YT_PROXY` and `YT_TOKEN` env vars must be set.

Creating or updating workflows:

```
$ orc workflow update --wf-path //path/to/workflow --from-file /local/path/to/workflow/config.yaml
$ orc workflow update --wf-path //path/to/workflow --from-file /local/path/to/workflow/config.json --input-format json
```
Creating runs:

```
$ orc run create --wf-path //path/to/workflow
run_id: dc8f2600-972441b2-82331579-5654fcb1
$ orc run create --wf-path //path/to/workflow --label mylbl
run_id: 2c8fe31b-62ad446d-a4716d54-861ca02c
$ orc run create --wf-path //path/to/workflow --label mylbl --label anotherlbl
run_id: f11b8da4-75974a2c-86a594e9-5a4e481a
```
Getting runs:

```
$ orc workflow get-runs --wf-path //path/to/workflow --label mylbl --label anotherlbl
- created_at: '2025-01-22T11:36:58Z'
  finished_at: null
  labels:
  - mylbl
  - anotherlbl
  run_id: f11b8da4-75974a2c-86a594e9-5a4e481a
  stage: to_do
  trigger_type: one_time_run
  workflow_path: //path/to/workflow
$ orc --format json workflow get-runs --wf-path //path/to/workflow --start-dt 2025-01-21T18:40:00Z --end-dt 2025-01-21T18:50:00Z | jq -r '.[] | "\(.stage) \(.created_at)"'
done 2025-01-21T18:46:05Z
$ orc --format json run get --wf-path //path/to/workflow --run-id 9ae01f21-16784aea-8d48bba3-33404adf | jq -r .yt_operation_id
3c17e72-fa9c3a2-270703e8-4007351d
```
Getting run logs:

```
$ orc run get-logs --wf-path //path/to/workflow --run-id ab3b0734-da174153-8bc6c6be-ced658db | tail -n 5
ts=2025-01-15 16:21:55.226 name=orc.wf_executor.executor level=INFO run_id=ab3b0734-da174153-8bc6c6be-ced658db msg=Handling running steps
ts=2025-01-15 16:21:55.238 name=orc.wf_executor.executor level=INFO run_id=ab3b0734-da174153-8bc6c6be-ced658db msg=Checking running step step_2_2
ts=2025-01-15 16:21:55.491 name=orc.wf_executor.executor level=INFO run_id=ab3b0734-da174153-8bc6c6be-ced658db msg=step step_2_2 finished: failed=False
ts=2025-01-15 16:21:58.518 name=orc.wf_executor.executor level=INFO run_id=ab3b0734-da174153-8bc6c6be-ced658db msg=All steps have been done
```
Restart failed steps of a finished run:

```
$ orc run restart --wf-path //path/to/workflow --run-id 95a81a56-186f420d-a037d6ad-81a54fd8
```

Restart all steps:

```
$ orc run restart --wf-path //path/to/workflow --run-id 95a81a56-186f420d-a037d6ad-81a54fd8 --restart-successful-steps
```

Restart failed and specified steps (and their descendants):

```
$ orc run restart --wf-path //path/to/workflow --run-id 95a81a56-186f420d-a037d6ad-81a54fd8 --restart-step st_3-1 --restart-step st_3-2
```
## Workflow cancellation

Press "Cancel" on a running workflow to stop its execution. All running steps will also be stopped.

SDK tasks (built with sdk >= 0.0.15) and docker tasks (with the command invoked via `exec`) can handle the SIGTERM signal; they are given 30 seconds to finish, after which the process receives SIGKILL.
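A sketch of a docker step that shuts down gracefully on cancellation (the step itself is illustrative; `exec` ensures the python process receives SIGTERM directly):

```yaml
- step_id: graceful_worker
  task_type: docker
  task_params:
    docker_image: docker.io/library/python:3.11
    # exec replaces the shell, so SIGTERM reaches the python process itself
    command: >-
      exec python3 -c 'import signal, sys, time;
      signal.signal(signal.SIGTERM, lambda s, f: sys.exit(0));
      time.sleep(3600)' >&2
```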
## Python SDK

Some examples are available on GitHub.

Run:

```
WF_BASE_PATH=//home/some_map_node YT_PROXY=... YT_TOKEN=... orc sdk process ./orchestracto/example_yt/example.py
```

This will create a workflow config with the required docker images and upload them to Cypress.

Create the required secret stores (in this case, //home/some_map_node/secrets) in advance.