# #|export
# from candycan.data_link_socketcan import done, send_msg
CCP
get_argparser
get_argparser ()
*Summary Get argument parser for command line arguments
Returns: argparse.ArgumentParser: description*
# Build the parser and parse a fixed argument list (instead of sys.argv)
# so the example runs the same way inside a notebook.
parser = get_argparser()
args = parser.parse_args(
    args=[
        '--protocol', 'CCP',
        '--can_type', 'NATIVE',
        '--bus_type', 'VIRTUAL',
        '--download',
        '--diff_mode',
        '--diff_threshold', '0.001',
        '--bit_rate', '500_000',
        '--time_out', '1.0',
        '--station_address', 0,
        '--download_can_id', '630',
        '--upload_can_id', '631',
        '--a2l', repo.working_dir+'/res/VBU_AI.json',
        '--node-path', r'/PROJECT/MODULE[]',
        '--leaves', r'TQD_trqTrqSetNormal_MAP_v, VBU_L045A_CWP_05_09T_AImode_CM_single, Lookup2D_FLOAT32_IEEE, Lookup2D_X_FLOAT32_IEEE, Scalar_FLOAT32_IEEE, TQD_vVehSpd, TQD_vSgndSpd_MAP_y, TQD_pctAccPedPosFlt, TQD_pctAccPdl_MAP_x',
        '--channel_serial_number', '3',
        '--input', repo.working_dir+'/res/download.json',
        '--output', repo.working_dir+'/res/output.json',
    ]
)
# Inspect the parsed values.
args.download_can_id, args.channel_serial_number, args.upload_can_id
hex(args.station_address), args.time_out
args.diff_threshold, args.__dict__
Types definition
check_can_type
check_can_type (c:str)
*Summary Check if the CAN type is valid
Args: c (str): CAN type to be checked
Returns: str: CAN type if valid
Raises: ValueError: if CAN type is invalid*
# # CanType
# native_can_type = CanType('NATIVE')
# native_can_type.lower()
# isinstance(native_can_type, CanType)
# isinstance('NATIVE', CanType)
check_bus_type
check_bus_type (b:str)
*Summary Check if the CAN bus type is valid
Args: b (str): Python CAN bus type to be checked
Returns: str: Python CAN bus type if valid
Raises: ValueError: if CAN bus type is invalid*
CANFilter
CANFilter (can_id:typing.Annotated[int,Gt(gt=0)]=630, can_mask:typing.Annotated[int,Gt(gt=0)]=2047)
*Summary CAN filter for Python CAN bus
Attributes: can_id (int): CAN message ID can_mask (int): CAN message mask*
ScapyCANSpecs
ScapyCANSpecs (can_type:typing.Annotated[str,AfterValidator(func=<function check_can_type at 0x7fc73e3f8ae0>)]='NATIVE', bus_type:typing.Annotated[str,AfterValidator(func=<function check_bus_type at 0x7fc73e3f8900>)]='VIRTUAL', channel_serial_number:typing.Annotated[int,Ge(ge=0),Lt(lt=500)]=3, download_can_id:typing.Annotated[int,Gt(gt=0)]=630, upload_can_id:typing.Annotated[int,Gt(gt=0)]=630, can_filters:Optional[list[__main__.CANFilter]]=None, bit_rate:typing.Annotated[int,Gt(gt=0),Lt(lt=1000000)]=500000, time_out:typing.Annotated[float,Gt(gt=0.0),Lt(lt=10.0)]=1.0, station_address:typing.Annotated[int,Ge(ge=0),Lt(lt=255)]=0, cntr:typing.Annotated[int,Ge(ge=0),Lt(lt=1000000)]=0, receive_own_messages:bool=True, download_upload:bool=True, diff_mode:bool=False, diff_threshold:float=0.001, last_download_data:Optional[candycan.a2l.XCPData]=None)
*Usage docs: https://docs.pydantic.dev/2.7/concepts/models/
A base class for creating Pydantic models.
Attributes: class_vars: The names of classvars defined on the model. private_attributes: Metadata about the private attributes of the model. signature: The signature for instantiating the model.
__pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__: The pydantic-core schema used to build the SchemaValidator and SchemaSerializer.
__pydantic_custom_init__: Whether the model has a custom `__init__` function.
__pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
__pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to
__args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__: The name of the post-init method for the model, if defined.
__pydantic_root_model__: Whether the model is a `RootModel`.
__pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_extra__: An instance attribute with the values of extra fields from validation when
`model_config['extra'] == 'allow'`.
__pydantic_fields_set__: An instance attribute with the names of fields explicitly set.
__pydantic_private__: Instance attribute with the values of private attributes set on the model instance.*
# Construct the specs with the remaining fields left at their defaults;
# a validation failure is printed rather than raised.
try:
    m = ScapyCANSpecs(can_type='NATIVE', bus_type='VIRTUAL')
except ValidationError as exc:
    print(exc)
pprint(m.model_dump())
def emulate_call(m: ScapyCANSpecs):
    """Increment the `cntr` field of *m* in place by one.

    Args:
        m: specs object whose `cntr` attribute is bumped.
    """
    m.cntr += 1
# Call emulate_call three times and print the counter after each call.
for i in range(3):
    emulate_call(m)
    print(f"{i}: counter {m.cntr}")
# Load the calibration stored in the input JSON file.
xcp_calib_from_xcpjson = Get_XCPCalib_From_XCPJSon(args.input)
xcp_calib_from_xcpjson
# Build the initial data record from the A2L description.
xcp_data = Generate_Init_XCPData_From_A2L(
    a2l=args.a2l, keys=args.leaves, node_path=args.node_path
)
# address from xcp data file should align with the address from xcp calib file
test_eq(xcp_data.address, xcp_calib_from_xcpjson.data[0].address)
# validate the model
try:
    XCPData.model_validate(xcp_data)
except ValidationError as exc:
    print(exc)
# type(args.channel), type(args.download_id), args.upload_id, args.download, args.diff_flashing
# Copy the value from the JSON calibration into the A2L-derived record.
xcp_data.value = xcp_calib_from_xcpjson.data[0].value
pprint(xcp_data)
xcp_data.value_array_view[0,2], xcp_data.value_array_view[2,0]
# Assemble the calibration object from the parsed CLI arguments and the data record.
xcp_calib = XCPCalib(
    config=XCPConfig(
        channel=args.channel_serial_number, download=args.download_can_id, upload=args.upload_can_id
    ),
    data=[xcp_data],
)
pprint(xcp_calib)
npa = xcp_calib.data[0].value_array_view
npa.shape, npa.dtype, npa
len(xcp_calib.data[0].value_bytes), xcp_calib.data[0].value_bytes
len(npa.tobytes()), npa.tobytes()
# The raw bytes of the array view must equal the record's value_bytes.
test_eq(npa.tobytes(), xcp_calib.data[0].value_bytes)
xcp_calib.data[0].value
# buffer = [i.hex() for x in npa for i in x]
# # buffer[::-1]
# len(buffer)
# buffer
# buffer = npa.tobytes()
# pprint(buffer), len(buffer)
# xcp_calib.data[0].value, len(xcp_calib.data[0].value)
# The same address as UTF-8 text bytes and as an integer literal.
addr = bytes('7000aa2a', 'utf-8')
a = 0x7000aa2a
a
# Reverse the array along its first axis.
npb = npa[::-1]
# npb
# Pack every element as a little-endian 32-bit float.
buffer = [struct.pack("<f", x) for x in np.nditer(npa)]
# buffer
len(buffer)
investigate int type and type size
d = xcp_calib.data[0]
# The address is stored as a hexadecimal string; convert it to an int.
add = int(d.address, base=16)
d.address, add
hex(add), type(add), sys.getsizeof(add)
# Native sizes of the short / int / long / unsigned long struct codes.
struct.calcsize('h'), struct.calcsize('i'), struct.calcsize('l'), struct.calcsize('L')
type(d.address), len(d.address)
npa_to_packed_buffer
npa_to_packed_buffer (a:numpy.ndarray)
*convert a numpy array to a packed string buffer for flashing TODO: implementation as numpy ufunc
Args: a (np.ndarray): input numpy array for flashing
Returns: str: packed string buffer for flashing*
# buffer = [struct.pack("<f", x).hex() for x in np.nditer(npa)]
# buffer[::-1]
# len(buffer)
# buffer
# data = ''.join(buffer)
# The packed buffer must equal the value stored in the calibration record.
data = npa_to_packed_buffer(npa)
test_eq(data, xcp_calib.data[0].value)
# data
convert a numpy array to a continuous hex string
# npa.astype(np.float32).tobytes().hex()
buffer = npa.astype(np.float32).tobytes().hex()  ## == npa_to_packed_buffer(npa)
buffer, len(buffer)
# The float32 hex dump must equal the value stored in the calibration record.
test_eq(buffer, xcp_calib.data[0].value)
flash_xcp
flash_xcp (xcp_calib:candycan.a2l.XCPCalib, data:pandas.core.frame.DataFrame, diff_flashing:bool=False, download:bool=True)
*Summary Flash XCP data to target
Args: xcp_calib (XCPCalib): XCP calibration as template, contains all the meta information except for data; data (pd.DataFrame): input XCP data to be flashed, replaces the value in xcp_calib; diff_flashing (bool): Use differential flashing; download (bool): Download or upload*
# Scapy warm-up: build an IP packet and draw it.
pkt = IP()
pkt.canvas_dump()
IP()
a = IP(dst="10.10.10.28")
a.dst
a.ttl
ls(IP)
os.getcwd()
# Read a capture file and dump its second packet to EPS.
a = rdpcap('../res/pcaps/ipfix.pcap')
a
# a[0].pdfdump(layer_shift=1)
a[1].psdump("/tmp/ipfix.eps", layer_shift=1)
Caution
set the python3 of the virtualenv with the CAP_NET_RAW capability!
sudo setcap 'CAP_NET_RAW+eip CAP_NET_ADMIN+eip' /dpt/.pyenv/versions/miniconda3-3.11-24.1.2-0/envs/can/bin/python3.11
install sshpass, and create .sshpasswd.gpg file in home root
cd ~
sudo apt-get install sshpass
echo 'password_in_verbatim' > .sshpasswd
gpg -c .sshpasswd
ls | grep 'sshpasswd.gpg'
# sniff(filter="icmp and host 10.10.10.28", count=2)
args.channel_serial_number
# Load the vcan module, then create and bring up vcan<channel_serial_number>.
if blue_pill:
    # install vcan interface with encrypted password to sudo
    os.system("gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo modprobe vcan")
    # sshpass -v -p asdf sudo ip link add dev vcan0 type vcan
    os.system(f"gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link add dev vcan{args.channel_serial_number} type vcan")
    # !gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set vcan0 type vcan bitrate 500000 # vcan does not support set bitrate on command line!
    # !sshpass -p asdf sudo ip link add dev vcan0 type vcan
    os.system(f"gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set up vcan{args.channel_serial_number}")
    # !sshpass -v -p asdf sudo ip link set up vcan0
else:
    os.system(f"sudo modprobe vcan")
    os.system(f"sudo ip link add dev vcan{args.channel_serial_number} type vcan")
    os.system(f"sudo ip link set up vcan{args.channel_serial_number}")

# Show the resulting interface state.
os.system(f"ip link show vcan{args.channel_serial_number}")
# Open a socket on the virtual CAN interface, send one frame and read one back.
socket = CANSocket(channel=f'vcan{args.channel_serial_number}',
                   receive_own_messages=True)
packet = CAN(identifier=0x123, data=b'12345678')
packet.show2()
socket.send(packet)
rx_packet = socket.recv()
rx_packet.show2()
# rx_packet = socket.recv()
wrpcap("./scapypcaptest.pcap", packet)
## another socket in the same process cannot receive the packet sent by the first socket
# socket2 = CANSocket(channel='vcan0')
## same socket cannot receive the packet sent by itself
# rx_packet = socket2.recv()
CCP via Scapy
CRO for connection
xcp_calib.config.download_can_id
hex(xcp_calib.config.download_can_id)
channel = 'can' + str(xcp_calib.config.channel)
channel
# Command counter, incremented after every CRO sent.
ctr = 0
print(f"ctr: {ctr}")
cro = CCP(identifier=xcp_calib.config.download_can_id)/CRO(ctr=ctr)/CONNECT()  # CONNECT(station_address=0x02)?
cro.show2()
cro.canvas_dump()
socket.send(cro)
ctr += 1
rx_cro = socket.recv()
rx_cro.show2()
rx_cro.canvas_dump()
# dto = sock.sr1(cro)
# dto.show2()
# assert dto.return_code == 0x00
Download from target
# Inspect the first data record of the calibration.
xcp_calib.data
d = xcp_calib.data[0]
d.address, d.type_size, d.dim, d.value_array_view, d.value_bytes
len(d.value_bytes)
d.value
set mta
print(f"ctr: {ctr}")
# SET_MTA takes the record's hex-string address converted to an int.
cro = CCP(identifier=xcp_calib.config.download_can_id)/CRO(ctr=ctr)/SET_MTA(address=int(d.address, 16))
cro.show2()
cro.payload
socket.send(cro)
ctr += 1
rx_cro = socket.recv()
rx_cro.show2()
# dto = sock.sr1(cro)
# dto.show2()
# assert dto.return_code == 0x00
rx_cro.canvas_dump()
# Total payload size and its split into full 6-byte tiles plus a remainder.
len_in_bytes = d.type_size * d.dim[0] * d.dim[1]
print(f"len_in_bytes: {len_in_bytes} = type_size: {d.type_size} x dim: {d.dim}")
download_times = len_in_bytes // 6
last_download_size = len_in_bytes % 6
print(f"download_times: {download_times}, last_download_size: {last_download_size}")
d.value_bytes
len(d.value_bytes)
# 6-byte tiles.
tile0 = d.value_bytes[0:6]
len(tile0), tile0, tile0.hex(), type(tile0)
tile1 = d.value_bytes[6:12]
len(tile1), tile1, tile1.hex()
# 4-byte tiles, each decoded as one little-endian float.
tile0 = d.value_bytes[0:4]
len(tile0), tile0, tile0.hex(), type(tile0), struct.unpack("<f", tile0)
tile1 = d.value_bytes[4:8]
len(tile1), tile1, tile1.hex(), struct.unpack("<f", tile1)
tile2 = d.value_bytes[8:12]
len(tile2), tile2, tile2.hex(), struct.unpack("<f", tile2)
st = 2*17*4
tile3 = d.value_bytes[st:st+4]
len(tile3), tile3, tile3.hex(), struct.unpack("<f", tile3)
d.value_array_view[2,0]
test_eq(struct.unpack("<f", tile3), d.value_array_view[2,0])
tile3.hex(), tile3.hex().encode()
type(tile3)
list(tile3)
ba_uploaded = bytearray()
ba_uploaded += tile3
ba_uploaded
ba_uploaded += tile2
ba_uploaded += tile1
ba_uploaded, len(ba_uploaded)
# Reassemble the whole payload tile by tile and compare with the source bytes.
ba_uploaded = bytearray()
len_in_bytes = d.type_size * d.dim[0] * d.dim[1]
download_times = len_in_bytes // 6
last_download_size = len_in_bytes % 6
for tile in range(download_times):
    ba_uploaded += d.value_bytes[tile*6:(tile+1)*6]
if last_download_size:
    ba_uploaded += d.value_bytes[download_times*6:download_times*6+last_download_size]

len(ba_uploaded), ba_uploaded.hex()
test_eq(ba_uploaded, d.value_bytes)
test_eq(ba_uploaded.hex(), d.value)
ba_uploaded.hex()
loop over XCPCalib data array
# Send the payload in 6-byte DNLOAD_6 commands, one CRO per tile.
# NOTE(review): nesting below is reconstructed from a flattened listing;
# confirm that show2()/payload belong under the `if`.
for i in range(download_times):
    cro = CCP(identifier=xcp_calib.config.download_can_id)/CRO(ctr=ctr)/DNLOAD_6(data=d.value_bytes[i*6:(i+1)*6])
    if i%100==0:
        print(f"i: {i}, ctr: {ctr}, cro: {cro}")
        cro.show2()
        cro.payload
    sent_bytes = socket.send(cro)
    ctr += 1
    rx_cro = socket.recv()
    # rx_cro.show2()
    # dto = sock.sr1(cro)
    # dto.show2()
    # assert dto.return_code == 0x00
i
cro.canvas_dump()
rx_cro.canvas_dump()
# Send the bytes left over after the last full 6-byte tile.
start_index = download_times * 6
cro = CCP(identifier=xcp_calib.config.download_can_id)/CRO(ctr=ctr)/DNLOAD(data=d.value_bytes[start_index:start_index+last_download_size])
print(f"ctr: {ctr}")
cro.show2()
cro.payload
cro.canvas_dump()
sent_bytes = socket.send(cro)
ctr += 1
rx_cro = socket.recv()
rx_cro.canvas_dump()
Disconnect target ecu
# Send DISCONNECT for station address 0x00 and read the reply.
cro = CCP(identifier=xcp_calib.config.download_can_id)/CRO(ctr=ctr)/DISCONNECT(station_address=0x00)
print(f"ctr: {ctr}")
cro.show2()
cro.canvas_dump()
bytes_sent = socket.send(cro)
ctr += 1
cx_cro = socket.recv()
cx_cro.show2()
cx_cro.canvas_dump()
download_calib_data
downlod_calib_data
downlod_calib_data (xcp_calib:candycan.a2l.XCPCalib, can_type:typing.Annotated[str,AfterValidator(func=<function check_can_type at 0x7fc73e3f8ae0>)], channel:int, bus_type:typing.Annotated[str,AfterValidator(func=<function check_bus_type at 0x7fc73e3f8900>)], can_filter=list[dict], bit_rate:int=500000, timeout:float=1.0, diff_flashing:bool=False)
*Summary Download XCP calibration data to target
Args: xcp_calib (XCPCalib): XCP calibration to be downloaded into the target diff_flashing (bool): Use differential flashing*
upload_calib_data
upload_calib_data
upload_calib_data (xcp_calib:candycan.a2l.XCPCalib, can_type:typing.Annotated[str,AfterValidator(func=<function check_can_type at 0x7fc73e3f8ae0>)], channel:int, bus_type:typing.Annotated[str,AfterValidator(func=<function check_bus_type at 0x7fc73e3f8900>)], can_filter=list[dict], bit_rate:int=500000, timeout:float=1.0, diff_flashing:bool=False)
*Summary Upload XCP calibration data from target to host, the result will update the xcp_calib.data field
Args: xcp_calib (XCPCalib): XCP calibration to be uploaded from the target to host diff_flashing (bool): Use differential flashing*
Three context managers for CCP
can_context
can_context (can_specs:__main__.ScapyCANSpecs)
*Summary Context manager for scapy CAN socket
Args: can_specs (ScapyCANSpecs): CAN specs including can type, bus type, channel, etc.
Yields: CANSocket: CAN socket object*
SET_MTA_context
SET_MTA_context (can_specs:__main__.ScapyCANSpecs, sock:scapy.contrib.cansocket_native.NativeCANSocket, data:candycan.a2l.XCPData)
*Summary Context manager for scapy set_mta
Args: channel (str): CAN channel to use, default is vcan0
Yields: CAN: packet for CAN message*
XLOAD_context
XLOAD_context (can_specs:__main__.ScapyCANSpecs, sock:scapy.contrib.cansocket_native.NativeCANSocket, data:candycan.a2l.XCPData, start_index:int, tile_size:int)
*Summary Context manager for scapy load (download or upload)
Args: channel (str): CAN channel to use, default is vcan0
Yields: CANSocket: CAN socket object*
# Accept only frames matching the upload CAN id (11-bit mask).
can_filters = [{'can_id': xcp_calib.config.upload_can_id, 'can_mask': 0x7FF}]
can_specs = ScapyCANSpecs(can_type='NATIVE',
                          bus_type='VIRTUAL',
                          channel_serial_number=0,
                          download_can_id=xcp_calib.config.download_can_id,
                          upload_can_id=xcp_calib.config.upload_can_id,
                          can_filters=can_filters,
                          bit_rate=500_000,
                          time_out=1.0,
                          station_address=0x00,
                          cntr=0,
                          receive_own_messages=True,
                          download_upload=True
                          )
can_specs
can_specs.model_dump()
Downloading and uploading with context managers
upload_calib_data2
upload_calib_data2 (xcp_calib:candycan.a2l.XCPCalib, can_specs:__main__.ScapyCANSpecs)
*Summary Upload XCP calibration data from target to host, the result will update the xcp_calib.data field
Args: xcp_calib (XCPCalib): XCP calibration to be uploaded from the target to host diff_flashing (bool): Use differential flashing*
downlod_calib_data2
downlod_calib_data2 (xcp_calib:candycan.a2l.XCPCalib, can_specs:__main__.ScapyCANSpecs)
*Summary Download XCP calibration data to target use scapy_can_context
Args: xcp_calib (XCPCalib): XCP calibration to be downloaded into the target*
# Point the calibration config at channel 0.
xcp_calib.config.channel = 0
xcp_calib
# TODO cannot run test on CCP without a real or emulated ECU
# downlod_calib_data2(xcp_calib, can_type='NATIVE', bus_type='VIRTUAL', bit_rate=500_000, timeout=1.0, station_address=0x00, diff_flashing=False)
Release CAN device
# Bring down and delete vcan<channel_serial_number>.
if blue_pill:
    # close and remove vcan0
    # !sshpass -v -p asdf sudo ip link delete vcan0
    os.system(f"gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link set down vcan{args.channel_serial_number}")
    # delete vcan0
    os.system(f"gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo ip link delete vcan{args.channel_serial_number}")
else:
    os.system(f"sudo ip link set down vcan{args.channel_serial_number}")
    os.system(f"sudo ip link delete vcan{args.channel_serial_number}")

# Show the interface state after removal.
os.system(f"ip link show vcan{args.channel_serial_number}")