From 47d6c5d7027a4504311c6ba4458aa1fa1b27a19e Mon Sep 17 00:00:00 2001 From: Varun Date: Wed, 5 Mar 2025 17:14:36 +0530 Subject: [PATCH] changes --- src/controllers/tanksController.js | 230 ++++++++++++++--------------- 1 file changed, 114 insertions(+), 116 deletions(-) diff --git a/src/controllers/tanksController.js b/src/controllers/tanksController.js index 01f52f7b..193f1060 100644 --- a/src/controllers/tanksController.js +++ b/src/controllers/tanksController.js @@ -5850,149 +5850,147 @@ exports.getBlockData = async (req, reply) => { // } // }); -const mqtt = require('mqtt'); -const client = mqtt.connect('mqtt://35.207.198.4:1883'); -const subscribedTopics = new Set(); -client.on('connect', () => { - console.log('Connected to MQTT broker'); - client.subscribe('water/iot-data/+', (err) => { - if (err) { - console.error('Error subscribing to wildcard topic:', err); - } else { - console.log('Subscribed to water/iot-data/+ wildcard topic'); - } - }); -}); - -client.on('message', async (topic, message) => { - console.log(`Message received on topic ${topic}:`, message.toString()); - - if (!topic.startsWith('water/iot-data/')) return; - - try { - const data = JSON.parse(message.toString()); - const { hw_Id, Motor_status, tanks } = data.objects; - const deviceTopic = `water/iot-data/${hw_Id}`; - - if (!subscribedTopics.has(deviceTopic)) { - client.subscribe(deviceTopic, (err) => { - if (err) console.error(`Error subscribing to topic ${deviceTopic}:`, err); - else { - console.log(`Subscribed to device-specific topic: ${deviceTopic}`); - subscribedTopics.add(deviceTopic); - } - }); - } - const currentDate = moment().tz('Asia/Kolkata').format('DD-MMM-YYYY - HH:mm'); - const currentTime = moment().tz('Asia/Kolkata').format('HH:mm'); +const mqtt = require('mqtt'); - // Create tank documents - const tankDocuments = tanks.map((tank) => ({ - tankhardwareId: tank.Id, - tankHeight: tank.level, - date: currentDate, - time: currentTime, - })); - +const brokerUrl = 'mqtt://35.207.198.4:1883'; +const clients = new Map(); // Store client instances dynamically per hw_Id +const deviceLastSeen = new Map(); // Track last seen timestamps for devices - // Save IoT data - const iotTankData = new IotData({ - hardwareId: hw_Id, - Motor_status, - tanks: tankDocuments, - date: currentDate, - time: currentTime, +function createClient(hw_Id) { + if (clients.has(hw_Id)) return clients.get(hw_Id); // Return existing client + const client = mqtt.connect(brokerUrl); + + client.on('connect', () => { + console.log(`Client for ${hw_Id} connected to MQTT broker`); + const topic = `water/iot-data/${hw_Id}`; + client.subscribe(topic, { qos: 1 }, (err) => { + if (err) { + console.error(`Error subscribing to topic ${topic}:`, err); + } else { + console.log(`Subscribed to topic: ${topic}`); + } }); + }); - await iotTankData.save(); - - // Keep only the latest 3 records - const recordsToDelete = await IotData.find({ hardwareId: hw_Id }) - .sort({ date: -1 }) - .skip(3); + client.on('message', async (topic, message) => { + console.log(`Message received on topic ${topic}:`, message.toString()); - await Promise.all(recordsToDelete.map((record) => record.remove())); + try { + const data = JSON.parse(message.toString()); + const { hw_Id, Motor_status, tanks } = data.objects; + + const currentTime = moment().tz('Asia/Kolkata').format('DD-MMM-YYYY - HH:mm'); + deviceLastSeen.set(hw_Id, new Date().toISOString()); + + // Save IoT data + const iotTankData = new IotData({ + hardwareId: hw_Id, + Motor_status, + tanks: tanks.map((tank) => ({ + tankhardwareId: tank.Id, + tankHeight: tank.level, + date: currentTime, + time: moment().tz('Asia/Kolkata').format('HH:mm'), + })), + date: currentTime, + time: moment().tz('Asia/Kolkata').format('HH:mm'), + }); - // Process each tank update - await Promise.all( - tanks.map(async (tank) => { - const { Id: tankhardwareId, level: tankHeight } = tank; - const existingTank = await Tank.findOne({ hardwareId: hw_Id, tankhardwareId }); - if (!existingTank) return; + await iotTankData.save(); + console.log(`Data saved for device: ${hw_Id}`); - const customerId = existingTank.customerId; - const tank_name = existingTank.tankName; + // Keep only the latest 3 records + const recordsToDelete = await IotData.find({ hardwareId: hw_Id }).sort({ date: -1 }).skip(3); + if (recordsToDelete.length > 0) { + await Promise.all(recordsToDelete.map((record) => record.remove())); + } - const tankHeightInCm = parseInt(existingTank.height.replace(/,/g, ''), 10) * 30.48; - const waterLevelHeight = tankHeightInCm - tankHeight; - const waterCapacityPerCm = parseInt(existingTank.waterCapacityPerCm.replace(/,/g, ''), 10); - const waterLevel = parseInt(waterLevelHeight * waterCapacityPerCm, 10); + // Process tanks and update status + await Promise.all( + tanks.map(async (tank) => { + const existingTank = await Tank.findOne({ hardwareId: hw_Id, tankhardwareId: tank.Id }); + if (!existingTank) return; - console.log(tankHeight, "Processed in IoT Data MQTT Subscription"); + const tankHeightInCm = parseInt(existingTank.height.replace(/,/g, ''), 10) * 30.48; + const waterLevelHeight = tankHeightInCm - tank.level; + const waterCapacityPerCm = parseInt(existingTank.waterCapacityPerCm.replace(/,/g, ''), 10); + const waterLevel = parseInt(waterLevelHeight * waterCapacityPerCm, 10); - if (tankHeight > 0 && waterLevel >= 0) { existingTank.waterlevel = waterLevel; await existingTank.save(); + console.log(`Tank data saved for ${hw_Id}, tank: ${tank.Id}`); + }) + ); - // Update linked tanks - await Promise.all( - existingTank.connections.outputConnections.map(async (outputConnection) => { - const linkedTank = await Tank.findOne({ - customerId, - tankName: outputConnection.outputConnections, - tankLocation: outputConnection.output_type, - }); - if (linkedTank) { - linkedTank.connections.inputConnections.forEach((inputConnection) => { - if (inputConnection.inputConnections === tank_name) { - inputConnection.water_level = waterLevel; - } - }); - await linkedTank.save(); - } - }) - ); + // Update motor status + const motorTank = await Tank.findOne({ "connections.inputConnections.motor_id": hw_Id }); + if (motorTank) { + const inputConnection = motorTank.connections.inputConnections.find((conn) => conn.motor_id === hw_Id); + if (inputConnection) { + inputConnection.motor_status = Motor_status; + if (inputConnection.motor_stop_status === "1" && Motor_status === 2) { + inputConnection.motor_stop_status = "2"; + inputConnection.motor_on_type = "forced_manual"; + inputConnection.startTime = moment().tz('Asia/Kolkata').format('HH:mm'); + } + if (inputConnection.motor_stop_status === "2" && Motor_status === 1) { + inputConnection.motor_stop_status = "1"; + inputConnection.stopTime = moment().tz('Asia/Kolkata').format('HH:mm'); + } + await motorTank.save(); + console.log(`Motor status updated for device: ${hw_Id}`); } - }) - ); - - // Update motor status - const motorTank = await Tank.findOne({ "connections.inputConnections.motor_id": hw_Id }); - if (!motorTank) { - console.log('Motor not found for motor_id:', hw_Id); - return; - } - - const inputConnection = motorTank.connections.inputConnections.find((conn) => conn.motor_id === hw_Id); - if (inputConnection) { - inputConnection.motor_status = Motor_status; - - if (inputConnection.motor_stop_status === "1" && Motor_status === 2 && inputConnection.motor_on_type !== "forced_manual") { - inputConnection.motor_stop_status = "2"; - inputConnection.motor_on_type = "forced_manual"; - inputConnection.startTime = currentTime; } + } catch (err) { + console.error(`Error processing message from ${hw_Id}:`, err.message); + } + }); - if (inputConnection.motor_stop_status === "2" && Motor_status === 1) { - inputConnection.motor_stop_status = "1"; - inputConnection.stopTime = currentTime; - } + clients.set(hw_Id, client); + return client; +} - await motorTank.save(); +// Subscribe to wildcard topic to detect new devices +const globalClient = mqtt.connect(brokerUrl); +globalClient.on('connect', () => { + console.log('Connected to MQTT broker'); + globalClient.subscribe('water/iot-data/+', { qos: 1 }, (err) => { + if (err) { + console.error('Error subscribing to wildcard topic:', err); + } else { + console.log('Subscribed to wildcard topic: water/iot-data/+'); } + }); +}); - console.log('Successfully processed data for hardwareId:', hw_Id); - +globalClient.on('message', (topic, message) => { + try { + const data = JSON.parse(message.toString()); + const { hw_Id } = data.objects; + if (!clients.has(hw_Id)) { + console.log(`Creating new MQTT client for device: ${hw_Id}`); + createClient(hw_Id); + } } catch (err) { - console.error('Error processing message:', err.message); + console.error('Error parsing global message:', err.message); } }); - +// Periodically check for offline devices +setInterval(() => { + const now = new Date(); + deviceLastSeen.forEach((lastSeen, hw_Id) => { + const lastSeenDate = new Date(lastSeen); + const diffInSeconds = (now - lastSeenDate) / 1000; + if (diffInSeconds > 60) { + console.log(`🚨 Device ${hw_Id} is offline. Last seen: ${lastSeen}`); + } + }); +}, 60000);