from IPython.core.interactiveshell import InteractiveShell

# Display the value of every top-level expression in a cell, not only the last one.
InteractiveShell.ast_node_interactivity = "all"

import time
import os
from multiprocessing import Process, Event
from multiprocessing import synchronize, Manager
from multiprocessing.managers import DictProxy
from typing import Optional
# import threading

import cantools
import can
# from can import Message
from cantools.database import Message
from can.interface import Bus # virtual interface for testing
# from can.interfaces.udp_multicast.bus import GeneralPurposeUdpMulticastBus as GPUDPMCBus
# from can.interfaces.udp_multicast import UdpMulticastBus

# Load the DBC database that describes the CAN messages used in this notebook.
db = cantools.database.load_file('../res/motohawk_new.dbc')
db.messages
example_message = db.get_message_by_name('ExampleMessage')
example_message.signals
# pprint(example_message.__dict__)
example_message.frame_id
example_message.signal_groups
example_message.signal_tree!gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo modprobe vcan!gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link add dev vcan0 type vcan
# sshpass -v -p asdf sudo ip link add dev vcan0 type vcan!ip link show vcan0# !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set vcan0 type vcan bitrate 500000 # vcan Does not SUPPORT set bitrate on command line!
# !sshpass -p asdf sudo ip link add dev vcan0 type vcan# !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set up vcan0
!gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set vcan0 up type vcan
# !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set vcan0 up type vcan bitrate 500000 # vcan Does not SUPPORT set bitrate on command line!
# !sshpass -v -p asdf sudo ip link set up vcan0bus = Bus(bustype='socketcan', channel='vcan0', bitrate=250000) # interface='virtual'
# bus = GPUDPMCBus(group=UdpMulticastBus.DEFAULT_GROUP_IPv6, port=43113, hop_limit=1)
# bus = UdpMulticastBus(group=UdpMulticastBus.DEFAULT_GROUP_IPv6)
# Encode the signal values with the DBC definition and wrap them in a CAN frame.
data = example_message.encode({'Temperature': 250.1, 'AverageRadius': 3.2, 'Enable': 1})
message_to_send = can.Message(arbitration_id=example_message.frame_id, data=data, is_extended_id=False)
message_to_send

bus
bus.fileno()
# bus.protocol

message_to_send.data
message_to_send.arbitration_id

# Manager-backed dict used to hand a received frame back from a child process.
manager = Manager()
message_proxy = manager.dict()

# def receive_message(message_proxy: DictProxy, bus_channel:str='vcan0', bitrate:int=25000):
def receive_message(message_proxy: DictProxy, bus: Bus) -> None:
    """Receive one CAN frame from *bus* and publish its fields via *message_proxy*.

    Args:
        message_proxy: Shared dict proxy; the keys 'timestamp',
            'arbitration_id' and 'data' are written from the received frame.
        bus: Bus to read from. ``recv()`` is called without a timeout.
    """
    print('entering receive subprocess')
    # bus = Bus(bustype='socketcan', channel=bus_channel, bitrate=bitrate)
    # os.setsid()
    print(f'bus: {bus.fileno()} {bus}')
    # bus.recv() yields a python-can frame, not a cantools message definition.
    msg: can.Message = bus.recv()
    print(f'message received: {msg}')
    message_proxy['timestamp'] = msg.timestamp
    message_proxy['arbitration_id'] = msg.arbitration_id
    message_proxy['data'] = msg.data


# def send_message(message: can.Message, done: synchronize.Event, bus_channel:str='vcan0', bitrate:int=25000, timeout: Optional[float]=None)->None:
def send_message(message: can.Message, done: synchronize.Event, bus: Bus, timeout: Optional[float] = None) -> None:
    """Send *message* on *bus*, then wait for *done* and report the wait time.

    Args:
        message: Frame to transmit.
        done: Event to wait on after the frame has been sent.
        bus: Bus to transmit on.
        timeout: Maximum number of seconds to wait for *done*; ``None``
            (the default) waits until the event is set.
    """
    print("entering send subprocess")
    # os.setsid() # create new process group, become session leader, otherwise a process in a same session could not receive signal from the main process
    os.setpgrp()
    print((os.getpid(), os.getsid(os.getpid())))
    # bus = Bus(bustype='socketcan', channel=bus_channel, bitrate=bitrate)
    print(f'bus: {bus.fileno()}, {bus}')
    bus.send(message)
    print('message sent and waiting')
    # Monotonic clock: the interval is not affected by wall-clock adjustments.
    start = time.perf_counter_ns()
    done.wait(timeout)
    # True division keeps the sub-millisecond part that the '.3f' format prints.
    elapsed = (time.perf_counter_ns() - start) / 1e6
    print(f'elapsed time: {elapsed:.3f}ms')


os.getcwd()
bus.fileno()
bus
os.getpid(),os.getsid(os.getpid())

# shm = shared_memory.SharedMemory(create=True, size=1024)

# def send_message_wrapper(message: Message, bus: Bus)->None:
timeout = None
done = Event()
# message_to_send
print("before")
# Build (but do not start) the sender process; `timeout` is forwarded so the
# value configured above is the one send_message actually waits with.
process_send = Process(target=send_message, name="Sending CAN messasge", args=(message_to_send, done, bus, timeout))
# process_send.start()
# process_send.pid, process_send
# message_to_send

# receive_message(message_proxy, bus)
# process_receive = Process(target=receive_message, name="Receiving CAN message", args=(message_proxy,bus))
# process_receive.start()
# process_receive

# Release any sender that is waiting on the event.
done.set()

# process_send

# datetime.fromtimestamp(message_proxy['timestamp']),db.decode_message(message_proxy['arbitration_id'],message_proxy['data'])

# Process(target=receive_message, args=(message_proxy, bus, done, timeout)).start()

# try:
# recv_message = Message(arbitration_id=recv_message_proxy['arbitration_id'], data=recv_message_proxy['data'], is_extended_id=False)
# except KeyError:
# print('KeyError')

# message = bus.recv(timeout=2)
# if message is not None:
# message = can.Message(arbitration_id=example_message.frame_id, data=data, is_extended_id=False)
# db.decode_message(message.arbitration_id, message.data)
# else:
# print('no message received')

# done.set()

# import time
# import can
# from can.interfaces.udp_multicast import UdpMulticastBus
# # The bus can be created using the can.Bus wrapper class or using UdpMulticastBus directly
# with can.Bus(channel=UdpMulticastBus.DEFAULT_GROUP_IPv4, interface='udp_multicast') as bus_1:
# # bus_2 = can.Bus(channel=UdpMulticastBus.DEFAULT_GROUP_IPv6, interface='udp_multicast')
# bus_2 = UdpMulticastBus(channel=UdpMulticastBus.DEFAULT_GROUP_IPv4)
# # register a callback on the second bus that prints messages to the standard out
# notifier = can.Notifier(bus_2, [can.Printer()])
# message = can.Message(arbitration_id=0x123, data=[1, 2, 3])
# bus_1.send(message)
# # time.sleep(2.0)
# # notifier
# # msg = bus_2.recv(timeout=1)
# # print(msg)
# # create and send a message with the first bus, which should arrive at the second one
# # give the notifier enough time to get triggered by the second bus
# time.sleep(2.0)
# # msg = bus_2.recv(timeout=2)
# # give the notifier enough time to get triggered by the second bus
# # print(msg)
# bus_2.shutdown()# close and remove vcan0
# !sshpass -v -p asdf sudo ip link delete vcan0
!gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set down vcan0!gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link delete vcan0