CCP over Scapy

flashing interface for CCP over scapy
# #|export
# from candycan.data_link_socketcan import done, send_msg

get_argparser

 get_argparser ()

*Summary Get argument parser for command line arguments

Returns: argparse.ArgumentParser: description*

# Build the command line parser and feed it a fixed argument list so the
# notebook can run without a real command line.
parser = get_argparser()
args = parser.parse_args(
    args=[
        '--protocol', 'CCP',
        '--can_type', 'NATIVE',  # alternative: 'PYTHON'
        '--bus_type', 'SOCKET',  # alternative: 'KVASER'
        '--download',
        '--diff_mode',
        '--diff_threshold', '0.001',
        '--bit_rate', '500_000',
        '--time_out', '1.0',
        # argv elements must be strings (as they would be on a real command
        # line); the parser's own `type=` converter turns this into a number.
        '--station_address', '0',
        '--download_can_id', '630',
        '--upload_can_id', '631',
        '--a2l', repo.working_dir+'/res/VBU_AI.json',
        '--node-path', r'/PROJECT/MODULE[]',
        '--leaves', r'TQD_trqTrqSetNormal_MAP_v, VBU_L045A_CWP_05_09T_AImode_CM_single, Lookup2D_FLOAT32_IEEE, Lookup2D_X_FLOAT32_IEEE, Scalar_FLOAT32_IEEE, TQD_vVehSpd, TQD_vSgndSpd_MAP_y, TQD_pctAccPedPosFlt, TQD_pctAccPdl_MAP_x',
        '--channel_serial_number', '3',
        '--input', repo.working_dir+'/res/download.json',
        '--output', repo.working_dir+'/res/output.json',
    ]
)
# Inspect the parsed values (bare expressions are notebook displays).
args.download_can_id, args.channel_serial_number, args.upload_can_id
args.diff_threshold, hex(args.station_address), args.time_out
args.__dict__

Types definition


check_can_type

 check_can_type (c:str)

*Summary Check if the CAN type is valid

Args: c (str): CAN type to be checked

Returns: str: CAN type if valid

Raises: ValueError: if CAN type is invalid*

# # CanType
# native_can_type = CanType('NATIVE')
# native_can_type.lower()
# isinstance(native_can_type, CanType)
# isinstance('NATIVE', CanType)

check_bus_type

 check_bus_type (b:str)

*Summary Check if the CAN bus type is valid

Args: b (str): Python CAN bus type to be checked

Returns: str: Python CAN bus type if valid

Raises: ValueError: if CAN bus type is invalid*


CANFilter

 CANFilter (can_id:typing.Annotated[int,Gt(gt=0)]=630,
            can_mask:typing.Annotated[int,Gt(gt=0)]=2047)

*Summary CAN filter for Python CAN bus

Attributes: can_id (int): CAN message ID can_mask (int): CAN message mask*


ScapyCANSpecs

 ScapyCANSpecs (can_type:typing.Annotated[str,AfterValidator(func=<functio
                ncheck_can_typeat0x7f8f4c8b22a0>)]='NATIVE', bus_type:typi
                ng.Annotated[str,AfterValidator(func=<functioncheck_bus_ty
                peat0x7f8f4c8b2520>)]='VIRTUAL', channel_serial_number:typ
                ing.Annotated[int,Ge(ge=0),Lt(lt=500)]=3,
                download_can_id:typing.Annotated[int,Gt(gt=0)]=630,
                upload_can_id:typing.Annotated[int,Gt(gt=0)]=630,
                can_filters:Optional[list[__main__.CANFilter]]=None, bit_r
                ate:typing.Annotated[int,Gt(gt=0),Lt(lt=1000000)]=500000, 
                time_out:typing.Annotated[float,Gt(gt=0.0),Lt(lt=10.0)]=1.
                0, station_address:typing.Annotated[int,Ge(ge=0),Lt(lt=255
                )]=0,
                cntr:typing.Annotated[int,Ge(ge=0),Lt(lt=1000000)]=0,
                receive_own_messages:bool=True, download_upload:bool=True,
                diff_mode:bool=False, diff_threshold:float=0.001,
                last_download_data:Optional[candycan.a2l.XCPData]=None)

*Usage docs: https://docs.pydantic.dev/2.7/concepts/models/

A base class for creating Pydantic models.

Attributes: class_vars: The names of classvars defined on the model. private_attributes: Metadata about the private attributes of the model. signature: The signature for instantiating the model.

__pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__: The pydantic-core schema used to build the SchemaValidator and SchemaSerializer.
__pydantic_custom_init__: Whether the model has a custom `__init__` function.
__pydantic_decorators__: Metadata containing the decorators defined on the model.
    This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
__pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to
    __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__: The name of the post-init method for the model, if defined.
__pydantic_root_model__: Whether the model is a `RootModel`.
__pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.

__pydantic_extra__: An instance attribute with the values of extra fields from validation when
    `model_config['extra'] == 'allow'`.
__pydantic_fields_set__: An instance attribute with the names of fields explicitly set.
__pydantic_private__: Instance attribute with the values of private attributes set on the model instance.*
# Instantiate the CAN specs with an explicit CAN type and bus type; a
# validation failure is printed instead of being raised.
try:
    m = ScapyCANSpecs(can_type='NATIVE', bus_type='VIRTUAL')
except ValidationError as exc:
    print(exc)

# NOTE(review): if validation failed above, `m` is unbound and this line raises NameError.
pprint(m.model_dump())
def emulate_call(m: ScapyCANSpecs):
    """Emulate one call by advancing the message counter `m.cntr` by one (in place)."""
    m.cntr = m.cntr + 1

# Call emulate_call three times on the same specs object and print the
# counter after each call to show that the mutation persists.
for i in range(3):
    emulate_call(m)
    print(f"{i}: counter {m.cntr}")
# Load the calibration described by the JSON file passed via --input.
xcp_calib_from_xcpjson = Get_XCPCalib_From_XCPJSon(args.input)
xcp_calib_from_xcpjson
# Build the initial XCP data record from the A2L file for the selected leaves.
xcp_data = Generate_Init_XCPData_From_A2L(
    a2l=args.a2l, keys=args.leaves, node_path=args.node_path
)

#  address from xcp data file should align with the address from xcp calib file
test_eq(xcp_data.address, xcp_calib_from_xcpjson.data[0].address)

# validate the model
try:
    XCPData.model_validate(xcp_data)
except ValidationError as exc:
    print(exc)
# type(args.channel), type(args.download_id), args.upload_id, args.download, args.diff_flashing
# Copy the value (hex string) of the first JSON calibration entry into the A2L-derived record.
xcp_data.value = xcp_calib_from_xcpjson.data[0].value
pprint(xcp_data)
xcp_data.value_array_view[0,2], xcp_data.value_array_view[2,0]
# Wrap the single data record together with the CAN configuration taken from the CLI arguments.
xcp_calib = XCPCalib(
    config=XCPConfig(
        channel=args.channel_serial_number, download=args.download_can_id, upload=args.upload_can_id
    ),
    data=[xcp_data],
)
pprint(xcp_calib)
# The numpy view and the raw bytes of the record must describe the same buffer.
npa =  xcp_calib.data[0].value_array_view
npa.shape, npa.dtype, npa
len(xcp_calib.data[0].value_bytes), xcp_calib.data[0].value_bytes

len(npa.tobytes()), npa.tobytes()
test_eq(npa.tobytes(), xcp_calib.data[0].value_bytes)
xcp_calib.data[0].value
# buffer = [i.hex() for x in npa for i in x]
# # buffer[::-1]
# len(buffer)
# buffer
# buffer = npa.tobytes()

# pprint(buffer), len(buffer)
# xcp_calib.data[0].value, len(xcp_calib.data[0].value)
# NOTE: this encodes the ASCII characters of the hex string (8 bytes), not the 4 address bytes.
addr = bytes('7000aa2a', 'utf-8')
a = 0x7000aa2a
a
# Reversed view along the first axis (not used further in this cell).
npb = npa[::-1]
# npb
# Pack every element individually as a little-endian 4-byte float.
buffer = [struct.pack("<f", x) for x in np.nditer(npa)]
# buffer
len(buffer)
investigate int type and type size

# The address is stored as a hex string; convert it to an int and look at
# the Python object size versus the C sizes reported by struct.
d = xcp_calib.data[0]
add = int(d.address, base=16)
d.address, add
hex(add), type(add), sys.getsizeof(add)  # getsizeof is the Python object size, not the wire size
struct.calcsize('h'), struct.calcsize('i'), struct.calcsize('l'), struct.calcsize('L')
type(d.address),len(d.address)

npa_to_packed_buffer

 npa_to_packed_buffer (a:numpy.ndarray)

*convert a numpy array to a packed string buffer for flashing TODO: implementation as numpy ufunc

Args: a (np.ndarray): input numpy array for flashing

Returns: str: packed string buffer for flashing*

# buffer = [struct.pack("<f", x).hex() for x in np.nditer(npa)]
# buffer[::-1]
# len(buffer)
# buffer
# data = ''.join(buffer)
# The packed buffer produced from the numpy view must equal the stored hex value.
data = npa_to_packed_buffer(npa)
test_eq(data, xcp_calib.data[0].value)
# data

convert a numpy array to a continuous hex string

# npa.astype(np.float32).tobytes().hex()
# Cast to float32, take the raw bytes and hex-encode them in one step.
buffer = npa.astype(np.float32).tobytes().hex()  ## == npa_to_packed_buffer(npa)
buffer, len(buffer)
# The one-liner must reproduce the stored hex value as well.
test_eq(buffer, xcp_calib.data[0].value)

flash_xcp

 flash_xcp (xcp_calib:candycan.a2l.XCPCalib,
            data:pandas.core.frame.DataFrame, diff_flashing:bool=False,
            download:bool=True)

*Summary Flash XCP data to target

Args: xcp_calib (XCPCalib): XCP calibration used as template, contains all the meta information except for the data; data (pd.DataFrame): input XCP data to be flashed, replaces the value in xcp_calib; diff_flashing (bool): use differential flashing; download (bool): download or upload*

Caution

set the python3 of the virtualenv with the CAP_NET_RAW capability!

sudo setcap 'CAP_NET_RAW+eip CAP_NET_ADMIN+eip' /dpt/.pyenv/versions/miniconda3-3.11-24.1.2-0/envs/can/bin/python3.11

install sshpass, and create .sshpasswd.gpg file in home root

cd ~
sudo apt-get install sshpass
echo 'password_in_verbatim' > .sshpasswd
gpg -c .sshpasswd
ls | grep 'sshpasswd.gpg'
args.channel_serial_number
# Load the `can` kernel module; the sudo password is decrypted from
# ~/.sshpasswd.gpg and piped into the command.
os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo modprobe can")
args.channel_serial_number
# sshpass -v -p asdf sudo ip link add dev can0 type can
# Create the interface can<N> as a *virtual* CAN device, then show it.
os.system(f"gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link add dev can{args.channel_serial_number} type vcan")
os.system(f"ip link show can{args.channel_serial_number}")
# !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set can0 type can bitrate 500000  # can does not support set bitrate on command line!
# !sshpass -p asdf sudo ip link add dev can0 type can
# os.system(f"gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set up can{args.channel_serial_number}")
# NOTE(review): the device was added as `type vcan` above but is brought up
# here as `type can` with a bitrate — confirm this works on the target setup;
# the commented-out plain `ip link set up` may be what a vcan device needs.
os.system(f"gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set can{args.channel_serial_number} up type can bitrate 500000")
# !sshpass -v -p asdf sudo ip link set up can0
# Open a CAN socket on the interface. NOTE: the name `socket` would shadow
# the stdlib `socket` module if that is imported in this file.
socket = CANSocket(bustype='socketcan',channel=f'can{args.channel_serial_number}',
                receive_own_messages=True)
socket, args.channel_serial_number
# Build a sample CAN frame, show its dissection and write it to a pcap file.
packet = CAN(identifier=0x123, data=b'12345678')
packet.show2()
# socket.send(packet)
# rx_packet = socket.recv()
# rx_packet.show2()
# rx_packet.canvas_dump()
# rx_packet = socket.recv()
wrpcap("./scapypcaptest.pcap", packet)
## another socket in the same process cannot receive the packet sent by the first socket
# socket2 = CANSocket(channel='vcan0')
## same socket cannot receive the packet sent by itself
# rx_packet = socket2.recv()

CCP via Scapy

# Channel name as derived from the calibration config, next to the CLI value.
channel = 'can' + str(xcp_calib.config.channel)
channel
args.channel_serial_number
# Plain CAN socket on the interface. The keyword is `bustype` (as used for
# the first socket in this notebook); the former spelling `busytpe` was a typo.
sock = CANSocket(bustype='socketcan', channel=f'can{args.channel_serial_number}', receive_own_messages=True)
sock
# Rebind `sock` with basecls=CCP so that frames are handled as CCP packets.
# NOTE(review): the previous socket object is replaced without an explicit close().
sock = CANSocket(bustype='socketcan', channel=f'can{args.channel_serial_number}', basecls=CCP) # receive_own_messages=True)
sock
# CAN identifier used for the download direction, in decimal and hex.
xcp_calib.config.download_can_id
hex(xcp_calib.config.download_can_id)

CRO for connection

# sock.send(cro)
# ctr += 1
# rx_cro = sock.recv()
# rx_cro.canvas_dump()
# CCP socket that also receives its own frames. The keyword is `bustype`;
# the former spelling `busytpe` was a typo.
sock = CANSocket(bustype='socketcan', channel=f'can{args.channel_serial_number}', basecls=CCP, receive_own_messages=True)
# sock = CANSocket(bustype='socketcan', channel=f'can{args.channel_serial_number}', basecls=CCP)
sock

get CCP version

# Command counter placed into each CRO; incremented after every request.
ctr = 0
# cro = CCP(identifier=xcp_calib.config.download_can_id)/CRO(ctr=ctr)/CONNECT(station_address=0x01)  # or 0x00?
# Build the GET_CCP_VERSION request on the download CAN id.
cro = CCP(identifier=xcp_calib.config.download_can_id)/CRO(ctr=ctr)/GET_CCP_VERSION()
cro.show2()
cro.canvas_dump()
# Send the request and wait for one answer.
# NOTE(review): without a timeout this presumably blocks until a frame arrives — confirm.
dto = sock.sr1(cro) #, timeout=5)
ctr += 1
dto.show2()
dto.canvas_dump()
dto

send connect CRO

print(f'ctr: {ctr}')
# cro = CCP(identifier=xcp_calib.config.download_can_id)/CRO(ctr=ctr)/CONNECT(station_address=0x01)  # or 0x00?
# Build the CONNECT request with the default station address.
# NOTE(review): the CRO counter is hard-coded to 0 here while `ctr` keeps
# being incremented below — confirm whether ctr=ctr was intended.
cro = CCP(identifier=xcp_calib.config.download_can_id)/CRO(ctr=0)/CONNECT()  # or 0x00?
cro.show2()
cro.canvas_dump()
print(f"ctr: {ctr}")
# Send the request and wait for one answer.
dto = sock.sr1(cro) #, timeout=5)
ctr += 1 # 1
dto.show2()
dto.canvas_dump()
dto

Download from target

# Look at the first calibration entry: address, element size, dimensions,
# numpy view, raw bytes and hex string value.
xcp_calib.data
d = xcp_calib.data[0]
d.address, d.type_size, d.dim, d.value_array_view, d.value_bytes
len(d.value_bytes )
d.value

set mta

d.address,ctr
# ctr = 0
print(f"ctr: {ctr}")
# Build the SET_MTA request with the entry's address (hex string parsed to int).
# NOTE(review): the CRO counter is hard-coded to 0 although `ctr` is tracked — confirm intent.
cro = CCP(identifier=xcp_calib.config.download_can_id)/CRO(ctr=0)/SET_MTA(address=int(d.address, 16))
cro.show2()
cro.payload
sock
# Send the request and wait for one answer.
dto = sock.sr1(cro)
ctr += 1
dto.show2()
dto.canvas_dump()
dto.payload
# Total payload size in bytes: element size times both dimensions.
len_in_bytes = d.type_size * d.dim[0] * d.dim[1]
print(f"len_in_bytes: {len_in_bytes} = type_size: {d.type_size} x dim: {d.dim}")

# Number of full 6-byte frames and the size of the trailing partial frame.
download_times = len_in_bytes // 6
last_download_size = len_in_bytes % 6
print(f"download_times: {download_times}, last_download_size: {last_download_size}")
d.value_bytes
len(d.value_bytes)
# First two 6-byte tiles, as they would be sent in DNLOAD_6 frames.
tile0 = d.value_bytes[0:6]
len(tile0), tile0, tile0.hex(), type(tile0)
tile1 = d.value_bytes[6:12]
len(tile1), tile1, tile1.hex()
# Re-slice in 4-byte steps and decode each tile as a little-endian float.
tile0 = d.value_bytes[0:4]
len(tile0), tile0, tile0.hex(), type(tile0), struct.unpack("<f", tile0)
tile1 = d.value_bytes[4:8]
len(tile1), tile1, tile1.hex(), struct.unpack("<f", tile1)
tile2 = d.value_bytes[8:12]
# Report the length of the tile actually shown (previously always len(tile1)).
len(tile2), tile2, tile2.hex(), struct.unpack("<f", tile2)
# Byte offset of element [2, 0]; presumably 17 columns of 4-byte floats — TODO confirm against d.dim.
st = 2*17*4
tile3 = d.value_bytes[st:st+4]
len(tile3), tile3, tile3.hex(), struct.unpack("<f", tile3)
d.value_array_view[2,0]
# struct.unpack returns a 1-tuple; compare its single element with the scalar.
test_eq(struct.unpack("<f", tile3)[0], d.value_array_view[2,0])
tile3.hex(), tile3.hex().encode()
type(tile3)
list(tile3)
# bytearray supports in-place concatenation of bytes tiles.
ba_uploaded = bytearray()
ba_uploaded += tile3
ba_uploaded
ba_uploaded += tile2
ba_uploaded += tile1
ba_uploaded, len(ba_uploaded)
# Rebuild the complete byte buffer from 6-byte tiles plus the trailing
# remainder, then check it matches the stored bytes and hex string.
ba_uploaded = bytearray()
len_in_bytes = d.type_size * d.dim[0] * d.dim[1]
download_times, last_download_size = divmod(len_in_bytes, 6)
for offset in range(0, download_times * 6, 6):
    ba_uploaded += d.value_bytes[offset:offset + 6]
if last_download_size:
    tail_start = download_times * 6
    ba_uploaded += d.value_bytes[tail_start:tail_start + last_download_size]

len(ba_uploaded), ba_uploaded.hex()
test_eq(ba_uploaded, d.value_bytes)
test_eq(ba_uploaded.hex(), d.value)
ba_uploaded.hex()

loop over XCPCalib data array

# Manually send the first 6-byte tile with DNLOAD_6.
# NOTE(review): the CRO counter is hard-coded to 0 although `ctr` is tracked — confirm intent.
print(f'payload: {d.value_bytes[0:6]}')
print(f'ctr: {ctr}')
cro = CCP(identifier=xcp_calib.config.download_can_id)/CRO(ctr=0)/DNLOAD_6(data=d.value_bytes[0:6])
# if i%100==0:
    # print(f"i: {i}, ctr: {ctr} cro: {cro}")
cro.show2()
    # cro.payload
# sent_bytes = sock.send(cro)
# rx_cro = sock.recv()
# Send the request and wait for one answer.
dto = sock.sr1(cro)
ctr += 1
dto.show2()
cro.canvas_dump()
dto.canvas_dump()
# Manually send the second 6-byte tile the same way.
print(f'payload: {d.value_bytes[6:12]}')
print(f'ctr: {ctr}')
cro = CCP(identifier=xcp_calib.config.download_can_id)/CRO(ctr=0)/DNLOAD_6(data=d.value_bytes[6:12])
# if i%100==0:
    # print(f"i: {i}, ctr: {ctr} cro: {cro}")
cro.show2()
    # cro.payload
# sent_bytes = sock.send(cro)
# rx_cro = sock.recv()
dto = sock.sr1(cro)
ctr += 1
dto.show2()
# Send every complete 6-byte tile in its own DNLOAD_6 request, using the
# running command counter, and require return code 0 for each answer.
for i in range(download_times):
    payload = d.value_bytes[i*6:(i+1)*6]
    print(f'payload: {payload}')
    print(f'ctr: {ctr}')
    ccp_header = CCP(identifier=xcp_calib.config.download_can_id)
    cro = ccp_header/CRO(ctr=ctr)/DNLOAD_6(data=payload)
    cro.show2()
    # Send the request and wait for one answer.
    dto = sock.sr1(cro)
    ctr += 1
    dto.show2()
    assert dto.return_code == 0x00
# Index of the last tile sent, followed by dumps of the last request/answer.
i
cro.canvas_dump()
dto.canvas_dump()
# Send the trailing bytes that do not fill a complete 6-byte DNLOAD_6 frame.
start_index = download_times * 6
# DNLOAD carries an explicit size byte; set it to the number of valid data
# bytes, otherwise the frame declares 0 bytes and the remainder is not written.
cro = CCP(identifier=xcp_calib.config.download_can_id)/CRO(ctr=ctr)/DNLOAD(
    size=last_download_size,
    data=d.value_bytes[start_index:start_index+last_download_size],
)
cro.show2()
cro.payload
cro.canvas_dump()
# sent_bytes = sock.send(cro)
# Send the request and wait for one answer.
dto = sock.sr1(cro)
ctr += 1
dto.canvas_dump()

Disconnect target ecu

# Build the DISCONNECT request for station address 0x00 with the running counter.
cro = CCP(identifier=xcp_calib.config.download_can_id)/CRO(ctr=ctr)/DISCONNECT(station_address=0x00)
cro.show2()
cro.canvas_dump()
# bytes_sent = sock.send(cro)
# Send the request and wait for one answer.
dto = sock.sr1(cro)
ctr += 1
dto.show2()
dto.canvas_dump()

Three context managers for CCP


can_context

 can_context (can_specs:__main__.ScapyCANSpecs)

*Summary Context manager for scapy CAN socket

Args: can_specs (ScapyCANSpecs): CAN specs including can type, bus type, channel, etc.

Yields: CANSocket: CAN socket object*


SET_MTA_context

 SET_MTA_context (can_specs:__main__.ScapyCANSpecs,
                  sock:scapy.contrib.cansocket_native.NativeCANSocket,
                  data:candycan.a2l.XCPData)

*Summary Context manager for scapy set_mta

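Args: can_specs (ScapyCANSpecs): CAN specs including can type, bus type, channel, etc.; sock (NativeCANSocket): CAN socket to use; data (XCPData): XCP data entry to operate on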

Yields: CAN: packet for CAN message*


XLOAD_context

 XLOAD_context (can_specs:__main__.ScapyCANSpecs,
                sock:scapy.contrib.cansocket_native.NativeCANSocket,
                data:candycan.a2l.XCPData, start_index:int, tile_size:int)

*Summary Context manager for scapy load (download or upload)

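Args: can_specs (ScapyCANSpecs): CAN specs including can type, bus type, channel, etc.; sock (NativeCANSocket): CAN socket to use; data (XCPData): XCP data entry to operate on; start_index (int): start index of the tile within the data; tile_size (int): size of the tile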

Yields: CANSocket: CAN socket object*

Downloading and uploading with context managers


upload_calib_data2

 upload_calib_data2 (xcp_calib:candycan.a2l.XCPCalib,
                     can_specs:__main__.ScapyCANSpecs)

*Summary Upload XCP calibration data from target to host, the result will update the xcp_calib.data field

Args: xcp_calib (XCPCalib): XCP calibration to be uploaded from the target to host; can_specs (ScapyCANSpecs): CAN specs including can type, bus type, channel, etc.; its diff_mode field selects differential flashing*


downlod_calib_data2

 downlod_calib_data2 (xcp_calib:candycan.a2l.XCPCalib,
                      can_specs:__main__.ScapyCANSpecs)

*Summary Download XCP calibration data to the target using scapy_can_context

Args: xcp_calib (XCPCalib): XCP calibration to be downloaded into the target*

# Accept only frames with the upload CAN id (0x7FF: all 11 identifier bits compared).
can_filters = [dict(can_id=xcp_calib.config.upload_can_id, can_mask=0x7FF)]
# Collect the spec fields first, then validate them in one go.
spec_fields = dict(
    can_type='NATIVE',
    bus_type='SOCKET',
    channel_serial_number=3,
    download_can_id=xcp_calib.config.download_can_id,
    upload_can_id=xcp_calib.config.upload_can_id,
    can_filters=can_filters,
    bit_rate=500_000,
    time_out=1.0,
    station_address=0x00,
    cntr=0,
    receive_own_messages=True,
    download_upload=True,
)
can_specs = ScapyCANSpecs(**spec_fields)
can_specs
can_specs.model_dump()
# Point the calibration config at channel 0.
xcp_calib.config.channel = 0
xcp_calib
# TODO cannot run test on CCP without a real or emulated ECU 
# downlod_calib_data2(xcp_calib, can_type='NATIVE', bus_type='VIRTUAL', bit_rate=500_000, timeout=1.0, station_address=0x00, diff_flashing=False)

Test downloading and uploading

# Reload the calibration from the --input JSON file.
xcp_calib_from_xcpjson = Get_XCPCalib_From_XCPJSon(args.input)

# Let the CAN ids and channel from the JSON file override the CLI arguments.
args.download_can_id = xcp_calib_from_xcpjson.config.download_can_id
args.upload_can_id = xcp_calib_from_xcpjson.config.upload_can_id
args.channel_serial_number = xcp_calib_from_xcpjson.config.channel

xcp_data = Generate_Init_XCPData_From_A2L(
    a2l=args.a2l, keys=args.leaves, node_path=args.node_path
)  # initial xcp_data has value 0
# Validate the generated record; a validation failure is printed, not raised.
try:
    XCPData.model_validate(xcp_data)
except ValidationError as exc:
    print(exc)

# emulate torque table input as numpy array
xcp_data_value_npa = xcp_calib_from_xcpjson.data[0].value_array_view
# Store the table as the hex string of its float32 bytes.
xcp_data.value = xcp_data_value_npa.astype(np.float32).tobytes().hex()
pprint(xcp_data)

# Rebuild the calibration object around the updated data record.
xcp_calib = XCPCalib(
    config=XCPConfig(
        channel=args.channel_serial_number, download=args.download_can_id, upload=args.upload_can_id
    ),
    data=[xcp_data],
)
pprint(xcp_calib)

# Accept only frames with the upload CAN id (0x7FF: all 11 identifier bits compared).
can_filters = [{'can_id': xcp_calib.config.upload_can_id, 'can_mask': 0x7FF}]
cntr = 0
# Build the CAN specs from the (partly overridden) CLI arguments.
can_specs = ScapyCANSpecs(can_type=args.can_type,
                        bus_type=args.bus_type,
                        channel_serial_number=args.channel_serial_number,
                        download_can_id=xcp_calib.config.download_can_id,
                        upload_can_id=xcp_calib.config.upload_can_id,
                        can_filters=can_filters,
                        bit_rate=args.bit_rate,
                        time_out=args.time_out,
                        station_address=args.station_address,
                        cntr=cntr,
                        receive_own_messages=True,
                        download_upload=args.download,  # NOTE(review): was labelled "CCP Upload mode" although it takes args.download — confirm meaning
                        diff_mode = args.diff_mode,
                        diff_threshold= args.diff_threshold
                        )
can_specs
# test uploading
# can_specs.download_upload = False                        
# upload_calib_data2(xcp_calib=xcp_calib, can_specs=can_specs)

Release CAN device

# Bring the interface can<N> down and remove it again.
# !sshpass -v -p  asdf sudo ip link delete vcan0 

# The sudo password is decrypted from ~/.sshpasswd.gpg and piped into the command.
os.system(f"gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set down can{args.channel_serial_number}")
# delete the can<N> interface
os.system(f"gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link delete can{args.channel_serial_number}")