@ -1674,39 +1674,6 @@ eventEmitter.on('sendCriticalHighWaterNotification', async (fcmTokens, tankInfo,
const message = ` Attention: Water level in ' ${ tankInfo . tankName } ' located at ' ${ tankInfo . tankLocation } ' is critically high at ${ tankInfo . waterLevel } % ( ${ tankInfo . volumeInLitres } L). Water may overflow. Please stop the motor. Motor running from ${ startTime } to ${ stopTime } . ` ;
await sendNotification ( fcmTokens , 'Critical High Water Level' , message , 'Stop Motor' ) ;
} ) ;
eventEmitter . on ( 'sendThresholdTimeNotification' , async ( fcmTokens , message ) => {
try {
await sendNotification ( fcmTokens , 'Threshold Time Reached' , message , 'Motor Alert' ) ;
console . log ( "Threshold time notification sent successfully." ) ;
} catch ( error ) {
console . error ( "Error sending threshold time notification:" , error ) ;
}
} ) ;
eventEmitter . on ( 'sendMotorStartNotification' , async ( fcmTokens , message ) => {
try {
await sendNotification ( fcmTokens , "Motor Started" , message ) ;
console . log ( "Manual method time notification sent successfully." ) ;
} catch ( error ) {
console . error ( "Error sending thresmanual method time notification:" , error ) ;
}
} ) ;
eventEmitter . on ( 'sendMotorStopNotification' , async ( fcmTokens , message ) => {
try {
await sendNotification ( fcmTokens , "Motor Stopped" , message ) ;
console . log ( "Manual method time notification sent successfully." ) ;
} catch ( error ) {
console . error ( "Error sending thresmanual method time notification:" , error ) ;
}
} ) ;
// Emit high water level event with motorId
// eventEmitter.on('highWaterLevel', async (fcmTokens, timestamp, motorId, waterLevel) => {
// await sendNotification(fcmTokens, 'High Water Level', `Motor ID: ${motorId}, water level reached above 90% at ${timestamp}. Current Water Level: ${waterLevel} Ltrs`);
@ -2185,51 +2152,7 @@ console.log(fcmToken)
if ( action === "start" ) {
motorStopStatus = "2" ;
const startMessage = ` The motor supplying water to ' ${ tankName } ' in block ' ${ blockName } ' has started manually at ${ new Date ( ) . toISOString ( ) } . ` ;
eventEmitter . emit ( "sendMotorStartNotification" , fcmToken , startMessage ) ;
await Tank . updateOne (
{ customerId , "connections.inputConnections.motor_id" : motorId } ,
{ $set : { "connections.inputConnections.$.motor_stop_status" : motorStopStatus } }
) ;
const thresholdTimeMs = req . body . manual _threshold _time * 60 * 1000 ; // Convert minutes to milliseconds
//const startTime = new Date(); // Record the start time
const startTime = req . body . startTime ;
// Schedule a task to send a notification when the threshold time is reached
motorIntervals [ motorId ] = setTimeout ( async ( ) => {
try {
// Fetch the latest tank and motor data
const receiverTank = await Tank . findOne ( {
customerId ,
tankName : req . body . to ,
tankLocation : req . body . to _type . toLowerCase ( ) ,
} ) ;
// Send a notification for threshold time reached
const message = ` Threshold time of ${ req . body . manual _threshold _time } minutes has been reached for the motor supplying ' ${ receiverTank . tankName } ' located at ' ${ receiverTank . tankLocation } '. Please review the motor operation. ` ;
eventEmitter . emit ( 'sendThresholdTimeNotification' , fcmToken , message ) ;
// Optionally update the tank or motor state in the database
await Tank . updateOne (
{ customerId , tankName : receiverTank . tankName } ,
{ $set : { notificationSentThresholdTime : true } }
) ;
// Optionally stop the motor if needed
await Tank . updateOne (
{ customerId , "connections.inputConnections.motor_id" : motorId } ,
{ $set : { "connections.inputConnections.$.motor_stop_status" : "1" } }
) ;
clearTimeout ( motorIntervals [ motorId ] ) ;
delete motorIntervals [ motorId ] ;
} catch ( error ) {
console . error ( "Error handling threshold time notification:" , error ) ;
}
} , thresholdTimeMs )
const stopCriteria =
motorOnType === "time"
? ` ${ req . body . manual _threshold _time } minutes `
@ -2274,26 +2197,6 @@ console.log(fcmToken)
} , 30000 ) ; // Check every 30 seconds
} else if ( action === "stop" ) {
motorStopStatus = "1" ; // If action is stop, set stop status to "1"
// Emit stop notification
const stopMessage = ` The motor supplying water to ' ${ tankName } ' in block ' ${ blockName } ' was stopped manually at ${ stopTime } . ` ;
eventEmitter . emit ( "sendMotorStopNotification" , fcmToken , stopMessage ) ;
await Tank . updateOne (
{ customerId , "connections.inputConnections.motor_id" : motorId } ,
{
$set : {
"connections.inputConnections.$.motor_stop_status" : motorStopStatus ,
"connections.inputConnections.$.motor_on_type" : "manual" ,
"connections.inputConnections.$.stopTime" : stopTime
}
}
) ;
// Clear intervals if any
if ( motorIntervals [ motorId ] ) {
clearInterval ( motorIntervals [ motorId ] ) ;
delete motorIntervals [ motorId ] ;
}
eventEmitter . emit (
"motorStop" ,
fcmToken ,
@ -2336,14 +2239,13 @@ console.log(fcmToken)
// Perform stop operations in the background
( async ( ) => {
await delay ( 300000 ) ;
const motorData = await MotorData . findOne ( { customerId , motor _id : motorId , start _instance _id : start _instance _id } ) ;
if ( motorData ) {
const supplierTank = await Tank . findOne ( { customerId , tankName : motorData . supplierTank , tankLocation : motorData . supplier _type . toLowerCase ( ) } ) ;
const supplierFinalWaterLevel = parseInt ( supplierTank . waterlevel , 10 ) ;
const receiverTank = await Tank . findOne ( { customerId , tankName : motorData . receiverTank , tankLocation : motorData . receiver _type . toLowerCase ( ) } ) ;
const receiverFinalWaterLevel = parseInt ( receiverTank . waterlevel , 10 ) ;
const quantityDelivered = parseInt ( motorData . supplierInitialwaterlevel, 10 ) - supplierFinalWaterLevel ;
const quantityDelivered = receiverFinalWaterLevel - parseInt ( motorData . receiverInitialwaterlevel, 10 ) ;
const water _pumped _till _now = parseInt ( receiverTank . total _water _added _from _midnight , 10 ) ;
const totalwaterpumped = quantityDelivered + water _pumped _till _now ;
@ -2358,7 +2260,6 @@ console.log(fcmToken)
$set : {
stopTime : req . body . stopTime ,
receiverfinalwaterlevel : receiverFinalWaterLevel . toString ( ) ,
supplierfinalwaterlevel : supplierFinalWaterLevel . toString ( ) ,
quantity _delivered : quantityDelivered . toString ( )
}
}
@ -2379,7 +2280,6 @@ console.log(fcmToken)
if ( req . body . threshold _type === "time" ) {
// Create a new MotorData entry
const receiverTank = await Tank . findOne ( { customerId , tankName : req . body . to , tankLocation : req . body . to _type . toLowerCase ( ) } ) ;
const supplierTank = await Tank . findOne ( { customerId , tankName : req . body . from , tankLocation : req . body . from _type . toLowerCase ( ) } ) ;
const newMotorData = new MotorData ( {
customerId ,
motor _id : motorId ,
@ -2389,7 +2289,6 @@ console.log(fcmToken)
supplier _type : req . body . from _type ,
receiver _type : req . body . to _type ,
startTime : req . body . startTime ,
supplierInitialwaterlevel : parseInt ( supplierTank . waterlevel , 10 ) ,
receiverInitialwaterlevel : parseInt ( receiverTank . waterlevel , 10 )
} ) ;
await newMotorData . save ( ) ;
@ -2442,16 +2341,15 @@ console.log(fcmToken)
delete motorIntervals [ motorId ] ;
this . publishMotorStopStatus ( motorId , "1" ) ;
await delay ( 300000 ) ;
const motorData = await MotorData . findOne ( { customerId , motor _id : motorId , start _instance _id : start _instance _id } ) ;
if ( motorData ) {
const suppli erTank = await Tank . findOne ( { customerId , tankName : motorData . supplierTank, tankLocation : motorData . suppli er_type . toLowerCase ( ) } ) ;
const supplierFinalWaterLevel = parseInt ( suppli erTank. waterlevel , 10 ) ;
const quantityDelivered = parseInt ( motorData . supplierInitialwaterlevel, 10 ) - supplierFinalWaterLevel ;
const receiv erTank = await Tank . findOne ( { customerId , tankName : motorData . receiverTank, tankLocation : motorData . receiv er_type . toLowerCase ( ) } ) ;
const receiverFinalWaterLevel = parseInt ( receiv erTank. waterlevel , 10 ) ;
const quantityDelivered = receiverFinalWaterLevel - parseInt ( motorData . receiverInitialwaterlevel, 10 ) ;
const water _pumped _till _now = parseInt ( receiverTank . total _water _added _from _midnight , 10 ) ;
const totalwaterpumped = quantityDelivered + water _pumped _till _now ;
await Tank . findOneAndUpdate (
{ customerId , tankName : motorData . receiverTank , tankLocation : motorData . receiver _type . toLowerCase ( ) } ,
{ $set : { total _water _added _from _midnight : totalwaterpumped } }
@ -2462,7 +2360,6 @@ console.log(fcmToken)
{
$set : {
stopTime : req . body . stopTime ,
supplierfinalwaterlevel : supplierFinalWaterLevel . toString ( ) ,
receiverfinalwaterlevel : receiverFinalWaterLevel . toString ( ) ,
quantity _delivered : quantityDelivered . toString ( )
}
@ -2478,114 +2375,8 @@ console.log(fcmToken)
} , 30000 ) ; // Check every minute
}
} else if ( req . body . threshold _type === "litres" ) {
console . log ( "entered litres" )
const receiver _tank _info7 = await Tank . findOne ( { customerId , tankName : req . body . to , tankLocation : req . body . to _type . toLowerCase ( ) } ) ;
const supplier _tank _info7 = await Tank . findOne ( { customerId , tankName : req . body . from , tankLocation : req . body . from _type . toLowerCase ( ) } ) ;
const newMotorData = new MotorData ( {
customerId : customerId ,
motor _id : motorId ,
start _instance _id : start _instance _id ,
supplierTank : req . body . from ,
receiverTank : req . body . to ,
supplier _type : req . body . from _type ,
receiver _type : req . body . to _type ,
startTime : req . body . startTime ,
receiverInitialwaterlevel : parseInt ( receiver _tank _info7 . waterlevel , 10 ) ,
supplierInitialwaterlevel : parseInt ( supplier _tank _info7 . waterlevel , 10 )
} ) ;
await newMotorData . save ( ) ;
// If threshold type is percentage, calculate percentage threshold
const receiver _tank _info = await Tank . findOne ( { customerId , tankName : req . body . to , tankLocation : req . body . to _type . toLowerCase ( ) } ) ;
const supplier _tank _info = await Tank . findOne ( { customerId , tankName : req . body . from , tankLocation : req . body . from _type . toLowerCase ( ) } ) ;
if ( ! receiver _tank _info ) {
throw new Error ( "Receiver tank not found." ) ;
}
if ( ! supplier _tank _info ) {
throw new Error ( "Supplierr tank not found." ) ;
}
const supplier _capacity = parseInt ( supplier _tank _info . capacity , 10 ) ;
const supplier _waterLevel = parseInt ( supplier _tank _info . waterlevel , 10 ) ;
const capacity = parseInt ( receiver _tank _info . capacity , 10 ) ;
const waterLevel = parseInt ( receiver _tank _info . waterlevel , 10 ) ;
const desired _percentage = parseInt ( req . body . manual _threshold _litres . replace ( /,/g , '' ) , 10 ) ;
console . log ( desired _percentage )
const threshold _water _level = waterLevel + desired _percentage ;
const supplier _threshold = supplier _waterLevel - desired _percentage
console . log ( supplier _threshold , "supplier_threshold" )
for await ( const tank of Tank . find ( { "connections.inputConnections.motor_id" : motorId } ) ) {
this . publishMotorStopStatus ( motorId , motorStopStatus ) ;
for await ( const tank of Tank . find ( { "connections.inputConnections.motor_id" : motorId } ) ) {
const index = tank . connections . inputConnections . findIndex ( connection => connection . motor _id === motorId ) ;
if ( index !== - 1 ) {
await Tank . updateOne (
{ customerId , "connections.inputConnections.motor_id" : motorId } ,
{ $set : { [ ` connections.inputConnections. ${ index } .manual_threshold_percentage ` ] : supplier _threshold . toString ( ) , [ ` connections.inputConnections. ${ index } .startTime ` ] : req . body . startTime } }
) ;
}
}
// Update water level threshold
// Start monitoring water level based on threshold percentage
motorIntervals [ motorId ] = setInterval ( async ( ) => {
// Check if water level has reached the threshold percentage
const supplier _tank _info1 = await Tank . findOne ( { customerId , tankName : req . body . from , tankLocation : req . body . from _type . toLowerCase ( ) } ) ;
const current _water _level = parseInt ( supplier _tank _info1 . waterlevel , 10 ) ;
if ( current _water _level <= supplier _threshold ) {
// Stop the motor pump
await Tank . updateOne (
{ customerId , "connections.inputConnections.motor_id" : motorId } ,
{
$set : {
"connections.inputConnections.$.motor_stop_status" : "1" ,
"connections.inputConnections.$.threshold_type" : null ,
"connections.inputConnections.$.manual_threshold_time" : null ,
"connections.inputConnections.$.manual_threshold_percentage" : null
}
}
) ;
clearInterval ( motorIntervals [ motorId ] ) ; // Clear interval
delete motorIntervals [ motorId ] ;
this . publishMotorStopStatus ( motorId , "1" ) ;
await delay ( 300000 ) ;
const motorData = await MotorData . findOne ( { customerId , motor _id : motorId , start _instance _id : start _instance _id } ) ;
if ( motorData ) {
const receiverTank = await Tank . findOne ( { customerId , tankName : motorData . receiverTank , tankLocation : motorData . receiver _type . toLowerCase ( ) } ) ;
const receiverFinalWaterLevel = parseInt ( receiverTank . waterlevel , 10 ) ;
const quantityDelivered = receiverFinalWaterLevel - parseInt ( motorData . receiverInitialwaterlevel , 10 ) ;
const stopTime = formatDate ( new Date ( ) ) ;
await MotorData . updateOne (
{ customerId , motor _id : motorId , start _instance _id : start _instance _id } ,
{
$set : {
stopTime : stopTime ,
receiverfinalwaterlevel : receiverFinalWaterLevel . toString ( ) ,
quantity _delivered : quantityDelivered . toString ( )
}
}
) ;
}
}
} , 20000 ) ;
}
}
}
// Respond with success message
reply . code ( 200 ) . send ( { message : ` Motor ${ action === "start" ? "started" : "stopped" } successfully. ` } ) ;
@ -2595,6 +2386,7 @@ console.log(fcmToken)
} ;
// exports.motorAction = async (req, reply) => {
// try {
// const customerId = req.params.customerId;
@ -5038,12 +4830,14 @@ client.on('message', async (topic, message) => {
const inputConnection = motorTank . connections . inputConnections . find ( conn => conn . motor _id === hw _Id ) ; // Updated variable name
if ( inputConnection ) {
inputConnection . motor _status = status ; // Update motor status
if ( inputConnection . motor _stop _status === "1" && status === "2" && inputConnection . motor _on _type !== "forced_manual" ) {
if ( inputConnection . motor _stop _status === "1" && status === 2 && inputConnection . motor _on _type !== "forced_manual" ) {
const currentTime = moment ( ) . tz ( 'Asia/Kolkata' ) . format ( 'DD-MMM-YYYY - HH:mm' ) ;
inputConnection . motor _stop _status = "2" ;
inputConnection . motor _on _type = "forced_manual" ;
inputConnection . startTime = currentTime ;
}
if ( inputConnection . motor _stop _status === "2" && status === "1" ) {
if ( inputConnection . motor _stop _status === "2" && status === 1 ) {
inputConnection . motor _stop _status = "1" ;
}
@ -5189,7 +4983,6 @@ exports.consumptionofparticulartank = async (request, reply) => {
} ;
// // Set start and end dates
// const startDate = new Date("2024-08-20T00:00:00Z");
// const endDate = new Date("2024-11-04T00:00:00Z");
@ -5260,6 +5053,7 @@ exports.consumptionofparticulartank = async (request, reply) => {
// // Run the data generation function
// generateData();
async function removeDuplicates ( ) {
try {
// Step 1: Find duplicates, considering time and ignoring case for typeofwater
@ -5309,7 +5103,6 @@ async function removeDuplicates() {
}
}
// Run the remove duplicates function
// removeDuplicates();
console . log ( "this is for testing autopush,line located in tankscontroller" )