Source code for j1939.Dm14Query

from __future__ import annotations
from collections.abc import Callable
from enum import Enum
from typing import Optional
import queue
import j1939


[docs] class QueryState(Enum): IDLE = 1 WAIT_FOR_SEED = 2 WAIT_FOR_DM16 = 3 WAIT_FOR_OPER_COMPLETE = 4
[docs] class Command(Enum): ERASE = 0 READ = 1 WRITE = 2 STATUS_REQUEST = 3 OPERATION_COMPLETED = 4 OPERATION_FAILED = 5 BOOT_LOAD = 6 EDCP_GENERATION = 7
[docs] class Dm15Status(Enum): PROCEED = 0 BUSY = 1 OPERATION_COMPLETE = 4 OPERATION_FAILED = 5
[docs] class Dm14Query:
[docs] def __init__(self, ca: j1939.ControllerApplication, user_level=7) -> None: """ performs memory access queries using DM14-DM18 messaging. Presently only read and write queries are supported :param obj ca: j1939 controller application :param int user_level: the user level for the request """ self._ca = ca self.state = QueryState.IDLE self._seed_from_key: Optional[Callable[[int], int]] = None self.data_queue: queue.Queue = queue.Queue() self.mem_data = None self.exception_queue: queue.Queue = queue.Queue() self.user_level = user_level self._dest_address: Optional[int] = None self.address: Optional[int] = None self.command: Optional[Command] = None
[docs] def unsubscribe_all(self) -> None: """ Unsubscribes all message handlers """ self._ca.unsubscribe(self._parse_dm15) self._ca.unsubscribe(self._parse_dm16)
[docs] def reset_query(self) -> None: """ Resets query to remove transaction specific data """ self.state = QueryState.IDLE self._dest_address = None self.address = None self.object_count = 0 self.object_byte_size = 1 self.signed = False self.return_raw_bytes = False self.direct = 0 self.command = None self.bytes = bytearray() self.mem_data = None self.data_queue = queue.Queue() self.exception_queue = queue.Queue() self.unsubscribe_all()
def _wait_for_data(self) -> None: """ Determines whether to send data or wait to receive data based on the command type. If the command is a write command, then the data is sent. If the command is a read command, then the device waits to receive data. """ assert self.state is QueryState.WAIT_FOR_SEED if self.command is Command.WRITE: self._send_dm16() self.state = QueryState.WAIT_FOR_OPER_COMPLETE else: self.state = QueryState.WAIT_FOR_DM16 self._ca.unsubscribe(self._parse_dm15) self._ca.subscribe(self._parse_dm16) def _send_operation_complete(self) -> None: """ Send DM14 message to confirm the operation is complete """ self.object_count = 1 self.command = Command.OPERATION_COMPLETED self._send_dm14(0xFFFF) def _send_dm14(self, key_or_user_level: int) -> None: """ Send DM14 message to device, used to initialize a memory access operation, respond with a key when needed, and to confirm the operation is complete :param int key_or_user_level: key or user level """ if self.address is None: raise RuntimeError("address must be set before sending DM14") if self.command is None: raise RuntimeError("command must be set before sending DM14") if self._dest_address is None: raise RuntimeError("destination address must be set before sending DM14") self._pgn = j1939.ParameterGroupNumber.PGN.DM14 pointer = self.address.to_bytes(length=4, byteorder="little") data = [] data.append(self.object_count) data.append( (self.direct << 4) + (self.command.value << 1) + 1 ) # (SAE reserved = 1) for octet in pointer: data.append(octet) data.append(key_or_user_level & 0xFF) data.append(key_or_user_level >> 8) self._ca.send_pgn( 0, (self._pgn >> 8) & 0xFF, self._dest_address & 0xFF, 6, data ) def _send_dm16(self) -> None: """ Send DM16 message to device, used to send data to the device """ if self._dest_address is None: raise RuntimeError("destination address must be set before sending DM16") self._pgn = j1939.ParameterGroupNumber.PGN.DM16 data = [] byte_count = len(self.bytes) data.append(0xFF if byte_count > 7 else byte_count) for i in range(byte_count): data.append(self.bytes[i]) self._ca.send_pgn( 0, (self._pgn >> 8) & 0xFF, self._dest_address & 0xFF, 6, data ) def _parse_dm15( self, priority: int, pgn: int, sa: int, timestamp: int, data: bytearray ) -> None: """ Parse DM15 message from device, used to determine whether device is ready, or if operation has completed and to receive seed from device :param int priority: priority of the message :param int pgn: parameter group number of the message :param int sa: source address of the message :param int timestamp: timestamp of the message :param bytearray data: data of the PDU """ if pgn != j1939.ParameterGroupNumber.PGN.DM15 or sa != self._dest_address: return seed = (data[7] << 8) + data[6] status = (data[1] >> 1) & 7 if ( status is Dm15Status.BUSY.value or status is Dm15Status.OPERATION_FAILED.value ): error = int.from_bytes(data[2:5], byteorder="little", signed=False) edcp = data[5] self.data_queue.put(None) if edcp == 0x06 or edcp == 0x07: if error in j1939.ErrorInfo: self.exception_queue.put( RuntimeError( f"Device {hex(sa)} error: {hex(error)} {j1939.ErrorInfo[error]} edcp: {hex(edcp)}" ) ) else: self.exception_queue.put( RuntimeError( f"Device {hex(sa)} error: {hex(error)} edcp: {hex(edcp)}" ) ) else: length = data[0] if seed == 0xFFFF and length == self.object_count: self._wait_for_data() else: if self.state is QueryState.WAIT_FOR_OPER_COMPLETE: assert status is Command.OPERATION_COMPLETED.value self._send_operation_complete() self.state = QueryState.IDLE self.data_queue.put(self.mem_data) else: assert self.state is QueryState.WAIT_FOR_SEED if self._seed_from_key is not None: self._send_dm14(self._seed_from_key(seed)) else: self.data_queue.put(None) self.exception_queue.put( RuntimeError( "Key requested from host but no seed-key algorithm has been provided" ) ) def _parse_dm16( self, priority: int, pgn: int, sa: int, timestamp: int, data: bytearray ) -> None: """ parse DM16 message received from device, used to parse data received from device on a read command :param int priority: priority of the message :param int pgn: parameter group number of the message :param int sa: source address of the message :param int timestamp: timestamp of the message :param bytearray data: data of the PDU """ if pgn != j1939.ParameterGroupNumber.PGN.DM16 or sa != self._dest_address: return length = min(data[0], len(data) - 1) # assert object_count == self.object_count self.mem_data = data[1 : length + 1] self._ca.unsubscribe(self._parse_dm16) self._ca.subscribe(self._parse_dm15) self.state = QueryState.WAIT_FOR_OPER_COMPLETE def _values_to_bytes(self, values: list) -> list: """ convert values to a flat list of bytes for sending to device :param list values: values to be converted to bytes :return: flat list of ints representing the byte encoding """ result = [] for val in values: result.extend(val.to_bytes(self.object_byte_size, byteorder="little")) return result def _bytes_to_values(self, raw_bytes: bytearray) -> list: """ convert bytes received from device to values :param bytearray raw_bytes: bytes received from device """ values = [] for i in range(len(raw_bytes) // self.object_byte_size): values.append( int.from_bytes( raw_bytes[i : self.object_byte_size], byteorder="little", signed=self.signed, ) ) return values
[docs] def read( self, dest_address: int, direct: int, address: int, object_count: int, object_byte_size: int = 1, signed: bool = False, return_raw_bytes: bool = False, max_timeout: int = 1, ) -> list: """ Send a read query to dest_address, requesting data at address :param int dest_address: destination address of the message :param int direct: direct address of the message :param int address: address of the message :param int object_count: number of objects to be read :param int object_byte_size: size of each object in bytes :param bool signed: whether the data is signed :param bool return_raw_bytes: whether to return raw bytes or values :param int max_timeout: max timeout for transaction """ assert object_count > 0 self._dest_address = dest_address self.direct = direct self.address = address self.object_count = object_count self.object_byte_size = object_byte_size self.signed = signed self.return_raw_bytes = return_raw_bytes self.command = Command.READ self._ca.subscribe(self._parse_dm15) self._send_dm14(self.user_level) self.state = QueryState.WAIT_FOR_SEED # wait for operation completed DM15 message raw_bytes = None try: raw_bytes = self.data_queue.get(block=True, timeout=max_timeout) except queue.Empty: if self.state is QueryState.WAIT_FOR_SEED: raise RuntimeError("No response from server") pass for _ in range(self.exception_queue.qsize()): raise self.exception_queue.get(block=False, timeout=max_timeout) if raw_bytes: if self.return_raw_bytes: return raw_bytes else: return self._bytes_to_values(raw_bytes) else: return []
[docs] def write( self, dest_address: int, direct: int, address: int, values: list, object_byte_size: int = 1, max_timeout: int = 1, ) -> None: """ Send a write query to dest_address, requesting to write values at address :param int dest_address: destination address of the message :param int direct: direct address of the message :param int address: address of the message :param list values: values to be written :param int object_byte_size: size of each object in bytes :param int max_timeout: max timeout for transaction """ self._dest_address = dest_address self.direct = direct self.address = address self.object_byte_size = object_byte_size self.command = Command.WRITE self.bytes = self._values_to_bytes(values) self.object_count = len(values) self._ca.subscribe(self._parse_dm15) self._send_dm14(self.user_level) self.state = QueryState.WAIT_FOR_SEED # wait for operation completed DM15 message try: self.data_queue.get(block=True, timeout=max_timeout) for _ in range(self.exception_queue.qsize()): raise self.exception_queue.get(block=False, timeout=max_timeout) except queue.Empty: if self.state is QueryState.WAIT_FOR_SEED: raise RuntimeError("No response from server") pass # expect empty queue for write
[docs] def set_seed_key_algorithm(self, algorithm: Callable[[int], int]) -> None: """ set seed-key algorithm to be used for key generation :param algorithm: seed-key algorithm """ self._seed_from_key = algorithm