diff --git a/src/controllers/tanksController.js b/src/controllers/tanksController.js index be0065fe..902efd6d 100644 --- a/src/controllers/tanksController.js +++ b/src/controllers/tanksController.js @@ -1780,6 +1780,40 @@ eventEmitter.on("sendMotorStopNotification", async (hw_Id, customerId, fcmTokens } }); +eventEmitter.on('sendLowWaterNotification', async (customerId, fcmTokens, hw_Id, tankName, blockName, lowWaterLevel, currentWaterLevel, currentWaterPercentage) => { + try { + const message = + `⚠️ Warning: Low water level detected!\n` + + `🛢️ Tank Name: '${tankName}'\n` + + `🏢 Block Name: '${blockName}'\n` + + `📉 Low Water Threshold: '${lowWaterLevel} liters'\n` + + `📌 Current Water Level: '${currentWaterLevel} liters'\n` + + `📅 Date & Time: '${new Date().toLocaleString()}'`; + + await sendNotification(hw_Id, customerId, fcmTokens, 'Low Water Alert', message); + console.log("✅ Low water notification sent successfully."); + } catch (error) { + console.error("❌ Error sending low water notification:", error); + } +}); + +eventEmitter.on('sendCriticalLowWaterNotification', async (customerId, fcmTokens, hw_Id, tankName, blockName, criticalLowWaterLevel, currentWaterLevel, currentWaterPercentage) => { + try { + const message = + `🚨 Critical Alert: Water level is **critically low!**\n` + + `🛢️ Tank Name: '${tankName}'\n` + + `🏢 Block Name: '${blockName}'\n` + + `🔴 Critical Low Threshold: '${criticalLowWaterLevel} liters'\n` + + `📌 Current Water Level: '${currentWaterLevel} liters'\n` + + `📅 Date & Time: '${new Date().toLocaleString()}'`; + + await sendNotification(hw_Id, customerId, fcmTokens, 'Critical Water Alert', message); + console.log("✅ Critical low water notification sent successfully."); + } catch (error) { + console.error("❌ Error sending critical low water notification:", error); + } +}); + // eventEmitter.on('sendLowWaterNotification', (fcmTokens, message) => { @@ -2138,17 +2172,20 @@ const sendNotification = async (hw_Id, customerId, fcmIds, title, body) => { const response = await admin.messaging().send({ token, notification: { title, body }, - data: { hw_Id, target: "/tank_levels" }, + data: { + hw_Id: String(hw_Id), + target: "/tank_levels" + }, }); console.log(`✅ Notification sent successfully to token: ${token}`); console.log("📬 FCM Response:", response); console.log(`📡 Sending notification to Customer ID: ${customerId}`); - console.log(`🔍 FCM Tokens:`, fcmTokens); + //console.log(`🔍 FCM Tokens:`, fcmTokens); } catch (error) { - console.error(`❌ Failed to send notification to token: ${token}`, error); + // console.error(`❌ Failed to send notification to token: ${token}`, error); if (error.code === "messaging/registration-token-not-registered") { await User.updateOne({ customerId }, { $pull: { fcmIds: token } }); @@ -2789,38 +2826,38 @@ exports.getPumpsAndUsers = async (req, reply) => { -const monitorWaterLevels = async () => { - try { - const tanks = await Tank.find({}); - - // Iterate through each tank - for (const tank of tanks) { - // Fetch users associated with the customerId of the tank - const users = await User.find({ customerId: tank.customerId }); - const fcmTokens = users - .map(user => user.fcmIds) - .filter(fcmIds => fcmIds) - .flat(); // Flatten if there are multiple fcmIds for each user - - // Ensure that fcmTokens exist before proceeding - if (fcmTokens.length > 0) { - const customerId = tank.customerId; - const tankName = tank.tankName; // Assuming tank has a 'name' field - const tankLocation = tank.tankLocation; // Assuming tank has a 'location' field +// const monitorWaterLevels = async () => { +// try { +// const tanks = await Tank.find({}); - // Call the function to check water levels and send notifications - //await checkWaterLevelsAndNotify(customerId, tankName, tankLocation, fcmTokens); - } else { - //console.log(`No FCM tokens found for customerId ${tank.customerId}`); - } - } - } catch (error) { - console.error('Error monitoring water levels:', error); - } -}; +// // Iterate through each tank +// for (const tank of tanks) { +// // Fetch users associated with the customerId of the tank +// const users = await User.find({ customerId: tank.customerId }); +// const fcmTokens = users +// .map(user => user.fcmIds) +// .filter(fcmIds => fcmIds) +// .flat(); // Flatten if there are multiple fcmIds for each user + +// // Ensure that fcmTokens exist before proceeding +// if (fcmTokens.length > 0) { +// const customerId = tank.customerId; +// const tankName = tank.tankName; // Assuming tank has a 'name' field +// const tankLocation = tank.tankLocation; // Assuming tank has a 'location' field + +// // Call the function to check water levels and send notifications +// //await checkWaterLevelsAndNotify(customerId, tankName, tankLocation, fcmTokens); +// } else { +// //console.log(`No FCM tokens found for customerId ${tank.customerId}`); +// } +// } +// } catch (error) { +// console.error('Error monitoring water levels:', error); +// } +// }; -// Schedule the task to run every 30 minutes -setInterval(monitorWaterLevels, 30 * 60 * 1000); +// // Schedule the task to run every 30 minutes +// setInterval(monitorWaterLevels, 30 * 60 * 1000); const motorIntervals = {}; async function calculateTotalPumpedWater(customerId, motorId, start_instance_id) { @@ -2863,7 +2900,7 @@ exports.motorAction = async (req, reply) => { let motorStopStatus = action === "start" ? "2" : "1"; const blockName = req.body.from || "Unknown Block"; const tankName = req.body.to || "Unknown Tank"; - + const stopTime = req.body.stopTime if (action === "start") { if (motorIntervals[motorId]) { @@ -2946,7 +2983,45 @@ exports.motorAction = async (req, reply) => { console.log(thresholdTime,"thresholdTime") console.log("motor stopping because it entered this condition") // Emit the threshold time notification - + try { + console.log("motor stopping because it entered this condition") + + const tank = await Tank.findOne( + { customerId, "connections.inputConnections.motor_id": motorId }, + { "connections.inputConnections.$": 1 } // Fetch only relevant motor connection + ); + + if (tank && tank.connections.inputConnections[0].motor_stop_status === "1") { + console.log("⚠️ Motor already stopped. Skipping notification."); + } else { + console.log("🚀 Sending threshold time notification..."); + + eventEmitter.emit( + "sendThresholdTimeNotification", + customerId, + fcmToken, + manual_threshold_time, + motorId, + tankName, + blockName + ); + } + // eventEmitter.emit( + // "sendThresholdTimeNotification", + // customerId, + // fcmToken, + // manual_threshold_time, + // motorId, + // tankName, + // blockName + // ); + + reply.code(200).send({ message: "Motor stopped successfully." }); + } catch (error) { + console.error("Error in handleMotorStop:", error); + reply.code(500).send({ error: "Internal Server Error" }); + } + // eventEmitter.emit( // "sendThresholdTimeNotification", // customerId, @@ -2957,32 +3032,32 @@ exports.motorAction = async (req, reply) => { // blockName // ); - const notificationKey = `${customerId}_${motorId}_threshold`; + // const notificationKey = `${customerId}_${motorId}_threshold`; - // Check if the notification has already been sent - if (!notificationTracker.get(notificationKey)) { - console.log("Sending threshold time notification..."); + // // Check if the notification has already been sent + // if (!notificationTracker.get(notificationKey)) { + // console.log("Sending threshold time notification..."); - eventEmitter.emit( - "sendThresholdTimeNotification", - customerId, - fcmToken, - manual_threshold_time, - motorId, - tankName, - blockName - ); - - // Mark notification as sent - notificationTracker.set(notificationKey, true); - - // Optionally, reset the flag after some time (e.g., 24 hours) - setTimeout(() => { - notificationTracker.delete(notificationKey); - }, 24 * 60 * 60 * 1000); // Reset after 24 hours - } else { - console.log("Notification already sent, skipping..."); - } + // eventEmitter.emit( + // "sendThresholdTimeNotification", + // customerId, + // fcmToken, + // manual_threshold_time, + // motorId, + // tankName, + // blockName + // ); + + // // Mark notification as sent + // notificationTracker.set(notificationKey, true); + + // // Optionally, reset the flag after some time (e.g., 24 hours) + // setTimeout(() => { + // notificationTracker.delete(notificationKey); + // }, 24 * 60 * 60 * 1000); // Reset after 24 hours + // } else { + // console.log("Notification already sent, skipping..."); + // } const currentTime = moment().tz('Asia/Kolkata').format('DD-MMM-YYYY - HH:mm'); await Tank.updateOne( @@ -3163,6 +3238,19 @@ exports.motorAction = async (req, reply) => { } else if (action === "stop") { await stopMotor(motorId, customerId, start_instance_id); + try { + + + const totalWaterPumped = await calculateTotalPumpedWater(customerId, motorId, start_instance_id); + + eventEmitter.emit("motorStop", customerId, fcmToken, tankName, blockName, stopTime, "Mobile APP", totalWaterPumped, typeOfWater, motorId, + loggedInUser.phone,); + + reply.code(200).send({ message: "Motor stopped successfully." }); + } catch (error) { + console.error("Error in handleMotorStop:", error); + reply.code(500).send({ error: "Internal Server Error" }); + } this.publishMotorStopStatus(motorId, motorStopStatus); reply.code(200).send({ message: "Motor stopped successfully." }); } @@ -3190,7 +3278,7 @@ async function stopMotor(motorId, customerId, start_instance_id) { delete motorIntervals[motorId]; } - eventEmitter.emit("motorStop", customerId, [], "", "", currentTime, "Mobile APP", 0, "", motorId, ""); + // eventEmitter.emit("motorStop", customerId, [], "", "", currentTime, "Mobile APP", 0, "", motorId, ""); const motorData = await MotorData.findOne({ customerId, motor_id: motorId, start_instance_id }); if (motorData) { @@ -3211,6 +3299,77 @@ async function stopMotor(motorId, customerId, start_instance_id) { +const monitorWaterLevels = async () => { + try { + // console.log("⏳ Monitoring water levels..."); + + const tanks = await Tank.find(); // Fetch all tanks + //console.log("Fetched Tanks:", tanks.length); + + for (const tank of tanks) { + + const { + _id, + customerId, // Move this to the top + motor_id, + tankName, + blockName, + waterlevel: currentWaterLevel, + capacity, + auto_min_percentage, + reserved_percentage, + notificationSentLow, + notificationSentCritical, + } = tank; + + const users = await User.find({ customerId }); + const fcmTokens = users + .map(user => user.fcmIds) + .filter(fcmIds => Array.isArray(fcmIds) && fcmIds.length > 0) // Ensure it's an array + .flat(); + + if (!fcmTokens || fcmTokens.length === 0) { + //console.error("❌ No valid FCM tokens found for customerId:", customerId); + continue; // Skip this tank + } + const LOW_PERCENTAGE = 20; // 20% threshold + const CRITICAL_PERCENTAGE = 10; // 10% threshold + + // **Calculate thresholds based on tank capacity** + const lowWaterLevel = (LOW_PERCENTAGE / 100) * capacity; + const criticalLowWaterLevel = (CRITICAL_PERCENTAGE / 100) * capacity; + const currentWaterPercentage = (currentWaterLevel / capacity) * 100; + // const lowWaterLevel = 9483; // Low threshold in liters + // const criticalLowWaterLevel = 9483; + if (currentWaterLevel <= criticalLowWaterLevel) { + if (!notificationSentCritical) { + console.log("🚨 Sending Critical Low Water Notification..."); + eventEmitter.emit("sendCriticalLowWaterNotification", customerId, fcmTokens, motor_id, tankName, blockName, criticalLowWaterLevel, currentWaterLevel, currentWaterPercentage); + await Tank.updateOne({ _id }, { notificationSentCritical: true, notificationSentLow: true }); + } + } + else if (currentWaterLevel <= lowWaterLevel) { + if (!notificationSentLow) { + console.log("⚠️ Sending Low Water Notification..."); + eventEmitter.emit("sendLowWaterNotification", customerId, fcmTokens, motor_id, tankName, blockName, lowWaterLevel, currentWaterLevel, currentWaterPercentage); + await Tank.updateOne({ _id }, { notificationSentLow: true }); + } + } + else if (currentWaterLevel > lowWaterLevel && (notificationSentLow || notificationSentCritical)) { + console.log("🔄 Water level restored. Resetting flags."); + await Tank.updateOne({ _id }, { notificationSentCritical: false, notificationSentLow: false }); + } + + } + } catch (error) { + console.error("❌ Error monitoring water levels:", error); + } +}; + +setInterval(monitorWaterLevels, 1000); + + + // async function stopMotor(motorId, customerId, start_instance_id) { // const currentTime = moment().tz('Asia/Kolkata').format('DD-MMM-YYYY - HH:mm'); // await Tank.updateOne( @@ -5913,6 +6072,7 @@ client.on('message', async (topic, message) => { } }); + client.on('error', (err) => console.error('❌ MQTT Error:', err)); client.on('close', () => console.log('⚠️ MQTT Connection Closed.')); client.on('offline', () => console.log('⚠️ MQTT Broker Offline.')); @@ -6075,7 +6235,7 @@ const sendMotorNotifications = async () => { // 🔹 Motor Start Condition if ( - inputConnection.motor_stop_status === "2" && status ===1 && inputConnection.motor_on_type === "forced_manual" && + inputConnection.motor_stop_status === "2" && inputConnection.motor_on_type === "forced_manual" && !motorTank.motor_start_notified ) { console.log("✅ Sending Motor Start Notification..."); @@ -6100,8 +6260,8 @@ const sendMotorNotifications = async () => { // 🔹 Motor Stop Condition if ( - inputConnection.motor_stop_status === "1" && inputConnection.motor_on_type === "forced_manual" && status ===2 && - !motorTank.motor_stop_notified + inputConnection.motor_stop_status === "1" && + !motorTank.motor_stop_notified && inputConnection.motor_on_type === "forced_manual" ) { console.log("✅ Sending Motor Stop Notification...");