XCP

flashing interface for XCP
# #|export
# from candycan.data_link_socketcan import done, send_msg

get_argparser

 get_argparser ()

*Summary Get argument parser for command line arguments

Returns: argparse.ArgumentParser: description*

parser = get_argparser()
args = parser.parse_args(
    args=[
        '--protocol', 'xcp',
        '--download',
        '--a2l', repo.working_dir+'/res/VBU_AI.json',
        '--node-path', r'/PROJECT/MODULE[]',
        '--leaves', r'TQD_trqTrqSetNormal_MAP_v, VBU_L045A_CWP_05_09T_AImode_CM_single, Lookup2D_FLOAT32_IEEE, Lookup2D_X_FLOAT32_IEEE, Scalar_FLOAT32_IEEE, TQD_vVehSpd, TQD_vSgndSpd_MAP_y, TQD_pctAccPedPosFlt, TQD_pctAccPdl_MAP_x',
        '--channel', '3',
        '--download_id', '630',
        '--upload_id', '631',
        '--input', repo.working_dir+'/res/download.json',
        '--output', repo.working_dir+'/res/output.json',
    ]
)
xcp_calib_from_xcpjson = Get_XCPCalib_From_XCPJSon(args.input)

xcp_data = Generate_Init_XCPData_From_A2L(
    a2l=args.a2l, keys=args.leaves, node_path=args.node_path
)

#  address from xcp data file should align with the address from xcp calib file
test_eq(xcp_data.address, xcp_calib_from_xcpjson.data[0].address)

# validate the model
try:
    XCPData.model_validate(xcp_data)
except ValidationError as exc:
    print(exc)
# type(args.channel), type(args.download_id), args.upload_id, args.download, args.diff_flashing
xcp_data.value = xcp_calib_from_xcpjson.data[0].value
pprint(xcp_data)

xcp_calib = XCPCalib(
    config=XCPConfig(
        channel=args.channel, download=str(args.download_id), upload=str(args.upload_id)
    ),
    data=[xcp_data],
)
pprint(xcp_calib)
npa =  xcp_calib.data[0].value_array_view
npa
# buffer = [i.hex() for x in npa for i in x]
# # buffer[::-1]
# len(buffer)
# buffer
# buffer = npa.tobytes()

# pprint(buffer), len(buffer)
# xcp_calib.data[0].value, len(xcp_calib.data[0].value)
addr = bytes('7000aa2a', 'utf-8')
a = 0x7000aa2a
a
npb = npa[::-1]
# npb
buffer = [struct.pack("<f", x) for x in np.nditer(npa)]
# buffer
len(buffer)

npa_to_packed_buffer

 npa_to_packed_buffer (a:numpy.ndarray)

*convert a numpy array to a packed string buffer for flashing TODO: implementation as numpy ufunc

Args: a (np.ndarray): input numpy array for flashing

Returns: str: packed string buffer for flashing*

# buffer = [struct.pack("<f", x).hex() for x in np.nditer(npa)]
# buffer[::-1]
# len(buffer)
# buffer
# data = ''.join(buffer)
data = npa_to_packed_buffer(npa)
test_eq(data, xcp_calib.data[0].value)
# data
# npa.astype(np.float32).tobytes().hex()
buffer = npa.astype(np.float32).tobytes().hex()  ## == npa_to_packed_buffer(npa)
buffer, len(buffer)
test_eq(buffer, xcp_calib.data[0].value)

flash_xcp

 flash_xcp (xcp_calib:candycan.a2l.XCPCalib,
            data:pandas.core.frame.DataFrame, diff_flashing:bool=False,
            download:bool=True)

*Summary Flash XCP data to target

Args: xcp_calib (XCPCalib): XCP calibration as template, contains all the meta information except for data xcp_data (pd.DataFrame): input XCP data to be flashed, replace the value in xcp_calib diff_flashing (bool): Use differential flashing download (bool): Download or upload*

pkt = IP()
pkt.canvas_dump()
IP()
a = IP(dst="10.10.10.28")
a.dst
a.ttl
ls(IP)
a = IP(ttl=10)
a.src
a.dst="192.168.1.1"
a
Ether()/IP()/TCP()
raw(IP())
# IP(_)
# a = Ether()/IP(dst="www.slashdot.org")/TCP()/"GET /index.html HTTP \n\n"
a = Ether()/IP(dst="www.baidu.com")/TCP()/"GET /index.html HTTP \n\n"
hexdump(a)
b=raw(a)
b
c = Ether(b)
c
c.hide_defaults()
c
os.getcwd()
a = rdpcap('../res/pcaps/ipfix.pcap')
a
# a[0].pdfdump(layer_shift=1)
a[1].psdump("/tmp/ipfix.eps", layer_shift=1)
a=IP(dst="www.baidu.com/30")
a
[p for p in a]

Caution

set the python3 of the virtualenv with the CAP_NET_RAW capability!

sudo setcap 'CAP_NET_RAW+eip CAP_NET_ADMIN+eip' /dpt/.pyenv/versions/miniconda3-3.11-24.1.2-0/envs/can/bin/python3.11
# sniff(filter="icmp and host 10.10.10.28", count=2)
if blue_pill:  # not a virtual machine
    # install vcan interface with encrypted password to sudo 
    os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo modprobe vcan")
    # sshpass -v -p asdf sudo ip link add dev vcan0 type vcan
    os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link add dev vcan0 type vcan")
    os.system("ip link show vcan0")
    # !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set vcan0 type vcan bitrate 500000  # vcan does not support set bitrate on command line!
    # !sshpass -p asdf sudo ip link add dev vcan0 type vcan
    os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set up vcan0")
    # !sshpass -v -p asdf sudo ip link set up vcan0
else:  # in a virtual machine (Github workflow)
    os.system("sudo modprobe vcan")
    # sshpass -v -p asdf sudo ip link add dev vcan0 type vcan
    os.system("sudo ip link add dev vcan0 type vcan")
    os.system("ip link show vcan0")
    # !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set vcan0 type vcan bitrate 500000  # vcan does not support set bitrate on command line!
    # !sshpass -p asdf sudo ip link add dev vcan0 type vcan
    os.system("sudo ip link set up vcan0")
load_layer("can")
conf.contribs['CANSocket'] = {'use-python-can': False}
load_contrib("cansocket")

socket = CANSocket(channel='vcan0',
                receive_own_messages=True)
packet = CAN(identifier=0x123, data=b'12345678')
packet.show2()
socket.send(packet)
rx_packet = socket.recv()
rx_packet.show2()

CCP via Scapy

load_contrib("automotive.ccp")
pkt = CCP(identifier=0x700)/CRO(ctr=1)/CONNECT(station_address=0x02)
pkt.show2()
pkt = CCP(identifier=0x711)/CRO(ctr=2)/GET_SEED(resource=2)
pkt.show2()
pkt = CCP(identifier=0x711)/CRO(ctr=3)/UNLOCK(key=b"123456")
pkt.show2()
pkt = CCP(identifier=0x711)/CRO(ctr=1)/GET_DAQ_SIZE()
sock = CANSocket(bustype='socketcan', channel='vcan0', receive_own_messages=True)
## another socket in the same process cannot receive the packet sent by the first socket
# socket2 = CANSocket(channel='vcan0')
## same socket cannot receive the packet sent by itself
# rx_packet = socket2.recv()
# rx_packet.show2()
# socket.sr1(packet, timeout=1)
# rx_packet = socket.recv()
wrpcap("./scapypcaptest.pcap", packet)
if blue_pill:
    # close and remove vcan0
    os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set down vcan0")
    # delete vcan0
    os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link delete vcan0")    
else:
    # close and remove vcan0
    os.system("sudo ip link set down vcan0")
    # delete vcan0
    os.system("sudo ip link delete vcan0")