From e2800f416ca49601b28036fad7c73944bfe2c5b7 Mon Sep 17 00:00:00 2001 From: Varun Date: Tue, 18 Mar 2025 16:19:00 +0530 Subject: [PATCH] changes reverted on mqtt --- src/controllers/tanksController.js | 125 ++++++++++++++--------------- 1 file changed, 59 insertions(+), 66 deletions(-) diff --git a/src/controllers/tanksController.js b/src/controllers/tanksController.js index 3904ee0e..5c69cfc9 100644 --- a/src/controllers/tanksController.js +++ b/src/controllers/tanksController.js @@ -5765,20 +5765,32 @@ exports.getBlockData = async (req, reply) => { const mqtt = require('mqtt'); require('dotenv').config(); -const activeClients = new Map(); // Store active clients per hw_Id - -// Temporary client to listen for device announcements -const tempClientId = `mqtt_temp_${Math.random().toString(16).substr(2, 8)}`; -const tempClient = mqtt.connect('mqtt://35.207.198.4:1883', { - clientId: tempClientId, - clean: false, - reconnectPeriod: 2000, +// **Persistent MQTT Connection** +const client = mqtt.connect('mqtt://35.207.198.4:1883', { + clientId: `mqtt_server_${Math.random().toString(16).substr(2, 8)}`, + clean: false, // Ensures MQTT retains subscriptions + reconnectPeriod: 2000, // Reconnect every 2 seconds }); -// Subscribe only to device announcements -tempClient.on('connect', () => { - console.log(`✅ Connected to MQTT broker as ${tempClientId}`); - tempClient.subscribe('water/iot-data/announce', { qos: 1 }, (err) => { +const subscribedTopics = new Set(); +const activeDevices = new Set(); // Keep track of active devices + +client.on('connect', () => { + console.log('✅ Connected to MQTT broker'); + + // **Ensure re-subscriptions after reconnect** + subscribedTopics.forEach(topic => { + client.subscribe(topic, { qos: 1 }, (err) => { + if (err) { + console.error(`❌ Error resubscribing to ${topic}:`, err); + } else { + console.log(`🔄 Resubscribed to ${topic}`); + } + }); + }); + + // **Subscribe to new device announcements** + client.subscribe('water/iot-data/announce', { qos: 1 }, (err) => { if (err) { console.error('❌ Error subscribing to announcement topic:', err); } else { @@ -5787,12 +5799,13 @@ tempClient.on('connect', () => { }); }); -tempClient.on('message', async (topic, message) => { +client.on('message', async (topic, message) => { console.log(`📩 Message received on topic ${topic}: ${message.toString()}`); try { const data = JSON.parse(message.toString()); + // **Handle device announcements** if (topic === 'water/iot-data/announce') { if (!data.objects || !data.objects.hw_Id) { console.error("❌ Invalid announcement format. Missing hw_Id."); @@ -5800,66 +5813,45 @@ tempClient.on('message', async (topic, message) => { } const hw_Id = data.objects.hw_Id; - console.log(`🔄 Received first hw_Id: ${hw_Id}`); - - // If this hw_Id already has a client, skip creating a new one - if (!activeClients.has(hw_Id)) { - console.log(`🔄 Creating new MQTT client for hw_Id: ${hw_Id}`); - activeClients.set(hw_Id, createDeviceClient(hw_Id)); + const deviceTopic = `water/iot-data/${hw_Id}`; + + if (!subscribedTopics.has(deviceTopic)) { + client.subscribe(deviceTopic, { qos: 1 }, (err) => { + if (err) { + console.error(`❌ Error subscribing to ${deviceTopic}:`, err); + } else { + console.log(`✅ Subscribed to ${deviceTopic}`); + subscribedTopics.add(deviceTopic); + activeDevices.add(hw_Id); + console.log('📡 Active Devices:', Array.from(activeDevices)); + + // ✅ **Now also process data** + processIotData(hw_Id, data); + } + }); + } else { + console.log(`🔄 Already subscribed to ${deviceTopic}, processing data.`); + processIotData(hw_Id, data); } + return; + } + + // **Process IoT Data for device topics** + if (topic.startsWith('water/iot-data/')) { + setImmediate(() => { + console.log(`🚀 Entering processIotData() for topic: ${topic}`); + const hw_Id = topic.split('/')[2]; + processIotData(hw_Id, data); + }); } } catch (err) { console.error('❌ Error processing message:', err.message); } }); -// Function to create a client for each device after hw_Id is known -function createDeviceClient(hw_Id) { - const clientId = `mqtt_client_${hw_Id}`; - console.log(`🔄 Connecting to MQTT with clientId: ${clientId}`); - - const client = mqtt.connect('mqtt://35.207.198.4:1883', { - clientId, - clean: false, - reconnectPeriod: 2000, - }); - - client.on('connect', () => { - console.log(`✅ Connected to MQTT broker as ${clientId}`); - - const deviceTopic = `water/iot-data/${hw_Id}`; - client.subscribe(deviceTopic, { qos: 1 }, (err) => { - if (err) { - console.error(`❌ Error subscribing to ${deviceTopic}:`, err); - } else { - console.log(`✅ Subscribed to ${deviceTopic}`); - } - }); - }); - - client.on('message', async (topic, message) => { - console.log(`📩 Message received on topic ${topic}: ${message.toString()}`); - - try { - const data = JSON.parse(message.toString()); - - if (topic.startsWith('water/iot-data/')) { - setImmediate(() => { - console.log(`🚀 Processing IoT Data for topic: ${topic}`); - processIotData(hw_Id, data); - }); - } - } catch (err) { - console.error('❌ Error processing message:', err.message); - } - }); - - client.on('error', (err) => console.error(`❌ MQTT Error (${clientId}):`, err)); - client.on('close', () => console.log(`⚠️ MQTT Connection Closed for ${clientId}`)); - client.on('offline', () => console.log(`⚠️ MQTT Broker Offline for ${clientId}`)); - - return client; -} +client.on('error', (err) => console.error('❌ MQTT Error:', err)); +client.on('close', () => console.log('⚠️ MQTT Connection Closed.')); +client.on('offline', () => console.log('⚠️ MQTT Broker Offline.')); async function processIotData(hw_Id, data) { try { @@ -5978,6 +5970,7 @@ async function processIotData(hw_Id, data) { + // function logSets() { // console.log("Subscribed Topics:", Array.from(subscribedTopics)); // console.log("Active Devices:", Array.from(activeDevices));