# #|export
# from candycan.data_link_socketcan import done, send_msg
XCP
flashing interface for XCP
get_argparser
get_argparser ()
*Get the argument parser for command-line arguments.
Returns: argparse.ArgumentParser: the argument parser*
parser = get_argparser()
args = parser.parse_args(
    args=[
        '--protocol', 'xcp',
        '--download',
        '--a2l', repo.working_dir+'/res/VBU_AI.json',
        '--node-path', r'/PROJECT/MODULE[]',
        '--leaves', r'TQD_trqTrqSetNormal_MAP_v, VBU_L045A_CWP_05_09T_AImode_CM_single, Lookup2D_FLOAT32_IEEE, Lookup2D_X_FLOAT32_IEEE, Scalar_FLOAT32_IEEE, TQD_vVehSpd, TQD_vSgndSpd_MAP_y, TQD_pctAccPedPosFlt, TQD_pctAccPdl_MAP_x',
        '--channel', '3',
        '--download_id', '630',
        '--upload_id', '631',
        '--input', repo.working_dir+'/res/download.json',
        '--output', repo.working_dir+'/res/output.json',
    ]
)
xcp_calib_from_xcpjson = Get_XCPCalib_From_XCPJSon(args.input)
xcp_data = Generate_Init_XCPData_From_A2L(
    a2l=args.a2l, keys=args.leaves, node_path=args.node_path
)
# address from xcp data file should align with the address from xcp calib file
test_eq(xcp_data.address, xcp_calib_from_xcpjson.data[0].address)
# validate the model
try:
    XCPData.model_validate(xcp_data)
except ValidationError as exc:
    print(exc)
# type(args.channel), type(args.download_id), args.upload_id, args.download, args.diff_flashing
xcp_data.value = xcp_calib_from_xcpjson.data[0].value
pprint(xcp_data)
xcp_calib = XCPCalib(
    config=XCPConfig(
        channel=args.channel, download=str(args.download_id), upload=str(args.upload_id)
    ),
    data=[xcp_data],
)
pprint(xcp_calib)
npa = xcp_calib.data[0].value_array_view
npa
# buffer = [i.hex() for x in npa for i in x]
# # buffer[::-1]
# len(buffer)
# buffer
# buffer = npa.tobytes()
# pprint(buffer), len(buffer)
# xcp_calib.data[0].value, len(xcp_calib.data[0].value)
addr = bytes('7000aa2a', 'utf-8')
a = 0x7000aa2a
a
npb = npa[::-1]
# npb
buffer = [struct.pack("<f", x) for x in np.nditer(npa)]
# buffer
len(buffer)
npa_to_packed_buffer
npa_to_packed_buffer (a:numpy.ndarray)
*Convert a numpy array to a packed string buffer for flashing. TODO: implement as a numpy ufunc.
Args: a (np.ndarray): input numpy array for flashing
Returns: str: packed string buffer for flashing*
# buffer = [struct.pack("<f", x).hex() for x in np.nditer(npa)]
# buffer[::-1]
# len(buffer)
# buffer
# data = ''.join(buffer)
data = npa_to_packed_buffer(npa)
test_eq(data, xcp_calib.data[0].value)
# data
# npa.astype(np.float32).tobytes().hex()
buffer = npa.astype(np.float32).tobytes().hex() ## == npa_to_packed_buffer(npa)
buffer, len(buffer)
test_eq(buffer, xcp_calib.data[0].value)
flash_xcp
flash_xcp (xcp_calib:candycan.a2l.XCPCalib, data:pandas.core.frame.DataFrame, diff_flashing:bool=False, download:bool=True)
*Flash XCP data to the target.
Args: xcp_calib (XCPCalib): XCP calibration used as a template; contains all the meta information except for the data. data (pd.DataFrame): input XCP data to be flashed; replaces the value in xcp_calib. diff_flashing (bool): use differential flashing. download (bool): download or upload.*
pkt = IP()
pkt.canvas_dump()
IP()
a = IP(dst="10.10.10.28")
a.dst
a.ttl
ls(IP)
= IP(ttl=10)
a
a.src="192.168.1.1"
a.dst
a/IP()/TCP()
Ether()
raw(IP())# IP(_)
# a = Ether()/IP(dst="www.slashdot.org")/TCP()/"GET /index.html HTTP \n\n"
a = Ether()/IP(dst="www.baidu.com")/TCP()/"GET /index.html HTTP \n\n"
hexdump(a)
b = raw(a)
b
c = Ether(b)
c
c.hide_defaults()
c
os.getcwd()
a = rdpcap('../res/pcaps/ipfix.pcap')
a
# a[0].pdfdump(layer_shift=1)
a[1].psdump("/tmp/ipfix.eps", layer_shift=1)
a = IP(dst="www.baidu.com/30")
a
[p for p in a]
Caution
Grant the virtualenv's python3 binary the CAP_NET_RAW and CAP_NET_ADMIN capabilities:
sudo setcap 'CAP_NET_RAW+eip CAP_NET_ADMIN+eip' /dpt/.pyenv/versions/miniconda3-3.11-24.1.2-0/envs/can/bin/python3.11
# sniff(filter="icmp and host 10.10.10.28", count=2)
if blue_pill:  # not a virtual machine
    # install vcan interface with encrypted password to sudo
    os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo modprobe vcan")
    # sshpass -v -p asdf sudo ip link add dev vcan0 type vcan
    os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link add dev vcan0 type vcan")
    os.system("ip link show vcan0")
    # !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set vcan0 type vcan bitrate 500000 # vcan does not support set bitrate on command line!
    # !sshpass -p asdf sudo ip link add dev vcan0 type vcan
    os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set up vcan0")
    # !sshpass -v -p asdf sudo ip link set up vcan0
else:  # in a virtual machine (Github workflow)
    os.system("sudo modprobe vcan")
    # sshpass -v -p asdf sudo ip link add dev vcan0 type vcan
    os.system("sudo ip link add dev vcan0 type vcan")
    os.system("ip link show vcan0")
    # !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set vcan0 type vcan bitrate 500000 # vcan does not support set bitrate on command line!
    # !sshpass -p asdf sudo ip link add dev vcan0 type vcan
    os.system("sudo ip link set up vcan0")
load_layer("can")
conf.contribs['CANSocket'] = {'use-python-can': False}
load_contrib("cansocket")
socket = CANSocket(channel='vcan0',
                   receive_own_messages=True)
packet = CAN(identifier=0x123, data=b'12345678')
packet.show2()
socket.send(packet)
rx_packet = socket.recv()
rx_packet.show2()
CCP via Scapy
load_contrib("automotive.ccp")
pkt = CCP(identifier=0x700)/CRO(ctr=1)/CONNECT(station_address=0x02)
pkt.show2()
pkt = CCP(identifier=0x711)/CRO(ctr=2)/GET_SEED(resource=2)
pkt.show2()
pkt = CCP(identifier=0x711)/CRO(ctr=3)/UNLOCK(key=b"123456")
pkt.show2()
pkt = CCP(identifier=0x711)/CRO(ctr=1)/GET_DAQ_SIZE()
sock = CANSocket(bustype='socketcan', channel='vcan0', receive_own_messages=True)
## another socket in the same process cannot receive the packet sent by the first socket
# socket2 = CANSocket(channel='vcan0')
## same socket cannot receive the packet sent by itself
# rx_packet = socket2.recv()
# rx_packet.show2()
# socket.sr1(packet, timeout=1)
# rx_packet = socket.recv()
wrpcap("./scapypcaptest.pcap", packet)
if blue_pill:
    # close and remove vcan0
    os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set down vcan0")
    # delete vcan0
    os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link delete vcan0")
else:
    # close and remove vcan0
    os.system("sudo ip link set down vcan0")
    # delete vcan0
    os.system("sudo ip link delete vcan0")