diff --git a/src/controllers/tanksController.js b/src/controllers/tanksController.js index 52c13d5b..161e4882 100644 --- a/src/controllers/tanksController.js +++ b/src/controllers/tanksController.js @@ -2820,7 +2820,7 @@ const monitorWaterLevels = async () => { }; // Schedule the task to run every 30 minutes -// setInterval(monitorWaterLevels, 30 * 60 * 1000); +setInterval(monitorWaterLevels, 30 * 60 * 1000); const motorIntervals = {}; async function calculateTotalPumpedWater(customerId, motorId, start_instance_id) { @@ -2837,26 +2837,25 @@ async function calculateTotalPumpedWater(customerId, motorId, start_instance_id) -const stoppedMotors = new Set(); // To track motors that have already been stopped exports.motorAction = async (req, reply) => { try { const { customerId } = req.params; const { action, motor_id: motorId, start_instance_id, phone, threshold_type, manual_threshold_time, manual_threshold_litres } = req.body; - // if (!motorId) throw new Error("Motor ID is required."); + if (!motorId) throw new Error("Motor ID is required."); - // const users = await User.findOne({ customerId }); - // if (!users) return reply.status(404).send({ error: "User not found" }); + const users = await User.findOne({ customerId }); + if (!users) return reply.status(404).send({ error: "User not found" }); - // let loggedInUser = users.phone === phone ? - // { role: "Customer", name: users.username, phone: users.phone } : - // users.staff?.staff?.find(staff => staff.phone === phone) ? - // { role: "Staff", name: users.staff.staff.find(staff => staff.phone === phone).name, phone } : null; + let loggedInUser = users.phone === phone ? + { role: "Customer", name: users.username, phone: users.phone } : + users.staff?.staff?.find(staff => staff.phone === phone) ? + { role: "Staff", name: users.staff.staff.find(staff => staff.phone === phone).name, phone } : null; - // if (!loggedInUser) return reply.status(404).send({ error: "User not found" }); + if (!loggedInUser) return reply.status(404).send({ error: "User not found" }); - // const fcmToken = users.fcmIds ? users.fcmIds.filter(id => id) : []; + const fcmToken = users.fcmIds ? users.fcmIds.filter(id => id) : []; const receiverTank = await Tank.findOne({ customerId, tankName: req.body.to, tankLocation: req.body.to_type.toLowerCase() }); if (!receiverTank) throw new Error("Receiver tank not found."); @@ -2893,7 +2892,7 @@ exports.motorAction = async (req, reply) => { }} ); - // eventEmitter.emit("motorStart", customerId, fcmToken, tankName, blockName, startTime, "Mobile APP", manual_threshold_time, typeOfWater, motorId, loggedInUser.phone); + eventEmitter.emit("motorStart", customerId, fcmToken, tankName, blockName, startTime, "Mobile APP", manual_threshold_time, typeOfWater, motorId, loggedInUser.phone); //this.publishMotorStopStatus(motorId, motorStopStatus); reply.code(200).send({ message: "Motor started successfully." }); @@ -2937,55 +2936,123 @@ exports.motorAction = async (req, reply) => { console.log("New Threshold Time:", thresholdTime); console.log("Current Time:", new Date()); - motorIntervals[motorId] = setInterval(async () => { - const currentTime = new Date(); - console.log(`🚀 Checking Motor ${motorId} | Threshold Time: ${thresholdTime} | Current Time: ${currentTime}`); - - // If threshold time has passed, stop the motor - if (currentTime >= thresholdTime) { - console.log(`🛑 Motor ${motorId} threshold time exceeded. Stopping motor.`); - - // Check if motor has already been stopped - if (stoppedMotors.has(motorId)) { - console.log(`⚠️ Motor ${motorId} already stopped. Skipping duplicate stop.`); - return; + + motorIntervals[motorId] = setInterval(async () => { + const supplierTank = await Tank.findOne({ customerId, tankName: req.body.from, tankLocation: req.body.from_type.toLowerCase() }); + const currentWaterLevel = parseInt(supplierTank.waterlevel, 10); + const currentWaterPercentage = (currentWaterLevel / parseInt(supplierTank.capacity.replace(/,/g, ''), 10)) * 100; + const notificationTracker = new Map(); + if (new Date() >= thresholdTime || currentWaterPercentage <= lowWaterThreshold) { + console.log(new Date(),"new date") + console.log(thresholdTime,"thresholdTime") + console.log("motor stopping because it entered this condition") + // Emit the threshold time notification + + // eventEmitter.emit( + // "sendThresholdTimeNotification", + // customerId, + // fcmToken, + // manual_threshold_time, + // motorId, + // tankName, + // blockName + // ); + + const notificationKey = `${customerId}_${motorId}_threshold`; + + // Check if the notification has already been sent + if (!notificationTracker.get(notificationKey)) { + console.log("Sending threshold time notification..."); + + eventEmitter.emit( + "sendThresholdTimeNotification", + customerId, + fcmToken, + manual_threshold_time, + motorId, + tankName, + blockName + ); + + // Mark notification as sent + notificationTracker.set(notificationKey, true); + + // Optionally, reset the flag after some time (e.g., 24 hours) + setTimeout(() => { + notificationTracker.delete(notificationKey); + }, 24 * 60 * 60 * 1000); // Reset after 24 hours + } else { + console.log("Notification already sent, skipping..."); + } + + const currentTime = moment().tz('Asia/Kolkata').format('DD-MMM-YYYY - HH:mm'); + await Tank.updateOne( + { customerId, "connections.inputConnections.motor_id": motorId }, + { + $set: { + "connections.inputConnections.$.motor_stop_status": "1", + + "connections.inputConnections.$.threshold_type": null, + "connections.inputConnections.$.manual_threshold_time": null, + "connections.inputConnections.$.manual_threshold_percentage": null, + "connections.inputConnections.$.stopTime": currentTime, + } } - - // Mark motor as stopped to prevent duplicate execution - stoppedMotors.add(motorId); - - // Ensure the interval is cleared before proceeding - if (motorIntervals[motorId]) { - clearInterval(motorIntervals[motorId]); - delete motorIntervals[motorId]; - console.log(`✅ Interval for motorId: ${motorId} successfully deleted.`); + ); + + if (motorIntervals[motorId]) { + console.log(`🛑 Clearing interval for motorId: ${motorId}`); + clearInterval(motorIntervals[motorId]); + delete motorIntervals[motorId]; + + // Confirm deletion + if (!motorIntervals[motorId]) { + console.log(`✅ Interval for motorId: ${motorId} successfully deleted.`); + } else { + console.error(`❌ Failed to delete interval for motorId: ${motorId}`); } - - // Update tank data - const formattedTime = moment().tz('Asia/Kolkata').format('DD-MMM-YYYY - HH:mm'); - await Tank.updateOne( - { customerId, "connections.inputConnections.motor_id": motorId }, - { - $set: { - "connections.inputConnections.$.motor_stop_status": "1", - "connections.inputConnections.$.threshold_type": null, - "connections.inputConnections.$.manual_threshold_time": null, - "connections.inputConnections.$.manual_threshold_percentage": null, - "connections.inputConnections.$.stopTime": formattedTime, - } + } + + + this.publishMotorStopStatus(motorId, "1"); + console.log(start_instance_id,"start_instance_id",customerId,"customerId",motorId,"motorId") + + const motorData = await MotorData.findOne({ customerId, motor_id: motorId, start_instance_id: start_instance_id }); + console.log(motorData,"motorData") + if (motorData) { + console.log("got into if") + const receiverTank = await Tank.findOne({ customerId, tankName: motorData.receiverTank, tankLocation: motorData.receiver_type.toLowerCase() }); + const receiverFinalWaterLevel = parseInt(receiverTank.waterlevel, 10); + const quantityDelivered = receiverFinalWaterLevel - parseInt(motorData.receiverInitialwaterlevel, 10); + const water_pumped_till_now = parseInt(receiverTank.total_water_added_from_midnight, 10); + const totalwaterpumped = quantityDelivered + water_pumped_till_now; + + await Tank.findOneAndUpdate( + { customerId, tankName: motorData.receiverTank, tankLocation: motorData.receiver_type.toLowerCase() }, + { $set: { total_water_added_from_midnight: totalwaterpumped } } + ); + + await MotorData.updateOne( + { customerId, motor_id: motorId, start_instance_id: start_instance_id }, + { + $set: { + stopTime: currentTime, + receiverfinalwaterlevel: receiverFinalWaterLevel.toString(), + quantity_delivered: quantityDelivered.toString() } + } ); - - console.log(`🚨 Motor ${motorId} stop condition met. Sending stop command.`); - this.publishMotorStopStatus(motorId, "1"); - - return; // Ensure function exits after stopping + } } - }, 10000); // Check every 10 seconds - - - + // Check for high water level and send notification + // if (currentWaterPercentage >= highWaterThreshold) { + // // eventEmitter.emit('sendHighWaterNotification', fcmToken, receiverTank); + // await checkWaterLevelsAndNotify(customerId, tankName, supplierTank.tankLocation, fcmToken); + + // } + + }, 30000); // Check every minute } } else if (req.body.threshold_type === "litres") { @@ -3124,7 +3191,7 @@ async function stopMotor(motorId, customerId, start_instance_id) { delete motorIntervals[motorId]; } - //eventEmitter.emit("motorStop", customerId, [], "", "", currentTime, "Mobile APP", 0, "", motorId, ""); + eventEmitter.emit("motorStop", customerId, [], "", "", currentTime, "Mobile APP", 0, "", motorId, ""); const motorData = await MotorData.findOne({ customerId, motor_id: motorId, start_instance_id }); if (motorData) { @@ -4958,38 +5025,38 @@ exports.totalwaterLevelSum = async (request, reply) => { } -// exports.startUpdateLoop = async (request, reply) => { -// const updateInterval = 5000; +exports.startUpdateLoop = async (request, reply) => { + const updateInterval = 5000; -// setInterval(async () => { -// try { -// const iotTank = await IotData.findOne({ hardwareId: request.body.hardwareId }); -// if (!iotTank) { -// console.log(`IOTtank not found for hardwareId ${request.body.hardwareId}`); -// return; -// } + setInterval(async () => { + try { + const iotTank = await IotData.findOne({ hardwareId: request.body.hardwareId }); + if (!iotTank) { + console.log(`IOTtank not found for hardwareId ${request.body.hardwareId}`); + return; + } -// const currentWaterlevel = Number(iotTank.tankHeight) * 200; -// const tank = await Tank.findOne({ hardwareId: iotTank.hardwareId }); + const currentWaterlevel = Number(iotTank.tankHeight) * 200; + const tank = await Tank.findOne({ hardwareId: iotTank.hardwareId }); -// let combinedWaterlevel; -// if (tank) { -// combinedWaterlevel = currentWaterlevel + Number(tank.waterlevel); -// } else { -// combinedWaterlevel = currentWaterlevel; -// } + let combinedWaterlevel; + if (tank) { + combinedWaterlevel = currentWaterlevel + Number(tank.waterlevel); + } else { + combinedWaterlevel = currentWaterlevel; + } -// await Tank.updateOne({ hardwareId: iotTank.hardwareId }, { $set: { waterlevel: combinedWaterlevel } }); + await Tank.updateOne({ hardwareId: iotTank.hardwareId }, { $set: { waterlevel: combinedWaterlevel } }); -// console.log(`Waterlevel updated successfully for hardwareId ${iotTank.hardwareId}`); -// console.log(`Previous waterlevel: ${tank ? tank.waterlevel : 0}`); -// console.log(`Current waterlevel: ${currentWaterlevel}`); -// console.log(`Combined waterlevel: ${combinedWaterlevel}`); -// } catch (err) { -// console.error(err); -// } -// }, updateInterval); -// }; + console.log(`Waterlevel updated successfully for hardwareId ${iotTank.hardwareId}`); + console.log(`Previous waterlevel: ${tank ? tank.waterlevel : 0}`); + console.log(`Current waterlevel: ${currentWaterlevel}`); + console.log(`Combined waterlevel: ${combinedWaterlevel}`); + } catch (err) { + console.error(err); + } + }, updateInterval); +}; @@ -6792,72 +6859,72 @@ exports.sendUserAutomaticStartAndStop = async (request, reply) => { // } // }; -// const calculateWaterLevelAndNotify = async () => { -// try { -// const now = moment(); -// const currentTime = now.format("HH:mm"); // Current time in HH:mm format - -// console.log(`Current time: ${currentTime}`); +const calculateWaterLevelAndNotify = async () => { + try { + const now = moment(); + const currentTime = now.format("HH:mm"); // Current time in HH:mm format -// // Get all users who have allowed notifications and have set a notification time -// const users = await User.find({ allowNotifications: true, notificationTime: currentTime }); + console.log(`Current time: ${currentTime}`); -// if (users.length === 0) { -// console.log("No users to notify at this time."); -// return; -// } + // Get all users who have allowed notifications and have set a notification time + const users = await User.find({ allowNotifications: true, notificationTime: currentTime }); -// for (const user of users) { -// const { customerId, fcmIds } = user; - -// if (!Array.isArray(fcmIds) || fcmIds.length === 0) { -// console.log(`No valid FCM tokens for customer ID: ${customerId}`); -// continue; -// } + if (users.length === 0) { + console.log("No users to notify at this time."); + return; + } -// // Get tanks associated with the user -// const tanks = await Tank.find({ customerId }); + for (const user of users) { + const { customerId, fcmIds } = user; -// for (const tank of tanks) { -// const { -// tankName, -// tankLocation, -// typeOfWater, -// capacity, -// waterlevel, -// waterlevel_at_midnight, -// } = tank; + if (!Array.isArray(fcmIds) || fcmIds.length === 0) { + console.log(`No valid FCM tokens for customer ID: ${customerId}`); + continue; + } -// // Remove commas before parsing numbers -// const tankCapacity = parseFloat(capacity.replace(/,/g, '')) || 0; -// const currentWaterLevel = parseFloat(waterlevel.replace(/,/g, '')) || 0; -// const midnightWaterLevel = parseFloat(waterlevel_at_midnight.replace(/,/g, '')) || 0; + // Get tanks associated with the user + const tanks = await Tank.find({ customerId }); -// if (tankCapacity === 0) { -// console.log(`Skipping tank ${tankName} due to zero capacity`); -// continue; -// } + for (const tank of tanks) { + const { + tankName, + tankLocation, + typeOfWater, + capacity, + waterlevel, + waterlevel_at_midnight, + } = tank; + + // Remove commas before parsing numbers + const tankCapacity = parseFloat(capacity.replace(/,/g, '')) || 0; + const currentWaterLevel = parseFloat(waterlevel.replace(/,/g, '')) || 0; + const midnightWaterLevel = parseFloat(waterlevel_at_midnight.replace(/,/g, '')) || 0; + + if (tankCapacity === 0) { + console.log(`Skipping tank ${tankName} due to zero capacity`); + continue; + } -// const currentWaterLevelPercentage = ((currentWaterLevel / tankCapacity) * 100).toFixed(2); -// const waterUsedSinceMidnight = midnightWaterLevel - currentWaterLevel; -// const waterUsedPercentageSinceMidnight = ((waterUsedSinceMidnight / tankCapacity) * 100).toFixed(2); + const currentWaterLevelPercentage = ((currentWaterLevel / tankCapacity) * 100).toFixed(2); + const waterUsedSinceMidnight = midnightWaterLevel - currentWaterLevel; + const waterUsedPercentageSinceMidnight = ((waterUsedSinceMidnight / tankCapacity) * 100).toFixed(2); -// let notificationBody = -// `🛢️ Tank Name: ${tankName}\n` + -// `🏢 Location: ${tankLocation}\n` + -// `💧 Type of Water: ${typeOfWater}\n` + -// `Current Water Level: ${currentWaterLevel} liters (${currentWaterLevelPercentage}%)\n`; + let notificationBody = + `🛢️ Tank Name: ${tankName}\n` + + `🏢 Location: ${tankLocation}\n` + + `💧 Type of Water: ${typeOfWater}\n` + + `Current Water Level: ${currentWaterLevel} liters (${currentWaterLevelPercentage}%)\n`; -// await sendNotification(customerId, fcmIds, "Water Level Update", notificationBody); -// console.log("Notification sent for tank:", tankName); -// } -// } + await sendNotification(customerId, fcmIds, "Water Level Update", notificationBody); + console.log("Notification sent for tank:", tankName); + } + } -// console.log("Water level notifications processed."); -// } catch (err) { -// console.error("Error in water level calculation:", err); -// } -// }; + console.log("Water level notifications processed."); + } catch (err) { + console.error("Error in water level calculation:", err); + } +}; // const calculateLowWaterLevelAndNotify = async () => { // try { // const now = moment(); @@ -7161,14 +7228,14 @@ exports.listofactiveandinactivetankstatus = async (req, reply) => { }; -// exports.notificationTiming = async (req, reply) => { -// const { customerId, notificationPreference } = req.body; +exports.notificationTiming = async (req, reply) => { + const { customerId, notificationPreference } = req.body; -// if (!["never", "always", "6_hours", "8_hours", "1_month"].includes(notificationPreference)) { -// return reply.status(400).send({ message: "Invalid preference" }); -// } + if (!["never", "always", "6_hours", "8_hours", "1_month"].includes(notificationPreference)) { + return reply.status(400).send({ message: "Invalid preference" }); + } -// await User.updateOne({ customerId }, { notificationPreference }); + await User.updateOne({ customerId }, { notificationPreference }); -// return reply.send({ message: "Preference updated successfully" }); -// } \ No newline at end of file + return reply.send({ message: "Preference updated successfully" }); +} \ No newline at end of file diff --git a/src/routes/tanksRoute.js b/src/routes/tanksRoute.js index f4854c0a..fdbb7dc2 100644 --- a/src/routes/tanksRoute.js +++ b/src/routes/tanksRoute.js @@ -742,48 +742,48 @@ module.exports = function (fastify, opts, next) { }); - // fastify.route({ - // method: 'POST', - // url: '/update-waterlevel', - // schema: { - // tags: ['Tank'], - // description: 'This is for updating waterlevel of a tank based on its IOTtank document', - // summary: 'This is for updating waterlevel of a tank', - // body: { - // type: 'object', - // properties: { - // hardwareId: { type: 'string' }, - // }, - // required: ['hardwareId'], - // }, - // response: { - // 200: { - // type: 'object', - // properties: { - // message: { type: 'string' }, - // }, - // }, - // 404: { - // type: 'object', - // properties: { - // message: { type: 'string' }, - // }, - // }, - // 500: { - // type: 'object', - // properties: { - // message: { type: 'string' }, - // }, - // }, - // }, - // security: [ - // { - // basicAuth: [], - // }, - // ], - // }, - // handler: tanksController.startUpdateLoop, - // }); + fastify.route({ + method: 'POST', + url: '/update-waterlevel', + schema: { + tags: ['Tank'], + description: 'This is for updating waterlevel of a tank based on its IOTtank document', + summary: 'This is for updating waterlevel of a tank', + body: { + type: 'object', + properties: { + hardwareId: { type: 'string' }, + }, + required: ['hardwareId'], + }, + response: { + 200: { + type: 'object', + properties: { + message: { type: 'string' }, + }, + }, + 404: { + type: 'object', + properties: { + message: { type: 'string' }, + }, + }, + 500: { + type: 'object', + properties: { + message: { type: 'string' }, + }, + }, + }, + security: [ + { + basicAuth: [], + }, + ], + }, + handler: tanksController.startUpdateLoop, + }); // fastify.get("/api/updatewaterlevelsatmidnight", { // schema: { @@ -1419,36 +1419,36 @@ module.exports = function (fastify, opts, next) { handler: tanksController.sendUserAutomaticStartAndStop, }); - // fastify.route({ - // method: "POST", - // url: "/api/sendNotificationDailyPreference", - // schema: { - // tags: ["Tank"], - // summary: "This is for time based notification preferences", - // body: { - // type: "object", - // properties: { - // customerId: { - // type: "string", + fastify.route({ + method: "POST", + url: "/api/sendNotificationDailyPreference", + schema: { + tags: ["Tank"], + summary: "This is for time based notification preferences", + body: { + type: "object", + properties: { + customerId: { + type: "string", - // }, - // notificationPreference: { - // type: "string", - // }, - // // allowNotifications: { - // // type: "boolean" - // // } - // }, - // }, - // security: [ - // { - // basicAuth: [], - // }, - // ], - // }, - // //preHandler: fastify.auth([fastify.authenticate]), - // handler: tanksController.notificationTiming, - // }); + }, + notificationPreference: { + type: "string", + }, + // allowNotifications: { + // type: "boolean" + // } + }, + }, + security: [ + { + basicAuth: [], + }, + ], + }, + //preHandler: fastify.auth([fastify.authenticate]), + handler: tanksController.notificationTiming, + }); fastify.route({ @@ -1478,4 +1478,3 @@ module.exports = function (fastify, opts, next) { next(); } -