Source code for j1939.j1939_22

from .parameter_group_number import ParameterGroupNumber
from .message_id import MessageId, FrameFormat
from enum import IntEnum
import logging
import threading
import time

logger = logging.getLogger(__name__)

[docs] class J1939_22:
[docs] class TpControlType(IntEnum): RTS = 0 # Destination Specific Request_To_Send CTS = 1 # Destination Specific Clear_To_Send EOM_STATUS = 2 # Destination Specific or Global Destination End_of_Message Status EOM_ACK = 3 # Destination Specific End_of_Message Acknowledge BAM = 4 # Global Destination Broadcast Announce Message ABORT = 15 # Destination Specific Connection Abort
[docs] class Adt(IntEnum): # assurance data type NO_ADT = 0 # no assurance Data MS_CS = 1 # Manufacturer specific cybersecurity assurance data MS_FS = 2 # Manufacturer specific functional safety assurance MS_COMBINED_CS_FS = 3 # Manufacturer specific combined cybersecurity followed by functional safety assurance
[docs] class ConnectionAbortReason: BUSY = 1 # Already in one or more connection managed sessions and cannot support another RESOURCES = 2 # System resources were needed for another task so this connection managed session was terminated TIMEOUT = 3 # A timeout occured # 4..250 Reserved by SAE CTS_WHILE_DT = 4 # according AUTOSAR: CTS messages received when data transfer is in progress
# 251..255 Per J1939/71 definitions - but there are none?
[docs] class DataLength: TP = 60 MULTI_PG = 60
[docs] class Timeout: """Timeouts according SAE J1939/22""" Tr = 0.200 # Maximum Response Time Th = 0.500 # Maximum time, for responder, between transmits of consecutive CTS messages during hold T1 = 0.750 # Transport Segment Interval T2 = 1.250 # Maximum time, for responder, to receive a DT segment after a CTS - Originator Failure T3 = 1.250 # Maximum time, for originator, to receive a CTS after last DT segment - Responder Failure T4 = 1.050 # Maximum time, for originator, to receive the next CTS messages since the previous “hold” CTS to hold a connection open T5 = 3.000 # Maximum time, for originator, to receive EOMA after sending EOMS
[docs] class SendBufferState: WAITING_CTS = 0 # waiting for CTS SENDING_RTS_CTS = 1 # sending rts/cts packages SENDING_BAM = 2 # sending broadcast packages SENDING_EOM_STATUS = 3 # sending end of message WAITING_EOM_ACK = 4 # waiting for end of message acknowledge EOM_ACK_RECEIVED = 5 # eom acknowledge received successfully TRANSMISSION_FINISHED = 6 # finished, remove buffer
[docs] class Acknowledgement: ACK = 0 NACK = 1 AccessDenied = 2 CannotRespond = 3
[docs] def __init__(self, send_message, job_thread_wakeup, notify_subscribers, max_cmdt_packets, minimum_tp_rts_cts_dt_interval, minimum_tp_bam_dt_interval, ecu_is_message_acceptable): # Receive buffers self._rcv_buffer = {} # Send buffers self._snd_buffer = {} # Multi-PG Send buffers self._multi_pg_snd_buffer = {} # List of ControllerApplication self._cas = [] self._LUT_FD_DLC = [] for i in range(9): self._LUT_FD_DLC.append(i) for _ in range(4): self._LUT_FD_DLC.append(12) for _ in range(4): self._LUT_FD_DLC.append(16) for _ in range(4): self._LUT_FD_DLC.append(20) for _ in range(4): self._LUT_FD_DLC.append(24) for _ in range(8): self._LUT_FD_DLC.append(32) for _ in range(16): self._LUT_FD_DLC.append(48) for _ in range(16): self._LUT_FD_DLC.append(64) # minimum time between two tp rts/cts dt frames, not necessary for standard conforming applications, # (they would use RTS/CTS flow control), but helps to talk to others without patching the library self._minimum_tp_rts_cts_dt_interval = minimum_tp_rts_cts_dt_interval # minimum time between two tp bam dt frames, inital value is 10ms # specified time range in j1939-22: 10-200ms if minimum_tp_bam_dt_interval == None: self._minimum_tp_bam_dt_interval = 0.010 else: self._minimum_tp_bam_dt_interval = minimum_tp_bam_dt_interval # Up to 4 concurrent BAM sessions per originator address are allowed self.__bam_session_list = [True] * 4 # Up to 8 concurrent RTS/CTS sessions per originator and responder address pair are allowed. self.__rts_cts_session_list = [True] * 8 # number of packets that can be sent/received with CMDT (Connection Mode Data Transfer) self._max_cmdt_packets = max_cmdt_packets # Lock protecting _rcv_buffer, _snd_buffer, and _multi_pg_snd_buffer — accessed from # both the Notifier thread (notify/process_tp_*) and the protocol job thread (async_job_thread). self._buffer_lock = threading.Lock() self.__job_thread_wakeup = job_thread_wakeup self.__send_message = send_message self.__notify_subscribers = notify_subscribers self.__ecu_is_message_acceptable = ecu_is_message_acceptable
[docs] def add_ca(self, ca): self._cas.append(ca)
[docs] def remove_ca(self, device_address): for ca in self._cas: if device_address == ca._device_address_preferred: self._cas.remove(ca) return True return False
def _buffer_hash(self, session_num, src_address, dest_address): """Calculates a hash value for the given address pair :param src_address: The Source-Address the connection should bound to. :param dest_address: The Destination-Address the connection should bound to. :return: The calculated hash value. :rtype: int """ return ((session_num & 0xF) << 16) | ((src_address & 0xFF) << 8) | (dest_address & 0xFF) def _buffer_hash_mpg(self, frame_format, msg_counter, src_address, dest_address): """Calculates a hash value for the given multi-pg arguments :param frame_format: The frame-format (FBFF, FEFF) the connection should bound to. :param msg_counter: The message counter the connection should bound to. :param src_address: The Source-Address the connection should bound to. :param dest_address: The Destination-Address the connection should bound to. :return: The calculated hash value. :rtype: int """ return ((frame_format & 0xFF) << 24) | ((msg_counter & 0xFF) << 16) | ((src_address & 0xFF) << 8) | (dest_address & 0xFF) def _buffer_unhash(self, hash): """Calculates session-number, source-address and destination-address for the given hash value :param hash: The hash to be unhased :return: The session-number, source-address and destination-address """ return ((hash >> 16) & 0xFF), ((hash >> 8) & 0xFF), (hash & 0xFF) def _buffer_unhash_mpg(self, hash): """Calculates frame_format, msg_counter, source-address and destination-address for the given hash value :param hash: The hash to be unhased :return: The session-number, source-address and destination-address """ return ((hash >> 24) & 0xFF), ((hash >> 16) & 0xFF), ((hash >> 8) & 0xFF), (hash & 0xFF) def __get_bam_session(self): for idx, i in enumerate(self.__bam_session_list): if i == True: self.__bam_session_list[idx] = False return idx return None def __put_bam_session(self, session): self.__bam_session_list[session] = True def __get_rts_cts_session(self): for idx, i in enumerate(self.__rts_cts_session_list): if i == True: self.__rts_cts_session_list[idx] = False return idx return None def __put_rts_cts_session(self, session): self.__rts_cts_session_list[session] = True
[docs] def send_pgn(self, data_page, pdu_format, pdu_specific, priority, src_address, data, time_limit, frame_format, tos=2, trailer_format=0): pgn = ParameterGroupNumber(data_page, pdu_format, pdu_specific) data_length = len(data) if data_length <= self.DataLength.TP: if (tos != 2) or (trailer_format != 0): print('currently "SAE J1939 with no assurance data" trailer format supported only') if pgn.is_pdu1_format: cpgn = pgn.value & 0xFFF00 dst_address = pdu_specific else: cpgn = pgn.value dst_address = ParameterGroupNumber.Address.GLOBAL if (frame_format==FrameFormat.FBFF): priority = 0 if (dst_address!=ParameterGroupNumber.Address.GLOBAL): logger.info('FBFF message must be a broadcast type') return False # create header dict cpg = {'priority': (priority & 0x7), 'tos': (tos & 0x7), 'tf': (trailer_format & 0x7), 'cpgn': (cpgn & 0x3FFFF), 'data_length': data_length, 'data': data.copy()} # send immediately if time_limit == 0: self.__send_multi_pg(frame_format, [cpg], src_address, dst_address) else: session = 0 deadline = time.monotonic() + time_limit with self._buffer_lock: while True: hash = self._buffer_hash_mpg(frame_format, session, src_address, dst_address) if hash not in self._multi_pg_snd_buffer: self._multi_pg_snd_buffer[hash] = {'deadline': deadline, 'cpg': [cpg], 'fill_level': 4 + data_length} break elif (self._multi_pg_snd_buffer[hash]['fill_level'] <= (self.DataLength.TP - data_length)): # update fill level self._multi_pg_snd_buffer[hash]['fill_level'] += 4 + data_length # update deadline if self._multi_pg_snd_buffer[hash]['deadline'] > deadline: self._multi_pg_snd_buffer[hash]['deadline'] = deadline # append c-pg self._multi_pg_snd_buffer[hash]['cpg'].append(cpg) break else: # trigger sending self._multi_pg_snd_buffer[hash]['deadline'] = time.monotonic() self.__job_thread_wakeup() # get next buffer session += 1 else: # if the PF is between 0 and 239, the message is destination dependent when pdu_specific != 255 # if the PF is between 240 and 255, the message can only be broadcast if (pdu_specific == ParameterGroupNumber.Address.GLOBAL) or ParameterGroupNumber(0, pdu_format, pdu_specific).is_pdu2_format: dest_address = ParameterGroupNumber.Address.GLOBAL session_num = self.__get_bam_session() if session_num == None: #print('bam session not available') return False else: dest_address = pdu_specific session_num = self.__get_rts_cts_session() if session_num == None: #print('rts/cts session not available') return False # init sequence buffer_hash = self._buffer_hash(session_num, src_address, dest_address) message_size = data_length num_segments = int(message_size / self.DataLength.TP ) + ((message_size % self.DataLength.TP ) != 0) # set default priority if priority == None: priority = 7 # get chunks from data chunk_size = self.DataLength.TP data_list = [list(data[i:i + chunk_size]) for i in range(0, data_length, chunk_size)] # if the PF is between 240 and 255, the message can only be broadcast if dest_address == ParameterGroupNumber.Address.GLOBAL: # send BAM self.__send_tp_bam(priority, src_address, session_num, pgn.value, message_size, num_segments) # init new buffer for this connection with self._buffer_lock: self._snd_buffer[buffer_hash] = { 'pgn': pgn.value, 'priority': priority, 'session': session_num, 'message_size': message_size, 'num_segments': num_segments, 'data': data_list, 'state': self.SendBufferState.SENDING_BAM, 'deadline': time.monotonic() + self._minimum_tp_bam_dt_interval, 'src_address' : src_address, 'dest_address' : ParameterGroupNumber.Address.GLOBAL, 'next_packet_to_send' : 0, } else: # send RTS/CTS pgn.pdu_specific = 0 # this is 0 for peer-to-peer transfer # init new buffer for this connection with self._buffer_lock: self._snd_buffer[buffer_hash] = { 'pgn': pgn.value, 'priority': priority, 'session': session_num, 'message_size': message_size, 'num_segments': num_segments, 'data': data_list, 'state': self.SendBufferState.WAITING_CTS, 'deadline': time.monotonic() + self.Timeout.T3, 'src_address' : src_address, 'dest_address' : pdu_specific, 'next_packet_to_send' : 0, 'next_wait_on_cts': 0, } self.__send_tp_rts(priority, src_address, pdu_specific, session_num, pgn.value, message_size, num_segments, min(self._max_cmdt_packets, num_segments)) self.__job_thread_wakeup() return True
def __send_multi_pg(self, frame_format, cpg_list, src_address, dst_address): # deadline reached priority = 7 data = [] for cpg in cpg_list: priority = min(cpg['priority'], priority) data.append( (cpg['tos'] << 5) | (cpg['tf'] << 2) | ((cpg['cpgn'] >> 16) & 0x3) ) data.append( ((cpg['cpgn'] >> 8) & 0xFF) ) data.append( (cpg['cpgn'] & 0xFF) ) data.append( cpg['data_length'] ) data.extend( cpg['data']) # padding next_valid_fd_length = self._LUT_FD_DLC[len(data)] if next_valid_fd_length < 0: next_valid_fd_length = 0 # padding with service header 0 padding_cnt = 0 while len(data)<next_valid_fd_length: if padding_cnt < 3: data.append(0) padding_cnt += 1 else: data.append(0xAA) if frame_format == FrameFormat.FBFF: self.__send_message(src_address, False, data, fd_format=True) else: mid = MessageId(priority=priority, parameter_group_number=ParameterGroupNumber.PGN.FEFF_MULTI_PG | (dst_address & 0xFF), source_address=src_address) self.__send_message(mid.can_id, True, data, fd_format=True)
[docs] def async_job_thread(self, now): next_wakeup = now + 5.0 # wakeup in 5 seconds with self._buffer_lock: # check receive buffers for timeout for bufid in list(self._rcv_buffer): buf = self._rcv_buffer[bufid] if buf['deadline'] != 0: if buf['deadline'] > now: if next_wakeup > buf['deadline']: next_wakeup = buf['deadline'] else: # deadline reached logger.info('Deadline reached for rcv_buffer src 0x%02X dst 0x%02X', buf['src_address'], buf['dest_address'] ) if buf['dest_address'] != ParameterGroupNumber.Address.GLOBAL: self.__send_tp_abort(buf['dest_address'], buf['src_address'], buf['session'], self.ConnectionAbortReason.TIMEOUT, buf['pgn']) del self._rcv_buffer[bufid] self.__put_rts_cts_session(buf['session']) else: del self._rcv_buffer[bufid] self.__put_bam_session(buf['session']) # TODO: should we notify our CAs about the cancelled transfer? # check multi-pg send buffers for timeout for bufid in list(self._multi_pg_snd_buffer): buf = self._multi_pg_snd_buffer[bufid] if buf['deadline'] > now: if next_wakeup > buf['deadline']: next_wakeup = buf['deadline'] else: # deadline reached frame_format, session_num, src_address, dst_address = self._buffer_unhash_mpg(bufid) self.__send_multi_pg(frame_format, buf['cpg'], src_address, dst_address) del self._multi_pg_snd_buffer[bufid] # check send buffers for bufid in list(self._snd_buffer): buf = self._snd_buffer[bufid] if buf['deadline'] != 0: if buf['deadline'] > now: if next_wakeup > buf['deadline']: next_wakeup = buf['deadline'] else: # deadline reached if buf['state'] == self.SendBufferState.WAITING_CTS: logger.info('Deadline WAITING_CTS reached for snd_buffer src 0x%02X dst 0x%02X', buf['src_address'], buf['dest_address'] ) self.__send_tp_abort(buf['src_address'], buf['dest_address'], buf['session'], self.ConnectionAbortReason.TIMEOUT, buf['pgn']) del self._snd_buffer[bufid] self.__put_rts_cts_session(buf['session']) # TODO: should we notify our CAs about the cancelled transfer? elif buf['state'] == self.SendBufferState.SENDING_RTS_CTS: while buf['next_packet_to_send'] < buf['num_segments']: package = buf['next_packet_to_send'] self.__send_tp_dt(buf['src_address'], buf['dest_address'], buf['session'], package+1, buf['data'][package]) buf['next_packet_to_send'] += 1 # send end of message status if (package+1) == buf['num_segments']: self.__send_tp_eom_status(buf['src_address'], buf['dest_address'], buf['session'], buf['message_size'], buf['num_segments'], buf['pgn']) buf['deadline'] = time.monotonic() + self.Timeout.T5 buf['state'] = self.SendBufferState.WAITING_EOM_ACK break elif package == buf['next_wait_on_cts']: # wait on next cts buf['state'] = self.SendBufferState.WAITING_CTS buf['deadline'] = time.monotonic() + self.Timeout.T3 break elif self._minimum_tp_rts_cts_dt_interval != None: buf['deadline'] = time.monotonic() + self._minimum_tp_rts_cts_dt_interval break # recalc next wakeup if next_wakeup > buf['deadline']: next_wakeup = buf['deadline'] elif buf['state'] == self.SendBufferState.WAITING_EOM_ACK: # TODO: should we inform the application about the eom ack timeout? del self._snd_buffer[bufid] self.__put_rts_cts_session(buf['session']) elif buf['state'] == self.SendBufferState.EOM_ACK_RECEIVED: # TODO: should we inform the application about the successful transmission? del self._snd_buffer[bufid] self.__put_rts_cts_session(buf['session']) elif buf['state'] == self.SendBufferState.SENDING_BAM: # send next broadcast message... package = buf['next_packet_to_send'] self.__send_tp_dt(buf['src_address'], buf['dest_address'], buf['session'], package+1, buf['data'][package]) buf['next_packet_to_send'] += 1 if buf['next_packet_to_send'] < buf['num_segments']: buf['deadline'] = time.monotonic() + self._minimum_tp_bam_dt_interval # recalc next wakeup if next_wakeup > buf['deadline']: next_wakeup = buf['deadline'] else: buf['state'] = self.SendBufferState.SENDING_EOM_STATUS # recalc next wakeup buf['deadline'] = time.monotonic() + self._minimum_tp_bam_dt_interval if next_wakeup > buf['deadline']: next_wakeup = buf['deadline'] elif buf['state'] == self.SendBufferState.SENDING_EOM_STATUS: # done self.__send_tp_eom_status(buf['src_address'], buf['dest_address'], buf['session'], buf['message_size'], buf['num_segments'], buf['pgn']) del self._snd_buffer[bufid] self.__put_bam_session(buf['session']) elif buf['state'] == self.SendBufferState.TRANSMISSION_FINISHED: del self._snd_buffer[bufid] else: logger.critical('unknown SendBufferState %d', buf['state']) del self._snd_buffer[bufid] return next_wakeup
def _process_tp_cm(self, mid, dest_address, data, timestamp): """Processes a Transport Protocol Connection Management (TP.CM) message :param j1939.MessageId mid: A MessageId object holding the information extracted from the can_id. :param int dest_address: The destination address of the message :param bytearray data: The data contained in the can-message. :param float timestamp: The timestamp the message was received (mostly) in fractions of Epoch-Seconds. """ # check minimum tp-cm length if len(data) < 12: logger.info('tp-cm with incorrect dlc received, id', mid ) return src_address = mid.source_address control_byte = data[0] & 0xF session_num = (data[0] >> 4) & 0xF message_size = (data[1] & 0xFF) | ((data[2] & 0xFF) << 8) | ((data[3] & 0xFF) << 16) segment_num = (data[4] & 0xFF) | ((data[5] & 0xFF) << 8) | ((data[6] & 0xFF) << 16) pgn = (data[9] & 0xFF) | ((data[10] & 0xFF) << 8) | ((data[11] & 0xFF) << 16) with self._buffer_lock: if control_byte == self.TpControlType.RTS: buffer_hash = self._buffer_hash(session_num, src_address, dest_address) num_segments = data[7] # Maximum number of segments that can be sent in response to one CTS. if buffer_hash in self._rcv_buffer: # according SAE J1939-22 we have to send an ABORT if an active # transmission is already established self.__send_tp_abort(dest_address, src_address, session_num, self.ConnectionAbortReason.BUSY, pgn) self.__put_rts_cts_session(session_num) return # limit max number segments num_segments = min(num_segments, segment_num) # open new buffer for this connection self._rcv_buffer[buffer_hash] = { 'pgn': pgn, 'session': session_num, 'message_size': message_size, # total message size, number of bytes 'num_segments': segment_num, # total number of segments 'next_packet': 1, 'next_cts_border': min(self._max_cmdt_packets, num_segments), 'num_segments_max_rec': min(self._max_cmdt_packets, num_segments), 'data': [], 'deadline': time.monotonic() + self.Timeout.T2, 'src_address' : src_address, 'dest_address' : dest_address, } self.__send_tp_cts(dest_address, src_address, session_num, self._rcv_buffer[buffer_hash]['num_segments_max_rec'], 1, pgn) self.__job_thread_wakeup() elif control_byte == self.TpControlType.CTS: buffer_hash = self._buffer_hash(session_num, dest_address, src_address) num_segments = data[7] # Maximum number of segments that can be sent if buffer_hash not in self._snd_buffer: self.__send_tp_abort(dest_address, src_address, session_num, self.ConnectionAbortReason.RESOURCES, pgn) self.__put_rts_cts_session(session_num) return if num_segments == 0: # SAE J1939/22 # receiver requests a pause self._snd_buffer[buffer_hash]['deadline'] = time.monotonic() + self.Timeout.Th self.__job_thread_wakeup() return num_segments_all = self._snd_buffer[buffer_hash]['num_segments'] self._snd_buffer[buffer_hash]['next_packet_to_send'] = segment_num - 1 segments_to_be_sent = num_segments_all - self._snd_buffer[buffer_hash]['next_packet_to_send'] if num_segments > num_segments_all: logger.debug("CTS: Allowed more packets %d than complete transmission %d", num_segments, num_segments_all) num_segments = num_segments_all if num_segments > self._max_cmdt_packets: logger.debug("CTS: Allowed more packets %d than transmitters max-cmdt-number %d", num_segments, self._max_cmdt_packets) num_segments = self._max_cmdt_packets if num_segments > segments_to_be_sent: logger.debug("CTS: Allowed more packets %d than needed to complete transmission %d", num_segments, segments_to_be_sent) num_segments = segments_to_be_sent self._snd_buffer[buffer_hash]['next_wait_on_cts'] = self._snd_buffer[buffer_hash]['next_packet_to_send'] + num_segments - 1 self._snd_buffer[buffer_hash]['state'] = self.SendBufferState.SENDING_RTS_CTS self._snd_buffer[buffer_hash]['deadline'] = time.monotonic() # wake up immediately self.__job_thread_wakeup() elif control_byte == self.TpControlType.EOM_STATUS: buffer_hash = self._buffer_hash(session_num, src_address, dest_address) if buffer_hash not in self._rcv_buffer: self.__put_rts_cts_session(session_num) return pgn = self._rcv_buffer[buffer_hash]['pgn'] if (self._rcv_buffer[buffer_hash]['message_size'] == message_size) and (self._rcv_buffer[buffer_hash]['num_segments'] == segment_num): self.__notify_subscribers(mid.priority, pgn, src_address, dest_address, timestamp, self._rcv_buffer[buffer_hash]['data']) if dest_address != ParameterGroupNumber.Address.GLOBAL: self.__send_tp_eom_ack(dest_address, src_address, session_num, message_size, segment_num, pgn) else: self.__send_tp_abort(dest_address, src_address, session_num, self.ConnectionAbortReason.RESOURCES, pgn) del self._rcv_buffer[buffer_hash] self.__put_rts_cts_session(session_num) elif control_byte == self.TpControlType.EOM_ACK: buffer_hash = self._buffer_hash(session_num, dest_address, src_address) if buffer_hash not in self._snd_buffer: self.__send_tp_abort(dest_address, src_address, session_num, self.ConnectionAbortReason.RESOURCES, pgn) self.__put_rts_cts_session(session_num) return # TODO: should we inform the application about the successful transmission? # Notify subscribers here to be used for the memory access server to know when to send operation complete self.__notify_subscribers(mid.priority, pgn, mid.source_address, dest_address, timestamp, data) self._snd_buffer[buffer_hash]['state'] = self.SendBufferState.EOM_ACK_RECEIVED self._snd_buffer[buffer_hash]['deadline'] = time.monotonic() # wake up immediately self.__job_thread_wakeup() # BAM FD.TP.CM received elif control_byte == self.TpControlType.BAM: buffer_hash = self._buffer_hash(session_num, src_address, dest_address) if buffer_hash in self._rcv_buffer: # buffer already in use logger.info('bam receive buffer already in use 0x%x', buffer_hash ) del self._rcv_buffer[buffer_hash] self.__put_bam_session(session_num) return # init new buffer for this connection self._rcv_buffer[buffer_hash] = { 'pgn': pgn, 'session': session_num, 'message_size': message_size, # Total message size, number of bytes 'num_segments': segment_num, # Total number of segments 'next_packet': 1, 'data': [], 'deadline': time.monotonic() + self.Timeout.T1, 'src_address' : src_address, 'dest_address' : dest_address, } self.__job_thread_wakeup() elif control_byte == self.TpControlType.ABORT: # if abort received before transmission established -> cancel transmission buffer_hash = self._buffer_hash(session_num, dest_address, src_address) if buffer_hash in self._snd_buffer and self._snd_buffer[buffer_hash]['state'] == self.SendBufferState.WAITING_CTS: # cancel transmission self._snd_buffer[buffer_hash]['state'] = self.SendBufferState.TRANSMISSION_FINISHED self._snd_buffer[buffer_hash]['deadline'] = time.monotonic() # TODO: any more abort responses? else: raise RuntimeError('Received TP.CM with unknown control_byte %d', control_byte) def _process_tp_dt(self, mid, dest_address, data, timestamp): # check minimum tp-dt length if len(data) <= 4: logger.info('tp-dt with incorrect dlc received, id', mid ) return src_address = mid.source_address dtfi = data[0] & 0xF # Data Transfer Format Indicator session_num = (data[0] >> 4) & 0xF segment_num = (data[1] & 0xFF) | ((data[2] & 0xFF) << 8) | ((data[3] & 0xFF) << 16) if segment_num == 0: logger.critical('segment number of 0 is not valid.') return buffer_hash = self._buffer_hash(session_num, src_address, dest_address) with self._buffer_lock: if buffer_hash not in self._rcv_buffer: logger.critical('buffer error process dt 0x%x', buffer_hash) return if self._rcv_buffer[buffer_hash]['next_packet'] != segment_num: logger.critical('packet error. required: '+ str(self._rcv_buffer[buffer_hash]['next_packet']) + ' received: ' + str(segment_num) ) return # get data self._rcv_buffer[buffer_hash]['data'].extend(data[4:]) self._rcv_buffer[buffer_hash]['next_packet'] = segment_num + 1 # message is complete with sending an acknowledge if len(self._rcv_buffer[buffer_hash]['data']) >= self._rcv_buffer[buffer_hash]['message_size']: logger.info('finished RCV of PGN {} with size {}'.format(self._rcv_buffer[buffer_hash]['pgn'], self._rcv_buffer[buffer_hash]['message_size'])) # shorten data to message_size self._rcv_buffer[buffer_hash]['data'] = self._rcv_buffer[buffer_hash]['data'][:self._rcv_buffer[buffer_hash]['message_size']] # finished reassembly if dest_address != ParameterGroupNumber.Address.GLOBAL: # set deadline for waiting on eom status self._rcv_buffer[buffer_hash]['deadline'] = time.monotonic() + self.Timeout.T1 self.__job_thread_wakeup() return # send clear to send if (dest_address != ParameterGroupNumber.Address.GLOBAL) and (segment_num >= self._rcv_buffer[buffer_hash]['next_cts_border']): # send cts number_of_packets_that_can_be_sent = min( self._rcv_buffer[buffer_hash]['num_segments_max_rec'], self._rcv_buffer[buffer_hash]['num_segments'] - self._rcv_buffer[buffer_hash]['next_cts_border'] ) next_packet_to_be_sent = self._rcv_buffer[buffer_hash]['next_cts_border'] + 1 self.__send_tp_cts(dest_address, src_address, session_num, number_of_packets_that_can_be_sent, next_packet_to_be_sent, self._rcv_buffer[buffer_hash]['pgn']) # calculate next packet number at which a CTS is to be sent self._rcv_buffer[buffer_hash]['next_cts_border'] = min(self._rcv_buffer[buffer_hash]['next_cts_border'] + self._rcv_buffer[buffer_hash]['num_segments_max_rec'], self._rcv_buffer[buffer_hash]['num_segments']) self._rcv_buffer[buffer_hash]['deadline'] = time.monotonic() + self.Timeout.T2 self.__job_thread_wakeup() return self._rcv_buffer[buffer_hash]['deadline'] = time.monotonic() + self.Timeout.T1 def _process_multi_pg(self, mid : MessageId, dest_address, data, timestamp): # currently "SAE J1939 with no assurance data" trailer format supported only src_address = mid.source_address while True: if len(data) <= 4: break tos = (data[0] >> 5) & 0x7 # padding service if tos == 0: break trailer_format = (data[0] >> 2) & 0x7 cpgn = ((data[0] & 0x3) << 16) | (data[1] << 8) | data[2] payload_length = (data[3] & 0xFF) if (tos == 2) and (trailer_format == 0): # SAE J1939 with no assurance data self.__notify_subscribers(mid.priority, cpgn, src_address, dest_address, timestamp, data[4:(4+payload_length)].copy()) else: # TODO print('other tos/tf formats currently not supported') # trim data data = data[(4+payload_length):] def __send_tp_abort(self, src_address, dest_address, session_num, reason, pgn_value): self.__send_tp_cm(src_address, dest_address, self.TpControlType.ABORT, session_num, 0xFFFFFF, 0xFFFFFF, 0xFFFFFF, reason, pgn_value) def __send_tp_rts(self, priority, src_address, dest_address, session_num, pgn_value, message_size, num_segments, max_cmdt_packets, adt=Adt.NO_ADT): self.__send_tp_cm(src_address, dest_address, self.TpControlType.RTS, session_num, message_size, num_segments, max_cmdt_packets, adt, pgn_value, priority) def __send_tp_cts(self, src_address, dest_address, session_num, num_segments_that_can_be_sent, next_packet, pgn_value): request_code = 0 self.__send_tp_cm(src_address, dest_address, self.TpControlType.CTS, session_num, 0xFFFFFF, next_packet, num_segments_that_can_be_sent, request_code, pgn_value) def __send_tp_eom_status(self, src_address, dest_address, session_num, message_size, num_segments, pgn_value, size_of_assurance_data=0, adt=Adt.NO_ADT): self.__send_tp_cm(src_address, dest_address, self.TpControlType.EOM_STATUS, session_num, message_size, num_segments, size_of_assurance_data, adt, pgn_value) def __send_tp_eom_ack(self, src_address, dest_address, session_num, message_size, num_segments, pgn_value): self.__send_tp_cm(src_address, dest_address, self.TpControlType.EOM_ACK, session_num, message_size, num_segments, 0xFF, 0xFF, pgn_value) def __send_tp_bam(self, priority, src_address, session_num, pgn_value, message_size, num_segments): self.__send_tp_cm(src_address, ParameterGroupNumber.Address.GLOBAL, self.TpControlType.BAM, session_num, message_size, num_segments, 0xFF , 0, pgn_value, priority) def __send_tp_cm(self, src_address, dest_address, tp_control_type: TpControlType, session_num, message_size, num_segments, # total number of segments or next segment number to be sent byte_7, # maximum number of segments or num of segments that can be sent or assurance data Size byte_8, # assurance data type or request code or teason code: pgn, priority=7): pgn_tp_cm = ParameterGroupNumber(0, (ParameterGroupNumber.PGN.FD_TP_CM>>8) & 0xFF, dest_address) mid = MessageId(priority=priority, parameter_group_number=pgn_tp_cm.value, source_address=src_address) data = [0] * 12 data[0] = ( (tp_control_type & 0xF) | ((session_num & 0xF) << 4)) data[1] = ( message_size & 0xFF ) data[2] = ( (message_size >> 8) & 0xFF ) data[3] = ( (message_size >> 16) & 0xFF ) data[4] = ( num_segments & 0xFF ) data[5] = ( (num_segments >> 8) & 0xFF ) data[6] = ( (num_segments >> 16) & 0xFF ) data[7] = ( byte_7 & 0xFF ) data[8] = ( byte_8 & 0xFF ) data[9] = ( pgn & 0xFF ) data[10] = ( (pgn >> 8) & 0xFF ) data[11] = ( (pgn >> 16) & 0xFF ) # 13 up to 64 Assurance Data of full message calculated using AD Type. Total length = Size in byte 8. self.__send_message(mid.can_id, True, data, fd_format=True) def __send_tp_dt(self, src_address, dest_address, session_num, segment_num, data, Dtfi=0): pgn = ParameterGroupNumber(0, (ParameterGroupNumber.PGN.FD_TP_DT>>8) & 0xFF, dest_address) mid = MessageId(priority=7, parameter_group_number=pgn.value, source_address=src_address) data.insert(0, (Dtfi & 0xF) | ((session_num & 0xF) << 4)) data.insert(1, segment_num & 0xFF) data.insert(2, (segment_num >> 8) & 0xFF) data.insert(3, (segment_num >> 16) & 0xFF) next_valid_fd_length = 0 if len(data)>=(self.DataLength.TP+4): data = data[:(self.DataLength.TP+4)] else: # padding next_valid_fd_length = self._LUT_FD_DLC[len(data)] if next_valid_fd_length < 0: next_valid_fd_length = 0 while len(data)<next_valid_fd_length: data.append(255) self.__send_message(mid.can_id, True, data, fd_format=True)
[docs] def notify(self, can_id, data, timestamp): """Feed incoming CAN message into this ecu. If a custom interface is used, this function must be called for each 29-bit standard message read from the CAN bus. :param int can_id: CAN-ID of the message (always 29-bit) :param bytearray data: Data part of the message (0 - 8 bytes) :param float timestamp: The timestamp field in a CAN message is a floating point number representing when the message was received since the epoch in seconds. Where possible this will be timestamped in hardware. """ mid = MessageId(can_id=can_id) pgn = ParameterGroupNumber() pgn.from_message_id(mid) # peer to peer # pdu_specific is destination Address pgn_value = pgn.value & 0x1FF00 dest_address = pgn.pdu_specific # may be Address.GLOBAL # iterate all CAs to check if we have to handle this destination address if dest_address != ParameterGroupNumber.Address.GLOBAL: if not self.__ecu_is_message_acceptable(dest_address): # simple peer-to-peer reception without adding a controller-application reject = True for ca in self._cas: if ca.message_acceptable(dest_address): reject = False break if reject == True: return if pgn_value == ParameterGroupNumber.PGN.FEFF_MULTI_PG: self._process_multi_pg(mid, dest_address, data, timestamp) elif pgn_value == ParameterGroupNumber.PGN.ADDRESSCLAIM: for ca in self._cas: ca._process_addressclaim(mid, data, timestamp) elif pgn_value == ParameterGroupNumber.PGN.REQUEST: for ca in self._cas: if ca.message_acceptable(dest_address): ca._process_request(mid, dest_address, data, timestamp) elif pgn_value == ParameterGroupNumber.PGN.FD_TP_CM: self._process_tp_cm(mid, dest_address, data, timestamp) elif pgn_value == ParameterGroupNumber.PGN.FD_TP_DT: self._process_tp_dt(mid, dest_address, data, timestamp) elif pgn_value == ParameterGroupNumber.PGN.TP_CM: logger.info('j1939-21 transport protocol cm not allowed in j1939-22 network') elif pgn_value == ParameterGroupNumber.PGN.DATATRANSFER: logger.info('j1939-21 transport protocol dt not allowed in j1939-22 network') elif pgn.is_pdu2_format: # direct broadcast self.__notify_subscribers(mid.priority, pgn.value, mid.source_address, ParameterGroupNumber.Address.GLOBAL, timestamp, data) else: self.__notify_subscribers(mid.priority, pgn_value, mid.source_address, dest_address, timestamp, data)