add pico stand

This commit is contained in:
Maik Müller 2024-05-04 15:32:26 +02:00
parent 389996f0b3
commit f8a7f47fbf
15 changed files with 579 additions and 12 deletions

View File

@ -0,0 +1,34 @@
from sensors import Sensors
from machine import Pin, I2C
from utime import sleep
from lib.bh1750 import BH1750
class AmbilightSensor(Sensors):
most_recent_values = []
def __init__(self, settings):
super().__init__(settings)
print(settings) # TODO remove
self.sensor_pin_int = settings['pin_int']
# self.sensor = DHT22(Pin(self.sensor_pin_int, Pin.IN, Pin.PULL_UP))
self.sensor = BH1750(I2C(0, sda=Pin(settings['pin_int_sda']), scl=Pin(settings['pin_int_scl'])))
def read(self):
try:
measurement = self.sensor.luminance(BH1750.ONCE_HIRES_1)
print("ambilight ..")
print(measurement)
self.most_recent_values = [
{
'type': 'ambilight',
'value': measurement,
'unit': '-'
},
]
except OSError:
print('Ambilight Error reading temperature/humidity. Check wires')
print()

View File

@ -0,0 +1,62 @@
"""
Micropython BH1750 ambient light sensor driver.
* https://github.com/PinkInk/upylib/tree/master/bh1750
"""
from utime import sleep_ms
class BH1750():
"""Micropython BH1750 ambient light sensor driver."""
PWR_OFF = 0x00
PWR_ON = 0x01
RESET = 0x07
# modes
CONT_LOWRES = 0x13
CONT_HIRES_1 = 0x10
CONT_HIRES_2 = 0x11
ONCE_HIRES_1 = 0x20
ONCE_HIRES_2 = 0x21
ONCE_LOWRES = 0x23
# default addr=0x23 if addr pin floating or pulled to ground
# addr=0x5c if addr pin pulled high
def __init__(self, bus, addr=0x23):
self.bus = bus
self.addr = addr
self.off()
self.reset()
def off(self):
"""Turn sensor off."""
self.set_mode(self.PWR_OFF)
def on(self):
"""Turn sensor on."""
self.set_mode(self.PWR_ON)
def reset(self):
"""Reset sensor, turn on first if required."""
self.on()
self.set_mode(self.RESET)
def set_mode(self, mode):
"""Set sensor mode."""
self.mode = mode
self.bus.writeto(self.addr, bytes([self.mode]))
def luminance(self, mode):
"""Sample luminance (in lux), using specified sensor mode."""
# continuous modes
if mode & 0x10 and mode != self.mode:
self.set_mode(mode)
# one shot modes
if mode & 0x20:
self.set_mode(mode)
# earlier measurements return previous reading
sleep_ms(24 if mode in (0x13, 0x23) else 180)
data = self.bus.readfrom(self.addr, 2)
factor = 2.0 if mode in (0x11, 0x21) else 1.0
return (data[0]<<8 | data[1]) / (1.2 * factor)

View File

@ -0,0 +1,69 @@
import network
class DeviceInfo:
# Device Infos
name = "Dev Device 1"
token = "PC]-0Bmp83h7F5#U!D6KJ(A&"
server_url = 'api.growsystem.muelleronline.org'
wlan_ssid = 'Oppa-95.lan'
wlan_pw = '95%04-MM'
sensors = [
{
'type': 'moisture',
'pin_int': 26
},
{
'type': 'ambilight',
'pin_int': 8, # for compatibility only
'pin_int_sda': 8,
'pin_int_scl': 9
},
# {
# 'type': 'dht22',
# 'pin_int': 15
# }
]
engines = [
{
'type': 'pump',
'pins': [15]
}
]
read_secs = 5
# Device Infos End
wlan = network.WLAN(network.STA_IF)
def get_macaddress(self):
return self._format_mac(self.wlan.config('mac').hex())
def get_ipaddress(self):
return self.wlan.ifconfig()[0]
def get_all_device_infos(self):
return {
'name': self.name,
'mac_address': self.get_macaddress(),
'ip_address': self.get_ipaddress(),
'token': self.token}
def _format_mac(self, mac):
# Split the MAC address into pairs of two characters each
pairs = [mac[i:i+2] for i in range(0, len(mac), 2)]
# Join the pairs with colons to create the formatted MAC address
formatted_mac = ":".join(pairs)
return formatted_mac

View File

@ -0,0 +1,36 @@
from sensors import Sensors
from dht import DHT22
from machine import ADC, Pin
class TemperatureHumiditySensor(Sensors):
sensor = None
most_recent_values = []
def __init__(self, settings):
super().__init__(settings)
print("Initialize dht22 sensor. Sensor pin is: " + str(settings['pin_int']))
print(settings)
self.sensor_pin_int = settings['pin_int']
self.sensor = DHT22(Pin(self.sensor_pin_int, Pin.IN, Pin.PULL_UP))
def read(self):
try:
self.sensor.measure()
self.most_recent_values = [
{
'type': 'temperature',
'value': self.sensor.temperature(),
'unit': 'C'
},
{
'type': 'humidity',
'value': self.sensor.humidity(),
'unit': '%'
}
]
except OSError:
print('DHT22 Error reading temperature/humidity. Check wires')
print()

View File

@ -0,0 +1,23 @@
from machine import Pin
class Engine():
pins = []
engine = None
def __init__(self, engine):
print("Hello from Engine parent class")
print(engine)
self.pins = engine['pins']
self.engine = Pin(self.pins[0], Pin.OUT)
def on(self):
print("engine on")
self.engine.value(1)
def off(self):
print("engine off")
self.engine.value(0)

View File

@ -0,0 +1,96 @@
import time
from moisture_sensor import MoistureSensor
from dht22 import TemperatureHumiditySensor
from ambilight_sensor import AmbilightSensor
from pump import Pump
from sensor_data_manager import SensorDataManager
from grow_system_api import GrowSystemApi
from device_info import DeviceInfo
class GrowSystem:
grow_system_api = GrowSystemApi()
# moisture_sensor = None
# temperature_humidity_sensor = None
sensors = []
engines = []
most_recent_values = []
sensor_data_manager = None
device_id = None
device_info = DeviceInfo()
def __init__(self, settings):
# Init sensors
for sensor in self.device_info.sensors:
print("")
print("Initialize sensor:")
print(sensor)
sensor_type = sensor['type']
if sensor_type == 'moisture':
print("Found sensor of type moisture")
self.sensors.append(MoistureSensor(sensor))
elif sensor_type == 'dht22':
print("Found sensor of type DHT22")
self.sensors.append(TemperatureHumiditySensor(sensor))
elif sensor_type == 'ambilight':
print("Found sensor of type GY302/BH1750")
self.sensors.append(AmbilightSensor(sensor))
else:
print("No sensor type configured for: " + sensor['type'])
# Init engines
for engine in self.device_info.engines:
print("")
print("Initialize engine:")
print(engine)
engine_type = engine['type']
if engine_type == 'pump':
print("Found egine of type pump")
self.engines.append(Pump(engine))
self.engines[0].on()
time.sleep(15)
self.engines[0].off()
#if not self.moisture_sensor:
# self.moisture_sensor = MoistureSensor(settings['moisture_sensor'])
#if not self.temperature_humidity_sensor:
# self.temperature_humidity_sensor = TemperatureHumiditySensor(settings['temperature_humidity_sensor'])
def start(self):
print("Say the server hello...")
result = self.grow_system_api.say_hello()
message = result['message']
if message != 'OK':
print("Device not activated. Stopping")
return
self.device_id = result['data']['device_id']
self.sensor_data_manager = SensorDataManager(self.device_id)
print("Start reading sensors ...")
while True:
# Reset data
self.most_recent_values = []
for sensor in self.sensors:
print("Read sensor of type " + sensor.type + " at pin " + str(sensor.sensor_pin_int))
sensor.read()
for measurement in sensor.most_recent_values:
print(f"Got {measurement['value']} {measurement['unit']} ({measurement['type']})")
self.most_recent_values = self.most_recent_values + sensor.most_recent_values
self.sensor_data_manager.handleData(self.most_recent_values)
time.sleep(self.device_info.read_secs)

View File

@ -0,0 +1,31 @@
from http_client import HTTPClient
from device_info import DeviceInfo
import json
class GrowSystemApi:
http_client = HTTPClient()
device_info = DeviceInfo()
base_url = ''
def __init__(self):
self.base_url = self.device_info.server_url
def say_hello(self):
response = self.http_client.post(self.base_url + "/api/device", self._get_device_data())
print(response.text)
jsonResult = json.loads(response.text)
print(jsonResult)
return jsonResult;
def send_measurements(self, device_id, data):
url = self.base_url + "/api/device/" + str(device_id) + "/sensor-log"
response = self.http_client.post(url, data)
return json.loads(response.text)
def _get_device_data(self):
return self.device_info.get_all_device_infos()

View File

@ -0,0 +1,35 @@
import urequests
import json
class HTTPClient:
def __init__(self):
pass
def get(self, url):
url = 'https://' + url
try:
# headers = {'Content-Type': 'application/json'}
response = urequests.get(url)
if response.status_code == 200:
print("Data sent, got response")
return response
else:
print("Failed to get data. Status code:", response.status_code)
except Exception as e:
print("Exception occurred:", e)
def post(self, url, data):
url = 'https://' + url
try:
headers = {'Content-Type': 'application/json', 'Accept': 'application/json, text/plain, */*'}
json_data = json.dumps(data)
print("Send post request to: " + url)
response = urequests.post(url, data=json_data, headers=headers)
if response.status_code == 200:
return response
else:
print("Failed to send data. Status code:", response.status_code)
print(response.text)
except Exception as e:
print("Exception occurred:", e)

View File

@ -0,0 +1,57 @@
# GrowSystem
# Author: Maik Müller (maik@muelleronlineorg)
# WIP!
# This file should do only:
# - provide constants for settings
# - eventually necessary system settings
# - init wlan connection
# - Call base class, permitting the configured constants
import network
import urequests
from grow_system import GrowSystem
from wlan import WlanClient
from http_client import HTTPClient
from device_info import DeviceInfo
settings = {
'wlan_ssid': 'Oppa-95.lan',
'wlan_pw': '95%04-MM',
'moisture_sensor': {
'pin_int': 26
},
'temperature_humidity_sensor': {
'pin_int': 15
},
'pump_pin_int': 24
}
def wlan_scan():
# Client-Betrieb
wlan = network.WLAN(network.STA_IF)
# WLAN-Interface aktivieren
wlan.active(True)
# WLANs ausgeben
found_wlans = wlan.scan()
return found_wlans
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
#print(wlan_scan())
print("Connect WLAN")
wlanClient = WlanClient(settings['wlan_ssid'], settings['wlan_pw'])
wlanClient.connect()
print("")
di = DeviceInfo()
print("Device Infos:")
print(di.get_all_device_infos())
print("")
print("Start grow system")
gs = GrowSystem(settings)
gs.start()

View File

@ -0,0 +1,39 @@
# Moisture Sensor Class
from sensors import Sensors
from machine import ADC, Pin
class MoistureSensor(Sensors):
sensor = None
most_recent_values = []
min_raw_value = None
max_raw_value = None
def __init__(self, sensor_data, min_raw_value=300, max_raw_value=65535):
super().__init__(sensor_data)
print("Initialize moisture sensor. Sensor pin is: " + str(sensor_data['pin_int']))
self.sensor_pin_int = sensor_data['pin_int']
self.min_raw_value = min_raw_value
self.max_raw_value = max_raw_value
self.sensor = ADC(Pin(self.sensor_pin_int))
def read(self):
self.most_recent_values = [
{
'type': 'moisture',
'value': self.convert_to_moisture_percentage(self.sensor.read_u16()),
'unit': '%'
},
]
def normalize_sensor_value(self, raw_value):
return (raw_value - self.min_raw_value) / (self.max_raw_value - self.min_raw_value)
def convert_to_moisture_percentage(self, raw_value):
normalized_value = self.normalize_sensor_value(raw_value)
return round(100 - normalized_value * 100, 1)

View File

@ -0,0 +1,7 @@
from engine import Engine
class Pump(Engine):
def __init__(self, engine):
super().__init__(engine)

View File

@ -0,0 +1,20 @@
from grow_system_api import GrowSystemApi
# from device_info import DeviceInfo
class SensorDataManager:
device_info = None
grow_system_api = None
device_id = None
def __init__(self, device_id):
self.grow_system_api = GrowSystemApi()
# self.device_info = DeviceInfo()
self.device_id = device_id
def handleData(self, data):
jsonResponse = self.grow_system_api.send_measurements(self.device_id, data)
print("Response message: " + jsonResponse['message'])

View File

@ -0,0 +1,13 @@
class Sensors:
# this is a parent class for the Sensor classes
sensor_pin_int = -1
sensor = None
type = "unset"
def __init__(self, settings):
self.type = settings['type']
print("Initialize " + self.type + " sensor. Sensor pin is: " + str(settings['pin_int']))

View File

@ -0,0 +1,39 @@
import machine
import network
import time
# network.country('DE')
class WlanClient:
ssid = ''
pw = ''
wlan = None
# Status-LED
led_onboard = machine.Pin('LED', machine.Pin.OUT)
led_onboard.value(False)
def __init__(self, ssid, pw):
# print("Hello from wlan class")
self.ssid = ssid
self.pw = pw
self.wlan = network.WLAN(network.STA_IF)
def connect(self):
if not self.is_connected():
print('No WLAN connected. Connecting ...' + self.ssid + ' ' + self.pw)
self.wlan.active(True)
self.wlan.connect(self.ssid, self.pw)
for i in range(10):
if self.wlan.status() < 0 or self.wlan.status() >= 3:
break
time.sleep(1)
# led_value = self.led_onboard.value() == 1
# self.led_onboard.value(led_value)
if self.wlan.isconnected():
net_config = self.wlan.ifconfig()
print("NetConfig:")
print(net_config)
def is_connected(self):
return self.wlan.isconnected()

View File

@ -3,6 +3,7 @@
# WIP! # WIP!
# This file should do only: # This file should do only:
# - first start check and actions if not configured
# - provide constants for settings # - provide constants for settings
# - eventually necessary system settings # - eventually necessary system settings
# - init wlan connection # - init wlan connection
@ -13,6 +14,7 @@ from grow_system import GrowSystem
from wlan import WlanClient from wlan import WlanClient
from http_client import HTTPClient from http_client import HTTPClient
from device_info import DeviceInfo from device_info import DeviceInfo
from setup import Setup
settings = { settings = {
@ -40,6 +42,10 @@ def wlan_scan():
# Press the green button in the gutter to run the script. # Press the green button in the gutter to run the script.
if __name__ == '__main__': if __name__ == '__main__':
pico_setup = Setup()
pico_setup.setup_pico()
if false:
#print(wlan_scan()) #print(wlan_scan())
print("Connect WLAN") print("Connect WLAN")
wlanClient = WlanClient(settings['wlan_ssid'], settings['wlan_pw']) wlanClient = WlanClient(settings['wlan_ssid'], settings['wlan_pw'])