【python-can】ISO-TP通信でFunctional Addressで送信する

みなさま、故障診断していますか?

本記事はクルマの故障診断通信に使われるISO-TPについて理解しよう!pythonを使ってCAN通信のシミュレーションをしよう!という試みの記事。第6回目です。

CAN素人の筆者が書いたCAN関連記事は以下です。気になる方は見てみて下さい。

前回はISO-TPを使ってリクエストに対してレスポンスする通信シミュレーションを流してみました。

今回はファンクショナルアドレスの送信をしてみようという話です。

スポンサーリンク

ファンクショナルアドレスとは?

ISO-TPの通信にはPhysical AddressとFunctional Addressがあります。

Physical Address:1対1の通信です。

リクエスト側
リクエスト側

〇〇さーん!

レスポンス側
レスポンス側

はーい!

これがPhysical Addressです。

Functional Address:1対複数の通信です。

リクエスト側
リクエスト側

みなさーん!!

レスポンス側ユニット①
レスポンス側ユニット①

はーい!

レスポンス側ユニット②
レスポンス側ユニット②

はーい!

これがFunctional Addressです。

うっしゃシミュレーションすっぞ!!

基本は5回目のコードを流用します。

5回目:【python-can】ISO-TPのリクエストとレスポンスをシミュレーションするぞ!

リクエスト側の(can_send.py)とレスポンス側の(can_recv.py)を作ります。can_send.pyからファンクショナルアドレスで送信を行って、can_recv.pyにユニットAとユニットBから返答してもらうっていうイメージです。

実際のコード

リクエスト側(can_send.py)

import isotp
import logging
import time
import threading
import can

class ThreadedApp:
    def __init__(self, id_rx, id_tx):
        self.ext_requested = False
        #バス接続
        self.bus = can.interface.Bus(bustype='vector',
                                     channel=0,
                                     bitrate=500000,
                                     app_name='python-can')
        #ID,frame
        self.addr =  isotp.Address(isotp.AddressingMode.Normal_11bits,
                                   rxid=id_rx, txid=id_tx)
        #
        self.stack = isotp.CanStack(self.bus, address=self.addr,
                                    error_handler=self.my_error_handler)

    def start(self):
        self.exit_requested = False
        self.thread = threading.Thread(target = self.thread_task)
        self.thread.start()
    
    def stop(self):
        self.exit_requested = True
        if self.thread.isAlive():
            self.thread.join()

    def my_error_handler(self, error):
      logging.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))

    def thread_task(self):
      while self.exit_requested == False:
         self.stack.process()                # Non-blocking
         time.sleep(self.stack.sleep_time()) # Variable sleep time based on state machine state

    def shutdown(self):
      self.stop()
      self.bus.shutdown()

if __name__ == '__main__':
    #ユニットAのphysycalアドレスを設定 RxID:100 TxID:465
    UnitAPhys = ThreadedApp(0x100, 0x465)  #appをThreadedApp class化
    UnitAPhys.start()          #app classでstart関数を実行

    #ユニットAのfunctionalアドレスを設定 RxID:100 TxID:7FF
    UnitAFunc = ThreadedApp(0x100, 0x7FF)
    UnitAFunc.start()

    print('Waiting for payload - maximum 5 sec')

    #request message
    msg = b'\x01\x02\x03\x04'#Signal Frame
    #msg = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b'#Multi Frame

    #UnitAPhys.stack.send(msg)#physical Addressで呼び出し。
    time.sleep(1)

    UnitAFunc.stack.send(msg, isotp.TargetAddressType.Functional)#Functional Addressで呼び出し。
    time.sleep(0.2)

    print("Exiting")
    UnitAPhys.shutdown()
    UnitAFunc.shutdown()

色を変えているポイントが前回からの変更点ですね。変更点は、、、

・classの引数に送信ID、受信IDを設定できるようにした。

・Functional Addressを7FFとした。

・isotp.TargetAddressType.FunctionalとすることでFunctional Addressで送信

くらいです。

レスポンス側(can_recv.py)

import isotp
import logging
import time
import threading
import can

class ThreadedApp:
    def __init__(self, id_rx, id_tx):
        self.ext_requested = False
        #バス接続
        self.bus = can.interface.Bus(bustype='vector',
                                     channel=0,
                                     bitrate=500000,
                                     app_name='python-can')
        #ID,frame
        self.addr =  isotp.Address(isotp.AddressingMode.Normal_11bits,
                                   rxid=id_rx,txid=id_tx)

        self.stack = isotp.CanStack(self.bus, address=self.addr,
                                    error_handler=self.my_error_handler)

    def start(self):
        self.exit_requested = False
        self.thread = threading.Thread(target = self.thread_task)
        self.thread.start()
    
    def stop(self):
        self.exit_requested = True
        if self.thread.isAlive():
            self.thread.join()

    def my_error_handler(self, error):
      logging.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))

    def thread_task(self):
      while self.exit_requested == False:
         self.stack.process()                # Non-blocking
         time.sleep(self.stack.sleep_time()) # Variable sleep time based on state machine state

    def shutdown(self):
      self.stop()
      self.bus.shutdown()

if __name__ == '__main__':
    #ユニットAのphysycalアドレスを設定 RxID:465 TxID:100
    UnitAPhys = ThreadedApp(0x465, 0x100)     #physをThreadedApp class化
    UnitAPhys.start()                         #phys classでstart関数を実行

    #ユニットAのfunctionalアドレスを設定 RxID:7FF TxID:100
    UnitAFunc = ThreadedApp(0x7FF, 0x100)
    UnitAFunc.start()

    #ユニットBのfunctionalアドレスを設定 RxID:7FF TxID:200
    UnitBFunc = ThreadedApp(0x7FF, 0x200)
    UnitBFunc.start()

    print('Waiting for payload - maximum 5 sec')

    t1 = time.time()
    #response message
    msg1 = b'\x01\x02\x03\x04'
    msg2 = b'\x09\x08\x07\x06'

    while time.time() - t1 < 5:

        if UnitAPhys.stack.recv() != None:
            UnitAPhys.stack.send(msg1)

        if UnitAFunc.stack.recv() != None:
            UnitAFunc.stack.send(msg1)

        if UnitBFunc.stack.recv() != None:
            UnitBFunc.stack.send(msg2)

    time.sleep(0.2)

    print("Exiting")
    UnitAPhys.shutdown()
    UnitAFunc.shutdown()
    UnitBFunc.shutdown()

ユニットAとユニットBを準備してそれぞれのFunctional Addressを7FF、送信IDは100と200としました。

これで7FFを一回送ると100と200を一気に返答してほしい感じです。

実行結果

今回もコマンドプロンプトを2個開いてcan_recv.pyとcan_send.pyを実行します。

実行結果はBUSMASTERで確認しました↓。

うん!いいですね!!7FFのファンクショナルアドレスに対して、100(ユニットA)と200(ユニットB)が一度に返答してきますね。

オッケーです。

今日はそんな感じです。終わります。