Compare commits

..

No commits in common. "main" and "feature/new_api_process" have entirely different histories.

55 changed files with 1494 additions and 371 deletions

72
classes/http_request.py Normal file
View File

@ -0,0 +1,72 @@
import json
import ure as re
class HttpRequest:
METHOD = {
"GET": "GET",
"POST": "POST"
}
method = ''
path = ''
raw_content = ''
headers = {}
def __init__(self, request):
self.original_request = request
# self.method = method
# self.path = path
# self.headers = headers
# self.raw_content = content
print("http request initialized")
self.parse_request(request)
def parse_request(self, request):
# Split the request into lines
lines = request.decode().split('\r\n')
# Extract method, path, and HTTP version from the first line
self.method, self.path, _ = lines[0].split()
# Parse headers
for line in lines[1:]:
if not line:
break # Empty line indicates end of headers
key, value = line.split(': ', 1)
self.headers[key] = value
# Content is assumed to be in the last line
self.raw_content = lines[-1]
def get_content_json(self):
# Parse the POST request content into a dictionary
parsed_content = {}
pairs = self.raw_content.split('&')
for pair in pairs:
key, value = pair.split('=')
parsed_content[key] = value
# Encode the values in the dictionary
encoded_content = {}
for key, value in parsed_content.items():
encoded_value = re.sub(r'%([0-9A-Fa-f]{2})', lambda m: chr(int(m.group(1), 16)), value)
encoded_content[key] = encoded_value
return encoded_content
def __str__(self):
return {
"method": self.method,
"headers": self.headers,
"content": self.raw_content,
"content_json": self.get_content_json()
}

1
config/app.py Normal file
View File

@ -0,0 +1 @@
api_url = 'https://growsystem.muellerdev.kozow.com/api/'

View File

@ -1,4 +1,4 @@
from gs.classes.a_sensors import Sensors
from sensors import Sensors
from machine import Pin, I2C
from utime import sleep
from lib.bh1750 import BH1750
@ -32,4 +32,3 @@ class AmbilightSensor(Sensors):
print('Ambilight Error reading temperature/humidity. Check wires')
print()

View File

@ -0,0 +1,69 @@
import network
class DeviceInfo:
# Device Infos
name = "Dev Device 1"
token = "PC]-0Bmp83h7F5#U!D6KJ(A&"
server_url = 'api.growsystem.muelleronline.org'
wlan_ssid = 'Oppa-95.lan'
wlan_pw = '95%04-MM'
sensors = [
{
'type': 'moisture',
'pin_int': 26
},
{
'type': 'ambilight',
'pin_int': 8, # for compatibility only
'pin_int_sda': 8,
'pin_int_scl': 9
},
# {
# 'type': 'dht22',
# 'pin_int': 15
# }
]
engines = [
{
'type': 'pump',
'pins': [15]
}
]
read_secs = 5
# Device Infos End
wlan = network.WLAN(network.STA_IF)
def get_macaddress(self):
return self._format_mac(self.wlan.config('mac').hex())
def get_ipaddress(self):
return self.wlan.ifconfig()[0]
def get_all_device_infos(self):
return {
'name': self.name,
'mac_address': self.get_macaddress(),
'ip_address': self.get_ipaddress(),
'token': self.token}
def _format_mac(self, mac):
# Split the MAC address into pairs of two characters each
pairs = [mac[i:i+2] for i in range(0, len(mac), 2)]
# Join the pairs with colons to create the formatted MAC address
formatted_mac = ":".join(pairs)
return formatted_mac

View File

@ -1,4 +1,4 @@
from gs.classes.a_sensors import Sensors
from sensors import Sensors
from dht import DHT22
from machine import ADC, Pin
@ -34,4 +34,3 @@ class TemperatureHumiditySensor(Sensors):
except OSError:
print('DHT22 Error reading temperature/humidity. Check wires')
print()

View File

@ -0,0 +1,23 @@
from machine import Pin
class Engine():
pins = []
engine = None
def __init__(self, engine):
print("Hello from Engine parent class")
print(engine)
self.pins = engine['pins']
self.engine = Pin(self.pins[0], Pin.OUT)
def on(self):
print("engine on")
self.engine.value(1)
def off(self):
print("engine off")
self.engine.value(0)

View File

@ -0,0 +1,96 @@
import time
from moisture_sensor import MoistureSensor
from dht22 import TemperatureHumiditySensor
from ambilight_sensor import AmbilightSensor
from pump import Pump
from sensor_data_manager import SensorDataManager
from grow_system_api import GrowSystemApi
from device_info import DeviceInfo
class GrowSystem:
grow_system_api = GrowSystemApi()
# moisture_sensor = None
# temperature_humidity_sensor = None
sensors = []
engines = []
most_recent_values = []
sensor_data_manager = None
device_id = None
device_info = DeviceInfo()
def __init__(self, settings):
# Init sensors
for sensor in self.device_info.sensors:
print("")
print("Initialize sensor:")
print(sensor)
sensor_type = sensor['type']
if sensor_type == 'moisture':
print("Found sensor of type moisture")
self.sensors.append(MoistureSensor(sensor))
elif sensor_type == 'dht22':
print("Found sensor of type DHT22")
self.sensors.append(TemperatureHumiditySensor(sensor))
elif sensor_type == 'ambilight':
print("Found sensor of type GY302/BH1750")
self.sensors.append(AmbilightSensor(sensor))
else:
print("No sensor type configured for: " + sensor['type'])
# Init engines
for engine in self.device_info.engines:
print("")
print("Initialize engine:")
print(engine)
engine_type = engine['type']
if engine_type == 'pump':
print("Found egine of type pump")
self.engines.append(Pump(engine))
self.engines[0].on()
time.sleep(15)
self.engines[0].off()
#if not self.moisture_sensor:
# self.moisture_sensor = MoistureSensor(settings['moisture_sensor'])
#if not self.temperature_humidity_sensor:
# self.temperature_humidity_sensor = TemperatureHumiditySensor(settings['temperature_humidity_sensor'])
def start(self):
print("Say the server hello...")
result = self.grow_system_api.say_hello()
message = result['message']
if message != 'OK':
print("Device not activated. Stopping")
return
self.device_id = result['data']['device_id']
self.sensor_data_manager = SensorDataManager(self.device_id)
print("Start reading sensors ...")
while True:
# Reset data
self.most_recent_values = []
for sensor in self.sensors:
print("Read sensor of type " + sensor.type + " at pin " + str(sensor.sensor_pin_int))
sensor.read()
for measurement in sensor.most_recent_values:
print(f"Got {measurement['value']} {measurement['unit']} ({measurement['type']})")
self.most_recent_values = self.most_recent_values + sensor.most_recent_values
self.sensor_data_manager.handleData(self.most_recent_values)
time.sleep(self.device_info.read_secs)

View File

@ -0,0 +1,31 @@
from http_client import HTTPClient
from device_info import DeviceInfo
import json
class GrowSystemApi:
http_client = HTTPClient()
device_info = DeviceInfo()
base_url = ''
def __init__(self):
self.base_url = self.device_info.server_url
def say_hello(self):
response = self.http_client.post(self.base_url + "/api/device", self._get_device_data())
print(response.text)
jsonResult = json.loads(response.text)
print(jsonResult)
return jsonResult;
def send_measurements(self, device_id, data):
url = self.base_url + "/api/device/" + str(device_id) + "/sensor-log"
response = self.http_client.post(url, data)
return json.loads(response.text)
def _get_device_data(self):
return self.device_info.get_all_device_infos()

View File

@ -0,0 +1,35 @@
import urequests
import json
class HTTPClient:
def __init__(self):
pass
def get(self, url):
url = 'https://' + url
try:
# headers = {'Content-Type': 'application/json'}
response = urequests.get(url)
if response.status_code == 200:
print("Data sent, got response")
return response
else:
print("Failed to get data. Status code:", response.status_code)
except Exception as e:
print("Exception occurred:", e)
def post(self, url, data):
url = 'https://' + url
try:
headers = {'Content-Type': 'application/json', 'Accept': 'application/json, text/plain, */*'}
json_data = json.dumps(data)
print("Send post request to: " + url)
response = urequests.post(url, data=json_data, headers=headers)
if response.status_code == 200:
return response
else:
print("Failed to send data. Status code:", response.status_code)
print(response.text)
except Exception as e:
print("Exception occurred:", e)

View File

@ -0,0 +1,57 @@
# GrowSystem
# Author: Maik Müller (maik@muelleronlineorg)
# WIP!
# This file should do only:
# - provide constants for settings
# - eventually necessary system settings
# - init wlan connection
# - Call base class, permitting the configured constants
import network
import urequests
from grow_system import GrowSystem
from wlan import WlanClient
from http_client import HTTPClient
from device_info import DeviceInfo
settings = {
'wlan_ssid': 'Oppa-95.lan',
'wlan_pw': '95%04-MM',
'moisture_sensor': {
'pin_int': 26
},
'temperature_humidity_sensor': {
'pin_int': 15
},
'pump_pin_int': 24
}
def wlan_scan():
# Client-Betrieb
wlan = network.WLAN(network.STA_IF)
# WLAN-Interface aktivieren
wlan.active(True)
# WLANs ausgeben
found_wlans = wlan.scan()
return found_wlans
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
#print(wlan_scan())
print("Connect WLAN")
wlanClient = WlanClient(settings['wlan_ssid'], settings['wlan_pw'])
wlanClient.connect()
print("")
di = DeviceInfo()
print("Device Infos:")
print(di.get_all_device_infos())
print("")
print("Start grow system")
gs = GrowSystem(settings)
gs.start()

View File

@ -1,5 +1,5 @@
# Moisture Sensor Class
from gs.classes.a_sensors import Sensors
from sensors import Sensors
from machine import ADC, Pin

View File

@ -0,0 +1,7 @@
from engine import Engine
class Pump(Engine):
def __init__(self, engine):
super().__init__(engine)

View File

@ -0,0 +1,20 @@
from grow_system_api import GrowSystemApi
# from device_info import DeviceInfo
class SensorDataManager:
device_info = None
grow_system_api = None
device_id = None
def __init__(self, device_id):
self.grow_system_api = GrowSystemApi()
# self.device_info = DeviceInfo()
self.device_id = device_id
def handleData(self, data):
jsonResponse = self.grow_system_api.send_measurements(self.device_id, data)
print("Response message: " + jsonResponse['message'])

View File

@ -0,0 +1,39 @@
import machine
import network
import time
# network.country('DE')
class WlanClient:
ssid = ''
pw = ''
wlan = None
# Status-LED
led_onboard = machine.Pin('LED', machine.Pin.OUT)
led_onboard.value(False)
def __init__(self, ssid, pw):
# print("Hello from wlan class")
self.ssid = ssid
self.pw = pw
self.wlan = network.WLAN(network.STA_IF)
def connect(self):
if not self.is_connected():
print('No WLAN connected. Connecting ...' + self.ssid + ' ' + self.pw)
self.wlan.active(True)
self.wlan.connect(self.ssid, self.pw)
for i in range(10):
if self.wlan.status() < 0 or self.wlan.status() >= 3:
break
time.sleep(1)
# led_value = self.led_onboard.value() == 1
# self.led_onboard.value(led_value)
if self.wlan.isconnected():
net_config = self.wlan.ifconfig()
print("NetConfig:")
print(net_config)
def is_connected(self):
return self.wlan.isconnected()

View File

@ -51,8 +51,7 @@ class HttpRequest:
# Encode the values in the dictionary
encoded_content = {}
for key, value in parsed_content.items():
encoded_value = re.sub(r'\%([0-9A-Fa-f]{2})', lambda m: chr(int(m.group(1), 16)), value)
print("encoding ...", key, value, encoded_value)
encoded_value = re.sub(r'%([0-9A-Fa-f]{2})', lambda m: chr(int(m.group(1), 16)), value)
encoded_content[key] = encoded_value
return encoded_content

View File

@ -1,4 +1 @@
dev_api_url = 'api.growsystem.muellerdev.kozow.com'
api_url = 'api.growsystem.muellerdev.kozow.com'
read_secs = 15 * 60
token = "dummy"
api_url = 'https://growsystem.muellerdev.kozow.com/api/'

View File

@ -0,0 +1 @@
sensors = [{"pin_int_sda": 8, "type": "ambilight", "pin_int": 8, "pin_int_scl": 9}, {"pin_int": 15, "type": "dht22"}, {"pin_int": 26, "type": "moisture"}]

View File

@ -1 +0,0 @@
{"sensors": [{"pin_int": 15, "type": "dht22"}, {"pin_int": 26, "type": "moisture"}], "device_id": 9, "name": "Fulltest1", "token": "uStIrOgScrpbUr0Y", "user_id": 1}

View File

@ -1 +0,0 @@
config = {"ssid": "Oppa-95.lan", "user_id": "1", "password": "95%04-MM", "pin": 9534}

View File

@ -0,0 +1 @@
config = {"pin": "1234", "password": "95%04-MM", "ssid": "Oppa-95.lan"}

View File

@ -4,9 +4,47 @@ import json
class DeviceInfo:
wlan = network.WLAN(network.STA_IF)
# Device Infos
name = "Dev Device 1"
app_version = "1.0.0"
token = "PC]-0Bmp83h7F5#U!D6KJ(A&"
#server_url = 'api.growsystem.muelleronline.org'
server_url = 'api.growsystem.muellerdev.kozow.com'
wlan_ssid = 'Oppa-95.lan'
wlan_pw = '95%04-MM'
sensors = [
{
'type': 'moisture',
'pin_int': 26
},
{
'type': 'ambilight',
'pin_int': 8, # for compatibility only
'pin_int_sda': 8,
'pin_int_scl': 9
},
# {
# 'type': 'dht22',
# 'pin_int': 15
# }
]
engines = [
{
'type': 'pump',
'pins': [15]
}
]
read_secs = 5
# Device Infos End
wlan = network.WLAN(network.STA_IF)
def get_macaddress(self):
return self._format_mac(self.wlan.config('mac').hex())
@ -16,29 +54,13 @@ class DeviceInfo:
def get_all_device_infos(self):
return {
'name': self.get_name(),
'name': self.name,
'mac_address': self.get_macaddress(),
'ip_address': self.get_ipaddress(),
'token': self.get_token()}
'token': self.token}
def config(self, filepath=None):
return self._loadJsonConfig(filepath)
def app_config(self):
import gs.config.app as app_config
return app_config
def server_url(self):
return self.app_config().api_url
def get_token(self):
return self.config()['token'] if self.config() and self.config()['token'] else ''
def get_name(self):
return self.config()['name'] if self.config() and self.config()['name'] else ''
def get_device_id(self):
return self.config()['device_id'] if self.config() and self.config()['device_id'] else ''
def config(self):
return self._loadConfig()
def _format_mac(self, mac):
# Split the MAC address into pairs of two characters each
@ -47,20 +69,10 @@ class DeviceInfo:
formatted_mac = ":".join(pairs)
return formatted_mac
def _loadJsonConfig(self, filepath=None):
filepath = filepath if filepath else '/gs/config/device_config.json'
try:
file = open(filepath, "r")
def _loadConfig(self):
with open('/gs/config/sensors.py', 'r') as file:
json_content = file.read()
return json.loads(json_content)
except OSError: # open failed
print("File not found: ", filepath)
return None
#with open(filepath, 'r') as file:
# json_content = file.read()
#return json.loads(json_content)
return json.loads(json_content)

View File

@ -1,2 +0,0 @@
class NotSubscriptableError(Exception):
pass

View File

@ -3,7 +3,6 @@ from gs.device_info import DeviceInfo
from gs.wlan_client import WlanClient
import json
import gs.config.initial_config as ic
import helper.token_helper as th
class GrowSystemApi:
@ -15,7 +14,7 @@ class GrowSystemApi:
base_url = ''
def __init__(self):
self.base_url = self.device_info.server_url()
self.base_url = self.device_info.server_url
self.connect_wifi(ic.config['ssid'], ic.config['password'])
# config = self.device_info.config()
@ -23,7 +22,6 @@ class GrowSystemApi:
# print("Test", config['test'])
def activate(self, config):
print("ACtivate config:", config)
data = self._get_device_data()
data.update({
'user_id': 1,
@ -31,30 +29,19 @@ class GrowSystemApi:
})
print("activate ...", data)
response = self.http_client.post(self.base_url + "/api/device/activate", data)
print("REsponse ...", response.content)
return self._get_json_encoded(response.text)
def update_device_info(self, config):
device_id = self.device_info.get_device_id()
token = self.device_info.get_token()
print("update device info. Token ...", token)
url = self.base_url + "/api/device/" + str(device_id) + "/update-device-info/" + token
response = self.http_client.get(url)
print("Device Info Update Response ...", response.text)
return self._get_json_encoded(response.text)
def say_hello(self):
response = self.http_client.post(self.base_url + "/api/device", self._get_device_data())
print(response.text)
jsonResult = json.loads(response.text)
print(jsonResult)
return jsonResult;
def send_measurements(self, device_id, data):
url = self.base_url + "/api/device/" + str(device_id) + "/sensor-log"
response = self.http_client.post(url, data)
try:
return json.loads(response.text)
except ValueError as e:
print("JSON Value error raised after sending measurements")
except Exception as e:
print("Exception raised while sending measurements", e)
return response
return json.loads(response.text)
def _get_device_data(self):
return self.device_info.get_all_device_infos()

View File

@ -1,110 +1,62 @@
from gs.device_info import DeviceInfo
from gs.setup import Setup
import os
import ujson
import machine
import time
from gs.grow_system_api import GrowSystemApi
from gs.classes.sensors.ambilight_sensor import AmbilightSensor
from gs.classes.sensors.dht22 import TemperatureHumiditySensor
from gs.classes.sensors.moisture_sensor import MoistureSensor
import gs.config.initial_config as ic
import helper.token_helper as th
class GrowSystem:
version = "1.0.0.1"
version = "1.0"
initial_config = None
device_info = DeviceInfo()
sensors = []
def __init__(self):
print("Initialize Growsystem", self.version)
self.initial_config = ic
self.gsapi = GrowSystemApi()
def start(self):
from gs.device_info import DeviceInfo
di = DeviceInfo()
from gs.sensor_data_manager import SensorDataManager
self.sensor_data_manager = SensorDataManager(di.get_device_id())
if not self._is_initial_config_existing():
print("No config existing. Start setup ...")
self._setup()
return
self._initialize_sensors()
print("Start reading sensors ...")
from gs.grow_system_api import GrowSystemApi as GSA
self.gsapi = GSA()
read_secs = self.device_info.app_config().read_secs
print("Reading and sending every " + str(read_secs) + " seconds")
while True:
# Reset data
self.most_recent_values = []
if self._is_config_existing():
print("Skip Setup. Config existing.")
elif self._is_initial_config_existing():
print("Initial config only existing (no base config). Start activation ...")
self._activate()
for sensor in self.sensors:
print("Read sensor of type " + sensor.type + " at pin " + str(sensor.sensor_pin_int))
sensor.read()
for measurement in sensor.most_recent_values:
print(f"Got {measurement['value']} {measurement['unit']} ({measurement['type']})")
self.most_recent_values = self.most_recent_values + sensor.most_recent_values
def _setup(self):
setup = Setup()
setup.setup_pico()
self.sensor_data_manager.handleData(self.most_recent_values)
time.sleep(read_secs)
def activate(self):
def _activate(self):
print("Start activation!")
import gs.config.initial_config as ic
self.initial_config = ic.config
device_config = self.gsapi.activate(self.initial_config)
print("Device Config:", device_config['data'], device_config['data']['token'])
th.write_token(device_config['data']['token'])
self.write_device_infos(device_config['data'])
def update_device_info(self):
print("Start Device Info Update!")
self.initial_config = ic.config
device_config = self.gsapi.update_device_info(self.initial_config)
print("Device Config:", device_config['data'])
self.write_device_infos(device_config['data'])
def write_device_infos(self, device_config):
print("Function received data:", device_config)
if True:
sensors = device_config['sensors']
sensor_configs = []
for sensor in sensors:
sensor_configs.append(sensor['config'])
print(sensor['config'])
device_configs = {
'name': device_config['name'],
'device_id': device_config['id'],
'token': device_config['token'],
'user_id': device_config['user_id'],
'sensors': sensor_configs
}
print("Update device_config.json with:", device_configs)
with open("/gs/config/device_config.json", "w") as f:
f.write(ujson.dumps(device_configs))
f.close()
def _initialize_sensors(self):
# Init sensors
sensors = self.device_info.config()['sensors']
#print("Device Config:", device_config['data'])
sensors = device_config['data']['sensors']
sensor_configs = []
for sensor in sensors:
print("--------------------------------------")
print("Initialize sensor:")
print(sensor)
sensor_type = sensor['type']
if sensor_type == 'moisture':
print("Found sensor of type moisture")
self.sensors.append(MoistureSensor(sensor))
elif sensor_type == 'dht22':
print("Found sensor of type DHT22")
self.sensors.append(TemperatureHumiditySensor(sensor))
elif sensor_type == 'ambilight':
print("Found sensor of type GY302/BH1750")
self.sensors.append(AmbilightSensor(sensor))
else:
print("No sensor type configured for: " + sensor['type'])
sensor_configs.append(sensor['config'])
print(sensor['config'])
with open("/gs/config/device_config.py", "w") as f:
f.write("sensors = " + ujson.dumps(sensor_configs))
def _is_config_existing(self):
return self._is_file_existing('/gs/config/config.py')
def _is_initial_config_existing(self):
return self._is_file_existing('/gs/config/initial_config.py')
def _is_file_existing(self, filepath):
try:
f = open(filepath, "r")
f.close()
# continue with the file.
return True
except OSError: # open failed
# handle the file open cas
return False

View File

@ -9,14 +9,12 @@ class HTTPClient:
url = 'https://' + url
try:
# headers = {'Content-Type': 'application/json'}
print("GET request to: ", url)
response = urequests.get(url)
if response.status_code == 200:
print("Data sent, got response")
return response
else:
print("Failed to get data. Status code:", response.status_code)
return response
except Exception as e:
print("Exception occurred:", e)
@ -28,11 +26,10 @@ class HTTPClient:
print("Send post request to: " + url)
response = urequests.post(url, data=json_data, headers=headers)
if response.status_code == 200:
print("Request OK (200)")
return response
else:
print("Failed to send data.", response.status_code, response.text)
return response
print("Failed to send data. Status code:", response.status_code)
print(response.text)
except Exception as e:
print("Exception raised:", e)
return None
print("Exception occurred:", e)

View File

@ -77,85 +77,15 @@ class LittleApache():
options = []
for w in self.available_wifis:
options.append('<option value="{}">{}</option>'.format(w, w))
body = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Growsystem Setup</title>
<style>
body {{
font-family: Arial, sans-serif;
background-color: #f2f2f2;
margin: 0;
padding: 0;
display: flex;
justify-content: center;
align-items: center;
height: 100vh;
}}
form {{
background-color: #fff;
padding: 20px;
border-radius: 10px;
box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
max-width: 400px;
width: 100%;
}}
h1 {{
text-align: center;
color: #333;
}}
input[type="submit"] {{
background-color: #4CAF50;
color: white;
padding: 10px 20px;
border: none;
border-radius: 5px;
cursor: pointer;
font-size: 16px;
}}
input[type="submit"]:hover {{
background-color: #45a049;
}}
input[type="text"],
input[type="password"],
input[type="number"],
select {{
width: 100%;
padding: 10px;
margin: 5px 0;
border: 1px solid #ccc;
border-radius: 5px;
box-sizing: border-box;
}}
</style>
</head>
<body>
<h1>Growsystem Setup</h1>
<form method="post" action="save">
<label for="ssid">SSID:</label><br>
<select name="ssid" id="ssid">
{}
</select><br>
<label for="password">Password:</label><br>
<input type="password" name="password" id="password"><br><br>
<label for="user_id">ID:</label><br>
<input type="number" name="user_id" id="user_id"><br>
<label for="pin">PIN:</label><br>
<input type="number" name="pin" id="pin" maxlength="4"><br>
<h1>Setup Pico</h1>
<form method="post" action="save">
SSID: <select name="ssid">{}</select><br>
Password: <input type="password" name="password"><br>
PIN: <input type="text" name="pin" maxlength="4"><br>
<input type="submit" value="Submit">
</form>
</body>
</html>
"""
</form>
"""
body = body.format(''.join(options))
elif self.is_path_match(request, '/save', 'POST'):
print("Save config path: ", request)
@ -168,6 +98,7 @@ select {{
body = f"""
<div>Unknown page</div>
"""
html = ''
html_arr = [header, body, footer]
html = html.join(html_arr)
@ -177,4 +108,3 @@ select {{
return path == request.path and method == request.method

View File

@ -1,24 +0,0 @@
from gs.grow_system_api import GrowSystemApi
# from device_info import DeviceInfo
from gs.exceptions.not_subscriptable_error import NotSubscriptableError
class SensorDataManager:
device_info = None
grow_system_api = None
device_id = None
def __init__(self, device_id):
self.grow_system_api = GrowSystemApi()
# self.device_info = DeviceInfo()
self.device_id = device_id
def handleData(self, data):
json_response = self.grow_system_api.send_measurements(self.device_id, data)
try:
print("Response message: " + json_response['message'])
except TypeError as e:
print("The response is no json data", e, json_response)

View File

@ -3,7 +3,7 @@ import ujson
import ure as re
import usocket as socket
import time
from setup.little_apache import LittleApache
from gs.little_apache import LittleApache
class Setup:
@ -11,7 +11,6 @@ class Setup:
wlans = []
def __init__(self):
print("Start Pico Setup")
self.ap_ssid = "Growsystem 1.0"
self.ap_password = "password"
self.wlans = []
@ -58,10 +57,9 @@ class Setup:
config = {
"ssid": config['ssid'],
"password": config['password'],
"pin": config['pin'],
"user_id": config['user_id']
"pin": config['pin']
}
print("Save initial config:", config)
print("Save config:", config)
with open("/gs/config/initial_config.py", "w") as f:
f.write("config = " + ujson.dumps(config))
@ -84,4 +82,3 @@ class Setup:
# self.stop_ap_mode()
# self.switch_to_client_mode()

View File

@ -1,36 +0,0 @@
config_path = '/gs/config/'
def is_config_existing():
return is_file_existing(config_path + 'device_config.json')
def is_initial_config_existing():
return is_file_existing(config_path + 'initial_config.py')
def is_file_existing(filepath):
try:
f = open(filepath, "r")
f.close()
# continue with the file.
return True
except OSError: # open failed
# handle the file open cas
return False
def replace_value(filepath, prop, newvalue):
with open(filepath, 'r') as file:
config_lines = file.readlines()
file.close()
# Parse the config data into a dictionary
config_dict = {}
for line in config_lines:
if '=' in line:
key, value = line.strip().split('=')
config_dict[key.strip()] = value.strip()
# Add or Update the value of the property
config_dict[prop] = newvalue
# Write the modified data back to the file
with open(filepath, 'w') as file:
for key, value in config_dict.items():
file.write(f"{key} = {value}\n")

View File

@ -1,9 +0,0 @@
import helper.file_helper as fh
app_config_path = '/gs/config/app.py'
def write_token(token):
print("Write new token: " + token)
fh.replace_value(app_config_path, 'token', '"' + token + '"')
#with open(app_config_path, "a") as app_config:
# app_config.write(token)

35
http_client.py Normal file
View File

@ -0,0 +1,35 @@
import urequests
import json
class HTTPClient:
def __init__(self):
pass
def get(self, url):
url = 'https://' + url
try:
# headers = {'Content-Type': 'application/json'}
response = urequests.get(url)
if response.status_code == 200:
print("Data sent, got response")
return response
else:
print("Failed to get data. Status code:", response.status_code)
except Exception as e:
print("Exception occurred:", e)
def post(self, url, data):
url = 'https://' + url
try:
headers = {'Content-Type': 'application/json', 'Accept': 'application/json, text/plain, */*'}
json_data = json.dumps(data)
print("Send post request to: " + url)
response = urequests.post(url, data=json_data, headers=headers)
if response.status_code == 200:
return response
else:
print("Failed to send data. Status code:", response.status_code)
print(response.text)
except Exception as e:
print("Exception occurred:", e)

43
main.py
View File

@ -1,49 +1,10 @@
## Only start grow system
from gs.growsystem import GrowSystem
from time import sleep
import helper.file_helper as fh
if False:
if not self._is_initial_config_existing():
print("No config existing. Start setup ...")
self._setup()
return
from gs.sensor_data_manager import SensorDataManager
self.sensor_data_manager = SensorDataManager(self.device_info.get_device_id())
from gs.grow_system_api import GrowSystemApi as GSA
self.gsapi = GSA()
if self._is_config_existing():
print("Skip Setup. Config existing.")
self._initialize_sensors()
elif self._is_initial_config_existing():
print("Initial config only existing (no base config). Start activation ...")
self._activate()
def setup():
from setup.setup import Setup
setup = Setup()
setup.setup_pico()
machine.reset
if __name__ == '__main__':
# 1) Check and in case of update Growsystem
# 2) Check if initial_config is existing -> Setup if not
if not fh.is_initial_config_existing():
setup()
else:
from gs.growsystem import GrowSystem
gs = GrowSystem()
# 3) if device info exists update Device Info else activate
if fh.is_config_existing():
gs.update_device_info()
else:
gs.activate()
# 4) Start growsystem
gs.start()
gs = GrowSystem()
while True:
print("Keep running in main.py")

2
old_version/README.md Normal file
View File

@ -0,0 +1,2 @@
# GrowSystem

3
old_version/REDME.md Normal file
View File

@ -0,0 +1,3 @@
# Show output on terminal #
`minicom -b 115200 -o -D /dev/cu.usbmodem3301`

View File

@ -0,0 +1,34 @@
from sensors import Sensors
from machine import Pin, I2C
from utime import sleep
from lib.bh1750 import BH1750
class AmbilightSensor(Sensors):
most_recent_values = []
def __init__(self, settings):
super().__init__(settings)
print(settings) # TODO remove
self.sensor_pin_int = settings['pin_int']
# self.sensor = DHT22(Pin(self.sensor_pin_int, Pin.IN, Pin.PULL_UP))
self.sensor = BH1750(I2C(0, sda=Pin(settings['pin_int_sda']), scl=Pin(settings['pin_int_scl'])))
def read(self):
try:
measurement = self.sensor.luminance(BH1750.ONCE_HIRES_1)
print("ambilight ..")
print(measurement)
self.most_recent_values = [
{
'type': 'ambilight',
'value': measurement,
'unit': '-'
},
]
except OSError:
print('Ambilight Error reading temperature/humidity. Check wires')
print()

View File

@ -0,0 +1 @@
from .bh1750 import BH1750

View File

@ -0,0 +1,118 @@
# https://github.com/flrrth/pico-bh1750
import math
from micropython import const
from utime import sleep_ms
class BH1750:
"""Class for the BH1750 digital Ambient Light Sensor
The datasheet can be found at https://components101.com/sites/default/files/component_datasheet/BH1750.pdf
"""
MEASUREMENT_MODE_CONTINUOUSLY = const(1)
MEASUREMENT_MODE_ONE_TIME = const(2)
RESOLUTION_HIGH = const(0)
RESOLUTION_HIGH_2 = const(1)
RESOLUTION_LOW = const(2)
MEASUREMENT_TIME_DEFAULT = const(69)
MEASUREMENT_TIME_MIN = const(31)
MEASUREMENT_TIME_MAX = const(254)
def __init__(self, address, i2c):
self._address = address
self._i2c = i2c
self._measurement_mode = BH1750.MEASUREMENT_MODE_ONE_TIME
self._resolution = BH1750.RESOLUTION_HIGH
self._measurement_time = BH1750.MEASUREMENT_TIME_DEFAULT
self._write_measurement_time()
self._write_measurement_mode()
def configure(self, measurement_mode: int, resolution: int, measurement_time: int):
"""Configures the BH1750.
Keyword arguments:
measurement_mode -- measure either continuously or once
resolution -- return measurements in either high, high2 or low resolution
measurement_time -- the duration of a single measurement
"""
if measurement_time not in range(BH1750.MEASUREMENT_TIME_MIN, BH1750.MEASUREMENT_TIME_MAX + 1):
raise ValueError("measurement_time must be between {0} and {1}"
.format(BH1750.MEASUREMENT_TIME_MIN, BH1750.MEASUREMENT_TIME_MAX))
self._measurement_mode = measurement_mode
self._resolution = resolution
self._measurement_time = measurement_time
self._write_measurement_time()
self._write_measurement_mode()
def _write_measurement_time(self):
buffer = bytearray(1)
high_bit = 1 << 6 | self._measurement_time >> 5
low_bit = 3 << 5 | (self._measurement_time << 3) >> 3
buffer[0] = high_bit
self._i2c.writeto(self._address, buffer)
buffer[0] = low_bit
self._i2c.writeto(self._address, buffer)
def _write_measurement_mode(self):
buffer = bytearray(1)
buffer[0] = self._measurement_mode << 4 | self._resolution
self._i2c.writeto(self._address, buffer)
sleep_ms(24 if self._measurement_time == BH1750.RESOLUTION_LOW else 180)
def reset(self):
"""Clear the illuminance data register."""
self._i2c.writeto(self._address, bytearray(b'\x07'))
def power_on(self):
"""Powers on the BH1750."""
self._i2c.writeto(self._address, bytearray(b'\x01'))
def power_off(self):
"""Powers off the BH1750."""
self._i2c.writeto(self._address, bytearray(b'\x00'))
@property
def measurement(self) -> float:
"""Returns the latest measurement."""
if self._measurement_mode == BH1750.MEASUREMENT_MODE_ONE_TIME:
self._write_measurement_mode()
buffer = bytearray(2)
self._i2c.readfrom_into(self._address, buffer)
lux = (buffer[0] << 8 | buffer[1]) / (1.2 * (BH1750.MEASUREMENT_TIME_DEFAULT / self._measurement_time))
if self._resolution == BH1750.RESOLUTION_HIGH_2:
return lux / 2
else:
return lux
def measurements(self) -> float:
"""This is a generator function that continues to provide the latest measurement. Because the measurement time
is greatly affected by resolution and the configured measurement time, this function attemts to calculate the
appropriate sleep time between measurements.
Example usage:
for measurement in bh1750.measurements(): # bh1750 is an instance of this class
print(measurement)
"""
while True:
yield self.measurement
if self._measurement_mode == BH1750.MEASUREMENT_MODE_CONTINUOUSLY:
base_measurement_time = 16 if self._measurement_time == BH1750.RESOLUTION_LOW else 120
sleep_ms(math.ceil(base_measurement_time * self._measurement_time / BH1750.MEASUREMENT_TIME_DEFAULT))

View File

@ -0,0 +1,69 @@
import network
class DeviceInfo:
# Device Infos
name = "Dev Device 1"
token = "PC]-0Bmp83h7F5#U!D6KJ(A&"
server_url = 'api.growsystem.muelleronline.org'
wlan_ssid = 'Oppa-95.lan'
wlan_pw = '95%04-MM'
sensors = [
{
'type': 'moisture',
'pin_int': 26
},
{
'type': 'ambilight',
'pin_int': 8, # for compatibility only
'pin_int_sda': 8,
'pin_int_scl': 9
},
# {
# 'type': 'dht22',
# 'pin_int': 15
# }
]
engines = [
{
'type': 'pump',
'pins': [15]
}
]
read_secs = 5
# Device Infos End
wlan = network.WLAN(network.STA_IF)
def get_macaddress(self):
return self._format_mac(self.wlan.config('mac').hex())
def get_ipaddress(self):
return self.wlan.ifconfig()[0]
def get_all_device_infos(self):
return {
'name': self.name,
'mac_address': self.get_macaddress(),
'ip_address': self.get_ipaddress(),
'token': self.token}
def _format_mac(self, mac):
# Split the MAC address into pairs of two characters each
pairs = [mac[i:i+2] for i in range(0, len(mac), 2)]
# Join the pairs with colons to create the formatted MAC address
formatted_mac = ":".join(pairs)
return formatted_mac

36
old_version/dht22.py Normal file
View File

@ -0,0 +1,36 @@
from sensors import Sensors
from dht import DHT22
from machine import ADC, Pin
class TemperatureHumiditySensor(Sensors):
sensor = None
most_recent_values = []
def __init__(self, settings):
super().__init__(settings)
print("Initialize dht22 sensor. Sensor pin is: " + str(settings['pin_int']))
print(settings)
self.sensor_pin_int = settings['pin_int']
self.sensor = DHT22(Pin(self.sensor_pin_int, Pin.IN, Pin.PULL_UP))
def read(self):
try:
self.sensor.measure()
self.most_recent_values = [
{
'type': 'temperature',
'value': self.sensor.temperature(),
'unit': 'C'
},
{
'type': 'humidity',
'value': self.sensor.humidity(),
'unit': '%'
}
]
except OSError:
print('DHT22 Error reading temperature/humidity. Check wires')
print()

23
old_version/engine.py Normal file
View File

@ -0,0 +1,23 @@
from machine import Pin
class Engine():
pins = []
engine = None
def __init__(self, engine):
print("Hello from Engine parent class")
print(engine)
self.pins = engine['pins']
self.engine = Pin(self.pins[0], Pin.OUT)
def on(self):
print("engine on")
self.engine.value(1)
def off(self):
print("engine off")
self.engine.value(0)

View File

@ -0,0 +1,96 @@
import time
from moisture_sensor import MoistureSensor
from dht22 import TemperatureHumiditySensor
from ambilight_sensor import AmbilightSensor
from pump import Pump
from sensor_data_manager import SensorDataManager
from grow_system_api import GrowSystemApi
from device_info import DeviceInfo
class GrowSystem:
grow_system_api = GrowSystemApi()
# moisture_sensor = None
# temperature_humidity_sensor = None
sensors = []
engines = []
most_recent_values = []
sensor_data_manager = None
device_id = None
device_info = DeviceInfo()
def __init__(self, settings):
# Init sensors
for sensor in self.device_info.sensors:
print("")
print("Initialize sensor:")
print(sensor)
sensor_type = sensor['type']
if sensor_type == 'moisture':
print("Found sensor of type moisture")
self.sensors.append(MoistureSensor(sensor))
elif sensor_type == 'dht22':
print("Found sensor of type DHT22")
self.sensors.append(TemperatureHumiditySensor(sensor))
elif sensor_type == 'ambilight':
print("Found sensor of type GY302/BH1750")
self.sensors.append(AmbilightSensor(sensor))
else:
print("No sensor type configured for: " + sensor['type'])
# Init engines
for engine in self.device_info.engines:
print("")
print("Initialize engine:")
print(engine)
engine_type = engine['type']
if engine_type == 'pump':
print("Found egine of type pump")
self.engines.append(Pump(engine))
self.engines[0].on()
time.sleep(15)
self.engines[0].off()
#if not self.moisture_sensor:
# self.moisture_sensor = MoistureSensor(settings['moisture_sensor'])
#if not self.temperature_humidity_sensor:
# self.temperature_humidity_sensor = TemperatureHumiditySensor(settings['temperature_humidity_sensor'])
def start(self):
print("Say the server hello...")
result = self.grow_system_api.say_hello()
message = result['message']
if message != 'OK':
print("Device not activated. Stopping")
return
self.device_id = result['data']['device_id']
self.sensor_data_manager = SensorDataManager(self.device_id)
print("Start reading sensors ...")
while True:
# Reset data
self.most_recent_values = []
for sensor in self.sensors:
print("Read sensor of type " + sensor.type + " at pin " + str(sensor.sensor_pin_int))
sensor.read()
for measurement in sensor.most_recent_values:
print(f"Got {measurement['value']} {measurement['unit']} ({measurement['type']})")
self.most_recent_values = self.most_recent_values + sensor.most_recent_values
self.sensor_data_manager.handleData(self.most_recent_values)
time.sleep(self.device_info.read_secs)

View File

@ -0,0 +1,38 @@
from http_client import HTTPClient
from device_info import DeviceInfo
import json
class GrowSystemApi:
http_client = HTTPClient()
device_info = DeviceInfo()
base_url = ''
def __init__(self):
self.base_url = self.device_info.server_url
def activate(self, config):
response = self.http_client.post(self.base_url + "/api/device/activate", self._get_device_data())
return self_get_json_encoded(response.text)
def say_hello(self):
response = self.http_client.post(self.base_url + "/api/device", self._get_device_data())
print(response.text)
jsonResult = json.loads(response.text)
print(jsonResult)
return jsonResult;
def send_measurements(self, device_id, data):
url = self.base_url + "/api/device/" + str(device_id) + "/sensor-log"
response = self.http_client.post(url, data)
return json.loads(response.text)
def _get_device_data(self):
return self.device_info.get_all_device_infos()
def _get_json_encoded(self, text):
return json.loads(text)

View File

@ -0,0 +1,35 @@
import urequests
import json
class HTTPClient:
def __init__(self):
pass
def get(self, url):
url = 'https://' + url
try:
# headers = {'Content-Type': 'application/json'}
response = urequests.get(url)
if response.status_code == 200:
print("Data sent, got response")
return response
else:
print("Failed to get data. Status code:", response.status_code)
except Exception as e:
print("Exception occurred:", e)
def post(self, url, data):
url = 'https://' + url
try:
headers = {'Content-Type': 'application/json', 'Accept': 'application/json, text/plain, */*'}
json_data = json.dumps(data)
print("Send post request to: " + url)
response = urequests.post(url, data=json_data, headers=headers)
if response.status_code == 200:
return response
else:
print("Failed to send data. Status code:", response.status_code)
print(response.text)
except Exception as e:
print("Exception occurred:", e)

62
old_version/lib/bh1750.py Normal file
View File

@ -0,0 +1,62 @@
"""
Micropython BH1750 ambient light sensor driver.
* https://github.com/PinkInk/upylib/tree/master/bh1750
"""
from utime import sleep_ms
class BH1750():
"""Micropython BH1750 ambient light sensor driver."""
PWR_OFF = 0x00
PWR_ON = 0x01
RESET = 0x07
# modes
CONT_LOWRES = 0x13
CONT_HIRES_1 = 0x10
CONT_HIRES_2 = 0x11
ONCE_HIRES_1 = 0x20
ONCE_HIRES_2 = 0x21
ONCE_LOWRES = 0x23
# default addr=0x23 if addr pin floating or pulled to ground
# addr=0x5c if addr pin pulled high
def __init__(self, bus, addr=0x23):
self.bus = bus
self.addr = addr
self.off()
self.reset()
def off(self):
"""Turn sensor off."""
self.set_mode(self.PWR_OFF)
def on(self):
"""Turn sensor on."""
self.set_mode(self.PWR_ON)
def reset(self):
"""Reset sensor, turn on first if required."""
self.on()
self.set_mode(self.RESET)
def set_mode(self, mode):
"""Set sensor mode."""
self.mode = mode
self.bus.writeto(self.addr, bytes([self.mode]))
def luminance(self, mode):
"""Sample luminance (in lux), using specified sensor mode."""
# continuous modes
if mode & 0x10 and mode != self.mode:
self.set_mode(mode)
# one shot modes
if mode & 0x20:
self.set_mode(mode)
# earlier measurements return previous reading
sleep_ms(24 if mode in (0x13, 0x23) else 180)
data = self.bus.readfrom(self.addr, 2)
factor = 2.0 if mode in (0x11, 0x21) else 1.0
return (data[0]<<8 | data[1]) / (1.2 * factor)

63
old_version/main.py Normal file
View File

@ -0,0 +1,63 @@
# GrowSystem
# Author: Maik Müller (maik@muelleronlineorg)
# WIP!
# This file should do only:
# - first start check and actions if not configured
# - provide constants for settings
# - eventually necessary system settings
# - init wlan connection
# - Call base class, permitting the configured constants
import network
import urequests
from grow_system import GrowSystem
from wlan import WlanClient
from http_client import HTTPClient
from device_info import DeviceInfo
from setup import Setup
settings = {
'wlan_ssid': 'Oppa-95.lan',
'wlan_pw': '95%04-MM',
'moisture_sensor': {
'pin_int': 26
},
'temperature_humidity_sensor': {
'pin_int': 15
},
'pump_pin_int': 24
}
def wlan_scan():
# Client-Betrieb
wlan = network.WLAN(network.STA_IF)
# WLAN-Interface aktivieren
wlan.active(True)
# WLANs ausgeben
found_wlans = wlan.scan()
return found_wlans
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
pico_setup = Setup()
pico_setup.setup_pico()
if false:
#print(wlan_scan())
print("Connect WLAN")
wlanClient = WlanClient(settings['wlan_ssid'], settings['wlan_pw'])
wlanClient.connect()
print("")
di = DeviceInfo()
print("Device Infos:")
print(di.get_all_device_infos())
print("")
print("Start grow system")
gs = GrowSystem(settings)
gs.start()

View File

@ -0,0 +1,39 @@
# Moisture Sensor Class
from sensors import Sensors
from machine import ADC, Pin
class MoistureSensor(Sensors):
sensor = None
most_recent_values = []
min_raw_value = None
max_raw_value = None
def __init__(self, sensor_data, min_raw_value=300, max_raw_value=65535):
super().__init__(sensor_data)
print("Initialize moisture sensor. Sensor pin is: " + str(sensor_data['pin_int']))
self.sensor_pin_int = sensor_data['pin_int']
self.min_raw_value = min_raw_value
self.max_raw_value = max_raw_value
self.sensor = ADC(Pin(self.sensor_pin_int))
def read(self):
self.most_recent_values = [
{
'type': 'moisture',
'value': self.convert_to_moisture_percentage(self.sensor.read_u16()),
'unit': '%'
},
]
def normalize_sensor_value(self, raw_value):
return (raw_value - self.min_raw_value) / (self.max_raw_value - self.min_raw_value)
def convert_to_moisture_percentage(self, raw_value):
normalized_value = self.normalize_sensor_value(raw_value)
return round(100 - normalized_value * 100, 1)

View File

@ -0,0 +1,31 @@
from machine import Pin, ADC
import time
import network
import urequests
import statistics
import secrets
sensor = ADC(Pin(26))
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(secrets.SSID, secrets.PASSWORD)
time.sleep(5)
print(wlan.isconnected())
readings = []
try:
while True:
for i in range(5):
reading = sensor.read_u16()
readings.append(reading)
print(readings)
time.sleep(1)
median_value = statistics.median(readings)
if median_value < 400:
urequests.get("https://api.telegram.org/bot"+secrets.API+"/sendMessage?text=Gary is thirsty&chat_id="+secrets.ID)
print("Message Sent")
else:
print("Gary has enough water")
time.sleep(3600)
except OSError:
print("@"*68)
print("@ Cannot connect to the Wi-Fi, please check your SSID and PASSWORD @")
print("@"*68)

7
old_version/pump.py Normal file
View File

@ -0,0 +1,7 @@
from engine import Engine
class Pump(Engine):
def __init__(self, engine):
super().__init__(engine)

View File

@ -0,0 +1,20 @@
from grow_system_api import GrowSystemApi
# from device_info import DeviceInfo
class SensorDataManager:
device_info = None
grow_system_api = None
device_id = None
def __init__(self, device_id):
self.grow_system_api = GrowSystemApi()
# self.device_info = DeviceInfo()
self.device_id = device_id
def handleData(self, data):
jsonResponse = self.grow_system_api.send_measurements(self.device_id, data)
print("Response message: " + jsonResponse['message'])

113
old_version/setup.py Normal file
View File

@ -0,0 +1,113 @@
import network
import ujson
import ure as re
import usocket as socket
import time
class Setup:
def __init__(self):
self.ap_ssid = "Growsystem"
self.ap_password = "12345678"
self.wlans = []
self.selected_ssid = ""
self.wlan_password = ""
self.pin = ""
self.html = """
<html>
<head><title>Pico Setup</title></head>
<body>
<h1>Setup Pico</h1>
<form method="post">
SSID: <select name="ssid">{}</select><br>
Password: <input type="password" name="password"><br>
PIN: <input type="text" name="pin" maxlength="4"><br>
<input type="submit" value="Submit">
</form>
</body>
</html>
"""
def scan_wlans(self):
print("Scan WLANs")
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
self.wlans = [w[0].decode() for w in wlan.scan()]
print("Found:", self.wlans)
wlan.active(False)
def start_ap_mode(self):
self.ap = network.WLAN(network.AP_IF)
self.ap.active(True)
while self.ap.active() == False:
pass
self.ap.config(essid=self.ap_ssid, password=self.ap_password)
print("AP SSID:", self.ap.config("essid"))
# print("AP Password:", self.ap.config("password"))
def stop_ap_mode(self):
ap = network.WLAN(network.AP_IF)
ap.active(False)
def start_webserver(self):
addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1]
s = socket.socket()
s.bind(addr)
s.listen(1)
print("Listening on", addr)
while True:
conn, addr = s.accept()
print("Got a connection from", addr)
request = conn.recv(1024)
print("Content =", request)
conn.send("HTTP/1.1 200 OK\n")
conn.send("Content-Type: text/html\n")
conn.send("Connection: close\n\n")
conn.sendall(self.html.format(''.join(['<option value="{}">{}</option>'.format(w, w) for w in self.wlans])))
conn.close()
def parse_request(self, request):
request = request.decode()
ssid_match = re.search("ssid=([^&]+)", request)
if ssid_match:
self.selected_ssid = ssid_match.group(1)
password_match = re.search("password=([^&]+)", request)
if password_match:
self.wlan_password = password_match.group(1)
pin_match = re.search("pin=([^&]+)", request)
if pin_match:
self.pin = pin_match.group(1)
def save_config(self):
config = {
"ssid": self.selected_ssid,
"password": self.wlan_password,
"pin": self.pin
}
with open("initial_config.py", "w") as f:
f.write("config = " + ujson.dumps(config))
def switch_to_client_mode(self):
ap = network.WLAN(network.AP_IF)
ap.active(False)
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(self.selected_ssid, self.wlan_password)
while not wlan.isconnected():
time.sleep(1)
print("Connected to", self.selected_ssid)
def setup_pico(self):
print("Start PICO setup")
self.scan_wlans()
self.start_ap_mode()
self.start_webserver()
while True:
conn, addr = self.s.accept()
request = conn.recv(1024)
self.parse_request(request)
self.save_config()
conn.close()
break # Assuming only one request is handled
self.stop_ap_mode()
self.switch_to_client_mode()

39
old_version/wlan.py Normal file
View File

@ -0,0 +1,39 @@
import machine
import network
import time
# network.country('DE')
class WlanClient:
ssid = ''
pw = ''
wlan = None
# Status-LED
led_onboard = machine.Pin('LED', machine.Pin.OUT)
led_onboard.value(False)
def __init__(self, ssid, pw):
# print("Hello from wlan class")
self.ssid = ssid
self.pw = pw
self.wlan = network.WLAN(network.STA_IF)
def connect(self):
if not self.is_connected():
print('No WLAN connected. Connecting ...' + self.ssid + ' ' + self.pw)
self.wlan.active(True)
self.wlan.connect(self.ssid, self.pw)
for i in range(10):
if self.wlan.status() < 0 or self.wlan.status() >= 3:
break
time.sleep(1)
# led_value = self.led_onboard.value() == 1
# self.led_onboard.value(led_value)
if self.wlan.isconnected():
net_config = self.wlan.ifconfig()
print("NetConfig:")
print(net_config)
def is_connected(self):
return self.wlan.isconnected()

50
teststuff/main.py Normal file
View File

@ -0,0 +1,50 @@
import network
import time
import socket
ssid="Growsystem"
password="12345678"
def web_page():
html = """<html><head><meta name="viewport" content="width=device-width, initial-scale=1"></head>
<body><h1>Hello World</h1></body></html>
"""
return html
# if you do not see the network you may have to power cycle
# unplug your pico w for 10 seconds and plug it in again
def ap_mode(ssid, password):
"""
Description: This is a function to activate AP mode
Parameters:
ssid[str]: The name of your internet connection
password[str]: Password for your internet connection
Returns: Nada
"""
# Just making our internet connection
ap = network.WLAN(network.AP_IF)
ap.config(essid=ssid, password=password)
ap.active(True)
while ap.active() == False:
pass
print('AP Mode Is Active, You can Now Connect')
print('IP Address To Connect to:: ' + ap.ifconfig()[0])
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #creating socket object
s.bind(('', 80))
s.listen(5)
while True:
conn, addr = s.accept()
print('Got a connection from %s' % str(addr))
request = conn.recv(1024)
print('Content = %s' % str(request))
response = web_page()
conn.send(response)
conn.close()
ap_mode('Growsystem95','PASSWORD')