Pandas utilities

Utilities for auxiliary pandas DataFrame processing

DataFrame for state, action, reward, next_state


source

assemble_state_ser

 assemble_state_ser (state_columns:pandas.core.frame.DataFrame,
                     tz:zoneinfo.ZoneInfo)

Assemble the state Series from the state_columns DataFrame. Column order is vital for the model.

inputs:

state_columns: pd.DataFrame

The columns “timestep, velocity, thrust, brake” are stored contiguously within each measurement. Because of sort_index, the output order is [col0: brake, col1: thrust, col2: timestep, col3: velocity].

return:

state: pd.Series
table_row_start: int
| | Type | Details |
|---|---|---|
| state_columns | DataFrame | DataFrame with columns [‘timestep’, ‘velocity’, ‘thrust’, ‘brake’] |
| tz | ZoneInfo | timezone for the timestamp |
| Returns | Tuple | |
assemble_state_ser(state, tz)[0]  # just show the Series; ignore row_start (it's 0)
assert assemble_state_ser(state, tz)[1] == 0  # row_start should be 0
assert isinstance(assemble_state_ser(state, tz)[0], pd.Series) == True
from fastcore.test import *
test_eq(isinstance(assemble_state_ser(state, tz)[0], pd.Series), True)  # use fastcore testing utils
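
The output order given in the docstring ([brake, thrust, timestep, velocity]) is simply the lexicographic order of the column names. The following is a minimal sketch with made-up values; it does not call assemble_state_ser and only assumes that the columns are sorted by name with sort_index.

```python
import pandas as pd

# Hypothetical measurements; only the column names come from the docstring.
state_columns = pd.DataFrame(
    {
        "timestep": pd.to_datetime(["2023-01-01 08:00:00", "2023-01-01 08:00:01"]),
        "velocity": [10.0, 11.0],
        "thrust": [0.2, 0.3],
        "brake": [0.0, 0.0],
    }
)

# sort_index on the column axis orders the names lexicographically.
sorted_columns = list(state_columns.sort_index(axis=1).columns)
print(sorted_columns)
```

Any consumer that indexes the state by position therefore has to use this sorted order, not the order in which the columns were created.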

source

assemble_reward_ser

 assemble_reward_ser (power_columns:pandas.core.frame.DataFrame,
                      obs_sampling_rate:int, ts)

Assemble the reward Series from the motion_power DataFrame. Order is vital for the model: storage is contiguous in each row. The input power_columns are [‘current’, ‘voltage’]. Because of sort_index, the output order is [timestep, work].


source

assemble_flash_table

 assemble_flash_table (torque_map_line:numpy.ndarray, table_start:int,
                       torque_table_row_num_flash:int,
                       torque_table_col_num:int, speed_scale:tuple,
                       pedal_scale:tuple)

Generate the flash table DataFrame from torque_map_line. Order is vital for the model: storage is contiguous in each row. Because of sort_index, the output order is “r0, r1, r2, r3, …, speed, throttle(map), timestep”.


source

assemble_action_ser

 assemble_action_ser (torque_map_line:numpy.ndarray,
                      torque_table_row_names:list[str], table_start:int,
                      flash_start_ts:pandas._libs.tslibs.timestamps.Timestamp,
                      flash_end_ts:pandas._libs.tslibs.timestamps.Timestamp,
                      torque_table_row_num_flash:int,
                      torque_table_col_num:int, speed_scale:tuple,
                      pedal_scale:tuple, tz:zoneinfo.ZoneInfo)

Generate the action Series from torque_map_line. Order is vital for the model: storage is contiguous in each row. Because of sort_index, the output order is “r0, r1, r2, r3, …, speed, throttle(map), timestep”.

df["action"]
c = df["action", "timestep", 0].values
c
df["action", "timestep"].iloc[0].values
action_ser = action['action'].iloc[0]
action_ser.name = "action"
action_ser
# state = df['state']["timestep"]
# state["timestep"].values
actn = df["action"].iloc[0]
actn["r0"].values
# Constructing a DataFrame from raw values loses timezone information,
# so never build a DataFrame directly from numpy array values.
action1 = pd.DataFrame(
    [actn["r0"].values, 
     actn["r1"].values, 
     actn["r2"].values, 
     actn["speed"].values,
     actn["throttle"].values, 
     actn["timestep"].values]
).T
action1.columns = ["r0", "r1", "r2", "speed", "throttle", "timestep"]
action1
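
The timezone loss noted in the comments above can be reproduced in isolation. This is a small sketch with hypothetical timestamps standing in for the "timestep" column; it contrasts building a DataFrame from `.values` with building it from the Series itself.

```python
import pandas as pd

# Hypothetical tz-aware timestamps standing in for the "timestep" column.
timestep = pd.Series(
    pd.to_datetime(["2023-01-01 08:00:00", "2023-01-01 08:00:01"]).tz_localize(
        "Asia/Shanghai"
    )
)

# Built from the raw numpy values: the timezone is dropped and the values are in UTC.
from_values = pd.DataFrame({"timestep": timestep.values})

# Built from the Series itself: the timezone is preserved.
from_series = pd.DataFrame({"timestep": timestep})

print(from_values["timestep"].dt.tz)
print(from_series["timestep"].dt.tz)
```

Passing the Series (or the column of an existing DataFrame) keeps the tz-aware dtype, which is why going through numpy arrays should be avoided for timestamp columns.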
torque_table_row_names = ["r0", "r1", "r2"]
table_start = 4
torque_table_row_num_flash = 3
torque_table_col_num = 5
speed_scale = (0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120)
pedal_scale = (0, 0.25, 0.5, 0.75, 1.0)
tz = ZoneInfo("Asia/Shanghai")

# state = df['state'].stack
ser_action = assemble_action_ser(
    torque_table_line,
    torque_table_row_names,
    table_start,
    flash_start_ts,
    flash_end_ts,
    torque_table_row_num_flash,
    torque_table_col_num,
    speed_scale,
    pedal_scale,
    tz)
assert assemble_state_ser(state, tz)[1] == 0  # row_start should be 0
assert isinstance(assemble_state_ser(state, tz)[0], pd.Series) == True
from fastcore.test import *
test_eq(isinstance(assemble_state_ser(state, tz)[0], pd.Series), True)  # use fastcore testing utils

source

nest

 nest (d:dict)

Convert a flat dictionary with tuple keys to a nested dictionary, all the way through to the leaves. Arrays are converted to dictionaries with the index as the key. pd.Timestamp is not converted. Only for use in mongo records.
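
To illustrate the idea, here is a minimal sketch of turning tuple keys into nested dictionaries. The function `nest_sketch` is hypothetical and is not the library's implementation; in particular, converting list leaves to dictionaries keyed by the stringified position is an assumption made for this example.

```python
def nest_sketch(flat: dict) -> dict:
    """Hypothetical re-implementation: turn tuple keys into nested dictionaries."""
    nested: dict = {}
    for key_path, value in flat.items():
        node = nested
        # Walk (and create) the intermediate levels for all but the last key part.
        for part in key_path[:-1]:
            node = node.setdefault(part, {})
        # Assumption: list leaves become {position: element} dictionaries
        # with string keys.
        if isinstance(value, list):
            value = {str(i): v for i, v in enumerate(value)}
        node[key_path[-1]] = value
    return nested


flat = {
    ("state", "velocity"): [10.0, 11.0],
    ("state", "brake"): [0.0, 0.0],
    ("action", "speed"): [0.0, 10.0],
}
nested = nest_sketch(flat)
print(nested)
```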


source

df_to_nested_dict

 df_to_nested_dict (df_multi_indexed_col:pandas.core.frame.DataFrame)

Convert a dataframe with multi-indexed columns to a nested dictionary


source

eos_df_to_nested_dict

 eos_df_to_nested_dict (episode:pandas.core.frame.DataFrame)

Convert an eos DataFrame with multi-indexed columns to a nested dictionary. Remove all levels of the multi-indexed columns except for ‘timestamp’. Keep only the timestamp as the single key for the nested dictionary.


source

ep_nest

 ep_nest (d:Dict)

Convert a flat dictionary with tuple keys to a nested dictionary with arrays at the leaves. pd.Timestamp is converted to a millisecond long integer. A Timestamp with zoneinfo is converted to UTC and then to a millisecond long integer.
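
The timestamp conversion described here can be sketched as follows. The helper `to_epoch_ms` is hypothetical and only illustrates one way to go from a tz-aware Timestamp to UTC and then to an integer number of milliseconds; the library may do this differently.

```python
import pandas as pd

EPOCH_UTC = pd.Timestamp("1970-01-01", tz="UTC")


def to_epoch_ms(ts: pd.Timestamp) -> int:
    """Hypothetical helper: tz-aware Timestamp -> milliseconds since the Unix epoch."""
    # Convert to UTC first, then count whole milliseconds since the epoch.
    return int((ts.tz_convert("UTC") - EPOCH_UTC) // pd.Timedelta(milliseconds=1))


ts = pd.Timestamp("2023-01-01 08:00:00", tz="Asia/Shanghai")
ms = to_epoch_ms(ts)
print(ms)  # 1672531200000
```

The integer carries no timezone, so the original zone has to be stored elsewhere if it must be recovered later.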


source

df_to_ep_nested_dict

 df_to_ep_nested_dict (df_multi_indexed_col:pandas.core.frame.DataFrame)

Convert a dataframe with multi-indexed columns to a nested dictionary


source

avro_ep_encoding

 avro_ep_encoding (episode:pandas.core.frame.DataFrame)

Avro encoding. Parsing requires a schema defined in “data_io/pool/episode_avro_schema.py”.

Convert an eos DataFrame with multi-indexed columns to a nested dictionary. Remove all levels of the multi-indexed columns except for ‘timestamp’. Keep only the timestamp as the single key for the nested dictionary. Timestamps are converted to millisecond long integers for compliance with the Avro storage format. A Timestamp with ZoneInfo is converted to UTC and then to a millisecond long integer. The result is kept as flat as possible (PEP 20: flat is better than nested).


source

avro_ep_decoding

 avro_ep_decoding (episodes:list[typing.Dict],
                   tz_info:Optional[zoneinfo.ZoneInfo])

Avro decoding.

Convert a list of nested dictionaries to a DataFrame with multi-indexed columns and index. Microsecond long integers are converted to Timestamp. (The Avro storage format stores timestamps as long integers in keys but seems to have DateTime with timezone in the values.)

Apache Avro stores datetime/timestamp values as timezone-unaware (defaulting to UTC). Therefore, timezone info is needed either in the metadata or elsewhere to designate the timezone.

Finally, the column order is sorted.
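
As an illustration of why the timezone has to be supplied separately, the sketch below restores tz-aware timestamps from stored integers. The values and the variable names are made up, and the microsecond unit simply follows the docstring above; this is not the library's decoding code.

```python
import pandas as pd
from zoneinfo import ZoneInfo

# Assumption: the timezone comes from metadata, as the stored integers carry none.
tz_info = ZoneInfo("Asia/Shanghai")

# Hypothetical stored values: microseconds since the Unix epoch (UTC).
stored_us = [1672531200000000, 1672531201000000]

# Interpret the integers as UTC instants, then convert to the designated timezone.
timestamps = pd.to_datetime(stored_us, unit="us", utc=True).tz_convert(tz_info)
print(timestamps)
```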


source

decode_mongo_records

 decode_mongo_records (df:pandas.core.frame.DataFrame,
                       torque_table_row_names:list[str])

Decode the batch RECORD observations from MongoDB nested dicts to a pandas DataFrame. (EPISODE doesn’t need decoding; it is already a DataFrame.) TODO: check whether sort_index is necessary.


source

decode_mongo_episodes

 decode_mongo_episodes (df:pandas.core.frame.DataFrame)

Decode the batch RECORD observations from MongoDB nested dicts to a pandas DataFrame. (EPISODE doesn’t need decoding; it is already a DataFrame.) TODO: check whether sort_index is necessary.


source

encode_dataframe_from_parquet

 encode_dataframe_from_parquet (df:pandas.core.frame.DataFrame)

Decode the DataFrame from parquet with flat column indices to a MultiIndexed DataFrame.
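
A rough sketch of restoring MultiIndex columns from flat names is shown below. The "_" separator and the column names are assumptions chosen for the example; the actual flat naming scheme used in the parquet files is not documented here.

```python
import pandas as pd

# Hypothetical flat column names, assumed to be "<level0>_<level1>".
flat = pd.DataFrame(
    {
        "state_velocity": [10.0, 11.0],
        "state_brake": [0.0, 0.0],
        "action_speed": [0.0, 10.0],
    }
)

# Split each flat name once on the assumed separator and rebuild a two-level index.
restored = flat.copy()
restored.columns = pd.MultiIndex.from_tuples(
    [tuple(name.split("_", 1)) for name in flat.columns]
)
print(restored["state"])
```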


source

decode_episode_batch_to_padded_arrays

 decode_episode_batch_to_padded_arrays
                                        (episodes:pandas.core.frame.DataFrame,
                                        torque_table_row_names:list[str],
                                        padding_value:float=-10000.0)

Decode the DataFrames to 3D numpy arrays [B, T, F] for states, actions, rewards, next_states. Episodes with variable lengths turn into ragged arrays with the same raggedness, and thus the same maximum length. After padding, the arrays have the same shape and padding pattern.

Episodes are not sorted, and the internal index keeps the index order of the original episodes, not interleaved. idx_len_list: list of the lengths of each episode in the batch. Explicit segmentation is used to avoid a bug when the batch has duplicated episodes.
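
The padding step can be pictured with plain numpy. The function `pad_episodes` below is a hypothetical sketch, not the library's code: it takes a list of [T_i, F] arrays, records their lengths explicitly, and pads them to a common [B, T, F] array using the default padding value from the signature above.

```python
import numpy as np


def pad_episodes(episodes, padding_value=-10000.0):
    """Hypothetical sketch: pad a list of [T_i, F] arrays to one [B, T_max, F] array."""
    # Keep the per-episode lengths explicitly so each segment can be recovered.
    lengths = [len(ep) for ep in episodes]
    max_len = max(lengths)
    n_features = episodes[0].shape[1]
    # Start from an array filled with the padding value, then copy each episode in.
    padded = np.full(
        (len(episodes), max_len, n_features), padding_value, dtype=np.float32
    )
    for i, ep in enumerate(episodes):
        padded[i, : len(ep), :] = ep
    return padded, lengths


# Two made-up episodes with 3 and 5 steps and 2 features each.
episodes = [np.ones((3, 2)), np.ones((5, 2))]
padded, lengths = pad_episodes(episodes)
print(padded.shape, lengths)
```

Because states, actions, rewards and next_states of one episode share the same length, applying the same padding to each of them yields arrays with identical shapes and padding patterns.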


source

encode_episode_dataframe_from_series

 encode_episode_dataframe_from_series
                                       (observations:List[pandas.core.series.Series],
                                       torque_table_row_names:List[str],
                                       episode_start_dt:datetime.datetime,
                                       driver_str:str, truck_str:str)

encode the list of observations as a dataframe with multi-indexed columns


source

recover_episodestart_tzinfo_from_timestamp

 recover_episodestart_tzinfo_from_timestamp
                                             (ts:pandas._libs.tslibs.timestamps.Timestamp,
                                             tzinfo:zoneinfo.ZoneInfo)

Recover the timezone information from the parquet folder name string.