From 32b68508146461be685629f0c8cfe18f15b52fbc Mon Sep 17 00:00:00 2001 From: Timo Volkmann Date: Tue, 30 Jun 2020 23:03:06 +0200 Subject: [PATCH] refactored mqtt connections --- platformio.ini | 5 +- src/connections.cpp | 148 +++++++++++++++++++++++--------------------- src/header.h | 1 - src/peripherals.cpp | 1 + 4 files changed, 80 insertions(+), 75 deletions(-) diff --git a/platformio.ini b/platformio.ini index 130393d..e5db3ad 100644 --- a/platformio.ini +++ b/platformio.ini @@ -21,5 +21,6 @@ lib_deps = AutoConnect@^1.1.7 AsyncMqttClient@^0.8.2 ArduinoJson@^6.15.2 - ; ESPRandom@^1.3.3 - ; PubSubClient@^2.8 \ No newline at end of file + MQTT@^2.4.7 + ; PubSubClient@^2.8 + ; ESPRandom@^1.3.3 \ No newline at end of file diff --git a/src/connections.cpp b/src/connections.cpp index 4646376..299d809 100644 --- a/src/connections.cpp +++ b/src/connections.cpp @@ -1,4 +1,5 @@ #include +#include #include extern "C" @@ -7,13 +8,15 @@ extern "C" #include "freertos/timers.h" } -#define MQTT_VALVE_COMMAND MQTT_TOPIC_BASE_SUB "/" MQTT_DEVICE_ID "/valve" -#define MQTT_SOIL_PROPERTIES MQTT_TOPIC_BASE_SUB "/" MQTT_DEVICE_ID "/soil" +#define MQTT_VALVE_COMMAND MQTT_TOPIC_BASE_SUB "/" MQTT_DEVICE_ID "/valve" +#define MQTT_SOIL_PROPERTIES MQTT_TOPIC_BASE_SUB "/" MQTT_DEVICE_ID "/soil" -AsyncMqttClient mqttClient; TimerHandle_t mqttReconnectTimer; TimerHandle_t wifiReconnectTimer; +// TimerHandle_t mqttProcessingTimer; +TaskHandle_t mqttTask; + WebServer Server; AutoConnect Portal(Server); WiFiClient client; @@ -21,6 +24,7 @@ WiFiClient client; AutoConnectConfig Config; // Config.autoReconnect = true; // WiFi.config(Config); +MQTTClient mqttClient; void connectWiFi() { @@ -31,11 +35,49 @@ void connectWiFi() } } +void mqttLoop(void *parameter) +{ + bool x; + do { + x = mqttClient.loop(); + delay(50); + } while (mqttClient.connected()); + + if (!mqttClient.connected()) + { + Serial.println("Disconnected from MQTT."); + if (WiFi.isConnected()) + { + xTimerStart(mqttReconnectTimer, 0); + } + } + vTaskDelete(NULL); +} + void connectMQTT() { Serial.println("Connecting to MQTT..."); - mqttClient.setClientId(MQTT_DEVICE_ID); - mqttClient.connect(); + mqttClient.begin(MQTT_HOST, MQTT_PORT, client); + mqttClient.connect(MQTT_DEVICE_ID); + if (mqttClient.connected()) + { + Serial.println("Connected!"); + } + else + { + Serial.println("NOT Connected!"); + } + mqttClient.subscribe(MQTT_PATH_SUB, 2); + Serial.print("subscribed to: "); + Serial.println(MQTT_PATH_SUB); + xTaskCreate( + mqttLoop, /* Task function. */ + "mqttLoop", /* String with name of task. */ + 8192, /* Stack size in bytes. */ + NULL, /* Parameter passed as input of the task */ + 1, /* Priority of the task. */ + &mqttTask); /* Task handle. */ + //xTimerStart(mqttProcessingTimer, 0); } void WiFiEvent(WiFiEvent_t event) @@ -51,110 +93,72 @@ void WiFiEvent(WiFiEvent_t event) break; case SYSTEM_EVENT_STA_DISCONNECTED: digitalWrite(PIN_LED_G, LOW); - Serial.println("WiFi lost connection"); xTimerStop(mqttReconnectTimer, 0); // ensure we don't reconnect to MQTT while reconnecting to Wi-Fi + // xTimerStop(mqttProcessingTimer, 0); xTimerStart(wifiReconnectTimer, 0); break; } } -void onMqttConnect(bool sessionPresent) +void onMqttMessage(MQTTClient *client, char topic[], char payload[], int payload_length) { - Serial.println("Connected to MQTT."); - - Serial.print("Session present: "); - Serial.println(sessionPresent); - - uint16_t packetIdSub = mqttClient.subscribe(MQTT_PATH_SUB, 2); - Serial.print("subscribed to: "); - Serial.println(MQTT_PATH_SUB); -} - -void onMqttDisconnect(AsyncMqttClientDisconnectReason reason) -{ - Serial.println("Disconnected from MQTT."); - - if (WiFi.isConnected()) + Serial.print("Message arrived ["); + Serial.print(topic); + Serial.print("] "); + for (int i = 0; i < payload_length; i++) { - xTimerStart(mqttReconnectTimer, 0); + Serial.print((char)payload[i]); } -} + Serial.println(); -void onMqttSubscribe(uint16_t packetId, uint8_t qos) -{ - Serial.print("Subscribe acknowledged:"); - Serial.print(" packetId: "); - Serial.print(packetId); - Serial.print(" qos: "); - Serial.println(qos); -} - -void onMqttUnsubscribe(uint16_t packetId) -{ - Serial.print("Unsubscribe acknowledged."); - Serial.print(" packetId: "); - Serial.println(packetId); -} - -void onMqttMessage(char *topic, char *payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) -{ - Serial.print("Publish received: "); - Serial.print(" topic: "); - Serial.println(topic); - - if (strcmp(topic, MQTT_VALVE_COMMAND) == 0 ) { + if (strcmp(topic, MQTT_VALVE_COMMAND) == 0) + { Serial.println("toggling valve..."); Serial.println(topic); toggleValve(); } - if (strcmp(topic, MQTT_SOIL_PROPERTIES) == 0 ) { - Serial.println("recieving soil thresholds..."); + if (strcmp(topic, MQTT_SOIL_PROPERTIES) == 0) + { + Serial.println("receiving soil thresholds..."); Serial.println(topic); Serial.println(payload); // const int capacity = JSON_OBJECT_SIZE(3) + 2 * JSON_OBJECT_SIZE(1); StaticJsonDocument<1024> doc; DeserializationError err = deserializeJson(doc, payload); - if (err == DeserializationError::Ok) { + if (err == DeserializationError::Ok) + { int fc = doc["fc"]; int pwp = doc["pwp"]; int sat = doc["sat"]; - Serial.println(doc.size()); - Serial.println(doc["fc"].as()); - Serial.println(doc["pwp"].as()); - Serial.println(doc["sat"].as()); setSoilProperties(fc, pwp, sat); } + else + { + Serial.println(err.c_str()); + } } } -void onMqttPublish(uint16_t packetId) -{ - // Serial.print("Publish acknowledged: "); - // Serial.print(" packetId: "); - // Serial.println(packetId); -} - void setupConnections() { Serial.println(); Serial.println(); - mqttReconnectTimer = xTimerCreate("mqttTimer", pdMS_TO_TICKS(2000), pdFALSE, (void *)0, reinterpret_cast(connectMQTT)); - wifiReconnectTimer = xTimerCreate("wifiTimer", pdMS_TO_TICKS(2000), pdFALSE, (void *)0, reinterpret_cast(connectWiFi)); + mqttReconnectTimer = xTimerCreate( + "mqttTimer", pdMS_TO_TICKS(2000), pdFALSE, (void *)0, reinterpret_cast(connectMQTT)); + wifiReconnectTimer = xTimerCreate( + "wifiTimer", pdMS_TO_TICKS(2000), pdFALSE, (void *)0, reinterpret_cast(connectWiFi)); + // mqttProcessingTimer = xTimerCreate( + // "messageTimer", pdMS_TO_TICKS(50), pdTRUE, (void *)0, reinterpret_cast(mqttLoop)); WiFi.onEvent(WiFiEvent); - mqttClient.onConnect(onMqttConnect); - mqttClient.onDisconnect(onMqttDisconnect); - mqttClient.onSubscribe(onMqttSubscribe); - mqttClient.onUnsubscribe(onMqttUnsubscribe); - mqttClient.onMessage(onMqttMessage); - mqttClient.onPublish(onMqttPublish); - mqttClient.setServer(MQTT_HOST, MQTT_PORT); + mqttClient.onMessageAdvanced(onMqttMessage); connectWiFi(); } -void publishMessage(const char *topic, const char *msg) { - mqttClient.publish(topic, 1, true, msg); +void publishMessage(const char *topic, const char *msg) +{ + mqttClient.publish(topic, msg, true, 1); } diff --git a/src/header.h b/src/header.h index f5d9aa4..79b4164 100644 --- a/src/header.h +++ b/src/header.h @@ -15,7 +15,6 @@ #include #include #include -#include // fix for core panic during wifi initialization // #define configMINIMAL_STACK_SIZE 2048 diff --git a/src/peripherals.cpp b/src/peripherals.cpp index 6af2657..b1d3c59 100644 --- a/src/peripherals.cpp +++ b/src/peripherals.cpp @@ -106,4 +106,5 @@ void setSoilProperties(int FC, int PWP, int SAT) { Serial.println(permanentWiltingPoint); Serial.print("soilSaturation: "); Serial.println(soilSaturation); + // TODO save to nvs nonvolatile flash storage }