from pprint import pprint
import subprocess
send_message
sending a CAN message to a bus
get_argparser
get_argparser ()
*Build the argument parser that collects the CAN bus settings, the DBC config and the message to send.
Returns: argparse.ArgumentParser: the configured argument parser.*
signal_usr1
signal_usr1 (signum, frame)
Handle USR1 signal as an event to set the received flag.
send_msg
send_msg (db:cantools.database.can.database.Database, message:str, payload:bytes, channel:str, bitrate:int, bus_type:str, is_extended:bool)
# Load the DBC database describing the CAN messages used in this walkthrough.
db_can = cantools.database.load_file('../../res/motohawk_new.dbc')
db_can.messages  # bare expression: shown as cell output when run in a notebook

# Look up the message definition that is encoded/decoded in the cells below.
example_message: MessageTpl = db_can.get_message_by_name('ExampleMessage')
pprint(example_message.signals)
pprint(example_message.__dict__)
example_message.frame_id  # bare expression: shown as cell output in a notebook
# install vcan interface with encrypted password to sudo
# Each command decrypts the sudo password with gpg and pipes it to sshpass,
# so the shell pipeline is required (hence os.system with a command string).
os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo modprobe vcan")
# sshpass -v -p asdf sudo ip link add dev vcan0 type vcan
os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link add dev vcan0 type vcan")
os.system("ip link show vcan0")
# !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set vcan0 type vcan bitrate 500000 # vcan does not support set bitrate on command line!
# !sshpass -p asdf sudo ip link add dev vcan0 type vcan
os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set up vcan0")
# !sshpass -v -p asdf sudo ip link set up vcan0
# # install vcan interface with encrypted password to sudo
# !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo modprobe vcan
# # sshpass -v -p asdf sudo ip link add dev vcan0 type vcan
# !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link add dev vcan0 type vcan
# !ip link show vcan0
# # !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set vcan0 type vcan bitrate 500000 # vcan does not support set bitrate on command line!
# # !sshpass -p asdf sudo ip link add dev vcan0 type vcan
# !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set up vcan0
# # !sshpass -v -p asdf sudo ip link set up vcan0
# Signal values for ExampleMessage, serialised to JSON bytes so they can be
# piped to the sender subprocess over stdin.
data_dict = {'Temperature': 250.1, 'AverageRadius': 3.2, 'Enable': 1}
data_json_bytes = json.dumps(data_dict).encode('utf-8')
data_json_bytes  # bare expression: shown as cell output in a notebook
json.loads(data_json_bytes.decode())  # round-trip check of the JSON payload

# Encode the same signal values into a raw CAN payload and decode it back.
can_data = example_message.encode({'Temperature': 250.1, 'AverageRadius': 3.2, 'Enable': 1})
example_message.decode(can_data)

# Bus used by the parent process to listen on the virtual CAN interface.
bus = can.interface.Bus(bustype='socketcan', channel='vcan0', bitrate=250000)
message_to_send = can.Message(arbitration_id=example_message.frame_id, data=can_data, is_extended_id=False)
# can_bus.send(message)

# Manager-backed dict that receive_message() fills with the received frame.
manager = Manager()
message_proxy = manager.dict()
def receive_message(message_proxy: "DictProxy", bus: "can.interface.Bus") -> None:
    """Wait for one CAN frame on ``bus`` and copy its fields into ``message_proxy``.

    Args:
        message_proxy: Mapping that receives the ``'timestamp'``,
            ``'arbitration_id'`` and ``'data'`` entries of the received frame.
        bus: Bus object whose ``recv()`` is called once, without a timeout
            argument, to obtain the frame.

    Returns:
        None. The result is delivered through ``message_proxy``.
    """
    # Annotations are quoted so the definition does not evaluate the
    # `DictProxy` / `can` names when the function is defined.
    print('waiting for message')
    msg: "can.Message" = bus.recv()
    print('message received')
    message_proxy['timestamp'] = msg.timestamp
    message_proxy['arbitration_id'] = msg.arbitration_id
    message_proxy['data'] = msg.data
# Launch the sender script as a child process. All three standard streams are
# piped so the parent can feed the JSON payload on stdin and collect the
# child's output afterwards.
proc = subprocess.Popen(
    ['python',
     '../../candycan/send_message.py',
     '-t', 'socketcan',
     '-c', 'vcan0',
     '-b', '25000',
     '-d', '../../res/motohawk_new.dbc',
     '-m', 'ExampleMessage',
     ],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    stdin=subprocess.PIPE
)
print(f'PARENT: {proc.pid} before signaling child')
data_json_bytes  # bare expression: shown as cell output in a notebook
data_json_bytes.decode()
json.loads(data_json_bytes.decode())

# Feed the JSON payload to the child on stdin. The child is still running
# after one second, so communicate() raising TimeoutExpired is handled here
# rather than treated as a failure.
try:
    outs, errs = proc.communicate(data_json_bytes, timeout=1)
except subprocess.TimeoutExpired:
    print(f'PARENT: {proc.pid}; TimeoutExpired')
    # outs, errs = proc.communicate()
    # print(f'PARENT: {proc.pid}; outs: {outs}; errs: {errs} TimeoutExpired')
# sys.stdout.flush()
# time.sleep(1)

# Wait in the parent for the frame sent by the child.
receive_message(message_proxy, bus)
# proc_receive = subprocess.Popen(
# ['python',
# '../candycan/receive_message.py',
# '-t', 'socketcan',
# '-c' , 'vcan0',
# '-b', '25000',
# '-d', '../res/motohawk_new.dbc',
# '-m', 'ExampleMessage',
# ],
# # stdout=subprocess.PIPE,
# # stderr=subprocess.PIPE,
# # stdin=subprocess.PIPE
# )
# print(f'PARENT: {proc.pid} before signaling child')
# stdout_raw, stderr_raw = proc_receive.communicate()
# stdout_value = stdout_raw.decode('utf-8')
# stderr_value = stderr_raw.decode('utf-8')
# # [0].decode('utf-8')
# stdout_value
# stderr_value
print(f'PARENT: {proc.pid} signaling child')
# sys.stdout.flush()
# Send SIGUSR1 to the child, then collect whatever it wrote to its pipes.
os.kill(proc.pid, signal.SIGUSR1)
stdout_raw, stderr_raw = proc.communicate()
stdout_value = stdout_raw.decode('utf-8')
stderr_value = stderr_raw.decode('utf-8')
print(f'stdout: {repr(stdout_value)}; stderr: {repr(stderr_value)}')
# [0].decode('utf-8')

# Bare tuple expression (cell output in a notebook): the receive time and the
# frame decoded back into signal values via the DBC database.
datetime.fromtimestamp(message_proxy['timestamp']), db_can.decode_message(message_proxy['arbitration_id'], message_proxy['data'])
# close and remove vcan0
# !sshpass -v -p asdf sudo ip link delete vcan0
# Bring the interface down first, then delete it.
os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set down vcan0")
# delete vcan0
os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link delete vcan0")