feature/change_sensor_handling #1

Merged
moltox merged 9 commits from feature/change_sensor_handling into main 2024-04-27 21:38:03 +00:00
16 changed files with 651 additions and 8 deletions

3
REDME.md Normal file
View File

@ -0,0 +1,3 @@
# Show output on terminal #
`minicom -b 115200 -o -D /dev/cu.usbmodem3301`

34
ambilight_sensor.py Normal file
View File

@ -0,0 +1,34 @@
from sensors import Sensors
from machine import Pin, I2C
from utime import sleep
from lib.bh1750 import BH1750
class AmbilightSensor(Sensors):
most_recent_values = []
def __init__(self, settings):
super().__init__(settings)
print(settings) # TODO remove
self.sensor_pin_int = settings['pin_int']
# self.sensor = DHT22(Pin(self.sensor_pin_int, Pin.IN, Pin.PULL_UP))
self.sensor = BH1750(I2C(0, sda=Pin(settings['pin_int_sda']), scl=Pin(settings['pin_int_scl'])))
def read(self):
try:
measurement = self.sensor.luminance(BH1750.ONCE_HIRES_1)
print("ambilight ..")
print(measurement)
self.most_recent_values = [
{
'type': 'ambilight',
'value': measurement,
'unit': '-'
},
]
except OSError:
print('Ambilight Error reading temperature/humidity. Check wires')
print()

1
bh1750.old_1/__init__.py Normal file
View File

@ -0,0 +1 @@
from .bh1750 import BH1750

118
bh1750.old_1/bh1750.py Normal file
View File

@ -0,0 +1,118 @@
# https://github.com/flrrth/pico-bh1750
import math
from micropython import const
from utime import sleep_ms
class BH1750:
"""Class for the BH1750 digital Ambient Light Sensor
The datasheet can be found at https://components101.com/sites/default/files/component_datasheet/BH1750.pdf
"""
MEASUREMENT_MODE_CONTINUOUSLY = const(1)
MEASUREMENT_MODE_ONE_TIME = const(2)
RESOLUTION_HIGH = const(0)
RESOLUTION_HIGH_2 = const(1)
RESOLUTION_LOW = const(2)
MEASUREMENT_TIME_DEFAULT = const(69)
MEASUREMENT_TIME_MIN = const(31)
MEASUREMENT_TIME_MAX = const(254)
def __init__(self, address, i2c):
self._address = address
self._i2c = i2c
self._measurement_mode = BH1750.MEASUREMENT_MODE_ONE_TIME
self._resolution = BH1750.RESOLUTION_HIGH
self._measurement_time = BH1750.MEASUREMENT_TIME_DEFAULT
self._write_measurement_time()
self._write_measurement_mode()
def configure(self, measurement_mode: int, resolution: int, measurement_time: int):
"""Configures the BH1750.
Keyword arguments:
measurement_mode -- measure either continuously or once
resolution -- return measurements in either high, high2 or low resolution
measurement_time -- the duration of a single measurement
"""
if measurement_time not in range(BH1750.MEASUREMENT_TIME_MIN, BH1750.MEASUREMENT_TIME_MAX + 1):
raise ValueError("measurement_time must be between {0} and {1}"
.format(BH1750.MEASUREMENT_TIME_MIN, BH1750.MEASUREMENT_TIME_MAX))
self._measurement_mode = measurement_mode
self._resolution = resolution
self._measurement_time = measurement_time
self._write_measurement_time()
self._write_measurement_mode()
def _write_measurement_time(self):
buffer = bytearray(1)
high_bit = 1 << 6 | self._measurement_time >> 5
low_bit = 3 << 5 | (self._measurement_time << 3) >> 3
buffer[0] = high_bit
self._i2c.writeto(self._address, buffer)
buffer[0] = low_bit
self._i2c.writeto(self._address, buffer)
def _write_measurement_mode(self):
buffer = bytearray(1)
buffer[0] = self._measurement_mode << 4 | self._resolution
self._i2c.writeto(self._address, buffer)
sleep_ms(24 if self._measurement_time == BH1750.RESOLUTION_LOW else 180)
def reset(self):
"""Clear the illuminance data register."""
self._i2c.writeto(self._address, bytearray(b'\x07'))
def power_on(self):
"""Powers on the BH1750."""
self._i2c.writeto(self._address, bytearray(b'\x01'))
def power_off(self):
"""Powers off the BH1750."""
self._i2c.writeto(self._address, bytearray(b'\x00'))
@property
def measurement(self) -> float:
"""Returns the latest measurement."""
if self._measurement_mode == BH1750.MEASUREMENT_MODE_ONE_TIME:
self._write_measurement_mode()
buffer = bytearray(2)
self._i2c.readfrom_into(self._address, buffer)
lux = (buffer[0] << 8 | buffer[1]) / (1.2 * (BH1750.MEASUREMENT_TIME_DEFAULT / self._measurement_time))
if self._resolution == BH1750.RESOLUTION_HIGH_2:
return lux / 2
else:
return lux
def measurements(self) -> float:
"""This is a generator function that continues to provide the latest measurement. Because the measurement time
is greatly affected by resolution and the configured measurement time, this function attemts to calculate the
appropriate sleep time between measurements.
Example usage:
for measurement in bh1750.measurements(): # bh1750 is an instance of this class
print(measurement)
"""
while True:
yield self.measurement
if self._measurement_mode == BH1750.MEASUREMENT_MODE_CONTINUOUSLY:
base_measurement_time = 16 if self._measurement_time == BH1750.RESOLUTION_LOW else 120
sleep_ms(math.ceil(base_measurement_time * self._measurement_time / BH1750.MEASUREMENT_TIME_DEFAULT))

62
device_info.py Normal file
View File

@ -0,0 +1,62 @@
import network
class DeviceInfo:
# Device Infos
name = "Dev Device 1"
token = "PC]-0Bmp83h7F5#U!D6KJ(A&"
server_url = 'api.growsystem.muelleronline.org'
wlan_ssid = 'Oppa-95.lan'
wlan_pw = '95%04-MM'
sensors = [
{
'type': 'moisture',
'pin_int': 26
},
{
'type': 'ambilight',
'pin_int': 8, # for compatibility only
'pin_int_sda': 8,
'pin_int_scl': 9
},
# {
# 'type': 'dht22',
# 'pin_int': 15
# }
]
read_secs = 5
# Device Infos End
wlan = network.WLAN(network.STA_IF)
def get_macaddress(self):
return self._format_mac(self.wlan.config('mac').hex())
def get_ipaddress(self):
return self.wlan.ifconfig()[0]
def get_all_device_infos(self):
return {
'name': self.name,
'mac_address': self.get_macaddress(),
'ip_address': self.get_ipaddress(),
'token': self.token}
def _format_mac(self, mac):
# Split the MAC address into pairs of two characters each
pairs = [mac[i:i+2] for i in range(0, len(mac), 2)]
# Join the pairs with colons to create the formatted MAC address
formatted_mac = ":".join(pairs)
return formatted_mac

36
dht22.py Normal file
View File

@ -0,0 +1,36 @@
from sensors import Sensors
from dht import DHT22
from machine import ADC, Pin
class TemperatureHumiditySensor(Sensors):
sensor = None
most_recent_values = []
def __init__(self, settings):
super().__init__(settings)
print("Initialize dht22 sensor. Sensor pin is: " + str(settings['pin_int']))
print(settings)
self.sensor_pin_int = settings['pin_int']
self.sensor = DHT22(Pin(self.sensor_pin_int, Pin.IN, Pin.PULL_UP))
def read(self):
try:
self.sensor.measure()
self.most_recent_values = [
{
'type': 'temperature',
'value': self.sensor.temperature(),
'unit': 'C'
},
{
'type': 'humidity',
'value': self.sensor.humidity(),
'unit': '%'
}
]
except OSError:
print('DHT22 Error reading temperature/humidity. Check wires')
print()

78
grow_system.py Normal file
View File

@ -0,0 +1,78 @@
import time
from moisture_sensor import MoistureSensor
from dht22 import TemperatureHumiditySensor
from ambilight_sensor import AmbilightSensor
from sensor_data_manager import SensorDataManager
from grow_system_api import GrowSystemApi
from device_info import DeviceInfo
class GrowSystem:
grow_system_api = GrowSystemApi()
# moisture_sensor = None
# temperature_humidity_sensor = None
sensors = []
most_recent_values = []
sensor_data_manager = None
device_id = None
device_info = DeviceInfo()
def __init__(self, settings):
for sensor in self.device_info.sensors:
print("")
print("Initialize sensor:")
print(sensor)
sensor_type = sensor['type']
if sensor_type == 'moisture':
print("Found sensor of type moisture")
self.sensors.append(MoistureSensor(sensor))
elif sensor_type == 'dht22':
print("Found sensor of type DHT22")
self.sensors.append(TemperatureHumiditySensor(sensor))
elif sensor_type == 'ambilight':
print("Found sensor of type GY302/BH1750")
self.sensors.append(AmbilightSensor(sensor))
else:
print("No sensor type configured for: " + sensor['type'])
#if not self.moisture_sensor:
# self.moisture_sensor = MoistureSensor(settings['moisture_sensor'])
#if not self.temperature_humidity_sensor:
# self.temperature_humidity_sensor = TemperatureHumiditySensor(settings['temperature_humidity_sensor'])
def start(self):
print("Say the server hello...")
result = self.grow_system_api.say_hello()
message = result['message']
if message != 'OK':
print("Device not activated. Stopping")
return
self.device_id = result['data']['device_id']
self.sensor_data_manager = SensorDataManager(self.device_id)
print("Start reading sensors ...")
while True:
# Reset data
self.most_recent_values = []
for sensor in self.sensors:
print("Read sensor of type " + sensor.type + " at pin " + str(sensor.sensor_pin_int))
sensor.read()
for measurement in sensor.most_recent_values:
print(f"Got {measurement['value']} {measurement['unit']} ({measurement['type']})")
self.most_recent_values = self.most_recent_values + sensor.most_recent_values
self.sensor_data_manager.handleData(self.most_recent_values)
time.sleep(self.device_info.read_secs)

31
grow_system_api.py Normal file
View File

@ -0,0 +1,31 @@
from http_client import HTTPClient
from device_info import DeviceInfo
import json
class GrowSystemApi:
http_client = HTTPClient()
device_info = DeviceInfo()
base_url = ''
def __init__(self):
self.base_url = self.device_info.server_url
def say_hello(self):
response = self.http_client.post(self.base_url + "/api/device", self._get_device_data())
print(response.text)
jsonResult = json.loads(response.text)
print(jsonResult)
return jsonResult;
def send_measurements(self, device_id, data):
url = self.base_url + "/api/device/" + str(device_id) + "/sensor-log"
response = self.http_client.post(url, data)
return json.loads(response.text)
def _get_device_data(self):
return self.device_info.get_all_device_infos()

35
http_client.py Normal file
View File

@ -0,0 +1,35 @@
import urequests
import json
class HTTPClient:
def __init__(self):
pass
def get(self, url):
url = 'https://' + url
try:
# headers = {'Content-Type': 'application/json'}
response = urequests.get(url)
if response.status_code == 200:
print("Data sent, got response")
return response
else:
print("Failed to get data. Status code:", response.status_code)
except Exception as e:
print("Exception occurred:", e)
def post(self, url, data):
url = 'https://' + url
try:
headers = {'Content-Type': 'application/json', 'Accept': 'application/json, text/plain, */*'}
json_data = json.dumps(data)
print("Send post request to: " + url)
response = urequests.post(url, data=json_data, headers=headers)
if response.status_code == 200:
return response
else:
print("Failed to send data. Status code:", response.status_code)
print(response.text)
except Exception as e:
print("Exception occurred:", e)

62
lib/bh1750.py Normal file
View File

@ -0,0 +1,62 @@
"""
Micropython BH1750 ambient light sensor driver.
* https://github.com/PinkInk/upylib/tree/master/bh1750
"""
from utime import sleep_ms
class BH1750():
"""Micropython BH1750 ambient light sensor driver."""
PWR_OFF = 0x00
PWR_ON = 0x01
RESET = 0x07
# modes
CONT_LOWRES = 0x13
CONT_HIRES_1 = 0x10
CONT_HIRES_2 = 0x11
ONCE_HIRES_1 = 0x20
ONCE_HIRES_2 = 0x21
ONCE_LOWRES = 0x23
# default addr=0x23 if addr pin floating or pulled to ground
# addr=0x5c if addr pin pulled high
def __init__(self, bus, addr=0x23):
self.bus = bus
self.addr = addr
self.off()
self.reset()
def off(self):
"""Turn sensor off."""
self.set_mode(self.PWR_OFF)
def on(self):
"""Turn sensor on."""
self.set_mode(self.PWR_ON)
def reset(self):
"""Reset sensor, turn on first if required."""
self.on()
self.set_mode(self.RESET)
def set_mode(self, mode):
"""Set sensor mode."""
self.mode = mode
self.bus.writeto(self.addr, bytes([self.mode]))
def luminance(self, mode):
"""Sample luminance (in lux), using specified sensor mode."""
# continuous modes
if mode & 0x10 and mode != self.mode:
self.set_mode(mode)
# one shot modes
if mode & 0x20:
self.set_mode(mode)
# earlier measurements return previous reading
sleep_ms(24 if mode in (0x13, 0x23) else 180)
data = self.bus.readfrom(self.addr, 2)
factor = 2.0 if mode in (0x11, 0x21) else 1.0
return (data[0]<<8 | data[1]) / (1.2 * factor)

57
main.py
View File

@ -1,16 +1,57 @@
# This is a sample Python script. # GrowSystem
# Author: Maik Müller (maik@muelleronlineorg)
# Press ⌃R to execute it or replace it with your code. # WIP!
# Press Double ⇧ to search everywhere for classes, files, tool windows, actions, and settings. # This file should do only:
# - provide constants for settings
# - eventually necessary system settings
# - init wlan connection
# - Call base class, permitting the configured constants
import network
import urequests
from grow_system import GrowSystem
from wlan import WlanClient
from http_client import HTTPClient
from device_info import DeviceInfo
def print_hi(name): settings = {
# Use a breakpoint in the code line below to debug your script. 'wlan_ssid': 'Oppa-95.lan',
print(f'Hi, {name}') # Press ⌘F8 to toggle the breakpoint. 'wlan_pw': '95%04-MM',
'moisture_sensor': {
'pin_int': 26
},
'temperature_humidity_sensor': {
'pin_int': 15
},
'pump_pin_int': 24
}
def wlan_scan():
# Client-Betrieb
wlan = network.WLAN(network.STA_IF)
# WLAN-Interface aktivieren
wlan.active(True)
# WLANs ausgeben
found_wlans = wlan.scan()
return found_wlans
# Press the green button in the gutter to run the script. # Press the green button in the gutter to run the script.
if __name__ == '__main__': if __name__ == '__main__':
print_hi('PyCharm') #print(wlan_scan())
print("Connect WLAN")
wlanClient = WlanClient(settings['wlan_ssid'], settings['wlan_pw'])
wlanClient.connect()
print("")
di = DeviceInfo()
print("Device Infos:")
print(di.get_all_device_infos())
print("")
print("Start grow system")
gs = GrowSystem(settings)
gs.start()
# See PyCharm help at https://www.jetbrains.com/help/pycharm/

39
moisture_sensor.py Normal file
View File

@ -0,0 +1,39 @@
# Moisture Sensor Class
from sensors import Sensors
from machine import ADC, Pin
class MoistureSensor(Sensors):
sensor = None
most_recent_values = []
min_raw_value = None
max_raw_value = None
def __init__(self, sensor_data, min_raw_value=300, max_raw_value=65535):
super().__init__(sensor_data)
print("Initialize moisture sensor. Sensor pin is: " + str(sensor_data['pin_int']))
self.sensor_pin_int = sensor_data['pin_int']
self.min_raw_value = min_raw_value
self.max_raw_value = max_raw_value
self.sensor = ADC(Pin(self.sensor_pin_int))
def read(self):
self.most_recent_values = [
{
'type': 'moisture',
'value': self.convert_to_moisture_percentage(self.sensor.read_u16()),
'unit': '%'
},
]
def normalize_sensor_value(self, raw_value):
return (raw_value - self.min_raw_value) / (self.max_raw_value - self.min_raw_value)
def convert_to_moisture_percentage(self, raw_value):
normalized_value = self.normalize_sensor_value(raw_value)
return round(100 - normalized_value * 100, 1)

View File

@ -0,0 +1,31 @@
from machine import Pin, ADC
import time
import network
import urequests
import statistics
import secrets
sensor = ADC(Pin(26))
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(secrets.SSID, secrets.PASSWORD)
time.sleep(5)
print(wlan.isconnected())
readings = []
try:
while True:
for i in range(5):
reading = sensor.read_u16()
readings.append(reading)
print(readings)
time.sleep(1)
median_value = statistics.median(readings)
if median_value < 400:
urequests.get("https://api.telegram.org/bot"+secrets.API+"/sendMessage?text=Gary is thirsty&chat_id="+secrets.ID)
print("Message Sent")
else:
print("Gary has enough water")
time.sleep(3600)
except OSError:
print("@"*68)
print("@ Cannot connect to the Wi-Fi, please check your SSID and PASSWORD @")
print("@"*68)

20
sensor_data_manager.py Normal file
View File

@ -0,0 +1,20 @@
from grow_system_api import GrowSystemApi
# from device_info import DeviceInfo
class SensorDataManager:
device_info = None
grow_system_api = None
device_id = None
def __init__(self, device_id):
self.grow_system_api = GrowSystemApi()
# self.device_info = DeviceInfo()
self.device_id = device_id
def handleData(self, data):
jsonResponse = self.grow_system_api.send_measurements(self.device_id, data)
print("Response message: " + jsonResponse['message'])

13
sensors.py Normal file
View File

@ -0,0 +1,13 @@
class Sensors:
# this is a parent class for the Sensor classes
sensor_pin_int = -1
sensor = None
type = "unset"
def __init__(self, settings):
self.type = settings['type']
print("Initialize " + self.type + " sensor. Sensor pin is: " + str(settings['pin_int']))

39
wlan.py Normal file
View File

@ -0,0 +1,39 @@
import machine
import network
import time
# network.country('DE')
class WlanClient:
ssid = ''
pw = ''
wlan = None
# Status-LED
led_onboard = machine.Pin('LED', machine.Pin.OUT)
led_onboard.value(False)
def __init__(self, ssid, pw):
# print("Hello from wlan class")
self.ssid = ssid
self.pw = pw
self.wlan = network.WLAN(network.STA_IF)
def connect(self):
if not self.is_connected():
print('No WLAN connected. Connecting ...' + self.ssid + ' ' + self.pw)
self.wlan.active(True)
self.wlan.connect(self.ssid, self.pw)
for i in range(10):
if self.wlan.status() < 0 or self.wlan.status() >= 3:
break
time.sleep(1)
# led_value = self.led_onboard.value() == 1
# self.led_onboard.value(led_value)
if self.wlan.isconnected():
net_config = self.wlan.ifconfig()
print("NetConfig:")
print(net_config)
def is_connected(self):
return self.wlan.isconnected()