from pprint import pprint
import subprocess

send_message
Sending a CAN message to a bus.
get_argparser
get_argparser ()
*Get the CAN bus, DBC config and the message to send.
Returns: argparse.ArgumentParser: the argument parser for these options.*
signal_usr1
signal_usr1 (signum, frame)
Handle USR1 signal as an event to set the received flag.
send_msg
send_msg (db:cantools.database.can.database.Database, message:str, payload:bytes, channel:str, bitrate:int, bus_type:str, is_extended:bool)
# Load the DBC database that defines the CAN messages used below.
db_can = cantools.database.load_file('../../res/motohawk_new.dbc')
db_can.messages  # bare expression: only shows a value when run as a notebook cell

# Look up the message definition by name and dump its signals and attributes.
example_message: MessageTpl = db_can.get_message_by_name('ExampleMessage')
pprint(example_message.signals)
pprint(example_message.__dict__)
example_message.frame_id  # bare expression: only shows a value when run as a notebook cell

# install vcan interface with encrypted password to sudo
# Bring up the virtual CAN interface vcan0. Each privileged command gets the
# sudo password by decrypting ~/.sshpasswd.gpg and piping it to sshpass.
os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo modprobe vcan")
# sshpass -v -p asdf sudo ip link add dev vcan0 type vcan
os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link add dev vcan0 type vcan")
os.system("ip link show vcan0")
# !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set vcan0 type vcan bitrate 500000 # vcan does not support set bitrate on command line!
# !sshpass -p asdf sudo ip link add dev vcan0 type vcan
os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set up vcan0")
# !sshpass -v -p asdf sudo ip link set up vcan0

# # install vcan interface with encrypted password to sudo
# !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo modprobe vcan
# # sshpass -v -p asdf sudo ip link add dev vcan0 type vcan
# !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link add dev vcan0 type vcan
# !ip link show vcan0
# # !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set vcan0 type vcan bitrate 500000 # vcan does not support set bitrate on command line!
# # !sshpass -p asdf sudo ip link add dev vcan0 type vcan
# !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set up vcan0
# # !sshpass -v -p asdf sudo ip link set up vcan0

# Signal values for the example message. This assignment used to be glued to
# the end of the comment above, so it never ran and data_dict was undefined.
data_dict = {'Temperature': 250.1, 'AverageRadius': 3.2, 'Enable': 1}
# Serialise the signal values to UTF-8 JSON bytes (the form later written to
# the sender's stdin) and check that they survive a decode round trip.
data_json_bytes = json.dumps(data_dict).encode('utf-8')
data_json_bytes  # bare expression: only shows a value when run as a notebook cell
json.loads(data_json_bytes.decode())

# Encode the same signal values into a CAN payload and decode it back.
can_data = example_message.encode(data_dict)
example_message.decode(can_data)

# Open the local end of the virtual bus and build the frame to compare against.
bus = can.interface.Bus(bustype='socketcan', channel='vcan0', bitrate=250000)
message_to_send = can.Message(
    arbitration_id=example_message.frame_id,
    data=can_data,
    is_extended_id=False,
)
# can_bus.send(message)

# This assignment used to be glued to the end of the comment above, so it
# never ran and `manager` was undefined for the code that follows.
manager = Manager()
# Shared dict that receive_message() fills with the fields of the received frame.
message_proxy = manager.dict()


def receive_message(message_proxy: DictProxy, bus: can.interface.Bus) -> None:
    """Wait for one CAN frame on *bus* and copy its fields into *message_proxy*.

    Args:
        message_proxy: Mapping that receives the ``timestamp``,
            ``arbitration_id`` and ``data`` of the received frame.
        bus: Bus to read from. ``recv()`` is called without a timeout.
    """
    print('waiting for message')
    msg: can.Message = bus.recv()
    print('message received')
    message_proxy['timestamp'] = msg.timestamp
    message_proxy['arbitration_id'] = msg.arbitration_id
    message_proxy['data'] = msg.data


# Start the sender script as a child process with all three standard streams
# piped, so the parent can feed it input and collect its output.
# NOTE(review): '-b' is '25000' here while the local Bus in this file is opened
# with bitrate=250000 - possibly a typo; confirm which value is intended.
proc = subprocess.Popen(
    [
        'python',
        '../../candycan/send_message.py',
        '-t', 'socketcan',
        '-c', 'vcan0',
        '-b', '25000',
        '-d', '../../res/motohawk_new.dbc',
        '-m', 'ExampleMessage',
    ],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    stdin=subprocess.PIPE,
)
print(f'PARENT: {proc.pid} before signaling child')

# Inspect the JSON payload that is about to be written to the child's stdin.
data_json_bytes  # bare expression: only shows a value when run as a notebook cell
data_json_bytes.decode()
json.loads(data_json_bytes.decode())

# Write the payload to the child's stdin and wait at most one second for it to
# exit. A timeout is only reported here; communicate() is called again later.
try:
    outs, errs = proc.communicate(data_json_bytes, timeout=1)
except subprocess.TimeoutExpired:
    print(f'PARENT: {proc.pid}; TimeoutExpired')
    # outs, errs = proc.communicate()
    # print(f'PARENT: {proc.pid}; outs: {outs}; errs: {errs} TimeoutExpired')
    # sys.stdout.flush()
# time.sleep(1)

# Read one frame from the local bus into the shared dict. This call used to
# be glued to the end of the comment above, so it never ran.
receive_message(message_proxy, bus)
# proc_receive = subprocess.Popen(
# ['python',
# '../candycan/receive_message.py',
# '-t', 'socketcan',
# '-c' , 'vcan0',
# '-b', '25000',
# '-d', '../res/motohawk_new.dbc',
# '-m', 'ExampleMessage',
# ],
# # stdout=subprocess.PIPE,
# # stderr=subprocess.PIPE,
# # stdin=subprocess.PIPE
# )
# print(f'PARENT: {proc.pid} before signaling child')
# stdout_raw, stderr_raw = proc_receive.communicate()
# stdout_value = stdout_raw.decode('utf-8')
# stderr_value = stderr_raw.decode('utf-8')
# # [0].decode('utf-8')
# stdout_value
# stderr_value

# Send SIGUSR1 to the sender process.
# NOTE(review): presumably the child treats SIGUSR1 as its cue to finish;
# confirm against send_message.py.
print(f'PARENT: {proc.pid} signaling child')
# sys.stdout.flush()
os.kill(proc.pid, signal.SIGUSR1)

# Wait for the child to exit and collect everything it wrote to stdout/stderr.
stdout_raw, stderr_raw = proc.communicate()
# Decode and print the child's captured output.
stdout_value = stdout_raw.decode('utf-8')
stderr_value = stderr_raw.decode('utf-8')
print(f'stdout: {repr(stdout_value)}; stderr: {repr(stderr_value)}')
# [0].decode('utf-8')

# Convert the received timestamp and decode the payload via the DBC database.
# This is a bare tuple expression, so it only shows a value when run as a
# notebook cell. It used to be glued to the end of the comment above.
datetime.fromtimestamp(message_proxy['timestamp']), db_can.decode_message(
    message_proxy['arbitration_id'], message_proxy['data']
)

# close and remove vcan0
# !sshpass -v -p asdf sudo ip link delete vcan0
os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set down vcan0")

# delete vcan0
os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link delete vcan0")