みなさま、故障診断していますか?
本記事はクルマの故障診断通信に使われるISO-TPについて理解しよう!pythonを使ってCAN通信のシミュレーションをしよう!という試みの記事。第6回目です。
CAN素人の筆者が書いたCAN関連記事は以下です。気になる方は見てみて下さい。
前回はISO-TPを使ってリクエストに対してレスポンスする通信シミュレーションを流してみました。
今回はファンクショナルアドレスの送信をしてみようという話です。
ファンクショナルアドレスとは?
ISO-TPの通信にはPhysical AddressとFunctional Addressがあります。
Physical Address:1対1の通信です。
〇〇さーん!
はーい!
これがPhysical Addressです。
Functional Address:1対複数の通信です。
みなさーん!!
はーい!
はーい!
これがFunctional Addressです。
うっしゃシミュレーションすっぞ!!
基本は5回目のコードを流用します。
5回目:【python-can】ISO-TPのリクエストとレスポンスをシミュレーションするぞ!
リクエスト側の(can_send.py)とレスポンス側の(can_recv.py)を作ります。can_send.pyからファンクショナルアドレスで送信を行って、can_recv.pyにユニットAとユニットBから返答してもらうっていうイメージです。
実際のコード
リクエスト側(can_send.py)
import isotp
import logging
import time
import threading
import can
class ThreadedApp:
def __init__(self, id_rx, id_tx):
self.ext_requested = False
#バス接続
self.bus = can.interface.Bus(bustype='vector',
channel=0,
bitrate=500000,
app_name='python-can')
#ID,frame
self.addr = isotp.Address(isotp.AddressingMode.Normal_11bits,
rxid=id_rx, txid=id_tx)
#
self.stack = isotp.CanStack(self.bus, address=self.addr,
error_handler=self.my_error_handler)
def start(self):
self.exit_requested = False
self.thread = threading.Thread(target = self.thread_task)
self.thread.start()
def stop(self):
self.exit_requested = True
if self.thread.isAlive():
self.thread.join()
def my_error_handler(self, error):
logging.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))
def thread_task(self):
while self.exit_requested == False:
self.stack.process() # Non-blocking
time.sleep(self.stack.sleep_time()) # Variable sleep time based on state machine state
def shutdown(self):
self.stop()
self.bus.shutdown()
if __name__ == '__main__':
#ユニットAのphysycalアドレスを設定 RxID:100 TxID:465
UnitAPhys = ThreadedApp(0x100, 0x465) #appをThreadedApp class化
UnitAPhys.start() #app classでstart関数を実行
#ユニットAのfunctionalアドレスを設定 RxID:100 TxID:7FF
UnitAFunc = ThreadedApp(0x100, 0x7FF)
UnitAFunc.start()
print('Waiting for payload - maximum 5 sec')
#request message
msg = b'\x01\x02\x03\x04'#Signal Frame
#msg = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b'#Multi Frame
#UnitAPhys.stack.send(msg)#physical Addressで呼び出し。
time.sleep(1)
UnitAFunc.stack.send(msg, isotp.TargetAddressType.Functional)#Functional Addressで呼び出し。
time.sleep(0.2)
print("Exiting")
UnitAPhys.shutdown()
UnitAFunc.shutdown()
色を変えているポイントが前回からの変更点ですね。変更点は、、、
・classの引数に送信ID、受信IDを設定できるようにした。
・Functional Addressを7FFとした。
・isotp.TargetAddressType.FunctionalとすることでFunctional Addressで送信
くらいです。
レスポンス側(can_recv.py)
import isotp
import logging
import time
import threading
import can
class ThreadedApp:
def __init__(self, id_rx, id_tx):
self.ext_requested = False
#バス接続
self.bus = can.interface.Bus(bustype='vector',
channel=0,
bitrate=500000,
app_name='python-can')
#ID,frame
self.addr = isotp.Address(isotp.AddressingMode.Normal_11bits,
rxid=id_rx,txid=id_tx)
self.stack = isotp.CanStack(self.bus, address=self.addr,
error_handler=self.my_error_handler)
def start(self):
self.exit_requested = False
self.thread = threading.Thread(target = self.thread_task)
self.thread.start()
def stop(self):
self.exit_requested = True
if self.thread.isAlive():
self.thread.join()
def my_error_handler(self, error):
logging.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))
def thread_task(self):
while self.exit_requested == False:
self.stack.process() # Non-blocking
time.sleep(self.stack.sleep_time()) # Variable sleep time based on state machine state
def shutdown(self):
self.stop()
self.bus.shutdown()
if __name__ == '__main__':
#ユニットAのphysycalアドレスを設定 RxID:465 TxID:100
UnitAPhys = ThreadedApp(0x465, 0x100) #physをThreadedApp class化
UnitAPhys.start() #phys classでstart関数を実行
#ユニットAのfunctionalアドレスを設定 RxID:7FF TxID:100
UnitAFunc = ThreadedApp(0x7FF, 0x100)
UnitAFunc.start()
#ユニットBのfunctionalアドレスを設定 RxID:7FF TxID:200
UnitBFunc = ThreadedApp(0x7FF, 0x200)
UnitBFunc.start()
print('Waiting for payload - maximum 5 sec')
t1 = time.time()
#response message
msg1 = b'\x01\x02\x03\x04'
msg2 = b'\x09\x08\x07\x06'
while time.time() - t1 < 5:
if UnitAPhys.stack.recv() != None:
UnitAPhys.stack.send(msg1)
if UnitAFunc.stack.recv() != None:
UnitAFunc.stack.send(msg1)
if UnitBFunc.stack.recv() != None:
UnitBFunc.stack.send(msg2)
time.sleep(0.2)
print("Exiting")
UnitAPhys.shutdown()
UnitAFunc.shutdown()
UnitBFunc.shutdown()
ユニットAとユニットBを準備してそれぞれのFunctional Addressを7FF、送信IDは100と200としました。
これで7FFを一回送ると100と200を一気に返答してほしい感じです。
実行結果
今回もコマンドプロンプトを2個開いてcan_recv.pyとcan_send.pyを実行します。
実行結果はBUSMASTERで確認しました↓。
うん!いいですね!!7FFのファンクショナルアドレスに対して、100(ユニットA)と200(ユニットB)が一度に返答してきますね。
オッケーです。
今日はそんな感じです。終わります。