data-preparation-notebook
When is it more likely to see a running squirrel in Central Park, New York? Let's find out, along with an example of data preparation on YTsaurus.
This notebook demonstrates:
- How to use `map`, `reduce`, `sort`, and `mapreduce` operations on schematized and non-schematized tables
- How to use YTsaurus to transform unstructured data into structured data
- How to process the Date type on YTsaurus
At the end of this example, we will find out when it is more likely to encounter a running squirrel in Central Park, New York.
from yt import wrapper as yt
from yt import type_info
import uuid
import re
import datetime
import time
from typing import Iterable
from collections import defaultdict
Create a base directory for examples
username = yt.get_user_name()
if yt.exists(f"//sys/users/{username}/@user_info/home_path"):
    home = yt.get(f"//sys/users/{username}/@user_info/home_path")
    working_dir = f"{home}/{uuid.uuid4().hex}"
else:
    working_dir = f"//tmp/examples/{uuid.uuid4().hex}"
yt.create("map_node", working_dir)
print(working_dir)
Dataset preparation
Let's use //home/samples/squirrels-hectare-data. This dataset contains environmental data for each of the 350 "countable" hectares of Central Park. Examples include weather, litter, animals sighted, and human density.
This dataset has several problems:
- Date has a non-standard format.
- The column `other_animals_sightings` is unstructured.
- Weather data is also unstructured. Let's extract the temperature and structure the weather description.
Extract weather data
Request the original dataset's size. We will use this value to estimate the proportion of records that fail to parse.
dataset_size = yt.get("//home/samples/squirrels-hectare-data/@row_count")
print(dataset_size)
Looking at the dataset, we can notice some facts:
- Temperature data is at the beginning of the record
- Temperature can be indicated in either Fahrenheit or Celsius
- Typically the data is separated by a comma
This way we can iteratively apply our parsing function in a map operation and examine the records that could not be parsed. Since there are few such records, we can read them and inspect them directly in this notebook.
F_TEMP_REGEXP = re.compile(r"^~?(\d+\.?\d*)(-\d+)?\s*[°º]?\s*[fF]")
C_TEMP_REGEXP = re.compile(r"^~?(\d+\.?\d*)\s*[°º]?\s*[cC]")
# Matches approximate values like "60s" or "60ish" (assumed Fahrenheit)
F2_TEMP_REGEXP = re.compile(r"(\d+\.?\d*)(?:s|ish)")

def f_to_c(temp: int) -> int:
    return round((5 / 9) * (temp - 32))

def str_to_int(value: str) -> int:
    return round(float(value))
def parse_weather_data(raw_weather_data: str | None) -> tuple[int | None, list[str]]:
    if raw_weather_data is None:
        return None, []
    weather_data_parts = [part.strip(" ").lower() for part in raw_weather_data.split(",")]
    if len(weather_data_parts) == 0:
        return None, []
    maybe_temp = weather_data_parts[0]
    f_match = F_TEMP_REGEXP.search(maybe_temp)
    if f_match:
        return f_to_c(str_to_int(f_match.group(1))), weather_data_parts[1:]
    f2_match = F2_TEMP_REGEXP.search(maybe_temp)
    if f2_match:
        return f_to_c(str_to_int(f2_match.group(1))), weather_data_parts[1:]
    c_match = C_TEMP_REGEXP.search(maybe_temp)
    if c_match:
        return str_to_int(c_match.group(1)), weather_data_parts[1:]
    return None, weather_data_parts
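As a quick sanity check, the conversion helper and the Fahrenheit/Celsius patterns can be exercised on a few sample strings. This is a standalone sketch that duplicates the regexes above so it runs on its own:

```python
import re

# Same patterns as above (duplicated here so the snippet is self-contained)
F_TEMP = re.compile(r"^~?(\d+\.?\d*)(-\d+)?\s*[°º]?\s*[fF]")
C_TEMP = re.compile(r"^~?(\d+\.?\d*)\s*[°º]?\s*[cC]")

def f_to_c(temp: int) -> int:
    return round((5 / 9) * (temp - 32))

print(f_to_c(70))                        # 21 (degrees Celsius)
print(F_TEMP.search("70f").group(1))     # '70'
print(C_TEMP.search("~18 °c").group(1))  # '18'
print(F_TEMP.search("sunny") is None)    # True: no temperature at the start
```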
Map operation for testing the parsing function
Let's run a map operation that keeps only the records whose temperature could not be parsed.
def filter_records_without_temperature(record: dict) -> Iterable[dict]:
    temp, weather_data = parse_weather_data(record["sighter_observed_weather_data"])
    if temp is None:
        yield {"sighter_observed_weather_data": record["sighter_observed_weather_data"]}
yt.run_map(
    filter_records_without_temperature,
    source_table="//home/samples/squirrels-hectare-data",
    destination_table=f"{working_dir}/records_without_temperature",
)
records = [record for record in yt.read_table(f"{working_dir}/records_without_temperature")]
filtered_count = yt.get(f"{working_dir}/records_without_temperature/@row_count")
print(f"{filtered_count / dataset_size * 100}%")
for record in records:
    print(record)
We can verify that the remaining records genuinely contain no parsable temperature. The proportion of records with an undefined temperature is about 10%; let's consider that acceptable for a demonstration.
Prepare dataset
Dates on YTsaurus are represented as the number of days since 1970-01-01 (like Unix time, but in days), so Date values are integers. For simplicity, we will not use data schematization at this stage, except for a few important columns.
In the next step we plan to join this data with another dataset, so to avoid problems with implicit type casting, we create an explicit non-strict schema only for three columns:
- date
- hectare
- shift
YTsaurus operations can also be implemented as Python classes.
REMOVE_BRACKETS_REGEXP = re.compile(r"\(.*?\)")

class HectareDataCanonizer:
    def _canonize_date(self, date: str) -> int:
        # The source format is MMDDYYYY
        day, month, year = date[2:4], date[:2], date[4:8]
        date_str = f"{year}-{month}-{day}"
        date_obj = datetime.datetime.strptime(date_str, "%Y-%m-%d")
        # Use an explicit epoch date: date.fromtimestamp(0) depends on the local timezone
        unix_days = int((date_obj.date() - datetime.date(1970, 1, 1)).days)
        return unix_days

    def _canonize_other_animals(self, other_animals_sightings: str | None) -> list[str]:
        if not other_animals_sightings:
            return []
        return [REMOVE_BRACKETS_REGEXP.sub("", r).strip(" ").lower() for r in other_animals_sightings.split(",")]

    def __call__(self, record: dict) -> Iterable[dict]:
        record["date"] = self._canonize_date(record["date"])
        temperature, weather_data = parse_weather_data(record["sighter_observed_weather_data"])
        record["temperature_celsius"] = temperature
        record["weather_data"] = weather_data
        record["other_animals_sightings"] = self._canonize_other_animals(record["other_animals_sightings"])
        yield record
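The date canonization can be checked in isolation. Here is a minimal standalone version, assuming the MMDDYYYY input format used above:

```python
import datetime

def canonize_date(date: str) -> int:
    # MMDDYYYY -> number of days since the Unix epoch (1970-01-01)
    day, month, year = date[2:4], date[:2], date[4:8]
    d = datetime.date(int(year), int(month), int(day))
    return (d - datetime.date(1970, 1, 1)).days

print(canonize_date("10062018"))  # 2018-10-06 -> 17810
print(canonize_date("01011970"))  # the epoch itself -> 0
```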
canonized_squirrels_hectare_data = f"{working_dir}/hectare_data"
schema = yt.schema.TableSchema(strict=False)
schema.add_column("date", type_info.Date)
schema.add_column("hectare", type_info.String)
schema.add_column("shift", type_info.String)
yt.create("table", canonized_squirrels_hectare_data, force=True, attributes={"schema": schema.to_yson_type()})
yt.run_map(
    HectareDataCanonizer(),
    source_table="//home/samples/squirrels-hectare-data",
    destination_table=canonized_squirrels_hectare_data,
)
Verify dataset
We have a dataset from the same authors, that contains squirrel data for each of the 3,023 sightings, including location coordinates, age, primary and secondary fur color, elevation, activities, communications, and interactions between squirrels and humans.
We can use this data for:
- Verifying our current dataset
- Creating a new dataset that includes data from both of them
Since we will be using the reduce operation, we have to sort the table by the keys.
yt.run_sort(
    source_table=canonized_squirrels_hectare_data,
    destination_table=canonized_squirrels_hectare_data,
    sort_by=["date", "hectare", "shift"],
)
Let's count how many squirrels were seen each day in the first dataset using the reduce operation.
def sum_squirrels_by_date_hectare(key: dict[str, int], records: Iterable[dict]):
    squirrels = 0
    for record in records:
        squirrels += record["number_of_squirrels"]
    yield {"date": int(key["date"]), "squirrels": squirrels}
squirrels_by_date_hectare = f"{working_dir}/squirrels_by_date_hectare"
yt.run_reduce(
    sum_squirrels_by_date_hectare,
    source_table=canonized_squirrels_hectare_data,
    destination_table=squirrels_by_date_hectare,
    reduce_by=["date"],
)
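Locally, this reduce behaves like grouping rows that are already sorted by the key and summing within each group. A sketch with made-up rows:

```python
from itertools import groupby

# Hypothetical input rows, already sorted by "date" (as reduce requires)
rows = [
    {"date": 17810, "number_of_squirrels": 2},
    {"date": 17810, "number_of_squirrels": 3},
    {"date": 17811, "number_of_squirrels": 1},
]

# One output row per key, mirroring sum_squirrels_by_date_hectare
result = [
    {"date": date, "squirrels": sum(r["number_of_squirrels"] for r in group)}
    for date, group in groupby(rows, key=lambda r: r["date"])
]
print(result)  # [{'date': 17810, 'squirrels': 5}, {'date': 17811, 'squirrels': 1}]
```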
yt.run_sort(
    source_table=squirrels_by_date_hectare,
    destination_table=squirrels_by_date_hectare,
    sort_by=["date"],
)
Let's count how many squirrels were seen every day in the second dataset.
sorted_squirrels_data = f"{working_dir}/sorted_squirrels_data"
yt.run_sort(
    source_table="//home/samples/squirrels",
    destination_table=sorted_squirrels_data,
    sort_by=["date", "hectare", "shift"],
)
def sum_squirrels_by_date_squirrels(key: dict[str, int], records: Iterable[dict]):
    squirrels = []
    for record in records:
        squirrels.append(record["squirrel_id"])
    squirrels_count = len(squirrels)
    yield {"date": int(key["date"]), "squirrels": squirrels_count}
squirrels_by_date_squirrels = f"{working_dir}/squirrels_by_date_squirrels"
yt.run_reduce(
    sum_squirrels_by_date_squirrels,
    source_table=sorted_squirrels_data,
    destination_table=squirrels_by_date_squirrels,
    reduce_by=["date"],
)
yt.run_sort(
    source_table=squirrels_by_date_squirrels,
    destination_table=squirrels_by_date_squirrels,
    # The reduced table only has "date" and "squirrels" columns,
    # and the next reduce keys on "date" alone
    sort_by=["date"],
)
Join tables
Now we can compare the data.
@yt.with_context
def reduce_compare_squirrels_count(key: dict[str, str], records: Iterable[dict], context: yt.schema.Context):
    count_by_table = {}
    for record in records:
        table_index = context.table_index
        assert table_index not in count_by_table
        count_by_table[table_index] = record["squirrels"]
    if count_by_table.get(0) != count_by_table.get(1):
        yield {"date": key["date"], "squirrels_data": count_by_table.get(0), "hectare_data": count_by_table.get(1)}
squirrels_count_diff = f"{working_dir}/squirrels_count_diff"
yt.run_reduce(
    reduce_compare_squirrels_count,
    source_table=[squirrels_by_date_squirrels, squirrels_by_date_hectare],
    destination_table=squirrels_count_diff,
    reduce_by=["date"],
)
for record in yt.read_table(squirrels_count_diff):
    print(record)
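The comparison logic itself is easy to exercise locally. A sketch with a hypothetical helper `compare_counts` that takes (table_index, squirrels) pairs for a single date key:

```python
def compare_counts(tagged_rows):
    # tagged_rows: (table_index, squirrels) pairs for one date key,
    # at most one row per input table after the per-date reduce
    count_by_table = {}
    for table_index, squirrels in tagged_rows:
        assert table_index not in count_by_table
        count_by_table[table_index] = squirrels
    if count_by_table.get(0) != count_by_table.get(1):
        return {"squirrels_data": count_by_table.get(0), "hectare_data": count_by_table.get(1)}
    return None  # counts agree: emit nothing

print(compare_counts([(0, 7), (1, 7)]))  # None
print(compare_counts([(0, 7), (1, 8)]))  # {'squirrels_data': 7, 'hectare_data': 8}
```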
We can see that the counts differ on only one day, and only by 1. For this demo, we consider the result acceptable.
Make new dataset
Use destination table with schema
We can describe the table's schema as a yt_dataclass and reuse this object in the next steps.
from typing import Optional, Any

@yt.yt_dataclass
class JoinedDatasetRow:
    date: yt.schema.Date
    hectare: str
    shift: str
    other_animals_sightings: list[str]
    temperature_celsius: Optional[int]
    weather_data: list[str]
    age: str
    squirrel_id: str
    running: bool
    chasing: bool
    climbing: bool
    eating: bool
    foraging: bool
    kuks: bool
    quaas: bool
    moans: bool
    tail_flags: bool
    tail_twitches: bool
    approaches: bool
    indifferent: bool
    runs_from: bool
joined_dataset = f"{working_dir}/joined_dataset"
yt.create("table", joined_dataset, force=True, attributes={"schema": yt.schema.TableSchema.from_row_type(JoinedDatasetRow)})
@yt.with_context
def reduce_join(key: dict[str, str], records: Iterable[dict[str, Any]], context: yt.schema.Context) -> Iterable:
    squirrels: list[dict] = []
    other_animals_sightings = set()
    temperature_celsius = None
    weather_data = set()
    for record in records:
        if context.table_index == 0:
            squirrels.append(record)
        elif context.table_index == 1:
            other_animals_sightings.update(set(record["other_animals_sightings"]))
            if record["temperature_celsius"] is not None:
                # Running pairwise average of the reported temperatures
                temperature_celsius = record["temperature_celsius"] if temperature_celsius is None else (temperature_celsius + record["temperature_celsius"]) / 2
            weather_data.update(set(record["weather_data"]))
    temperature_celsius = round(temperature_celsius) if temperature_celsius is not None else None
    weather_data = list(weather_data)
    other_animals_sightings = list(other_animals_sightings)
    for squirrel in squirrels:
        yield dict(
            date=key["date"],
            hectare=key["hectare"],
            shift=key["shift"],
            other_animals_sightings=other_animals_sightings,
            temperature_celsius=temperature_celsius,
            weather_data=weather_data,
            age=squirrel["age"],
            squirrel_id=squirrel["squirrel_id"],
            running=squirrel["running"],
            chasing=squirrel["chasing"],
            climbing=squirrel["climbing"],
            eating=squirrel["eating"],
            foraging=squirrel["foraging"],
            kuks=squirrel["kuks"],
            quaas=squirrel["quaas"],
            moans=squirrel["moans"],
            tail_flags=squirrel["tail_flags"],
            tail_twitches=squirrel["tail_twitches"],
            approaches=squirrel["approaches"],
            indifferent=squirrel["indifferent"],
            runs_from=squirrel["runs_from"],
        )
yt.run_reduce(
    reduce_join,
    source_table=[sorted_squirrels_data, canonized_squirrels_hectare_data],
    destination_table=joined_dataset,
    reduce_by=["date", "hectare", "shift"],
)
Running squirrels
Now, using our new dataset, we can find out when it was more likely to see a running squirrel in Central Park, New York, in October 2018: on cold days or on warm days. Let's do this with a mapreduce operation. We will consider days with temperatures >= 15 °C warm and days below 15 °C cold.
@yt.yt_dataclass
class RunningIsColdRow:
    temperature: str
    is_running: bool

@yt.yt_dataclass
class RunningIsColdResultRow:
    temperature: str
    is_running: int
    not_running: int
class RunningIsColdMapper(yt.TypedJob):
    def __call__(self, record: JoinedDatasetRow) -> Iterable[RunningIsColdRow]:
        if record.temperature_celsius is None:
            return
        yield RunningIsColdRow(
            is_running=record.running,
            temperature="total",
        )
        yield RunningIsColdRow(
            is_running=record.running,
            temperature="cold" if (record.temperature_celsius < 15) else "not_cold",
        )
class RunningIsColdReducer(yt.TypedJob):
    def __call__(self, records: yt.schema.RowIterator[RunningIsColdRow]) -> Iterable[RunningIsColdResultRow]:
        is_running = 0
        not_running = 0
        for record in records:
            if record.is_running:
                is_running += 1
            else:
                not_running += 1
        yield RunningIsColdResultRow(
            is_running=is_running,
            not_running=not_running,
            temperature=record.temperature,
        )
running_squirrels = f"{working_dir}/running_squirrels"
yt.run_map_reduce(
    mapper=RunningIsColdMapper(),
    reducer=RunningIsColdReducer(),
    source_table=joined_dataset,
    destination_table=running_squirrels,
    reduce_by=["temperature"],
)
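The map-side trick above (each record feeding both its own temperature bucket and a "total" bucket) can be sketched in plain Python with a hypothetical helper `bucket_rows`:

```python
def bucket_rows(temperature_celsius, running):
    # Each record contributes one row to its temperature bucket and one to "total";
    # records without a temperature are dropped, as in RunningIsColdMapper
    if temperature_celsius is None:
        return []
    bucket = "cold" if temperature_celsius < 15 else "not_cold"
    return [("total", running), (bucket, running)]

print(bucket_rows(10, True))    # [('total', True), ('cold', True)]
print(bucket_rows(20, False))   # [('total', False), ('not_cold', False)]
print(bucket_rows(None, True))  # []
```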
for line in yt.read_table(running_squirrels):
    print(line)

print("Cold: ", 296 / (296 + 799))
print("Warm: ", 368 / (368 + 1264))
We can see that the proportion of encounters with running squirrels was higher on cold days. Let's use a chi-squared test to verify it.
!pip install scipy
from scipy.stats import chi2_contingency
observed = [
    [296, 799],
    [368, 1264],
]
chi2, p, dof, expected = chi2_contingency(observed)
p < 0.05
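For intuition, the statistic itself can be computed by hand from the expected counts. Note this sketch omits the Yates continuity correction that chi2_contingency applies to 2x2 tables by default, so the value differs slightly from scipy's:

```python
observed = [[296, 799], [368, 1264]]
row_totals = [sum(r) for r in observed]
col_totals = [observed[0][j] + observed[1][j] for j in range(2)]
n = sum(row_totals)

# Pearson chi-squared: sum over cells of (observed - expected)^2 / expected,
# where expected = row_total * col_total / n
chi2_stat = sum(
    (observed[i][j] - row_totals[i] * col_totals[j] / n) ** 2
    / (row_totals[i] * col_totals[j] / n)
    for i in range(2)
    for j in range(2)
)
print(round(chi2_stat, 2))
```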
The p-value is below 0.05, so the difference is statistically significant. Therefore, on cold days of October 2018, it was more likely to see a running squirrel in Central Park, New York, than on warm days.