Jump to content

Verbesserungsvorschläge für mein python-Script


derAngler

Recommended Posts

Hallo,

 

nach einigen Fragen von mir hier im Forum und einigen Versuchen meinerseits bin ich jetzt soweit das mein python-Skript fertig ist.

Konkret plane ich eine Art "smartHome", bestehend aus diversen TF-Bauteilen und jeweils einem Raspberry Pi.

Im Detail geht es hier und jetzt um den "Badezimmerspiegel" im "smartHome".

Dieser erfasst diverse Werte und steuert das Licht.

 

Das Skript ist zu einem Großteil fertig, es fehlt nur noch die Möglichkeit zur direkten Kommunikation mit einer Website (zwecks Darstellung aktueller Werte, bzw. um über die Website Aktione des python-Skripts auszulösen.

Ansonsten ist aber alles fertig und scheint nach mehreren Tagen im Probebetrieb auch ohne Fehler zu laufen.

 

Da ich leider noch blutiger Anfänger in Sachen python bin und mir auch beim Thema smartHome und TinkerForge die Erfahrung fehlt, wollte ich mein Skript hier online stellen und fragen ob irgendwer einen Fehler findet.

Oder, noch besser, ob jemand Teile des Codes findet die verbessert werden könnten.

 

Also hier der Source-Code

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

__author__ = 'Mausbiber'

HOST = "localhost"
PORT = 4223
DATABASE = True
MYSQL_HOST = "192.168.16.1"
MYSQL_PORT = 3306
MYSQL_DB = "database"
MYSQL_USER = "user"
MYSQL_PW = "password"

import logging
import time
import threading
import signal
import sys

from tinkerforge.bricklet_led_strip import LEDStrip
from tinkerforge.bricklet_motion_detector import MotionDetector
from tinkerforge.ip_connection import IPConnection
from tinkerforge.bricklet_ambient_light import AmbientLight
from tinkerforge.bricklet_temperature import Temperature
from tinkerforge.bricklet_humidity import Humidity


import pymysql


def set_exit_handler(func):
    signal.signal(signal.SIGTERM, func)


def on_exit(sig, func=None):
    logger.info('... Exit Handler')
    sys.exit(1)


class BrickletHumidity:
    humidity = 0.0
    __timer_is_running = False
    __timer_intervall = 0

    def __init__(self, uid):
        self.bricklet = Humidity(uid, tf_connection)
        logger.debug('Humidity ........... Bricklet initialisiert')

    def set_callback(self, timeframe):
        self.bricklet.set_humidity_callback_period(timeframe)
        self.bricklet.register_callback(self.bricklet.CALLBACK_HUMIDITY, self.__humidity_changed)

    def set_timer(self, intervall):
        if intervall == 0:
            self.__timer_is_running = False
            self.__timer_intervall = 0
        else:
            self.__timer_is_running = True
            self.__timer_intervall = intervall
            __timer = threading.Timer(intervall, self.__timer_tasks).start()

    def read(self):
        self.__humidity_changed(self.bricklet.get_humidity())

    def __timer_tasks(self):
        uhrzeit = time.strftime('%Y-%m-%d %H:%M:%S')
        if DATABASE:
            mysql_cursor.execute(
                "INSERT INTO sensor_data_humidity (timestamp , data , sensors_id) VALUES (%s , %s , %s)",
                (uhrzeit, self.humidity, 2))
        if self.__timer_is_running:
            __timer = threading.Timer(self.__timer_intervall, self.__timer_tasks).start()

    def __humidity_changed(self, rh):
        self.humidity = (rh / 10.0)
        logger.debug('Humidity ........... Luftfeuchtigkeit %s %%', self.humidity)


class BrickletTemperature:
    temperature = 0.0
    __timer_is_running = False
    __timer_intervall = 0

    def __init__(self, uid):
        self.bricklet = Temperature(uid, tf_connection)
        logger.debug('Temperature ........ Bricklet initialisiert')

    def set_timer(self, intervall):
        if intervall == 0:
            self.__timer_is_running = False
            self.__timer_intervall = 0
        else:
            self.__timer_is_running = True
            self.__timer_intervall = intervall
            __timer = threading.Timer(intervall, self.__timer_tasks).start()

    def __timer_tasks(self):
        uhrzeit = time.strftime('%Y-%m-%d %H:%M:%S')
        if DATABASE:
            mysql_cursor.execute(
                "INSERT INTO sensor_data_temperature (timestamp , data , sensors_id) VALUES (%s , %s , %s)",
                (uhrzeit, self.temperature, 1))
        if self.__timer_is_running:
            __timer = threading.Timer(self.__timer_intervall, self.__timer_tasks).start()

    def set_callback(self, timeframe=5000):
        self.bricklet.set_temperature_callback_period(timeframe)
        self.bricklet.register_callback(self.bricklet.CALLBACK_TEMPERATURE, self.__temperature_changed)

    def read(self):
        self.__temperature_changed(self.bricklet.get_temperature())

    def __temperature_changed(self, te):
        self.temperature = (te / 100.0)
        logger.debug('Temperatur ......... Temperatur %s °C', self.temperature)


class BrickletAmbientLight:
    ambient_light = 0.0
    __timer_is_running = False
    __timer_intervall = 0

    def __init__(self, uid):
        self.bricklet = AmbientLight(uid, tf_connection)
        logger.debug('Ambient Light ...... Bricklet initialisiert')

    def set_timer(self, intervall):
        if intervall == 0:
            self.__timer_is_running = False
            self.__timer_intervall = 0
        else:
            self.__timer_is_running = True
            self.__timer_intervall = intervall
            __timer = threading.Timer(intervall, self.__timer_tasks).start()

    def __timer_tasks(self):
        uhrzeit = time.strftime('%Y-%m-%d %H:%M:%S')
        if DATABASE:
            mysql_cursor.execute(
                "INSERT INTO sensor_data_ambient_light (timestamp , data , sensors_id) VALUES (%s , %s , %s)",
                (uhrzeit, self.ambient_light, 3))
        if self.__timer_is_running:
            __timer = threading.Timer(self.__timer_intervall, self.__timer_tasks).start()

    def set_callback(self, timeframe=5000):
        self.bricklet.set_illuminance_callback_period(timeframe)
        self.bricklet.register_callback(self.bricklet.CALLBACK_ILLUMINANCE, self.__ambient_changed)

    def read(self):
        self.__ambient_changed(self.bricklet.get_illuminance())

    def __ambient_changed(self, il):
        self.ambient_light = (il / 10.0)
        logger.debug('Ambient Light ...... Lichtstärke %s lx', self.ambient_light)


class BrickletLED:
    led_anzahl = 0
    __led_value_new = [0, 0, 0]
    __led_value_old = [0, 0, 0]
    __led_timer = 0
    __frame_duration = 0
    __difference_red = 0
    __difference_green = 0
    __difference_blue = 0
    __verlauf_schritte = 0
    __timer_is_running = False
    __timer_intervall = 0

    def __init__(self, uid):
        # generate object
        self.bricklet = LEDStrip(uid, tf_connection)
        logger.debug('LED-Strip .......... Bricklet initialisiert')

    def set_timer(self, intervall):
        if intervall == 0:
            self.__timer_is_running = False
            self.__timer_intervall = 0
        else:
            self.__timer_is_running = True
            self.__timer_intervall = intervall
            __timer = threading.Timer(intervall, self.__timer_tasks).start()

    def __timer_tasks(self):
        if not Motion_bricklet.motion_detected:
            self.set_color(self.__led_value_new[0], self.__led_value_new[1], self.__led_value_new[2])
        if self.__timer_is_running:
            __timer = threading.Timer(self.__timer_intervall, self.__timer_tasks).start()

    def set_color(self, r, g, b):
        # sending color information to the LED's
        __value_red = [r] * 16
        __value_green = [g] * 16
        __value_blue = [b] * 16
        # set LED Color, one call for maximum 16 LED's
        for x in range(0, self.led_anzahl, 16):
            # count LED's, maximum is 16
            length = min(self.led_anzahl - x, 16)
            # sending data
            self.bricklet.set_rgb_values(x, length, __value_red, __value_green, __value_blue)

    def set_callback(self, frames_per_second):
        # calculating frame duration in ms
        self.__frame_duration = int(1000 / frames_per_second)
        # setting frame duration
        self.bricklet.set_frame_duration(self.__frame_duration)
        # setting Callback
        self.bricklet.register_callback(self.bricklet.CALLBACK_FRAME_RENDERED, self.__frame_rendered)
        logger.debug('LED-Strip .......... Callback gesetzt')

    def __frame_rendered(self, arg):
        if self.__led_timer < self.__verlauf_schritte:
            self.__led_timer += 1
            if self.__difference_red >= 0:
                rgb_faktor_red = ((self.__difference_red + 1) ** (self.__led_timer / self.__verlauf_schritte)) - 1
                rgb_faktor_red = round(self.__led_value_old[0] + rgb_faktor_red)
            else:
                rgb_faktor_red = ((abs(self.__difference_red) + 1) ** (
                    1 - (self.__led_timer / self.__verlauf_schritte))) - 1
                rgb_faktor_red = round(self.__led_value_new[0] + rgb_faktor_red)
            if self.__difference_green >= 0:
                rgb_faktor_green = ((self.__difference_green + 1) ** (self.__led_timer / self.__verlauf_schritte)) - 1
                rgb_faktor_green = round(self.__led_value_old[1] + rgb_faktor_green)
            else:
                rgb_faktor_green = ((abs(self.__difference_green) + 1) ** (
                    1 - (self.__led_timer / self.__verlauf_schritte))) - 1
                rgb_faktor_green = round(self.__led_value_new[1] + rgb_faktor_green)
            if self.__difference_blue >= 0:
                rgb_faktor_blue = ((self.__difference_blue + 1) ** (self.__led_timer / self.__verlauf_schritte)) - 1
                rgb_faktor_blue = round(self.__led_value_old[2] + rgb_faktor_blue)
            else:
                rgb_faktor_blue = ((abs(self.__difference_blue) + 1) ** (
                    1 - (self.__led_timer / self.__verlauf_schritte))) - 1
                rgb_faktor_blue = round(self.__led_value_new[2] + rgb_faktor_blue)
            logger.debug('LED-Strip .......... (%s) r=%s g=%s b=%s', self.__led_timer, rgb_faktor_red, rgb_faktor_green,
                         rgb_faktor_blue)
            self.set_color(rgb_faktor_red, rgb_faktor_green, rgb_faktor_blue)

    def set_verlauf(self, r, g, b, seconds):
        self.__led_value_old = [self.__led_value_new[0], self.__led_value_new[1], self.__led_value_new[2]]
        self.__led_value_new = [r, g, b]
        self.__difference_red = self.__led_value_new[0] - self.__led_value_old[0]
        self.__difference_green = self.__led_value_new[1] - self.__led_value_old[1]
        self.__difference_blue = self.__led_value_new[2] - self.__led_value_old[2]
        self.__verlauf_schritte = seconds * (1000 / self.__frame_duration)
        self.__led_timer = 0
        logger.info('LED-Strip .......... Verlauf gestartet - %s Schritte (%s Sekunden)', self.__verlauf_schritte,
                    seconds)
        logger.debug('LED-Strip .......... Start %s|%s|%s >> Ziel %s|%s|%s', self.__led_value_old[0],
                     self.__led_value_old[1], self.__led_value_old[2], self.__led_value_new[0], self.__led_value_new[1],
                     self.__led_value_new[2])
        self.set_color(self.__led_value_old[0], self.__led_value_old[1], self.__led_value_old[2])

    def set_uhrzeit_farbe(self):
        timestamp = time.gmtime()
        stunde = timestamp.tm_hour
        if (stunde >= 4) and (stunde < 10):
            # ORANGE     6.00 - 12.00 =>  4.00 - 10.00
            self.set_verlauf(175, 120, 0, 6)
            logger.info('LED-Strip .......... Farbeinstellung auf ORANGE')
        elif (stunde >= 10) and (stunde < 16):
            # WEISS     12.00 - 18.00 => 10.00 - 16.00
            self.set_verlauf(150, 175, 150, 6)
            logger.info('LED-Strip .......... Farbeinstellung auf WEISS')
        elif (stunde >= 16) and (stunde < 22):
            # GRÜN      18.00 -  0.00 => 16.00 - 22.00
            self.set_verlauf(0, 175, 0, 6)
            logger.info('LED-Strip .......... Farbeinstellung auf GRÜN')
        elif ((stunde >= 22) and (stunde < 24)) or ((stunde >= 0) and (stunde < 4)):
            # BLAU       0.00 -  6.00 => 22.00 -  4.00
            self.set_verlauf(0, 0, 175, 6)
            logger.info('LED-Strip .......... Farbeinstellung auf BLAU')


class BrickletMotion:
    motion_detected = False

    def __init__(self, uid):
        self.bricklet = MotionDetector(uid, tf_connection)
        logger.debug('Motion Detection ... Bricklet initialisiert')

    def set_callback(self):
        self.bricklet.register_callback(self.bricklet.CALLBACK_MOTION_DETECTED, self.__motion_on)
        self.bricklet.register_callback(self.bricklet.CALLBACK_DETECTION_CYCLE_ENDED, self.__motion_off)

    def __motion_on(self):
        logger.info('Motion Detection ... Jemand BETRITT das Bad - Licht %s Lux', BrickletAmbientLight.ambient_light)
        uhrzeit = time.strftime('%Y-%m-%d %H:%M:%S')
        if DATABASE:
            mysql_cursor.execute("INSERT INTO sensor_data_motion (timestamp , data , sensors_id) VALUES (%s , %s , %s)",
                                 (uhrzeit, True, 4))
        self.motion_detected = True
        if BrickletAmbientLight.ambient_light < 100.0:
            LED_bricklet.set_uhrzeit_farbe()
        else:
            logger.debug('Motion Detection ... keine LED weil zu hell - Licht %s Lux',
                         BrickletAmbientLight.ambient_light)

    def __motion_off(self):
        logger.info('Motion Detection ... Jemand VERLAESST das Bad')
        uhrzeit = time.strftime('%Y-%m-%d %H:%M:%S')
        if DATABASE:
            mysql_cursor.execute("INSERT INTO sensor_data_motion (timestamp , data , sensors_id) VALUES (%s , %s , %s)",
                                 (uhrzeit, False, 4))
        self.motion_detected = False
        if Ambient_bricklet.ambient_light < 100.0:
            LED_bricklet.set_verlauf(0, 0, 0, 1)
        else:
            LED_bricklet.set_color(0, 0, 0)


def set_logging(logging_daemon):
    # Logging auf Console
    #console_handler = logging.StreamHandler()
    #formatter = logging.Formatter('%(asctime)s : %(message)s', '%Y-%m-%d %H:%M:%S')
    #console_handler.setFormatter(formatter)
    #logging_daemon.addHandler(console_handler)
    # Logging in Datei
    file_handler = logging.FileHandler("/var/log/raspi_badezimmer.log", mode='w', encoding=None, delay=False)
    formatter = logging.Formatter('%(asctime)s : %(message)s', '%Y-%m-%d %H:%M:%S')
    file_handler.setFormatter(formatter)
    logging_daemon.addHandler(file_handler)


if __name__ == "__main__":
    set_exit_handler(on_exit)

    # Logging Deamon erzeugen
    logger = logging.getLogger('raspi_server')
    logger.setLevel(logging.INFO)
    set_logging(logger)

    # MySQL Verbindung herstellen
    if DATABASE:
        mysql_connection = pymysql.connect(host=MYSQL_HOST, port=MYSQL_PORT, user=MYSQL_USER, passwd=MYSQL_PW,
                                           db=MYSQL_DB, autocommit=True)
        mysql_cursor = mysql_connection.cursor()
        logger.info('... mySQL Verbindung online')

    # Tinkerforge Objekt erzeugen
    tf_connection = IPConnection()

    # TF Bricklets initialisieren
    LED_bricklet = BrickletLED("jGU")
    Motion_bricklet = BrickletMotion("jX1")
    Ambient_bricklet = BrickletAmbientLight("jy1")
    Temperature_bricklet = BrickletTemperature("dXW")
    Humidity_bricklet = BrickletHumidity("kdt")

    # Mit Tinkerforge berbinden
    tf_connection.connect(HOST, PORT)

    # TF Bricklets konfigurieren
    LED_bricklet.led_anzahl = 60

    Motion_bricklet.set_callback()
    Ambient_bricklet.set_callback(10000)
    Temperature_bricklet.set_callback(10000)
    Humidity_bricklet.set_callback(10000)
    LED_bricklet.set_callback(25)

    Ambient_bricklet.read()
    Temperature_bricklet.read()
    Humidity_bricklet.read()

    Ambient_bricklet.set_timer(300)
    Temperature_bricklet.set_timer(120)
    Humidity_bricklet.set_timer(120)
    LED_bricklet.set_timer(60)

    logger.info('... Tinkerforge online')

    input('Press key to exit\n')

    tf_connection.disconnect()
    if DATABASE:
        mysql_cursor.close()
        mysql_connection.close()

 

Ich habe versucht Daten+Logik jeweils in Klassen zu kapseln.

Zum einen finde ich den Code dadurch besser zum lesen, zum anderen ist es so leichter den Code wiederzuverwenden (bei den anderen Teilen meines "smartHome").

 

Was mir wirklich helfen würde wären wie gesagt Verbesserungsvorschläge.

Und, richtig toll wäre es, wenn mir jemand sagen könnte wie ich es schaffe das Skript mit meiner Website kommunizieren zu lassen.

Link to comment
Share on other sites

Ich würde die Website direkt mit PHP / MySQL erstellen.

Wird derzeit so gemacht, z.Bsp kann ich schon einen Temperatur-Chart aus der Datenbank erstellen.

Allerdings nutze ich dazu auch noch bootstrap (demnächst eventuell node.js).

 

Die Bibliothek für PHP bietet ebenfalls TF.

Das Python-Skript und die Website befinden sich auf verschiedenen Rechner und ich will nicht die gesamte Logik in php auf der Website programmieren.

Vorallem würde das nur für den TF-Teil funktionieren, die Abfrage der GPIO-Ports würde so z.Bsp. nicht funktionieren (und bei anderen Teilen des "smartHome"-Projekts muss ich genau das machen).

Ich muss also weiterhin ein Skript auf dem RasPi laufen lassen.

 

Oder halt nur den letzten Wert aus der Datenbank auslesen und anzeigen.
Diese "Notlösung" ist mir auch schon eingefallen und wurde direkt verworfen, das ist zu billig.

Außerdem will ich ja auch Buttons auf der HP haben, welche Aktionen im python-Skript auslösen.

Da braucht es dann schon mehr.

Link to comment
Share on other sites

Join the conversation

You can post now and register later. If you have an account, sign in now to post with your account.

Guest
Reply to this topic...

×   Pasted as rich text.   Paste as plain text instead

  Only 75 emoji are allowed.

×   Your link has been automatically embedded.   Display as a link instead

×   Your previous content has been restored.   Clear editor

×   You cannot paste images directly. Upload or insert images from URL.

×
×
  • Create New...