```python
assemble_state_ser(state, tz)[0]  # just show the DataFrame; ignore row_start (it's 0)
```
# Pandas utilities

DataFrames for state, action, reward, next_state.
### assemble_state_ser

assemble_state_ser (state_columns:pandas.core.frame.DataFrame, tz:zoneinfo.ZoneInfo)
*assemble a state Series from the state_columns DataFrame; the order is vital for the model.

inputs:

state_columns: pd.DataFrame with columns "timestep, velocity, thrust, brake"; contiguous storage in each measurement due to sort_index; output column order: [col0: brake, col1: thrust, col2: timestep, col3: velocity]

return:

state: pd.Series
table_row_start: int*
| | Type | Details |
|---|---|---|
| state_columns | DataFrame | DataFrame with columns ['timestep', 'velocity', 'thrust', 'brake'] |
| tz | ZoneInfo | timezone for the timestamp |
| Returns | Tuple | |
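A hypothetical example of building the `state_columns` input used in the checks below (the exact construction in the library may differ; values are illustrative):

```python
import pandas as pd
from zoneinfo import ZoneInfo

tz = ZoneInfo("Asia/Shanghai")

# Hypothetical state_columns input: one row per observation tick,
# with the four columns the docstring requires.
state = pd.DataFrame(
    {
        "timestep": pd.to_datetime(
            ["2024-01-01 08:00:00.000", "2024-01-01 08:00:00.050"]
        ).tz_localize(tz),
        "velocity": [10.0, 10.2],
        "thrust": [0.3, 0.35],
        "brake": [0.0, 0.0],
    }
)
print(state.dtypes)
```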
```python
from fastcore.test import *  # use fastcore testing utils

assert assemble_state_ser(state, tz)[1] == 0  # row_start should be 0
assert isinstance(assemble_state_ser(state, tz)[0], pd.Series)
test_eq(isinstance(assemble_state_ser(state, tz)[0], pd.Series), True)
```
### assemble_reward_ser

assemble_reward_ser (power_columns:pandas.core.frame.DataFrame, obs_sampling_rate:int, ts)

assemble a reward Series from the motion_power DataFrame; the order is vital for the model: contiguous storage in each row due to sort_index. Input power_columns: ['current', 'voltage']; output: [timestep, work]
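The docstring implies the reward is the electrical work derived from current and voltage over the observation window. A minimal standalone sketch of that computation (not the library function itself; treating `obs_sampling_rate` as samples per second is an assumption):

```python
import pandas as pd

def work_from_power(power_columns: pd.DataFrame, obs_sampling_rate: int) -> float:
    """Integrate instantaneous power (current * voltage) over the
    observation window, assuming obs_sampling_rate samples per second."""
    power = power_columns["current"] * power_columns["voltage"]  # watts
    return float(power.sum() / obs_sampling_rate)  # joules

power_columns = pd.DataFrame({"current": [10.0, 12.0], "voltage": [400.0, 400.0]})
work = work_from_power(power_columns, obs_sampling_rate=20)
work  # (4000 + 4800) / 20 = 440.0
```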
### assemble_flash_table

assemble_flash_table (torque_map_line:numpy.ndarray, table_start:int, torque_table_row_num_flash:int, torque_table_col_num:int, speed_scale:tuple, pedal_scale:tuple)

generate a flash table DataFrame from torque_map_line; the order is vital for the model: contiguous storage in each row due to sort_index. Output: "r0, r1, r2, r3, ..., speed, throttle(map), timestep"
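The row-major reshaping behind such a table can be sketched as follows (hypothetical; the real function also uses `table_start` and the speed/pedal scales to place the flashed rows inside the full map):

```python
import numpy as np
import pandas as pd

torque_table_row_num_flash = 3   # rows being flashed
torque_table_col_num = 5         # pedal positions per row
torque_map_line = np.arange(15, dtype=float)  # flat storage, row-major

# Contiguous storage in each row: reshape row-major, then label.
flash_table = pd.DataFrame(
    torque_map_line.reshape(torque_table_row_num_flash, torque_table_col_num),
    index=[f"r{i}" for i in range(torque_table_row_num_flash)],
    columns=(0, 0.25, 0.5, 0.75, 1.0),  # a pedal_scale slice
)
flash_table
```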
### assemble_action_ser

assemble_action_ser (torque_map_line:numpy.ndarray, torque_table_row_names:list[str], table_start:int, flash_start_ts:pandas._libs.tslibs.timestamps.Timestamp, flash_end_ts:pandas._libs.tslibs.timestamps.Timestamp, torque_table_row_num_flash:int, torque_table_col_num:int, speed_scale:tuple, pedal_scale:tuple, tz:zoneinfo.ZoneInfo)

generate an action df from torque_map_line; the order is vital for the model: contiguous storage in each row due to sort_index. Output: "r0, r1, r2, r3, ..., speed, throttle(map), timestep"
"action"] df[
= df["action", "timestep", 0].values
c c
"action", "timestep"].iloc[0].values df[
= action['action'].iloc[0]
action_ser = "action"
action_ser.name action_ser
# state = df['state']["timestep"]
# state["timestep"].values
= df["action"].iloc[0]
actn "r0"].values actn[
## The construction of DF by raw values will lose timezone information
# So you have to alway not directly build Dataframe from numpy array values
= pd.DataFrame(
action1 "r0"].values,
[actn["r1"].values,
actn["r2"].values,
actn["speed"].values,
actn["throttle"].values,
actn["timestep"].values]
actn[
).T= ["r0", "r1", "r2", "speed", "throttle", "timestep"]
action1.columns action1
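The timezone-loss warning above can be demonstrated directly: `.values` on a tz-aware Series yields a plain `datetime64[ns]` array (UTC wall time) with the zone stripped, so a DataFrame rebuilt from it is naive.

```python
import pandas as pd
from zoneinfo import ZoneInfo

ts = pd.Series(
    pd.to_datetime(["2024-01-01 08:00:00"]).tz_localize(ZoneInfo("Asia/Shanghai"))
)
print(ts.dtype)         # datetime64[ns, Asia/Shanghai]
print(ts.values.dtype)  # datetime64[ns] -- timezone information is gone

# Rebuilding a DataFrame from .values therefore drops the zone:
df_lost = pd.DataFrame({"timestep": ts.values})
print(df_lost["timestep"].dt.tz)  # None
```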
= ["r0", "r1", "r2"]
torque_table_row_names = 4
table_start = 3
torque_table_row_num_flash = 5
torque_table_col_num = (0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120)
speed_scale = (0, 0.25, 0.5, 0.75, 1.0)
pedal_scale = ZoneInfo("Asia/Shanghai")
tz
```python
# state = df['state'].stack
ser_action = assemble_action_ser(
    torque_table_line,
    torque_table_row_names,
    table_start,
    flash_start_ts,
    flash_end_ts,
    torque_table_row_num_flash,
    torque_table_col_num,
    speed_scale,
    pedal_scale,
    tz,
)
```
### nest

nest (d:dict)

Convert a flat dictionary with tuple keys to a nested dictionary, through to the leaves. Arrays will be converted to dictionaries with the index as the key. No conversion of pd.Timestamp; only for use in mongo records.
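A minimal sketch of such a tuple-key-to-nested-dict conversion (a hypothetical implementation, not the library's; it omits the array-to-dict step):

```python
def nest_sketch(d: dict) -> dict:
    """Turn {('a','b'): 1, ('a','c'): 2} into {'a': {'b': 1, 'c': 2}}."""
    out: dict = {}
    for keys, value in d.items():
        node = out
        for k in keys[:-1]:
            node = node.setdefault(k, {})  # descend, creating levels as needed
        node[keys[-1]] = value
    return out

nested = nest_sketch(
    {("state", "velocity"): 10.0, ("state", "thrust"): 0.3, ("action", "r0"): 1.0}
)
nested  # {'state': {'velocity': 10.0, 'thrust': 0.3}, 'action': {'r0': 1.0}}
```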
### df_to_nested_dict

df_to_nested_dict (df_multi_indexed_col:pandas.core.frame.DataFrame)

Convert a dataframe with multi-indexed columns to a nested dictionary
### eos_df_to_nested_dict

eos_df_to_nested_dict (episode:pandas.core.frame.DataFrame)

Convert an eos dataframe with multi-indexed columns to a nested dictionary. Remove all the levels of the multi-indexed columns except for 'timestamp'; keep only the timestamp as the single key for the nested dictionary.
### ep_nest

ep_nest (d:Dict)

Convert a flat dictionary with tuple keys to a nested dictionary with arrays at the leaves. Convert pd.Timestamp to a millisecond long integer; a Timestamp with zoneinfo will be converted to UTC and then to a millisecond long integer.
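The Timestamp-to-millisecond conversion described here can be sketched as (assumption: `.timestamp()` epoch seconds, scaled to milliseconds):

```python
import pandas as pd
from zoneinfo import ZoneInfo

def ts_to_ms(ts: pd.Timestamp) -> int:
    """Convert a tz-aware Timestamp to a UTC millisecond long integer."""
    return int(ts.tz_convert("UTC").timestamp() * 1000)

ts = pd.Timestamp("2024-01-01 08:00:00", tz=ZoneInfo("Asia/Shanghai"))
ms = ts_to_ms(ts)
ms  # 2024-01-01 00:00:00 UTC, in milliseconds since the epoch
```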
### df_to_ep_nested_dict

df_to_ep_nested_dict (df_multi_indexed_col:pandas.core.frame.DataFrame)

Convert a dataframe with multi-indexed columns to a nested dictionary
### avro_ep_encoding

avro_ep_encoding (episode:pandas.core.frame.DataFrame)

*avro encoding; parsing requires a schema defined in "data_io/pool/episode_avro_schema.py".

Convert an eos dataframe with multi-indexed columns to a nested dictionary. Remove all the levels of the multi-indexed columns except for 'timestamp'; keep only the timestamp as the single key for the nested dictionary. ! Convert Timestamp to millisecond long integer!! for compliance with the avro storage format. ! Timestamp with ZoneInfo will be converted to UTC and then to millisecond long integer. As flat as possible; PEP20: flat is better than nested!*
### avro_ep_decoding

avro_ep_decoding (episodes:list[typing.Dict], tz_info:Optional[zoneinfo.ZoneInfo])

*avro decoding.

Convert a list of nested dictionaries to a DataFrame with multi-indexed columns and index. ! Convert microsecond long integer to Timestamp! (The avro storage format stores the timestamp as a long integer in keys, but seems to have DateTime with timezone in the values.)

Apache Avro stores datetime/timestamp as timezone-unaware (default UTC); therefore, we need tz info either in the metadata or elsewhere to designate the timezone.

Sort the column order.*
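Since Avro stores the instant as a timezone-unaware long integer, decoding needs the designated zone supplied separately, as the docstring notes. A sketch of that step (hypothetical helper, millisecond units assumed):

```python
import pandas as pd
from zoneinfo import ZoneInfo

def ms_to_ts(ms: int, tz_info: ZoneInfo) -> pd.Timestamp:
    """Avro stores the instant as UTC; re-attach the designated zone."""
    return pd.Timestamp(ms, unit="ms", tz="UTC").tz_convert(tz_info)

decoded = ms_to_ts(1704067200000, ZoneInfo("Asia/Shanghai"))
decoded  # 2024-01-01 08:00:00+08:00
```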
### decode_mongo_records

decode_mongo_records (df:pandas.core.frame.DataFrame, torque_table_row_names:list[str])

decoding the batch RECORD observations from mongodb nested dicts to a pandas dataframe (EPISODE doesn't need decoding, it is already a dataframe). TODO: need to check whether sort_index is necessary
### decode_mongo_episodes

decode_mongo_episodes (df:pandas.core.frame.DataFrame)

decoding the batch RECORD observations from mongodb nested dicts to a pandas dataframe (EPISODE doesn't need decoding, it is already a dataframe). TODO: need to check whether sort_index is necessary
### encode_dataframe_from_parquet

encode_dataframe_from_parquet (df:pandas.core.frame.DataFrame)

decode the dataframe from parquet with flat column indices to a MultiIndexed DataFrame
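Restoring a MultiIndex from flattened parquet column names might look like this (a sketch; the separator and level layout used by the library are assumptions):

```python
import pandas as pd

# Flat column names as parquet stores them, e.g. "state_velocity".
df = pd.DataFrame({"state_velocity": [10.0], "state_thrust": [0.3], "action_r0": [1.0]})
df.columns = pd.MultiIndex.from_tuples(
    [tuple(c.split("_", 1)) for c in df.columns]  # one split: (level0, level1)
)
df["state"]  # sub-frame with columns 'velocity' and 'thrust'
```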
### decode_episode_batch_to_padded_arrays

decode_episode_batch_to_padded_arrays (episodes:pandas.core.frame.DataFrame, torque_table_row_names:list[str], padding_value:float=-10000.0)

*decode the dataframes to 3D numpy arrays [B, T, F] for states, actions, rewards, next_states. Episodes with variable lengths will turn into ragged arrays with the same raggedness, thus the same maximum length; after padding, the arrays will have the same shape and padding pattern.

The episodes are not sorted, and their internal index keeps the index order of the original episodes, not interleaved! idx_len_list: list of lengths of each episode in the batch; use explicit segmentation to avoid the bug when the batch has duplicated episodes.*
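The padding described above, with explicit per-episode lengths for segmentation, can be sketched as (a hypothetical standalone version, not the library function):

```python
import numpy as np

def pad_episodes(
    episodes: list[np.ndarray], padding_value: float = -10000.0
) -> tuple[np.ndarray, list[int]]:
    """Pad variable-length [T_i, F] episodes into one [B, T_max, F] array.
    Returns the padded array plus idx_len_list for explicit segmentation."""
    idx_len_list = [len(ep) for ep in episodes]
    t_max, n_feat = max(idx_len_list), episodes[0].shape[1]
    out = np.full((len(episodes), t_max, n_feat), padding_value)
    for i, ep in enumerate(episodes):
        out[i, : len(ep)] = ep  # copy real steps; tail keeps padding_value
    return out, idx_len_list

eps = [np.ones((2, 3)), np.ones((4, 3))]
padded, lengths = pad_episodes(eps)
padded.shape, lengths  # ((2, 4, 3), [2, 4])
```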
### encode_episode_dataframe_from_series

encode_episode_dataframe_from_series (observations:List[pandas.core.series.Series], torque_table_row_names:List[str], episode_start_dt:datetime.datetime, driver_str:str, truck_str:str)

encode the list of observations as a dataframe with multi-indexed columns
### recover_episodestart_tzinfo_from_timestamp

recover_episodestart_tzinfo_from_timestamp (ts:pandas._libs.tslibs.timestamps.Timestamp, tzinfo:zoneinfo.ZoneInfo)

recover the timezone information from the parquet folder name string
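Recovering zone info for a timestamp read back from parquet typically means localizing a naive instant rather than converting an aware one; a sketch under that assumption:

```python
import pandas as pd
from zoneinfo import ZoneInfo

def recover_tzinfo(ts: pd.Timestamp, tzinfo: ZoneInfo) -> pd.Timestamp:
    """Attach the zone if the timestamp is naive; otherwise convert to it."""
    return ts.tz_localize(tzinfo) if ts.tzinfo is None else ts.tz_convert(tzinfo)

recovered = recover_tzinfo(pd.Timestamp("2024-01-01 08:00:00"), ZoneInfo("Asia/Shanghai"))
recovered  # 2024-01-01 08:00:00+08:00
```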