main #3

Merged
moltox merged 15 commits from main into dev 2024-05-07 14:16:42 +00:00
39 changed files with 0 additions and 1526 deletions
Showing only changes of commit 9397fcd8b8 - Show all commits

View File

@ -1,72 +0,0 @@
import json
import ure as re
class HttpRequest:
METHOD = {
"GET": "GET",
"POST": "POST"
}
method = ''
path = ''
raw_content = ''
headers = {}
def __init__(self, request):
self.original_request = request
# self.method = method
# self.path = path
# self.headers = headers
# self.raw_content = content
print("http request initialized")
self.parse_request(request)
def parse_request(self, request):
# Split the request into lines
lines = request.decode().split('\r\n')
# Extract method, path, and HTTP version from the first line
self.method, self.path, _ = lines[0].split()
# Parse headers
for line in lines[1:]:
if not line:
break # Empty line indicates end of headers
key, value = line.split(': ', 1)
self.headers[key] = value
# Content is assumed to be in the last line
self.raw_content = lines[-1]
def get_content_json(self):
# Parse the POST request content into a dictionary
parsed_content = {}
pairs = self.raw_content.split('&')
for pair in pairs:
key, value = pair.split('=')
parsed_content[key] = value
# Encode the values in the dictionary
encoded_content = {}
for key, value in parsed_content.items():
encoded_value = re.sub(r'%([0-9A-Fa-f]{2})', lambda m: chr(int(m.group(1), 16)), value)
encoded_content[key] = encoded_value
return encoded_content
def __str__(self):
return {
"method": self.method,
"headers": self.headers,
"content": self.raw_content,
"content_json": self.get_content_json()
}

View File

@ -1 +0,0 @@
api_url = 'https://growsystem.muellerdev.kozow.com/api/'

View File

@ -1,34 +0,0 @@
from sensors import Sensors
from machine import Pin, I2C
from utime import sleep
from lib.bh1750 import BH1750
class AmbilightSensor(Sensors):
most_recent_values = []
def __init__(self, settings):
super().__init__(settings)
print(settings) # TODO remove
self.sensor_pin_int = settings['pin_int']
# self.sensor = DHT22(Pin(self.sensor_pin_int, Pin.IN, Pin.PULL_UP))
self.sensor = BH1750(I2C(0, sda=Pin(settings['pin_int_sda']), scl=Pin(settings['pin_int_scl'])))
def read(self):
try:
measurement = self.sensor.luminance(BH1750.ONCE_HIRES_1)
print("ambilight ..")
print(measurement)
self.most_recent_values = [
{
'type': 'ambilight',
'value': measurement,
'unit': '-'
},
]
except OSError:
print('Ambilight Error reading temperature/humidity. Check wires')
print()

View File

@ -1,62 +0,0 @@
"""
Micropython BH1750 ambient light sensor driver.
* https://github.com/PinkInk/upylib/tree/master/bh1750
"""
from utime import sleep_ms
class BH1750():
"""Micropython BH1750 ambient light sensor driver."""
PWR_OFF = 0x00
PWR_ON = 0x01
RESET = 0x07
# modes
CONT_LOWRES = 0x13
CONT_HIRES_1 = 0x10
CONT_HIRES_2 = 0x11
ONCE_HIRES_1 = 0x20
ONCE_HIRES_2 = 0x21
ONCE_LOWRES = 0x23
# default addr=0x23 if addr pin floating or pulled to ground
# addr=0x5c if addr pin pulled high
def __init__(self, bus, addr=0x23):
self.bus = bus
self.addr = addr
self.off()
self.reset()
def off(self):
"""Turn sensor off."""
self.set_mode(self.PWR_OFF)
def on(self):
"""Turn sensor on."""
self.set_mode(self.PWR_ON)
def reset(self):
"""Reset sensor, turn on first if required."""
self.on()
self.set_mode(self.RESET)
def set_mode(self, mode):
"""Set sensor mode."""
self.mode = mode
self.bus.writeto(self.addr, bytes([self.mode]))
def luminance(self, mode):
"""Sample luminance (in lux), using specified sensor mode."""
# continuous modes
if mode & 0x10 and mode != self.mode:
self.set_mode(mode)
# one shot modes
if mode & 0x20:
self.set_mode(mode)
# earlier measurements return previous reading
sleep_ms(24 if mode in (0x13, 0x23) else 180)
data = self.bus.readfrom(self.addr, 2)
factor = 2.0 if mode in (0x11, 0x21) else 1.0
return (data[0]<<8 | data[1]) / (1.2 * factor)

View File

@ -1,69 +0,0 @@
import network
class DeviceInfo:
# Device Infos
name = "Dev Device 1"
token = "PC]-0Bmp83h7F5#U!D6KJ(A&"
server_url = 'api.growsystem.muelleronline.org'
wlan_ssid = 'Oppa-95.lan'
wlan_pw = '95%04-MM'
sensors = [
{
'type': 'moisture',
'pin_int': 26
},
{
'type': 'ambilight',
'pin_int': 8, # for compatibility only
'pin_int_sda': 8,
'pin_int_scl': 9
},
# {
# 'type': 'dht22',
# 'pin_int': 15
# }
]
engines = [
{
'type': 'pump',
'pins': [15]
}
]
read_secs = 5
# Device Infos End
wlan = network.WLAN(network.STA_IF)
def get_macaddress(self):
return self._format_mac(self.wlan.config('mac').hex())
def get_ipaddress(self):
return self.wlan.ifconfig()[0]
def get_all_device_infos(self):
return {
'name': self.name,
'mac_address': self.get_macaddress(),
'ip_address': self.get_ipaddress(),
'token': self.token}
def _format_mac(self, mac):
# Split the MAC address into pairs of two characters each
pairs = [mac[i:i+2] for i in range(0, len(mac), 2)]
# Join the pairs with colons to create the formatted MAC address
formatted_mac = ":".join(pairs)
return formatted_mac

View File

@ -1,36 +0,0 @@
from sensors import Sensors
from dht import DHT22
from machine import ADC, Pin
class TemperatureHumiditySensor(Sensors):
sensor = None
most_recent_values = []
def __init__(self, settings):
super().__init__(settings)
print("Initialize dht22 sensor. Sensor pin is: " + str(settings['pin_int']))
print(settings)
self.sensor_pin_int = settings['pin_int']
self.sensor = DHT22(Pin(self.sensor_pin_int, Pin.IN, Pin.PULL_UP))
def read(self):
try:
self.sensor.measure()
self.most_recent_values = [
{
'type': 'temperature',
'value': self.sensor.temperature(),
'unit': 'C'
},
{
'type': 'humidity',
'value': self.sensor.humidity(),
'unit': '%'
}
]
except OSError:
print('DHT22 Error reading temperature/humidity. Check wires')
print()

View File

@ -1,23 +0,0 @@
from machine import Pin
class Engine():
pins = []
engine = None
def __init__(self, engine):
print("Hello from Engine parent class")
print(engine)
self.pins = engine['pins']
self.engine = Pin(self.pins[0], Pin.OUT)
def on(self):
print("engine on")
self.engine.value(1)
def off(self):
print("engine off")
self.engine.value(0)

View File

@ -1,96 +0,0 @@
import time
from moisture_sensor import MoistureSensor
from dht22 import TemperatureHumiditySensor
from ambilight_sensor import AmbilightSensor
from pump import Pump
from sensor_data_manager import SensorDataManager
from grow_system_api import GrowSystemApi
from device_info import DeviceInfo
class GrowSystem:
grow_system_api = GrowSystemApi()
# moisture_sensor = None
# temperature_humidity_sensor = None
sensors = []
engines = []
most_recent_values = []
sensor_data_manager = None
device_id = None
device_info = DeviceInfo()
def __init__(self, settings):
# Init sensors
for sensor in self.device_info.sensors:
print("")
print("Initialize sensor:")
print(sensor)
sensor_type = sensor['type']
if sensor_type == 'moisture':
print("Found sensor of type moisture")
self.sensors.append(MoistureSensor(sensor))
elif sensor_type == 'dht22':
print("Found sensor of type DHT22")
self.sensors.append(TemperatureHumiditySensor(sensor))
elif sensor_type == 'ambilight':
print("Found sensor of type GY302/BH1750")
self.sensors.append(AmbilightSensor(sensor))
else:
print("No sensor type configured for: " + sensor['type'])
# Init engines
for engine in self.device_info.engines:
print("")
print("Initialize engine:")
print(engine)
engine_type = engine['type']
if engine_type == 'pump':
print("Found egine of type pump")
self.engines.append(Pump(engine))
self.engines[0].on()
time.sleep(15)
self.engines[0].off()
#if not self.moisture_sensor:
# self.moisture_sensor = MoistureSensor(settings['moisture_sensor'])
#if not self.temperature_humidity_sensor:
# self.temperature_humidity_sensor = TemperatureHumiditySensor(settings['temperature_humidity_sensor'])
def start(self):
print("Say the server hello...")
result = self.grow_system_api.say_hello()
message = result['message']
if message != 'OK':
print("Device not activated. Stopping")
return
self.device_id = result['data']['device_id']
self.sensor_data_manager = SensorDataManager(self.device_id)
print("Start reading sensors ...")
while True:
# Reset data
self.most_recent_values = []
for sensor in self.sensors:
print("Read sensor of type " + sensor.type + " at pin " + str(sensor.sensor_pin_int))
sensor.read()
for measurement in sensor.most_recent_values:
print(f"Got {measurement['value']} {measurement['unit']} ({measurement['type']})")
self.most_recent_values = self.most_recent_values + sensor.most_recent_values
self.sensor_data_manager.handleData(self.most_recent_values)
time.sleep(self.device_info.read_secs)

View File

@ -1,31 +0,0 @@
from http_client import HTTPClient
from device_info import DeviceInfo
import json
class GrowSystemApi:
http_client = HTTPClient()
device_info = DeviceInfo()
base_url = ''
def __init__(self):
self.base_url = self.device_info.server_url
def say_hello(self):
response = self.http_client.post(self.base_url + "/api/device", self._get_device_data())
print(response.text)
jsonResult = json.loads(response.text)
print(jsonResult)
return jsonResult;
def send_measurements(self, device_id, data):
url = self.base_url + "/api/device/" + str(device_id) + "/sensor-log"
response = self.http_client.post(url, data)
return json.loads(response.text)
def _get_device_data(self):
return self.device_info.get_all_device_infos()

View File

@ -1,35 +0,0 @@
import urequests
import json
class HTTPClient:
def __init__(self):
pass
def get(self, url):
url = 'https://' + url
try:
# headers = {'Content-Type': 'application/json'}
response = urequests.get(url)
if response.status_code == 200:
print("Data sent, got response")
return response
else:
print("Failed to get data. Status code:", response.status_code)
except Exception as e:
print("Exception occurred:", e)
def post(self, url, data):
url = 'https://' + url
try:
headers = {'Content-Type': 'application/json', 'Accept': 'application/json, text/plain, */*'}
json_data = json.dumps(data)
print("Send post request to: " + url)
response = urequests.post(url, data=json_data, headers=headers)
if response.status_code == 200:
return response
else:
print("Failed to send data. Status code:", response.status_code)
print(response.text)
except Exception as e:
print("Exception occurred:", e)

View File

@ -1,57 +0,0 @@
# GrowSystem
# Author: Maik Müller (maik@muelleronlineorg)
# WIP!
# This file should do only:
# - provide constants for settings
# - eventually necessary system settings
# - init wlan connection
# - Call base class, permitting the configured constants
import network
import urequests
from grow_system import GrowSystem
from wlan import WlanClient
from http_client import HTTPClient
from device_info import DeviceInfo
settings = {
'wlan_ssid': 'Oppa-95.lan',
'wlan_pw': '95%04-MM',
'moisture_sensor': {
'pin_int': 26
},
'temperature_humidity_sensor': {
'pin_int': 15
},
'pump_pin_int': 24
}
def wlan_scan():
# Client-Betrieb
wlan = network.WLAN(network.STA_IF)
# WLAN-Interface aktivieren
wlan.active(True)
# WLANs ausgeben
found_wlans = wlan.scan()
return found_wlans
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
#print(wlan_scan())
print("Connect WLAN")
wlanClient = WlanClient(settings['wlan_ssid'], settings['wlan_pw'])
wlanClient.connect()
print("")
di = DeviceInfo()
print("Device Infos:")
print(di.get_all_device_infos())
print("")
print("Start grow system")
gs = GrowSystem(settings)
gs.start()

View File

@ -1,39 +0,0 @@
# Moisture Sensor Class
from sensors import Sensors
from machine import ADC, Pin
class MoistureSensor(Sensors):
sensor = None
most_recent_values = []
min_raw_value = None
max_raw_value = None
def __init__(self, sensor_data, min_raw_value=300, max_raw_value=65535):
super().__init__(sensor_data)
print("Initialize moisture sensor. Sensor pin is: " + str(sensor_data['pin_int']))
self.sensor_pin_int = sensor_data['pin_int']
self.min_raw_value = min_raw_value
self.max_raw_value = max_raw_value
self.sensor = ADC(Pin(self.sensor_pin_int))
def read(self):
self.most_recent_values = [
{
'type': 'moisture',
'value': self.convert_to_moisture_percentage(self.sensor.read_u16()),
'unit': '%'
},
]
def normalize_sensor_value(self, raw_value):
return (raw_value - self.min_raw_value) / (self.max_raw_value - self.min_raw_value)
def convert_to_moisture_percentage(self, raw_value):
normalized_value = self.normalize_sensor_value(raw_value)
return round(100 - normalized_value * 100, 1)

View File

@ -1,7 +0,0 @@
from engine import Engine
class Pump(Engine):
def __init__(self, engine):
super().__init__(engine)

View File

@ -1,20 +0,0 @@
from grow_system_api import GrowSystemApi
# from device_info import DeviceInfo
class SensorDataManager:
device_info = None
grow_system_api = None
device_id = None
def __init__(self, device_id):
self.grow_system_api = GrowSystemApi()
# self.device_info = DeviceInfo()
self.device_id = device_id
def handleData(self, data):
jsonResponse = self.grow_system_api.send_measurements(self.device_id, data)
print("Response message: " + jsonResponse['message'])

View File

@ -1,13 +0,0 @@
class Sensors:
# this is a parent class for the Sensor classes
sensor_pin_int = -1
sensor = None
type = "unset"
def __init__(self, settings):
self.type = settings['type']
print("Initialize " + self.type + " sensor. Sensor pin is: " + str(settings['pin_int']))

View File

@ -1,39 +0,0 @@
import machine
import network
import time
# network.country('DE')
class WlanClient:
ssid = ''
pw = ''
wlan = None
# Status-LED
led_onboard = machine.Pin('LED', machine.Pin.OUT)
led_onboard.value(False)
def __init__(self, ssid, pw):
# print("Hello from wlan class")
self.ssid = ssid
self.pw = pw
self.wlan = network.WLAN(network.STA_IF)
def connect(self):
if not self.is_connected():
print('No WLAN connected. Connecting ...' + self.ssid + ' ' + self.pw)
self.wlan.active(True)
self.wlan.connect(self.ssid, self.pw)
for i in range(10):
if self.wlan.status() < 0 or self.wlan.status() >= 3:
break
time.sleep(1)
# led_value = self.led_onboard.value() == 1
# self.led_onboard.value(led_value)
if self.wlan.isconnected():
net_config = self.wlan.ifconfig()
print("NetConfig:")
print(net_config)
def is_connected(self):
return self.wlan.isconnected()

View File

@ -1,2 +0,0 @@
# GrowSystem

View File

@ -1,3 +0,0 @@
# Show output on terminal #
`minicom -b 115200 -o -D /dev/cu.usbmodem3301`

View File

@ -1,34 +0,0 @@
from sensors import Sensors
from machine import Pin, I2C
from utime import sleep
from lib.bh1750 import BH1750
class AmbilightSensor(Sensors):
most_recent_values = []
def __init__(self, settings):
super().__init__(settings)
print(settings) # TODO remove
self.sensor_pin_int = settings['pin_int']
# self.sensor = DHT22(Pin(self.sensor_pin_int, Pin.IN, Pin.PULL_UP))
self.sensor = BH1750(I2C(0, sda=Pin(settings['pin_int_sda']), scl=Pin(settings['pin_int_scl'])))
def read(self):
try:
measurement = self.sensor.luminance(BH1750.ONCE_HIRES_1)
print("ambilight ..")
print(measurement)
self.most_recent_values = [
{
'type': 'ambilight',
'value': measurement,
'unit': '-'
},
]
except OSError:
print('Ambilight Error reading temperature/humidity. Check wires')
print()

View File

@ -1 +0,0 @@
from .bh1750 import BH1750

View File

@ -1,118 +0,0 @@
# https://github.com/flrrth/pico-bh1750
import math
from micropython import const
from utime import sleep_ms
class BH1750:
"""Class for the BH1750 digital Ambient Light Sensor
The datasheet can be found at https://components101.com/sites/default/files/component_datasheet/BH1750.pdf
"""
MEASUREMENT_MODE_CONTINUOUSLY = const(1)
MEASUREMENT_MODE_ONE_TIME = const(2)
RESOLUTION_HIGH = const(0)
RESOLUTION_HIGH_2 = const(1)
RESOLUTION_LOW = const(2)
MEASUREMENT_TIME_DEFAULT = const(69)
MEASUREMENT_TIME_MIN = const(31)
MEASUREMENT_TIME_MAX = const(254)
def __init__(self, address, i2c):
self._address = address
self._i2c = i2c
self._measurement_mode = BH1750.MEASUREMENT_MODE_ONE_TIME
self._resolution = BH1750.RESOLUTION_HIGH
self._measurement_time = BH1750.MEASUREMENT_TIME_DEFAULT
self._write_measurement_time()
self._write_measurement_mode()
def configure(self, measurement_mode: int, resolution: int, measurement_time: int):
"""Configures the BH1750.
Keyword arguments:
measurement_mode -- measure either continuously or once
resolution -- return measurements in either high, high2 or low resolution
measurement_time -- the duration of a single measurement
"""
if measurement_time not in range(BH1750.MEASUREMENT_TIME_MIN, BH1750.MEASUREMENT_TIME_MAX + 1):
raise ValueError("measurement_time must be between {0} and {1}"
.format(BH1750.MEASUREMENT_TIME_MIN, BH1750.MEASUREMENT_TIME_MAX))
self._measurement_mode = measurement_mode
self._resolution = resolution
self._measurement_time = measurement_time
self._write_measurement_time()
self._write_measurement_mode()
def _write_measurement_time(self):
buffer = bytearray(1)
high_bit = 1 << 6 | self._measurement_time >> 5
low_bit = 3 << 5 | (self._measurement_time << 3) >> 3
buffer[0] = high_bit
self._i2c.writeto(self._address, buffer)
buffer[0] = low_bit
self._i2c.writeto(self._address, buffer)
def _write_measurement_mode(self):
buffer = bytearray(1)
buffer[0] = self._measurement_mode << 4 | self._resolution
self._i2c.writeto(self._address, buffer)
sleep_ms(24 if self._measurement_time == BH1750.RESOLUTION_LOW else 180)
def reset(self):
"""Clear the illuminance data register."""
self._i2c.writeto(self._address, bytearray(b'\x07'))
def power_on(self):
"""Powers on the BH1750."""
self._i2c.writeto(self._address, bytearray(b'\x01'))
def power_off(self):
"""Powers off the BH1750."""
self._i2c.writeto(self._address, bytearray(b'\x00'))
@property
def measurement(self) -> float:
"""Returns the latest measurement."""
if self._measurement_mode == BH1750.MEASUREMENT_MODE_ONE_TIME:
self._write_measurement_mode()
buffer = bytearray(2)
self._i2c.readfrom_into(self._address, buffer)
lux = (buffer[0] << 8 | buffer[1]) / (1.2 * (BH1750.MEASUREMENT_TIME_DEFAULT / self._measurement_time))
if self._resolution == BH1750.RESOLUTION_HIGH_2:
return lux / 2
else:
return lux
def measurements(self) -> float:
"""This is a generator function that continues to provide the latest measurement. Because the measurement time
is greatly affected by resolution and the configured measurement time, this function attemts to calculate the
appropriate sleep time between measurements.
Example usage:
for measurement in bh1750.measurements(): # bh1750 is an instance of this class
print(measurement)
"""
while True:
yield self.measurement
if self._measurement_mode == BH1750.MEASUREMENT_MODE_CONTINUOUSLY:
base_measurement_time = 16 if self._measurement_time == BH1750.RESOLUTION_LOW else 120
sleep_ms(math.ceil(base_measurement_time * self._measurement_time / BH1750.MEASUREMENT_TIME_DEFAULT))

View File

@ -1,69 +0,0 @@
import network
class DeviceInfo:
# Device Infos
name = "Dev Device 1"
token = "PC]-0Bmp83h7F5#U!D6KJ(A&"
server_url = 'api.growsystem.muelleronline.org'
wlan_ssid = 'Oppa-95.lan'
wlan_pw = '95%04-MM'
sensors = [
{
'type': 'moisture',
'pin_int': 26
},
{
'type': 'ambilight',
'pin_int': 8, # for compatibility only
'pin_int_sda': 8,
'pin_int_scl': 9
},
# {
# 'type': 'dht22',
# 'pin_int': 15
# }
]
engines = [
{
'type': 'pump',
'pins': [15]
}
]
read_secs = 5
# Device Infos End
wlan = network.WLAN(network.STA_IF)
def get_macaddress(self):
return self._format_mac(self.wlan.config('mac').hex())
def get_ipaddress(self):
return self.wlan.ifconfig()[0]
def get_all_device_infos(self):
return {
'name': self.name,
'mac_address': self.get_macaddress(),
'ip_address': self.get_ipaddress(),
'token': self.token}
def _format_mac(self, mac):
# Split the MAC address into pairs of two characters each
pairs = [mac[i:i+2] for i in range(0, len(mac), 2)]
# Join the pairs with colons to create the formatted MAC address
formatted_mac = ":".join(pairs)
return formatted_mac

View File

@ -1,36 +0,0 @@
from sensors import Sensors
from dht import DHT22
from machine import ADC, Pin
class TemperatureHumiditySensor(Sensors):
sensor = None
most_recent_values = []
def __init__(self, settings):
super().__init__(settings)
print("Initialize dht22 sensor. Sensor pin is: " + str(settings['pin_int']))
print(settings)
self.sensor_pin_int = settings['pin_int']
self.sensor = DHT22(Pin(self.sensor_pin_int, Pin.IN, Pin.PULL_UP))
def read(self):
try:
self.sensor.measure()
self.most_recent_values = [
{
'type': 'temperature',
'value': self.sensor.temperature(),
'unit': 'C'
},
{
'type': 'humidity',
'value': self.sensor.humidity(),
'unit': '%'
}
]
except OSError:
print('DHT22 Error reading temperature/humidity. Check wires')
print()

View File

@ -1,23 +0,0 @@
from machine import Pin
class Engine():
pins = []
engine = None
def __init__(self, engine):
print("Hello from Engine parent class")
print(engine)
self.pins = engine['pins']
self.engine = Pin(self.pins[0], Pin.OUT)
def on(self):
print("engine on")
self.engine.value(1)
def off(self):
print("engine off")
self.engine.value(0)

View File

@ -1,96 +0,0 @@
import time
from moisture_sensor import MoistureSensor
from dht22 import TemperatureHumiditySensor
from ambilight_sensor import AmbilightSensor
from pump import Pump
from sensor_data_manager import SensorDataManager
from grow_system_api import GrowSystemApi
from device_info import DeviceInfo
class GrowSystem:
grow_system_api = GrowSystemApi()
# moisture_sensor = None
# temperature_humidity_sensor = None
sensors = []
engines = []
most_recent_values = []
sensor_data_manager = None
device_id = None
device_info = DeviceInfo()
def __init__(self, settings):
# Init sensors
for sensor in self.device_info.sensors:
print("")
print("Initialize sensor:")
print(sensor)
sensor_type = sensor['type']
if sensor_type == 'moisture':
print("Found sensor of type moisture")
self.sensors.append(MoistureSensor(sensor))
elif sensor_type == 'dht22':
print("Found sensor of type DHT22")
self.sensors.append(TemperatureHumiditySensor(sensor))
elif sensor_type == 'ambilight':
print("Found sensor of type GY302/BH1750")
self.sensors.append(AmbilightSensor(sensor))
else:
print("No sensor type configured for: " + sensor['type'])
# Init engines
for engine in self.device_info.engines:
print("")
print("Initialize engine:")
print(engine)
engine_type = engine['type']
if engine_type == 'pump':
print("Found egine of type pump")
self.engines.append(Pump(engine))
self.engines[0].on()
time.sleep(15)
self.engines[0].off()
#if not self.moisture_sensor:
# self.moisture_sensor = MoistureSensor(settings['moisture_sensor'])
#if not self.temperature_humidity_sensor:
# self.temperature_humidity_sensor = TemperatureHumiditySensor(settings['temperature_humidity_sensor'])
def start(self):
print("Say the server hello...")
result = self.grow_system_api.say_hello()
message = result['message']
if message != 'OK':
print("Device not activated. Stopping")
return
self.device_id = result['data']['device_id']
self.sensor_data_manager = SensorDataManager(self.device_id)
print("Start reading sensors ...")
while True:
# Reset data
self.most_recent_values = []
for sensor in self.sensors:
print("Read sensor of type " + sensor.type + " at pin " + str(sensor.sensor_pin_int))
sensor.read()
for measurement in sensor.most_recent_values:
print(f"Got {measurement['value']} {measurement['unit']} ({measurement['type']})")
self.most_recent_values = self.most_recent_values + sensor.most_recent_values
self.sensor_data_manager.handleData(self.most_recent_values)
time.sleep(self.device_info.read_secs)

View File

@ -1,38 +0,0 @@
from http_client import HTTPClient
from device_info import DeviceInfo
import json
class GrowSystemApi:
http_client = HTTPClient()
device_info = DeviceInfo()
base_url = ''
def __init__(self):
self.base_url = self.device_info.server_url
def activate(self, config):
response = self.http_client.post(self.base_url + "/api/device/activate", self._get_device_data())
return self_get_json_encoded(response.text)
def say_hello(self):
response = self.http_client.post(self.base_url + "/api/device", self._get_device_data())
print(response.text)
jsonResult = json.loads(response.text)
print(jsonResult)
return jsonResult;
def send_measurements(self, device_id, data):
url = self.base_url + "/api/device/" + str(device_id) + "/sensor-log"
response = self.http_client.post(url, data)
return json.loads(response.text)
def _get_device_data(self):
return self.device_info.get_all_device_infos()
def _get_json_encoded(self, text):
return json.loads(text)

View File

@ -1,35 +0,0 @@
import urequests
import json
class HTTPClient:
def __init__(self):
pass
def get(self, url):
url = 'https://' + url
try:
# headers = {'Content-Type': 'application/json'}
response = urequests.get(url)
if response.status_code == 200:
print("Data sent, got response")
return response
else:
print("Failed to get data. Status code:", response.status_code)
except Exception as e:
print("Exception occurred:", e)
def post(self, url, data):
url = 'https://' + url
try:
headers = {'Content-Type': 'application/json', 'Accept': 'application/json, text/plain, */*'}
json_data = json.dumps(data)
print("Send post request to: " + url)
response = urequests.post(url, data=json_data, headers=headers)
if response.status_code == 200:
return response
else:
print("Failed to send data. Status code:", response.status_code)
print(response.text)
except Exception as e:
print("Exception occurred:", e)

View File

@ -1,62 +0,0 @@
"""
Micropython BH1750 ambient light sensor driver.
* https://github.com/PinkInk/upylib/tree/master/bh1750
"""
from utime import sleep_ms
class BH1750():
"""Micropython BH1750 ambient light sensor driver."""
PWR_OFF = 0x00
PWR_ON = 0x01
RESET = 0x07
# modes
CONT_LOWRES = 0x13
CONT_HIRES_1 = 0x10
CONT_HIRES_2 = 0x11
ONCE_HIRES_1 = 0x20
ONCE_HIRES_2 = 0x21
ONCE_LOWRES = 0x23
# default addr=0x23 if addr pin floating or pulled to ground
# addr=0x5c if addr pin pulled high
def __init__(self, bus, addr=0x23):
self.bus = bus
self.addr = addr
self.off()
self.reset()
def off(self):
"""Turn sensor off."""
self.set_mode(self.PWR_OFF)
def on(self):
"""Turn sensor on."""
self.set_mode(self.PWR_ON)
def reset(self):
"""Reset sensor, turn on first if required."""
self.on()
self.set_mode(self.RESET)
def set_mode(self, mode):
"""Set sensor mode."""
self.mode = mode
self.bus.writeto(self.addr, bytes([self.mode]))
def luminance(self, mode):
"""Sample luminance (in lux), using specified sensor mode."""
# continuous modes
if mode & 0x10 and mode != self.mode:
self.set_mode(mode)
# one shot modes
if mode & 0x20:
self.set_mode(mode)
# earlier measurements return previous reading
sleep_ms(24 if mode in (0x13, 0x23) else 180)
data = self.bus.readfrom(self.addr, 2)
factor = 2.0 if mode in (0x11, 0x21) else 1.0
return (data[0]<<8 | data[1]) / (1.2 * factor)

View File

@ -1,63 +0,0 @@
# GrowSystem
# Author: Maik Müller (maik@muelleronlineorg)
# WIP!
# This file should do only:
# - first start check and actions if not configured
# - provide constants for settings
# - eventually necessary system settings
# - init wlan connection
# - Call base class, permitting the configured constants
import network
import urequests
from grow_system import GrowSystem
from wlan import WlanClient
from http_client import HTTPClient
from device_info import DeviceInfo
from setup import Setup
settings = {
'wlan_ssid': 'Oppa-95.lan',
'wlan_pw': '95%04-MM',
'moisture_sensor': {
'pin_int': 26
},
'temperature_humidity_sensor': {
'pin_int': 15
},
'pump_pin_int': 24
}
def wlan_scan():
# Client-Betrieb
wlan = network.WLAN(network.STA_IF)
# WLAN-Interface aktivieren
wlan.active(True)
# WLANs ausgeben
found_wlans = wlan.scan()
return found_wlans
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
pico_setup = Setup()
pico_setup.setup_pico()
if false:
#print(wlan_scan())
print("Connect WLAN")
wlanClient = WlanClient(settings['wlan_ssid'], settings['wlan_pw'])
wlanClient.connect()
print("")
di = DeviceInfo()
print("Device Infos:")
print(di.get_all_device_infos())
print("")
print("Start grow system")
gs = GrowSystem(settings)
gs.start()

View File

@ -1,39 +0,0 @@
# Moisture Sensor Class
from sensors import Sensors
from machine import ADC, Pin
class MoistureSensor(Sensors):
sensor = None
most_recent_values = []
min_raw_value = None
max_raw_value = None
def __init__(self, sensor_data, min_raw_value=300, max_raw_value=65535):
super().__init__(sensor_data)
print("Initialize moisture sensor. Sensor pin is: " + str(sensor_data['pin_int']))
self.sensor_pin_int = sensor_data['pin_int']
self.min_raw_value = min_raw_value
self.max_raw_value = max_raw_value
self.sensor = ADC(Pin(self.sensor_pin_int))
def read(self):
self.most_recent_values = [
{
'type': 'moisture',
'value': self.convert_to_moisture_percentage(self.sensor.read_u16()),
'unit': '%'
},
]
def normalize_sensor_value(self, raw_value):
return (raw_value - self.min_raw_value) / (self.max_raw_value - self.min_raw_value)
def convert_to_moisture_percentage(self, raw_value):
normalized_value = self.normalize_sensor_value(raw_value)
return round(100 - normalized_value * 100, 1)

View File

@ -1,31 +0,0 @@
from machine import Pin, ADC
import time
import network
import urequests
import statistics
import secrets
sensor = ADC(Pin(26))
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(secrets.SSID, secrets.PASSWORD)
time.sleep(5)
print(wlan.isconnected())
readings = []
try:
while True:
for i in range(5):
reading = sensor.read_u16()
readings.append(reading)
print(readings)
time.sleep(1)
median_value = statistics.median(readings)
if median_value < 400:
urequests.get("https://api.telegram.org/bot"+secrets.API+"/sendMessage?text=Gary is thirsty&chat_id="+secrets.ID)
print("Message Sent")
else:
print("Gary has enough water")
time.sleep(3600)
except OSError:
print("@"*68)
print("@ Cannot connect to the Wi-Fi, please check your SSID and PASSWORD @")
print("@"*68)

View File

@ -1,7 +0,0 @@
from engine import Engine
class Pump(Engine):
def __init__(self, engine):
super().__init__(engine)

View File

@ -1,20 +0,0 @@
from grow_system_api import GrowSystemApi
# from device_info import DeviceInfo
class SensorDataManager:
device_info = None
grow_system_api = None
device_id = None
def __init__(self, device_id):
self.grow_system_api = GrowSystemApi()
# self.device_info = DeviceInfo()
self.device_id = device_id
def handleData(self, data):
jsonResponse = self.grow_system_api.send_measurements(self.device_id, data)
print("Response message: " + jsonResponse['message'])

View File

@ -1,13 +0,0 @@
class Sensors:
# this is a parent class for the Sensor classes
sensor_pin_int = -1
sensor = None
type = "unset"
def __init__(self, settings):
self.type = settings['type']
print("Initialize " + self.type + " sensor. Sensor pin is: " + str(settings['pin_int']))

View File

@ -1,113 +0,0 @@
import network
import ujson
import ure as re
import usocket as socket
import time
class Setup:
def __init__(self):
self.ap_ssid = "Growsystem"
self.ap_password = "12345678"
self.wlans = []
self.selected_ssid = ""
self.wlan_password = ""
self.pin = ""
self.html = """
<html>
<head><title>Pico Setup</title></head>
<body>
<h1>Setup Pico</h1>
<form method="post">
SSID: <select name="ssid">{}</select><br>
Password: <input type="password" name="password"><br>
PIN: <input type="text" name="pin" maxlength="4"><br>
<input type="submit" value="Submit">
</form>
</body>
</html>
"""
def scan_wlans(self):
print("Scan WLANs")
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
self.wlans = [w[0].decode() for w in wlan.scan()]
print("Found:", self.wlans)
wlan.active(False)
def start_ap_mode(self):
self.ap = network.WLAN(network.AP_IF)
self.ap.active(True)
while self.ap.active() == False:
pass
self.ap.config(essid=self.ap_ssid, password=self.ap_password)
print("AP SSID:", self.ap.config("essid"))
# print("AP Password:", self.ap.config("password"))
def stop_ap_mode(self):
ap = network.WLAN(network.AP_IF)
ap.active(False)
def start_webserver(self):
addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1]
s = socket.socket()
s.bind(addr)
s.listen(1)
print("Listening on", addr)
while True:
conn, addr = s.accept()
print("Got a connection from", addr)
request = conn.recv(1024)
print("Content =", request)
conn.send("HTTP/1.1 200 OK\n")
conn.send("Content-Type: text/html\n")
conn.send("Connection: close\n\n")
conn.sendall(self.html.format(''.join(['<option value="{}">{}</option>'.format(w, w) for w in self.wlans])))
conn.close()
def parse_request(self, request):
request = request.decode()
ssid_match = re.search("ssid=([^&]+)", request)
if ssid_match:
self.selected_ssid = ssid_match.group(1)
password_match = re.search("password=([^&]+)", request)
if password_match:
self.wlan_password = password_match.group(1)
pin_match = re.search("pin=([^&]+)", request)
if pin_match:
self.pin = pin_match.group(1)
def save_config(self):
config = {
"ssid": self.selected_ssid,
"password": self.wlan_password,
"pin": self.pin
}
with open("initial_config.py", "w") as f:
f.write("config = " + ujson.dumps(config))
def switch_to_client_mode(self):
ap = network.WLAN(network.AP_IF)
ap.active(False)
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(self.selected_ssid, self.wlan_password)
while not wlan.isconnected():
time.sleep(1)
print("Connected to", self.selected_ssid)
def setup_pico(self):
print("Start PICO setup")
self.scan_wlans()
self.start_ap_mode()
self.start_webserver()
while True:
conn, addr = self.s.accept()
request = conn.recv(1024)
self.parse_request(request)
self.save_config()
conn.close()
break # Assuming only one request is handled
self.stop_ap_mode()
self.switch_to_client_mode()

View File

@ -1,39 +0,0 @@
import machine
import network
import time
# network.country('DE')
class WlanClient:
ssid = ''
pw = ''
wlan = None
# Status-LED
led_onboard = machine.Pin('LED', machine.Pin.OUT)
led_onboard.value(False)
def __init__(self, ssid, pw):
# print("Hello from wlan class")
self.ssid = ssid
self.pw = pw
self.wlan = network.WLAN(network.STA_IF)
def connect(self):
if not self.is_connected():
print('No WLAN connected. Connecting ...' + self.ssid + ' ' + self.pw)
self.wlan.active(True)
self.wlan.connect(self.ssid, self.pw)
for i in range(10):
if self.wlan.status() < 0 or self.wlan.status() >= 3:
break
time.sleep(1)
# led_value = self.led_onboard.value() == 1
# self.led_onboard.value(led_value)
if self.wlan.isconnected():
net_config = self.wlan.ifconfig()
print("NetConfig:")
print(net_config)
def is_connected(self):
return self.wlan.isconnected()

View File

@ -1,50 +0,0 @@
import network
import time
import socket
ssid="Growsystem"
password="12345678"
def web_page():
html = """<html><head><meta name="viewport" content="width=device-width, initial-scale=1"></head>
<body><h1>Hello World</h1></body></html>
"""
return html
# if you do not see the network you may have to power cycle
# unplug your pico w for 10 seconds and plug it in again
def ap_mode(ssid, password):
"""
Description: This is a function to activate AP mode
Parameters:
ssid[str]: The name of your internet connection
password[str]: Password for your internet connection
Returns: Nada
"""
# Just making our internet connection
ap = network.WLAN(network.AP_IF)
ap.config(essid=ssid, password=password)
ap.active(True)
while ap.active() == False:
pass
print('AP Mode Is Active, You can Now Connect')
print('IP Address To Connect to:: ' + ap.ifconfig()[0])
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #creating socket object
s.bind(('', 80))
s.listen(5)
while True:
conn, addr = s.accept()
print('Got a connection from %s' % str(addr))
request = conn.recv(1024)
print('Content = %s' % str(request))
response = web_page()
conn.send(response)
conn.close()
ap_mode('Growsystem95','PASSWORD')