(aktiver) USB-Hub! Der USB-Port des Raspberry taugt für nix.
Ankündigung
Einklappen
Keine Ankündigung bisher.
Neues Plugin: Applied Sensor Air Quality Stick (Voltcraft CO-20)
Einklappen
X
-
Hallo,
Code:Dec 27 17:22:43 homeserver kernel: [189056.785500] usb 3-8.3: ep 0x81 - rounding interval to 64 microframes, ep desc says 80 microframes Dec 27 17:22:43 homeserver kernel: [189056.785504] usb 3-8.3: ep 0x2 - rounding interval to 64 microframes, ep desc says 80 microframes Dec 27 17:22:50 homeserver kernel: [189063.401639] usb 3-8.3: usbfs: process 12130 (python3) did not claim interface 0 before use Dec 27 17:23:00 homeserver kernel: [189073.411281] usb 3-8.3: usbfs: process 12129 (python3) did not claim interface 0 before use
Gruß,
Hendrik
Kommentar
-
Hi Robert,
Zitat von Robert Beitrag anzeigenJa, ist aber gar kein Problem. Mit "usb.core.find" kann man auch alle Geräte mit einer Vendor- und Device-ID finden. Dann beide wie in "run" gemacht einzeln initialisieren. Der String "*IDN?" über den 0x02 OUT-Endpoint bekommt dann über den 0x81 Endpoint u.A. die Seriennummer. Ab da kann man alles eindeutig zuordnen. Jedem Item dann noch ein optionales "iaqstick_serial" und schon läuft das. Wenn konkret Bedarf ist kann ich da unterstützen.
Code:dmesg | grep iAQ [ 1.986486] usb 3-8.3: Product: iAQ Stick [ 1.988021] hid-generic 0003:03EB:2013.0003: hiddev0,hid [186538.532734] usb 3-7.2: Product: iAQ Stick [186538.534198] hid-generic 0003:03EB:2013.0005: hiddev0,hi [186542.371531] usb 3-7.3: Product: iAQ Stick [186542.373173] hid-generic 0003:03EB:2013.0006: hiddev0,hi [186552.096350] usb 3-7.4: Product: iAQ Stick [186552.098028] hid-generic 0003:03EB:2013.0007: hiddev0,hi >>> import usb.core >>> usb.core.find(idVendor=0x03eb, idProduct=0x2013) <usb.core.Device object at 0x7f1c5de27690> >>> a=usb.core.find(idVendor=0x03eb, idProduct=0x2013) >>> usb.core.find(idVendor=0x03eb, idProduct=0x2013) KeyboardInterrupt >>> manufacturer = usb.util.get_string(a, 0x101, 0x01, 0x409) >>> manufacturer 'AppliedSensor' >>> product = usb.util.get_string(a, 0x101, 0x02, 0x409) >>> product 'iAQ Stick' >>> a.xfer_type1('*IDN?') Traceback (most recent call last): File "<stdin>", line 1, in <module> AttributeError: 'Device' object has no attribute 'xfer_type1'
Ich bin jetzt etwas ratlos, wie ich weiter machen soll...
Wenn ich dich recht verstanden habe, sucht man mit usb.core.find nach den Devices und bekommt statt einen mehrere zurück, also wahrscheinlich eine Liste?
Die muss man dann in einer Schleife durchlaufen und die Seriennummer mit der aus dem item vergleichen?
Gruß,
Hendrik
Kommentar
-
Hi!
xfertype ist von usb.core.Device - also vom USB-Stack.
Code:import usb for dev in usb.core.find(find_all=True): print "Device:", dev.filename print " idVendor: %d (%s)" % (dev.idVendor, hex(dev.idVendor)) print " idProduct: %d (%s)" % (dev.idProduct, hex(dev.idProduct))
Scheinbar könnte man sich sogar die beiden dummen "GetString" durch Zugriffe auf dev.idVendor und .idProduct sparen (bzw. halt schöner verpacken).
Für xfer musst du evtl. erstmal die Configuration setzen - meine das fehlt bei dem von dir geposteten Ausschnitt.
Grüße
Robert
edit: Quark, xfertype ist sogar eine von mir gecodete Funktion - wenn man sich schon nicht mehr erinnert... Aber du hast ja den Code auch etwas "verkürzt und abgewandelt" ;-)
Also einfach den "run"-Part für alle gefundenen dev loopen.
Kommentar
-
Auf die schnelle nur fürs Konzept - musst du mal selber debuggen:
PHP-Code:#!/usr/bin/env python3
# vim: set encoding=utf-8 tabstop=4 softtabstop=4 shiftwidth=4 expandtab
#########################################################################
# Copyright 2013 Robert Budde robert@projekt131.de
#########################################################################
# iAQ-Stick plugin for SmartHome.py. http://mknx.github.io/smarthome/
#
# This plugin is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This plugin is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this plugin. If not, see <http://www.gnu.org/licenses/>.
#########################################################################
#/etc/udev/rules.d/99-iaqstick.rules
#SUBSYSTEM=="usb", ATTR{idVendor}=="03eb", ATTR{idProduct}=="2013", MODE="666"
#udevadm trigger
import logging
import usb.core
import usb.util
from time import sleep
logger = logging.getLogger('iAQ_Stick')
class iAQ_Stick():
def __init__(self, smarthome, update_cycle = "10"):
self._sh = smarthome
self._update_cycle = int(update_cycle)
self._items = {}
self._devs = {}
def xfer_type1(self, dev, msg):
out_data = bytes('@{:04X}{}\n@@@@@@@@@@'.format(self._devs[dev]['type1_seq'], msg), 'utf-8')
self._devs[dev]['type1_seq'] = (self._devs[dev]['type1_seq'] + 1) & 0xFFFF
ret = dev.write(0x02, out_data[:16], self._intf, 1000)
in_data = bytes()
while True:
ret = bytes(dev.read(0x81, 0x10, self._intf, 1000))
if len(ret) == 0:
break
in_data += ret
return in_data.decode('iso-8859-1')
def xfer_type2(self, dev, msg):
out_data = bytes('@', 'utf-8') + self._devs[dev]['type2_seq'].to_bytes(1, byteorder='big') + bytes('{}\n@@@@@@@@@@@@@'.format(msg), 'utf-8')
self._devs[dev]['type2_seq'] = (self._devs[dev]['type2_seq'] + 1) if (self._devs[dev]['type2_seq'] < 0xFF) else 0x67
ret = dev.write(0x02, out_data[:16], self._intf, 1000)
in_data = bytes()
while True:
ret = bytes(dev.read(0x81, 0x10, self._intf, 1000))
if len(ret) == 0:
break
in_data += ret
return in_data
def run(self):
devs = usb.core.find(idVendor=0x03eb, idProduct=0x2013, find_all=True)
if devs is None:
logger.error('iaqstick: iAQ Stick not found')
return
self._intf = 0
for dev in devs:
try:
if dev.is_kernel_driver_active(self._intf):
dev.detach_kernel_driver(self._intf)
_dev.set_configuration(0x01)
usb.util.claim_interface(dev, self._intf)
dev.set_interface_altsetting(self._intf, 0x00)
logger.info('iaqstick: Vendor: {} - Product: {}'.format(dev.idVendor, dev.idProduct))
self._devs.append({dev: {'type1_seq':0x0001, 'type2_seq':0x67})
id = self.xfer_type1(dev, '*IDN?')
#print(id)
self._devs[dev].append({'id':id})
ret = self.xfer_type1(dev, 'KNOBPRE?')
#print(ret)
ret = self.xfer_type1(dev, 'WFMPRE?')
#print(ret)
ret = self.xfer_type1(dev, 'FLAGS?')
#print(ret)
except Exception as e:
logger.error("iaqstick: init interface failed - {}".format(e))
self.alive = True
self._sh.scheduler.add('iAQ_Stick', self._update_values, prio = 5, cycle = self._update_cycle)
logger.info("iaqstick: init successful")
def stop(self):
self.alive = False
try:
for dev in self._devs:
usb.util.release_interface(dev, self._intf)
except Exception as e:
logger.error("iaqstick: releasing interface failed - {}".format(e))
try:
self._sh.scheduler.remove('iAQ_Stick')
except Exception as e:
logger.error("iaqstick: removing iAQ_Stick from scheduler failed - {}".format(e))
def _update_values(self):
#logger.debug("iaqstick: update")
try:
for dev in self._devs:
self.xfer_type1(dev, 'FLAGGET?')
meas = self.xfer_type2(dev, '*TR')
ppm = int.from_bytes(meas[2:4], byteorder='little')
logger.debug('iaqstick: ppm: {}'.format(ppm))
#logger.debug('iaqstick: debug?: {}'.format(int.from_bytes(meas[4:6], byteorder='little')))
#logger.debug('iaqstick: PWM: {}'.format(int.from_bytes(meas[6:7], byteorder='little')))
#logger.debug('iaqstick: Rh: {}'.format(int.from_bytes(meas[7:8], byteorder='little')*0.01))
#logger.debug('iaqstick: Rs: {}'.format(int.from_bytes(meas[8:12], byteorder='little')))
id = self._devs[dev]['id']
if id in self._items:
if 'ppm' in self._items[id]:
for item in self._items[id]['ppm']['items']:
item(ppm, 'iAQ_Stick', 'USB')
if 'none' in self._items:
if 'ppm' in self._items['none']:
for item in self._items['none']['ppm']['items']:
item(ppm, 'iAQ_Stick', 'USB')
except Exception as e:
logger.error("iaqstick: update failed - {}".format(e))
def parse_item(self, item):
if 'iaqstick_info' in item.conf:
logger.debug("parse item: {0}".format(item))
if 'iaqstick_id' in item.conf:
id = item.conf['iaqstick_id'].lower()
else:
id = 'none'
info_tag = item.conf['iaqstick_info'].lower()
if not id in self._items:
self._items[id] = {'ppm': {'items': [item], 'logics': []}}
else:
self._items[id]['ppm']['items'].append(item)
return None
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
myplugin = Plugin('iAQ_Stick')
myplugin.run()
Kommentar
-
Hallo,
Erstmal danke an Robert for das Programm. Ich habs mal debugged.
Hier die lauffähige Version.
PHP-Code:#!/usr/bin/env python3
# vim: set encoding=utf-8 tabstop=4 softtabstop=4 shiftwidth=4 expandtab
#########################################################################
# Copyright 2013 Robert Budde robert@projekt131.de
#########################################################################
# iAQ-Stick plugin for SmartHome.py. http://mknx.github.io/smarthome/
#
# This plugin is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This plugin is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this plugin. If not, see <http://www.gnu.org/licenses/>.
#########################################################################
#/etc/udev/rules.d/99-iaqstick.rules
#SUBSYSTEM=="usb", ATTR{idVendor}=="03eb", ATTR{idProduct}=="2013", MODE="666"
#udevadm trigger
import logging
import usb.core
import usb.util
from time import sleep
logger = logging.getLogger('iAQ_Stick')
class iAQ_Stick():
def __init__(self, smarthome, update_cycle = "10"):
self._sh = smarthome
self._update_cycle = int(update_cycle)
self._items = {}
self._devs = {}
def xfer_type1(self, dev, msg):
out_data = bytes('@{:04X}{}\n@@@@@@@@@@'.format(self._devs[dev]['type1_seq'], msg), 'utf-8')
self._devs[dev]['type1_seq'] = (self._devs[dev]['type1_seq'] + 1) & 0xFFFF
ret = dev.write(0x02, out_data[:16], self._intf, 1000)
in_data = bytes()
while True:
ret = bytes(dev.read(0x81, 0x10, self._intf, 1000))
if len(ret) == 0:
break
in_data += ret
return in_data.decode('iso-8859-1')
def xfer_type2(self, dev, msg):
out_data = bytes('@', 'utf-8') + self._devs[dev]['type2_seq'].to_bytes(1, byteorder='big') + bytes('{}\n@@@@@@@@@@@@@'.format(msg), 'utf-8')
self._devs[dev]['type2_seq'] = (self._devs[dev]['type2_seq'] + 1) if (self._devs[dev]['type2_seq'] < 0xFF) else 0x67
ret = dev.write(0x02, out_data[:16], self._intf, 1000)
in_data = bytes()
while True:
ret = bytes(dev.read(0x81, 0x10, self._intf, 1000))
if len(ret) == 0:
break
in_data += ret
return in_data
def run(self):
devs = usb.core.find(idVendor=0x03eb, idProduct=0x2013, find_all=True)
#self._devs = devs
if devs is None:
logger.error('iaqstick: iAQ Stick not found')
return
self._intf = 0
for dev in devs:
try:
if dev.is_kernel_driver_active(self._intf):
dev.detach_kernel_driver(self._intf)
dev.set_configuration(0x01)
usb.util.claim_interface(dev, self._intf)
dev.set_interface_altsetting(self._intf, 0x00)
logger.info('iaqstick: Vendor: {} - Product: {}'.format(dev.idVendor, dev.idProduct))
self._devs.update({dev: {'type1_seq':0x0001, 'type2_seq':0x67}})
id = self.xfer_type1(dev, '*IDN?')
pos1 = id.find('S/N:') + 4
pos2 = id.find(';',pos1)
id = id[pos1:pos2]
logger.debug('iaqstick: id: {}'.format(id))
self._devs[dev].update({'id':id})
#print(id)
#ret = self.xfer_type1(dev, 'KNOBPRE?')
#print(ret)
#ret = self.xfer_type1(dev, 'WFMPRE?')
#print(ret)
#ret = self.xfer_type1(dev, 'FLAGS?')
#print(ret)
except Exception as e:
logger.error("iaqstick: init interface failed - {}".format(e))
self.alive = True
self._sh.scheduler.add('iAQ_Stick', self._update_values, prio = 5, cycle = self._update_cycle)
logger.info("iaqstick: init successful")
def stop(self):
self.alive = False
try:
for dev in self._devs:
usb.util.release_interface(dev, self._intf)
except Exception as e:
logger.error("iaqstick: releasing interface failed - {}".format(e))
try:
self._sh.scheduler.remove('iAQ_Stick')
except Exception as e:
logger.error("iaqstick: removing iAQ_Stick from scheduler failed - {}".format(e))
def _update_values(self):
logger.debug("iaqstick: update")
try:
for dev in self._devs:
self.xfer_type1(dev, 'FLAGGET?')
meas = self.xfer_type2(dev, '*TR')
ppm = int.from_bytes(meas[2:4], byteorder='little')
logger.debug('iaqstick: ppm: {}'.format(ppm))
#logger.debug('iaqstick: debug?: {}'.format(int.from_bytes(meas[4:6], byteorder='little')))
#logger.debug('iaqstick: PWM: {}'.format(int.from_bytes(meas[6:7], byteorder='little')))
#logger.debug('iaqstick: Rh: {}'.format(int.from_bytes(meas[7:8], byteorder='little')*0.01))
#logger.debug('iaqstick: Rs: {}'.format(int.from_bytes(meas[8:12], byteorder='little')))
id = self._devs[dev]['id']
if id in self._items:
if 'ppm' in self._items[id]:
for item in self._items[id]['ppm']['items']:
item(ppm, 'iAQ_Stick', 'USB')
if 'none' in self._items:
if 'ppm' in self._items['none']:
for item in self._items['none']['ppm']['items']:
item(ppm, 'iAQ_Stick', 'USB')
except Exception as e:
logger.error("iaqstick: update failed - {}".format(e))
def parse_item(self, item):
if 'iaqstick_info' in item.conf:
logger.debug("parse item: {0}".format(item))
if 'iaqstick_id' in item.conf:
id = item.conf['iaqstick_id']
else:
id = 'none'
info_tag = item.conf['iaqstick_info'].lower()
if not id in self._items:
self._items[id] = {'ppm': {'items': [item], 'logics': []}}
else:
self._items[id]['ppm']['items'].append(item)
return None
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
myplugin = Plugin('iAQ_Stick')
myplugin.run()
Code:[iAQ_Stick] [[PPM1]] type = num sqlite = true iaqstick_info = ppm iaqstick_id=47303735323615042017 knx_dpt = 9 knx_send = 1/5/1 knx_reply = 1/5/1 [[PPM2]] type = num sqlite = true iaqstick_info = ppm iaqstick_id=473037353236150F141D knx_dpt = 9 knx_send = 1/5/2 knx_reply = 1/5/2
Und wer keine Sticks mehr in der Apotheke bekommen hat, ich habe sie bei Velux online recht preiswert bekommen (19,50€).
Gruß und immer frische Luft ;-)
Peter.
Kommentar
-
Hallo Peter, vielen Dank für deine Mühe!
Ich habe mir erlaubt die Änderungen leicht abgewandelt ins Repo zu schieben. Die Stick-ID habe ich allerdings noch mal von Hex-Ascii nach Ascii konvertiert damit sie kürzer und besser lesbar ist. Wenn keine passende stick_id gefunden wird, wird im Log ein entsprechender Hinweis gezeigt.
Grüße
Robert
Kommentar
-
Hallo Robert,
hatte mal noch ein paar Änderungen einfließen lassen, da die Sticks nicht stabil waren. Ich hatte mir dann in anderen Implementierungen ein paar Tricks abgeschaut und nun liefen die Dinger seit Monaten ohne Probleme.
Ist jetzt auf Pull-Request im Github.
Wäre ja schade, wenn ichs für mich behalten würde ;-)
Gruß, Peter.
Kommentar
-
An die Stick User hier im Thread:
Habe auch einen. Über die AirQualitySoftware habe ich dem Stick schon beigebracht dass er nicht jedes mal neu lernt wenn man ihn einsteckt. Funktioniert soweit gut.
Aber nach längerer Zeit, auch mal erst nach 1-2 Tagen Dauerbetrieb geht der Stick auf "rot", obwohl die Luft super ist. Stick ausstecken, wieder einstecken, warten bis er mit blinken fertig ist und schon ist der Wert wieder im Keller un steigt seeeehr langsam von 450 auf einen realistischeren Wert von 700..900.
Damit wirkt der Stick nicht besonders zuverlässig.
Habt ihr das auch? Gibt's da einen Trick?
Und wer bastelinteressiert ist, schaut auch hier mal rein: https://knx-user-forum.de/forum/%C3%...ino-ausstatten
Gruß
Alex
Kommentar
-
Hallo,
@Alex: Hast du das Problem, dass der Stick regelmäßig neu gestartet werden muss lösen können?
Ich habe gerade das Problem, den Stick mit Sh-ng zum Laufen zu bekommen. Im Log finde ich:
Code:2018-10-02 21:36:07 DEBUG Main Plugins, section: iaqstick 2018-10-02 21:36:07 DEBUG Main Plugins __init__: pluginname = 'iaqstick', classpath 'plugins.iaqstick' 2018-10-02 21:36:07 INFO Main Loading '/usr/local/smarthome/plugins/iaqstick/plugin.yaml' to 'OrderedDict' 2018-10-02 21:36:07 INFO Main plugin 'iaqstick': has no parameter definitions in metadata 2018-10-02 21:36:07 INFO Main plugin 'iaqstick': has no item definitions in metadata 2018-10-02 21:36:07 DEBUG Main PluginWrapper __init__: Section iaqstick, classname iAQ_Stick, classpath plugins.iaqstick 2018-10-02 21:36:07 INFO Main Loading '/usr/local/smarthome/plugins/iaqstick/locale.yaml' to 'dict' 2018-10-02 21:36:07 INFO Main plugin 'iaqstick': No parameter definitions found in metadata 2018-10-02 21:36:07 DEBUG Main Plugins: Loaded classic-plugin 'iaqstick' (class 'iAQ_Stick') 2018-10-02 21:36:07 INFO Main Initialized plugin 'iaqstick' from from section 'iaqstick' 2018-10-02 21:36:11 DEBUG Main Starting classic-plugin from section 'iaqstick'
Ich habe auch mal das Logging auf Debug erhöht:
Code:plugins.iaqstick: level: DEBUG handlers: [file_additional_iaq] file_additional_iaq: class: logging.handlers.TimedRotatingFileHandler formatter: simple level: DEBUG when: midnight backupCount: 7 filename: ./var/log/smarthome-iaq.log encoding: utf8
Die Datei "./var/log/smarthome-iaq.log" bleibt aber leer.
Ich würde erwarten, dass dieser Code mir auf jeden Fall einen Output gibt:
Code:def run(self): devs = usb.core.find(idVendor=0x03eb, idProduct=0x2013, find_all=True) if devs is None: logger.error('iaqstick: iAQ Stick not found') return logger.debug('iaqstick: {} iAQ Stick connected'.format(len(devs))) self._intf = 0
Hendrik
Kommentar
-
Hallo,
läuft das Plugin bei jemandem aktuell?
Ich habe jetzt 2h debugging hinter mir, aber ich stecke fest.
Code:def read(self, dev): in_data = bytes() try: while True: logger.info('iaqstick: read intf {0}'.format(self._intf)) ret = bytes(dev.read(0x81, 0x10, self._intf, 1000)) if len(ret) == 0: break in_data += ret except Exception as e: logger.error("iaqstick: read - {}".format(e)) pass return in_data
Code:2018-10-06 23:36:49 INFO iaqstick iaqstick: read intf 0 2018-10-06 23:36:49 ERROR iaqstick iaqstick: read - read() takes from 3 to 4 positional arguments but 5 were given
Wo ist der Fehler?
Gruß,
Hendrik
Kommentar
-
fyi - hatte auch Problem mit dem Plugin... Hatte 4 Wochen eine Lösung mit pyhdiapi am Laufen (abgeleitet aus https://github.com/johsam/airmonitor).
Habe mir heute Abend noch einmal mit deinen Hinweisen das ursprüngliche Plugin angesehen. Mit den Änderungen, die im Code Block enthalten sind, läuft es bei mir:
Code:#!/usr/bin/env python3 # vim: set encoding=utf-8 tabstop=4 softtabstop=4 shiftwidth=4 expandtab ######################################################################### # Copyright 2013 Robert Budde robert@projekt131.de ######################################################################### # iAQ-Stick plugin for SmartHomeNG. https://github.com/smarthomeNG// # # This plugin is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This plugin is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this plugin. If not, see <http://www.gnu.org/licenses/>. ######################################################################### #/etc/udev/rules.d/99-iaqstick.rules #SUBSYSTEM=="usb", ATTR{idVendor}=="03eb", ATTR{idProduct}=="2013", MODE="666" #udevadm trigger import logging import usb.core import usb.util from time import sleep logger = logging.getLogger('iAQ_Stick') class iAQ_Stick(): def __init__(self, smarthome, update_cycle = "10"): self._sh = smarthome self._update_cycle = int(update_cycle) self._items = {} self._devs = {} def read(self, dev): in_data = bytes() try: while True: ret = bytes(dev.read(0x81, 0x10, 1000)) if len(ret) == 0: break in_data += ret except Exception as e: logger.error("iaqstick: read - {}".format(e)) pass return in_data def xfer_type1(self, dev, msg): out_data = bytes('@{:04X}{}\n@@@@@@@@@@'.format(self._devs[dev]['type1_seq'], msg), 'utf-8') self._devs[dev]['type1_seq'] = (self._devs[dev]['type1_seq'] + 1) & 0xFFFF ret = dev.write(0x02, out_data[:16], 1000) return self.read(dev).decode('iso-8859-1') def xfer_type2(self, dev, msg): out_data = bytes('@', 'utf-8') + self._devs[dev]['type2_seq'].to_bytes(1, byteorder='big') + bytes('{}\n@@@@@@@@@@@@@'.format(msg), 'utf-8') self._devs[dev]['type2_seq'] = (self._devs[dev]['type2_seq'] + 1) if (self._devs[dev]['type2_seq'] < 0xFF) else 0x67 ret = dev.write(0x02, out_data[:16], 1000) in_data = bytes() return self.read(dev) def _init_dev(self, dev): try: if dev.is_kernel_driver_active(self._intf): dev.detach_kernel_driver(self._intf) dev.set_configuration(0x01) usb.util.claim_interface(dev, self._intf) dev.set_interface_altsetting(self._intf, 0x00) # vendor = usb.util.get_string(dev, 0x101, 0x01, 0x409) # product = usb.util.get_string(dev, 0x101, 0x02, 0x409) vendor = usb.util.get_string(dev, dev.iManufacturer) product = usb.util.get_string(dev,dev.iProduct ) self._devs[dev] = {'type1_seq':0x0001, 'type2_seq':0x67} ret = self.xfer_type1(dev, '*IDN?') pos1 = ret.find('S/N:') + 4 id = '{:s}-{:d}'.format(bytes.fromhex(ret[pos1:pos1+12]).decode('ascii'), int(ret[pos1+14:pos1+20], 16)) logger.info('iaqstick: Vendor: {} / Product: {} / Stick-ID: {}'.format(vendor, product, id)) if (id not in self._items): logger.warning('iaqstick: no specific item for Stick-ID {} - use \'iaqstick_id\' to distinguish multiple sticks!'.format(id)) #ret = self.xfer_type1(dev, 'KNOBPRE?') #ret = self.xfer_type1(dev, 'WFMPRE?') #ret = self.xfer_type1(dev, 'FLAGS?') return id except Exception as e: logger.error("iaqstick: init interface failed - {}".format(e)) return None def run(self): devs = list(usb.core.find(idVendor=0x03eb, idProduct=0x2013, find_all=True)) if devs is None: logger.error('iaqstick: iAQ Stick not found') return logger.debug('iaqstick: {} iAQ Stick connected'.format(len(devs))) self._intf = 0 for dev in devs: id = self._init_dev(dev) if id is not None: self._devs[dev]['id'] = id self.alive = True self._sh.scheduler.add('iAQ_Stick', self._update_values, prio = 5, cycle = self._update_cycle) logger.info("iaqstick: init successful") def stop(self): self.alive = False for dev in self._devs: try: usb.util.release_interface(dev, self._intf) if dev.is_kernel_driver_active(self._intf): dev.detach_kernel_driver(self._intf) except Exception as e: logger.error("iaqstick: releasing interface failed - {}".format(e)) try: self._sh.scheduler.remove('iAQ_Stick') except Exception as e: logger.error("iaqstick: removing iAQ_Stick from scheduler failed - {}".format(e)) def _update_values(self): logger.debug("iaqstick: updating {} sticks".format(len(self._devs))) for dev in self._devs: logger.debug("iaqstick: updating {}".format(self._devs[dev]['id'])) try: self.xfer_type1(dev, 'FLAGGET?') meas = self.xfer_type2(dev, '*TR') ppm = int.from_bytes(meas[2:4], byteorder='little') logger.debug('iaqstick: ppm: {}'.format(ppm)) #logger.debug('iaqstick: debug?: {}'.format(int.from_bytes(meas[4:6], byteorder='little'))) #logger.debug('iaqstick: PWM: {}'.format(int.from_bytes(meas[6:7], byteorder='little'))) #logger.debug('iaqstick: Rh: {}'.format(int.from_bytes(meas[7:8], byteorder='little')*0.01)) #logger.debug('iaqstick: Rs: {}'.format(int.from_bytes(meas[8:12], byteorder='little'))) id = self._devs[dev]['id'] if id in self._items: if 'ppm' in self._items[id]: for item in self._items[id]['ppm']['items']: item(ppm, 'iAQ_Stick', 'USB') if '*' in self._items: if 'ppm' in self._items['*']: for item in self._items['*']['ppm']['items']: item(ppm, 'iAQ_Stick', 'USB') except Exception as e: logger.error("iaqstick: update failed - {}".format(e)) logger.error("iaqstick: Trying to recover ...") broken_id = self._devs[dev]['id'] del self._devs[dev] __devs = list(usb.core.find(idVendor=0x03eb, idProduct=0x2013, find_all=True)) for __dev in __devs: if (__dev not in self._devs): id = self._init_dev(__dev) if id == broken_id: logger.error("iaqstick: {} was ressurrected".format(id)) self._devs[__dev]['id'] = id else: logger.error("iaqstick: found other yet unknown stick: {}".format(id)) def parse_item(self, item): if 'iaqstick_info' in item.conf: logger.debug("parse item: {0}".format(item)) if 'iaqstick_id' in item.conf: id = item.conf['iaqstick_id'] else: id = '*' info_tag = item.conf['iaqstick_info'].lower() if not id in self._items: self._items[id] = {'ppm': {'items': [item], 'logics': []}} else: self._items[id]['ppm']['items'].append(item) return None if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) myplugin = Plugin('iaqstick') myplugin.run() #Application Version: 2.19.0 (Id: Form1.frm 1053 2010-06-30 11:00:09Z patrik.arven@appliedsensor.com ) # #Device 0: #Name: iAQ Stick #Firmware: 1.12p5 $Revision: 346 $ #Protocol: 5 #Hardware: C #Processor: ATmega32U4 #Serial number: S/N:48303230303415041020 #Web address: #Plot title: Air Quality Trend # #Channels: 5 #... Channel 0:CO2/VOC level #... Channel 1:Debug #... Channel 2:PWM #... Channel 3:Rh #... Channel 4:Rs #Knobs: 8 #... Knob CO2/VOC level_warn1:1000 #... Knob CO2/VOC level_warn2:1500 #... Knob Reg_Set:151 #... Knob Reg_P:3 #... Knob Reg_I:10 #... Knob Reg_D:0 #... Knob LogInterval:0 #... Knob ui16StartupBits:1 #Flags: 5 #... WARMUP=&h0000& #... BURN-IN=&h0000& #... RESET BASELINE=&h0000& #... CALIBRATE HEATER=&h0000& #... LOGGING=&h0000& # #@013E;;DEBUG: #Log: #buffer_size=&h1400; #address_base=&h4800; #readindex=&h0040; #Write index=&h0000; #nValues=&h0000; #Records=&h0000; #nValues (last)=&h0000; #uint16_t g_u16_loop_cnt_100ms=&h08D4; #;\x0A
- die von dir erwähnten 'self.intf' parameter bei 2x dev.write und 1x dev.read raus
- vendor und product Variablen mittels usb.util.get_string im _inut_dev angepasst
- im run: devs = list(usb.core.find(idVendor=0x03eb, idProduct=0x2013, find_all=True))
In meinem lokalen test environment funktioniert es mit zwei Sensoren dran.
HolgerZuletzt geändert von 2pi; 11.10.2018, 23:38.
Kommentar
Kommentar