From 65de89a44f735986f94419f678eb9d70234e5aac Mon Sep 17 00:00:00 2001 From: Varun Date: Tue, 18 Mar 2025 16:15:21 +0530 Subject: [PATCH] changes --- src/controllers/tanksController.js | 321 +++++++++-------------------- 1 file changed, 98 insertions(+), 223 deletions(-) diff --git a/src/controllers/tanksController.js b/src/controllers/tanksController.js index c3437349..3904ee0e 100644 --- a/src/controllers/tanksController.js +++ b/src/controllers/tanksController.js @@ -2836,6 +2836,8 @@ async function calculateTotalPumpedWater(customerId, motorId, start_instance_id) } + + exports.motorAction = async (req, reply) => { try { const { customerId } = req.params; @@ -2861,122 +2863,13 @@ exports.motorAction = async (req, reply) => { let motorStopStatus = action === "start" ? "2" : "1"; const blockName = req.body.from || "Unknown Block"; const tankName = req.body.to || "Unknown Tank"; - const stopTime = req.body.stopTime - if (action === "start") { - motorStopStatus = "2"; - const startTime = req.body.startTime; - await Tank.updateOne( - { customerId, "connections.inputConnections.motor_id": motorId }, - { $set: { "connections.inputConnections.$.motor_stop_status": motorStopStatus } } - ); - const thresholdTimeMs = req.body.manual_threshold_time * 60 * 1000; // Convert minutes to milliseconds - // const stopCriteria = - // motorOnType === "time" - // ? `${req.body.manual_threshold_time} minutes` - // : `${req.body.manual_threshold_litres} litres`; - try { - console.log("enter the start") - eventEmitter.emit( - "motorStart", - customerId, - fcmToken, - tankName, - blockName, - startTime, - "Mobile APP", - manual_threshold_time, - typeOfWater, - motorId, - loggedInUser.phone, - ); - - reply.code(200).send({ message: "Motor started successfully." }); - } catch (error) { - console.error("Error in handleMotorStart:", error); - reply.code(500).send({ error: "Internal Server Error" }); - } - - // Start checking water level every 30 minutes - if (!waterLevelCheckInterval) { - waterLevelCheckInterval = setInterval(async () => { - await checkWaterLevel(customerId, motorId, fcmToken, receiverTank); - }, 30 * 60 * 1000); // 30 minutes - } - await Tank.updateOne( - { customerId, "connections.inputConnections.motor_id": motorId }, - { $set: { "connections.inputConnections.$.motor_stop_status": "2", - "connections.inputConnections.$.manual_threshold_time": manual_threshold_time, - "connections.inputConnections.$.threshold_type": "time", - "connections.inputConnections.$.motor_on_type": "manual" } } - ); - - reply.code(200).send({ message: "Motor started successfully." }); - } else if (action === "stop") { - motorStopStatus = "1"; // If action is stop, set stop status to "1" - - try { - - - const totalWaterPumped = await calculateTotalPumpedWater(customerId, motorId, start_instance_id); - - eventEmitter.emit("motorStop", customerId, fcmToken, tankName, blockName, stopTime, "Mobile APP", totalWaterPumped, typeOfWater, motorId, - loggedInUser.phone,); - - reply.code(200).send({ message: "Motor stopped successfully." }); - } catch (error) { - console.error("Error in handleMotorStop:", error); - reply.code(500).send({ error: "Internal Server Error" }); - } - - - await Tank.updateOne( - { customerId, "connections.inputConnections.motor_id": motorId }, - { - $set: { - "connections.inputConnections.$.motor_stop_status": "1", - "connections.inputConnections.$.motor_on_type": motorOnType } - } - ); - // Clear the interval when the motor is stopped - if (waterLevelCheckInterval) { - clearInterval(waterLevelCheckInterval); - waterLevelCheckInterval = null; // Reset the interval ID - } - } else { - throw new Error("Invalid action provided."); - } - - // If action is stop, immediately update motor status and perform stop operations - if (action === "stop") { - console.log("enterted stop") - await Tank.updateOne( - { customerId, "connections.inputConnections.motor_id": motorId }, - { - $set: { - "connections.inputConnections.$.motor_stop_status": "1", - "connections.inputConnections.$.motor_on_type": "manual", - "connections.inputConnections.$.stopTime": req.body.stopTime, - "connections.inputConnections.$.threshold_type": null, - "connections.inputConnections.$.manual_threshold_time": null, - "connections.inputConnections.$.manual_threshold_percentage": null - } - } - ); + if (action === "start") { - if (motorIntervals[motorId]) { - console.log(`🔄 Clearing all existing intervals for motorId: ${motorId}`); - - // Clear and delete all intervals for the motorId - Object.keys(motorIntervals).forEach(key => { - if (key.startsWith(motorId)) { - clearInterval(motorIntervals[key]); - delete motorIntervals[key]; - } - }); - - console.log(`✅ All intervals cleared for motorId: ${motorId}`); - } + if (motorIntervals[motorId]) { + clearInterval(motorIntervals[motorId]); + delete motorIntervals[motorId]; + } const startTime = moment().tz('Asia/Kolkata').format('DD-MMM-YYYY - HH:mm'); @@ -3058,29 +2951,29 @@ exports.motorAction = async (req, reply) => { const notificationKey = `${customerId}_${motorId}_threshold`; // Check if the notification has already been sent - // if (!notificationTracker.get(notificationKey)) { - // console.log("Sending threshold time notification..."); + if (!notificationTracker.get(notificationKey)) { + console.log("Sending threshold time notification..."); - // eventEmitter.emit( - // "sendThresholdTimeNotification", - // customerId, - // fcmToken, - // manual_threshold_time, - // motorId, - // tankName, - // blockName - // ); - - // // Mark notification as sent - // notificationTracker.set(notificationKey, true); - - // // Optionally, reset the flag after some time (e.g., 24 hours) - // setTimeout(() => { - // notificationTracker.delete(notificationKey); - // }, 24 * 60 * 60 * 1000); // Reset after 24 hours - // } else { - // console.log("Notification already sent, skipping..."); - // } + eventEmitter.emit( + "sendThresholdTimeNotification", + customerId, + fcmToken, + manual_threshold_time, + motorId, + tankName, + blockName + ); + + // Mark notification as sent + notificationTracker.set(notificationKey, true); + + // Optionally, reset the flag after some time (e.g., 24 hours) + setTimeout(() => { + notificationTracker.delete(notificationKey); + }, 24 * 60 * 60 * 1000); // Reset after 24 hours + } else { + console.log("Notification already sent, skipping..."); + } const currentTime = moment().tz('Asia/Kolkata').format('DD-MMM-YYYY - HH:mm'); await Tank.updateOne( @@ -3097,9 +2990,7 @@ exports.motorAction = async (req, reply) => { } ); - - clearInterval(motorIntervals[motorId]); // Stop the motor if condition met delete motorIntervals[motorId]; // Remove from interval object @@ -3254,33 +3145,6 @@ exports.motorAction = async (req, reply) => { } else if (action === "stop") { await stopMotor(motorId, customerId, start_instance_id); this.publishMotorStopStatus(motorId, motorStopStatus); - // Fetch motor data to check its threshold time - const motorData = await Tank.findOne( - { customerId, "connections.inputConnections.motor_id": motorId }, - { "connections.inputConnections.$": 1 } - ); - - if (motorData) { - const motor = motorData.connections.inputConnections[0]; - const thresholdTime = motor.manual_threshold_time; // Get threshold time - const hw_Id = motor.hw_Id; // Get hardware ID - - // Check if motor was running for at least the threshold time - if (motor.runTime >= thresholdTime) { - console.log(`Sending threshold time notification for motor ${motorId}`); - - eventEmitter.emit( - "sendThresholdTimeNotification", - customerId, - fcmToken, // Use correct FCM token list - thresholdTime, - hw_Id, - tankName, - blockName - ); - } - } - reply.code(200).send({ message: "Motor stopped successfully." }); } } catch (err) { @@ -3327,6 +3191,9 @@ async function stopMotor(motorId, customerId, start_instance_id) { + + + // exports.motorAction = async (req, reply) => { // try { // const customerId = req.params.customerId; @@ -5898,32 +5765,20 @@ exports.getBlockData = async (req, reply) => { const mqtt = require('mqtt'); require('dotenv').config(); -// **Persistent MQTT Connection** -const client = mqtt.connect('mqtt://35.207.198.4:1883', { - clientId: `mqtt_server_${Math.random().toString(16).substr(2, 8)}`, - clean: false, // Ensures MQTT retains subscriptions - reconnectPeriod: 2000, // Reconnect every 2 seconds -}); - -const subscribedTopics = new Set(); -const activeDevices = new Set(); // Keep track of active devices +const activeClients = new Map(); // Store active clients per hw_Id -client.on('connect', () => { - console.log('✅ Connected to MQTT broker'); - - // **Ensure re-subscriptions after reconnect** - subscribedTopics.forEach(topic => { - client.subscribe(topic, { qos: 1 }, (err) => { - if (err) { - console.error(`❌ Error resubscribing to ${topic}:`, err); - } else { - console.log(`🔄 Resubscribed to ${topic}`); - } - }); - }); +// Temporary client to listen for device announcements +const tempClientId = `mqtt_temp_${Math.random().toString(16).substr(2, 8)}`; +const tempClient = mqtt.connect('mqtt://35.207.198.4:1883', { + clientId: tempClientId, + clean: false, + reconnectPeriod: 2000, +}); - // **Subscribe to new device announcements** - client.subscribe('water/iot-data/announce', { qos: 1 }, (err) => { +// Subscribe only to device announcements +tempClient.on('connect', () => { + console.log(`✅ Connected to MQTT broker as ${tempClientId}`); + tempClient.subscribe('water/iot-data/announce', { qos: 1 }, (err) => { if (err) { console.error('❌ Error subscribing to announcement topic:', err); } else { @@ -5932,13 +5787,12 @@ client.on('connect', () => { }); }); -client.on('message', async (topic, message) => { +tempClient.on('message', async (topic, message) => { console.log(`📩 Message received on topic ${topic}: ${message.toString()}`); try { const data = JSON.parse(message.toString()); - // **Handle device announcements** if (topic === 'water/iot-data/announce') { if (!data.objects || !data.objects.hw_Id) { console.error("❌ Invalid announcement format. Missing hw_Id."); @@ -5946,45 +5800,66 @@ client.on('message', async (topic, message) => { } const hw_Id = data.objects.hw_Id; - const deviceTopic = `water/iot-data/${hw_Id}`; - - if (!subscribedTopics.has(deviceTopic)) { - client.subscribe(deviceTopic, { qos: 1 }, (err) => { - if (err) { - console.error(`❌ Error subscribing to ${deviceTopic}:`, err); - } else { - console.log(`✅ Subscribed to ${deviceTopic}`); - subscribedTopics.add(deviceTopic); - activeDevices.add(hw_Id); - console.log('📡 Active Devices:', Array.from(activeDevices)); - - // ✅ **Now also process data** - processIotData(hw_Id, data); - } - }); - } else { - console.log(`🔄 Already subscribed to ${deviceTopic}, processing data.`); - processIotData(hw_Id, data); - } - return; - } + console.log(`🔄 Received first hw_Id: ${hw_Id}`); - // **Process IoT Data for device topics** - if (topic.startsWith('water/iot-data/')) { - setImmediate(() => { - console.log(`🚀 Entering processIotData() for topic: ${topic}`); - const hw_Id = topic.split('/')[2]; - processIotData(hw_Id, data); - }); + // If this hw_Id already has a client, skip creating a new one + if (!activeClients.has(hw_Id)) { + console.log(`🔄 Creating new MQTT client for hw_Id: ${hw_Id}`); + activeClients.set(hw_Id, createDeviceClient(hw_Id)); + } } } catch (err) { console.error('❌ Error processing message:', err.message); } }); -client.on('error', (err) => console.error('❌ MQTT Error:', err)); -client.on('close', () => console.log('⚠️ MQTT Connection Closed.')); -client.on('offline', () => console.log('⚠️ MQTT Broker Offline.')); +// Function to create a client for each device after hw_Id is known +function createDeviceClient(hw_Id) { + const clientId = `mqtt_client_${hw_Id}`; + console.log(`🔄 Connecting to MQTT with clientId: ${clientId}`); + + const client = mqtt.connect('mqtt://35.207.198.4:1883', { + clientId, + clean: false, + reconnectPeriod: 2000, + }); + + client.on('connect', () => { + console.log(`✅ Connected to MQTT broker as ${clientId}`); + + const deviceTopic = `water/iot-data/${hw_Id}`; + client.subscribe(deviceTopic, { qos: 1 }, (err) => { + if (err) { + console.error(`❌ Error subscribing to ${deviceTopic}:`, err); + } else { + console.log(`✅ Subscribed to ${deviceTopic}`); + } + }); + }); + + client.on('message', async (topic, message) => { + console.log(`📩 Message received on topic ${topic}: ${message.toString()}`); + + try { + const data = JSON.parse(message.toString()); + + if (topic.startsWith('water/iot-data/')) { + setImmediate(() => { + console.log(`🚀 Processing IoT Data for topic: ${topic}`); + processIotData(hw_Id, data); + }); + } + } catch (err) { + console.error('❌ Error processing message:', err.message); + } + }); + + client.on('error', (err) => console.error(`❌ MQTT Error (${clientId}):`, err)); + client.on('close', () => console.log(`⚠️ MQTT Connection Closed for ${clientId}`)); + client.on('offline', () => console.log(`⚠️ MQTT Broker Offline for ${clientId}`)); + + return client; +} async function processIotData(hw_Id, data) { try {