from __future__ import annotations
from collections.abc import Callable
from enum import Enum
import logging
from typing import Optional
import threading
import j1939
logger = logging.getLogger(__name__)
[docs]
class DMState(Enum):
IDLE = 1
REQUEST_STARTED = 2
WAIT_RESPONSE = 3
WAIT_QUERY = 4
SERVER_CLEANUP = 5
[docs]
class MemoryAccess:
[docs]
def __init__(self, ca: j1939.ControllerApplication) -> None:
"""
Makes an overarching Memory access class
Spawns a background servicer thread tied to the lifetime of this
instance. Call :meth:`stop` (or use the instance as a context
manager) when done. The instance is also registered as a dependent
of the parent ECU, so ``ecu.stop()`` will cascade and tear this
instance down automatically.
:param ca: Controller Application
"""
self._ca = ca
self.query = j1939.Dm14Query(ca)
self.server = j1939.DM14Server(ca)
self._proceed_event = threading.Event()
self._ca.subscribe(self._listen_for_dm14)
self.state = DMState.IDLE
self.seed_security = False
self._notify_query_received = None
self._proceed_function = None
self._stopped = False
self._stop_lock = threading.Lock()
self._job_thread_end = threading.Event()
self._job_thread = threading.Thread(target=self._servicer, name='j1939.memory_access servicer_thread')
# A thread can be flagged as a "daemon thread". The significance of
# this flag is that the entire Python program exits when only daemon
# threads are left.
self._job_thread.daemon = True
self._job_thread.start()
# Register with the parent ECU so ecu.stop() cascades to this instance.
# Done after the thread has started so a failed registration during
# shutdown is still recoverable by the user calling stop() directly.
try:
self._ca.register_dependent(self)
except Exception:
# If registration fails (e.g. ECU already stopping) we still want
# the user to be able to stop us manually; just log and continue.
logger.exception("Failed to register MemoryAccess with ECU")
[docs]
def stop(self, timeout: float = 2.0) -> None:
"""Stop the background servicer thread and release resources.
Idempotent: subsequent calls are no-ops. Safe to call from any
thread, including from inside ``ecu.stop()``'s cascade.
:param float timeout:
Maximum time in seconds to wait for the servicer thread to exit.
"""
with self._stop_lock:
if self._stopped:
return
self._stopped = True
# Signal shutdown and wake the servicer immediately so it does not
# have to wait out its full poll interval.
self._job_thread_end.set()
self._proceed_event.set()
if self._job_thread.is_alive():
self._job_thread.join(timeout=timeout)
# Best-effort cleanup of the CA-level subscription. If the CA/ECU
# is already torn down this may raise; that is fine.
try:
self._ca.unsubscribe(self._listen_for_dm14)
except Exception:
pass
# Best-effort removal from the ECU's dependent registry. If we are
# being called from inside the cascade this is a no-op (the registry
# has already been cleared); if we are being called explicitly it
# prevents a stale reference.
try:
self._ca.unregister_dependent(self)
except Exception:
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
self.stop()
return False
def __del__(self):
# Defensive backstop only. The primary cleanup paths are explicit
# stop() / context-manager exit / ecu.stop() cascade. Guard against
# partial __init__ (where _job_thread may not exist) and swallow all
# exceptions per the __del__ contract.
try:
if getattr(self, '_job_thread', None) is None:
return
self.stop()
except Exception:
pass
def _servicer(self):
"""
Job thread to service memory access requests.
Blocks on a threading.Event instead of busy-polling
"""
while not self._job_thread_end.is_set():
triggered = self._proceed_event.wait(timeout=1.0)
if self._job_thread_end.is_set():
return
if triggered and self.state == DMState.WAIT_RESPONSE:
self._proceed_event.clear()
if self._notify_query_received is not None:
self._notify_query_received() # notify incoming request
def _handle_error(self, priority: int, pgn: int, sa: int, timestamp: int, data: bytearray, error_code: int) -> None:
"""
Handles errors by resetting the state and unsubscribing from DM14 messages
:param priority: Priority of the message
:param pgn: Parameter Group Number of the message
:param sa: Source Address of the message
:param timestamp: Timestamp of the message
:param data: Data of the PDU
:param error_code: Error code to be set
"""
self.server.error = error_code
self.server.set_busy(True)
self.server.parse_dm14(
priority, pgn, sa, timestamp, data
)
self.server.set_busy(False)
self.reset()
def _listen_for_dm14(
self, priority: int, pgn: int, sa: int, timestamp: int, data: bytearray
) -> None:
"""
Listens for dm14 messages and passes them to the appropriate function
:param priority: Priority of the message
:param pgn: Parameter Group Number of the message
:param sa: Source Address of the message
:param timestamp: Timestamp of the message
:param data: Data of the PDU
"""
if pgn == j1939.ParameterGroupNumber.PGN.DM14:
match self.state:
case DMState.IDLE:
if self.server.state.value == DMState.IDLE.value:
self.state = DMState.REQUEST_STARTED
self.server.parse_dm14(priority, pgn, sa, timestamp, data)
if not self.seed_security:
self.state = DMState.WAIT_RESPONSE
self._ca.unsubscribe(self._listen_for_dm14)
if self._proceed_function is not None:
if self.server.address is None:
raise RuntimeError("server address must be set before calling proceed function")
proceed = self._proceed_function(
self.server.command,
int.from_bytes(
bytes=self.server.address,
byteorder="little",
signed=False,
),
self.server.pointer_type,
self.server.length,
self.server.object_count,
0xFFFF, # placeholder for key
self.server.sa,
self.server.access_level,
0x0, # placeholder for seed
) # call proceed function and pass in basic parameters
if not proceed:
self._handle_error(priority, pgn, sa, timestamp, data, 0x100)
else:
self._proceed_event.set()
else:
self._proceed_event.set() # no security, so always proceed
case DMState.REQUEST_STARTED:
self.server.parse_dm14(priority, pgn, sa, timestamp, data)
if self.server.state == j1939.ResponseState.SEND_PROCEED:
self.state = DMState.WAIT_RESPONSE
if self.seed_security:
if self.server.seed is None:
raise RuntimeError("server seed must be set before verifying key")
if self.server.key is None:
raise RuntimeError("server key must be set before verifying key")
if self.server.verify_key(
self.server.seed, self.server.key
):
if self._proceed_function is not None:
if self.server.address is None:
raise RuntimeError("server address must be set before calling proceed function")
proceed = self._proceed_function(
self.server.command,
int.from_bytes(
bytes=self.server.address,
byteorder="little",
signed=False,
),
self.server.pointer_type,
self.server.length,
self.server.object_count,
self.server.key,
self.server.sa,
self.server.access_level,
self.server.seed,
) # call proceed function and pass in basic parameters
if not proceed:
self._handle_error(priority, pgn, sa, timestamp, data, 0x100)
else:
self._proceed_event.set()
else:
self._proceed_event.set() # no proceed function, so always proceed
else:
self._handle_error(priority, pgn, sa, timestamp, data, 0x1003)
case DMState.WAIT_QUERY:
self.server.set_busy(True)
self.server.parse_dm14(priority, pgn, sa, timestamp, data)
self.server.set_busy(False)
case DMState.SERVER_CLEANUP:
self.state = DMState.IDLE
case _:
pass
[docs]
def respond(
self,
proceed: bool,
data: Optional[list] = None,
error: int = 0xFFFFFF,
edcp: int = 0xFF,
max_timeout: int = 3,
) -> Optional[list]:
"""
Responds with requested data and error code, if applicable, to a read request
:param bool proceed: whether the operation is good to proceed
:param list data: data to be sent to device
:param int error: error code to be sent to device
:param int edcp: value for edcp extension
:param int max_timeout: max timeout for transaction
"""
if data is None:
data = []
if self.state is not DMState.WAIT_RESPONSE:
return data
self._proceed_event.clear()
self._ca.unsubscribe(self._listen_for_dm14)
return_data = self.server.respond(proceed, data, error, edcp, max_timeout)
self.state = DMState.SERVER_CLEANUP if self.server.state.value != DMState.IDLE.value else DMState.IDLE
self._ca.subscribe(self._listen_for_dm14)
return return_data
[docs]
def read(
self,
dest_address: int,
direct: int,
address: int,
object_count: int,
object_byte_size: int = 1,
signed: bool = False,
return_raw_bytes: bool = False,
max_timeout: int = 1,
) -> list:
"""
Make a dm14 read Query
:param int dest_address: destination address of the message
:param int direct: direct address of the message
:param int address: address of the message
:param int object_count: number of objects to be read
:param int object_byte_size: size of each object in bytes
:param bool signed: whether the data is signed
:param bool return_raw_bytes: whether to return raw bytes or values
:param int max_timeout: max timeout for transaction
"""
if self.state == DMState.IDLE:
self.state = DMState.WAIT_QUERY
self.address = dest_address
data = self.query.read(
dest_address,
direct,
address,
object_count,
object_byte_size,
signed,
return_raw_bytes,
max_timeout,
)
self.reset()
return data
else:
self.reset()
raise RuntimeWarning("Process already Running")
[docs]
def write(
self,
dest_address: int,
direct: int,
address: int,
values: list,
object_byte_size: int = 1,
max_timeout: int = 1,
) -> None:
"""
Send a write query to dest_address, requesting to write values at address
:param int dest_address: destination address of the message
:param int direct: direct address of the message
:param int address: address of the message
:param list values: values to be written
:param int object_byte_size: size of each object in bytes
:param int max_timeout: max timeout for transaction
"""
if self.state == DMState.IDLE:
self.state = DMState.WAIT_QUERY
self.address = dest_address
self.query.write(
dest_address, direct, address, values, object_byte_size, max_timeout
)
self.reset()
[docs]
def set_seed_generator(self, seed_generator: Callable[[], int]) -> None:
"""
Sets seed generator function to use
:param seed_generator: seed generator function
"""
self.server.set_seed_generator(seed_generator)
[docs]
def set_seed_key_algorithm(self, algorithm: Callable[[int], int]) -> None:
"""
Sets seed-key algorithm to be used for key generation
:param algorithm: seed-key algorithm
"""
self.seed_security = True
self.query.set_seed_key_algorithm(algorithm)
self.server.set_seed_key_algorithm(algorithm)
[docs]
def set_verify_key(self, verify_key: Callable[..., bool]) -> None:
"""
Sets verify key function to be used for verifying the key
:param verify_key: verify key function
"""
self.server.set_verify_key(verify_key)
[docs]
def set_notify(self, notify: Callable[[], None]) -> None:
"""
Sets notify function to be used for notifying the user of memory accesses
:param notify: notify function
"""
self._notify_query_received = notify
[docs]
def set_proceed(self, proceed: Callable[..., bool]) -> None:
"""
Sets proceed function to determine if a memory query is valid or not
:param proceed: proceed function
"""
self._proceed_function = proceed
[docs]
def reset(self) -> None:
"""
Resets both server and query to remove transaction specific data
"""
self.state = DMState.IDLE
self._ca.unsubscribe(self._listen_for_dm14)
self._ca.subscribe(self._listen_for_dm14)
self.server.reset_server()
self.query.reset_query()
self._proceed_event.clear()