from etrobo_python.device import (ColorSensor, Device, GyroSensor, Hub, Motor,
SonarSensor, TouchSensor)
try:
from pathlib import Path
from typing import List, Optional, Tuple, Type, Union
from types import TracebackType
except BaseException:
pass
def _get_type_name(device: Device) -> str:
if isinstance(device, Hub):
return 'hub'
elif isinstance(device, Motor):
return 'motor'
elif isinstance(device, ColorSensor):
return 'color_sensor'
elif isinstance(device, TouchSensor):
return 'touch_sensor'
elif isinstance(device, SonarSensor):
return 'sonar_sensor'
elif isinstance(device, GyroSensor):
return 'gyro_sensor'
else:
raise ValueError('Invalid device class: {}'.format(device.__class__.__name__))
def _get_binary_length(device_type: str) -> int:
if device_type == 'hub':
return 5
elif device_type == 'motor':
return 4
elif device_type == 'color_sensor':
return 5
elif device_type == 'touch_sensor':
return 1
elif device_type == 'sonar_sensor':
return 2
elif device_type == 'gyro_sensor':
return 4
else:
raise ValueError('Invalid device type: {}'.format(device_type))
[docs]
class LogReader(object):
'''ログデータをファイルから読み込むためのクラス。
LogWriterで作成されたログファイルを読み込み、デバイスごとに分割したデータを取得する。
ログファイルのフォーマットについては、LogWriterの説明を参照。
Args:
path: ログファイルのパス
'''
def __init__(self, path: Union[str, Path]) -> None: # type: ignore
self.path = str(path) # type: ignore
self.reader = open(self.path, 'rb')
size = int.from_bytes(self.reader.read(2), 'big')
tokens = self.reader.read(size).decode('utf-8').split(',')
name_types = [token.split(':') for token in tokens]
self.devices = [(name, device_type) for name, device_type in name_types]
lengths = [_get_binary_length(device_type) for _, device_type in self.devices]
self.offsets = [sum(lengths[:i]) for i in range(len(lengths) + 1)]
def __enter__(self) -> 'LogReader':
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
self.reader.close()
[docs]
def get_devices(self) -> List[Tuple[str, str]]:
'''ログファイルに記録されているデバイスのリストを取得する。
Returns:
デバイスのリスト。
デバイスは(変数名, デバイスタイプ)のタプルで表現される。
'''
return self.devices
[docs]
def read(self) -> Optional[List[bytes]]:
'''ログファイルからデバイスごとに分割したデータを取得する。
Returns:
デバイスごとに分割したデータのリスト。
デバイスのリストの順番はget_devices()で取得したものと同じ。
ログデータが壊れている(と思われる)場合はNoneを返す。
'''
buffer = self.reader.read(self.offsets[-1])
if len(buffer) < self.offsets[-1]:
return None
return [buffer[b:e] for b, e in zip(self.offsets[:-1], self.offsets[1:])]
[docs]
def close(self) -> None:
'''ログファイルを閉じる。'''
self.reader.close()
def __iter__(self) -> 'LogReader':
return self
def __next__(self) -> List[bytes]:
data = self.read()
if data is None:
raise StopIteration()
return data
[docs]
class LogWriter(object):
'''ログデータをファイルに書き込むためのクラス。
モータやセンサから取得したデータをログファイルに書き込む。
**ログファイルのフォーマット**
.. code-block:: none
デバイスのリストの文字列のバイト数(2バイト)
デバイスのリストのUTF-8文字列(変数名1:デバイスタイプ1,変数名2:デバイスタイプ2,...)
以下、それぞれのデバイスから取得されたデータを時刻順に並べたもの
- Hub: 時刻(4バイト), ボタンの状態(1バイト)
ボタンの状態: 左ボタン=0x01, 右ボタン=0x02, 上ボタン=0x04, 下ボタン=0x08
- Motor: モーターの回転角度(4バイト)
- ColorSensor: brightness(1バイト), ambient(1バイト), raw_color(1バイト * 3)
- TouchSensor: タッチセンサの状態(1バイト)
- SonarSensor: 距離(2バイト)
- GyroSensor: 角度(2バイト), 角速度(2バイト)
**ログデータの例**
ログデータの取得対象となるデバイスがleft_motor:motor・right_motor:motor・color_sensor:color_sensorの場合、
以下のログデータが作成される。
.. code-block:: none
0x003c (デバイスリストの文字列のバイト数)
"left_motor:motor,right_motor:motor,color_sensor:color_sensor" (UTF-8文字列)
(left_motorの回転角度, right_motorの回転角度, color_sensorのbrightness, ambient, raw_color)
(left_motorの回転角度, right_motorの回転角度, color_sensorのbrightness, ambient, raw_color)
(left_motorの回転角度, right_motorの回転角度, color_sensorのbrightness, ambient, raw_color)
...
Args:
path: ログファイルのパス
devices: ログファイルに記録するデバイスのリスト。
デバイスは(変数名, デバイスオブジェクト)のタプルで指定する。
'''
def __init__(
self,
path: Union[str, Path], # type: ignore
devices: List[Tuple[str, Device]],
) -> None:
self.path = str(path) # type: ignore
self.writer = open(self.path, 'wb')
device_types = [_get_type_name(device) for _, device in devices]
lengths = [_get_binary_length(device_type) for device_type in device_types]
self.offsets = [sum(lengths[:i]) for i in range(len(lengths) + 1)]
self.buffer = bytearray(sum(lengths))
name_types = ['{}:{}'.format(name, _get_type_name(device)) for name, device in devices]
binary = ','.join(name_types).encode('utf-8')
self.writer.write(int.to_bytes(len(binary), 2, 'big'))
self.writer.write(binary)
def __enter__(self) -> 'LogWriter':
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
self.writer.close()
[docs]
def write(self, devices: List[Device]) -> None:
'''デバイスから取得したデータをログファイルに書き込む。
Args:
devices: ログファイルに記録するデバイスのリスト。
'''
for offset, device in zip(self.offsets, devices):
binary = device.get_log()
self.buffer[offset:offset + len(binary)] = binary
self.writer.write(self.buffer)
[docs]
def flush(self) -> None:
'''ログファイルに書き込みバッファの内容を書き込む。'''
self.writer.flush()
[docs]
def close(self) -> None:
'''ログファイルを閉じる。'''
self.writer.close()