@ -1571,13 +1571,15 @@ eventEmitter.on(
console . log ( "users" , users ) ;
const userNames = users . map ( user => user . username ) . join ( ', ' ) ;
console . log ( "userNames" , userNames ) ;
const startMethod = motorOnType . toUpperCase ( ) === "APP" ? "via the App" : "Manual" ;
// Prepare the message
const message =
` Tank Name: ' ${ tankName } ' \n ` +
` Pump started at: ' ${ startTime } ' \n ` +
` Initiated by : ${ userNames } \n ` +
` Pump started by: ' ${ motorOnType . toUpperCase ( ) } ' \n ` +
// `Pump started by: '${motorOnType.toUpperCase()}'\n` +
` Pump started by: ' ${ startMethod } ' \n ` +
` Will stop at after: ' ${ manual _threshold _time } ' mins ` ;
// Send the notification
@ -1597,13 +1599,17 @@ eventEmitter.on('motorStop', async (fcmTokens, tankName,stopTime, motorOnType)
const userNames = users . map ( user => user . username ) . join ( ', ' ) ;
console . log ( "userNames" , userNames )
const stopMethod = motorOnType . toUpperCase ( ) === "APP" ? "via the App" : "manual" ;
// Prepare the message
// const message = `Tank Name: '${tankName}', Pump stopped at '${stopTime}' by Initiated by user(s): ${userNames} '${motorOnType}'`;
const message =
` Tank Name: ' ${ tankName } ' \n ` +
` Pump stopped at: ' ${ stopTime } ' \n ` +
` Initiated by : ${ userNames } \n ` +
` Motor On Type: ' ${ motorOnType } ' ` ;
// `Motor Off Type: '${motorOnType}'`;
` Motor Off Type: ' ${ stopMethod } ' ` ;
// Send the notification
await sendNotification ( fcmTokens , 'Arminta Water Management' , message ) ;
@ -1760,7 +1766,7 @@ const sendNotification = async (fcmIds, title, body) => {
notification : { title , body } ,
token ,
data : {
exampleKey : 'exampleValue' , // Optional additional data
exampleKey : 'exampleValue' ,
} ,
} ) ;
console . log ( ` Notification sent successfully to token: ${ token } ` , response ) ;
@ -2105,6 +2111,94 @@ const stat_stop_intervals = {};
// throw boom.boomify(err);
// }
// };
const checkWaterLevelsAndNotify = async ( customerId , tankName , tankLocation , fcmTokens ) => {
try {
// Fetch the tank details from the database
const tank = await Tank . findOne ( { customerId , tankName , tankLocation } ) ;
if ( ! tank ) {
console . error ( ` Tank not found: ${ tankName } at location ${ tankLocation } ` ) ;
return ;
}
// Extract the current water level and capacity
const currentWaterLevel = parseInt ( tank . waterlevel , 10 ) ;
const capacity = parseInt ( tank . capacity . replace ( /,/g , '' ) , 10 ) ;
// Calculate the water level percentage
const waterLevelPercentage = ( currentWaterLevel / capacity ) * 100 ;
// Thresholds for notifications
const thresholds = {
criticallyLow : 10 ,
veryLow : 20 ,
low : 30 ,
high : 70 ,
veryHigh : 80 ,
criticallyHigh : 85 ,
} ;
// Check water levels and send notifications
if ( waterLevelPercentage <= thresholds . criticallyLow ) {
eventEmitter . emit ( 'sendCriticalLowWaterNotification' , fcmTokens , tank ) ;
await Tank . updateOne ( { customerId , tankName : tank . tankName } , { $set : { notificationSentCritical : true } } ) ;
} else if ( waterLevelPercentage <= thresholds . veryLow ) {
eventEmitter . emit ( 'sendVeryLowWaterNotification' , fcmTokens , tank ) ;
await Tank . updateOne ( { customerId , tankName : tank . tankName } , { $set : { notificationSentVeryLow : true } } ) ;
} else if ( waterLevelPercentage <= thresholds . low ) {
eventEmitter . emit ( 'sendLowWaterNotification' , fcmTokens , tank ) ;
await Tank . updateOne ( { customerId , tankName : tank . tankName } , { $set : { notificationSentLow : true } } ) ;
} else if ( waterLevelPercentage >= thresholds . criticallyHigh ) {
eventEmitter . emit ( 'sendCriticalHighWaterNotification' , fcmTokens , tank ) ;
await Tank . updateOne ( { customerId , tankName : tank . tankName } , { $set : { notificationSentCriticalHigh : true } } ) ;
} else if ( waterLevelPercentage >= thresholds . veryHigh ) {
eventEmitter . emit ( 'sendVeryHighWaterNotification' , fcmTokens , tank ) ;
await Tank . updateOne ( { customerId , tankName : tank . tankName } , { $set : { notificationSentVeryHigh : true } } ) ;
} else if ( waterLevelPercentage >= thresholds . high ) {
eventEmitter . emit ( 'sendHighWaterNotification' , fcmTokens , tank ) ;
await Tank . updateOne ( { customerId , tankName : tank . tankName } , { $set : { notificationSentHigh : true } } ) ;
}
} catch ( error ) {
console . error ( ` Error checking water levels for tank ${ tankName } : ` , error ) ;
}
} ;
const monitorWaterLevels = async ( ) => {
try {
const tanks = await Tank . find ( { } ) ;
// Iterate through each tank
for ( const tank of tanks ) {
// Fetch users associated with the customerId of the tank
const users = await User . find ( { customerId : tank . customerId } ) ;
const fcmTokens = users
. map ( user => user . fcmIds )
. filter ( fcmIds => fcmIds )
. flat ( ) ; // Flatten if there are multiple fcmIds for each user
// Ensure that fcmTokens exist before proceeding
if ( fcmTokens . length > 0 ) {
const customerId = tank . customerId ;
const tankName = tank . tankName ; // Assuming tank has a 'name' field
const tankLocation = tank . tankLocation ; // Assuming tank has a 'location' field
// Call the function to check water levels and send notifications
await checkWaterLevelsAndNotify ( customerId , tankName , tankLocation , fcmTokens ) ;
} else {
console . log ( ` No FCM tokens found for customerId ${ tank . customerId } ` ) ;
}
}
} catch ( error ) {
console . error ( 'Error monitoring water levels:' , error ) ;
}
} ;
// Schedule the task to run every 30 minutes
setInterval ( monitorWaterLevels , 30 * 60 * 1000 ) ;
const motorIntervals = { } ;
exports . motorAction = async ( req , reply ) => {
try {
@ -2133,44 +2227,7 @@ exports.motorAction = async (req, reply) => {
const currentWaterLevel = parseInt ( receiverTank . waterlevel , 10 ) ;
const waterLevelThresholds = { low : 30 , veryLow : 20 , criticallyLow : 10 } ;
// Check if the water level is below any of the thresholds
if ( currentWaterLevel < waterLevelThresholds . criticallyLow ) {
if ( ! receiverTank . notificationSentCritical ) {
eventEmitter . emit ( 'sendCriticalLowWaterNotification' , fcmToken , receiverTank ) ;
await Tank . updateOne ( { customerId , tankName : receiverTank . tankName } , { $set : { notificationSentCritical : true } } ) ;
}
} else if ( currentWaterLevel < waterLevelThresholds . veryLow ) {
if ( ! receiverTank . notificationSentVeryLow ) {
eventEmitter . emit ( 'sendVeryLowWaterNotification' , fcmToken , receiverTank ) ;
await Tank . updateOne ( { customerId , tankName : receiverTank . tankName } , { $set : { notificationSentVeryLow : true } } ) ;
}
} else if ( currentWaterLevel < waterLevelThresholds . low ) {
if ( ! receiverTank . notificationSentLow ) {
eventEmitter . emit ( 'sendLowWaterNotification' , fcmToken , receiverTank ) ;
await Tank . updateOne ( { customerId , tankName : receiverTank . tankName } , { $set : { notificationSentLow : true } } ) ;
}
}
// Check for critical high water level
// if (currentWaterLevel >= criticalHighWaterThreshold) {
// if (!receiverTank.notificationSentCriticalHigh) {
// eventEmitter.emit('sendCriticalHighWaterNotification', fcmToken, receiverTank);
// await Tank.updateOne({ customerId, tankName: receiverTank.tankName }, { $set: { notificationSentCriticalHigh: true } });
// }
// }
// // Check for very high water level
// else if (currentWaterLevel >= veryHighWaterThreshold) {
// if (!receiverTank.notificationSentVeryHigh) {
// eventEmitter.emit('sendVeryHighWaterNotification', fcmToken, receiverTank);
// await Tank.updateOne({ customerId, tankName: receiverTank.tankName }, { $set: { notificationSentVeryHigh: true } });
// }
// }
// // Check for high water level
// else if (currentWaterLevel >= highWaterThreshold) {
// if (!receiverTank.notificationSentHigh) {
// eventEmitter.emit('sendHighWaterNotification', fcmToken, receiverTank);
// await Tank.updateOne({ customerId, tankName: receiverTank.tankName }, { $set: { notificationSentHigh: true } });
// }
// }
// Determine the motor stop status based on the action
let motorStopStatus ;
const blockName = req . body . from || "Unknown Block" ; // Provide a fallback if `from` is missing
@ -2178,156 +2235,111 @@ exports.motorAction = async (req, reply) => {
const stopTime = req . body . stopTime
const motorOnType = req . body . motor _on _type || "APP" ;
const manual _threshold _time = req . body . manual _threshold _time ;
let hasNotifiedStart = false ;
let hasNotifiedStop = false ;
if ( action === "start" ) {
motorStopStatus = "2" ;
const startTime = req . body . startTime ;
// const startMessage = `The motor supplying water to '${tankName}' in block '${blockName}' has started manually at ${new Date().toISOString()}.`;
// eventEmitter.emit("sendMotorStartNotification", fcmToken, startMessage);
await Tank . updateOne (
{ customerId , "connections.inputConnections.motor_id" : motorId } ,
{ $set : { "connections.inputConnections.$.motor_stop_status" : motorStopStatus } }
) ;
const thresholdTimeMs = req . body . manual _threshold _time * 60 * 1000 ; // Convert minutes to milliseconds
//const startTime = new Date(); // Record the start time
//const startTime = req.body.startTime;
// Schedule a task to send a notification when the threshold time is reached
// motorIntervals[motorId] = setTimeout(async () => {
// try {
// // Perform threshold time logic (e.g., notification, motor stop)
// console.log(`Threshold time of ${manual_threshold_time} minutes reached for motor ${motorId}`);
// await Tank.updateOne(
// { customerId, "connections.inputConnections.motor_id": motorId },
// { $set: { "connections.inputConnections.$.motor_stop_status": "1" } }
// );
// clearTimeout(motorIntervals[motorId]); // Clear timeout
// delete motorIntervals[motorId];
// } catch (error) {
// console.error("Error in threshold time handling:", error);
// }
// }, thresholdTimeMs);
const stopCriteria =
const stopCriteria =
motorOnType === "time"
? ` ${ req . body . manual _threshold _time } minutes `
: ` ${ req . body . manual _threshold _litres } litres ` ;
eventEmitter . emit (
"motorStart" ,
fcmToken ,
new Date ( ) . toISOString ( ) ,
motorId ,
currentWaterLevel ,
blockName , // Block Name
tankName , // Tank Name
startTime ,
motorOnType ,
stopCriteria ,
manual _threshold _time
) ;
await checkWaterLevelsAndNotify ( customerId , tankName , receiverTank . tankLocation , fcmToken ) ;
// eventEmitter.emit(
// "motorStart",
// fcmToken,
// new Date().toISOString(),
// motorId,
// currentWaterLevel,
// blockName, // Block Name
// tankName, // Tank Name
// startTime,
// motorOnType,
// stopCriteria,
// manual_threshold_time
// );
if ( ! hasNotifiedStart ) {
eventEmitter . emit (
"motorStart" ,
fcmToken ,
new Date ( ) . toISOString ( ) ,
motorId ,
currentWaterLevel ,
blockName ,
tankName ,
startTime ,
motorOnType ,
stopCriteria ,
manual _threshold _time
) ;
hasNotifiedStart = true ; // Set flag to true to prevent duplicate notifications
}
await Tank . updateOne (
{ customerId , "connections.inputConnections.motor_id" : motorId } ,
{ $set : { "connections.inputConnections.$.motor_stop_status" : "2" ,
"connections.inputConnections.$.manual_threshold_time" : manual _threshold _time ,
"connections.inputConnections.$.threshold_type" : "time" } }
) ;
await Tank . updateOne (
{ customerId , "connections.inputConnections.motor_id" : motorId } ,
{
$set : {
"connections.inputConnections.$.motor_stop_status" : "2" ,
"connections.inputConnections.$.motor_on_type" : "manual" ,
}
}
"connections.inputConnections.$.threshold_type" : "time" ,
"connections.inputConnections.$.motor_on_type" : motorOnType } }
) ;
// const startMessage = `Motor supplying water to '${tankName}' in block '${blockName}' started manually at ${startTime}.`;
// eventEmitter.emit('sendMotorStartNotification', fcmToken, startMessage);
// Schedule threshold check
// const thresholdTimeMs = manual_threshold_time * 60 * 1000;
// motorIntervals[motorId] = setTimeout(async () => {
// try {
// // const stopMessage = `Threshold time of ${manual_threshold_time} minutes reached for motor supplying water to '${tankName}' in block '${blockName}'.`;
// // eventEmitter.emit('sendThresholdTimeNotification', fcmToken, stopMessage);
// await Tank.updateOne(
// { customerId, "connections.inputConnections.motor_id": motorId },
// {
// $set: {
// "connections.inputConnections.$.motor_stop_status": "1",
// "connections.inputConnections.$.manual_threshold_time": null,
// "connections.inputConnections.$.threshold_type": null,
// }
// }
// );
// delete motorIntervals[motorId];
// } catch (error) {
// console.error("Error during threshold handling:", error);
// }
// }, thresholdTimeMs);
reply . code ( 200 ) . send ( { message : "Motor started successfully." } ) ;
// Schedule water level checks after motor start
motorIntervals [ motorId ] = setInterval ( async ( ) => {
const receiverTank = await Tank . findOne ( { customerId , tankName : req . body . to , tankLocation : req . body . to _type . toLowerCase ( ) } ) ;
const currentWaterLevel = parseInt ( receiverTank . waterlevel , 10 ) ;
// Check water levels and send notifications
if ( currentWaterLevel >= criticalHighWaterThreshold && ! receiverTank . notificationSentCriticalHigh ) {
eventEmitter . emit ( 'sendCriticalHighWaterNotification' , fcmToken , receiverTank ) ;
await Tank . updateOne ( { customerId , tankName : receiverTank . tankName } , { $set : { notificationSentCriticalHigh : true } } ) ;
} else if ( currentWaterLevel >= veryHighWaterThreshold && ! receiverTank . notificationSentVeryHigh ) {
eventEmitter . emit ( 'sendVeryHighWaterNotification' , fcmToken , receiverTank ) ;
await Tank . updateOne ( { customerId , tankName : receiverTank . tankName } , { $set : { notificationSentVeryHigh : true } } ) ;
} else if ( currentWaterLevel >= highWaterThreshold && ! receiverTank . notificationSentHigh ) {
eventEmitter . emit ( 'sendHighWaterNotification' , fcmToken , receiverTank ) ;
await Tank . updateOne ( { customerId , tankName : receiverTank . tankName } , { $set : { notificationSentHigh : true } } ) ;
}
} , 30000 ) ; // Check every 30 seconds
} else if ( action === "stop" ) {
motorStopStatus = "1" ; // If action is stop, set stop status to "1"
// Emit stop notification
// const stopMessage = `The motor supplying water to '${tankName}' in block '${blockName}' was stopped manually at ${stopTime}.`;
// eventEmitter.emit("sendMotorStopNotification", fcmToken, stopMessage);
eventEmitter . emit (
"motorStop" ,
fcmToken ,
// motorId,
// currentWaterLevel,
// blockName,
tankName ,
stopTime ,
motorOnType
) ;
// await Tank.updateOne(
// { customerId, "connections.inputConnections.motor_id": motorId },
// {
// $set: {
// "connections.inputConnections.$.motor_stop_status": " 1 ",
// "connections.inputConnections.$.motor_stop_status": "2",
// "connections.inputConnections.$.motor_on_type": "manual",
// }
// }
// );
// const stopMessage = `Motor supplying water to '${tankName}' in block '${blockName}' stopped manually at ${stopTime}.`;
// eventEmitter.emit('sendMotorStopNotification', fcmToken, stopMessage);
// if (motorIntervals[motorId]) {
// clearTimeout(motorIntervals[motorId]);
// delete motorIntervals[motorId];
// }
reply . code ( 200 ) . send ( { message : "Motor started successfully." } ) ;
} else if ( action === "stop" ) {
motorStopStatus = "1" ; // If action is stop, set stop status to "1"
await checkWaterLevelsAndNotify ( customerId , tankName , receiverTank . tankLocation , fcmToken ) ;
// eventEmitter.emit(
// "motorStop",
// fcmToken,
// // motorId,
// // currentWaterLevel,
// // blockName,
// tankName,
// stopTime,
// motorOnType
// );
if ( ! hasNotifiedStop ) {
eventEmitter . emit (
"motorStop" ,
fcmToken ,
tankName ,
stopTime ,
motorOnType
) ;
hasNotifiedStop = true ; // Set flag to true to prevent duplicate notifications
}
await Tank . updateOne (
{ customerId , "connections.inputConnections.motor_id" : motorId } ,
{
$set : {
"connections.inputConnections.$.motor_stop_status" : "1" ,
"connections.inputConnections.$.motor_on_type" : motorOnType }
}
) ;
} else {
throw new Error ( "Invalid action provided." ) ;
}
@ -2444,6 +2456,13 @@ exports.motorAction = async (req, reply) => {
console . log ( new Date ( ) , "new date" )
console . log ( thresholdTime , "thresholdTime" )
console . log ( "motor stopping because it entered this condition" )
// Emit the threshold time notification
eventEmitter . emit (
"sendThresholdTimeNotification" ,
fcmToken ,
` Motor has reached its time threshold of ${ req . body . manual _threshold _time } minutes and will stop. `
) ;
await Tank . updateOne (
{ customerId , "connections.inputConnections.motor_id" : motorId } ,
{
@ -2456,10 +2475,13 @@ exports.motorAction = async (req, reply) => {
}
}
) ;
eventEmitter . emit ( 'sendLowWaterNotification' , fcmToken , receiverTank ) ;
console . log ( motorIntervals [ motorId ] , "deleted automatically" ) // Emit low water level notification
clearInterval ( motorIntervals [ motorId ] ) ; // Clear interval
delete motorIntervals [ motorId ] ;
// eventEmitter.emit('sendLowWaterNotification', fcmToken, receiverTank);
// console.log(motorIntervals[motorId],"deleted automatically") // Emit low water level notification
// clearInterval(motorIntervals[motorId]); // Clear interval
// delete motorIntervals[motorId];
await checkWaterLevelsAndNotify ( customerId , tankName , supplierTank . tankLocation , fcmToken ) ;
clearInterval ( motorIntervals [ motorId ] ) ; // Stop the motor if condition met
delete motorIntervals [ motorId ] ; // Remove from interval object
this . publishMotorStopStatus ( motorId , "1" ) ;
await delay ( 300000 ) ;
@ -2491,7 +2513,9 @@ exports.motorAction = async (req, reply) => {
// Check for high water level and send notification
if ( currentWaterPercentage >= highWaterThreshold ) {
eventEmitter . emit ( 'sendHighWaterNotification' , fcmToken , receiverTank ) ;
// eventEmitter.emit('sendHighWaterNotification', fcmToken, receiverTank);
await checkWaterLevelsAndNotify ( customerId , tankName , supplierTank . tankLocation , fcmToken ) ;
}
} , 30000 ) ; // Check every minute