# #|export
# from candycan.data_link_socketcan import done, send_msgXCP
flashing interface for XCP
get_argparser
get_argparser ()
*Get the argument parser for the command-line arguments.
Returns: argparse.ArgumentParser: the parser for the command-line arguments*
# Parse an explicit argument list (rather than the process command line) so
# the example always runs with the same inputs.
parser = get_argparser()
args = parser.parse_args(
    args=[
        '--protocol', 'xcp',
        '--download',
        '--a2l', repo.working_dir+'/res/VBU_AI.json',
        '--node-path', r'/PROJECT/MODULE[]',
        '--leaves', r'TQD_trqTrqSetNormal_MAP_v, VBU_L045A_CWP_05_09T_AImode_CM_single, Lookup2D_FLOAT32_IEEE, Lookup2D_X_FLOAT32_IEEE, Scalar_FLOAT32_IEEE, TQD_vVehSpd, TQD_vSgndSpd_MAP_y, TQD_pctAccPedPosFlt, TQD_pctAccPdl_MAP_x',
        '--channel', '3',
        '--download_id', '630',
        '--upload_id', '631',
        '--input', repo.working_dir+'/res/download.json',
        '--output', repo.working_dir+'/res/output.json',
    ]
)
# Reference calibration built from the file passed as --input; the cells
# below compare against its first data record.
xcp_calib_from_xcpjson = Get_XCPCalib_From_XCPJSon(args.input)
# Build the initial XCP data record from the A2L description selected by the
# parsed command-line arguments.
xcp_data = Generate_Init_XCPData_From_A2L(
    a2l=args.a2l, keys=args.leaves, node_path=args.node_path
)
# address from xcp data file should align with the address from xcp calib file
test_eq(xcp_data.address, xcp_calib_from_xcpjson.data[0].address)
# validate the model
try:
    XCPData.model_validate(xcp_data)
except ValidationError as exc:
    # Validation problems are reported, not raised, so the walkthrough continues.
    print(exc)
# type(args.channel), type(args.download_id), args.upload_id, args.download, args.diff_flashing
# Take the value from the reference calibration record; this statement must be
# on its own line, otherwise it is swallowed by the comment above.
xcp_data.value = xcp_calib_from_xcpjson.data[0].value
pprint(xcp_data)
# Assemble the calibration object: bus configuration from the command-line
# arguments plus the single data record prepared above.
xcp_calib = XCPCalib(
    config=XCPConfig(
        channel=args.channel, download=str(args.download_id), upload=str(args.upload_id)
    ),
    data=[xcp_data],
)
pprint(xcp_calib)
# Array view of the first record's value, used by the packing examples below.
npa = xcp_calib.data[0].value_array_view
npa
# buffer = [i.hex() for x in npa for i in x]
# # buffer[::-1]
# len(buffer)
# buffer
# buffer = npa.tobytes()
# pprint(buffer), len(buffer)
# xcp_calib.data[0].value, len(xcp_calib.data[0].value)
addr = bytes('7000aa2a', 'utf-8')
a = 0x7000aa2a
a
npb = npa[::-1]
# npb
# Pack every element of the array as a little-endian 32-bit float ("<f").
buffer = [struct.pack("<f", x) for x in np.nditer(npa)]
# buffer
len(buffer)
npa_to_packed_buffer
npa_to_packed_buffer (a:numpy.ndarray)
*Convert a numpy array to a packed string buffer for flashing. TODO: implement as a numpy ufunc.
Args: a (np.ndarray): input numpy array for flashing
Returns: str: packed string buffer for flashing*
# buffer = [struct.pack("<f", x).hex() for x in np.nditer(npa)]
# buffer[::-1]
# len(buffer)
# buffer
# data = ''.join(buffer)
# The packed buffer must reproduce the value stored in the calibration record.
data = npa_to_packed_buffer(npa)
test_eq(data, xcp_calib.data[0].value)
# data
# npa.astype(np.float32).tobytes().hex()
# Same check using numpy directly: float32 bytes rendered as a hex string.
buffer = npa.astype(np.float32).tobytes().hex() ## == npa_to_packed_buffer(npa)
buffer, len(buffer)
test_eq(buffer, xcp_calib.data[0].value)
flash_xcp
flash_xcp (xcp_calib:candycan.a2l.XCPCalib, data:pandas.core.frame.DataFrame, diff_flashing:bool=False, download:bool=True)
*Flash XCP data to the target.
Args: xcp_calib (XCPCalib): XCP calibration used as a template; contains all the meta information except for the data. data (pd.DataFrame): input XCP data to be flashed; replaces the value in xcp_calib. diff_flashing (bool): use differential flashing. download (bool): download or upload.*
# Scapy basics: build packets, inspect fields, serialise and re-parse them.
pkt = IP()
pkt.canvas_dump()
IP()
a = IP(dst="10.10.10.28")
a.dst
a.ttl
ls(IP)
a = IP(ttl=10)
a.src
a.dst="192.168.1.1"
a
Ether()/IP()/TCP()
raw(IP())
# IP(_)
# a = Ether()/IP(dst="www.slashdot.org")/TCP()/"GET /index.html HTTP \n\n"
a = Ether()/IP(dst="www.baidu.com")/TCP()/"GET /index.html HTTP \n\n"
hexdump(a)
# Round trip: raw bytes of the stacked packet, parsed back as an Ethernet frame.
b=raw(a)
b
c = Ether(b)
c
c.hide_defaults()
c
os.getcwd()
# The capture path is relative, hence the working-directory check above.
a = rdpcap('../res/pcaps/ipfix.pcap')
a
# a[0].pdfdump(layer_shift=1)
a[1].psdump("/tmp/ipfix.eps", layer_shift=1)
a=IP(dst="www.baidu.com/30")
a
[p for p in a]

# Caution
# set the python3 of the virtualenv with the CAP_NET_RAW capability!
# sudo setcap 'CAP_NET_RAW+eip CAP_NET_ADMIN+eip' /dpt/.pyenv/versions/miniconda3-3.11-24.1.2-0/envs/can/bin/python3.11

# sniff(filter="icmp and host 10.10.10.28", count=2)

# Bring up the virtual CAN interface vcan0.
# blue_pill: not a virtual machine -> sudo receives its password from the
# gpg-encrypted file through sshpass; otherwise (virtual machine / Github
# workflow) sudo is invoked directly. The shell commands are otherwise the same.
sudo_cmd = "gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo" if blue_pill else "sudo"
os.system(f"{sudo_cmd} modprobe vcan")
os.system(f"{sudo_cmd} ip link add dev vcan0 type vcan")
os.system("ip link show vcan0")
# vcan does not support set bitrate on command line!
os.system(f"{sudo_cmd} ip link set up vcan0")
load_layer("can")
# Configure the CANSocket contrib not to use python-can before loading it.
conf.contribs['CANSocket'] = {'use-python-can': False}
load_contrib("cansocket")
socket = CANSocket(channel='vcan0',
                   receive_own_messages=True)
packet = CAN(identifier=0x123, data=b'12345678')
packet.show2()
# Send one frame on vcan0 and read one frame back from the same socket.
socket.send(packet)
rx_packet = socket.recv()
rx_packet.show2()

# CCP via Scapy
load_contrib("automotive.ccp")
pkt = CCP(identifier=0x700)/CRO(ctr=1)/CONNECT(station_address=0x02)
pkt.show2()
pkt = CCP(identifier=0x711)/CRO(ctr=2)/GET_SEED(resource=2)
pkt.show2()
pkt = CCP(identifier=0x711)/CRO(ctr=3)/UNLOCK(key=b"123456")
pkt.show2()
pkt = CCP(identifier=0x711)/CRO(ctr=1)/GET_DAQ_SIZE()
sock = CANSocket(bustype='socketcan', channel='vcan0', receive_own_messages=True)
## another socket in the same process cannot receive the packet sent by the first socket
# socket2 = CANSocket(channel='vcan0')
## same socket cannot receive the packet sent by itself
# rx_packet = socket2.recv()
# rx_packet.show2()
# socket.sr1(packet, timeout=1)
# rx_packet = socket.recv()
# Save the CAN test packet to a pcap file.
wrpcap("./scapypcaptest.pcap", packet)

# close and remove vcan0
# blue_pill selects the sudo invocation: password piped from the gpg-encrypted
# file through sshpass, or plain sudo otherwise.
sudo_cmd = "gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo" if blue_pill else "sudo"
os.system(f"{sudo_cmd} ip link set down vcan0")
# delete vcan0
os.system(f"{sudo_cmd} ip link delete vcan0")