Source code for j1939.controller_application

from __future__ import annotations
import logging
from typing import Optional
import j1939
from .message_id import FrameFormat

logger = logging.getLogger(__name__)

[docs] class ControllerApplication: """ControllerApplication (CA) identified by a Name and an Address."""
[docs] class State: NONE = 0 WAIT_VETO = 1 NORMAL = 2 CANNOT_CLAIM = 3
[docs] class ClaimTimeout: VETO = 0.250 REQUEST_FOR_CLAIM = 1.250
[docs] class FieldValue: # The following values are in "Little Endian First" Byteorder # indicates, that the parameter is "not available" NOT_AVAILABLE_8 = 0xFF NOT_AVAILABLE_16 = 0xFF00 NOT_AVAILABLE_16_ARR = [0xFF, 0x00] # indicates, that the parameter is "not valid" or "in error" NOT_VALID_8 = 0xFE NOT_VALID_16 = 0xFE00 NOT_VALID_16_ARR = [0xFE, 0x00] # raw parameter values must not exceed the following values MAX_8 = 0xFA MAX_16 = 0xFAFF MAX_16_ARR = [0xFA, 0xFF]
[docs] def __init__(self, name, device_address_preferred=None, bypass_address_claim=False): """ :param name: A j1939 :class:`j1939.Name` instance :param device_address_preferred: The device_address this CA should claim on the bus. :param bypass_address_claim: Flag to bypass address claim procedure """ self._name = name self._device_address_preferred = device_address_preferred if bypass_address_claim and (device_address_preferred is not None): self._device_address_announced = device_address_preferred self._device_address = device_address_preferred self._device_address_state = ControllerApplication.State.NORMAL else: self._device_address_announced = j1939.ParameterGroupNumber.Address.NULL self._device_address = j1939.ParameterGroupNumber.Address.NULL self._device_address_state = ControllerApplication.State.NONE self._ecu: Optional[j1939.ElectronicControlUnit] = None self._subscribers_request = [] self._subscribers_acknowledge = [] self._started = False
@property def _ecu_ref(self) -> j1939.ElectronicControlUnit: if self._ecu is None: raise RuntimeError("CA is not associated with an ECU") return self._ecu
[docs] def associate_ecu(self, ecu): """Binds this CA to the ECU given :param ecu: The ECU this CA should be bound to. A j1939 :class:`j1939.ElectronicControlUnit` instance """ self._ecu = ecu
[docs] def remove_ecu(self): self._ecu = None
[docs] def subscribe(self, callback): """Add the given callback to the message notification stream. :param callback: Function to call when message is received. """ self._ecu_ref.subscribe(callback, self.message_acceptable)
[docs] def unsubscribe(self, callback): """Stop listening for message. :param callback: Function to call when message is received. """ self._ecu_ref.unsubscribe(callback)
[docs] def subscribe_request(self, callback): """Add the given callback to the request notification stream. :param callback: Function to call when a request is received. """ self._subscribers_request.append(callback)
[docs] def unsubscribe_request(self, callback): """Remove the given callback to the request notification stream. :param callback: Function to call when a request is received. """ self._subscribers_request.remove(callback)
[docs] def subscribe_acknowledge(self, callback): """Add the given callback from the acknowledge notification stream :param callback: Function to call when an acknowledge is received. """ self._subscribers_acknowledge.append(callback)
[docs] def unsubscribe_acknowledge(self, callback): """Remove the given callback from the request notification stream. :param callback: Function to call when an acknowledge is received. """
[docs] def add_timer(self, delta_time, callback, cookie=None): """Adds a callback to the list of timer events :param delta_time: The time in seconds after which the event is to be triggered. :param callback: The callback function to call """ self._ecu_ref.add_timer(delta_time, callback, cookie)
[docs] def remove_timer(self, callback): """Removes ALL entries from the timer event list for the given callback :param callback: The callback to be removed from the timer event list """ self._ecu_ref.remove_timer(callback)
[docs] def register_dependent(self, dependent): """Register a helper whose ``stop()`` should be called on ECU shutdown. Convenience forwarder to :meth:`ElectronicControlUnit.register_dependent` for helpers that only hold a reference to a CA. :param dependent: Any object exposing a no-arg ``stop()`` method. """ self._ecu_ref.register_dependent(dependent)
[docs] def unregister_dependent(self, dependent): """Remove a previously-registered dependent. Convenience forwarder to :meth:`ElectronicControlUnit.unregister_dependent`. :param dependent: The object previously passed to :meth:`register_dependent`. """ self._ecu_ref.unregister_dependent(dependent)
[docs] def start(self, claim_delay=0.5): """Starts the CA :param claim_delay: The time in seconds to wait before starting the address claim procedure. """ # TODO raise RuntimeError("Can't start CA. Seems to be already running.")? or just ignore? # check if we are not already started and there is an ecu connected if self._ecu and not self.started: self._started = True self._ecu_ref.add_timer(claim_delay, self._process_claim_async)
[docs] def stop(self): """Stops the CA """ # check if we are already started and there is an ecu connected if self._ecu and self.started: self._started = False self._ecu_ref.remove_timer(self._process_claim_async)
def _process_claim_async(self, cookie): time_to_sleep = 0.500 if self._device_address_state == ControllerApplication.State.NONE: if self._device_address_preferred != None: self._device_address_announced = self._device_address_preferred self._send_address_claimed(self._device_address_announced) if self._device_address_announced > 127 and self._device_address_announced < 248: self._device_address_state = ControllerApplication.State.WAIT_VETO time_to_sleep = ControllerApplication.ClaimTimeout.VETO else: # addresses from 0..127 and 248..253 should start immediately self._device_address = self._device_address_announced self._device_address_state = ControllerApplication.State.NORMAL elif self._device_address_state == ControllerApplication.State.WAIT_VETO: # if we reach this phase, there was no VETO to our address claimed message so far self._device_address = self._device_address_announced self._device_address_state = ControllerApplication.State.NORMAL elif self._device_address_state == ControllerApplication.State.NORMAL: # do nothing pass elif self._device_address_state == ControllerApplication.State.CANNOT_CLAIM: # do nothing pass # add new event with (possibly) new timeout value self._ecu_ref.add_timer(time_to_sleep, self._process_claim_async) # returning false deletes the event from the list return False def _process_addressclaim(self, mid, data, timestamp): """Processes an address claim message :param j1939.MessageId mid: A MessageId object holding the information extracted from the can_id. :param bytearray data: The data contained in the can-message. :param float timestamp: The timestamp the message was received (mostly) in fractions of Epoch-Seconds. """ src_address = mid.source_address logger.debug("Received ADDRESS CLAIMED message from source '%d'", src_address) # are we awaiting this address claimed message? if (0 or (self._device_address_state == ControllerApplication.State.NORMAL and src_address == self._device_address) or (self._device_address_state == ControllerApplication.State.WAIT_VETO and src_address == self._device_address_announced) ): logger.info("Received ADDRESS CLAIMED message with conflicting address '%d'", src_address) contenders_name = j1939.Name(bytes = data) if self._name.value == contenders_name.value: # both have the same name - this could mean that we are the device or there is a duplicate return if self._name.value > contenders_name.value: # we have to release our address and claim another one logger.info("We have to release our address '%d' because the contenders name is less than ours", src_address) # TODO: are there any state variables we have to care about? self._device_address = j1939.ParameterGroupNumber.Address.NULL # TODO: maybe we should call an overloadable function here if self._name.arbitrary_address_capable == False: # bad luck logger.error("After releasing our address we are configured to stop operation (CANNOT CLAIM)") self._device_address_state = ControllerApplication.State.CANNOT_CLAIM self._device_address = None self._send_address_claimed(j1939.ParameterGroupNumber.Address.NULL) # send CANNOT CLAIM else: # TODO: we should check the address range here self._device_address_announced += 1 logger.info("Try the next address '%d'", self._device_address_announced) self._send_address_claimed(self._device_address_announced) # TODO: it's not possible to set the VETO-Timeout from here self._device_address_state = ControllerApplication.State.WAIT_VETO else: # we have higher prio - repeat our claim message logger.info("Contender lost the competition - we can keep our address") if self._device_address_state == ControllerApplication.State.NORMAL: # we own our address already self._send_address_claimed(self._device_address) else: # we are in the middle of the claim-process self._send_address_claimed(self._device_address_announced) def _process_request(self, mid, dest_address, data, timestamp): """Processes a REQUEST message :param j1939.MessageId mid: A MessageId object holding the information extracted from the can_id. :param int dest_address: The destination address of the message :param bytearray data: The data contained in the can-message. :param float timestamp: The timestamp the message was received (mostly) in fractions of Epoch-Seconds. """ pgn = data[0] | (data[1] << 8) | (data[2] << 16) src_address = mid.source_address if (self.state != ControllerApplication.State.NORMAL) or ((self._device_address != dest_address) and (dest_address != j1939.ParameterGroupNumber.Address.GLOBAL)): # only answer if # - we have a valid address and # - the destination_addr is ours OR the destination_addr is the GLOBAL one return # special case j1939.ParameterGroupNumber.PGN.ADDRESSCLAIM if pgn==j1939.ParameterGroupNumber.PGN.ADDRESSCLAIM: # answer the request with our name... self._send_address_claimed(self._device_address) else: for subscriber in self._subscribers_request: subscriber(src_address, dest_address, pgn)
[docs] def send_message(self, priority, parameter_group_number, data): if self.state != ControllerApplication.State.NORMAL: raise RuntimeError("Could not send message unless address claiming has finished") mid = j1939.MessageId(priority=priority, parameter_group_number=parameter_group_number, source_address=self._device_address) self._ecu_ref.send_message(mid.can_id, True, data)
[docs] def send_pgn(self, data_page, pdu_format, pdu_specific, priority, data, time_limit=0, frame_format=FrameFormat.FEFF): """send a pgn :param int data_page: data page :param int pdu_format: pdu format :param int pdu_specific: pdu specific :param int priority: message priority :param list data: payload, each list index represents one payload byte :param time_limit: option j1939-22 multi-pg: specify a time limit in s (e.g. 0.1 == 100ms), after this time, the multi-pg will be sent. several pgs can thus be combined in one multi-pg. 0 or no time-limit means immediate sending. """ if self.state != ControllerApplication.State.NORMAL: raise RuntimeError("Could not send message unless address claiming has finished") return self._ecu_ref.send_pgn(data_page, pdu_format, pdu_specific, priority, self._device_address, data, time_limit, frame_format)
[docs] def send_request(self, data_page, pgn, destination): """send a request message :param int data_page: data page :param int pgn: pgn to be requested :param list data: destination address """ if self.state != ControllerApplication.State.NORMAL: if pgn != j1939.ParameterGroupNumber.PGN.ADDRESSCLAIM: raise RuntimeError("Could not send request message unless address claiming has finished") source_address = j1939.ParameterGroupNumber.Address.NULL else: source_address = self._device_address data = [(pgn & 0xFF), ((pgn >> 8) & 0xFF), ((pgn >> 16) & 0xFF)] self._ecu_ref.send_pgn(data_page, (j1939.ParameterGroupNumber.PGN.REQUEST >> 8) & 0xFF, destination & 0xFF, 6, source_address, data)
def _send_address_claimed(self, address): # TODO: Normally the (initial) address claimed message must not be an auto repeat message. # We have to use a single-shot message instead! # After a (send-)error occurs we have to wait 0..153 msec before repeating. pgn = j1939.ParameterGroupNumber(0, 238, j1939.ParameterGroupNumber.Address.GLOBAL) mid = j1939.MessageId(priority=6, parameter_group_number=pgn.value, source_address=address) data = self._name.bytes self._ecu_ref.send_message(mid.can_id, True, data)
[docs] def on_request(self, src_address, dest_address, pgn): """Callback for PGN requests :param int src_address: The address the request comes from :param int dest_address: The address the request was sent to; normally ours, but can also be GLOBAL :param int pgn: Parameter Group Number requested """ pass
[docs] def message_acceptable(self, dest_address): """Indicates if this CA would accept a message This function indicates the acceptance of this CA for the given dest_address. """ if self.state != j1939.ControllerApplication.State.NORMAL: return False if dest_address == j1939.ParameterGroupNumber.Address.GLOBAL: return True return (self.device_address == dest_address)
@property def state(self): return self._device_address_state @property def device_address(self): if self.state != j1939.ControllerApplication.State.NORMAL: return j1939.ParameterGroupNumber.Address.NULL return self._device_address @property def started(self) -> bool: """ Getter for the started property """ return self._started