# -*- coding: UTF-8 -*-
import time
from .imp_device import Device, DeviceError
from .imp_datatypes import DataTypes
from .imp_packages import Package
from .imp_commands import Command
from .imp_responces import Responce
from .imp_tables import Tables
from .imp_helper import _imprange
class BusError(Exception):
pass
[docs]class Bus:
"""The Bus object represents the IMPBus2 master device. It is used to
initialize the Bus, set the baudrate and find the connected probes. A
simple example of how to initialize and search the bus would be::
>>> from implib2 import Bus, Module
>>> bus = Bus('/dev/ttyUSB0')
>>> bus.sync()
>>> bus.scan()
:param port: The serial port to use, defaults to `/dev/ttyUSB0`
:type port: string
:param rs485: Set this to `True` in order to use the way more
relaxed rs485 timings. Defaults to `False`.
:type rs485: bool
"""
def __init__(self, port='/dev/ttyUSB0', rs485=False):
tbl = Tables()
pkg = Package()
dts = DataTypes()
self.cmd = Command(tbl, pkg, dts)
self.res = Responce(tbl, pkg, dts)
self.dev = Device(port)
self.bus_synced = False
# timing magic, adds some extra love for rs485
self.trans_wait = 0.002 if not rs485 else 0.070
self.cycle_wait = 0.001 if not rs485 else 0.070
self.range_wait = 0.020 if not rs485 else 0.070
def _wait(self, package_len, process_time=0.1):
transit_time = package_len * self.trans_wait
time.sleep(transit_time + process_time + transit_time)
def _search(self, range_address, range_marker, found):
probes = len(found)
bcast_address = range_address + range_marker
if not self.probe_range(bcast_address):
return False
if range_marker == 1:
if self.probe_module_short(bcast_address):
found.append(bcast_address)
if self.probe_module_short(bcast_address - 1):
found.append(bcast_address - 1)
return not probes == len(found)
# divide-and-conquer by splitting the range into two pices.
self._search(bcast_address, range_marker >> 1, found)
self._search(range_address, range_marker >> 1, found)
return True
[docs] def wakeup(self):
"""This function sends a broadcast packet which sets the 'EnterSleep'
parameter of the 'ACTION_PARAMETER_TABLE' to '0', which actually means
to disable the sleep mode of all connected modules. But the real aim of
this command is to wake up sleeping modules by sending 'something'.
:rtype: :const:`True`
"""
address = 16777215 # 0xFFFFFF
table = 'ACTION_PARAMETER_TABLE'
param = 'EnterSleep'
value = 0
ad_param = 0
package = self.cmd.set_parameter(address, table, param,
[value], ad_param)
self.dev.open_device()
self.dev.write_pkg(package)
time.sleep(0.300)
return True
[docs] def sync(self, baudrate=9600):
"""This command synchronises the connected modules to the given
baudrate.
The communication between master and slaves can only be successful
if they use the same baudrate. In order to synchronise the modules
on a given baudrate, the bus master has to transmit the broadcast
command "SetSysPara" with the parameter Baudrate on all possible
baudrates. There must be a delay of at least 500ms after each command!
:param baudrate: Baudrate to use (1200-2400-4800-9600).
:type baudrate: int
:raises BusError: If baudrate is unknown.
:rtype: :const:`True`
"""
address = 16777215
table = 'SYSTEM_PARAMETER_TABLE'
param = 'Baudrate'
value = baudrate//100
ad_param = 0
if value not in (12, 24, 48, 96):
raise BusError("Unknown baudrate!")
package = self.cmd.set_parameter(address, table, param,
[value], ad_param)
# first close the device
self.dev.close_device()
# trying to set baudrate at 1200
self.dev.open_device(baudrate=1200)
self.dev.write_pkg(package)
time.sleep(0.500)
self.dev.close_device()
# trying to set baudrate at 2400
self.dev.open_device(baudrate=2400)
self.dev.write_pkg(package)
time.sleep(0.420)
self.dev.close_device()
# trying to set baudrate at 4800
self.dev.open_device(baudrate=4800)
self.dev.write_pkg(package)
time.sleep(0.340)
self.dev.close_device()
# trying to set baudrate at 9600
self.dev.open_device(baudrate=9600)
self.dev.write_pkg(package)
time.sleep(0.260)
self.dev.close_device()
# at last open the device with the setted baudrate
self.dev.open_device(baudrate=baudrate)
self.bus_synced = True
time.sleep(1.000)
return True
[docs] def scan(self, minserial=0, maxserial=16777215):
""" Command to scan the IMPBUS for connected probes.
This command can be uses to search the IMPBus2 for connected probes. It
uses the :func:`probe_range` command, to address a whole serial number
range and in the case of some 'answers' from the range it recursively
devides the range into equal parts and repeads this binary search
schema until the all probes are found.
.. note:: Searching the IMPBus2
The 3 bytes IMPBus2 serial numbers spans a 24bit [0 - 16777215]
address range, which is a whole lot of serial numbers to try. So in
order to quickly identify the connected probes the command
:func:`probe_range` can be used. In order to address more than one
probe a a time the header package of the :func:`probe_range`
command contains a range pattern where you would normaly find the
target probes serial number. Example: ::
range serno: 10010001 10000000 00000000 (0x918000)
range address: 10010001 00000000 00000000
range mark: 00000000 10000000 00000000
As you can see in the example above, the "range serno" from the
header package consists of a range address and a range marker. The
Range marker is alwayse the most right '1' of the "range serno"::
range min: 10010001 00000000 00000000 (0x910000)
range max: 10010001 11111111 11111111 (0x91ffff)
So all probes with serial numbers between min and max would answer
to a :func:`probe_range` command with the "range serno" 0x918000.
The probes would send the CRC of there serial number as reply to
this command. But because the probes share the same bus it is higly
possible the replyes are damaged when red from the serial line. So
the onoly thing we will know from a :func:`probe_range` command is
whether or not there is someone in the addressed range. So the next
thing to do, if we get something back from the addressed range, is
to devide the range into two pices by shifting the range mark
right::
range mark: 00000000 01000000 00000000 (new range mark)
lower half: 10010001 00000000 00000000 (old mark gets 0)
higher half: 10010001 10000000 00000000 (old mark gets 1)
So the new "range sernos" ("range address" + "range mark") are::
lower half: 10010001 01000000 00000000 (0x914000)
higher half: 10010001 11000000 00000000 (0x91c000)
This way we recursively divide the range until we hit the last
ranges, spanning only two serial numbers. Than we can query them
directly, using the :func:`probe_module_short` command.
:param minserial: Start of the range to search (usually: 0).
:type minserial: int
:param maxserial: End of the range to search (usually: 16777215).
:type maxserial: int
:rtype: tuple
"""
sernos = list()
rng, mark = _imprange(minserial, maxserial)
self._search(rng, mark, sernos)
sernos = [x for x in sernos if x >= minserial and x <= maxserial]
sernos.sort()
return tuple(sernos)
[docs] def find_single_module(self):
""" Find a single module on the Bus.
This command is used to identify a single module on the bus
which serial number is unknown. It is a broadcast command and
serves to get the serial number of the module.
:rtype: :const:`False` or :const:`tuple` containing the serial number.
"""
package = self.cmd.get_negative_ack()
try:
self.dev.write_pkg(package)
self._wait(len(package))
bytes_recv = self.dev.read_pkg()
except DeviceError:
return False
finally:
time.sleep(self.cycle_wait)
return self.res.get_negative_ack(bytes_recv)
[docs] def probe_module_long(self, serno):
""" This command with will call up the slave which is addressed
by its serial number. In return, the slave replies with a
complete address block. It can be used to test the presence
of a module in conjunction with the quality of the bus connection.
:param serno: Serial number of the probe do connect.
:type serno: int
:rtype: :const:`bool`
"""
package = self.cmd.get_long_ack(serno)
try:
self.dev.write_pkg(package)
self._wait(len(package))
bytes_recv = self.dev.read_pkg()
except DeviceError:
return False
finally:
time.sleep(self.cycle_wait)
return self.res.get_long_ack(bytes_recv, serno)
[docs] def probe_module_short(self, serno):
"""This command will call up the slave which is addressed by its serial
number. In return, the slave replies by just one byte: The CRC of its
serial number. It is the shortest possible command without the transfer
of any data block and the only one without a complete address block. It
can be used to test the presence of a module.
:param serno: Serial number of the probe do connect.
:type serno: int
:rtype: :const:`bool`
"""
package = self.cmd.get_short_ack(serno)
try:
self.dev.write_pkg(package)
self._wait(len(package))
bytes_recv = self.dev.read_bytes(1)
except DeviceError:
return False
finally:
time.sleep(self.cycle_wait)
return self.res.get_short_ack(bytes_recv, serno)
[docs] def probe_range(self, broadcast):
""" This command is very similar to probe_module_short(). However,
it addresses not just one single serial number, but a serial
number range. This is done by setting the values of byte 4 to byte 6
of the package header to a broadcast pattern. For more details refer to
the explanation at :func:`scan`.
:param serno: Broadcast address.
:type serno: int
:rtype: :const:`bool`
"""
package = self.cmd.get_range_ack(broadcast)
self.dev.write_pkg(package)
self._wait(len(package))
bytes_recv = self.dev.read()
time.sleep(self.cycle_wait)
return self.res.get_range_ack(bytes_recv)
[docs] def get(self, serno, table, param):
"""This is the base command for getting some information from the
probes. Instead of using this command directly, it's highly recommendet
to use the higher level `API` commands in :class:`Module`. Nevertheless
here is a small example of how to use this command to request the
serial number of a probe::
>>> table = 'SYSTEM_PARAMETER_TABLE'
>>> param = 'SerialNum'
>>> serno = 33912
>>> bus.get(serno, table, param)
:param serno: Serial number of the probe to request.
:type serno: int
:param table: System table containing the requested infomation.
:type table: string
:param param: Parameter od row containing the requested infomation.
:type param: string
:rtype: :const:`bool`
"""
package = self.cmd.get_parameter(serno, table, param)
self.dev.write_pkg(package)
self._wait(len(package))
bytes_recv = self.dev.read_pkg()
time.sleep(self.cycle_wait)
return self.res.get_parameter(bytes_recv, table, param)
[docs] def set(self, serno, table, param, value, ad_param=0):
"""This is the base command for sending and storing some information in
the tables of the probes. It's the counterpart of the :func:`get`
command. Instead of using this command directly, it's highly
recommendet to use the higher level `API` commands in :class:`Module`.
Nevertheless here is a small example of how to use this command to
set the serial number of a probe::
>>> table = 'SYSTEM_PARAMETER_TABLE'
>>> param = 'SerialNum'
>>> serno = 33912
>>> new_serno = 33913
>>> bus.set(serno, table, param, [new_serno])
:param serno: Serial number of the probe to address.
:type serno: int
:param table: System table to store the infomation.
:type table: string
:param param: Parameter od row containing the requested infomation.
:type param: string
:param value: Values to store.
:type value: iterable
:rtype: :const:`bool`
"""
# pylint: disable=too-many-arguments
package = self.cmd.set_parameter(serno, table, param,
value, ad_param)
self.dev.write_pkg(package)
self._wait(len(package))
bytes_recv = self.dev.read_pkg()
time.sleep(self.cycle_wait)
return self.res.set_parameter(bytes_recv, table, serno)
[docs] def get_eeprom_page(self, serno, page_nr):
"""This is the base command for reading a single page of EEPRom data
from a particular probe. It is later used within some higher `API`
commands within the :class:`Module`-Class. For more Information please
refer to the description in the :class:`EEPRom`-Class.
:param serno: Serial number of the probe to address.
:type serno: int
:param page_nr: EEPRom Page to get.
:type page_nr: int
"""
package = self.cmd.get_epr_page(serno, page_nr)
self.dev.write_pkg(package)
self._wait(len(package))
bytes_recv = self.dev.read_pkg()
time.sleep(self.cycle_wait)
return self.res.get_epr_page(bytes_recv)
[docs] def set_eeprom_page(self, serno, page_nr, page):
"""This is the base command for writing a single page of EEPRom data
into a particular probe. It is later used within some higher `API`
commands within the :class:`Module`-Class. For more Information please
refer to the description in the :class:`EEPRom`-Class.
:param serno: Serial number of the probe to address.
:type serno: int
:param page_nr: EEPRom Page to write.
:type page_nr: int
:param page: EEPRom page data.
:type page: bytes
"""
package = self.cmd.set_epr_page(serno, page_nr, page)
self.dev.write_pkg(package)
self._wait(len(package))
bytes_recv = self.dev.read_pkg()
time.sleep(self.cycle_wait)
return self.res.set_epr_page(bytes_recv)