data link layer with scapy wrapper

send or receive a CAN message to a bus, ISO11898-1

get_argparser

 get_argparser ()

*summary get CAN bus, dbc config and the message to send

Returns: argparse.ArgumentParser: description*


signal_usr1

 signal_usr1 (signum, frame)

Handle USR1 signal as an event to set the received flag.


send_msg

 send_msg (db:cantools.database.can.database.Database, message:str,
           payload:bytes, channel:str, bitrate:int, bus_type:str,
           can_filters:list[dict], is_extended:bool)

*send a message to the CAN bus

return the bytes sent to the bus (default 16 bytes for a struct of CAN frame, payload 8 bytes)*

db_can = cantools.database.load_file(repo.working_dir+'/res/motohawk_new.dbc')
db_can.messages
[message('ExampleMessage', 0x1f0, False, 8, {None: 'Example message used as template in MotoHawk models.'}),
 message('NewMessage', 0x254, False, 8, {None: 'self made message'})]
example_message: MessageTpl = db_can.get_message_by_name('ExampleMessage')
pprint(example_message.signals)
pprint(example_message.__dict__)
example_message.frame_id
[signal('Enable', 7, 1, 'big_endian', False, None, 1, 0, None, None, '-', False, None, {0: 'Disabled', 1: 'Enabled'}, None, None),
 signal('AverageRadius', 6, 6, 'big_endian', False, None, 0.1, 0, 0, 5, 'm', False, None, None, None, None),
 signal('Temperature', 0, 12, 'big_endian', True, None, 0.01, 250, 229.52, 270.47, 'degK', False, None, None, None, None)]
{'_autosar': None,
 '_bus_name': None,
 '_codecs': {'formats': Formats(big_endian=<bitstruct.c.CompiledFormatDict object>, little_endian=<bitstruct.c.CompiledFormatDict object>, padding_mask=35184372088831),
             'multiplexers': {},
             'signals': [signal('Enable', 7, 1, 'big_endian', False, None, 1, 0, None, None, '-', False, None, {0: 'Disabled', 1: 'Enabled'}, None, None),
                         signal('AverageRadius', 6, 6, 'big_endian', False, None, 0.1, 0, 0, 5, 'm', False, None, None, None, None),
                         signal('Temperature', 0, 12, 'big_endian', True, None, 0.01, 250, 229.52, 270.47, 'degK', False, None, None, None, None)]},
 '_comments': {None: 'Example message used as template in MotoHawk models.'},
 '_contained_messages': None,
 '_cycle_time': None,
 '_dbc': <cantools.database.can.formats.dbc_specifics.DbcSpecifics object>,
 '_frame_id': 496,
 '_header_byte_order': 'big_endian',
 '_header_id': None,
 '_is_extended_frame': False,
 '_is_fd': False,
 '_length': 8,
 '_name': 'ExampleMessage',
 '_protocol': None,
 '_send_type': None,
 '_senders': ['PCM1'],
 '_signal_dict': {'AverageRadius': signal('AverageRadius', 6, 6, 'big_endian', False, None, 0.1, 0, 0, 5, 'm', False, None, None, None, None),
                  'Enable': signal('Enable', 7, 1, 'big_endian', False, None, 1, 0, None, None, '-', False, None, {0: 'Disabled', 1: 'Enabled'}, None, None),
                  'Temperature': signal('Temperature', 0, 12, 'big_endian', True, None, 0.01, 250, 229.52, 270.47, 'degK', False, None, None, None, None)},
 '_signal_groups': [],
 '_signal_tree': ['Enable', 'AverageRadius', 'Temperature'],
 '_signals': [signal('Enable', 7, 1, 'big_endian', False, None, 1, 0, None, None, '-', False, None, {0: 'Disabled', 1: 'Enabled'}, None, None),
              signal('AverageRadius', 6, 6, 'big_endian', False, None, 0.1, 0, 0, 5, 'm', False, None, None, None, None),
              signal('Temperature', 0, 12, 'big_endian', True, None, 0.01, 250, 229.52, 270.47, 'degK', False, None, None, None, None)],
 '_strict': True,
 '_unused_bit_pattern': 255}
496
if blue_pill:
    # install vcan interface with encrypted password to sudo 
    os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo modprobe vcan")
    # sshpass -v -p asdf sudo ip link add dev vcan0 type vcan
    os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link add dev vcan0 type vcan")
    # !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set vcan0 type vcan bitrate 500000  # vcan does not support set bitrate on command line!
    # !sshpass -p asdf sudo ip link add dev vcan0 type vcan
    os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set up vcan0")
    # !sshpass -v -p asdf sudo ip link set up vcan0
else:
    os.system("sudo modprobe vcan")
    os.system("sudo ip link add dev vcan0 type vcan")
    os.system("sudo ip link set up vcan0")

os.system("ip link show vcan0")
SSHPASS: searching for password prompt using match "assword"
SSHPASS: read: [sudo] password for n: 
SSHPASS: detected prompt. Sending password.
SSHPASS: read: 

SSHPASS: searching for password prompt using match "assword"
SSHPASS: read: [sudo] password for n: 
SSHPASS: detected prompt. Sending password.
SSHPASS: read: 

SSHPASS: searching for password prompt using match "assword"
SSHPASS: read: [sudo] password for n: 
SSHPASS: detected prompt. Sending password.
SSHPASS: read: 
0
0
0
72: vcan0: <NOARP,UP,LOWER_UP> mtu 72 qdisc noqueue state UNKNOWN mode DEFAULT group default qlen 1000
    link/can 
0
# # install vcan interface with encrypted password to sudo 
# !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo modprobe vcan
# # sshpass -v -p asdf sudo ip link add dev vcan0 type vcan
# !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link add dev vcan0 type vcan
# !ip link show vcan0
# # !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set vcan0 type vcan bitrate 500000  # vcan does not support set bitrate on command line!
# # !sshpass -p asdf sudo ip link add dev vcan0 type vcan
# !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set up vcan0
# # !sshpass -v -p asdf sudo ip link set up vcan0
data_dict ={'Temperature': 250.1, 'AverageRadius': 3.2, 'Enable': 1}
data_json_bytes = json.dumps(data_dict).encode('utf-8')
data_json_bytes
json.loads(data_json_bytes.decode())

can_data = example_message.encode({'Temperature': 250.1, 'AverageRadius': 3.2, 'Enable': 1})
example_message.decode(can_data)
can_data
b'{"Temperature": 250.1, "AverageRadius": 3.2, "Enable": 1}'
{'Temperature': 250.1, 'AverageRadius': 3.2, 'Enable': 1}
{'Enable': 'Enabled', 'AverageRadius': 3.2, 'Temperature': 250.1}
b'\xc0\x01@\x00\x00\x00\x00\x00'
#create can frame
packet = CAN(identifier=example_message.frame_id, data=can_data)
packet.show2()
###[ CAN ]### 
  flags     = 
  identifier= 0x1f0
  length    = 8
  reserved  = 0
  data      = '\\xc0\x01@\x00\x00\x00\x00\x00'
# create socket
socket = CANSocket(bustype='socketcan', 
                channel='vcan0', 
                can_filters=None, 
                bitrate=25000, 
                is_extended=False)
socket
# bus = can.interface.Bus(bustype='socketcan', channel='vcan0', bitrate=250000)
# message_to_send = can.Message(arbitration_id=example_message.frame_id, data=can_data, is_extended_id=False)   
# # can_bus.send(message)
<<NativeCANSocket: read/write packets at a given CAN interface using PF_CAN sockets>>
proc = subprocess.Popen(
    ['python', 
        repo.working_dir+'/candycan/data_link/scapycan.py', 
        '-t', 'socketcan', 
        '-c' , 'vcan0', 
        '-b', '25000',
        '-d', repo.working_dir+'/res/motohawk_new.dbc',
        '-m', 'ExampleMessage',
        ], 
    stdout=subprocess.PIPE, 
    stderr=subprocess.PIPE, 
    stdin=subprocess.PIPE
    )
print(f'PARENT: {proc.pid} before signaling child')
PARENT: 1085978 before signaling child
data_json_bytes
data_json_bytes.decode()
json.loads(data_json_bytes.decode())
b'{"Temperature": 250.1, "AverageRadius": 3.2, "Enable": 1}'
'{"Temperature": 250.1, "AverageRadius": 3.2, "Enable": 1}'
{'Temperature': 250.1, 'AverageRadius': 3.2, 'Enable': 1}
try:
    outs, errs = proc.communicate(data_json_bytes, timeout=1)
except subprocess.TimeoutExpired:
    print(f'PARENT: {proc.pid}; TimeoutExpired')
    # outs, errs = proc.communicate()
    # print(f'PARENT: {proc.pid}; outs: {outs}; errs: {errs} TimeoutExpired')
# sys.stdout.flush()
# time.sleep(1)
PARENT: 1085978; TimeoutExpired
rx_packet = socket.recv()
rx_packet.show2()
# receive_message(message_proxy, bus)
# proc_receive = subprocess.Popen(
#   ['python', 
#         '../candycan/receive_message.py', 
#       '-t', 'socketcan', 
#       '-c' , 'vcan0', 
#       '-b', '25000',
#       '-d', '../res/motohawk_new.dbc',
#       '-m', 'ExampleMessage',
#       ], 
#   # stdout=subprocess.PIPE, 
#   # stderr=subprocess.PIPE, 
#   # stdin=subprocess.PIPE
#   )
# print(f'PARENT: {proc.pid} before signaling child')

# stdout_raw, stderr_raw = proc_receive.communicate()
# stdout_value = stdout_raw.decode('utf-8')
# stderr_value = stderr_raw.decode('utf-8')
# # [0].decode('utf-8')
# stdout_value
# stderr_value
###[ CAN ]### 
  flags     = 
  identifier= 0x1f0
  length    = 8
  reserved  = 0
  data      = '\\xc0\x01@\x00\x00\x00\x00\x00'
print(f'PARENT: {proc.pid} signaling child')
# sys.stdout.flush()
os.kill(proc.pid, signal.SIGUSR1)
PARENT: 1085978 signaling child
stdout_raw, stderr_raw = proc.communicate()
stdout_value = stdout_raw.decode('utf-8')
stderr_value = stderr_raw.decode('utf-8')

print(f'stdout: {repr(stdout_value)}; stderr: {repr(stderr_value)}')
# [0].decode('utf-8')
stdout: "'/home/n/devel/candycan'\nsent bytes: 16\nSignal received after 0.568 seconds\n"; stderr: ''
hexdump(rx_packet)
hexdump(packet)
0000  00 00 01 F0 08 00 00 00 C0 01 40 00 00 00 00 00  ..........@.....
0000  00 00 01 F0 08 00 00 00 C0 01 40 00 00 00 00 00  ..........@.....
# datetime.fromtimestamp(message_proxy['timestamp']),db_can.decode_message(message_proxy['arbitration_id'],message_proxy['data'])
datetime.fromtimestamp(rx_packet.time), db_can.decode_message(rx_packet.identifier, rx_packet.data)
(datetime.datetime(2024, 4, 11, 15, 58, 21, 374909),
 {'Enable': 'Enabled', 'AverageRadius': 3.2, 'Temperature': 250.1})

create native CANSocket with loopback

if blue_pill:
    # install vcan1 interface with encrypted password to sudo 
    # sshpass -v -p asdf sudo ip link add dev vcan0 type vcan
    os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link add dev vcan1 type vcan")
    os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set up vcan1")
else:
    os.system("sudo ip link add dev vcan1 type vcan")
    os.system("sudo ip link set up vcan1")

os.system("ip link show vcan1")
SSHPASS: searching for password prompt using match "assword"
SSHPASS: read: [sudo] password for n: 
SSHPASS: detected prompt. Sending password.
SSHPASS: read: 

RTNETLINK answers: File exists
SSHPASS: searching for password prompt using match "assword"
SSHPASS: read: [sudo] password for n: 
SSHPASS: detected prompt. Sending password.
SSHPASS: read: 
512
0
10: vcan1: <NOARP,UP,LOWER_UP> mtu 72 qdisc noqueue state UNKNOWN mode DEFAULT group default qlen 1000
    link/can 
0
# create socket
socket1 = CANSocket(bustype='socketcan', 
                channel='vcan1', 
                receive_own_messages=True,
                can_filters=None, 
                bitrate=25000, 
                is_extended=False)
socket1
<<NativeCANSocket: read/write packets at a given CAN interface using PF_CAN sockets>>
packet.show2()
###[ CAN ]### 
  flags     = 
  identifier= 0x1f0
  length    = 8
  reserved  = 0
  data      = '\\xc0\x01@\x00\x00\x00\x00\x00'
sent_bytes = socket1.send(packet)
sent_bytes
16
rx_packet1 = socket1.recv()
rx_packet1.show2()
###[ CAN ]### 
  flags     = 
  identifier= 0x1f0
  length    = 8
  reserved  = 0
  data      = '\\xc0\x01@\x00\x00\x00\x00\x00'
if blue_pill:
    # close and remove vcan0
    # !sshpass -v -p  asdf sudo ip link delete vcan0 
    os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set down vcan0")
    # delete vcan0
    os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link delete vcan0")
else:
    os.system("sudo ip link set down vcan0")
    os.system("sudo ip link delete vcan0")
SSHPASS: searching for password prompt using match "assword"
SSHPASS: read: [sudo] password for n: 
SSHPASS: detected prompt. Sending password.
SSHPASS: read: 

SSHPASS: searching for password prompt using match "assword"
SSHPASS: read: [sudo] password for n: 
SSHPASS: detected prompt. Sending password.
SSHPASS: read: 
0
0