Upload and download YT tables
This notebook contains examples of how to download and upload tables on YTsaurus.
- How to work with tables without schema.
- How to create a table with schema and upload data.
- How to work with structured tables and yt_dataclass objects.
- How to upload datetime objects to Datetime columns.
from yt import wrapper as yt
from yt import type_info
from datetime import datetime, timedelta
from dataclasses import asdict
import uuid
import time
Create a base directory for examples
username = yt.get_user_name()
if yt.exists(f"//sys/users/{username}/@user_info/home_path"):
home = yt.get(f"//sys/users/{username}/@user_info/home_path")
working_dir = f"{home}/{uuid.uuid4().hex}"
else:
working_dir = f"//tmp/examples/{uuid.uuid4().hex}"
yt.create("map_node", working_dir)
print(working_dir)
Upload and download unstructured data
The easest way to upload data is using write_table method of ytClient. write_table creates a table without the schema. read_table automatically converts data from a table to primitive python types.
This method is easy to implement, but does not protect against some typical problems:
- Typos in column names in different records.
- Unexpected type conversions.
- Large tables without schemas use resources inefficiently.
records_unstractued = [
{
"field_string": f"string_{i}",
"field_float": i / 10,
"field_date": (datetime.now() - timedelta(days=i)).isoformat(),
"field_uint32": i,
}
for i in range(10)
]
print(records_unstractued)
unstructured_table_path = f"{working_dir}/unstructured_table"
yt.write_table(unstructured_table_path, records_unstractued)
for record in yt.read_table(unstructured_table_path):
print(record)
Create schema and upload unstructured data
In order to use schematized tables we can create a table with the strong schema and upload unstructured data. Schematization allows efficient use of resources and defines the table structure and column types.
schema = yt.schema.TableSchema()
schema.add_column("field_string", type_info.String)
schema.add_column("field_float", type_info.Float)
schema.add_column("field_date", type_info.Datetime)
schema.add_column("field_uint32", type_info.Uint32)
YTsaurus client doesn't have standard convertion mechanisms for datetime.datetime objects. Datetime columns expect uint32 values, so we have to convert datetime.datetime objects to python's int.
def datetime_to_unixtime(datetime_obj: datetime) -> int:
# workaround for https://github.com/ytsaurus/ytsaurus/issues/309
return int(time.mktime(datetime_obj.timetuple()))
records_handmade_schema = [
{
"field_string": f"string_{i}",
"field_float": i / 10,
"field_date": datetime_to_unixtime(datetime.now() - timedelta(days=i)),
"field_uint32": i,
}
for i in range(10)
]
print(records_unstractued)
handmade_schema_table_path = f"{working_dir}/handmade_schema_table"
yt.create_table(handmade_schema_table_path, attributes={"schema": schema.to_yson_type()})
yt.write_table(handmade_schema_table_path, records_handmade_schema)
yt.yt_dataclass obects for schematized data
YTsaurus sdk provides python-native tables representation. We can use yt_dataclass to represent the table's schema and to check the correspondence of python types to the table's schema. Documentation
@yt.yt_dataclass
class Nested:
a: str
b: int
@yt.yt_dataclass
class TableRow:
field_string: str
field_float: float
field_datetime: yt.schema.Datetime
field_uint32: yt.schema.Uint32
field_nested: Nested
records_dataclasses = [
TableRow(
field_string=f"string_{i}",
field_float=i / 10,
field_datetime=datetime_to_unixtime(datetime.now() - timedelta(days=i)),
field_uint32=i,
field_nested=Nested(
a=f"string_{i}",
b=i,
)
)
for i in range(10)
]
for record in records_dataclasses:
print(records_dataclasses)
dataclass_table_path = f"{working_dir}/dataclass_based_table"
yt.write_table_structured(dataclass_table_path, TableRow, records_dataclasses)
for record in yt.read_table_structured(dataclass_table_path, TableRow):
print(record)