Source code for j1939.j1939_21

from .parameter_group_number import ParameterGroupNumber
from .message_id import MessageId
import logging
import threading
import time

# Module-level logger, named after this module via logging.getLogger(__name__).
logger = logging.getLogger(__name__)

class J1939_21:
    """J1939-21 message handling: single frames plus the multi-packet transport protocol.

    Outgoing data of up to 8 bytes is sent as a single frame. Longer data is
    split into 7-byte packets and transferred either as a broadcast (BAM) or
    as a peer-to-peer RTS/CTS session. Incoming TP.CM / TP.DT frames are
    reassembled and handed to ``notify_subscribers`` once complete.

    ``_rcv_buffer`` and ``_snd_buffer`` are protected by ``_buffer_lock``.
    Subscriber callbacks are always invoked *after* the lock has been released
    so that a callback may call :meth:`send_pgn` without deadlocking.
    """

    class ConnectionMode:
        """Control byte values of a TP.CM frame (first data byte)."""
        RTS = 16
        CTS = 17
        EOM_ACK = 19
        BAM = 32
        ABORT = 255

    class ConnectionAbortReason:
        """Reason codes carried in the second byte of a TP.CM ABORT frame."""
        BUSY = 1        # Already in one or more connection managed sessions and cannot support another
        RESOURCES = 2   # System resources were needed for another task so this connection managed session was terminated
        TIMEOUT = 3     # A timeout occurred
        # 4..250 Reserved by SAE
        CTS_WHILE_DT = 4  # according AUTOSAR: CTS messages received when data transfer is in progress
        # 251..255 Per J1939/71 definitions - but there are none?

    class Timeout:
        """Timeouts according SAE J1939/21 (seconds)."""
        Tr = 0.200  # Response Time
        Th = 0.500  # Holding Time (applied when a receiver pauses us with CTS(0))
        T1 = 0.750  # receive deadline after a BAM announcement / after each TP.DT
        T2 = 1.250  # receive deadline after we sent a CTS
        T3 = 1.250  # send deadline while waiting for a CTS / EOM_ACK
        T4 = 1.050
        # timeout for multi packet broadcast messages 50..200ms
        Tb = 0.050

    class SendBufferState:
        """States of an entry in ``_snd_buffer``."""
        WAITING_CTS = 0            # waiting for CTS
        SENDING_IN_CTS = 1         # sending packages (temporary state)
        SENDING_BM = 2             # sending broadcast packages
        TRANSMISSION_FINISHED = 3  # finished, remove buffer

    def __init__(self, send_message, job_thread_wakeup, notify_subscribers, max_cmdt_packets, minimum_tp_rts_cts_dt_interval, minimum_tp_bam_dt_interval, ecu_is_message_acceptable):
        """Create the protocol handler.

        :param send_message: callable ``(can_id, extended_id, data)`` used for every outgoing frame.
        :param job_thread_wakeup: callable without arguments; called whenever a deadline changed.
        :param notify_subscribers: callable ``(priority, pgn, src, dest, timestamp, data)``
            that receives completed messages.
        :param max_cmdt_packets: number of packets that can be sent/received
            with CMDT (Connection Mode Data Transfer) per CTS.
        :param minimum_tp_rts_cts_dt_interval: minimum time between two TP.DT
            frames of an RTS/CTS session, or ``None`` to send back-to-back.
        :param minimum_tp_bam_dt_interval: minimum time between two TP.DT
            frames of a BAM session; ``None`` selects ``Timeout.Tb``.
        :param ecu_is_message_acceptable: callable ``(dest_address) -> bool``
            consulted before the registered controller applications.
        """
        # Receive buffers, keyed by _buffer_hash(sender, receiver)
        self._rcv_buffer = {}
        # Send buffers, keyed by _buffer_hash(our source, destination)
        self._snd_buffer = {}
        # List of ControllerApplication
        self._cas = []

        # set minimum time between two tp-rts/cts messages
        self._minimum_tp_rts_cts_dt_interval = minimum_tp_rts_cts_dt_interval

        # set minimum time between two tp-bam messages
        if minimum_tp_bam_dt_interval is None:
            self._minimum_tp_bam_dt_interval = self.Timeout.Tb
        else:
            self._minimum_tp_bam_dt_interval = minimum_tp_bam_dt_interval

        # number of packets that can be sent/received with CMDT (Connection Mode Data Transfer)
        self._max_cmdt_packets = max_cmdt_packets

        # Lock protecting _rcv_buffer and _snd_buffer - accessed from both the
        # Notifier thread (notify/process_tp_*) and the protocol job thread
        # (async_job_thread). It is NOT reentrant: never call subscriber
        # callbacks while holding it.
        self._buffer_lock = threading.Lock()

        self.__job_thread_wakeup = job_thread_wakeup
        self.__send_message = send_message
        self.__notify_subscribers = notify_subscribers
        self.__ecu_is_message_acceptable = ecu_is_message_acceptable

    def add_ca(self, ca):
        """Register a controller application that takes part in message dispatch."""
        self._cas.append(ca)

    def remove_ca(self, device_address):
        """Remove the first controller application whose preferred address matches.

        :param device_address: value compared against ``ca._device_address_preferred``.
        :return: ``True`` if a controller application was removed, else ``False``.
        :rtype: bool
        """
        for ca in self._cas:
            if device_address == ca._device_address_preferred:
                # safe: we leave the loop right after mutating the list
                self._cas.remove(ca)
                return True
        return False

    def _buffer_hash(self, src_address, dest_address):
        """Calculates a hash value for the given address pair

        :param src_address:
            The Source-Address the connection should bound to.
        :param dest_address:
            The Destination-Address the connection should bound to.

        :return:
            The calculated hash value (source in bits 8..15, destination in bits 0..7).

        :rtype: int
        """
        return ((src_address & 0xFF) << 8) | (dest_address & 0xFF)

    @staticmethod
    def _tp_dt_payload(payload, packet_index):
        """Build the 8 data bytes of one TP.DT frame.

        Takes the 7 payload bytes belonging to ``packet_index`` (0-based),
        pads them with 0xFF and prepends the 1-based sequence number.
        ``bytes`` and ``tuple`` payloads are copied into a list first because
        they cannot be padded in place.
        """
        start = packet_index * 7
        chunk = payload[start:start + 7]
        if isinstance(chunk, (bytes, tuple)):
            chunk = list(chunk)
        chunk.extend([255] * (7 - len(chunk)))
        chunk.insert(0, packet_index + 1)
        return chunk

    def send_pgn(self, data_page, pdu_format, pdu_specific, priority, src_address, data, time_limit, frame_format):
        """Send a parameter group, using the transport protocol if it exceeds 8 bytes.

        :param data_page: data page of the PGN.
        :param pdu_format: PDU format (PF) of the PGN.
        :param pdu_specific: PDU specific (PS); the destination address for
            destination-specific PGNs.
        :param priority: priority used for the frame / the TP.CM announcement.
        :param src_address: source address to send from.
        :param data: payload; sliced per packet for multi-packet transfers.
        :param time_limit: not used by this implementation.
        :param frame_format: not used by this implementation.
        :return: ``True`` if the message was sent or the transfer was started,
            ``False`` if a transfer for the same address pair is still active.
        :rtype: bool
        """
        pgn = ParameterGroupNumber(data_page, pdu_format, pdu_specific)
        if len(data) <= 8:
            # send normal message
            mid = MessageId(priority=priority, parameter_group_number=pgn.value, source_address=src_address)
            self.__send_message(mid.can_id, True, data)
            return True

        # if the PF is between 0 and 239, the message is destination dependent when pdu_specific != 255
        # if the PF is between 240 and 255, the message can only be broadcast
        if (pdu_specific == ParameterGroupNumber.Address.GLOBAL) or ParameterGroupNumber(0, pdu_format, pdu_specific).is_pdu2_format:
            dest_address = ParameterGroupNumber.Address.GLOBAL
        else:
            dest_address = pdu_specific

        # init sequence
        # known limitation: only one BAM can be sent in parallel to a destination node
        buffer_hash = self._buffer_hash(src_address, dest_address)
        message_size = len(data)
        num_packets = (message_size + 6) // 7  # ceil(message_size / 7)
        is_broadcast = dest_address == ParameterGroupNumber.Address.GLOBAL

        if is_broadcast:
            new_buffer = {
                "pgn": pgn.value,
                "priority": priority,
                "message_size": message_size,
                "num_packages": num_packets,
                "data": data,
                "state": self.SendBufferState.SENDING_BM,
                # 0 keeps the job thread away from this buffer until the
                # BAM announcement itself is on the bus (armed below)
                "deadline": 0,
                'src_address': src_address,
                'dest_address': ParameterGroupNumber.Address.GLOBAL,
                'next_packet_to_send': 0,
            }
        else:
            pgn.pdu_specific = 0  # this is 0 for peer-to-peer transfer
            new_buffer = {
                "pgn": pgn.value,
                "priority": priority,
                "message_size": message_size,
                "num_packages": num_packets,
                "data": data,
                "state": self.SendBufferState.WAITING_CTS,
                "deadline": time.monotonic() + self.Timeout.T3,
                'src_address': src_address,
                'dest_address': pdu_specific,
                'next_packet_to_send': 0,
                'next_wait_on_cts': 0,
            }

        # Check and reserve the slot in ONE critical section. Doing the check
        # and the insert separately allows two senders to both pass the check,
        # and allows a fast CTS to arrive before the buffer exists (which
        # would be answered with an ABORT).
        with self._buffer_lock:
            if buffer_hash in self._snd_buffer:
                # There is already a sequence active for this pair
                return False
            self._snd_buffer[buffer_hash] = new_buffer

        # The announcement is sent without holding the lock.
        try:
            if is_broadcast:
                self.__send_tp_bam(src_address, priority, new_buffer["pgn"], message_size, num_packets)
            else:
                self.__send_tp_rts(src_address, pdu_specific, priority, new_buffer["pgn"], message_size, num_packets, min(self._max_cmdt_packets, num_packets))
        except Exception:
            # do not leave a dead reservation behind if the frame could not be sent
            with self._buffer_lock:
                if self._snd_buffer.get(buffer_hash) is new_buffer:
                    del self._snd_buffer[buffer_hash]
            raise

        if is_broadcast:
            # arm the broadcast: first TP.DT follows after the minimum interval
            with self._buffer_lock:
                new_buffer["deadline"] = time.monotonic() + self._minimum_tp_bam_dt_interval

        self.__job_thread_wakeup()
        return True

    def async_job_thread(self, now):
        """Process all expired receive/send deadlines and report the next one.

        :param now: current time. Deadlines in this class are computed with
            ``time.monotonic()``, so ``now`` has to come from the same clock
            - NOTE(review): confirm against the caller.
        :return: the time at which this method wants to be called again
            (at most ``now + 5.0``).
        """
        next_wakeup = now + 5.0  # wakeup in 5 seconds

        with self._buffer_lock:
            # check receive buffers for timeout
            for bufid in list(self._rcv_buffer):
                buf = self._rcv_buffer[bufid]
                if buf['deadline'] == 0:
                    continue
                if buf['deadline'] > now:
                    next_wakeup = min(next_wakeup, buf['deadline'])
                    continue
                # deadline reached
                logger.info("Deadline reached for rcv_buffer src 0x%02X dst 0x%02X", buf['src_address'], buf['dest_address'])
                if buf['dest_address'] != ParameterGroupNumber.Address.GLOBAL:
                    # TODO: should we handle retries?
                    self.__send_tp_abort(buf['dest_address'], buf['src_address'], self.ConnectionAbortReason.TIMEOUT, buf['pgn'])
                # TODO: should we notify our CAs about the cancelled transfer?
                del self._rcv_buffer[bufid]

            # check send buffers
            for bufid in list(self._snd_buffer):
                buf = self._snd_buffer[bufid]
                if buf['deadline'] == 0:
                    continue
                if buf['deadline'] > now:
                    next_wakeup = min(next_wakeup, buf['deadline'])
                    continue

                # deadline reached
                if buf['state'] == self.SendBufferState.WAITING_CTS:
                    logger.info("Deadline WAITING_CTS reached for snd_buffer src 0x%02X dst 0x%02X", buf['src_address'], buf['dest_address'])
                    self.__send_tp_abort(buf['src_address'], buf['dest_address'], self.ConnectionAbortReason.TIMEOUT, buf['pgn'])
                    # TODO: should we notify our CAs about the cancelled transfer?
                    del self._snd_buffer[bufid]

                elif buf['state'] == self.SendBufferState.SENDING_IN_CTS:
                    while buf['next_packet_to_send'] < buf['num_packages']:
                        package = buf['next_packet_to_send']
                        frame = self._tp_dt_payload(buf['data'], package)

                        # modify the snd_buffer state in anticipation
                        # of the message we are about to transmit
                        buf['next_packet_to_send'] += 1
                        should_break = False
                        if package == buf['next_wait_on_cts']:
                            # wait on next cts
                            buf['state'] = self.SendBufferState.WAITING_CTS
                            buf['deadline'] = time.monotonic() + self.Timeout.T3
                            should_break = True
                        elif self._minimum_tp_rts_cts_dt_interval is not None:
                            buf['deadline'] = time.monotonic() + self._minimum_tp_rts_cts_dt_interval
                            should_break = True

                        # state is ready for recv - Now send the message
                        self.__send_tp_dt(buf['src_address'], buf['dest_address'], frame)
                        if should_break:
                            break
                    else:
                        # Nothing (more) to send in this window. Without this
                        # the expired deadline would stay in place and the job
                        # thread would be woken again immediately, forever.
                        # Wait for the peer's next CTS / EOM_ACK instead.
                        buf['state'] = self.SendBufferState.WAITING_CTS
                        buf['deadline'] = time.monotonic() + self.Timeout.T3

                    # recalc next wakeup
                    next_wakeup = min(next_wakeup, buf['deadline'])

                elif buf['state'] == self.SendBufferState.SENDING_BM:
                    # send next broadcast message...
                    frame = self._tp_dt_payload(buf['data'], buf['next_packet_to_send'])

                    # modify the snd_buffer state in anticipation
                    # of the message we are about to transmit
                    buf['next_packet_to_send'] += 1

                    if buf['next_packet_to_send'] < buf['num_packages']:
                        buf['deadline'] = time.monotonic() + self._minimum_tp_bam_dt_interval
                        # recalc next wakeup
                        next_wakeup = min(next_wakeup, buf['deadline'])
                    else:
                        # done
                        del self._snd_buffer[bufid]

                    # state is updated and ready for recv - now send data
                    self.__send_tp_dt(buf['src_address'], buf['dest_address'], frame)

                elif buf['state'] == self.SendBufferState.TRANSMISSION_FINISHED:
                    del self._snd_buffer[bufid]

                else:
                    logger.critical("unknown SendBufferState %d", buf['state'])
                    del self._snd_buffer[bufid]

        return next_wakeup

    def _process_tp_cm(self, mid, dest_address, data, timestamp):
        """Processes a Transport Protocol Connection Management (TP.CM) message

        :param j1939.MessageId mid:
            A MessageId object holding the information extracted from the can_id.
        :param int dest_address:
            The destination address of the message
        :param bytearray data:
            The data contained in the can-message.
        :param float timestamp:
            The timestamp the message was received (mostly) in fractions of Epoch-Seconds.

        :raises RuntimeError: if the control byte (``data[0]``) is unknown.
        """
        control_byte = data[0]
        pgn = data[5] | (data[6] << 8) | (data[7] << 16)
        src_address = mid.source_address

        # Arguments for notify_subscribers, delivered after the lock is released
        notification = None

        with self._buffer_lock:
            if control_byte == self.ConnectionMode.RTS:
                message_size = data[1] | (data[2] << 8)
                num_packages = data[3]
                max_num_packages = data[4]  # Maximum number of segments that can be sent in response to one CTS.
                buffer_hash = self._buffer_hash(src_address, dest_address)
                if buffer_hash in self._rcv_buffer:
                    # according SAE J1939-21 we have to send an ABORT if an active
                    # transmission is already established
                    self.__send_tp_abort(dest_address, src_address, self.ConnectionAbortReason.BUSY, pgn)
                    return
                # limit max number segments
                max_num_packages = min(max_num_packages, num_packages)
                packets_per_cts = min(self._max_cmdt_packets, max_num_packages)
                # open new buffer for this connection
                self._rcv_buffer[buffer_hash] = {
                    'pgn': pgn,
                    'message_size': message_size,
                    'num_packages': num_packages,
                    'next_packet': packets_per_cts,
                    'max_cmdt_packages': self._max_cmdt_packets,
                    'num_packages_max_rec': packets_per_cts,
                    'data': [],
                    'deadline': time.monotonic() + self.Timeout.T2,
                    'src_address': src_address,
                    'dest_address': dest_address,
                }
                self.__send_tp_cts(dest_address, src_address, packets_per_cts, 1, pgn)
                self.__job_thread_wakeup()

            elif control_byte == self.ConnectionMode.CTS:
                num_packages = data[1]
                next_package_number = data[2] - 1
                buffer_hash = self._buffer_hash(dest_address, src_address)
                if buffer_hash not in self._snd_buffer:
                    self.__send_tp_abort(dest_address, src_address, self.ConnectionAbortReason.RESOURCES, pgn)
                    return
                snd = self._snd_buffer[buffer_hash]
                if num_packages == 0:
                    # SAE J1939/21
                    # receiver requests a pause
                    snd['deadline'] = time.monotonic() + self.Timeout.Th
                    self.__job_thread_wakeup()
                    return

                num_packages_all = snd["num_packages"]
                if num_packages > num_packages_all:
                    logger.debug("CTS: Allowed more packets %d than complete transmission %d", num_packages, num_packages_all)
                    num_packages = num_packages_all
                if next_package_number + num_packages > num_packages_all:
                    logger.debug("CTS: Allowed more packets %d than needed to complete transmission %d", num_packages, num_packages_all - next_package_number)
                    num_packages = num_packages_all - next_package_number

                snd['next_wait_on_cts'] = snd['next_packet_to_send'] + num_packages - 1
                snd['state'] = self.SendBufferState.SENDING_IN_CTS
                snd['deadline'] = time.monotonic()
                self.__job_thread_wakeup()

            elif control_byte == self.ConnectionMode.EOM_ACK:
                buffer_hash = self._buffer_hash(dest_address, src_address)
                if buffer_hash not in self._snd_buffer:
                    self.__send_tp_abort(dest_address, src_address, self.ConnectionAbortReason.RESOURCES, pgn)
                    return
                # TODO: should we inform the application about the successful transmission?
                # Notify subscribers here to be used for the memory access server
                # to know when to send operation complete (done below, outside the lock)
                notification = (mid.priority, pgn, mid.source_address, dest_address, timestamp, data)
                self._snd_buffer[buffer_hash]['state'] = self.SendBufferState.TRANSMISSION_FINISHED
                self._snd_buffer[buffer_hash]['deadline'] = time.monotonic()

            elif control_byte == self.ConnectionMode.BAM:
                message_size = data[1] | (data[2] << 8)
                num_packages = data[3]
                buffer_hash = self._buffer_hash(src_address, dest_address)
                if buffer_hash in self._rcv_buffer:
                    # TODO: should we deliver the partly received message to our CAs?
                    del self._rcv_buffer[buffer_hash]
                    self.__job_thread_wakeup()

                # init new buffer for this connection
                self._rcv_buffer[buffer_hash] = {
                    "pgn": pgn,
                    "message_size": message_size,
                    "num_packages": num_packages,
                    "next_packet": 1,
                    "max_cmdt_packages": self._max_cmdt_packets,
                    "data": [],
                    "deadline": time.monotonic() + self.Timeout.T1,
                    'src_address': src_address,
                    'dest_address': dest_address,
                }
                self.__job_thread_wakeup()

            elif control_byte == self.ConnectionMode.ABORT:
                # if abort received before transmission established -> cancel transmission
                buffer_hash = self._buffer_hash(dest_address, src_address)
                snd = self._snd_buffer.get(buffer_hash)
                if snd is not None and snd['state'] == self.SendBufferState.WAITING_CTS:
                    snd['state'] = self.SendBufferState.TRANSMISSION_FINISHED
                    snd['deadline'] = time.monotonic()
                    # let the job thread drop the buffer now instead of at its next periodic wakeup
                    self.__job_thread_wakeup()
                # TODO: any more abort responses?

            else:
                raise RuntimeError("Received TP.CM with unknown control_byte %d" % control_byte)

        if notification is not None:
            # Outside the lock: a subscriber may call send_pgn(), which takes
            # the (non-reentrant) buffer lock itself.
            self.__notify_subscribers(*notification)
            self.__job_thread_wakeup()

    def _process_tp_dt(self, mid, dest_address, data, timestamp):
        """Processes a Transport Protocol Data Transfer (TP.DT) message

        Appends the 7 payload bytes to the matching receive buffer. When the
        message is complete it is acknowledged (peer-to-peer only), removed
        from the receive buffers and delivered to the subscribers. Frames
        without a matching receive buffer are ignored.

        :param j1939.MessageId mid:
            A MessageId object holding the information extracted from the can_id.
        :param int dest_address:
            The destination address of the message
        :param bytearray data:
            The data contained in the can-message (sequence number + payload).
        :param float timestamp:
            The timestamp the message was received.
        """
        sequence_number = data[0]
        src_address = mid.source_address
        buffer_hash = self._buffer_hash(src_address, dest_address)
        is_peer_to_peer = dest_address != ParameterGroupNumber.Address.GLOBAL

        # Arguments for notify_subscribers, delivered after the lock is released
        completed = None

        with self._buffer_lock:
            buf = self._rcv_buffer.get(buffer_hash)
            if buf is None:
                # TODO: LOG/TRACE/EXCEPTION?
                return

            # get data
            buf['data'].extend(data[1:])

            if len(buf['data']) >= buf['message_size']:
                # message is complete with sending an acknowledge
                logger.info("finished RCV of PGN %s with size %s", buf['pgn'], buf['message_size'])
                # shorten data to message_size
                buf['data'] = buf['data'][:buf['message_size']]
                # finished reassembly
                if is_peer_to_peer:
                    self.__send_tp_eom_ack(dest_address, src_address, buf['message_size'], buf['num_packages'], buf['pgn'])
                completed = (mid.priority, buf['pgn'], src_address, dest_address, timestamp, buf['data'])
                del self._rcv_buffer[buffer_hash]

            elif is_peer_to_peer and (sequence_number >= buf['next_packet']):
                # clear to send: the granted window is used up, send cts
                number_of_packets_that_can_be_sent = min(buf['num_packages_max_rec'], buf['num_packages'] - buf['next_packet'])
                next_packet_to_be_sent = buf['next_packet'] + 1
                self.__send_tp_cts(dest_address, src_address, number_of_packets_that_can_be_sent, next_packet_to_be_sent, buf['pgn'])

                # calculate next packet number at which a CTS is to be sent
                buf['next_packet'] = min(buf['next_packet'] + buf['num_packages_max_rec'], buf['num_packages'])
                buf['deadline'] = time.monotonic() + self.Timeout.T2

            else:
                buf['deadline'] = time.monotonic() + self.Timeout.T1

        if completed is not None:
            # Outside the lock: a subscriber may call send_pgn(), which takes
            # the (non-reentrant) buffer lock itself.
            self.__notify_subscribers(*completed)
        self.__job_thread_wakeup()

    def __send_tp_dt(self, src_address, dest_address, data):
        """Send one TP.DT frame (PF 235) with priority 7."""
        pgn = ParameterGroupNumber(0, 235, dest_address)
        mid = MessageId(priority=7, parameter_group_number=pgn.value, source_address=src_address)
        self.__send_message(mid.can_id, True, data)

    def __send_tp_abort(self, src_address, dest_address, reason, pgn_value):
        """Send a TP.CM ABORT frame (PF 236) carrying ``reason`` and the affected PGN."""
        pgn = ParameterGroupNumber(0, 236, dest_address)
        mid = MessageId(priority=7, parameter_group_number=pgn.value, source_address=src_address)
        data = [self.ConnectionMode.ABORT, reason, 0xFF, 0xFF, 0xFF, pgn_value & 0xFF, (pgn_value >> 8) & 0xFF, (pgn_value >> 16) & 0xFF]
        self.__send_message(mid.can_id, True, data)

    def __send_tp_cts(self, src_address, dest_address, num_packets, next_packet, pgn_value):
        """Send a TP.CM CTS frame granting ``num_packets`` packets starting at ``next_packet``."""
        pgn = ParameterGroupNumber(0, 236, dest_address)
        mid = MessageId(priority=7, parameter_group_number=pgn.value, source_address=src_address)
        data = [self.ConnectionMode.CTS, num_packets, next_packet, 0xFF, 0xFF, pgn_value & 0xFF, (pgn_value >> 8) & 0xFF, (pgn_value >> 16) & 0xFF]
        self.__send_message(mid.can_id, True, data)

    def __send_tp_eom_ack(self, src_address, dest_address, message_size, num_packets, pgn_value):
        """Send a TP.CM EOM_ACK frame confirming a completely received message."""
        pgn = ParameterGroupNumber(0, 236, dest_address)
        mid = MessageId(priority=7, parameter_group_number=pgn.value, source_address=src_address)
        data = [self.ConnectionMode.EOM_ACK, message_size & 0xFF, (message_size >> 8) & 0xFF, num_packets, 0xFF, pgn_value & 0xFF, (pgn_value >> 8) & 0xFF, (pgn_value >> 16) & 0xFF]
        self.__send_message(mid.can_id, True, data)

    def __send_tp_rts(self, src_address, dest_address, priority, pgn_value, message_size, num_packets, max_cmdt_packets):
        """Send a TP.CM RTS frame announcing a peer-to-peer multi-packet transfer."""
        pgn = ParameterGroupNumber(0, 236, dest_address)
        mid = MessageId(priority=priority, parameter_group_number=pgn.value, source_address=src_address)
        data = [self.ConnectionMode.RTS, message_size & 0xFF, (message_size >> 8) & 0xFF, num_packets, max_cmdt_packets, pgn_value & 0xFF, (pgn_value >> 8) & 0xFF, (pgn_value >> 16) & 0xFF]
        self.__send_message(mid.can_id, True, data)

    def __send_acknowledgement(self, control_byte, group_function_value, address_acknowledged, pgn):
        """Send an acknowledgement frame (PGN 0x00E800) from source address 255 with priority 6."""
        data = [control_byte, group_function_value, 0xFF, 0xFF, address_acknowledged, (pgn & 0xFF), ((pgn >> 8) & 0xFF), ((pgn >> 16) & 0xFF)]
        mid = MessageId(priority=6, parameter_group_number=0x00E800, source_address=255)
        self.__send_message(mid.can_id, True, data)

    def __send_tp_bam(self, src_address, priority, pgn_value, message_size, num_packets):
        """Send a TP.CM BAM frame announcing a broadcast multi-packet transfer."""
        pgn = ParameterGroupNumber(0, 236, ParameterGroupNumber.Address.GLOBAL)
        mid = MessageId(priority=priority, parameter_group_number=pgn.value, source_address=src_address)
        data = [self.ConnectionMode.BAM, message_size & 0xFF, (message_size >> 8) & 0xFF, num_packets, 0xFF, pgn_value & 0xFF, (pgn_value >> 8) & 0xFF, (pgn_value >> 16) & 0xFF]
        self.__send_message(mid.can_id, True, data)

    def notify(self, can_id, data, timestamp):
        """Feed incoming CAN message into this ecu.

        If a custom interface is used, this function must be called for each
        29-bit (extended identifier) message read from the CAN bus.

        :param int can_id:
            CAN-ID of the message (always 29-bit)
        :param bytearray data:
            Data part of the message (0 - 8 bytes)
        :param float timestamp:
            The timestamp field in a CAN message is a floating point number
            representing when the message was received since the epoch in
            seconds.
            Where possible this will be timestamped in hardware.
        """
        mid = MessageId(can_id=can_id)
        pgn = ParameterGroupNumber()
        pgn.from_message_id(mid)

        if pgn.is_pdu2_format:
            # direct broadcast
            self.__notify_subscribers(mid.priority, pgn.value, mid.source_address, ParameterGroupNumber.Address.GLOBAL, timestamp, data)
            return

        # peer to peer
        # pdu_specific is destination Address
        pgn_value = pgn.value & 0x1FF00
        dest_address = pgn.pdu_specific  # may be Address.GLOBAL

        # check whether the ecu itself or any CA has to handle this destination address
        if dest_address != ParameterGroupNumber.Address.GLOBAL:
            # simple peer-to-peer reception without adding a controller-application
            if not self.__ecu_is_message_acceptable(dest_address):
                if not any(ca.message_acceptable(dest_address) for ca in self._cas):
                    return

        if pgn_value == ParameterGroupNumber.PGN.ADDRESSCLAIM:
            for ca in self._cas:
                ca._process_addressclaim(mid, data, timestamp)
        elif pgn_value == ParameterGroupNumber.PGN.REQUEST:
            for ca in self._cas:
                if ca.message_acceptable(dest_address):
                    ca._process_request(mid, dest_address, data, timestamp)
        elif pgn_value == ParameterGroupNumber.PGN.TP_CM:
            self._process_tp_cm(mid, dest_address, data, timestamp)
        elif pgn_value == ParameterGroupNumber.PGN.DATATRANSFER:
            self._process_tp_dt(mid, dest_address, data, timestamp)
        else:
            self.__notify_subscribers(mid.priority, pgn_value, mid.source_address, dest_address, timestamp, data)