from IPython.core.interactiveshell import InteractiveShell

# Echo the value of every top-level expression in a cell, not only the last
# one, so the inspection cells below each show their result.
InteractiveShell.ast_node_interactivity = "all"
import time
import os
from multiprocessing import Process, Event
from multiprocessing import synchronize, Manager
from multiprocessing.managers import DictProxy
from typing import Optional
# import threading
import cantools
import can
# from can import Message
from cantools.database import Message
from can.interface import Bus # virtual interface for testing
# from can.interfaces.udp_multicast.bus import GeneralPurposeUdpMulticastBus as GPUDPMCBus
# from can.interfaces.udp_multicast import UdpMulticastBus
# Load the CAN database (DBC) that describes the frames used in this notebook.
db = cantools.database.load_file('../res/motohawk_new.dbc')
db.messages

# Look up the one message definition that is encoded and sent further down.
example_message = db.get_message_by_name('ExampleMessage')
example_message.signals
# pprint(example_message.__dict__)
example_message.frame_id
example_message.signal_groups
example_message.signal_tree
# Create and bring up the virtual CAN interface vcan0 via IPython shell escapes.
# Each privileged command decrypts ~/.sshpasswd.gpg with gpg and pipes the
# output to sshpass, which wraps the sudo call (presumably to answer sudo's
# password prompt non-interactively).
# NOTE(review): the commented-out alternatives below pass a literal password
# with `sshpass -p`; avoid re-enabling them as written.
!gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo modprobe vcan
!gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link add dev vcan0 type vcan
# sshpass -v -p asdf sudo ip link add dev vcan0 type vcan
!ip link show vcan0
# !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set vcan0 type vcan bitrate 500000 # vcan Does not SUPPORT set bitrate on command line!
# !sshpass -p asdf sudo ip link add dev vcan0 type vcan
# !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set up vcan0
!gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set vcan0 up type vcan
# !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set vcan0 up type vcan bitrate 500000 # vcan Does not SUPPORT set bitrate on command line!
# !sshpass -v -p asdf sudo ip link set up vcan0
# Open the socketcan bus on the vcan0 interface created above.
bus = Bus(bustype='socketcan', channel='vcan0', bitrate=250000)  # interface='virtual'
# bus = GPUDPMCBus(group=UdpMulticastBus.DEFAULT_GROUP_IPv6, port=43113, hop_limit=1)
# bus = UdpMulticastBus(group=UdpMulticastBus.DEFAULT_GROUP_IPv6)
# Encode the signal values into the frame payload using the DBC definition.
data = example_message.encode({'Temperature': 250.1, 'AverageRadius': 3.2, 'Enable': 1})

# Wrap the payload in a python-can frame addressed with the message's frame id.
message_to_send = can.Message(arbitration_id=example_message.frame_id, data=data, is_extended_id=False)
message_to_send

# Inspect the bus handle and the frame that is about to be sent.
bus
bus.fileno()
# bus.protocol
message_to_send.data
message_to_send.arbitration_id
# A manager-backed dict lets a receiver process hand the received frame's
# fields back to this (parent) process.
manager = Manager()
message_proxy = manager.dict()
# def receive_message(message_proxy: DictProxy, bus_channel:str='vcan0', bitrate:int=25000):
def receive_message(message_proxy: DictProxy, bus: Bus) -> None:
    """Receive one CAN frame from *bus* and publish its fields in *message_proxy*.

    The bus is supplied by the caller rather than opened here. ``bus.recv()``
    is called without a timeout. If it nevertheless yields ``None`` (no frame),
    the proxy is left untouched instead of failing on attribute access.

    Args:
        message_proxy: Mapping that receives the keys ``'timestamp'``,
            ``'arbitration_id'`` and ``'data'`` of the received frame.
        bus: Bus object providing ``fileno()`` and ``recv()``.
    """
    print('entering receive subprocess')
    print(f'bus: {bus.fileno()} {bus}')

    # The frame comes from python-can's recv(), so it is a can.Message
    # (not the cantools database Message describing the frame layout).
    msg: Optional[can.Message] = bus.recv()
    if msg is None:
        print('no message received')
        return

    print(f'message received: {msg}')
    message_proxy['timestamp'] = msg.timestamp
    message_proxy['arbitration_id'] = msg.arbitration_id
    message_proxy['data'] = msg.data
# def send_message(message: can.Message, done: synchronize.Event, bus_channel:str='vcan0', bitrate:int=25000, timeout: Optional[float]=None)->None:
def send_message(message: can.Message, done: synchronize.Event, bus: Bus, timeout: Optional[float] = None) -> None:
    """Send *message* on *bus*, then wait for *done* and report the waiting time.

    Args:
        message: Frame to transmit.
        done: Event the caller sets to release this function.
        bus: Bus object providing ``fileno()`` and ``send()``; supplied by the
            caller rather than opened here.
        timeout: Maximum number of seconds to wait for *done*. ``None`` (the
            default) waits until the event is set.
    """
    print("entering send subprocess")

    # Move into a process group of our own; os.setsid() (new session) was the
    # alternative tried before.
    os.setpgrp()
    print((os.getpid(), os.getsid(os.getpid())))

    print(f'bus: {bus.fileno()}, {bus}')
    bus.send(message)
    print('message sent and waiting')

    # A monotonic clock keeps the interval immune to wall-clock adjustments.
    start = time.perf_counter_ns()
    done.wait(timeout)  # honour the caller's timeout instead of ignoring it
    # True division keeps the sub-millisecond digits that the format prints.
    elapsed = (time.perf_counter_ns() - start) / 1e6
    print(f'elapsed time: {elapsed:.3f}ms')
# Inspect the working directory, the bus handle, and this process's pid/session id.
os.getcwd()
bus.fileno()
bus
os.getpid(), os.getsid(os.getpid())
# shm = shared_memory.SharedMemory(create=True, size=1024)
# def send_message_wrapper(message: Message, bus: Bus)->None:
timeout = None
done = Event()
# message_to_send
print("before")

# Prepare (but do not start here) the sender process; start() is left
# commented out below.
process_send = Process(target=send_message, name="Sending CAN messasge", args=(message_to_send, done, bus))
# process_send.start()
# process_send.pid, process_send
# message_to_send
# receive_message(message_proxy, bus)
# process_receive = Process(target=receive_message, name="Receiving CAN message", args=(message_proxy,bus))
# process_receive.start()
# process_receive

# Release anything waiting on the event.
done.set()
# process_send
# datetime.fromtimestamp(message_proxy['timestamp']),db.decode_message(message_proxy['arbitration_id'],message_proxy['data'])
# Process(target=receive_message, args=(message_proxy, bus, done, timeout)).start()
# try:
# recv_message = Message(arbitration_id=recv_message_proxy['arbitration_id'], data=recv_message_proxy['data'], is_extended_id=False)
# except KeyError:
# print('KeyError')
# message = bus.recv(timeout=2)
# if message is not None:
# message = can.Message(arbitration_id=example_message.frame_id, data=data, is_extended_id=False)
# db.decode_message(message.arbitration_id, message.data)
# else:
# print('no message received')
# done.set()
# import time
# import can
# from can.interfaces.udp_multicast import UdpMulticastBus
# # The bus can be created using the can.Bus wrapper class or using UdpMulticastBus directly
# with can.Bus(channel=UdpMulticastBus.DEFAULT_GROUP_IPv4, interface='udp_multicast') as bus_1:
# # bus_2 = can.Bus(channel=UdpMulticastBus.DEFAULT_GROUP_IPv6, interface='udp_multicast')
# bus_2 = UdpMulticastBus(channel=UdpMulticastBus.DEFAULT_GROUP_IPv4)
# # register a callback on the second bus that prints messages to the standard out
# notifier = can.Notifier(bus_2, [can.Printer()])
# message = can.Message(arbitration_id=0x123, data=[1, 2, 3])
# bus_1.send(message)
# # time.sleep(2.0)
# # notifier
# # msg = bus_2.recv(timeout=1)
# # print(msg)
# # create and send a message with the first bus, which should arrive at the second one
# # give the notifier enough time to get triggered by the second bus
# time.sleep(2.0)
# # msg = bus_2.recv(timeout=2)
# # give the notifier enough time to get triggered by the second bus
# # print(msg)
# bus_2.shutdown()
# Tear down: set vcan0 down, then delete the interface. As during setup, each
# sudo call is wrapped by sshpass, fed from the gpg-decrypted ~/.sshpasswd.gpg.
# !sshpass -v -p asdf sudo ip link delete vcan0
!gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set down vcan0
!gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link delete vcan0