from __future__ import annotations
from collections.abc import Callable
from enum import Enum
from typing import Optional
import queue
import secrets
import j1939
[docs]
class ResponseState(Enum):
IDLE = 1
WAIT_FOR_DM14 = 2
WAIT_FOR_KEY = 3
SEND_PROCEED = 4
SEND_OPERATION_COMPLETE = 5
WAIT_OPERATION_COMPLETE = 6
SEND_ERROR = 7
WAIT_FOR_DM16 = 8
[docs]
class DM14Server:
[docs]
def __init__(self, ca: j1939.ControllerApplication) -> None:
"""
performs memory access responses using DM14-DM18 messaging.
:param obj ca: j1939 controller application
"""
self._ca = ca
self._busy = False
self.sa: Optional[int] = None
self.state = ResponseState.IDLE
self._key_from_seed: Optional[Callable[[int], int]] = None
self.data_queue: queue.Queue = queue.Queue()
self._seed_generator: Callable[[], int] = self.generate_seed
self._verify_key: Optional[Callable[..., bool]] = None
self.address: Optional[bytearray] = None
self.length = 8
self.proceed = False
self.data: bytearray | list = []
self.error = 0x00
self.edcp = 0x07
self.status = j1939.Dm15Status.PROCEED.value
self.direct = 0
def _wait_for_data(self) -> None:
"""
Determines whether to send data or wait to receive data based on the command type.
If the command is a read command, then the data requested is sent.
"""
if self.sa is None:
raise RuntimeError("sa must be set before waiting for data")
self._ca.subscribe(self._parse_dm16)
self._send_dm15(
self.length,
self.direct,
self.status,
self.state,
self.object_count,
self.sa,
j1939.ParameterGroupNumber.PGN.DM15,
self.error,
self.edcp,
)
if (
self.command is j1939.Command.READ.value
and self.state == ResponseState.SEND_PROCEED
):
self._ca.unsubscribe(self._parse_dm16)
self._send_dm16()
if (len(self.data)) <= 8:
self.proceed = True
self.state = ResponseState.SEND_OPERATION_COMPLETE
self._ca.subscribe(self.parse_dm14)
self._send_dm15(
self.length,
self.direct,
self.status,
self.state,
self.object_count,
self.sa,
j1939.ParameterGroupNumber.PGN.DM15,
self.error,
self.edcp,
)
elif (
self.command is j1939.Command.WRITE.value
and self.state == ResponseState.SEND_PROCEED
):
self.state = ResponseState.WAIT_FOR_DM16
else:
self._ca.unsubscribe(self._parse_dm16)
self.state = ResponseState.IDLE
self.sa = None
[docs]
def parse_dm14(
self, priority: int, pgn: int, sa: int, timestamp: int, data: bytearray
) -> None:
"""
parse DM14 message received
:param int priority: priority of the message
:param int pgn: parameter group number of the message
:param int sa: source address of the message
:param int timestamp: timestamp of the message
:param bytearray data: data of the PDU
"""
if pgn != j1939.ParameterGroupNumber.PGN.DM14:
return
if (
(self.sa is not None and sa != self.sa)
or (
self.address is not None and self.address != data[2 : (self.length - 2)]
)
or self._busy
):
self._send_dm15(
self.length,
data[1] >> 4,
j1939.Dm15Status.OPERATION_FAILED.value,
j1939.ResponseState.SEND_ERROR,
data[0],
sa,
j1939.ParameterGroupNumber.PGN.DM15,
self.error if self.error != 0x00 else 0x2,
0x7,
)
self.set_busy(False)
return
self.length = len(data)
self.direct = data[1] >> 4
match self.state:
case ResponseState.IDLE:
self.pgn = pgn
self.sa = sa
self.status = j1939.Dm15Status.PROCEED.value
self.address = data[2 : (self.length - 2)]
self.direct = data[1] >> 4
self.command = ((data[1] - 1) & 0x0F) >> 1
self.pointer_type = (data[1] >> 4) & 0x1
self.object_count = data[0]
self.access_level = (data[self.length - 1] << 8) + data[self.length - 2]
self.data = data
if self._key_from_seed is not None:
self.state = ResponseState.WAIT_FOR_KEY
self._pgn = j1939.ParameterGroupNumber.PGN.DM15
self._send_dm15(
self.length,
self.direct,
self.status,
self.state,
self.object_count,
sa,
)
else:
self.state = ResponseState.SEND_PROCEED
case ResponseState.WAIT_FOR_KEY:
self.length = len(data)
self.address = data[2 : (self.length - 2)]
self.command = ((data[1] - 1) & 0x0F) >> 1
self.object_count = data[0]
self.key = (data[self.length - 1] << 8) + data[self.length - 2]
self.data = data
self.state = ResponseState.SEND_PROCEED
case ResponseState.WAIT_OPERATION_COMPLETE:
self.reset_server()
case _:
self.reset_server()
raise ValueError("Invalid state")
def _send_dm15(
self,
length: int,
direct: int,
status: int,
state: ResponseState,
object_count: int,
sa: int,
pgn: int = j1939.ParameterGroupNumber.PGN.DM15,
error: Optional[int] = None,
edcp: Optional[int] = None,
) -> None:
"""
Send DM15 message to device, used to send the proceed message,
the generated seed, or the operation complete message
:param int length: length of data
:param int direct: value of direct transaction or not
:param int status: status of operation
:param ResponseState state: state of operation
:param int object_count: amount of objects to read
:param int sa: source address
:param int pgn: pgn value
:param int error: error code if necessary
:param int edcp: value of edcp for transaction
"""
self._pgn = j1939.ParameterGroupNumber.PGN.DM15
data = [0xFF] * length
data[1] = (direct << 4) + (status << 1) + 1
match state:
case ResponseState.WAIT_FOR_KEY:
self.seed = self._seed_generator()
data[0] = 0x00
data[length - 2] = self.seed & 0xFF
data[length - 1] = self.seed >> 8
case ResponseState.SEND_PROCEED:
data[0] = object_count
case ResponseState.SEND_OPERATION_COMPLETE:
self.command = j1939.Command.OPERATION_COMPLETED.value
data[0] = 0x00
data[1] = (direct << 4) + (self.command << 1) + 1
self.state = ResponseState.WAIT_OPERATION_COMPLETE
case ResponseState.SEND_ERROR:
if error is None:
raise RuntimeError("error must be provided for SEND_ERROR state")
if edcp is None:
raise RuntimeError("edcp must be provided for SEND_ERROR state")
status = j1939.Dm15Status.OPERATION_FAILED.value
data[0] = 0x00
data[1] = (direct << 4) + (status << 1) + 1
data[length - 6] = error & 0xFF
data[length - 5] = (error >> 8) & 0xFF
data[length - 4] = error >> 16
data[length - 3] = edcp
case _:
self.reset_server()
raise ValueError("Invalid state")
self._ca.send_pgn(0, (pgn >> 8) & 0xFF, sa & 0xFF, 6, data)
def _send_dm16(self) -> None:
"""
Send DM16 message to device, used to send requested data
"""
if self.sa is None:
raise RuntimeError("sa must be set before sending DM16")
self._pgn = j1939.ParameterGroupNumber.PGN.DM16
data = []
byte_count = len(self.data)
data.append(0xFF if byte_count > 7 else byte_count)
for i in range((byte_count)):
data.append(self.data[i])
data.extend([0xFF] * (self.length - byte_count - 1))
if byte_count > 8:
self._ca.subscribe(self._parse_dm16)
self._ca.send_pgn(0, (self._pgn >> 8) & 0xFF, self.sa & 0xFF, 7, data)
def _parse_dm16(
self, priority: int, pgn: int, sa: int, timestamp: int, data: bytearray
) -> None:
"""
parse DM16 message received, used to parse data received write command
:param int priority: priority of the message
:param int pgn: parameter group number of the message
:param int sa: source address of the message
:param int timestamp: timestamp of the message
:param bytearray data: data of the PDU
"""
if pgn != j1939.ParameterGroupNumber.PGN.DM16 or sa != self.sa:
return
length = min(data[0], len(data) - 1)
self.data_queue.put(data[1 : length + 1])
self._ca.unsubscribe(self._parse_dm16)
self._ca.subscribe(self.parse_dm14)
self.state = ResponseState.SEND_OPERATION_COMPLETE
self._send_dm15(
self.length,
self.direct,
self.status,
self.state,
self.object_count,
sa,
)
[docs]
def bytes_to_int(self, data: bytearray) -> int:
"""
Convert bytearray to integer
:param bytearray data: bytearray to be converted to integer
"""
return int.from_bytes(data, byteorder="little", signed=False)
[docs]
def set_busy(self, busy: bool) -> None:
"""
Sets busy variable to indicate if busy or not
:param busy: busy value
"""
self._busy = busy
[docs]
def generate_seed(self) -> int:
"""
Generte a random seed value for key generation
"""
seed = secrets.randbits(16)
if (seed == 0xFFFF) or (seed == 0x0000):
seed = 0xBEEF
return seed
[docs]
def set_seed_key_algorithm(self, algorithm: Callable[[int], int]) -> None:
"""
Set seed key algorithm to be used for key generation
:param algorithm: seed-key algorithm
"""
self._key_from_seed = algorithm
[docs]
def set_seed_generator(self, algorithm: Callable[[], int]) -> None:
"""
Sets seed generation algorithm to be used for generating a seed value
:param algorithm: seed generation algorithm
"""
self._seed_generator = algorithm
[docs]
def set_verify_key(self, algorithm: Callable[..., bool]) -> None:
"""
Set key verification algorithm to be used for key verification
:param algorithm: key verification algorithm
"""
self._verify_key = algorithm
[docs]
def verify_key(self, seed: int, key: int) -> bool:
"""
Checks to see if key is valid
:param int seed: seed
:param int key: key
"""
if self._verify_key is not None:
# TODO: add ability to dynamically pass arguments to verification function if needed,
# if this is breaking can just add **kwargs to function defintion used to set the verification function
# this will allow for the reception of additional arguments if needed
if self.address is None:
raise RuntimeError("address must be set before verifying key")
return self._verify_key(
seed=seed, key=key, address=self.bytes_to_int(self.address), sa=self.sa
)
if self._key_from_seed is None:
raise RuntimeError("no key-from-seed algorithm set; call set_seed_key_algorithm first")
return self._key_from_seed(seed) == key
[docs]
def unsubscribe_all(self) -> None:
"""
Unsubscribes all message handlers
"""
self._ca.unsubscribe(self.parse_dm14)
self._ca.unsubscribe(self._parse_dm16)
[docs]
def reset_server(self) -> None:
"""
Resets server to remove transaction specific data
"""
self.state = ResponseState.IDLE
self.data_queue = queue.Queue()
self.sa = None
self.seed = None
self.key = None
self._busy = False
self.address = None
self.length = 8
self.proceed = False
self.data = []
self.error = 0x00
self.edcp = 0x07
self.status = j1939.Dm15Status.PROCEED.value
self.direct = 0
self.unsubscribe_all()
[docs]
def respond(
self,
proceed: bool,
data=None,
error: int = 0xFFFFFF,
edcp: int = 0xFF,
max_timeout: int = 3,
) -> Optional[list]:
"""
Respond to DM14 query with the requested data or confimation of operation is good to proceed
:param bool proceed: whether the operation is good to proceed
:param list data: data to be sent to device
:param int error: error code to be sent to device
:param int edcp: value for edcp extension
:param int max_timeout: max time for transaction
"""
if data is None:
data = []
self.proceed = proceed
self.data = data
self.error = error
self.edcp = edcp
self.status = (
j1939.Dm15Status.PROCEED.value
if proceed
else j1939.Dm15Status.OPERATION_FAILED.value
)
if self.status == j1939.Dm15Status.PROCEED.value:
self.state = ResponseState.SEND_PROCEED
else:
self.state = ResponseState.SEND_ERROR
self._wait_for_data()
mem_data: Optional[list] = None
if self.state == ResponseState.WAIT_FOR_DM16:
try:
mem_data = self.data_queue.get(block=True, timeout=max_timeout)
except queue.Empty:
self.reset_server()
raise RuntimeError("No data received from DM16 within timeout period")
return mem_data