Examples
Example scripts demonstrating how to use the python-can-j1939 library.
Diagnostic Message
diagnostic_message.py
import logging
import time
import can
import j1939
logging.getLogger('j1939').setLevel(logging.DEBUG)
logging.getLogger('can').setLevel(logging.DEBUG)
def on_message(priority, pgn, sa, timestamp, data):
"""Receive incoming messages from the bus
:param int priority:
Priority of the message
:param int pgn:
Parameter Group Number of the message
:param int sa:
Source Address of the message
:param int timestamp:
Timestamp of the message
:param bytearray data:
Data of the PDU
"""
pass
def dm1_receive(sa, lamp_status, dtc_dic_list, timestamp):
"""Receive incoming Dm1 messages from the bus
:param int sa:
Source Address of the message
:param dic lamp_status:
lamp status dictionary
keys: 'pl', 'awl', 'rsl', 'mil'
value: j1939.DtcLamp.OFF / .ON / .SLOW_FLASH / .ON_FAST_FLASH
:param list of dic dtc_dic_list:
DTC List
keys: 'spn', 'fmi', 'oc'
:param int timestamp:
Timestamp of the message
:param bytearray data:
Data of the PDU
"""
print('DM1 received from address:', sa, 'lamp status:', lamp_status, 'DTCs:', dtc_dic_list)
def dm1_before_send():
"""This function is called before a Dm1 message is sent to collect the data
:return:
lamp status
:rtype: dic: 'pl', 'awl', 'rsl', 'mil'
:return:
list of dictionaries of all DTCs included in DM1
:rtype: list of dic: 'spn', 'fmi', 'oc', 'cm'
"""
lamp_status = {}
# get lamp status (optional, if status not enter, lamp is switched off)
lamp_status['pl'] = j1939.DtcLamp.ON_FAST_FLASH
lamp_status['awl'] = j1939.DtcLamp.ON_SLOW_FLASH
lamp_status['rsl'] = j1939.DtcLamp.NA
lamp_status['mil'] = j1939.DtcLamp.OFF
# add all active DTCs
# if no DTC is active return empty list
# 'cm' is the SAE J1939-73 SPN conversion method (1, 2, 3, or 4).
# Defaults to 4 (current standard) when omitted.
dtc_list = []
dtc_list.append({'spn': 123, 'fmi': 31}) # CM defaults to 4
dtc_list.append({'spn': 456, 'fmi': 1, 'oc': 132}) # with occurrence counter
dtc_list.append({'spn': 789, 'fmi': 2, 'oc': 5, 'cm': 3}) # legacy CM 3 layout
return lamp_status, dtc_list
def main():
print("Initializing")
# create the ElectronicControlUnit (one ECU can hold multiple ControllerApplications)
ecu = j1939.ElectronicControlUnit()
# Connect to the CAN bus
# Arguments are passed to python-can's can.interface.Bus() constructor
# (see https://python-can.readthedocs.io/en/stable/bus.html).
# ecu.connect(interface='socketcan', channel='can0')
# ecu.connect(interface='kvaser', channel=0, bitrate=250000)
ecu.connect(interface='pcan', channel='PCAN_USBBUS1', bitrate=250000)
# ecu.connect(interface='ixxat', channel=0, bitrate=250000)
# ecu.connect(interface='vector', app_name='CANalyzer', channel=0, bitrate=250000)
# ecu.connect(interface='nican', channel='CAN0', bitrate=250000)
# subscribe to all (global) messages on the bus
ecu.subscribe(on_message)
# name descriptor for the ca
CA_NAME = j1939.Name(
arbitrary_address_capable=0,
industry_group=j1939.Name.IndustryGroup.Industrial,
vehicle_system_instance=1,
vehicle_system=1,
function=1,
function_instance=1,
ecu_instance=1,
manufacturer_code=666,
identity_number=1234567
)
ca = j1939.ControllerApplication(CA_NAME, 0xF1)
# add CA to the ECU
ecu.add_ca(controller_application=ca)
# by starting the CA it starts the address claiming procedure on the bus
ca.start()
# create the instance of the Dm1 to be able to receive active DTCs
Dm1_rec = j1939.Dm1(ca)
# subscribe to DM1-messages on the bus
Dm1_rec.subscribe(dm1_receive)
# create the instance of the Dm1 to be able to send active DTCs
Dm1_snd = j1939.Dm1(ca)
# start sending Dm1-message from source-id 10
Dm1_snd.start_send(callback=dm1_before_send)
time.sleep(120)
print("Deinitializing")
ecu.disconnect()
if __name__ == '__main__':
main()
J1939 Receive
j1939_receive.py
import logging
import time
import can
import j1939
from hexdump import hexdump
logging.getLogger('j1939').setLevel(logging.DEBUG)
logging.getLogger('can').setLevel(logging.DEBUG)
MY_ADDR = 1
# compose the name descriptor for the new ca
name = j1939.Name(
arbitrary_address_capable=0,
industry_group=j1939.Name.IndustryGroup.Industrial,
vehicle_system_instance=1,
vehicle_system=1,
function=1,
function_instance=1,
ecu_instance=1,
manufacturer_code=666,
identity_number=1234567
)
# create the ControllerApplications
ca = j1939.ControllerApplication(name, MY_ADDR)
def on_message(priority, pgn, sa, timestamp, data):
"""Receive incoming messages from the bus
:param int priority:
Priority of the message
:param int pgn:
Parameter Group Number of the message
:param int sa:
Source Address of the message
:param int timestamp:
Timestamp of the message
:param bytearray data:
Data of the PDU
"""
print(f"PGN {pgn} length {len(data)} source {hex(sa)} time {timestamp} my_addr {hex(MY_ADDR)}")
# print(hexdump(data))
def main():
print("Initializing")
# create the ElectronicControlUnit (one ECU can hold multiple ControllerApplications)
ecu = j1939.ElectronicControlUnit()
# Connect to the CAN bus
# Arguments are passed to python-can's can.interface.Bus() constructor
# (see https://python-can.readthedocs.io/en/stable/bus.html).
ecu.connect(interface='socketcan', channel='can0')
# ecu.connect(interface='kvaser', channel=0, bitrate=250000)
# ecu.connect(interface='pcan', channel='PCAN_USBBUS1', bitrate=250000)
# ecu.connect(interface='ixxat', channel=0, bitrate=250000)
# ecu.connect(interface='vector', app_name='CANalyzer', channel=0, bitrate=250000)
# ecu.connect(interface='nican', channel='CAN0', bitrate=250000)
# add CA to the ECU
ecu.add_ca(controller_application=ca)
ca.subscribe(on_message)
ca.start()
print("Initialized")
time.sleep(300)
print("Deinitializing")
ca.stop()
ecu.disconnect()
if __name__ == '__main__':
main()
J1939 Send
j1939_send.py
import logging
import time
import can
import j1939
import os
from hexdump import hexdump
logging.getLogger('j1939').setLevel(logging.DEBUG)
logging.getLogger('can').setLevel(logging.DEBUG)
MY_ADDR = 0x03
MAX_PACKET_SIZE = 1785
# compose the name descriptor for the new ca
name = j1939.Name(
arbitrary_address_capable=1,
industry_group=j1939.Name.IndustryGroup.Industrial,
vehicle_system_instance=1,
vehicle_system=1,
function=1,
function_instance=1,
ecu_instance=1,
manufacturer_code=666,
identity_number=1234567
)
# create the ControllerApplications
ca = j1939.ControllerApplication(name, MY_ADDR)
def ca_receive(priority, pgn, source, timestamp, data):
"""Feed incoming message to this CA.
(OVERLOADED function)
:param int priority:
Priority of the message
:param int pgn:
Parameter Group Number of the message
:param intsa:
Source Address of the message
:param int timestamp:
Timestamp of the message
:param bytearray data:
Data of the PDU
"""
print(f"PGN {pgn} length {len(data)} source {source} time {timestamp} my_addr {hex(MY_ADDR)}")
print(hexdump(data))
def ca_send_broadcast_pgn(size=100):
# wait until we have our device_address
while ca.state != j1939.ControllerApplication.State.NORMAL:
time.sleep(1)
continue
print(f"sending {size} bytes")
# create custom length data
# data = [j1939.ControllerApplication.FieldValue.NOT_AVAILABLE_8] * size
data = [0x01] * size
# sending normal broadcast message
ca.send_pgn(0, 0xFD, 0xED, 6, data)
print(f"sent {size} bytes to broadcast")
return True
def ca_send_direct_pgn(dest, size=100):
# wait until we have our device_address
while ca.state != j1939.ControllerApplication.State.NORMAL:
time.sleep(1)
continue
# create custom length data
data = [j1939.ControllerApplication.FieldValue.NOT_AVAILABLE_8] * size
# sending normal peer-to-peer message
ca.send_pgn(0, 0xE0, dest, 6, data)
print(f"sent {size} bytes to {hex(dest)}")
return True
def ca_timer_callback1(cookie):
"""Callback for sending messages
This callback is registered at the ECU timer event mechanism to be
executed every 500ms.
:param cookie:
A cookie registered at 'add_timer'. May be None.
"""
# wait until we have our device_address
if ca.state != j1939.ControllerApplication.State.NORMAL:
# returning true keeps the timer event active
return True
ca_send_direct_pgn(0x1, 100)
ca_send_direct_pgn(0x2, 100)
# returning true keeps the timer event active
return True
def main():
print("Initializing")
# create the ElectronicControlUnit (one ECU can hold multiple ControllerApplications)
ecu = j1939.ElectronicControlUnit()
# Connect to the CAN bus
# Arguments are passed to python-can's can.interface.Bus() constructor
# (see https://python-can.readthedocs.io/en/stable/bus.html).
ecu.connect(interface='socketcan', channel='can0')
# ecu.connect(interface='kvaser', channel=0, bitrate=250000)
# ecu.connect(interface='pcan', channel='PCAN_USBBUS1', bitrate=250000)
# ecu.connect(interface='ixxat', channel=0, bitrate=250000)
# ecu.connect(interface='vector', app_name='CANalyzer', channel=0, bitrate=250000)
# ecu.connect(interface='nican', channel='CAN0', bitrate=250000)
# add CA to the ECU
ecu.add_ca(controller_application=ca)
ca.subscribe(ca_receive)
# setup periodic message callbacks
ca.add_timer(2, ca_timer_callback1)
# by starting the CA it starts the address claiming procedure on the bus
ca.start()
print("waiting for addr ...")
time.sleep(120)
print("Deinitializing")
ca.stop()
ecu.disconnect()
if __name__ == '__main__':
main()
J1939 22 Multi Pg
j1939_22_multi_pg.py
import logging
import time
import j1939
from j1939.message_id import FrameFormat
logging.getLogger('j1939').setLevel(logging.DEBUG)
logging.getLogger('can').setLevel(logging.DEBUG)
def on_message(priority, pgn, sa, timestamp, data):
"""Receive incoming messages from the bus
:param int priority:
Priority of the message
:param int pgn:
Parameter Group Number of the message
:param int sa:
Source Address of the message
:param int timestamp:
Timestamp of the message
:param bytearray data:
Data of the PDU
"""
print("PGN {} length {}".format(pgn, len(data)), timestamp)
def ca_timer_callback1(ca : j1939.ControllerApplication):
"""Callback for sending messages
This callback is registered at the ECU timer event mechanism to be
executed every 500ms.
:param cookie:
A cookie registered at 'add_timer'. May be None.
"""
# wait until we have our device_address
if ca.state != j1939.ControllerApplication.State.NORMAL:
# returning true keeps the timer event active
return True
# create data with 20 bytes
data = [j1939.ControllerApplication.FieldValue.NOT_AVAILABLE_8] * 20
# sending broadcast message
# the following two PGNs are packed into one multi-pg, due to time-limit of 10ms and same destination address (global)
ca.send_pgn(0, 0xFD, 0xED, 6, data, time_limit=0.01, frame_format=FrameFormat.FBFF) # Frame-Format FBFF
ca.send_pgn(0, 0xFE, 0x32, 6, data, time_limit=0.01, frame_format=FrameFormat.FBFF) # Frame-Format FBFF
ca.send_pgn(0, 0xFA, 0xED, 6, data, time_limit=0.01) # Frame-Format FEFF
ca.send_pgn(0, 0xFB, 0x32, 6, data, time_limit=0.01) # Frame-Format FEFF
# sending peer-to-peer message, destintion address is 0x04
# the following PGNs are transferred separately, because time limit == 0
ca.send_pgn(0, 0xE0, 0x04, 6, data)
ca.send_pgn(0, 0xD0, 0x04, 6, data)
# returning true keeps the timer event active
return True
def main():
print("Initializing")
# create the ElectronicControlUnit (one ECU can hold multiple ControllerApplications) with j1939-22 data link layer
ecu = j1939.ElectronicControlUnit(data_link_layer='j1939-22', max_cmdt_packets=200)
# can fd Baud: 500k/2M
ecu.connect(interface='pcan', channel='PCAN_USBBUS3', fd=True,
f_clock_mhz=80, nom_brp=10, nom_tseg1=12, nom_tseg2=3, nom_sjw=1, data_brp=4, data_tseg1=7, data_tseg2=2, data_sjw=1)
# subscribe to all (global) messages on the bus
ecu.subscribe(on_message)
# compose the name descriptor for the new ca
name = j1939.Name(
arbitrary_address_capable=0,
industry_group=j1939.Name.IndustryGroup.Industrial,
vehicle_system_instance=1,
vehicle_system=1,
function=1,
function_instance=1,
ecu_instance=1,
manufacturer_code=666,
identity_number=1234567
)
# create the ControllerApplications
ca = j1939.ControllerApplication(name, 0x1)
ecu.add_ca(controller_application=ca)
# callback every 0.5s
ca.add_timer(0.500, ca_timer_callback1, ca)
ca.start()
time.sleep(120)
print("Deinitializing")
ca.stop()
ecu.disconnect()
if __name__ == '__main__':
main()
J1939 22 Transport Protocols
j1939_22_transport_protocols.py
import logging
import time
import j1939
logging.getLogger('j1939').setLevel(logging.DEBUG)
logging.getLogger('can').setLevel(logging.DEBUG)
def on_message(priority, pgn, sa, timestamp, data):
"""Receive incoming messages from the bus
:param int priority:
Priority of the message
:param int pgn:
Parameter Group Number of the message
:param int sa:
Source Address of the message
:param int timestamp:
Timestamp of the message
:param bytearray data:
Data of the PDU
"""
print("PGN {} length {}".format(pgn, len(data)), timestamp)
def ca_timer_callback1(ca):
"""Callback for sending messages
This callback is registered at the ECU timer event mechanism to be
executed every 500ms.
:param cookie:
A cookie registered at 'add_timer'. May be None.
"""
# wait until we have our device_address
if ca.state != j1939.ControllerApplication.State.NORMAL:
# returning true keeps the timer event active
return True
# create data with 500 bytes
data = [j1939.ControllerApplication.FieldValue.NOT_AVAILABLE_8] * 500
# sending transport protocol broadcast message (BAM)
successful = ca.send_pgn(0, 0xFD, 0xED, 6, data)
if not successful:
print( 'error occurred while sending BAM' )
# sending transport protocol peer-to-peer message (rts/cts), destintion address is 0x04
successful = ca.send_pgn(0, 0xE0, 0x04, 6, data)
if not successful:
print( 'error occurred while sending rts/cts message' )
# returning true keeps the timer event active
return True
def main():
print("Initializing")
# create the ElectronicControlUnit (one ECU can hold multiple ControllerApplications) with j1939-22 data link layer
ecu = j1939.ElectronicControlUnit(data_link_layer='j1939-22', max_cmdt_packets=200)
# can fd Baud: 500k/2M
ecu.connect(interface='pcan', channel='PCAN_USBBUS1', fd=True,
f_clock_mhz=80, nom_brp=10, nom_tseg1=12, nom_tseg2=3, nom_sjw=1, data_brp=4, data_tseg1=7, data_tseg2=2, data_sjw=1)
# subscribe to all (global) messages on the bus
ecu.subscribe(on_message)
# compose the name descriptor for the new ca
name = j1939.Name(
arbitrary_address_capable=0,
industry_group=j1939.Name.IndustryGroup.Industrial,
vehicle_system_instance=1,
vehicle_system=1,
function=1,
function_instance=1,
ecu_instance=1,
manufacturer_code=666,
identity_number=1234567
)
# create the ControllerApplications
ca = j1939.ControllerApplication(name, 0x1)
ecu.add_ca(controller_application=ca)
# callback every 0.5s
ca.add_timer(0.500, ca_timer_callback1, ca)
ca.start()
time.sleep(120)
print("Deinitializing")
ca.stop()
ecu.disconnect()
if __name__ == '__main__':
main()
Own Ca Producer
own_ca_producer.py
import logging
import time
import can
import j1939
logging.getLogger('j1939').setLevel(logging.DEBUG)
logging.getLogger('can').setLevel(logging.DEBUG)
# compose the name descriptor for the new ca
name = j1939.Name(
arbitrary_address_capable=0,
industry_group=j1939.Name.IndustryGroup.Industrial,
vehicle_system_instance=1,
vehicle_system=1,
function=1,
function_instance=1,
ecu_instance=1,
manufacturer_code=666,
identity_number=1234567
)
# create the ControllerApplications
ca = j1939.ControllerApplication(name, 128)
def ca_receive(priority, pgn, source, timestamp, data):
"""Feed incoming message to this CA.
(OVERLOADED function)
:param int priority:
Priority of the message
:param int pgn:
Parameter Group Number of the message
:param intsa:
Source Address of the message
:param int timestamp:
Timestamp of the message
:param bytearray data:
Data of the PDU
"""
print("PGN {} length {}".format(pgn, len(data)))
def ca_timer_callback1(cookie):
"""Callback for sending messages
This callback is registered at the ECU timer event mechanism to be
executed every 500ms.
:param cookie:
A cookie registered at 'add_timer'. May be None.
"""
# wait until we have our device_address
if ca.state != j1939.ControllerApplication.State.NORMAL:
# returning true keeps the timer event active
return True
# create data with 8 bytes
data = [j1939.ControllerApplication.FieldValue.NOT_AVAILABLE_8] * 8
# sending normal broadcast message
ca.send_pgn(0, 0xFD, 0xED, 6, data)
# sending normal peer-to-peer message, destintion address is 0x04
ca.send_pgn(0, 0xE0, 0x04, 6, data)
# returning true keeps the timer event active
return True
def ca_timer_callback2(cookie):
"""Callback for sending messages
This callback is registered at the ECU timer event mechanism to be
executed every 500ms.
:param cookie:
A cookie registered at 'add_timer'. May be None.
"""
# wait until we have our device_address
if ca.state != j1939.ControllerApplication.State.NORMAL:
# returning true keeps the timer event active
return True
# create data with 100 bytes
data = [j1939.ControllerApplication.FieldValue.NOT_AVAILABLE_8] * 100
# sending multipacket message with TP-BAM
ca.send_pgn(0, 0xFE, 0xF6, 6, data)
# sending multipacket message with TP-CMDT, destination address is 0x05
ca.send_pgn(0, 0xD0, 0x05, 6, data)
# returning true keeps the timer event active
return True
def main():
print("Initializing")
# create the ElectronicControlUnit (one ECU can hold multiple ControllerApplications)
ecu = j1939.ElectronicControlUnit()
# Connect to the CAN bus
# Arguments are passed to python-can's can.interface.Bus() constructor
# (see https://python-can.readthedocs.io/en/stable/bus.html).
# ecu.connect(interface='socketcan', channel='can0')
# ecu.connect(interface='kvaser', channel=0, bitrate=250000)
ecu.connect(interface='pcan', channel='PCAN_USBBUS1', bitrate=250000)
# ecu.connect(interface='ixxat', channel=0, bitrate=250000)
# ecu.connect(interface='vector', app_name='CANalyzer', channel=0, bitrate=250000)
# ecu.connect(interface='nican', channel='CAN0', bitrate=250000)
# ecu.connect('testchannel_1', interface='virtual')
# add CA to the ECU
ecu.add_ca(controller_application=ca)
ca.subscribe(ca_receive)
# callback every 0.5s
ca.add_timer(0.500, ca_timer_callback1)
# callback every 5s
ca.add_timer(5, ca_timer_callback2)
# by starting the CA it starts the address claiming procedure on the bus
ca.start()
time.sleep(120)
print("Deinitializing")
ca.stop()
ecu.disconnect()
if __name__ == '__main__':
main()
Simple Receive Global
simple_receive_global.py
import logging
import time
import can
import j1939
logging.getLogger('j1939').setLevel(logging.DEBUG)
logging.getLogger('can').setLevel(logging.DEBUG)
def on_message(priority, pgn, sa, timestamp, data):
"""Receive incoming messages from the bus
:param int priority:
Priority of the message
:param int pgn:
Parameter Group Number of the message
:param int sa:
Source Address of the message
:param int timestamp:
Timestamp of the message
:param bytearray data:
Data of the PDU
"""
print("PGN {} length {}".format(pgn, len(data)))
def main():
print("Initializing")
# create the ElectronicControlUnit (one ECU can hold multiple ControllerApplications)
ecu = j1939.ElectronicControlUnit()
# Connect to the CAN bus
# Arguments are passed to python-can's can.interface.Bus() constructor
# (see https://python-can.readthedocs.io/en/stable/bus.html).
# ecu.connect(interface='socketcan', channel='can0')
# ecu.connect(interface='kvaser', channel=0, bitrate=250000)
ecu.connect(interface='pcan', channel='PCAN_USBBUS1', bitrate=250000)
# ecu.connect(interface='ixxat', channel=0, bitrate=250000)
# ecu.connect(interface='vector', app_name='CANalyzer', channel=0, bitrate=250000)
# ecu.connect(interface='nican', channel='CAN0', bitrate=250000)
# subscribe to all (global) messages on the bus
ecu.subscribe(on_message)
time.sleep(120)
print("Deinitializing")
ecu.disconnect()
if __name__ == '__main__':
main()
Simple Receive Peer To Peer
simple_receive_peer_to_peer.py
import logging
import time
import can
import j1939
logging.getLogger('j1939').setLevel(logging.DEBUG)
logging.getLogger('can').setLevel(logging.DEBUG)
def on_message(priority, pgn, sa, timestamp, data):
"""Receive incoming messages from the bus
:param int priority:
Priority of the message
:param int pgn:
Parameter Group Number of the message
:param int sa:
Source Address of the message
:param int timestamp:
Timestamp of the message
:param bytearray data:
Data of the PDU
"""
print("PGN {} length {}".format(hex(pgn), len(data)))
def main():
print("Initializing")
# create the ElectronicControlUnit (one ECU can hold multiple ControllerApplications)
ecu = j1939.ElectronicControlUnit()
# Connect to the CAN bus
# Arguments are passed to python-can's can.interface.Bus() constructor
# (see https://python-can.readthedocs.io/en/stable/bus.html).
# ecu.connect(interface='socketcan', channel='can0')
# ecu.connect(interface='kvaser', channel=0, bitrate=250000)
ecu.connect(interface='pcan', channel='PCAN_USBBUS1', bitrate=500000)
# ecu.connect(interface='ixxat', channel=0, bitrate=250000)
# ecu.connect(interface='vector', app_name='CANalyzer', channel=0, bitrate=250000)
# ecu.connect(interface='nican', channel='CAN0', bitrate=250000)
# subscribe to all global and peer-to-peer messages with destination 0xFA
ecu.subscribe(on_message, 0xFA)
time.sleep(120)
print("Deinitializing")
ecu.disconnect()
if __name__ == '__main__':
main()