# -*- coding: UTF-8 -*-
import time
import struct
import string
from .imp_crc import MaximCRC
class ModuleError(Exception):
pass
[docs]class Module:
"""The Module object represents a IMPBus2 probe. It is used to provide a
easy to use interface for the probe specific commands. It is mostly just a
small wrapper around the much more general :func:`Bus.set` and
:func:`Bus.get` commands. To create a `Module` object you first to supply a
:class:`Bus` object and a serial number. As a quick example we will catch
up with the code from :class:`Bus` and extend that a bit, pretending we
have two probes with the serial numbers 10010 and 10011 connected to the
bus::
>>> from implib2 import Bus, Module
>>> bus = Bus('/dev/ttyUSB0')
>>> bus.sync()
>>> bus.scan()
(10010, 10011)
Now that we found our two probes lets create the :class:`Module` objects::
>>> module10 = Module(bus, 10010)
>>> module11 = Module(bus, 10011)
And now, lets use the :class:`Module` objects to gain some informations
from the probes::
>>> module10.get_hw_version()
1.14
>>> module10.get_fw_version()
1.140301
>>> module11.get_hw_version()
1.14
>>> module11.get_fw_version()
1.140301
:param bus: An instaciated :class:`Bus` object to use.
:type bus: :class:`Bus`
:param serno: The serial number of the probe to address.
:type serno: int
:rtype: :class:`Module`
"""
def __init__(self, bus, serno):
self.crc = MaximCRC()
self.bus = bus
self._serno = serno
self.protocols = {
'IMPBUS': 0,
'SDI12': 1}
self.event_modes = {
"NormalMeasure": 0x00,
"TRDScan": 0x01,
"AnalogOut": 0x02,
"ACIC_TC": 0x03,
"SelfTest": 0x04,
"MatTempSensor": 0x05}
self.measure_modes = {
"ModeA": 0x00,
"ModeB": 0x01,
"ModeC": 0x02}
self.average_modes = {
"CA": 0x00,
"CK": 0x01,
"CS": 0x02,
"CF": 0x03}
[docs] def unlock(self):
"""Command to unlock the write protected rows in the probes tables.
The unlock key is the `CRC + 0x8000` of serial number of the probe.
Be aware that the key changes as the serial number is changed.
:rtype: bool
"""
# Calculate the SupportPW: calc_crc(serno) + 0x8000
passwd = struct.pack('<I', self._serno)
passwd = struct.unpack('<B', self.crc.calc_crc(passwd))[0] + 0x8000
# Unlock the device with the password
table = 'ACTION_PARAMETER_TABLE'
param = 'SupportPW'
value = passwd
return self.bus.set(self._serno, table, param, [value])
[docs] def get_event_mode(self):
"""Command to retrieve the event mode parameter of the probe. For more
informations look at :func:`set_event_mode`.
:raises : **ModuleError** - If event mode is not known.
:rtype: string
"""
table = 'ACTION_PARAMETER_TABLE'
param = 'Event'
modes = {v: k for k, v in self.event_modes.items()}
mode = self.bus.get(self._serno, table, param)[0]
if mode not in range(127, 134):
raise ModuleError("Unknown event mode: %s" % mode)
return modes[mode % 0x80]
[docs] def set_event_mode(self, mode="NormalMeasure"):
"""Command to set the the EventMode of the probe. This parameter
can be set to six different values:
.. note::
0. *NormalMeasure:* Normal measurement Mode.
1. *TDRScan:* TDRScan Mode.
2. *AnalogOut:* Used for setting the analog out to a fixed value.
3. *ASIC_TC:* Mode to perform a ASIC temperature compensation.
4. *Self Test:* Mode to perform varios self tests.
5. *MatTempSensor:* Mode to do a material temperatur compensation.
:param mode: The EventMode to use.
:type mode: string
:rtype: bool
:raises: **ModuleError** - If mode is not known.
"""
table = 'ACTION_PARAMETER_TABLE'
param = 'Event'
if mode not in self.event_modes:
raise ModuleError("%s: Invalid event mode!" % mode)
value = self.event_modes[mode]
self.unlock()
self.bus.set(self._serno, table, param, [value])
# let's try 5 times.
for attempt in range(5):
if self.bus.get(self._serno, table, param)[0] == value + 0x80:
break
if attempt == 4:
raise ModuleError("Failed to set event mode!")
return True
[docs] def get_measure_mode(self):
"""Command to retrieve the measure mode parameter of the probe. For
more informations look at :func:`set_measure_mode`.
:raises: **ModuleError** - If measure mode is not known.
:rtype: string
"""
table = 'DEVICE_CONFIGURATION_PARAMETER_TABLE'
param = 'MeasMode'
modes = {v: k for k, v in self.measure_modes.items()}
try:
mode = modes[self.bus.get(self._serno, table, param)[0]]
except KeyError as err:
raise ModuleError("Unknown measure mode: %s!" % err.args[0])
return mode
[docs] def set_measure_mode(self, mode='ModeA'):
"""Command to set the measure mode of the probe. There a 3 different
measure modes.
.. note::
**ModeA**
On Request, the probe checks the parameter StartMeasure in
Measure Parameter Table. If the parameter is 0, the probe does
nothing. If the parameter is 1, the probe does the measurement
and then sets the parameter to 0 again. Setting the parameter
to 1 must be carried out through RS485 or IMPBus by an external
command.
**ModeB**
Single, the probe measures once after it is powered on. This
mode is normally used in the case that the probe is connected
to a data logger which samples the analog output after being
powered on.
**ModeC**
Cyclic, the probe measures cyclically. That means, the probe
measures once and sleeps the time SleepTimeInModeC, then it
wakes up automatically and repeats the process. This mode is
normally aused in casees when the probe is always powered and
measures periodically.
:param mode: Mode to use.
:type mode: string
:rtype: bool
:raises: **ModuleError** - If mode is unknown.
"""
table = 'DEVICE_CONFIGURATION_PARAMETER_TABLE'
param = 'MeasMode'
if mode not in self.measure_modes:
raise ModuleError("%s: Invalid measure mode!" % mode)
if not self.get_event_mode() == "NormalMeasure":
raise ModuleError("Wrong event mode, need 'NormalMeasure'!")
value = self.measure_modes[mode]
return self.bus.set(self._serno, table, param, [value])
def get_default_measure_mode(self):
table = 'DEVICE_CONFIGURATION_PARAMETER_TABLE'
param = 'DefaultMeasMode'
modes = {v: k for k, v in self.measure_modes.items()}
try:
mode = modes[self.bus.get(self._serno, table, param)[0]]
except KeyError as err:
raise ModuleError("Unknown default measure mode: %s!" % err.args[0])
return mode
def set_default_measure_mode(self, mode='ModeC'):
table = 'DEVICE_CONFIGURATION_PARAMETER_TABLE'
param = 'DefaultMeasMode'
if mode not in self.measure_modes:
raise ModuleError("%s: Invalid default measure mode!" % mode)
if not self.get_event_mode() == "NormalMeasure":
raise ModuleError("Wrong event mode, need 'NormalMeasure'!")
value = self.measure_modes[mode]
return self.bus.set(self._serno, table, param, [value])
def get_average_mode(self):
table = 'APPLICATION_PARAMETER_TABLE'
param = 'AverageMode'
modes = {v: k for k, v in self.average_modes.items()}
try:
mode = modes[self.bus.get(self._serno, table, param)[0]]
except KeyError as err:
raise ModuleError("Unknown average mode: %s!" % err.args[0])
return mode
def set_average_mode(self, mode='CA'):
table = 'APPLICATION_PARAMETER_TABLE'
param = 'AverageMode'
if mode not in self.average_modes:
raise ModuleError("%s: Invalid average mode!" % mode)
value = self.average_modes[mode]
return self.bus.set(self._serno, table, param, [value])
[docs] def get_table(self, table):
"""Spezial Command to get a whole table.
.. warning:: **Not yet implemented!**
Basicly you get a whole table, witch means the data-part of the
recieved package consists of the concatinated table values. If
the table don't fit into one package the status byte of the
header-part will be '0xff'. Than you have to wait a bit and recieve
packages as long as the status byte is '0x00' again. To extract the
concatenated table-values you have to split the data in order of the
Parameter-No., the length of each value can is equal to the
Parameter-Length.
You can use the parameters GetData, DataSize and TableSize to gain
information about the spezific table.
:param table: Table to retrieve from probe.
:type table: string
:rtype: json
"""
# pylint: disable=unused-argument, no-self-use
raise NotImplementedError()
[docs] def set_table(self, table, data):
"""Special command to set the values of a hole table.
.. warning:: **Not yet implemented!**
You can use the parameters GetData, DataSize and TableSize to gain
information about the spezific table.
:param table: Name of the table to write.
:type table: string
:param data: Table data to write.
:type data: json
:rtype: bool
"""
# pylint: disable=unused-argument, no-self-use
raise NotImplementedError()
[docs] def get_serno(self):
"""Command to retrieve the serial number of the probe.
:rtype: int
"""
table = 'SYSTEM_PARAMETER_TABLE'
param = 'SerialNum'
return self.bus.get(self._serno, table, param)[0]
[docs] def set_serno(self, serno):
"""Command to change the serial number of the probe.
:param serno: Serial number so use.
:type serno: int
:rtype: bool
"""
table = 'SYSTEM_PARAMETER_TABLE'
param = 'SerialNum'
self.unlock()
self.bus.set(self._serno, table, param, [serno])
self._serno = serno
return True
[docs] def read_eeprom(self):
"""Command to read the EEPROM image from the probe. The image get's
stored into a EEPROM object.
:rtype: :class:`EEPROM`
:raises: **ModuleError** - If length of the constructed :class:`EEPRom`
does not match the length value from the probe table.
"""
# pylint: disable=fixme
# TODO: add methode to create a EEPROM file.
raise NotImplementedError()
[docs] def write_eeprom(self, image):
"""Command to write a new EEPROM image to the probe. The EEPROM
image must be an instance of the :class:`EEPROM`.
:param image: The Image to write.
:type image: :class:`EEPROM`
:rtype: bool
"""
self.unlock()
for number, page in enumerate(image):
if not self.bus.set_eeprom_page(self._serno, number, page):
raise ModuleError("Writing EEPROM failed!")
time.sleep(0.05)
return True
[docs] def get_hw_version(self):
"""Command to retrieve the hardware version number of the probe.
:rtype: float
"""
table = 'SYSTEM_PARAMETER_TABLE'
param = 'HWVersion'
return '{0:.2f}'.format(self.bus.get(self._serno, table, param)[0])
[docs] def get_fw_version(self):
"""Command to retrieve the firmware version number of the probe.
:rtype: float
"""
table = 'SYSTEM_PARAMETER_TABLE'
param = 'FWVersion'
return '{0:.6f}'.format(self.bus.get(self._serno, table, param)[0])
[docs] def start_measure(self):
"""This command starts a measurement cycle and returns when the
measurement is finished. It's mostly used within a wrapper like
:func:`get_moisture`.
:rtype: bool
"""
table = 'ACTION_PARAMETER_TABLE'
param = 'StartMeasure'
value = 1
# Set Event mode to 'NormalMeasure'
if not self.get_event_mode() == "NormalMeasure":
raise ModuleError("Wrong event mode, need 'NormalMeasure'!")
# Refer to Protocol Handbook page 18.
if not self.get_measure_mode() == 'ModeA':
raise ModuleError("Wrong measure mode, need 'ModeA'!")
if self.measure_running():
raise ModuleError("Measurement cycle already in progress!")
return self.bus.set(self._serno, table, param, [value])
[docs] def measure_running(self):
"""This command checks if the measurement cycle is in progress.
:rtype: bool
"""
table = 'ACTION_PARAMETER_TABLE'
param = 'StartMeasure'
return self.bus.get(self._serno, table, param)[0] == 1
[docs] def get_measurement(self, quantity='Moist'):
"""This command gets the measured value of the requested quantity.
It's mostly used within a wrapper like :func:`get_moisture`.
:param quantity: The measure quantity to request.
:type quantity: str
:rtype: int or float
"""
table = 'MEASURE_PARAMETER_TABLE'
param = quantity
return self.bus.get(self._serno, table, param)[0]
[docs] def get_moisture(self):
"""This command is a simple wrapper arrond :func:`start_measure`,
:func:`measure_running` and :func:`get_measure`.
:rtype: float
"""
assert self.start_measure()
while self.measure_running():
time.sleep(0.500)
return self.get_measurement(quantity='Moist')
#########################
# END of the Public API #
#########################
def _get_analog_output_mode(self):
"""Command to retrieve the analog output mode.
This setting option, twogether with :func:`get_moist_min_value` and
:func:`get_analog_output_mode` can be used to determine the mean of
the analog moisture/temperatur output signal. For more information
look at :func:`set_analog_output_mode`.
.. note::
| AnalogOutputMode 0: => 0.0V - 1.0V
| AnalogOutputMode 1: => 0.2V - 1.0V
Analog output mode 1 is mainly intended to be used with a
U/I-Converter.
:rtype: int
"""
table = 'DEVICE_CONFIGURATION_PARAMETER_TABLE'
param = 'AnalogOutputMode'
return self.bus.get(self._serno, table, param)[0]
def _set_analog_output_mode(self, mode=0):
"""Command to set the analog output mode.
This setting option, twogether with :func:`get_moist_min_value` and
:func:`get_analog_output_mode` can be used to determine the mean of
the analog moisture/temperatur output signal. For more information
look at :func:`set_analog_output_mode`.
.. note::
| AnalogOutputMode 0: => 0.0V - 1.0V
| AnalogOutputMode 1: => 0.2V - 1.0V
Analog output mode 1 is mainly intended to be used with a
U/I-Converter.
:param:
:rtype: int
"""
table = 'DEVICE_CONFIGURATION_PARAMETER_TABLE'
param = 'AnalogOutputMode'
if mode not in (0, 1):
raise ModuleError("Wrong AnalogOutputMode!")
value = mode
return self.bus.set(self._serno, table, param, [value])
def _get_moist_max_value(self):
"""Command to retrieve the maximum moisture setting.
This setting option, twogether with :func:`get_moist_min_value` and
:func:`get_analog_output_mode` can be used to determine the mean of
the analog moisture output signal. For more information look at
:func:`set_analog_output_mode`.
:rtype: int
"""
table = 'DEVICE_CONFIGURATION_PARAMETER_TABLE'
param = 'MoistMaxValue'
return self.bus.get(self._serno, table, param)[0]
def _get_moist_min_value(self):
"""Command to retrieve the minimum moisture setting.
This setting option, twogether with :func:`get_moist_max_value` and
:func:`get_analog_output_mode` can be used to determine the mean of
the analog moisture output signal. For more information look at
:func:`set_analog_output_mode`.
:rtype: int
"""
table = 'DEVICE_CONFIGURATION_PARAMETER_TABLE'
param = 'MoistMinValue'
return self.bus.get(self._serno, table, param)[0]
def _get_temp_max_value(self):
"""Command to retrieve the maximum moisture setting.
This setting option, twogether with :func:`get_temp_min_value` and
:func:`get_analog_output_mode` can be used to determine the mean of
the analog temperature output signal. For more information look at
:func:`set_analog_output_mode`.
:rtype: int
"""
table = 'DEVICE_CONFIGURATION_PARAMETER_TABLE'
param = 'TempMaxValue'
return self.bus.get(self._serno, table, param)[0]
def _get_temp_min_value(self):
"""Command to retrieve the minimum moisture setting.
This setting option, twogether with :func:`get_temp_max_value` and
:func:`get_analog_output_mode` can be used to determine the mean of
the analog temperature output signal. For more information look at
:func:`set_analog_output_mode`.
:rtype: int
"""
table = 'DEVICE_CONFIGURATION_PARAMETER_TABLE'
param = 'TempMinValue'
return self.bus.get(self._serno, table, param)[0]
def _set_analog_moist(self, mvolt=500):
"""Command so set the analog output of the moisture channel to a fixed
value. This command can be used for calibration purposes.
:param mvolt: Output current in millivolts (0-1000).
:type mbolt: int
:rtype: bool
:raises ModuleError: If `mvolt` parameter is out of range.
:raises ModuleError: If EventMode can not be set to AnalogOut.
"""
table = 'MEASURE_PARAMETER_TABLE'
param = 'Moist'
if mvolt not in range(0, 1001):
raise ModuleError("Value out of range!")
if not self.get_event_mode() == "AnalogOut":
raise ModuleError("Wrong event mode, need 'AnalogOut'!")
if not self._get_analog_output_mode() == 0:
raise ModuleError("Wrong AnalogOutputMode, need mode 0 here!")
min_value = self._get_moist_min_value()
max_value = self._get_moist_max_value()
value = (max_value - min_value) / 1000.0 * mvolt + min_value
return self.bus.set(self._serno, table, param, [value])
def _get_analog_moist(self):
table = 'MEASURE_PARAMETER_TABLE'
param = 'Moist'
return self.bus.get(self._serno, table, param)[0]
def _set_analog_temp(self, mvolt=500):
"""Command so set the analog output of the temperatur channel to a
fixed value. This command can be used for calibration purposes.
:param mvolt: Output current in millivolts (0-1000).
:type mbolt: int
:rtype: bool
:raises ModuleError: If `mvolt` parameter is out of range.
:raises ModuleError: If EventMode can not be set to AnalogOut.
"""
if mvolt not in range(0, 1001):
raise ModuleError('Value out of range!')
if not self.get_event_mode() == "AnalogOut":
raise ModuleError("Wrong event mode, need 'AnalogOut'!")
if not self._get_analog_output_mode() == 0:
raise ModuleError("Wrong AnalogOutputMode, need mode 0 here")
table = 'MEASURE_PARAMETER_TABLE'
param = 'CompTemp'
min_value = self._get_temp_min_value()
max_value = self._get_temp_max_value()
value = (max_value - min_value) / 1000.0 * mvolt + min_value
return self.bus.set(self._serno, table, param, [value])
def _get_analog_temp(self):
table = 'MEASURE_PARAMETER_TABLE'
param = 'CompTemp'
return self.bus.get(self._serno, table, param)[0]
def _turn_asic_on(self):
"""Command to start the selftest of the probe.
SelfTest is used for primary for internal test by IMKO.
In this context, it will be used to 'ON' the ASIC.
"""
if not self.get_event_mode() == "SelfTest":
raise ModuleError("Wrong event mode, need 'SelfTest'!")
table = 'ACTION_PARAMETER_TABLE'
param = 'SelfTest'
value = [1, 1, 63, 0]
return self.bus.set(self._serno, table, param, value)
def _turn_asic_off(self):
"""Command to start the selftest of the probe.
SelfTest is used for primary for internal test by IMKO.
In this context, it will be used to 'OFF' the ASIC.
"""
if not self.get_event_mode() == "SelfTest":
raise ModuleError("Wrong event mode, need 'SelfTest'!")
table = 'ACTION_PARAMETER_TABLE'
param = 'SelfTest'
value = [1, 0, 255, 0]
return self.bus.set(self._serno, table, param, value)
def _get_transit_time_tdr(self):
# ** Internal usage - Trime IBT
if not self.get_event_mode() == "NormalMeasure":
raise ModuleError("Wrong event mode, need 'NormalMeasure'!")
table = 'DEVICE_CONFIGURATION_PARAMETER_TABLE'
param = 'MeasMode'
value = 0
self.bus.set(self._serno, table, param, [value])
table = 'ACTION_PARAMETER_TABLE'
param = 'StartMeasure'
value = 1
self.bus.set(self._serno, table, param, [value])
while self.bus.get(self._serno, table, param)[0]:
time.sleep(0.5)
table = 'MEASURE_PARAMETER_TABLE'
param = 'TransitTime'
transit_time = self.bus.get(self._serno, table, param)[0]
param = 'TDRValue'
tdr_value = self.bus.get(self._serno, table, param)[0]
table = 'DEVICE_CONFIGURATION_PARAMETER_TABLE'
param = 'MeasMode'
value = 2
self.bus.set(self._serno, table, param, [value])
return (transit_time, tdr_value)
def _set_sdi12_address(self, address=0):
"""Command to set the SDI-12 address
This command sets the SDI-12 address of the probe.
:param address: SDI-12 Address to set. (0-9, a-z, A-Z)
:type address: str
:rtype: bool
:raises ModuleError: If `address` parameter is out of range.
"""
table = 'SYSTEM_PARAMETER_TABLE'
param = 'ModuleInfo1'
sdi12_address_range = (list(range(0, 9)) + [c for c in string.ascii_letters])
if address not in sdi12_address_range:
raise ModuleError("SDI12 address out of range!")
value = address
return self.bus.set(self._serno, table, param, [value])
def _set_protocol(self, protocol='IMPBUS'):
"""Command to set the bus protocol.
:param protocol: Bus protocol to use. ('IMPBUS' or 'SDI12')
:type protocol: str
:rtype: bool
:raises ModuleError: If `protocol` parameter is unknown.
"""
table = 'DEVICE_CONFIGURATION_PARAMETER_TABLE'
param = 'Protocol'
try:
value = self.protocols[protocol]
except KeyError as err:
raise ModuleError("Wrong protocol: %s" % err.args[0])
return self.bus.set(self._serno, table, param, [value])