Kvaser
Kvaser class
Kvaser
Kvaser (truck:tspace.config.vehicles.TruckInField, can_server:tspace.conf ig.messengers.CANMessenger=CANMessenger(server_name='can_udp_svc' , host='127.0.0.1', port='8002', protocol='udp'), driver:tspace.config.drivers.Driver, resume:bool=False, data_dir:Optional[pathlib.Path]=None, flash_count:int=0, episode_count:int=0, vcu_calib_table_row_start:int=0, torque_table_default:Optional[pandas.core.frame.DataFrame]=None, torque_table_live:Optional[pandas.core.frame.DataFrame]=None, epi_countdown_time:float=3.0, lock_watchdog:<built- infunctionallocate_lock>=<unlocked _thread.lock object at 0x7f9a997f6f80>, capture_failure_count:int=0, flash_failure_count:int=0, logger:Optional[logging.Logger]=None, dict_logger:Optional[dict]=None)
*Kvaser is local vehicle interface with Producer(get vehicle status) and Consumer(flasher)
Attributes:
truck: TruckInField
truck object
can_server: CANMessenger
can server object*
Kvaser.flash_vehicle
Kvaser.flash_vehicle (torque_table:pandas.core.frame.DataFrame)
flash the torque table to the vehicle via kvaser
Type | Details | |
---|---|---|
torque_table | DataFrame | the torque table to be flashed |
Returns | None |
Kvaser.init_internal_pipelines
Kvaser.init_internal_pipelines ()
initialize the internal pipelines for kvaser
Kvaser.produce
Kvaser.produce (raw_pipeline:tspace.dataflow.pipeline.deque.PipelineDQ[ty ping.Union[dict[str,str],dict[str,dict[str,list[typing.Un ion[str,list[list[str]]]]]]]], hmi_pipeline:Optional[tspa ce.dataflow.pipeline.queue.Pipeline[str]]=None, exit_event:Optional[threading.Event]=None)
produce data from kvaser and put into the pipeline
Type | Default | Details | |
---|---|---|---|
raw_pipeline | PipelineDQ | PipelineDQ[dict[str, str]], | |
hmi_pipeline | Optional | None | HMI pipeline |
exit_event | Optional | None | input event exit |
Kvaser.filter
Kvaser.filter (in_pipeline:tspace.dataflow.pipeline.deque.PipelineDQ[typi ng.Union[dict[str,str],dict[str,dict[str,list[typing.Union [str,list[list[str]]]]]]]], out_pipeline:tspace.dataflow.p ipeline.queue.Pipeline[pandas.core.frame.DataFrame], start_event:Optional[threading.Event], stop_event:Optional[threading.Event], interrupt_event:Optional[threading.Event], flash_event:Optional[threading.Event], exit_event:Optional[threading.Event])
filter data from kvaser input pipeline and put into the output pipeline
Type | Details | |
---|---|---|
in_pipeline | PipelineDQ | input PipelineDQ[dict[str, str]], |
out_pipeline | Pipeline | output Pipeline[pd.DataFrame], |
start_event | Optional | input event start |
stop_event | Optional | input event stop |
interrupt_event | Optional | input event interrupt |
flash_event | Optional | |
exit_event | Optional | input event exit |
Returns | None |