Examples

Example scripts demonstrating how to use the python-can-j1939 library.

Diagnostic Message

diagnostic_message.py
import logging
import time
import can
import j1939

logging.getLogger('j1939').setLevel(logging.DEBUG)
logging.getLogger('can').setLevel(logging.DEBUG)


def on_message(priority, pgn, sa, timestamp, data):
    """Receive incoming messages from the bus

    :param int priority:
        Priority of the message
    :param int pgn:
        Parameter Group Number of the message
    :param int sa:
        Source Address of the message
    :param int timestamp:
        Timestamp of the message
    :param bytearray data:
        Data of the PDU
    """
    pass


def dm1_receive(sa, lamp_status, dtc_dic_list, timestamp):
    """Receive incoming Dm1 messages from the bus

    :param int sa:
        Source Address of the message
    :param dic lamp_status:
        lamp status dictionary
        keys: 'pl', 'awl', 'rsl', 'mil'
        value: j1939.DtcLamp.OFF / .ON / .SLOW_FLASH / .ON_FAST_FLASH
    :param list of dic dtc_dic_list:
        DTC List
        keys: 'spn', 'fmi', 'oc'
    :param int timestamp:
        Timestamp of the message
    :param bytearray data:
        Data of the PDU
    """
    print('DM1 received from address:', sa, 'lamp status:', lamp_status, 'DTCs:', dtc_dic_list)


def dm1_before_send():
    """This function is called before a Dm1 message is sent to collect the data
 
    :return:
        lamp status 
    :rtype: dic: 'pl', 'awl', 'rsl', 'mil'
    
    :return:
        list of dictionaries of all DTCs included in DM1

    :rtype: list of dic: 'spn', 'fmi', 'oc', 'cm'
    """
    lamp_status = {}
    # get lamp status (optional, if status not enter, lamp is switched off)
    lamp_status['pl']  = j1939.DtcLamp.ON_FAST_FLASH
    lamp_status['awl'] = j1939.DtcLamp.ON_SLOW_FLASH
    lamp_status['rsl'] = j1939.DtcLamp.NA
    lamp_status['mil'] = j1939.DtcLamp.OFF

    # add all active DTCs
    # if no DTC is active return empty list
    # 'cm' is the SAE J1939-73 SPN conversion method (1, 2, 3, or 4).
    # Defaults to 4 (current standard) when omitted.
    dtc_list = []
    dtc_list.append({'spn': 123, 'fmi': 31})                     # CM defaults to 4
    dtc_list.append({'spn': 456, 'fmi': 1, 'oc': 132})           # with occurrence counter
    dtc_list.append({'spn': 789, 'fmi': 2, 'oc': 5, 'cm': 3})    # legacy CM 3 layout

    return lamp_status, dtc_list


def main():
    print("Initializing")

    # create the ElectronicControlUnit (one ECU can hold multiple ControllerApplications)
    ecu = j1939.ElectronicControlUnit()

    # Connect to the CAN bus
    # Arguments are passed to python-can's can.interface.Bus() constructor
    # (see https://python-can.readthedocs.io/en/stable/bus.html).
    # ecu.connect(interface='socketcan', channel='can0')
    # ecu.connect(interface='kvaser', channel=0, bitrate=250000)
    ecu.connect(interface='pcan', channel='PCAN_USBBUS1', bitrate=250000)
    # ecu.connect(interface='ixxat', channel=0, bitrate=250000)
    # ecu.connect(interface='vector', app_name='CANalyzer', channel=0, bitrate=250000)
    # ecu.connect(interface='nican', channel='CAN0', bitrate=250000)   

    # subscribe to all (global) messages on the bus
    ecu.subscribe(on_message)

    # name descriptor for the ca
    CA_NAME = j1939.Name(
        arbitrary_address_capable=0,
        industry_group=j1939.Name.IndustryGroup.Industrial,
        vehicle_system_instance=1,
        vehicle_system=1,
        function=1,
        function_instance=1,
        ecu_instance=1,
        manufacturer_code=666,
        identity_number=1234567
        )
    ca = j1939.ControllerApplication(CA_NAME, 0xF1)
    # add CA to the ECU
    ecu.add_ca(controller_application=ca)
    # by starting the CA it starts the address claiming procedure on the bus
    ca.start()


    # create the instance of the Dm1 to be able to receive active DTCs
    Dm1_rec = j1939.Dm1(ca)
    # subscribe to DM1-messages on the bus
    Dm1_rec.subscribe(dm1_receive)

    # create the instance of the Dm1 to be able to send active DTCs
    Dm1_snd = j1939.Dm1(ca)
    # start sending Dm1-message from source-id 10
    Dm1_snd.start_send(callback=dm1_before_send)


    time.sleep(120)

    print("Deinitializing")
    ecu.disconnect()

if __name__ == '__main__':
    main()

J1939 Receive

j1939_receive.py
import logging
import time
import can
import j1939
from hexdump import hexdump

logging.getLogger('j1939').setLevel(logging.DEBUG)
logging.getLogger('can').setLevel(logging.DEBUG)

MY_ADDR = 1
# compose the name descriptor for the new ca
name = j1939.Name(
    arbitrary_address_capable=0,
    industry_group=j1939.Name.IndustryGroup.Industrial,
    vehicle_system_instance=1,
    vehicle_system=1,
    function=1,
    function_instance=1,
    ecu_instance=1,
    manufacturer_code=666,
    identity_number=1234567
    )

# create the ControllerApplications
ca = j1939.ControllerApplication(name, MY_ADDR)

def on_message(priority, pgn, sa, timestamp, data):
    """Receive incoming messages from the bus

    :param int priority:
        Priority of the message
    :param int pgn:
        Parameter Group Number of the message
    :param int sa:
        Source Address of the message
    :param int timestamp:
        Timestamp of the message
    :param bytearray data:
        Data of the PDU
    """
    print(f"PGN {pgn} length {len(data)} source {hex(sa)} time {timestamp} my_addr {hex(MY_ADDR)}")
    # print(hexdump(data))

def main():
    print("Initializing")

    # create the ElectronicControlUnit (one ECU can hold multiple ControllerApplications)
    ecu = j1939.ElectronicControlUnit()

    # Connect to the CAN bus
    # Arguments are passed to python-can's can.interface.Bus() constructor
    # (see https://python-can.readthedocs.io/en/stable/bus.html).
    ecu.connect(interface='socketcan', channel='can0')
    # ecu.connect(interface='kvaser', channel=0, bitrate=250000)
    # ecu.connect(interface='pcan', channel='PCAN_USBBUS1', bitrate=250000)
    # ecu.connect(interface='ixxat', channel=0, bitrate=250000)
    # ecu.connect(interface='vector', app_name='CANalyzer', channel=0, bitrate=250000)
    # ecu.connect(interface='nican', channel='CAN0', bitrate=250000)

    # add CA to the ECU
    ecu.add_ca(controller_application=ca)
    ca.subscribe(on_message)
    ca.start()

    print("Initialized")

    time.sleep(300)

    print("Deinitializing")
    ca.stop()
    ecu.disconnect()

if __name__ == '__main__':
    main()

J1939 Send

j1939_send.py
import logging
import time
import can
import j1939
import os
from hexdump import hexdump

logging.getLogger('j1939').setLevel(logging.DEBUG)
logging.getLogger('can').setLevel(logging.DEBUG)

MY_ADDR = 0x03
MAX_PACKET_SIZE = 1785

# compose the name descriptor for the new ca
name = j1939.Name(
    arbitrary_address_capable=1,
    industry_group=j1939.Name.IndustryGroup.Industrial,
    vehicle_system_instance=1,
    vehicle_system=1,
    function=1,
    function_instance=1,
    ecu_instance=1,
    manufacturer_code=666,
    identity_number=1234567
    )

# create the ControllerApplications
ca = j1939.ControllerApplication(name, MY_ADDR)

def ca_receive(priority, pgn, source, timestamp, data):
    """Feed incoming message to this CA.
    (OVERLOADED function)
    :param int priority:
        Priority of the message
    :param int pgn:
        Parameter Group Number of the message
    :param intsa:
        Source Address of the message
    :param int timestamp:
        Timestamp of the message
    :param bytearray data:
        Data of the PDU
    """
    print(f"PGN {pgn} length {len(data)} source {source} time {timestamp} my_addr {hex(MY_ADDR)}")
    print(hexdump(data))

def ca_send_broadcast_pgn(size=100):
    # wait until we have our device_address
    while ca.state != j1939.ControllerApplication.State.NORMAL:
        time.sleep(1)
        continue

    print(f"sending {size} bytes")
    # create custom length data
    # data = [j1939.ControllerApplication.FieldValue.NOT_AVAILABLE_8] * size
    data = [0x01] * size

    # sending normal broadcast message
    ca.send_pgn(0, 0xFD, 0xED, 6, data)
    print(f"sent {size} bytes to broadcast")

    return True

def ca_send_direct_pgn(dest, size=100):
    # wait until we have our device_address
    while ca.state != j1939.ControllerApplication.State.NORMAL:
        time.sleep(1)
        continue

    # create custom length data
    data = [j1939.ControllerApplication.FieldValue.NOT_AVAILABLE_8] * size

    # sending normal peer-to-peer message
    ca.send_pgn(0, 0xE0, dest, 6, data)
    print(f"sent {size} bytes to {hex(dest)}")
    return True

def ca_timer_callback1(cookie):
    """Callback for sending messages

    This callback is registered at the ECU timer event mechanism to be
    executed every 500ms.

    :param cookie:
        A cookie registered at 'add_timer'. May be None.
    """
    # wait until we have our device_address
    if ca.state != j1939.ControllerApplication.State.NORMAL:
        # returning true keeps the timer event active
        return True

    ca_send_direct_pgn(0x1, 100)
    ca_send_direct_pgn(0x2, 100)

    # returning true keeps the timer event active
    return True

def main():
    print("Initializing")

    # create the ElectronicControlUnit (one ECU can hold multiple ControllerApplications)
    ecu = j1939.ElectronicControlUnit()

    # Connect to the CAN bus
    # Arguments are passed to python-can's can.interface.Bus() constructor
    # (see https://python-can.readthedocs.io/en/stable/bus.html).
    ecu.connect(interface='socketcan', channel='can0')
    # ecu.connect(interface='kvaser', channel=0, bitrate=250000)
    # ecu.connect(interface='pcan', channel='PCAN_USBBUS1', bitrate=250000)
    # ecu.connect(interface='ixxat', channel=0, bitrate=250000)
    # ecu.connect(interface='vector', app_name='CANalyzer', channel=0, bitrate=250000)
    # ecu.connect(interface='nican', channel='CAN0', bitrate=250000)

    # add CA to the ECU
    ecu.add_ca(controller_application=ca)
    ca.subscribe(ca_receive)

    # setup periodic message callbacks
    ca.add_timer(2, ca_timer_callback1)

    # by starting the CA it starts the address claiming procedure on the bus
    ca.start()
    print("waiting for addr ...")

    time.sleep(120)

    print("Deinitializing")
    ca.stop()
    ecu.disconnect()

if __name__ == '__main__':
    main()

J1939 22 Multi Pg

j1939_22_multi_pg.py
import logging
import time
import j1939
from j1939.message_id import FrameFormat


logging.getLogger('j1939').setLevel(logging.DEBUG)
logging.getLogger('can').setLevel(logging.DEBUG)


def on_message(priority, pgn, sa, timestamp, data):
    """Receive incoming messages from the bus

    :param int priority:
        Priority of the message
    :param int pgn:
        Parameter Group Number of the message
    :param int sa:
        Source Address of the message
    :param int timestamp:
        Timestamp of the message
    :param bytearray data:
        Data of the PDU
    """
    print("PGN {} length {}".format(pgn, len(data)), timestamp)


def ca_timer_callback1(ca : j1939.ControllerApplication):
    """Callback for sending messages

    This callback is registered at the ECU timer event mechanism to be
    executed every 500ms.

    :param cookie:
        A cookie registered at 'add_timer'. May be None.
    """
    # wait until we have our device_address
    if ca.state != j1939.ControllerApplication.State.NORMAL:
        # returning true keeps the timer event active
        return True

    # create data with 20 bytes
    data = [j1939.ControllerApplication.FieldValue.NOT_AVAILABLE_8] * 20

    # sending broadcast message
    # the following two PGNs are packed into one multi-pg, due to time-limit of 10ms and same destination address (global)
    ca.send_pgn(0, 0xFD, 0xED, 6, data, time_limit=0.01, frame_format=FrameFormat.FBFF)    # Frame-Format FBFF
    ca.send_pgn(0, 0xFE, 0x32, 6, data, time_limit=0.01, frame_format=FrameFormat.FBFF)    # Frame-Format FBFF

    ca.send_pgn(0, 0xFA, 0xED, 6, data, time_limit=0.01)    # Frame-Format FEFF
    ca.send_pgn(0, 0xFB, 0x32, 6, data, time_limit=0.01)    # Frame-Format FEFF

    # sending peer-to-peer message, destintion address is 0x04
    # the following PGNs are transferred separately, because time limit == 0
    ca.send_pgn(0, 0xE0, 0x04, 6, data)
    ca.send_pgn(0, 0xD0, 0x04, 6, data)

    # returning true keeps the timer event active
    return True


def main():
    print("Initializing")

    # create the ElectronicControlUnit (one ECU can hold multiple ControllerApplications) with j1939-22 data link layer
    ecu = j1939.ElectronicControlUnit(data_link_layer='j1939-22', max_cmdt_packets=200)

    # can fd Baud: 500k/2M
    ecu.connect(interface='pcan', channel='PCAN_USBBUS3', fd=True,
                        f_clock_mhz=80, nom_brp=10, nom_tseg1=12, nom_tseg2=3, nom_sjw=1, data_brp=4, data_tseg1=7, data_tseg2=2, data_sjw=1)

    # subscribe to all (global) messages on the bus
    ecu.subscribe(on_message)

    # compose the name descriptor for the new ca
    name = j1939.Name(
        arbitrary_address_capable=0,
        industry_group=j1939.Name.IndustryGroup.Industrial,
        vehicle_system_instance=1,
        vehicle_system=1,
        function=1,
        function_instance=1,
        ecu_instance=1,
        manufacturer_code=666,
        identity_number=1234567
        )

    # create the ControllerApplications
    ca = j1939.ControllerApplication(name, 0x1)
    ecu.add_ca(controller_application=ca)
    # callback every 0.5s
    ca.add_timer(0.500, ca_timer_callback1, ca)
    ca.start()

    time.sleep(120)

    print("Deinitializing")
    ca.stop()
    ecu.disconnect()

if __name__ == '__main__':
    main()

J1939 22 Transport Protocols

j1939_22_transport_protocols.py
import logging
import time
import j1939


logging.getLogger('j1939').setLevel(logging.DEBUG)
logging.getLogger('can').setLevel(logging.DEBUG)


def on_message(priority, pgn, sa, timestamp, data):
    """Receive incoming messages from the bus

    :param int priority:
        Priority of the message
    :param int pgn:
        Parameter Group Number of the message
    :param int sa:
        Source Address of the message
    :param int timestamp:
        Timestamp of the message
    :param bytearray data:
        Data of the PDU
    """
    print("PGN {} length {}".format(pgn, len(data)), timestamp)


def ca_timer_callback1(ca):
    """Callback for sending messages

    This callback is registered at the ECU timer event mechanism to be
    executed every 500ms.

    :param cookie:
        A cookie registered at 'add_timer'. May be None.
    """
    # wait until we have our device_address
    if ca.state != j1939.ControllerApplication.State.NORMAL:
        # returning true keeps the timer event active
        return True

    # create data with 500 bytes
    data = [j1939.ControllerApplication.FieldValue.NOT_AVAILABLE_8] * 500

    # sending transport protocol broadcast message (BAM)
    successful = ca.send_pgn(0, 0xFD, 0xED, 6, data)
    if not successful:
        print( 'error occurred while sending BAM' )

    # sending transport protocol peer-to-peer message (rts/cts), destintion address is 0x04
    successful = ca.send_pgn(0, 0xE0, 0x04, 6, data)
    if not successful:
        print( 'error occurred while sending rts/cts message' )

    # returning true keeps the timer event active
    return True


def main():
    print("Initializing")

    # create the ElectronicControlUnit (one ECU can hold multiple ControllerApplications) with j1939-22 data link layer
    ecu = j1939.ElectronicControlUnit(data_link_layer='j1939-22', max_cmdt_packets=200)

    # can fd Baud: 500k/2M
    ecu.connect(interface='pcan', channel='PCAN_USBBUS1', fd=True,
                        f_clock_mhz=80, nom_brp=10, nom_tseg1=12, nom_tseg2=3, nom_sjw=1, data_brp=4, data_tseg1=7, data_tseg2=2, data_sjw=1)

    # subscribe to all (global) messages on the bus
    ecu.subscribe(on_message)

    # compose the name descriptor for the new ca
    name = j1939.Name(
        arbitrary_address_capable=0,
        industry_group=j1939.Name.IndustryGroup.Industrial,
        vehicle_system_instance=1,
        vehicle_system=1,
        function=1,
        function_instance=1,
        ecu_instance=1,
        manufacturer_code=666,
        identity_number=1234567
        )

    # create the ControllerApplications
    ca = j1939.ControllerApplication(name, 0x1)
    ecu.add_ca(controller_application=ca)
    # callback every 0.5s
    ca.add_timer(0.500, ca_timer_callback1, ca)
    ca.start()

    time.sleep(120)

    print("Deinitializing")
    ca.stop()
    ecu.disconnect()

if __name__ == '__main__':
    main()

Own Ca Producer

own_ca_producer.py
import logging
import time
import can
import j1939

logging.getLogger('j1939').setLevel(logging.DEBUG)
logging.getLogger('can').setLevel(logging.DEBUG)

# compose the name descriptor for the new ca
name = j1939.Name(
    arbitrary_address_capable=0, 
    industry_group=j1939.Name.IndustryGroup.Industrial,
    vehicle_system_instance=1,
    vehicle_system=1,
    function=1,
    function_instance=1,
    ecu_instance=1,
    manufacturer_code=666,
    identity_number=1234567
    )

# create the ControllerApplications
ca = j1939.ControllerApplication(name, 128)


def ca_receive(priority, pgn, source, timestamp, data):
    """Feed incoming message to this CA.
    (OVERLOADED function)
    :param int priority:
        Priority of the message
    :param int pgn:
        Parameter Group Number of the message
    :param intsa:
        Source Address of the message
    :param int timestamp:
        Timestamp of the message
    :param bytearray data:
        Data of the PDU
    """
    print("PGN {} length {}".format(pgn, len(data)))

def ca_timer_callback1(cookie):
    """Callback for sending messages

    This callback is registered at the ECU timer event mechanism to be 
    executed every 500ms.

    :param cookie:
        A cookie registered at 'add_timer'. May be None.
    """
    # wait until we have our device_address
    if ca.state != j1939.ControllerApplication.State.NORMAL:
        # returning true keeps the timer event active
        return True

    # create data with 8 bytes
    data = [j1939.ControllerApplication.FieldValue.NOT_AVAILABLE_8] * 8

    # sending normal broadcast message
    ca.send_pgn(0, 0xFD, 0xED, 6, data)

    # sending normal peer-to-peer message, destintion address is 0x04
    ca.send_pgn(0, 0xE0, 0x04, 6, data)

    # returning true keeps the timer event active
    return True


def ca_timer_callback2(cookie):
    """Callback for sending messages

    This callback is registered at the ECU timer event mechanism to be 
    executed every 500ms.

    :param cookie:
        A cookie registered at 'add_timer'. May be None.
    """
    # wait until we have our device_address
    if ca.state != j1939.ControllerApplication.State.NORMAL:
        # returning true keeps the timer event active
        return True

    # create data with 100 bytes
    data = [j1939.ControllerApplication.FieldValue.NOT_AVAILABLE_8] * 100

    # sending multipacket message with TP-BAM
    ca.send_pgn(0, 0xFE, 0xF6, 6, data)

    # sending multipacket message with TP-CMDT, destination address is 0x05
    ca.send_pgn(0, 0xD0, 0x05, 6, data)

    # returning true keeps the timer event active
    return True

def main():
    print("Initializing")

    # create the ElectronicControlUnit (one ECU can hold multiple ControllerApplications)
    ecu = j1939.ElectronicControlUnit()

    # Connect to the CAN bus
    # Arguments are passed to python-can's can.interface.Bus() constructor
    # (see https://python-can.readthedocs.io/en/stable/bus.html).
    # ecu.connect(interface='socketcan', channel='can0')
    # ecu.connect(interface='kvaser', channel=0, bitrate=250000)
    ecu.connect(interface='pcan', channel='PCAN_USBBUS1', bitrate=250000)
    # ecu.connect(interface='ixxat', channel=0, bitrate=250000)
    # ecu.connect(interface='vector', app_name='CANalyzer', channel=0, bitrate=250000)
    # ecu.connect(interface='nican', channel='CAN0', bitrate=250000)    
    # ecu.connect('testchannel_1', interface='virtual')

    # add CA to the ECU
    ecu.add_ca(controller_application=ca)
    ca.subscribe(ca_receive)
    # callback every 0.5s
    ca.add_timer(0.500, ca_timer_callback1)
    # callback every 5s
    ca.add_timer(5, ca_timer_callback2)
    # by starting the CA it starts the address claiming procedure on the bus
    ca.start()
                        
    time.sleep(120)

    print("Deinitializing")
    ca.stop()
    ecu.disconnect()

if __name__ == '__main__':
    main()     

Simple Receive Global

simple_receive_global.py
import logging
import time
import can
import j1939

logging.getLogger('j1939').setLevel(logging.DEBUG)
logging.getLogger('can').setLevel(logging.DEBUG)

def on_message(priority, pgn, sa, timestamp, data):
    """Receive incoming messages from the bus

    :param int priority:
        Priority of the message
    :param int pgn:
        Parameter Group Number of the message
    :param int sa:
        Source Address of the message
    :param int timestamp:
        Timestamp of the message
    :param bytearray data:
        Data of the PDU
    """
    print("PGN {} length {}".format(pgn, len(data)))

def main():
    print("Initializing")

    # create the ElectronicControlUnit (one ECU can hold multiple ControllerApplications)
    ecu = j1939.ElectronicControlUnit()

    # Connect to the CAN bus
    # Arguments are passed to python-can's can.interface.Bus() constructor
    # (see https://python-can.readthedocs.io/en/stable/bus.html).
    # ecu.connect(interface='socketcan', channel='can0')
    # ecu.connect(interface='kvaser', channel=0, bitrate=250000)
    ecu.connect(interface='pcan', channel='PCAN_USBBUS1', bitrate=250000)
    # ecu.connect(interface='ixxat', channel=0, bitrate=250000)
    # ecu.connect(interface='vector', app_name='CANalyzer', channel=0, bitrate=250000)
    # ecu.connect(interface='nican', channel='CAN0', bitrate=250000)    

    # subscribe to all (global) messages on the bus
    ecu.subscribe(on_message)

    time.sleep(120)

    print("Deinitializing")
    ecu.disconnect()

if __name__ == '__main__':
    main()

Simple Receive Peer To Peer

simple_receive_peer_to_peer.py
import logging
import time
import can
import j1939

logging.getLogger('j1939').setLevel(logging.DEBUG)
logging.getLogger('can').setLevel(logging.DEBUG)

def on_message(priority, pgn, sa, timestamp, data):
    """Receive incoming messages from the bus

    :param int priority:
        Priority of the message
    :param int pgn:
        Parameter Group Number of the message
    :param int sa:
        Source Address of the message
    :param int timestamp:
        Timestamp of the message
    :param bytearray data:
        Data of the PDU
    """
    print("PGN {} length {}".format(hex(pgn), len(data)))

def main():
    print("Initializing")

    # create the ElectronicControlUnit (one ECU can hold multiple ControllerApplications)
    ecu = j1939.ElectronicControlUnit()

    # Connect to the CAN bus
    # Arguments are passed to python-can's can.interface.Bus() constructor
    # (see https://python-can.readthedocs.io/en/stable/bus.html).
    # ecu.connect(interface='socketcan', channel='can0')
    # ecu.connect(interface='kvaser', channel=0, bitrate=250000)
    ecu.connect(interface='pcan', channel='PCAN_USBBUS1', bitrate=500000)
    # ecu.connect(interface='ixxat', channel=0, bitrate=250000)
    # ecu.connect(interface='vector', app_name='CANalyzer', channel=0, bitrate=250000)
    # ecu.connect(interface='nican', channel='CAN0', bitrate=250000)

    # subscribe to all global and peer-to-peer messages with destination 0xFA
    ecu.subscribe(on_message, 0xFA)

    time.sleep(120)

    print("Deinitializing")
    ecu.disconnect()

if __name__ == '__main__':
    main()