Varun 7 months ago
commit 4fa456793f

@ -1780,6 +1780,40 @@ eventEmitter.on("sendMotorStopNotification", async (hw_Id, customerId, fcmTokens
}
});
eventEmitter.on('sendLowWaterNotification', async (customerId, fcmTokens, hw_Id, tankName, blockName, lowWaterLevel, currentWaterLevel, currentWaterPercentage) => {
try {
const message =
`⚠️ Warning: Low water level detected!\n` +
`🛢️ Tank Name: '${tankName}'\n` +
`🏢 Block Name: '${blockName}'\n` +
`📉 Low Water Threshold: '${lowWaterLevel} liters'\n` +
`📌 Current Water Level: '${currentWaterLevel} liters'\n` +
`📅 Date & Time: '${new Date().toLocaleString()}'`;
await sendNotification(hw_Id, customerId, fcmTokens, 'Low Water Alert', message);
console.log("✅ Low water notification sent successfully.");
} catch (error) {
console.error("❌ Error sending low water notification:", error);
}
});
eventEmitter.on('sendCriticalLowWaterNotification', async (customerId, fcmTokens, hw_Id, tankName, blockName, criticalLowWaterLevel, currentWaterLevel, currentWaterPercentage) => {
try {
const message =
`🚨 Critical Alert: Water level is **critically low!**\n` +
`🛢️ Tank Name: '${tankName}'\n` +
`🏢 Block Name: '${blockName}'\n` +
`🔴 Critical Low Threshold: '${criticalLowWaterLevel} liters'\n` +
`📌 Current Water Level: '${currentWaterLevel} liters'\n` +
`📅 Date & Time: '${new Date().toLocaleString()}'`;
await sendNotification(hw_Id, customerId, fcmTokens, 'Critical Water Alert', message);
console.log("✅ Critical low water notification sent successfully.");
} catch (error) {
console.error("❌ Error sending critical low water notification:", error);
}
});
// eventEmitter.on('sendLowWaterNotification', (fcmTokens, message) => {
@ -2138,17 +2172,20 @@ const sendNotification = async (hw_Id, customerId, fcmIds, title, body) => {
const response = await admin.messaging().send({
token,
notification: { title, body },
data: { hw_Id, target: "/tank_levels" },
data: {
hw_Id: String(hw_Id),
target: "/tank_levels"
},
});
console.log(`✅ Notification sent successfully to token: ${token}`);
console.log("📬 FCM Response:", response);
console.log(`📡 Sending notification to Customer ID: ${customerId}`);
console.log(`🔍 FCM Tokens:`, fcmTokens);
//console.log(`🔍 FCM Tokens:`, fcmTokens);
} catch (error) {
console.error(`❌ Failed to send notification to token: ${token}`, error);
// console.error(`❌ Failed to send notification to token: ${token}`, error);
if (error.code === "messaging/registration-token-not-registered") {
await User.updateOne({ customerId }, { $pull: { fcmIds: token } });
@ -2789,38 +2826,38 @@ exports.getPumpsAndUsers = async (req, reply) => {
const monitorWaterLevels = async () => {
try {
const tanks = await Tank.find({});
// Iterate through each tank
for (const tank of tanks) {
// Fetch users associated with the customerId of the tank
const users = await User.find({ customerId: tank.customerId });
const fcmTokens = users
.map(user => user.fcmIds)
.filter(fcmIds => fcmIds)
.flat(); // Flatten if there are multiple fcmIds for each user
// Ensure that fcmTokens exist before proceeding
if (fcmTokens.length > 0) {
const customerId = tank.customerId;
const tankName = tank.tankName; // Assuming tank has a 'name' field
const tankLocation = tank.tankLocation; // Assuming tank has a 'location' field
// const monitorWaterLevels = async () => {
// try {
// const tanks = await Tank.find({});
// Call the function to check water levels and send notifications
//await checkWaterLevelsAndNotify(customerId, tankName, tankLocation, fcmTokens);
} else {
//console.log(`No FCM tokens found for customerId ${tank.customerId}`);
}
}
} catch (error) {
console.error('Error monitoring water levels:', error);
}
};
// // Iterate through each tank
// for (const tank of tanks) {
// // Fetch users associated with the customerId of the tank
// const users = await User.find({ customerId: tank.customerId });
// const fcmTokens = users
// .map(user => user.fcmIds)
// .filter(fcmIds => fcmIds)
// .flat(); // Flatten if there are multiple fcmIds for each user
// // Ensure that fcmTokens exist before proceeding
// if (fcmTokens.length > 0) {
// const customerId = tank.customerId;
// const tankName = tank.tankName; // Assuming tank has a 'name' field
// const tankLocation = tank.tankLocation; // Assuming tank has a 'location' field
// // Call the function to check water levels and send notifications
// //await checkWaterLevelsAndNotify(customerId, tankName, tankLocation, fcmTokens);
// } else {
// //console.log(`No FCM tokens found for customerId ${tank.customerId}`);
// }
// }
// } catch (error) {
// console.error('Error monitoring water levels:', error);
// }
// };
// Schedule the task to run every 30 minutes
setInterval(monitorWaterLevels, 30 * 60 * 1000);
// // Schedule the task to run every 30 minutes
// setInterval(monitorWaterLevels, 30 * 60 * 1000);
const motorIntervals = {};
async function calculateTotalPumpedWater(customerId, motorId, start_instance_id) {
@ -2863,7 +2900,7 @@ exports.motorAction = async (req, reply) => {
let motorStopStatus = action === "start" ? "2" : "1";
const blockName = req.body.from || "Unknown Block";
const tankName = req.body.to || "Unknown Tank";
const stopTime = req.body.stopTime
if (action === "start") {
if (motorIntervals[motorId]) {
@ -2946,7 +2983,45 @@ exports.motorAction = async (req, reply) => {
console.log(thresholdTime,"thresholdTime")
console.log("motor stopping because it entered this condition")
// Emit the threshold time notification
try {
console.log("motor stopping because it entered this condition")
const tank = await Tank.findOne(
{ customerId, "connections.inputConnections.motor_id": motorId },
{ "connections.inputConnections.$": 1 } // Fetch only relevant motor connection
);
if (tank && tank.connections.inputConnections[0].motor_stop_status === "1") {
console.log("⚠️ Motor already stopped. Skipping notification.");
} else {
console.log("🚀 Sending threshold time notification...");
eventEmitter.emit(
"sendThresholdTimeNotification",
customerId,
fcmToken,
manual_threshold_time,
motorId,
tankName,
blockName
);
}
// eventEmitter.emit(
// "sendThresholdTimeNotification",
// customerId,
// fcmToken,
// manual_threshold_time,
// motorId,
// tankName,
// blockName
// );
reply.code(200).send({ message: "Motor stopped successfully." });
} catch (error) {
console.error("Error in handleMotorStop:", error);
reply.code(500).send({ error: "Internal Server Error" });
}
// eventEmitter.emit(
// "sendThresholdTimeNotification",
// customerId,
@ -2957,32 +3032,32 @@ exports.motorAction = async (req, reply) => {
// blockName
// );
const notificationKey = `${customerId}_${motorId}_threshold`;
// const notificationKey = `${customerId}_${motorId}_threshold`;
// Check if the notification has already been sent
if (!notificationTracker.get(notificationKey)) {
console.log("Sending threshold time notification...");
// // Check if the notification has already been sent
// if (!notificationTracker.get(notificationKey)) {
// console.log("Sending threshold time notification...");
eventEmitter.emit(
"sendThresholdTimeNotification",
customerId,
fcmToken,
manual_threshold_time,
motorId,
tankName,
blockName
);
// Mark notification as sent
notificationTracker.set(notificationKey, true);
// Optionally, reset the flag after some time (e.g., 24 hours)
setTimeout(() => {
notificationTracker.delete(notificationKey);
}, 24 * 60 * 60 * 1000); // Reset after 24 hours
} else {
console.log("Notification already sent, skipping...");
}
// eventEmitter.emit(
// "sendThresholdTimeNotification",
// customerId,
// fcmToken,
// manual_threshold_time,
// motorId,
// tankName,
// blockName
// );
// // Mark notification as sent
// notificationTracker.set(notificationKey, true);
// // Optionally, reset the flag after some time (e.g., 24 hours)
// setTimeout(() => {
// notificationTracker.delete(notificationKey);
// }, 24 * 60 * 60 * 1000); // Reset after 24 hours
// } else {
// console.log("Notification already sent, skipping...");
// }
const currentTime = moment().tz('Asia/Kolkata').format('DD-MMM-YYYY - HH:mm');
await Tank.updateOne(
@ -3163,6 +3238,19 @@ exports.motorAction = async (req, reply) => {
} else if (action === "stop") {
await stopMotor(motorId, customerId, start_instance_id);
try {
const totalWaterPumped = await calculateTotalPumpedWater(customerId, motorId, start_instance_id);
eventEmitter.emit("motorStop", customerId, fcmToken, tankName, blockName, stopTime, "Mobile APP", totalWaterPumped, typeOfWater, motorId,
loggedInUser.phone,);
reply.code(200).send({ message: "Motor stopped successfully." });
} catch (error) {
console.error("Error in handleMotorStop:", error);
reply.code(500).send({ error: "Internal Server Error" });
}
this.publishMotorStopStatus(motorId, motorStopStatus);
reply.code(200).send({ message: "Motor stopped successfully." });
}
@ -3190,7 +3278,7 @@ async function stopMotor(motorId, customerId, start_instance_id) {
delete motorIntervals[motorId];
}
eventEmitter.emit("motorStop", customerId, [], "", "", currentTime, "Mobile APP", 0, "", motorId, "");
// eventEmitter.emit("motorStop", customerId, [], "", "", currentTime, "Mobile APP", 0, "", motorId, "");
const motorData = await MotorData.findOne({ customerId, motor_id: motorId, start_instance_id });
if (motorData) {
@ -3211,6 +3299,77 @@ async function stopMotor(motorId, customerId, start_instance_id) {
const monitorWaterLevels = async () => {
try {
// console.log("⏳ Monitoring water levels...");
const tanks = await Tank.find(); // Fetch all tanks
//console.log("Fetched Tanks:", tanks.length);
for (const tank of tanks) {
const {
_id,
customerId, // Move this to the top
motor_id,
tankName,
blockName,
waterlevel: currentWaterLevel,
capacity,
auto_min_percentage,
reserved_percentage,
notificationSentLow,
notificationSentCritical,
} = tank;
const users = await User.find({ customerId });
const fcmTokens = users
.map(user => user.fcmIds)
.filter(fcmIds => Array.isArray(fcmIds) && fcmIds.length > 0) // Ensure it's an array
.flat();
if (!fcmTokens || fcmTokens.length === 0) {
//console.error("❌ No valid FCM tokens found for customerId:", customerId);
continue; // Skip this tank
}
const LOW_PERCENTAGE = 20; // 20% threshold
const CRITICAL_PERCENTAGE = 10; // 10% threshold
// **Calculate thresholds based on tank capacity**
const lowWaterLevel = (LOW_PERCENTAGE / 100) * capacity;
const criticalLowWaterLevel = (CRITICAL_PERCENTAGE / 100) * capacity;
const currentWaterPercentage = (currentWaterLevel / capacity) * 100;
// const lowWaterLevel = 9483; // Low threshold in liters
// const criticalLowWaterLevel = 9483;
if (currentWaterLevel <= criticalLowWaterLevel) {
if (!notificationSentCritical) {
console.log("🚨 Sending Critical Low Water Notification...");
eventEmitter.emit("sendCriticalLowWaterNotification", customerId, fcmTokens, motor_id, tankName, blockName, criticalLowWaterLevel, currentWaterLevel, currentWaterPercentage);
await Tank.updateOne({ _id }, { notificationSentCritical: true, notificationSentLow: true });
}
}
else if (currentWaterLevel <= lowWaterLevel) {
if (!notificationSentLow) {
console.log("⚠️ Sending Low Water Notification...");
eventEmitter.emit("sendLowWaterNotification", customerId, fcmTokens, motor_id, tankName, blockName, lowWaterLevel, currentWaterLevel, currentWaterPercentage);
await Tank.updateOne({ _id }, { notificationSentLow: true });
}
}
else if (currentWaterLevel > lowWaterLevel && (notificationSentLow || notificationSentCritical)) {
console.log("🔄 Water level restored. Resetting flags.");
await Tank.updateOne({ _id }, { notificationSentCritical: false, notificationSentLow: false });
}
}
} catch (error) {
console.error("❌ Error monitoring water levels:", error);
}
};
setInterval(monitorWaterLevels, 1000);
// async function stopMotor(motorId, customerId, start_instance_id) {
// const currentTime = moment().tz('Asia/Kolkata').format('DD-MMM-YYYY - HH:mm');
// await Tank.updateOne(
@ -5913,6 +6072,7 @@ client.on('message', async (topic, message) => {
}
});
client.on('error', (err) => console.error('❌ MQTT Error:', err));
client.on('close', () => console.log('⚠️ MQTT Connection Closed.'));
client.on('offline', () => console.log('⚠️ MQTT Broker Offline.'));
@ -6075,7 +6235,7 @@ const sendMotorNotifications = async () => {
// 🔹 Motor Start Condition
if (
inputConnection.motor_stop_status === "2" && status ===1 && inputConnection.motor_on_type === "forced_manual" &&
inputConnection.motor_stop_status === "2" && inputConnection.motor_on_type === "forced_manual" &&
!motorTank.motor_start_notified
) {
console.log("✅ Sending Motor Start Notification...");
@ -6100,8 +6260,8 @@ const sendMotorNotifications = async () => {
// 🔹 Motor Stop Condition
if (
inputConnection.motor_stop_status === "1" && inputConnection.motor_on_type === "forced_manual" && status ===2 &&
!motorTank.motor_stop_notified
inputConnection.motor_stop_status === "1" &&
!motorTank.motor_stop_notified && inputConnection.motor_on_type === "forced_manual"
) {
console.log("✅ Sending Motor Stop Notification...");

Loading…
Cancel
Save