Hallo,
ich arbeite hier an meinem ersten Plugin zum Auslesen meines Pelletofen.
Funktion:
Serielle Verbindung aufbauen
Handshake/Test ob sich der Richtige meldet
Sende einen Buchstaben über seriellen Port, lese Rückmeldung.
Das Plugin findet sich unten.
Es läuft aber beim Initialisieren schon etwas falsch:
Sieht jemand den Fehler?
Gruß&Danke,
Hendrik
Conf:
ich arbeite hier an meinem ersten Plugin zum Auslesen meines Pelletofen.
Funktion:
Serielle Verbindung aufbauen
Handshake/Test ob sich der Richtige meldet
Sende einen Buchstaben über seriellen Port, lese Rückmeldung.
Das Plugin findet sich unten.
Es läuft aber beim Initialisieren schon etwas falsch:
2014-05-02 21:09:10,022 ERROR Main Plugin lohberger exception: invalid token (__init__.py, line 104) -- plugin.py:__init__
Gruß&Danke,
Hendrik
Code:
#!/usr/bin/env python3
# vim: set encoding=utf-8 tabstop=4 softtabstop=4 shiftwidth=4 expandtab
#########################################################################
# Copyright 2013 KNX-User-Forum e.V. https://knx-user-forum.de/
#########################################################################
# This file is part of SmartHome.py. http://mknx.github.io/smarthome/
#
# SmartHome.py is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# SmartHome.py is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with SmartHome.py. If not, see <http://www.gnu.org/licenses/>.
#########################################################################
import logging
logger = logging.getLogger('')
class Plugin():
def __init__(self, smarthome, port, cycle=300):
self._sh = smarthome
self._cycle = int(cycle)
self._is_connected = False
self._lock = threading.Lock()
self.refresh_cycle = self._cycle
befehle=['V','j','W','F','X','I','J','d','i','2','L','p','v','4','3','1','5','O','6','7','8','9','a','A','b','B','c','C','f','l','g','G']
namen=['RG_Geblaese_IST','Einschub_IST','FlammT_IST','FlammT_SOLL','BackrohrT_IST','RueckbrandT_IST','I_Backrohr_IST','I_Rueckbrand_IST','KesselT_IST','KesselT_SOLL','Kessel_AUS_T','Kessel_EIN_T','Leistungsstufe_IST','Leistungsstufe_PID','Funktion_PID','Fehler','MainState','SubState','GrateState','FlapState','FlapChangeState','Zuendung','Tuer','Extern','STB','Rostkontakt','Rostmotor','Rostmotor_pos','SH','Pumpe','Luftklappe','Reserve']
self._d=dict(zip(befehle, namen))
try:
self._port = serial.Serial()
self._port.port = "/dev/ttyUSB1" # may be called something different
self._port.timeout=1
self._port.open()
except:
logger.error("Could not open {}.".format(tty))
return
else:
self._is_connected = True
self._port.write('?'.encode('latin1'))
sleep(0.3)
handshake=self._port.readline().decode('latin1')
if handshake.index("Pellets") == 10:
for i in range(1,10):
logger.debug(self._port.readline().decode('latin1')) #to clear the buffer
else:
logger.error("Unknown interface: {0}".format(interface))
def _read_lohberger(self, cmd):
if not self._is_connected:
return False
self._lock.acquire()
try:
self._port.write(cmd.encode('latin1'))
r = self._port.readline().decode('latin1')
ret=int(r.split(';')[1]) #0: repeat of command, #1 ret value, #2: CRC-CCITT (0x1D0F) http://www.lemonsoftware.eu/pycrc-library
except:
logger.warning("Problem reading from Lohberger")
ret = False
finally:
self._lock.release()
def run(self):
self.alive = True
# if you want to create child threads, do not make them daemon = True!
# They will not shutdown properly. (It's a python bug)
self._sh.scheduler.add('Lohberger', self.refresh, prio=5, cycle=self._cycle, offset=2)
def stop(self):
self._port.close()
self.alive = False
def read(self, cmd):
self._read_lohberger(self, self._d[cmd])
def parse_logic(self, logic):
pass
def parse_item(self, item):
if 'lohberger_cmd' in item.conf:
logger.debug("parse item: {0}".format(item))
return self.update_item
else:
return None
def refresh(self):
for item in self._items:
time.sleep(0.1)
lohberger_cmd = item.conf['lohberger_cmd']
value = self.request(request)
#if reading fails (i.e. at broadcast-commands) the value will not be updated
if 'command not found' not in str(value) and value is not None:
item(value, 'Lohberger', 'refresh')
if not self.alive:
break
def update_item(self, item, caller=None, source=None, dest=None):
cmd=item.conf['lohberger_cmd']:
self.read(cmd, int(item()))
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
myplugin = Plugin('Lohberger')
myplugin.run()
Code:
[Lohberger]
[[RG_Geblaese_IST]]
type = num
lohberger_cmd= RG_Geblaese_IST
[[FlammT_IST]]
type = num
lohberger_cmd= FlammT_IST


Kommentar