From 5d54fce8f745658010991390a72cc4da8e4b8b38 Mon Sep 17 00:00:00 2001 From: Varun Date: Wed, 5 Mar 2025 17:21:37 +0530 Subject: [PATCH] changes --- src/controllers/tanksController.js | 59 +++++++++++++++++++----------- 1 file changed, 38 insertions(+), 21 deletions(-) diff --git a/src/controllers/tanksController.js b/src/controllers/tanksController.js index 193f1060..77134480 100644 --- a/src/controllers/tanksController.js +++ b/src/controllers/tanksController.js @@ -5854,30 +5854,34 @@ exports.getBlockData = async (req, reply) => { const mqtt = require('mqtt'); - const brokerUrl = 'mqtt://35.207.198.4:1883'; -const clients = new Map(); // Store client instances dynamically per hw_Id -const deviceLastSeen = new Map(); // Track last seen timestamps for devices +const clients = new Map(); // Store client instances dynamically per hw_Id +const deviceLastSeen = new Map(); // Track last seen timestamps for devices function createClient(hw_Id) { if (clients.has(hw_Id)) return clients.get(hw_Id); // Return existing client - const client = mqtt.connect(brokerUrl); - + const client = mqtt.connect(brokerUrl, { + clientId: `client_${hw_Id}`, + clean: false, // Ensures session persistence + reconnectPeriod: 5000, // Attempt reconnect every 5 seconds + }); + client.on('connect', () => { - console.log(`Client for ${hw_Id} connected to MQTT broker`); + console.log(`✅ Client for ${hw_Id} connected to MQTT broker`); + const topic = `water/iot-data/${hw_Id}`; client.subscribe(topic, { qos: 1 }, (err) => { if (err) { - console.error(`Error subscribing to topic ${topic}:`, err); + console.error(`❌ Error subscribing to topic ${topic}:`, err); } else { - console.log(`Subscribed to topic: ${topic}`); + console.log(`📡 Subscribed to topic: ${topic}`); } }); }); client.on('message', async (topic, message) => { - console.log(`Message received on topic ${topic}:`, message.toString()); + console.log(`📩 Message received on topic ${topic}:`, message.toString()); try { const data = JSON.parse(message.toString()); @@ -5901,11 +5905,12 @@ function createClient(hw_Id) { }); await iotTankData.save(); - console.log(`Data saved for device: ${hw_Id}`); + console.log(`✅ Data saved for device: ${hw_Id}`); // Keep only the latest 3 records - const recordsToDelete = await IotData.find({ hardwareId: hw_Id }).sort({ date: -1 }).skip(3); - if (recordsToDelete.length > 0) { + const records = await IotData.find({ hardwareId: hw_Id }).sort({ date: -1 }); + if (records.length > 3) { + const recordsToDelete = records.slice(3); // Get older records await Promise.all(recordsToDelete.map((record) => record.remove())); } @@ -5922,7 +5927,7 @@ function createClient(hw_Id) { existingTank.waterlevel = waterLevel; await existingTank.save(); - console.log(`Tank data saved for ${hw_Id}, tank: ${tank.Id}`); + console.log(`✅ Tank data saved for ${hw_Id}, tank: ${tank.Id}`); }) ); @@ -5942,27 +5947,39 @@ function createClient(hw_Id) { inputConnection.stopTime = moment().tz('Asia/Kolkata').format('HH:mm'); } await motorTank.save(); - console.log(`Motor status updated for device: ${hw_Id}`); + console.log(`✅ Motor status updated for device: ${hw_Id}`); } } } catch (err) { - console.error(`Error processing message from ${hw_Id}:`, err.message); + console.error(`❌ Error processing message from ${hw_Id}:`, err.message); } }); + client.on('error', (err) => { + console.error(`❌ MQTT Error for ${hw_Id}:`, err); + }); + + client.on('disconnect', () => { + console.log(`⚠️ Client for ${hw_Id} disconnected`); + }); + clients.set(hw_Id, client); return client; } // Subscribe to wildcard topic to detect new devices -const globalClient = mqtt.connect(brokerUrl); +const globalClient = mqtt.connect(brokerUrl, { + clientId: 'global_subscriber', + clean: false, +}); + globalClient.on('connect', () => { - console.log('Connected to MQTT broker'); + console.log('🌎 Global client connected to MQTT broker'); globalClient.subscribe('water/iot-data/+', { qos: 1 }, (err) => { if (err) { - console.error('Error subscribing to wildcard topic:', err); + console.error('❌ Error subscribing to wildcard topic:', err); } else { - console.log('Subscribed to wildcard topic: water/iot-data/+'); + console.log('📡 Subscribed to wildcard topic: water/iot-data/+'); } }); }); @@ -5972,11 +5989,11 @@ globalClient.on('message', (topic, message) => { const data = JSON.parse(message.toString()); const { hw_Id } = data.objects; if (!clients.has(hw_Id)) { - console.log(`Creating new MQTT client for device: ${hw_Id}`); + console.log(`➕ Creating new MQTT client for device: ${hw_Id}`); createClient(hw_Id); } } catch (err) { - console.error('Error parsing global message:', err.message); + console.error('❌ Error parsing global message:', err.message); } });