Hallo,
ich arbeite hier an meinem ersten Plugin zum Auslesen meines Pelletofen.
Funktion:
Serielle Verbindung aufbauen
Handshake/Test ob sich der Richtige meldet
Sende einen Buchstaben über seriellen Port, lese Rückmeldung.
Das Plugin findet sich unten.
Es läuft aber beim Initialisieren schon etwas falsch:
Sieht jemand den Fehler?
Gruß&Danke,
Hendrik
Conf:
ich arbeite hier an meinem ersten Plugin zum Auslesen meines Pelletofen.
Funktion:
Serielle Verbindung aufbauen
Handshake/Test ob sich der Richtige meldet
Sende einen Buchstaben über seriellen Port, lese Rückmeldung.
Das Plugin findet sich unten.
Es läuft aber beim Initialisieren schon etwas falsch:
2014-05-02 21:09:10,022 ERROR Main Plugin lohberger exception: invalid token (__init__.py, line 104) -- plugin.py:__init__
Gruß&Danke,
Hendrik
Code:
#!/usr/bin/env python3 # vim: set encoding=utf-8 tabstop=4 softtabstop=4 shiftwidth=4 expandtab ######################################################################### # Copyright 2013 KNX-User-Forum e.V. https://knx-user-forum.de/ ######################################################################### # This file is part of SmartHome.py. http://mknx.github.io/smarthome/ # # SmartHome.py is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # SmartHome.py is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with SmartHome.py. If not, see <http://www.gnu.org/licenses/>. ######################################################################### import logging logger = logging.getLogger('') class Plugin(): def __init__(self, smarthome, port, cycle=300): self._sh = smarthome self._cycle = int(cycle) self._is_connected = False self._lock = threading.Lock() self.refresh_cycle = self._cycle befehle=['V','j','W','F','X','I','J','d','i','2','L','p','v','4','3','1','5','O','6','7','8','9','a','A','b','B','c','C','f','l','g','G'] namen=['RG_Geblaese_IST','Einschub_IST','FlammT_IST','FlammT_SOLL','BackrohrT_IST','RueckbrandT_IST','I_Backrohr_IST','I_Rueckbrand_IST','KesselT_IST','KesselT_SOLL','Kessel_AUS_T','Kessel_EIN_T','Leistungsstufe_IST','Leistungsstufe_PID','Funktion_PID','Fehler','MainState','SubState','GrateState','FlapState','FlapChangeState','Zuendung','Tuer','Extern','STB','Rostkontakt','Rostmotor','Rostmotor_pos','SH','Pumpe','Luftklappe','Reserve'] self._d=dict(zip(befehle, namen)) try: self._port = serial.Serial() self._port.port = "/dev/ttyUSB1" # may be called something different self._port.timeout=1 self._port.open() except: logger.error("Could not open {}.".format(tty)) return else: self._is_connected = True self._port.write('?'.encode('latin1')) sleep(0.3) handshake=self._port.readline().decode('latin1') if handshake.index("Pellets") == 10: for i in range(1,10): logger.debug(self._port.readline().decode('latin1')) #to clear the buffer else: logger.error("Unknown interface: {0}".format(interface)) def _read_lohberger(self, cmd): if not self._is_connected: return False self._lock.acquire() try: self._port.write(cmd.encode('latin1')) r = self._port.readline().decode('latin1') ret=int(r.split(';')[1]) #0: repeat of command, #1 ret value, #2: CRC-CCITT (0x1D0F) http://www.lemonsoftware.eu/pycrc-library except: logger.warning("Problem reading from Lohberger") ret = False finally: self._lock.release() def run(self): self.alive = True # if you want to create child threads, do not make them daemon = True! # They will not shutdown properly. (It's a python bug) self._sh.scheduler.add('Lohberger', self.refresh, prio=5, cycle=self._cycle, offset=2) def stop(self): self._port.close() self.alive = False def read(self, cmd): self._read_lohberger(self, self._d[cmd]) def parse_logic(self, logic): pass def parse_item(self, item): if 'lohberger_cmd' in item.conf: logger.debug("parse item: {0}".format(item)) return self.update_item else: return None def refresh(self): for item in self._items: time.sleep(0.1) lohberger_cmd = item.conf['lohberger_cmd'] value = self.request(request) #if reading fails (i.e. at broadcast-commands) the value will not be updated if 'command not found' not in str(value) and value is not None: item(value, 'Lohberger', 'refresh') if not self.alive: break def update_item(self, item, caller=None, source=None, dest=None): cmd=item.conf['lohberger_cmd']: self.read(cmd, int(item())) if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) myplugin = Plugin('Lohberger') myplugin.run()
Code:
[Lohberger] [[RG_Geblaese_IST]] type = num lohberger_cmd= RG_Geblaese_IST [[FlammT_IST]] type = num lohberger_cmd= FlammT_IST
Kommentar