CCP over SocketCAN

flashing interface for CCP over SocketCAN
# #|export
# from candycan.data_link_socketcan import done, send_msg

get_argparser

 get_argparser ()

*Summary Get argument parser for command line arguments

Returns: argparse.ArgumentParser: description*

parser = get_argparser()
args = parser.parse_args(
    args=[
        '--protocol', 'CCP',
        '--bus_type', 'SOCKET', # 'VIRTUAL',  #
        '--download',
        '--diff_mode',
        '--diff_threshold', '0.001',
        '--bit_rate', '500_000',
        '--time_out', '1.0',
        '--station_address', 0,
        '--download_can_id', '630',
        '--upload_can_id', '631',
        '--a2l', repo.working_dir+'/res/VBU_AI.json',
        '--node-path', r'/PROJECT/MODULE[]',
        '--leaves', r'TQD_trqTrqSetNormal_MAP_v, VBU_L045A_CWP_05_09T_AImode_CM_single, Lookup2D_FLOAT32_IEEE, Lookup2D_X_FLOAT32_IEEE, Scalar_FLOAT32_IEEE, TQD_vVehSpd, TQD_vSgndSpd_MAP_y, TQD_pctAccPedPosFlt, TQD_pctAccPdl_MAP_x',
        '--channel_serial_number', '3',
        '--input', repo.working_dir+'/res/download.json',
        '--output', repo.working_dir+'/res/output.json',
    ]
)
args.download_can_id, args.channel_serial_number, args.upload_can_id
args.diff_threshold, hex(args.station_address), args.time_out
args.__dict__

Types definition


check_bus_type

 check_bus_type (b:str)

*Summary Check if the CAN bus type is valid

Args: b (str): Python CAN bus type to be checked

Returns: str: Python CAN bus type if valid

Raises: ValueError: if CAN bus type is invalid*


CANFilter

 CANFilter (can_id:typing.Annotated[int,Gt(gt=0)]=630,
            can_mask:typing.Annotated[int,Gt(gt=0)]=2047)

*Summary CAN filter for Python CAN bus

Attributes: can_id (int): CAN message ID can_mask (int): CAN message mask*


ScapyCANSpecs

 ScapyCANSpecs (bus_type:typing.Annotated[str,AfterValidator(func=<functio
                ncheck_bus_typeat0x7f1ceb80ede0>)]='VIRTUAL', channel_seri
                al_number:typing.Annotated[int,Ge(ge=0),Lt(lt=500)]=3,
                download_can_id:typing.Annotated[int,Gt(gt=0)]=630,
                upload_can_id:typing.Annotated[int,Gt(gt=0)]=630,
                can_filters:Optional[list[__main__.CANFilter]]=None, bit_r
                ate:typing.Annotated[int,Gt(gt=0),Lt(lt=1000000)]=500000, 
                time_out:typing.Annotated[float,Gt(gt=0.0),Lt(lt=10.0)]=1.
                0, station_address:typing.Annotated[int,Ge(ge=0),Lt(lt=255
                )]=0,
                cntr:typing.Annotated[int,Ge(ge=0),Lt(lt=1000000)]=0,
                receive_own_messages:bool=True, download_upload:bool=True,
                diff_mode:bool=False, diff_threshold:float=0.001,
                last_download_data:Optional[candycan.a2l.XCPData]=None)

*Usage docs: https://docs.pydantic.dev/2.7/concepts/models/

A base class for creating Pydantic models.

Attributes: class_vars: The names of classvars defined on the model. private_attributes: Metadata about the private attributes of the model. signature: The signature for instantiating the model.

__pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__: The pydantic-core schema used to build the SchemaValidator and SchemaSerializer.
__pydantic_custom_init__: Whether the model has a custom `__init__` function.
__pydantic_decorators__: Metadata containing the decorators defined on the model.
    This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
__pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to
    __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__: The name of the post-init method for the model, if defined.
__pydantic_root_model__: Whether the model is a `RootModel`.
__pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.

__pydantic_extra__: An instance attribute with the values of extra fields from validation when
    `model_config['extra'] == 'allow'`.
__pydantic_fields_set__: An instance attribute with the names of fields explicitly set.
__pydantic_private__: Instance attribute with the values of private attributes set on the model instance.*
try:
    m = ScapyCANSpecs(can_type='NATIVE', bus_type='VIRTUAL')
except ValidationError as exc:
    print(exc)

pprint(m.model_dump())
def emulate_call(m: ScapyCANSpecs):
    m.cntr += 1

for i in range(3):
    emulate_call(m)
    print(f"{i}: counter {m.cntr}")

CCP code


CCPCommand

 CCPCommand (connect:int=1, set_mta:int=2, disconnect:int=7,
             download:int=3, download6:int=35, upload:int=4,
             short_upload:int=15, get_seed:int=18, get_ccp_version:int=27)

*Usage docs: https://docs.pydantic.dev/2.7/concepts/models/

A base class for creating Pydantic models.

Attributes: class_vars: The names of classvars defined on the model. private_attributes: Metadata about the private attributes of the model. signature: The signature for instantiating the model.

__pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__: The pydantic-core schema used to build the SchemaValidator and SchemaSerializer.
__pydantic_custom_init__: Whether the model has a custom `__init__` function.
__pydantic_decorators__: Metadata containing the decorators defined on the model.
    This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
__pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to
    __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__: The name of the post-init method for the model, if defined.
__pydantic_root_model__: Whether the model is a `RootModel`.
__pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.

__pydantic_extra__: An instance attribute with the values of extra fields from validation when
    `model_config['extra'] == 'allow'`.
__pydantic_fields_set__: An instance attribute with the names of fields explicitly set.
__pydantic_private__: Instance attribute with the values of private attributes set on the model instance.*
xcp_calib_from_xcpjson = Get_XCPCalib_From_XCPJSon(args.input)
xcp_calib_from_xcpjson
xcp_data = Generate_Init_XCPData_From_A2L(
    a2l=args.a2l, keys=args.leaves, node_path=args.node_path
)

#  address from xcp data file should align with the address from xcp calib file
test_eq(xcp_data.address, xcp_calib_from_xcpjson.data[0].address)

# validate the model
try:
    XCPData.model_validate(xcp_data)
except ValidationError as exc:
    print(exc)
# type(args.channel), type(args.download_id), args.upload_id, args.download, args.diff_flashing
xcp_data.value = xcp_calib_from_xcpjson.data[0].value
pprint(xcp_data)
xcp_data.value_array_view[0,2], xcp_data.value_array_view[2,0]
xcp_calib = XCPCalib(
    config=XCPConfig(
        channel=args.channel_serial_number, download=args.download_can_id, upload=args.upload_can_id
    ),
    data=[xcp_data],
)
pprint(xcp_calib)
npa =  xcp_calib.data[0].value_array_view
npa.shape, npa.dtype, npa
len(xcp_calib.data[0].value_bytes), xcp_calib.data[0].value_bytes

len(npa.tobytes()), npa.tobytes()
test_eq(npa.tobytes(), xcp_calib.data[0].value_bytes)
xcp_calib.data[0].value
addr = bytes('7000aa2a', 'utf-8')
a = 0x7000aa2a
a
npb = npa[::-1]
# npb
buffer = [struct.pack("<f", x) for x in np.nditer(npa)]
# buffer
len(buffer)

investigate int type and type size

xcp_data = xcp_calib.data[0]
add = int(xcp_data.address, base=16)
xcp_data.address, add
hex(add), type(add), sys.getsizeof(add)
struct.calcsize('h'), struct.calcsize('i'), struct.calcsize('l'), struct.calcsize('L')
type(xcp_data.address),len(xcp_data.address)

npa_to_packed_buffer

 npa_to_packed_buffer (a:numpy.ndarray)

*convert a numpy array to a packed string buffer for flashing TODO: implementation as numpy ufunc

Args: a (np.ndarray): input numpy array for flashing

Returns: str: packed string buffer for flashing*

# buffer = [struct.pack("<f", x).hex() for x in np.nditer(npa)]
# buffer[::-1]
# len(buffer)
# buffer
# data = ''.join(buffer)
data = npa_to_packed_buffer(npa)
test_eq(data, xcp_calib.data[0].value)
# data

convert a numpy array to a continuous hex string

# npa.astype(np.float32).tobytes().hex()
buffer = npa.astype(np.float32).tobytes().hex()  ## == npa_to_packed_buffer(npa)
buffer, len(buffer)
test_eq(buffer, xcp_calib.data[0].value)

flash_xcp

 flash_xcp (xcp_calib:candycan.a2l.XCPCalib,
            data:pandas.core.frame.DataFrame, diff_flashing:bool=False,
            download:bool=True)

*Summary Flash XCP data to target

Args: xcp_calib (XCPCalib): XCP calibration as template, contains all the meta information except for data xcp_data (pd.DataFrame): input XCP data to be flashed, replace the value in xcp_calib diff_flashing (bool): Use differential flashing download (bool): Download or upload*

Caution

set the python3 of the virtualenv with the CAP_NET_RAW capability!

sudo setcap 'CAP_NET_RAW+eip CAP_NET_ADMIN+eip' /dpt/.pyenv/versions/miniconda3-3.11-24.1.2-0/envs/can/bin/python3.11

install sshpass, and create .sshpasswd.gpg file in home root

cd ~
sudo apt-get install sshpass
echo 'password_in_verbatim" > .sshpasswd
gpg -c .sshpasswd
ls | grep 'sshpasswd.gpg'
args.channel_serial_number
bus = 'can'
bus+str(args.channel_serial_number)
# install vcan interface with encrypted password to sudo 
if args.bus_type == 'SOCKET':
    bus='can'
else:  # 'VIRTUAL"
    bus='vcan'
device = bus+str(args.channel_serial_number)

os.system(f"gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo modprobe {bus}")
os.system(f"gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link add dev {device} type {bus}")
device, bus, args.bit_rate
os.system(f"gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set {device} up type {bus} bitrate {args.bit_rate}")
# os.system(f"gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set up {device}")
# sudo ip link set can3 up type can bitrate 500000
os.system(f"ip link show {device}")
bus = can.interface.Bus(bustype='socketcan', channel=device, bitrate=args.bit_rate)
bus.__dict__
# cntr = 0
# can_data = 0x12345678
# message_to_send = can.Message(arbitration_id=args.download_can_id, 
#                             data=can_data, 
#                             is_extended_id=False)     
# message_to_send
args.channel_serial_number

CCP via python-can

channel = 'can' + str(xcp_calib.config.channel)
channel
args
can_filters = [{'can_id': xcp_calib.config.upload_can_id, 'can_mask': 0x7FF}]
can_specs = ScapyCANSpecs(can_type='NATIVE', 
                        bus_type=args.bus_type, 
                        channel_serial_number=args.channel_serial_number,
                        download_can_id=xcp_calib.config.download_can_id,
                        upload_can_id=xcp_calib.config.upload_can_id,
                        can_filters=can_filters,
                        bit_rate=args.bit_rate,
                        time_out=1.0,
                        station_address=args.station_address,
                        cntr=0,
                        receive_own_messages=True,
                        download_upload=True
                        )
can_specs
can_specs.model_dump()

create sample data for testing

xcp_calib.config.download_can_id
hex(xcp_calib.config.download_can_id)
xcp_calib.data
xcp_data = xcp_calib.data[0]
xcp_data.address, xcp_data.type_size, xcp_data.dim, xcp_data.value_array_view, xcp_data.value_bytes
len(xcp_data.value_bytes)
xcp_data.value

Test bytes hex encodeing and decoding

len_in_bytes = xcp_data.type_size * xcp_data.dim[0] * xcp_data.dim[1]
print(f"len_in_bytes: {len_in_bytes} = type_size: {xcp_data.type_size} x dim: {xcp_data.dim}")

download_times = len_in_bytes // 6
last_download_size = len_in_bytes % 6
print(f"download_times: {download_times}, last_download_size: {last_download_size}")
xcp_data.value_bytes
len(xcp_data.value_bytes)
tile0 = xcp_data.value_bytes[0:6]
len(tile0), tile0, tile0.hex(), type(tile0)
tile1 = xcp_data.value_bytes[6:12]
len(tile1), tile1, tile1.hex()
tile0 = xcp_data.value_bytes[0:4]
len(tile0), tile0, tile0.hex(), type(tile0), struct.unpack("<f", tile0)
tile1 = xcp_data.value_bytes[4:8]
len(tile1), tile1, tile1.hex(), struct.unpack("<f", tile1)
tile2 = xcp_data.value_bytes[8:12]
len(tile2), tile2, tile2.hex(), struct.unpack("<f", tile2)
st = 2*17*4
tile3 = xcp_data.value_bytes[st:st+4]
len(tile1), tile3, tile3.hex(), struct.unpack("<f", tile3)
xcp_data.value_array_view[2,0]
test_eq(struct.unpack("<f", tile3), xcp_data.value_array_view[2,0])
tile3, len(tile3)
tile3.hex(), len(tile3.hex())
tile3.hex().encode(), len(tile3.hex().encode()), 
tile3.hex().encode().hex(), len(tile3.hex().encode().hex())
try:
    tile3.hex().hex()
except Exception as exc:
    print(exc)
try:
    tile3.decode("utf-8"), len(tile3.decode())
except Exception as exc:
    print(exc)
tile3.decode('utf-16')
tile3.decode('utf-32','backslashreplace')
t3  = tile3.decode('utf-8','backslashreplace')
t3, len(t3), type(t3)
t4  = tile3.decode('utf-8','ignore')
t4, len(t4), type(t4)
type(tile3)
list(tile3)
ba_uploaded = bytearray()
ba_uploaded += tile3
ba_uploaded
ba_uploaded += tile2
ba_uploaded += tile1
ba_uploaded, len(ba_uploaded)
ba_uploaded = bytearray()
len_in_bytes = xcp_data.type_size * xcp_data.dim[0] * xcp_data.dim[1]
download_times = len_in_bytes // 6
last_download_size = len_in_bytes % 6
for tile in range(download_times):
    ba_uploaded += xcp_data.value_bytes[tile*6:(tile+1)*6]
if last_download_size:
    ba_uploaded += xcp_data.value_bytes[download_times*6:download_times*6+last_download_size]

len(ba_uploaded), ba_uploaded.hex()
test_eq(ba_uploaded, xcp_data.value_bytes)
test_eq(ba_uploaded.hex(), xcp_data.value)
ba_uploaded.hex()

CRO for connection

Create can data as bytes of little endianness

b1 = ccp_command.connect.to_bytes(byteorder='little', length=1)
ccp_command.connect, b1, b1.hex()
cntr = 2
b2 = cntr.to_bytes(byteorder='little', length=1)
b2, b2.hex()
can_data =  b1 + b2
can_data, can_data.hex()
cntr, ccp_command.connect, can_specs.station_address

cro_little = struct.pack("@BB", ccp_command.connect, cntr) + can_specs.station_address.to_bytes(byteorder='little', length=2)
cro_big = can_specs.station_address.to_bytes(byteorder='big', length=2) + struct.pack(">BB", cntr, ccp_command.connect) 
cro_little, cro_big
cro = cro_big
cro, cro.hex()

struct.unpack("@BBH", cro)
# cro_little = struct.pack("@bb", ccp_command.connect, cntr) + can_specs.station_address.to_bytes(byteorder='little', length=2)
# cro_big = can_specs.station_address.to_bytes(byteorder='big', length=2) + struct.pack(">bb", cntr, ccp_command.connect) 
cro_little = struct.pack("@bb", ccp_command.connect, cntr) 
cro_big = struct.pack(">bb", cntr, ccp_command.connect) 
cro = cro_big
cro, cro.hex()
# struct.unpack("@bbh", cro)
struct.unpack("@bb", cro)
try:
    d = struct.pack("@cc", cntr, ccp_command.connect)
    d, d.hex()
except Exception as exc:
    print(exc)
device, can_specs.bit_rate, can_specs.can_filters, can_specs.station_address
cro
# send connect message
# with can.interface.Bus(bustype='socketcan', channel=device, bitrate=can_specs.bit_rate, filter=can_specs.can_filters) as bus:
# with can.interface.Bus(interface='socketcan', channel=device, bitrate=can_specs.bit_rate) as bus:
with can.interface.Bus(interface='socketcan', channel=device, bitrate=can_specs.bit_rate, receive_own_message=True) as bus:
    msg = can.Message(
        arbitration_id=can_specs.download_can_id, 
        data=cro, 
        is_extended_id=False)
    try:
        bus.send(msg)
        cntr += 1
        print(f"Message sent: {msg} on {bus.channel_info} cntr: {cntr-1}")
    except can.CanError:
        print("Message NOT sent")
    
    dto = bus.recv(timeout=can_specs.time_out)
    dto
can_specs.time_out
# with can.interface.Bus(interface='socketcan', channel=device, bitrate=can_specs.bit_rate, filter=can_specs.can_filters):
with can.interface.Bus(interface='socketcan', channel=device, bitrate=can_specs.bit_rate) as bus:
    dto: can.Message = bus.recv(timeout=can_specs.time_out)
    print(dto)
# while(True):
with can.interface.Bus(interface='socketcan', channel=device, bitrate=can_specs.bit_rate) as bus:
    for msg in bus:
        print(msg)
# print('Message received: ', dto.arbitration_id, dto.data, dto.timestamp, dto.dlc)
# # send connect message
# with can.interface.Bus(bustype='socketcan', channel=device, bitrate=can_specs.bit_rate, filter=can_specs.can_filters) as bus:
#     bus.send(msg)
#     cntr += 1
#     print(f"Message sent: {msg}, cntr: {cntr-1}")
#     while(True):
#         bus = can.interface.Bus(bustype='socketcan', channel=device, bitrate=can_specs.bit_rate, filter=can_specs.can_filters)
#         dto: can.Message = bus.recv()
#         print(dto)
#         # print('Message received: ', dto.arbitration_id, dto.data, dto.timestamp, dto.dlc)
#         if dto.dlc != 3 or dto.arbitration_id != can_specs.upload_can_id:
#             continue
#         pid, err_code , cntr_ret = struct.unpack('@BBB', dto.data)
#         if pid != 0xff:
#             continue
#         if cntr_ret == cntr and err_code == 0:
#             print(f"Connected: {pid}, {err_code}, {cntr}")
#             break
#         else:
#             print(f"Error: {pid}, {err_code}, {cntr}")
#             break

send SET_MTA message

xcp_data.address
addr = int(xcp_data.address, 16)
addr, hex(addr)
a_bytes_little = struct.pack("@I", addr)
a_bytes_big = struct.pack(">I", addr)
a_bytes_little, a_bytes_big
# create CAN message for ccp SET_MTA CRO
xcp_data.address, cntr, ccp_command.set_mta, hex(addr), struct.pack("@I", addr)
can_data_big = struct.pack(">IBBBB", int(xcp_data.address,16), 0x00, 0x00, cntr, ccp_command.set_mta)
can_data_little = struct.pack("@BBBBI", ccp_command.set_mta, cntr, 0x00, 0x00, int(xcp_data.address,16))
can_data_little, can_data_big
can_data = can_data_little
cro = can.Message(arbitration_id=args.download_can_id, 
                            data=can_data, 
                            is_extended_id=False)   
cro
# # set MTA
# with can.interface.Bus(bustype='socketcan', channel=device, bitrate=args.bit_rate, filter=can_specs.can_filters) as bus:
#     bus.send(cro)
#     cntr += 1
#     print(f"Message sent: {cro}, cntr: {cntr-1}")
    
#     while(True):
#         dto: can.Message = bus.recv()
#         print('Message received: ', dto.arbitration_id, dto.data, dto.timestamp, dto.dlc)
        
#         if dto.dlc != 3 or dto.arbitration_id != can_specs.upload_can_id:
#             continue
#         pid, err_code , cntr_ret = struct.unpack('@BBB', dto.data)
#         if pid != 0xff:
#             continue
#         if cntr_ret == cntr and err_code == 0:
#             print(f"Connected: {pid}, {err_code}, {cntr}")
#             break
#         else:
#             print(f"Error: {pid}, {err_code}, {cntr}")
#             break

Download from target

# can_data_big = struct.pack(">BBBBBBBB", int(xcp_data.address,16), 0x00, 0x00, cntr, ccp_command.download6)
can_data_little = struct.pack("@BB", ccp_command.download6, cntr)
ccp_command.download6, cntr, can_data_little
xcp_data.value_bytes[0:6]
b = xcp_data.value_bytes[0]
b, b.to_bytes(), can_data_little + b.to_bytes()
can_data_little + xcp_data.value_bytes[0:6]
tile = 0
for i in range(6):
    b = xcp_data.value_bytes[tile+i]
    can_data_little += b.to_bytes()
    b, can_data_little
# xcp_data.value_bytes[i*6:(i+1)*6]
can_data_little

loop over XCPCalib data array

ccp_command
# for i in range(download_times):
#     # Create CAN message for ccp DOWNLOAD CRO
#     # xcp_data.address, cntr, ccp_command.set_mta, hex(addr), struct.pack("@I", addr)
#     # can_data_big = xcp_data.value_bytes[i*6:(i+1)*6:-1] + can_data_big
#     can_data_little = struct.pack("@BB", ccp_command.download6, cntr)
#     can_data_little += xcp_data.value_bytes[i*6:(i+1)*6]
#     # can_data_little, can_data_big
#     can_data = can_data_little
#     cro = can.Message(arbitration_id=args.download_can_id, data=can_data, is_extended_id=False)   
    
#     # Download data 
#     with can.interface.Bus(bustype='socketcan', channel=device, bitrate=args.bit_rate, filter=can_specs.can_filters) as bus:
#         bus.send(cro)
#         cntr += 1
#         print(f"Message sent: {cro}, cntr: {cntr-1}")

#         while(True):
#             dto: can.Message = bus.recv()
#             print('Message received: ', dto.arbitration_id, dto.data, dto.timestamp, dto.dlc)

#             # if dto.arbitration_id != can_specs.upload_can_id:
#             #     continue
#             pid, err_code , cntr_ret, mta0_ext, mta0_addr = struct.unpack('@BBBBI', dto.data)
#             if pid != 0xff:
#                 continue
#             if cntr_ret == cntr and err_code == 0:
#                 print(f"Downloaded: mta0_ext({mta0_ext}), mta0_add({mta0_addr}), {cntr}")
#                 break
#             else:
#                 print(f"Error: {pid}, {err_code}, {cntr}")
#                 break
#     # cro = CCP(identifier=xcp_calib.config.download_can_id)/CRO(ctr=ctr)/DNLOAD_6(data=d.value_bytes[i*6:(i+1)*6])
#     if i%100==0:
#         print(f"i: {i}, cntr: {cntr} cro: {cro}")

Download the last tile

# i = download_times * 6
# # can_data_big = xcp_data.value_bytes[i*6:(i+1)*6:-1] + can_data_big
# can_data_little = struct.pack("@BB", ccp_command.download, cntr)
# can_data_little += xcp_data.value_bytes[i:i+last_download_size]
# # can_data_little, can_data_big
# can_data = can_data_little
# cro = can.Message(arbitration_id=args.download_can_id, data=can_data, is_extended_id=False)   
    
# # Download data 
# with can.interface.Bus(bustype='socketcan', channel=device, bitrate=args.bit_rate, filter=can_specs.can_filters) as bus:
#     bus.send(cro)
#     cntr += 1
#     print(f"Message sent: {cro}, cntr: {cntr-1}")

#     while(True):
#         dto: can.Message = bus.recv()
#         print('Message received: ', dto.arbitration_id, dto.data, dto.timestamp, dto.dlc)

#         # if dto.arbitration_id != can_specs.upload_can_id:
#         #     continue
#         pid, err_code , cntr_ret, mta0_ext, mta0_addr = struct.unpack('@BBBBI', dto.data)
#         if pid != 0xff:
#             continue
#         if cntr_ret == cntr and err_code == 0:
#             print(f"Downloaded: mta0_ext({mta0_ext}), mta0_add({mta0_addr}), {cntr}")
#             break
#         else:
#             print(f"Error: {pid}, {err_code}, {cntr}")
#             break

Disconnect target ecu

# cro = struct.pack("@bbb", ccp_command.disconnect, cntr, 0x01) + b'\x00' + can_specs.station_address.to_bytes(byteorder='little', length=2) 
# cro, cro.hex()
# struct.unpack("@bbbbh", cro)
# # send disconnect message
# with can.interface.Bus(bustype='socketcan', channel=device, bitrate=can_specs.bit_rate, filter=can_specs.can_filters) as bus:
#     bus.send(cro)
#     cntr += 1
#     print(f"Message sent: {cro}, cntr: {cntr-1}")
    
#     while(True):
#         dto: can.Message = bus.recv()
#         print('Message received: ', dto.arbitration_id, dto.data, dto.timestamp, dto.dlc)
        
#         if dto.dlc != 3 or dto.arbitration_id != can_specs.upload_can_id:
#             continue
#         pid, err_code , cntr_ret = struct.unpack('@bbb', dto.data)
#         if pid != 0xff:
#             continue
#         if cntr_ret == cntr and err_code == 0:
#             print(f"Connected: {pid}, {err_code}, {cntr}")
#             break
#         else:
#             print(f"Error: {pid}, {err_code}, {cntr}")
#             break

Three context managers for CCP


can_context

 can_context (can_specs:__main__.ScapyCANSpecs)

*Summary Context manager for scapy CAN socket

Args: can_specs (ScapyCANSpecs): CAN specs including can type, bus type, channel, etc.

Yields: Bus: Python-CAN Bus object*


SET_MTA_context

 SET_MTA_context (can_specs:__main__.ScapyCANSpecs, bus:<function Bus>,
                  data:candycan.a2l.XCPData)

*Summary Context manager for scapy set_mta

Args: channel (str): CAN channel to use, default is vcan0

Yields: CAN: packdet for CAN message*


XLOAD_context

 XLOAD_context (can_specs:__main__.ScapyCANSpecs, bus:<function Bus>,
                data:candycan.a2l.XCPData, start_index:int, tile_size:int)

*Summary Context manager for scapy load (download or upload)

Args: channel (str): CAN channel to use, default is vcan0

Yields: CANSocket: CAN socket object*

Downloading and uploading with context managers


upload_calib_data2

 upload_calib_data2 (xcp_calib:candycan.a2l.XCPCalib,
                     can_specs:__main__.ScapyCANSpecs)

*Summary Upload XCP calibration data from target to host, the result will update the xcp_calib.data field

Args: xcp_calib (XCPCalib): XCP calibration to be uploaded from the target to host diff_flashing (bool): Use differential flashing*


downlod_calib_data2

 downlod_calib_data2 (xcp_calib:candycan.a2l.XCPCalib,
                      can_specs:__main__.ScapyCANSpecs)

*Summary Download XCP calibration data to target use scapy_can_context

Args: xcp_calib (XCPCalib): XCP calibration to be downloaded into the target*

can_filters = [{'can_id': xcp_calib.config.upload_can_id, 'can_mask': 0x7FF}]
can_specs = ScapyCANSpecs(can_type='NATIVE', 
                        bus_type='SOCKET', 
                        channel_serial_number=3,
                        download_can_id=xcp_calib.config.download_can_id,
                        upload_can_id=xcp_calib.config.upload_can_id,
                        can_filters=can_filters,
                        bit_rate=500_000,
                        time_out=1.0,
                        station_address=0x00,
                        cntr=0,
                        receive_own_messages=True,
                        download_upload=True
                        )
can_specs
can_specs.model_dump()
xcp_calib.config.channel = 0
xcp_calib
# TODO cannot run test on CCP without a real or emulated ECU 
# downlod_calib_data2(xcp_calib, can_type='NATIVE', bus_type='VIRTUAL', bit_rate=500_000, timeout=1.0, station_address=0x00, diff_flashing=False)

Test downloading and uploading

xcp_calib_from_xcpjson = Get_XCPCalib_From_XCPJSon(args.input)

args.download_can_id = xcp_calib_from_xcpjson.config.download_can_id
args.upload_can_id = xcp_calib_from_xcpjson.config.upload_can_id
args.channel_serial_number = xcp_calib_from_xcpjson.config.channel

xcp_data = Generate_Init_XCPData_From_A2L(
    a2l=args.a2l, keys=args.leaves, node_path=args.node_path
)  # initial xcp_data has value 0
try:
    XCPData.model_validate(xcp_data)
except ValidationError as exc:
    print(exc)

# emulate torque table input as numpy array
xcp_data_value_npa = xcp_calib_from_xcpjson.data[0].value_array_view
xcp_data.value = xcp_data_value_npa.astype(np.float32).tobytes().hex()
pprint(xcp_data)

xcp_calib = XCPCalib(
    config=XCPConfig(
        channel=args.channel_serial_number, download=args.download_can_id, upload=args.upload_can_id
    ),
    data=[xcp_data],
)
pprint(xcp_calib)

can_filters = [{'can_id': xcp_calib.config.upload_can_id, 'can_mask': 0x7FF}]
cntr = 0
can_specs = ScapyCANSpecs(bus_type=args.bus_type,
                        channel_serial_number=args.channel_serial_number,
                        download_can_id=xcp_calib.config.download_can_id,
                        upload_can_id=xcp_calib.config.upload_can_id,
                        can_filters=can_filters,
                        bit_rate=args.bit_rate,
                        time_out=args.time_out,
                        station_address=args.station_address,
                        cntr=cntr,
                        receive_own_messages=True,
                        download_upload=args.download,  # CCP Upload mode
                        diff_mode = args.diff_mode,
                        diff_threshold= args.diff_threshold
                        )
can_specs
# test uploading
# can_specs.download_upload = False                        
# upload_calib_data2(xcp_calib=xcp_calib, can_specs=can_specs)

Release CAN device

# close and remove vcan0
# !sshpass -v -p  asdf sudo ip link delete vcan0 

os.system(f"gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set down {device}")
# delete vcan0
os.system(f"gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link delete can{args.channel_serial_number}")