【python-can】ISO-TPのリクエストとレスポンスをシミュレーションするぞ!

みなさま、故障診断していますか?

本記事はクルマの故障診断通信に使われるISO-TPについて理解しよう!pythonを使ってCAN通信のシミュレーションをしよう!という試みの記事。第5回目です。

CAN素人の筆者が書いたCAN関連記事は以下です。気になる方は見てみて下さい。

前回はとりあえずISO-TPを使った通信シミュレーションを流してみました。

今回はリクエストとレスポンスをシミュレーションしてみようと思います。

あ?リクエスト?レスポンス?なにいってんだこいつ?

と、わからないことにイラつく方は同じようにわからない人間が書いた記事をまぁ読んでいってみて下さいな。

スポンサーリンク

リクエストとレスポンス

ISO-TPを使うときは基本リクエストとレスポンスがセットになってます。

例えば、クルマの故障診断です。

クルマの故障診断をするときはDLCコネクタにテスターやらをぶすっと差してクルマの故障がないかを調べます。そのときにテスターが車内の部品たちに

テスター

おーい!!おめぇら異常があるか?あるならどんな異常があるか教えろや!!

これがリクエストです。このリクエストに対して聞かれた部品たちは

部品A

異常がありません。

部品B

異常あります。壊れてます。

と答えます。これがレスポンスです。

このリクエストとかレスポンスをISO-TPで通信するので、そこそこ長いメッセージをやり取りできるって感じです。

なので、今日はISO-TPを使ってリクエストしたらレスポンスする。ってのをシミュレーションしてみたいと思います。

おっしゃ!シミュレーションすっぞ!!

とりあえずシミュレーションをやってみましょう。今回のコードはほぼこちらを参照にしています。

Examples — isotp 2.0 documentation

ここの”Threaded reception with python-can”を試してみたって感じです。

実際のコード

送信側(リクエストする側) can_send.py

import isotp
import logging
import time
import threading
import can

#ISO-TP通信をスレッドで実行するclassを作る。
class ThreadedApp:
    def __init__(self):#classが呼び出されたときにBUSに接続したりIDを設定したりする。
        self.ext_requested = False
        #バス接続
        self.bus = can.interface.Bus(bustype='vector',
                                     channel=0,
                                     bitrate=500000,
                                     app_name='python-can')
        #ID,frame 受信ID:123, 送信ID:456
        self.addr =  isotp.Address(isotp.AddressingMode.Normal_11bits,
                                   rxid=0x123,txid=0x456,)
        self.stack = isotp.CanStack(self.bus, address=self.addr,
                                    error_handler=self.my_error_handler)

    def start(self):#スレッド処理を実行する関数
        self.exit_requested = False
        self.thread = threading.Thread(target = self.thread_task)
        self.thread.start()
    
    def stop(self):
        self.exit_requested = True
        if self.thread.isAlive():
            self.thread.join()

    def my_error_handler(self, error):
      logging.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))

    def thread_task(self):#わからん
      while self.exit_requested == False:
         self.stack.process()                # Non-blocking
         time.sleep(self.stack.sleep_time()) # Variable sleep time based on state machine state

    def shutdown(self):
      self.stop()
      self.bus.shutdown()

if __name__ == '__main__':
    app = ThreadedApp()  #appをThreadedApp class化 BUS接続とID設定などを行う。
    app.start()          #app classでstart関数を実行。iso-tpのスレッド処理を実行。

    print('Waiting for payload - maximum 5 sec')

    #request messageの送信設定
    #msg = b'\x01\x02\x03\x04'#Signal Frame
    msg = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b'#Multi Frame
    app.stack.send(msg)

    #シミュレーション実行時間は5秒間。
    t1 = time.time()
    while time.time() - t1 < 5:
        if app.stack.available():
            payload = app.stack.recv()
            break
    time.sleep(0.2)

    #5秒経過したら終わり。
    print("Exiting")
    app.shutdown()

受信側(リクエストを受けてレスポンスする側)can_recv.py

import isotp
import logging
import time
import threading
import can

class ThreadedApp:
    def __init__(self):
        self.ext_requested = False#これ何?
        #バス接続
        self.bus = can.interface.Bus(bustype='vector',
                                     channel=0,
                                     bitrate=500000,
                                     app_name='python-can')
        #ID,frame 受信ID:456, 送信ID:123
        self.addr =  isotp.Address(isotp.AddressingMode.Normal_11bits,
                                   rxid=0x456,txid=0x123,)
        #
        self.stack = isotp.CanStack(self.bus, address=self.addr,
                                    error_handler=self.my_error_handler)

    def start(self):
        self.exit_requested = False
        self.thread = threading.Thread(target = self.thread_task)
        self.thread.start()
    
    def stop(self):
        self.exit_requested = True
        if self.thread.isAlive():
            self.thread.join()

    def my_error_handler(self, error):
      logging.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))

    def thread_task(self):
      while self.exit_requested == False:
         self.stack.process()                # Non-blocking
         time.sleep(self.stack.sleep_time()) # Variable sleep time based on state machine state

    def shutdown(self):
      self.stop()
      self.bus.shutdown()

if __name__ == '__main__':
    app = ThreadedApp()  #appをThreadedApp class化
    app.start()          #app classでstart関数を実行

    print('Waiting for payload - maximum 5 sec')

    t1 = time.time()
    while time.time() - t1 < 5:
        if app.stack.available():
            payload = app.stack.recv()
			#response message
            msg = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b'
            app.stack.send(msg)
            break
    time.sleep(0.2)

    print("Exiting")
    app.shutdown()

実行結果

実行時は以下のようにWindows PowerShellを2つ起動してcan_recv.py(レスポンス側)を実行して、can_send.py(リクエスト側)を実行しました。

実行結果をBUSMASTERで確認した結果が以下の通りです。

一行ずつ意図通りになっているか見てみましょう。

1行目:まずID:0x456が”10 0B 01 02 03 04 05 06″を送信してます。can_send.pyでの送信ID通りなのでOKです。次にデータを1byteずつ見てみます。

  1. “10”はFirst Frameだよっていう印です。これからマルチフレームのメッセージを送る宣言が来ています。狙い通りiso-tpの通信が始まってます。
  2. “0B”はマルチフレームで送信するデータ量がどれだけか?を知らせています。B=11byteです。これも狙い通りです。

そこから後ろの6byteはデータを意図したデータを送信していて問題ないですね。次行きましょう。

2行目:ID 123が”30 08 00″を返しています。受信側が応答してますね。

  1. “30”これがFlow Control Frameだよっていう宣言です。
  2. “08”はBlock Sizeです。8個のメッセージを一気に送ることを許すって送ってます。
  3. “00”はSTminです。一つ一つのメッセージの間隔を送っています。特に設定していないのでdefaultで0になっているんですね。

まぁこんな感じでISO-TP通信が始まってますね。次行きましょう。

3行目:ID 456が”21 07 08 09 0A 0B”を送信しています。マルチフレームの2個目ですね。

  1. “21”この2はConsective Frameだよって宣言ですね。1はCosective Frameの1回目という通し番号ですね。
  2. それ以降は続きのデータを送信してます。問題なしです。

これでID 456が送りたいリクエストデータが全て送信を終わりました。次がレスポンスです。

4行目:ID 123が”10 2C 01 02 03 04 05 06″を送信しています。

  1. “10”はFirst Frame宣言ですね。
  2. “2C”はデータ長です。2C=44byteのデータを送るよですね。
  3. それ以降は通常データです。

問題ないですね。大体うまく行っているなと確認が取れました。ここからはルーチン的な確認ですね。

5行目:ID 456が”30 08 00″とFlow Control Frameを返しています。

6~11行目:ID 123が残りのデータを送信しています。それぞれの先頭データが21,22,23,,,,と通し番号がインクリメントされていますね。OKOKです。

意図した通信が全て出来ていることが確認できました。

まとめ

今回はpythonのpython-canモジュールとisotpモジュールを使って、リクエストとレスポンスをするISO-TP通信をシミュレーションしました。

一回シミュレーションしてみたら、そこそこISO-TPの通信を理解できた気がしました。

誰かの参考になれば幸いです。最後までお読みいただきありがとうございました!!