# #|export
# from candycan.data_link_socketcan import done, send_msg
CCP over SocketCAN
get_argparser
get_argparser ()
*Summary Get argument parser for command line arguments
Returns: argparse.ArgumentParser: description*
parser = get_argparser()
args = parser.parse_args(
args=[
'--protocol', 'CCP',
'--bus_type', 'SOCKET', # 'VIRTUAL', #
'--download',
'--diff_mode',
'--diff_threshold', '0.001',
'--bit_rate', '500_000',
'--time_out', '1.0',
'--station_address', 0,
'--download_can_id', '630',
'--upload_can_id', '631',
'--a2l', repo.working_dir+'/res/VBU_AI.json',
'--node-path', r'/PROJECT/MODULE[]',
'--leaves', r'TQD_trqTrqSetNormal_MAP_v, VBU_L045A_CWP_05_09T_AImode_CM_single, Lookup2D_FLOAT32_IEEE, Lookup2D_X_FLOAT32_IEEE, Scalar_FLOAT32_IEEE, TQD_vVehSpd, TQD_vSgndSpd_MAP_y, TQD_pctAccPedPosFlt, TQD_pctAccPdl_MAP_x',
'--channel_serial_number', '3',
'--input', repo.working_dir+'/res/download.json',
'--output', repo.working_dir+'/res/output.json',
] )
args.download_can_id, args.channel_serial_number, args.upload_can_idhex(args.station_address), args.time_out
args.diff_threshold, args.__dict__
Types definition
check_bus_type
check_bus_type (b:str)
*Summary Check if the CAN bus type is valid
Args: b (str): Python CAN bus type to be checked
Returns: str: Python CAN bus type if valid
Raises: ValueError: if CAN bus type is invalid*
CANFilter
CANFilter (can_id:typing.Annotated[int,Gt(gt=0)]=630, can_mask:typing.Annotated[int,Gt(gt=0)]=2047)
*Summary CAN filter for Python CAN bus
Attributes: can_id (int): CAN message ID can_mask (int): CAN message mask*
ScapyCANSpecs
ScapyCANSpecs (bus_type:typing.Annotated[str,AfterValidator(func=<function check_bus_type at 0x7f1ceb80ede0>)]='VIRTUAL', channel_serial_number:typing.Annotated[int,Ge(ge=0),Lt(lt=500)]=3, download_can_id:typing.Annotated[int,Gt(gt=0)]=630, upload_can_id:typing.Annotated[int,Gt(gt=0)]=630, can_filters:Optional[list[__main__.CANFilter]]=None, bit_rate:typing.Annotated[int,Gt(gt=0),Lt(lt=1000000)]=500000, time_out:typing.Annotated[float,Gt(gt=0.0),Lt(lt=10.0)]=1.0, station_address:typing.Annotated[int,Ge(ge=0),Lt(lt=255)]=0, cntr:typing.Annotated[int,Ge(ge=0),Lt(lt=1000000)]=0, receive_own_messages:bool=True, download_upload:bool=True, diff_mode:bool=False, diff_threshold:float=0.001, last_download_data:Optional[candycan.a2l.XCPData]=None)
*Usage docs: https://docs.pydantic.dev/2.7/concepts/models/
A base class for creating Pydantic models.
Attributes: class_vars: The names of classvars defined on the model. private_attributes: Metadata about the private attributes of the model. signature: The signature for instantiating the model.
__pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__: The pydantic-core schema used to build the SchemaValidator and SchemaSerializer.
__pydantic_custom_init__: Whether the model has a custom `__init__` function.
__pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
__pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to
__args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__: The name of the post-init method for the model, if defined.
__pydantic_root_model__: Whether the model is a `RootModel`.
__pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_extra__: An instance attribute with the values of extra fields from validation when
`model_config['extra'] == 'allow'`.
__pydantic_fields_set__: An instance attribute with the names of fields explicitly set.
__pydantic_private__: Instance attribute with the values of private attributes set on the model instance.*
try:
    m = ScapyCANSpecs(can_type='NATIVE', bus_type='VIRTUAL')
except ValidationError as exc:
    print(exc)
pprint(m.model_dump())
def emulate_call(m: ScapyCANSpecs):
    """Increment the message counter of `m` in place.

    Args:
        m (ScapyCANSpecs): CAN specs whose `cntr` field is bumped by one.
    """
    m.cntr += 1
for i in range(3):
    emulate_call(m)
    print(f"{i}: counter {m.cntr}")
CCP code
CCPCommand
CCPCommand (connect:int=1, set_mta:int=2, disconnect:int=7, download:int=3, download6:int=35, upload:int=4, short_upload:int=15, get_seed:int=18, get_ccp_version:int=27)
*Usage docs: https://docs.pydantic.dev/2.7/concepts/models/
A base class for creating Pydantic models.
Attributes: class_vars: The names of classvars defined on the model. private_attributes: Metadata about the private attributes of the model. signature: The signature for instantiating the model.
__pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__: The pydantic-core schema used to build the SchemaValidator and SchemaSerializer.
__pydantic_custom_init__: Whether the model has a custom `__init__` function.
__pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
__pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to
__args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__: The name of the post-init method for the model, if defined.
__pydantic_root_model__: Whether the model is a `RootModel`.
__pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_extra__: An instance attribute with the values of extra fields from validation when
`model_config['extra'] == 'allow'`.
__pydantic_fields_set__: An instance attribute with the names of fields explicitly set.
__pydantic_private__: Instance attribute with the values of private attributes set on the model instance.*
xcp_calib_from_xcpjson = Get_XCPCalib_From_XCPJSon(args.input)
xcp_calib_from_xcpjson
xcp_data = Generate_Init_XCPData_From_A2L(
    a2l=args.a2l, keys=args.leaves, node_path=args.node_path
)
# address from xcp data file should align with the address from xcp calib file
test_eq(xcp_data.address, xcp_calib_from_xcpjson.data[0].address)
# validate the model
try:
    XCPData.model_validate(xcp_data)
except ValidationError as exc:
    print(exc)
# type(args.channel), type(args.download_id), args.upload_id, args.download, args.diff_flashing
= xcp_calib_from_xcpjson.data[0].value
xcp_data.value
pprint(xcp_data)0,2], xcp_data.value_array_view[2,0] xcp_data.value_array_view[
xcp_calib = XCPCalib(
    config=XCPConfig(
        channel=args.channel_serial_number, download=args.download_can_id, upload=args.upload_can_id
    ),
    data=[xcp_data],
)
pprint(xcp_calib)
= xcp_calib.data[0].value_array_view
npa
npa.shape, npa.dtype, npalen(xcp_calib.data[0].value_bytes), xcp_calib.data[0].value_bytes
len(npa.tobytes()), npa.tobytes()
0].value_bytes)
test_eq(npa.tobytes(), xcp_calib.data[0].value xcp_calib.data[
= bytes('7000aa2a', 'utf-8')
addr = 0x7000aa2a
a a
= npa[::-1]
npb # npb
buffer = [struct.pack("<f", x) for x in np.nditer(npa)]
# buffer
len(buffer)
investigate int type and type size
= xcp_calib.data[0]
xcp_data = int(xcp_data.address, base=16)
add
xcp_data.address, addhex(add), type(add), sys.getsizeof(add)
'h'), struct.calcsize('i'), struct.calcsize('l'), struct.calcsize('L')
struct.calcsize(type(xcp_data.address),len(xcp_data.address)
npa_to_packed_buffer
npa_to_packed_buffer (a:numpy.ndarray)
*convert a numpy array to a packed string buffer for flashing TODO: implementation as numpy ufunc
Args: a (np.ndarray): input numpy array for flashing
Returns: str: packed string buffer for flashing*
# buffer = [struct.pack("<f", x).hex() for x in np.nditer(npa)]
# buffer[::-1]
# len(buffer)
# buffer
# data = ''.join(buffer)
= npa_to_packed_buffer(npa)
data 0].value)
test_eq(data, xcp_calib.data[# data
convert a numpy array to a continuous hex string
# npa.astype(np.float32).tobytes().hex()
buffer = npa.astype(np.float32).tobytes().hex() ## == npa_to_packed_buffer(npa)
buffer, len(buffer)
buffer, xcp_calib.data[0].value) test_eq(
flash_xcp
flash_xcp (xcp_calib:candycan.a2l.XCPCalib, data:pandas.core.frame.DataFrame, diff_flashing:bool=False, download:bool=True)
*Summary Flash XCP data to target
Args: xcp_calib (XCPCalib): XCP calibration used as a template; contains all the meta information except for the data. data (pd.DataFrame): input XCP data to be flashed; replaces the value in xcp_calib. diff_flashing (bool): Use differential flashing. download (bool): Download or upload.*
Caution
set the python3 of the virtualenv with the CAP_NET_RAW capability!
sudo setcap 'CAP_NET_RAW+eip CAP_NET_ADMIN+eip' /dpt/.pyenv/versions/miniconda3-3.11-24.1.2-0/envs/can/bin/python3.11
install sshpass, and create .sshpasswd.gpg file in home root
cd ~
sudo apt-get install sshpass
echo 'password_in_verbatim' > .sshpasswd
gpg -c .sshpasswd
ls | grep 'sshpasswd.gpg'
args.channel_serial_number= 'can' bus
+str(args.channel_serial_number) bus
# install vcan interface with encrypted password to sudo
if args.bus_type == 'SOCKET':
    bus = 'can'
else:  # 'VIRTUAL'
    bus = 'vcan'
device = bus + str(args.channel_serial_number)
os.system(f"gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo modprobe {bus}")
os.system(f"gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link add dev {device} type {bus}")
device, bus, args.bit_rate
os.system(f"gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set {device} up type {bus} bitrate {args.bit_rate}")
# os.system(f"gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set up {device}")
# sudo ip link set can3 up type can bitrate 500000
os.system(f"ip link show {device}")
= can.interface.Bus(bustype='socketcan', channel=device, bitrate=args.bit_rate)
bus bus.__dict__
# cntr = 0
# can_data = 0x12345678
# message_to_send = can.Message(arbitration_id=args.download_can_id,
# data=can_data,
# is_extended_id=False)
# message_to_send
args.channel_serial_number
CCP via python-can
= 'can' + str(xcp_calib.config.channel)
channel channel
args
can_filters = [{'can_id': xcp_calib.config.upload_can_id, 'can_mask': 0x7FF}]
can_specs = ScapyCANSpecs(can_type='NATIVE',
    bus_type=args.bus_type,
    channel_serial_number=args.channel_serial_number,
    download_can_id=xcp_calib.config.download_can_id,
    upload_can_id=xcp_calib.config.upload_can_id,
    can_filters=can_filters,
    bit_rate=args.bit_rate,
    time_out=1.0,
    station_address=args.station_address,
    cntr=0,
    receive_own_messages=True,
    download_upload=True
)
can_specs can_specs.model_dump()
create sample data for testing
xcp_calib.config.download_can_idhex(xcp_calib.config.download_can_id)
xcp_calib.data
= xcp_calib.data[0]
xcp_data
xcp_data.address, xcp_data.type_size, xcp_data.dim, xcp_data.value_array_view, xcp_data.value_byteslen(xcp_data.value_bytes)
xcp_data.value
Test bytes hex encoding and decoding
len_in_bytes = xcp_data.type_size * xcp_data.dim[0] * xcp_data.dim[1]
print(f"len_in_bytes: {len_in_bytes} = type_size: {xcp_data.type_size} x dim: {xcp_data.dim}")
download_times = len_in_bytes // 6
last_download_size = len_in_bytes % 6
print(f"download_times: {download_times}, last_download_size: {last_download_size}")
xcp_data.value_byteslen(xcp_data.value_bytes)
= xcp_data.value_bytes[0:6]
tile0 len(tile0), tile0, tile0.hex(), type(tile0)
= xcp_data.value_bytes[6:12]
tile1 len(tile1), tile1, tile1.hex()
= xcp_data.value_bytes[0:4]
tile0 len(tile0), tile0, tile0.hex(), type(tile0), struct.unpack("<f", tile0)
= xcp_data.value_bytes[4:8]
tile1 len(tile1), tile1, tile1.hex(), struct.unpack("<f", tile1)
= xcp_data.value_bytes[8:12]
tile2 len(tile2), tile2, tile2.hex(), struct.unpack("<f", tile2)
= 2*17*4
st = xcp_data.value_bytes[st:st+4]
tile3 len(tile1), tile3, tile3.hex(), struct.unpack("<f", tile3)
2,0]
xcp_data.value_array_view["<f", tile3), xcp_data.value_array_view[2,0]) test_eq(struct.unpack(
len(tile3)
tile3, hex(), len(tile3.hex())
tile3.hex().encode(), len(tile3.hex().encode()),
tile3.hex().encode().hex(), len(tile3.hex().encode().hex())
tile3.try:
hex().hex()
tile3.except Exception as exc:
print(exc)
try:
"utf-8"), len(tile3.decode())
tile3.decode(except Exception as exc:
print(exc)
'utf-16')
tile3.decode('utf-32','backslashreplace')
tile3.decode(= tile3.decode('utf-8','backslashreplace')
t3 len(t3), type(t3)
t3, = tile3.decode('utf-8','ignore')
t4 len(t4), type(t4) t4,
type(tile3)
list(tile3)
= bytearray()
ba_uploaded += tile3
ba_uploaded
ba_uploaded+= tile2
ba_uploaded += tile1
ba_uploaded len(ba_uploaded) ba_uploaded,
ba_uploaded = bytearray()
len_in_bytes = xcp_data.type_size * xcp_data.dim[0] * xcp_data.dim[1]
download_times = len_in_bytes // 6
last_download_size = len_in_bytes % 6
for tile in range(download_times):
    ba_uploaded += xcp_data.value_bytes[tile*6:(tile+1)*6]
if last_download_size:
    ba_uploaded += xcp_data.value_bytes[download_times*6:download_times*6+last_download_size]
len(ba_uploaded), ba_uploaded.hex()
test_eq(ba_uploaded, xcp_data.value_bytes)hex(), xcp_data.value)
test_eq(ba_uploaded.hex() ba_uploaded.
CRO for connection
Create can data as bytes of little endianness
= ccp_command.connect.to_bytes(byteorder='little', length=1)
b1 connect, b1, b1.hex()
ccp_command.= 2
cntr = cntr.to_bytes(byteorder='little', length=1)
b2 hex()
b2, b2.= b1 + b2
can_data hex() can_data, can_data.
connect, can_specs.station_address
cntr, ccp_command.
= struct.pack("@BB", ccp_command.connect, cntr) + can_specs.station_address.to_bytes(byteorder='little', length=2)
cro_little = can_specs.station_address.to_bytes(byteorder='big', length=2) + struct.pack(">BB", cntr, ccp_command.connect)
cro_big
cro_little, cro_big= cro_big
cro hex()
cro, cro.
"@BBH", cro) struct.unpack(
# cro_little = struct.pack("@bb", ccp_command.connect, cntr) + can_specs.station_address.to_bytes(byteorder='little', length=2)
# cro_big = can_specs.station_address.to_bytes(byteorder='big', length=2) + struct.pack(">bb", cntr, ccp_command.connect)
= struct.pack("@bb", ccp_command.connect, cntr)
cro_little = struct.pack(">bb", cntr, ccp_command.connect)
cro_big = cro_big
cro hex()
cro, cro.# struct.unpack("@bbh", cro)
"@bb", cro) struct.unpack(
try:
= struct.pack("@cc", cntr, ccp_command.connect)
d hex()
d, d.except Exception as exc:
print(exc)
device, can_specs.bit_rate, can_specs.can_filters, can_specs.station_address cro
# send connect message
# with can.interface.Bus(bustype='socketcan', channel=device, bitrate=can_specs.bit_rate, filter=can_specs.can_filters) as bus:
# with can.interface.Bus(interface='socketcan', channel=device, bitrate=can_specs.bit_rate) as bus:
with can.interface.Bus(interface='socketcan', channel=device, bitrate=can_specs.bit_rate, receive_own_message=True) as bus:
= can.Message(
msg =can_specs.download_can_id,
arbitration_id=cro,
data=False)
is_extended_idtry:
bus.send(msg)+= 1
cntr print(f"Message sent: {msg} on {bus.channel_info} cntr: {cntr-1}")
except can.CanError:
print("Message NOT sent")
= bus.recv(timeout=can_specs.time_out)
dto dto
can_specs.time_out
# with can.interface.Bus(interface='socketcan', channel=device, bitrate=can_specs.bit_rate, filter=can_specs.can_filters):
with can.interface.Bus(interface='socketcan', channel=device, bitrate=can_specs.bit_rate) as bus:
= bus.recv(timeout=can_specs.time_out)
dto: can.Message print(dto)
# while(True):
with can.interface.Bus(interface='socketcan', channel=device, bitrate=can_specs.bit_rate) as bus:
for msg in bus:
print(msg)
# print('Message received: ', dto.arbitration_id, dto.data, dto.timestamp, dto.dlc)
# # send connect message
# with can.interface.Bus(bustype='socketcan', channel=device, bitrate=can_specs.bit_rate, filter=can_specs.can_filters) as bus:
# bus.send(msg)
# cntr += 1
# print(f"Message sent: {msg}, cntr: {cntr-1}")
# while(True):
# bus = can.interface.Bus(bustype='socketcan', channel=device, bitrate=can_specs.bit_rate, filter=can_specs.can_filters)
# dto: can.Message = bus.recv()
# print(dto)
# # print('Message received: ', dto.arbitration_id, dto.data, dto.timestamp, dto.dlc)
# if dto.dlc != 3 or dto.arbitration_id != can_specs.upload_can_id:
# continue
# pid, err_code , cntr_ret = struct.unpack('@BBB', dto.data)
# if pid != 0xff:
# continue
# if cntr_ret == cntr and err_code == 0:
# print(f"Connected: {pid}, {err_code}, {cntr}")
# break
# else:
# print(f"Error: {pid}, {err_code}, {cntr}")
# break
send SET_MTA message
xcp_data.address= int(xcp_data.address, 16)
addr hex(addr)
addr, = struct.pack("@I", addr)
a_bytes_little = struct.pack(">I", addr)
a_bytes_big a_bytes_little, a_bytes_big
# create CAN message for ccp SET_MTA CRO
hex(addr), struct.pack("@I", addr)
xcp_data.address, cntr, ccp_command.set_mta, = struct.pack(">IBBBB", int(xcp_data.address,16), 0x00, 0x00, cntr, ccp_command.set_mta)
can_data_big = struct.pack("@BBBBI", ccp_command.set_mta, cntr, 0x00, 0x00, int(xcp_data.address,16))
can_data_little
can_data_little, can_data_big= can_data_little
can_data = can.Message(arbitration_id=args.download_can_id,
cro =can_data,
data=False)
is_extended_id cro
# # set MTA
# with can.interface.Bus(bustype='socketcan', channel=device, bitrate=args.bit_rate, filter=can_specs.can_filters) as bus:
# bus.send(cro)
# cntr += 1
# print(f"Message sent: {cro}, cntr: {cntr-1}")
# while(True):
# dto: can.Message = bus.recv()
# print('Message received: ', dto.arbitration_id, dto.data, dto.timestamp, dto.dlc)
# if dto.dlc != 3 or dto.arbitration_id != can_specs.upload_can_id:
# continue
# pid, err_code , cntr_ret = struct.unpack('@BBB', dto.data)
# if pid != 0xff:
# continue
# if cntr_ret == cntr and err_code == 0:
# print(f"Connected: {pid}, {err_code}, {cntr}")
# break
# else:
# print(f"Error: {pid}, {err_code}, {cntr}")
# break
Download from target
# can_data_big = struct.pack(">BBBBBBBB", int(xcp_data.address,16), 0x00, 0x00, cntr, ccp_command.download6)
= struct.pack("@BB", ccp_command.download6, cntr)
can_data_little ccp_command.download6, cntr, can_data_little
0:6]
xcp_data.value_bytes[= xcp_data.value_bytes[0]
b + b.to_bytes()
b, b.to_bytes(), can_data_little + xcp_data.value_bytes[0:6] can_data_little
= 0
tile for i in range(6):
= xcp_data.value_bytes[tile+i]
b += b.to_bytes()
can_data_little
b, can_data_little# xcp_data.value_bytes[i*6:(i+1)*6]
can_data_little
loop over XCPCalib data array
ccp_command
# for i in range(download_times):
# # Create CAN message for ccp DOWNLOAD CRO
# # xcp_data.address, cntr, ccp_command.set_mta, hex(addr), struct.pack("@I", addr)
# # can_data_big = xcp_data.value_bytes[i*6:(i+1)*6:-1] + can_data_big
# can_data_little = struct.pack("@BB", ccp_command.download6, cntr)
# can_data_little += xcp_data.value_bytes[i*6:(i+1)*6]
# # can_data_little, can_data_big
# can_data = can_data_little
# cro = can.Message(arbitration_id=args.download_can_id, data=can_data, is_extended_id=False)
# # Download data
# with can.interface.Bus(bustype='socketcan', channel=device, bitrate=args.bit_rate, filter=can_specs.can_filters) as bus:
# bus.send(cro)
# cntr += 1
# print(f"Message sent: {cro}, cntr: {cntr-1}")
# while(True):
# dto: can.Message = bus.recv()
# print('Message received: ', dto.arbitration_id, dto.data, dto.timestamp, dto.dlc)
# # if dto.arbitration_id != can_specs.upload_can_id:
# # continue
# pid, err_code , cntr_ret, mta0_ext, mta0_addr = struct.unpack('@BBBBI', dto.data)
# if pid != 0xff:
# continue
# if cntr_ret == cntr and err_code == 0:
# print(f"Downloaded: mta0_ext({mta0_ext}), mta0_add({mta0_addr}), {cntr}")
# break
# else:
# print(f"Error: {pid}, {err_code}, {cntr}")
# break
# # cro = CCP(identifier=xcp_calib.config.download_can_id)/CRO(ctr=ctr)/DNLOAD_6(data=d.value_bytes[i*6:(i+1)*6])
# if i%100==0:
# print(f"i: {i}, cntr: {cntr} cro: {cro}")
Download the last tile
# i = download_times * 6
# # can_data_big = xcp_data.value_bytes[i*6:(i+1)*6:-1] + can_data_big
# can_data_little = struct.pack("@BB", ccp_command.download, cntr)
# can_data_little += xcp_data.value_bytes[i:i+last_download_size]
# # can_data_little, can_data_big
# can_data = can_data_little
# cro = can.Message(arbitration_id=args.download_can_id, data=can_data, is_extended_id=False)
# # Download data
# with can.interface.Bus(bustype='socketcan', channel=device, bitrate=args.bit_rate, filter=can_specs.can_filters) as bus:
# bus.send(cro)
# cntr += 1
# print(f"Message sent: {cro}, cntr: {cntr-1}")
# while(True):
# dto: can.Message = bus.recv()
# print('Message received: ', dto.arbitration_id, dto.data, dto.timestamp, dto.dlc)
# # if dto.arbitration_id != can_specs.upload_can_id:
# # continue
# pid, err_code , cntr_ret, mta0_ext, mta0_addr = struct.unpack('@BBBBI', dto.data)
# if pid != 0xff:
# continue
# if cntr_ret == cntr and err_code == 0:
# print(f"Downloaded: mta0_ext({mta0_ext}), mta0_add({mta0_addr}), {cntr}")
# break
# else:
# print(f"Error: {pid}, {err_code}, {cntr}")
# break
Disconnect target ecu
# cro = struct.pack("@bbb", ccp_command.disconnect, cntr, 0x01) + b'\x00' + can_specs.station_address.to_bytes(byteorder='little', length=2)
# cro, cro.hex()
# struct.unpack("@bbbbh", cro)
# # send disconnect message
# with can.interface.Bus(bustype='socketcan', channel=device, bitrate=can_specs.bit_rate, filter=can_specs.can_filters) as bus:
# bus.send(cro)
# cntr += 1
# print(f"Message sent: {cro}, cntr: {cntr-1}")
# while(True):
# dto: can.Message = bus.recv()
# print('Message received: ', dto.arbitration_id, dto.data, dto.timestamp, dto.dlc)
# if dto.dlc != 3 or dto.arbitration_id != can_specs.upload_can_id:
# continue
# pid, err_code , cntr_ret = struct.unpack('@bbb', dto.data)
# if pid != 0xff:
# continue
# if cntr_ret == cntr and err_code == 0:
# print(f"Connected: {pid}, {err_code}, {cntr}")
# break
# else:
# print(f"Error: {pid}, {err_code}, {cntr}")
# break
Three context managers for CCP
can_context
can_context (can_specs:__main__.ScapyCANSpecs)
*Summary Context manager for scapy CAN socket
Args: can_specs (ScapyCANSpecs): CAN specs including can type, bus type, channel, etc.
Yields: Bus: Python-CAN Bus object*
SET_MTA_context
SET_MTA_context (can_specs:__main__.ScapyCANSpecs, bus:<function Bus>, data:candycan.a2l.XCPData)
*Summary Context manager for scapy set_mta
Args: channel (str): CAN channel to use, default is vcan0
Yields: CAN: packet for CAN message*
XLOAD_context
XLOAD_context (can_specs:__main__.ScapyCANSpecs, bus:<function Bus>, data:candycan.a2l.XCPData, start_index:int, tile_size:int)
*Summary Context manager for scapy load (download or upload)
Args: channel (str): CAN channel to use, default is vcan0
Yields: CANSocket: CAN socket object*
Downloading and uploading with context managers
upload_calib_data2
upload_calib_data2 (xcp_calib:candycan.a2l.XCPCalib, can_specs:__main__.ScapyCANSpecs)
*Summary Upload XCP calibration data from target to host, the result will update the xcp_calib.data field
Args: xcp_calib (XCPCalib): XCP calibration to be uploaded from the target to host diff_flashing (bool): Use differential flashing*
downlod_calib_data2
downlod_calib_data2 (xcp_calib:candycan.a2l.XCPCalib, can_specs:__main__.ScapyCANSpecs)
*Summary Download XCP calibration data to the target using scapy_can_context
Args: xcp_calib (XCPCalib): XCP calibration to be downloaded into the target*
= [{'can_id': xcp_calib.config.upload_can_id, 'can_mask': 0x7FF}]
can_filters = ScapyCANSpecs(can_type='NATIVE',
can_specs ='SOCKET',
bus_type=3,
channel_serial_number=xcp_calib.config.download_can_id,
download_can_id=xcp_calib.config.upload_can_id,
upload_can_id=can_filters,
can_filters=500_000,
bit_rate=1.0,
time_out=0x00,
station_address=0,
cntr=True,
receive_own_messages=True
download_upload
)
can_specs can_specs.model_dump()
= 0
xcp_calib.config.channel xcp_calib
# TODO cannot run test on CCP without a real or emulated ECU
# downlod_calib_data2(xcp_calib, can_type='NATIVE', bus_type='VIRTUAL', bit_rate=500_000, timeout=1.0, station_address=0x00, diff_flashing=False)
Test downloading and uploading
= Get_XCPCalib_From_XCPJSon(args.input)
xcp_calib_from_xcpjson
= xcp_calib_from_xcpjson.config.download_can_id
args.download_can_id = xcp_calib_from_xcpjson.config.upload_can_id
args.upload_can_id = xcp_calib_from_xcpjson.config.channel
args.channel_serial_number
= Generate_Init_XCPData_From_A2L(
xcp_data =args.a2l, keys=args.leaves, node_path=args.node_path
a2l# initial xcp_data has value 0
) try:
XCPData.model_validate(xcp_data)except ValidationError as exc:
print(exc)
# emulate torque table input as numpy array
= xcp_calib_from_xcpjson.data[0].value_array_view
xcp_data_value_npa = xcp_data_value_npa.astype(np.float32).tobytes().hex()
xcp_data.value
pprint(xcp_data)
= XCPCalib(
xcp_calib =XCPConfig(
config=args.channel_serial_number, download=args.download_can_id, upload=args.upload_can_id
channel
),=[xcp_data],
data
)
pprint(xcp_calib)
= [{'can_id': xcp_calib.config.upload_can_id, 'can_mask': 0x7FF}]
can_filters = 0
cntr = ScapyCANSpecs(bus_type=args.bus_type,
can_specs =args.channel_serial_number,
channel_serial_number=xcp_calib.config.download_can_id,
download_can_id=xcp_calib.config.upload_can_id,
upload_can_id=can_filters,
can_filters=args.bit_rate,
bit_rate=args.time_out,
time_out=args.station_address,
station_address=cntr,
cntr=True,
receive_own_messages=args.download, # CCP Upload mode
download_upload= args.diff_mode,
diff_mode = args.diff_threshold
diff_threshold )
can_specs
# test uploading
# can_specs.download_upload = False
# upload_calib_data2(xcp_calib=xcp_calib, can_specs=can_specs)
Release CAN device
# close and remove vcan0
# !sshpass -v -p asdf sudo ip link delete vcan0
os.system(f"gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set down {device}")
# delete vcan0
os.system(f"gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link delete can{args.channel_serial_number}")