From a3480c5d6771bab63ce0e824fb6a92dba6268da9 Mon Sep 17 00:00:00 2001 From: Varun Date: Wed, 5 Mar 2025 17:28:45 +0530 Subject: [PATCH] changes --- src/controllers/tanksController.js | 229 +++++++++++------------------ 1 file changed, 84 insertions(+), 145 deletions(-) diff --git a/src/controllers/tanksController.js b/src/controllers/tanksController.js index 77134480..5472dc09 100644 --- a/src/controllers/tanksController.js +++ b/src/controllers/tanksController.js @@ -5855,146 +5855,109 @@ exports.getBlockData = async (req, reply) => { const mqtt = require('mqtt'); const brokerUrl = 'mqtt://35.207.198.4:1883'; -const clients = new Map(); // Store client instances dynamically per hw_Id -const deviceLastSeen = new Map(); // Track last seen timestamps for devices +const deviceLastSeen = new Map(); // Track last seen timestamps for devices -function createClient(hw_Id) { - if (clients.has(hw_Id)) return clients.get(hw_Id); // Return existing client - - const client = mqtt.connect(brokerUrl, { - clientId: `client_${hw_Id}`, - clean: false, // Ensures session persistence - reconnectPeriod: 5000, // Attempt reconnect every 5 seconds - }); +// Connect a single global MQTT client +const mqttClient = mqtt.connect(brokerUrl, { + clientId: 'global_subscriber', + clean: true, + reconnectPeriod: 5000, // Reconnect every 5 seconds +}); - client.on('connect', () => { - console.log(`✅ Client for ${hw_Id} connected to MQTT broker`); +mqttClient.on('connect', () => { + console.log('🌎 Global MQTT client connected'); - const topic = `water/iot-data/${hw_Id}`; - client.subscribe(topic, { qos: 1 }, (err) => { - if (err) { - console.error(`❌ Error subscribing to topic ${topic}:`, err); - } else { - console.log(`📡 Subscribed to topic: ${topic}`); - } - }); + // Subscribe to all IoT data topics + mqttClient.subscribe('water/iot-data/+', { qos: 1 }, (err) => { + if (err) { + console.error('❌ Error subscribing to wildcard topic:', err); + } else { + console.log('📡 Subscribed to wildcard topic: water/iot-data/+'); + } }); +}); - client.on('message', async (topic, message) => { +mqttClient.on('message', async (topic, message) => { + try { console.log(`📩 Message received on topic ${topic}:`, message.toString()); - - try { - const data = JSON.parse(message.toString()); - const { hw_Id, Motor_status, tanks } = data.objects; - - const currentTime = moment().tz('Asia/Kolkata').format('DD-MMM-YYYY - HH:mm'); - deviceLastSeen.set(hw_Id, new Date().toISOString()); - - // Save IoT data - const iotTankData = new IotData({ - hardwareId: hw_Id, - Motor_status, - tanks: tanks.map((tank) => ({ - tankhardwareId: tank.Id, - tankHeight: tank.level, - date: currentTime, - time: moment().tz('Asia/Kolkata').format('HH:mm'), - })), + + const data = JSON.parse(message.toString()); + const { hw_Id, Motor_status, tanks } = data.objects; + + const currentTime = moment().tz('Asia/Kolkata').format('DD-MMM-YYYY - HH:mm'); + deviceLastSeen.set(hw_Id, new Date().toISOString()); + + // Save IoT data + const iotTankData = new IotData({ + hardwareId: hw_Id, + Motor_status, + tanks: tanks.map((tank) => ({ + tankhardwareId: tank.Id, + tankHeight: tank.level, date: currentTime, time: moment().tz('Asia/Kolkata').format('HH:mm'), - }); - - await iotTankData.save(); - console.log(`✅ Data saved for device: ${hw_Id}`); - - // Keep only the latest 3 records - const records = await IotData.find({ hardwareId: hw_Id }).sort({ date: -1 }); - if (records.length > 3) { - const recordsToDelete = records.slice(3); // Get older records - await Promise.all(recordsToDelete.map((record) => record.remove())); - } + })), + date: currentTime, + time: moment().tz('Asia/Kolkata').format('HH:mm'), + }); - // Process tanks and update status - await Promise.all( - tanks.map(async (tank) => { - const existingTank = await Tank.findOne({ hardwareId: hw_Id, tankhardwareId: tank.Id }); - if (!existingTank) return; - - const tankHeightInCm = parseInt(existingTank.height.replace(/,/g, ''), 10) * 30.48; - const waterLevelHeight = tankHeightInCm - tank.level; - const waterCapacityPerCm = parseInt(existingTank.waterCapacityPerCm.replace(/,/g, ''), 10); - const waterLevel = parseInt(waterLevelHeight * waterCapacityPerCm, 10); - - existingTank.waterlevel = waterLevel; - await existingTank.save(); - console.log(`✅ Tank data saved for ${hw_Id}, tank: ${tank.Id}`); - }) - ); + await iotTankData.save(); + console.log(`✅ Data saved for device: ${hw_Id}`); - // Update motor status - const motorTank = await Tank.findOne({ "connections.inputConnections.motor_id": hw_Id }); - if (motorTank) { - const inputConnection = motorTank.connections.inputConnections.find((conn) => conn.motor_id === hw_Id); - if (inputConnection) { - inputConnection.motor_status = Motor_status; - if (inputConnection.motor_stop_status === "1" && Motor_status === 2) { - inputConnection.motor_stop_status = "2"; - inputConnection.motor_on_type = "forced_manual"; - inputConnection.startTime = moment().tz('Asia/Kolkata').format('HH:mm'); - } - if (inputConnection.motor_stop_status === "2" && Motor_status === 1) { - inputConnection.motor_stop_status = "1"; - inputConnection.stopTime = moment().tz('Asia/Kolkata').format('HH:mm'); - } - await motorTank.save(); - console.log(`✅ Motor status updated for device: ${hw_Id}`); - } - } - } catch (err) { - console.error(`❌ Error processing message from ${hw_Id}:`, err.message); + // Keep only the latest 3 records + const records = await IotData.find({ hardwareId: hw_Id }).sort({ date: -1 }); + if (records.length > 3) { + const recordsToDelete = records.slice(3); // Get older records + await Promise.all(recordsToDelete.map((record) => record.remove())); } - }); - client.on('error', (err) => { - console.error(`❌ MQTT Error for ${hw_Id}:`, err); - }); + // Process tanks and update status + await Promise.all( + tanks.map(async (tank) => { + const existingTank = await Tank.findOne({ hardwareId: hw_Id, tankhardwareId: tank.Id }); + if (!existingTank) return; - client.on('disconnect', () => { - console.log(`⚠️ Client for ${hw_Id} disconnected`); - }); + const tankHeightInCm = parseInt(existingTank.height.replace(/,/g, ''), 10) * 30.48; + const waterLevelHeight = tankHeightInCm - tank.level; + const waterCapacityPerCm = parseInt(existingTank.waterCapacityPerCm.replace(/,/g, ''), 10); + const waterLevel = parseInt(waterLevelHeight * waterCapacityPerCm, 10); - clients.set(hw_Id, client); - return client; -} + existingTank.waterlevel = waterLevel; + await existingTank.save(); + console.log(`✅ Tank data updated for ${hw_Id}, tank: ${tank.Id}`); + }) + ); -// Subscribe to wildcard topic to detect new devices -const globalClient = mqtt.connect(brokerUrl, { - clientId: 'global_subscriber', - clean: false, + // Update motor status + const motorTank = await Tank.findOne({ "connections.inputConnections.motor_id": hw_Id }); + if (motorTank) { + const inputConnection = motorTank.connections.inputConnections.find((conn) => conn.motor_id === hw_Id); + if (inputConnection) { + inputConnection.motor_status = Motor_status; + if (inputConnection.motor_stop_status === "1" && Motor_status === 2) { + inputConnection.motor_stop_status = "2"; + inputConnection.motor_on_type = "forced_manual"; + inputConnection.startTime = moment().tz('Asia/Kolkata').format('HH:mm'); + } + if (inputConnection.motor_stop_status === "2" && Motor_status === 1) { + inputConnection.motor_stop_status = "1"; + inputConnection.stopTime = moment().tz('Asia/Kolkata').format('HH:mm'); + } + await motorTank.save(); + console.log(`✅ Motor status updated for device: ${hw_Id}`); + } + } + } catch (err) { + console.error('❌ Error processing message:', err.message); + } }); -globalClient.on('connect', () => { - console.log('🌎 Global client connected to MQTT broker'); - globalClient.subscribe('water/iot-data/+', { qos: 1 }, (err) => { - if (err) { - console.error('❌ Error subscribing to wildcard topic:', err); - } else { - console.log('📡 Subscribed to wildcard topic: water/iot-data/+'); - } - }); +mqttClient.on('error', (err) => { + console.error('❌ MQTT Client Error:', err); }); -globalClient.on('message', (topic, message) => { - try { - const data = JSON.parse(message.toString()); - const { hw_Id } = data.objects; - if (!clients.has(hw_Id)) { - console.log(`➕ Creating new MQTT client for device: ${hw_Id}`); - createClient(hw_Id); - } - } catch (err) { - console.error('❌ Error parsing global message:', err.message); - } +mqttClient.on('disconnect', () => { + console.log('⚠️ Global MQTT client disconnected'); }); // Periodically check for offline devices @@ -6012,30 +5975,6 @@ setInterval(() => { -exports.getPendingAndCompletedsurveyOfparticularInstaller = async (request, reply) => { - try { - const { installationId } = request.params; - const survey_status = request.body; - - - const surveydata = await User.find({ - installationId, - survey_status, - - }); - - - // Send the response, including both total consumption and filtered consumption records - reply.send({ - status_code: 200, - surveydata, - - }); - } catch (err) { - throw boom.boomify(err); - } -}; -