# #|export
# from candycan.data_link_socketcan import done, send_msg
CCP over Scapy
get_argparser
get_argparser ()
*Summary Get argument parser for command line arguments
Returns: argparse.ArgumentParser: description*
= get_argparser()
parser = parser.parse_args(
args =[
args'--protocol', 'CCP',
'--can_type', 'NATIVE', #'PYTHON', #
'--bus_type', 'SOCKET', #'KVASER', #
'--download',
'--diff_mode',
'--diff_threshold', '0.001',
'--bit_rate', '500_000',
'--time_out', '1.0',
'--station_address', 0,
'--download_can_id', '630',
'--upload_can_id', '631',
'--a2l', repo.working_dir+'/res/VBU_AI.json',
'--node-path', r'/PROJECT/MODULE[]',
'--leaves', r'TQD_trqTrqSetNormal_MAP_v, VBU_L045A_CWP_05_09T_AImode_CM_single, Lookup2D_FLOAT32_IEEE, Lookup2D_X_FLOAT32_IEEE, Scalar_FLOAT32_IEEE, TQD_vVehSpd, TQD_vSgndSpd_MAP_y, TQD_pctAccPedPosFlt, TQD_pctAccPdl_MAP_x',
'--channel_serial_number', '3',
'--input', repo.working_dir+'/res/download.json',
'--output', repo.working_dir+'/res/output.json',
] )
args.download_can_id, args.channel_serial_number, args.upload_can_idhex(args.station_address), args.time_out
args.diff_threshold, args.__dict__
Types definition
check_can_type
check_can_type (c:str)
*Summary Check if the CAN type is valid
Args: can_type (str): CAN type to be checked
Returns: str: CAN type if valid
Raises: ValueError: if CAN type is invalid*
# # CanType
# native_can_type = CanType('NATIVE')
# native_can_type.lower()
# isinstance(native_can_type, CanType)
# isinstance('NATIVE', CanType)
check_bus_type
check_bus_type (b:str)
*Summary Check if the CAN bus type is valid
Args: b (str): Python CAN bus type to be checked
Returns: str: Python CAN bus type if valid
Raises: ValueError: if CAN bus type is invalid*
CANFilter
CANFilter (can_id:typing.Annotated[int,Gt(gt=0)]=630, can_mask:typing.Annotated[int,Gt(gt=0)]=2047)
*Summary CAN filter for Python CAN bus
Attributes: can_id (int): CAN message ID can_mask (int): CAN message mask*
ScapyCANSpecs
ScapyCANSpecs (can_type:typing.Annotated[str,AfterValidator(func=<functio ncheck_can_typeat0x7f8f4c8b22a0>)]='NATIVE', bus_type:typi ng.Annotated[str,AfterValidator(func=<functioncheck_bus_ty peat0x7f8f4c8b2520>)]='VIRTUAL', channel_serial_number:typ ing.Annotated[int,Ge(ge=0),Lt(lt=500)]=3, download_can_id:typing.Annotated[int,Gt(gt=0)]=630, upload_can_id:typing.Annotated[int,Gt(gt=0)]=630, can_filters:Optional[list[__main__.CANFilter]]=None, bit_r ate:typing.Annotated[int,Gt(gt=0),Lt(lt=1000000)]=500000, time_out:typing.Annotated[float,Gt(gt=0.0),Lt(lt=10.0)]=1. 0, station_address:typing.Annotated[int,Ge(ge=0),Lt(lt=255 )]=0, cntr:typing.Annotated[int,Ge(ge=0),Lt(lt=1000000)]=0, receive_own_messages:bool=True, download_upload:bool=True, diff_mode:bool=False, diff_threshold:float=0.001, last_download_data:Optional[candycan.a2l.XCPData]=None)
*Usage docs: https://docs.pydantic.dev/2.7/concepts/models/
A base class for creating Pydantic models.
Attributes: class_vars: The names of classvars defined on the model. private_attributes: Metadata about the private attributes of the model. signature: The signature for instantiating the model.
__pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__: The pydantic-core schema used to build the SchemaValidator and SchemaSerializer.
__pydantic_custom_init__: Whether the model has a custom `__init__` function.
__pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
__pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to
__args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__: The name of the post-init method for the model, if defined.
__pydantic_root_model__: Whether the model is a `RootModel`.
__pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_extra__: An instance attribute with the values of extra fields from validation when
`model_config['extra'] == 'allow'`.
__pydantic_fields_set__: An instance attribute with the names of fields explicitly set.
__pydantic_private__: Instance attribute with the values of private attributes set on the model instance.*
try:
= ScapyCANSpecs(can_type='NATIVE', bus_type='VIRTUAL')
m except ValidationError as exc:
print(exc)
pprint(m.model_dump())
def emulate_call(m: ScapyCANSpecs):
+= 1
m.cntr
for i in range(3):
emulate_call(m)print(f"{i}: counter {m.cntr}")
= Get_XCPCalib_From_XCPJSon(args.input)
xcp_calib_from_xcpjson xcp_calib_from_xcpjson
= Generate_Init_XCPData_From_A2L(
xcp_data =args.a2l, keys=args.leaves, node_path=args.node_path
a2l
)
# address from xcp data file should align with the address from xcp calib file
0].address)
test_eq(xcp_data.address, xcp_calib_from_xcpjson.data[
# validate the model
try:
XCPData.model_validate(xcp_data)except ValidationError as exc:
print(exc)
# type(args.channel), type(args.download_id), args.upload_id, args.download, args.diff_flashing
= xcp_calib_from_xcpjson.data[0].value
xcp_data.value
pprint(xcp_data)0,2], xcp_data.value_array_view[2,0] xcp_data.value_array_view[
= XCPCalib(
xcp_calib =XCPConfig(
config=args.channel_serial_number, download=args.download_can_id, upload=args.upload_can_id
channel
),=[xcp_data],
data )
pprint(xcp_calib)
= xcp_calib.data[0].value_array_view
npa
npa.shape, npa.dtype, npalen(xcp_calib.data[0].value_bytes), xcp_calib.data[0].value_bytes
len(npa.tobytes()), npa.tobytes()
0].value_bytes)
test_eq(npa.tobytes(), xcp_calib.data[0].value xcp_calib.data[
# buffer = [i.hex() for x in npa for i in x]
# # buffer[::-1]
# len(buffer)
# buffer
# buffer = npa.tobytes()
# pprint(buffer), len(buffer)
# xcp_calib.data[0].value, len(xcp_calib.data[0].value)
= bytes('7000aa2a', 'utf-8')
addr = 0x7000aa2a
a a
= npa[::-1]
npb # npb
buffer = [struct.pack("<f", x) for x in np.nditer(npa)]
# buffer
len(buffer)
investigate int type and type size
= xcp_calib.data[0]
d = int(d.address, base=16)
add
d.address, addhex(add), type(add), sys.getsizeof(add)
'h'), struct.calcsize('i'), struct.calcsize('l'), struct.calcsize('L')
struct.calcsize(type(d.address),len(d.address)
npa_to_packed_buffer
npa_to_packed_buffer (a:numpy.ndarray)
*convert a numpy array to a packed string buffer for flashing TODO: implementation as numpy ufunc
Args: a (np.ndarray): input numpy array for flashing
Returns: str: packed string buffer for flashing*
# buffer = [struct.pack("<f", x).hex() for x in np.nditer(npa)]
# buffer[::-1]
# len(buffer)
# buffer
# data = ''.join(buffer)
= npa_to_packed_buffer(npa)
data 0].value)
test_eq(data, xcp_calib.data[# data
convert a numpy array to a continuous hex string
# npa.astype(np.float32).tobytes().hex()
buffer = npa.astype(np.float32).tobytes().hex() ## == npa_to_packed_buffer(npa)
buffer, len(buffer)
buffer, xcp_calib.data[0].value) test_eq(
flash_xcp
flash_xcp (xcp_calib:candycan.a2l.XCPCalib, data:pandas.core.frame.DataFrame, diff_flashing:bool=False, download:bool=True)
*Summary Flash XCP data to target
Args: xcp_calib (XCPCalib): XCP calibration as template, contains all the meta information except for data xcp_data (pd.DataFrame): input XCP data to be flashed, replace the value in xcp_calib diff_flashing (bool): Use differential flashing download (bool): Download or upload*
Caution
set the python3 of the virtualenv with the CAP_NET_RAW capability!
sudo setcap 'CAP_NET_RAW+eip CAP_NET_ADMIN+eip' /dpt/.pyenv/versions/miniconda3-3.11-24.1.2-0/envs/can/bin/python3.11
install sshpass, and create .sshpasswd.gpg file in home root
cd ~
sudo apt-get install sshpass
echo 'password_in_verbatim" > .sshpasswd
gpg -c .sshpasswd
ls | grep 'sshpasswd.gpg'
args.channel_serial_number
# install vcan interface with encrypted password to sudo
"gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo modprobe can") os.system(
args.channel_serial_number
# sshpass -v -p asdf sudo ip link add dev can0 type can
f"gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link add dev can{args.channel_serial_number} type vcan") os.system(
f"ip link show can{args.channel_serial_number}") os.system(
# !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set can0 type can bitrate 500000 # can does not support set bitrate on command line!
# !sshpass -p asdf sudo ip link add dev can0 type can
# os.system(f"gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set up can{args.channel_serial_number}")
f"gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set can{args.channel_serial_number} up type can bitrate 500000")
os.system(# !sshpass -v -p asdf sudo ip link set up can0
= CANSocket(bustype='socketcan',channel=f'can{args.channel_serial_number}',
socket =True) receive_own_messages
socket, args.channel_serial_number
= CAN(identifier=0x123, data=b'12345678')
packet packet.show2()
# socket.send(packet)
# rx_packet = socket.recv()
# rx_packet.show2()
# rx_packet.canvas_dump()
# rx_packet = socket.recv()
"./scapypcaptest.pcap", packet) wrpcap(
## another socket in the same process cannot receive the packet sent by the first socket
# socket2 = CANSocket(channel='vcan0')
## same socket cannot receive the packet sent by itself
# rx_packet = socket2.recv()
CCP via Scapy
= 'can' + str(xcp_calib.config.channel)
channel channel
args.channel_serial_number
# sock = CANSocket(busytpe='socketcan', channel=f'can{args.channel_serial_number}', receive_own_messages=True)
= CANSocket(busytpe='socketcan', channel=f'can{args.channel_serial_number}', receive_own_messages=True)
sock sock
# sock = CANSocket(busytpe='socketcan', channel=f'can{args.channel_serial_number}') # receive_own_messages=True)
# sock
= CANSocket(busytpe='socketcan', channel=f'can{args.channel_serial_number}', basecls=CCP) # receive_own_messages=True)
sock sock
xcp_calib.config.download_can_idhex(xcp_calib.config.download_can_id)
CRO for connection
# sock.send(cro)
# ctr += 1
# rx_cro = sock.recv()
# rx_cro.canvas_dump()
= CANSocket(busytpe='socketcan', channel=f'can{args.channel_serial_number}', basecls=CCP, receive_own_messages=True)
sock # sock = CANSocket(busytpe='socketcan', channel=f'can{args.channel_serial_number}', basecls=CCP)
sock
get CCP version
= 0 ctr
# cro = CCP(identifier=xcp_calib.config.download_can_id)/CRO(ctr=ctr)/CONNECT(station_address=0x01) # or 0x00?
= CCP(identifier=xcp_calib.config.download_can_id)/CRO(ctr=ctr)/GET_CCP_VERSION() # or 0x00?
cro
cro.show2() cro.canvas_dump()
= sock.sr1(cro) #, timeout=5)
dto += 1
ctr
dto.show2() dto.canvas_dump()
dto
send connect CRO
print(f'ctr: {ctr}')
# cro = CCP(identifier=xcp_calib.config.download_can_id)/CRO(ctr=ctr)/CONNECT(station_address=0x01) # or 0x00?
= CCP(identifier=xcp_calib.config.download_can_id)/CRO(ctr=0)/CONNECT() # or 0x00?
cro
cro.show2()
cro.canvas_dump()print(f"ctr: {ctr}")
= sock.sr1(cro) #, timeout=5)
dto += 1 # 1 ctr
dto.show2() dto.canvas_dump()
dto
Download from target
xcp_calib.data
= xcp_calib.data[0]
d
d.address, d.type_size, d.dim, d.value_array_view, d.value_byteslen(d.value_bytes )
d.value
set mta
d.address,ctr
# ctr = 0
print(f"ctr: {ctr}")
= CCP(identifier=xcp_calib.config.download_can_id)/CRO(ctr=0)/SET_MTA(address=int(d.address, 16))
cro
cro.show2() cro.payload
sock
= sock.sr1(cro)
dto += 1 ctr
dto.show2()
dto.canvas_dump() dto.payload
= d.type_size * d.dim[0] * d.dim[1]
len_in_bytes print(f"len_in_bytes: {len_in_bytes} = type_size: {d.type_size} x dim: {d.dim}")
= len_in_bytes // 6
download_times = len_in_bytes % 6
last_download_size print(f"download_times: {download_times}, last_download_size: {last_download_size}")
d.value_byteslen(d.value_bytes)
= d.value_bytes[0:6]
tile0 len(tile0), tile0, tile0.hex(), type(tile0)
= d.value_bytes[6:12]
tile1 len(tile1), tile1, tile1.hex()
= d.value_bytes[0:4]
tile0 len(tile0), tile0, tile0.hex(), type(tile0), struct.unpack("<f", tile0)
= d.value_bytes[4:8]
tile1 len(tile1), tile1, tile1.hex(), struct.unpack("<f", tile1)
= d.value_bytes[8:12]
tile2 len(tile1), tile2, tile2.hex(), struct.unpack("<f", tile2)
= 2*17*4
st = d.value_bytes[st:st+4]
tile3 len(tile1), tile3, tile3.hex(), struct.unpack("<f", tile3)
2,0]
d.value_array_view["<f", tile3), d.value_array_view[2,0]) test_eq(struct.unpack(
hex(), tile3.hex().encode() tile3.
type(tile3)
list(tile3)
= bytearray()
ba_uploaded += tile3
ba_uploaded
ba_uploaded+= tile2
ba_uploaded += tile1
ba_uploaded len(ba_uploaded) ba_uploaded,
= bytearray()
ba_uploaded = d.type_size * d.dim[0] * d.dim[1]
len_in_bytes = len_in_bytes // 6
download_times = len_in_bytes % 6
last_download_size for tile in range(download_times):
+= d.value_bytes[tile*6:(tile+1)*6]
ba_uploaded if last_download_size:
+= d.value_bytes[download_times*6:download_times*6+last_download_size]
ba_uploaded
len(ba_uploaded), ba_uploaded.hex()
test_eq(ba_uploaded, d.value_bytes)hex(), d.value)
test_eq(ba_uploaded.hex() ba_uploaded.
loop over XCPCalib data array
print(f'payload: {d.value_bytes[0:6]}')
print(f'ctr: {ctr}')
= CCP(identifier=xcp_calib.config.download_can_id)/CRO(ctr=0)/DNLOAD_6(data=d.value_bytes[0:6])
cro # if i%100==0:
# print(f"i: {i}, ctr: {ctr} cro: {cro}")
cro.show2()# cro.payload
# sent_bytes = sock.send(cro)
# rx_cro = sock.recv()
= sock.sr1(cro)
dto += 1
ctr dto.show2()
cro.canvas_dump() dto.canvas_dump()
print(f'payload: {d.value_bytes[6:12]}')
print(f'ctr: {ctr}')
= CCP(identifier=xcp_calib.config.download_can_id)/CRO(ctr=0)/DNLOAD_6(data=d.value_bytes[6:12])
cro # if i%100==0:
# print(f"i: {i}, ctr: {ctr} cro: {cro}")
cro.show2()# cro.payload
# sent_bytes = sock.send(cro)
# rx_cro = sock.recv()
= sock.sr1(cro)
dto += 1
ctr dto.show2()
for i in range(download_times):
print(f'payload: {d.value_bytes[i*6:(i+1)*6]}')
print(f'ctr: {ctr}')
= CCP(identifier=xcp_calib.config.download_can_id)/CRO(ctr=ctr)/DNLOAD_6(data=d.value_bytes[i*6:(i+1)*6])
cro # if i%100==0:
# print(f"i: {i}, ctr: {ctr} cro: {cro}")
cro.show2()# cro.payload
# sent_bytes = sock.send(cro)
# rx_cro = sock.recv()
= sock.sr1(cro)
dto += 1
ctr
dto.show2()assert dto.return_code == 0x00
# rx_cro.show2()
# dto = sock.sr1(cro)
# dto.show2()
# assert dto.return_code == 0x00
i
cro.canvas_dump() dto.canvas_dump()
= download_times * 6
start_index = CCP(identifier=xcp_calib.config.download_can_id)/CRO(ctr=ctr)/DNLOAD(data=d.value_bytes[start_index:start_index+last_download_size])
cro
cro.show2()
cro.payload
cro.canvas_dump()# sent_bytes = sock.send(cro)
= sock.sr1(cro)
dto += 1
ctr dto.canvas_dump()
Disconnect target ecu
= CCP(identifier=xcp_calib.config.download_can_id)/CRO(ctr=ctr)/DISCONNECT(station_address=0x00)
cro
cro.show2()
cro.canvas_dump()# bytes_sent = sock.send(cro)
= sock.sr1(cro)
dto += 1
ctr
dto.show2() dto.canvas_dump()
Three context managers for CCP
can_context
can_context (can_specs:__main__.ScapyCANSpecs)
*Summary Context manager for scapy CAN socket
Args: can_specs (ScapyCANSpecs): CAN specs including can type, bus type, channel, etc.
Yields: CANSocket: CAN socket object*
SET_MTA_context
SET_MTA_context (can_specs:__main__.ScapyCANSpecs, sock:scapy.contrib.cansocket_native.NativeCANSocket, data:candycan.a2l.XCPData)
*Summary Context manager for scapy set_mta
Args: channel (str): CAN channel to use, default is vcan0
Yields: CAN: packdet for CAN message*
XLOAD_context
XLOAD_context (can_specs:__main__.ScapyCANSpecs, sock:scapy.contrib.cansocket_native.NativeCANSocket, data:candycan.a2l.XCPData, start_index:int, tile_size:int)
*Summary Context manager for scapy load (download or upload)
Args: channel (str): CAN channel to use, default is vcan0
Yields: CANSocket: CAN socket object*
Downloading and uploading with context managers
upload_calib_data2
upload_calib_data2 (xcp_calib:candycan.a2l.XCPCalib, can_specs:__main__.ScapyCANSpecs)
*Summary Upload XCP calibration data from target to host, the result will update the xcp_calib.data field
Args: xcp_calib (XCPCalib): XCP calibration to be uploaded from the target to host diff_flashing (bool): Use differential flashing*
downlod_calib_data2
downlod_calib_data2 (xcp_calib:candycan.a2l.XCPCalib, can_specs:__main__.ScapyCANSpecs)
*Summary Download XCP calibration data to target use scapy_can_context
Args: xcp_calib (XCPCalib): XCP calibration to be downloaded into the target*
= [{'can_id': xcp_calib.config.upload_can_id, 'can_mask': 0x7FF}]
can_filters = ScapyCANSpecs(can_type='NATIVE',
can_specs ='SOCKET',
bus_type=3,
channel_serial_number=xcp_calib.config.download_can_id,
download_can_id=xcp_calib.config.upload_can_id,
upload_can_id=can_filters,
can_filters=500_000,
bit_rate=1.0,
time_out=0x00,
station_address=0,
cntr=True,
receive_own_messages=True
download_upload
)
can_specs can_specs.model_dump()
= 0
xcp_calib.config.channel xcp_calib
# TODO cannot run test on CCP without a real or emulated ECU
# downlod_calib_data2(xcp_calib, can_type='NATIVE', bus_type='VIRTUAL', bit_rate=500_000, timeout=1.0, station_address=0x00, diff_flashing=False)
Test downloading and uploading
= Get_XCPCalib_From_XCPJSon(args.input)
xcp_calib_from_xcpjson
= xcp_calib_from_xcpjson.config.download_can_id
args.download_can_id = xcp_calib_from_xcpjson.config.upload_can_id
args.upload_can_id = xcp_calib_from_xcpjson.config.channel
args.channel_serial_number
= Generate_Init_XCPData_From_A2L(
xcp_data =args.a2l, keys=args.leaves, node_path=args.node_path
a2l# initial xcp_data has value 0
) try:
XCPData.model_validate(xcp_data)except ValidationError as exc:
print(exc)
# emulate torque table input as numpy array
= xcp_calib_from_xcpjson.data[0].value_array_view
xcp_data_value_npa = xcp_data_value_npa.astype(np.float32).tobytes().hex()
xcp_data.value
pprint(xcp_data)
= XCPCalib(
xcp_calib =XCPConfig(
config=args.channel_serial_number, download=args.download_can_id, upload=args.upload_can_id
channel
),=[xcp_data],
data
)
pprint(xcp_calib)
= [{'can_id': xcp_calib.config.upload_can_id, 'can_mask': 0x7FF}]
can_filters = 0
cntr = ScapyCANSpecs(can_type=args.can_type,
can_specs =args.bus_type,
bus_type=args.channel_serial_number,
channel_serial_number=xcp_calib.config.download_can_id,
download_can_id=xcp_calib.config.upload_can_id,
upload_can_id=can_filters,
can_filters=args.bit_rate,
bit_rate=args.time_out,
time_out=args.station_address,
station_address=cntr,
cntr=True,
receive_own_messages=args.download, # CCP Upload mode
download_upload= args.diff_mode,
diff_mode = args.diff_threshold
diff_threshold )
can_specs
# test uploading
# can_specs.download_upload = False
# upload_calib_data2(xcp_calib=xcp_calib, can_specs=can_specs)
Release CAN device
# close and remove vcan0
# !sshpass -v -p asdf sudo ip link delete vcan0
f"gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set down can{args.channel_serial_number}") os.system(
# delete vcan0
f"gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link delete can{args.channel_serial_number}") os.system(