@ -10,7 +10,8 @@ const fastify = require("fastify")({
const cron = require ( 'node-cron' ) ;
const moment = require ( 'moment' ) ;
const EventEmitter = require ( 'events' ) ;
const eventEmitter = new EventEmitter ( ) ;
async function deleteOldRecords ( ) {
const SEVEN _DAYS _IN _MILLISECONDS = 7 * 24 * 60 * 60 * 1000 ;
const sevenDaysAgo = new Date ( Date . now ( ) - SEVEN _DAYS _IN _MILLISECONDS ) ;
@ -1320,28 +1321,83 @@ admin.initializeApp({
credential : admin . credential . cert ( serviceAccount ) ,
} ) ;
// // Handle motor start event
// eventEmitter.on('motorStart', async (fcmTokens) => {
// await sendNotification(fcmTokens, 'Motor Started', 'The motor has been started successfully.');
// });
// // Handle motor stop event
// eventEmitter.on('motorStop', async (fcmTokens) => {
// await sendNotification(fcmTokens, 'Motor Stopped', 'The motor has been stopped successfully.');
// });
// // Handle low water level event
// eventEmitter.on('lowWaterLevel', async (fcmTokens) => {
// await sendNotification(fcmTokens, 'Low Water Level', 'The water level is below 20%.');
// });
// // Handle high water level event
// eventEmitter.on('highWaterLevel', async (fcmTokens) => {
// await sendNotification(fcmTokens, 'High Water Level', 'The water level has reached above 90%.');
// });
// Handle motor start event with timestamp
eventEmitter . on ( 'motorStart' , async ( fcmTokens , timestamp , motorId , waterLevel ) => {
await sendNotification ( fcmTokens , 'Motor Started' , ` Motor ID: ${ motorId } started successfully at ${ timestamp } . Current Water Level: ${ waterLevel } Ltrs ` ) ;
} ) ;
// Emit motor stop event with motorId
eventEmitter . on ( 'motorStop' , async ( fcmTokens , timestamp , motorId , waterLevel ) => {
await sendNotification ( fcmTokens , 'Motor Stopped' , ` Motor ID: ${ motorId } stopped successfully at ${ timestamp } .Current Water Level: ${ waterLevel } Ltrs ` ) ;
} ) ;
// Emit low water level event with motorId
eventEmitter . on ( 'lowWaterLevel' , async ( fcmTokens , timestamp , motorId , waterLevel ) => {
await sendNotification ( fcmTokens , 'Low Water Level' , ` Motor ID: ${ motorId } , water level dropped below 20% at ${ timestamp } . Current Water Level: ${ waterLevel } Ltrs ` ) ;
} ) ;
// Emit high water level event with motorId
eventEmitter . on ( 'highWaterLevel' , async ( fcmTokens , timestamp , motorId , waterLevel ) => {
await sendNotification ( fcmTokens , 'High Water Level' , ` Motor ID: ${ motorId } , water level reached above 90% at ${ timestamp } . Current Water Level: ${ waterLevel } Ltrs ` ) ;
} ) ;
// Function to emit events with timestamps
const emitWithTimestamp = ( eventName , fcmTokens , motorId , waterLevel ) => {
const timestamp = moment ( ) . format ( 'HH:mm:ss YYYY-MM-DD ' ) ;
eventEmitter . emit ( eventName , fcmTokens , timestamp , motorId , waterLevel ) ;
} ;
const sendNotification = async ( fcmTokens , title , body ) => {
if ( ! Array . isArray ( fcmTokens ) || fcmTokens . length === 0 ) {
console . error ( 'No FCM tokens provided.' ) ;
return ;
}
const message = {
tokens : fcmTokens ,
notification : {
title : title ,
body : body ,
} ,
} ;
for ( const token of fcmTokens ) {
const message = {
token : token ,
notification : {
title : title ,
body : body ,
} ,
data : {
target : 'tank_levels' ,
} ,
} ;
try {
const response = await admin . messaging ( ) . sendMulticast ( message ) ;
console . log ( 'Notification sent successfully:' , response ) ;
} catch ( error ) {
console . error ( 'Error sending notification:' , error ) ;
try {
const response = await admin . messaging ( ) . send ( message ) ; // Send each message individually
console . log ( 'Notification sent successfully:' , response ) ;
} catch ( error ) {
console . error ( ` Failed to send notification to token ${ token } : ` , error ) ;
}
}
} ;
// const sendPushNotification = async (registrationToken, title, body) => {
// const message = {
// notification: {
@ -1383,35 +1439,329 @@ exports.publishMotorStopStatus = async (motor_id, motor_stop_status) => {
} ;
const stat _stop _intervals = { } ;
// exports.motorAction = async (req, reply) => {
// try {
// const customerId = req.params.customerId;
// const action = req.body.action;
// const motorId = req.body.motor_id;
// const start_instance_id = req.body.start_instance_id
// console.log(req.body.startTime)
// // Ensure motor_id is provided
// if (!motorId) {
// throw new Error("Motor ID is required.");
// }
// const users = await User.find({ customerId: customerId });
// const fcmToken = users.map(user => user.fcmId).filter(fcmId => fcmId);
// console.log(fcmToken)
// // Determine the motor stop status based on the action
// let motorStopStatus;
// if (action === "start") {
// motorStopStatus = "2"; // If action is start, set stop status to "2"
// // eventEmitter.emit('motorStart', fcmToken); // Emit motor start event
// emitWithTimestamp('motorStart', fcmToken); // Emit motor start event with timestamp
// console.log( eventEmitter.emit('motorStart', fcmToken))
// } else if (action === "stop") {
// motorStopStatus = "1"; // If action is stop, set stop status to "1"
// // eventEmitter.emit('motorStop', fcmToken); // Emit motor stop event
// emitWithTimestamp('motorStop', fcmToken); // Emit motor stop event with timestamp
// } else {
// throw new Error("Invalid action provided.");
// }
// // Update the motor stop status immediately if action is stop
// if (action === "stop") {
// // Update the motor stop status and other fields
// await Tank.updateOne(
// { customerId, "connections.inputConnections.motor_id": motorId },
// {
// $set: {
// "connections.inputConnections.$.motor_stop_status": "1",
// "connections.inputConnections.$.stopTime": req.body.stopTime,
// "connections.inputConnections.$.threshold_type": null,
// "connections.inputConnections.$.manual_threshold_time": null,
// "connections.inputConnections.$.manual_threshold_percentage": null
// }
// }
// );
// reply.code(200).send({ message: "Motor stopped successfully." });
// // Perform stop operations in the background
// (async () => {
// await delay(300000);
// // Update the existing motor data entry with stop details
// const motorData = await MotorData.findOne({ customerId, motor_id: motorId, start_instance_id: start_instance_id });
// if (motorData) {
// const receiverTank = await Tank.findOne({ customerId, tankName: motorData.receiverTank, tankLocation: motorData.receiver_type.toLowerCase() });
// const receiverFinalWaterLevel = parseInt(receiverTank.waterlevel, 10);
// const quantityDelivered = receiverFinalWaterLevel - parseInt(motorData.receiverInitialwaterlevel, 10);
// const water_pumped_till_now = parseInt(receiverTank.total_water_added_from_midnight, 10);
// const totalwaterpumped = quantityDelivered + water_pumped_till_now;
// await Tank.findOneAndUpdate(
// { customerId, tankName: motorData.receiverTank, tankLocation: motorData.receiver_type.toLowerCase() },
// { $set: { total_water_added_from_midnight: totalwaterpumped } }
// );
// await MotorData.updateOne(
// { customerId, motor_id: motorId, start_instance_id: start_instance_id },
// {
// $set: {
// stopTime: req.body.stopTime,
// receiverfinalwaterlevel: receiverFinalWaterLevel.toString(),
// quantity_delivered: quantityDelivered.toString()
// }
// }
// );
// }
// })();
// // Return here to ensure the rest of the code is not executed for the stop action
// return;
// } else {
// // Update the motor stop status to "2" for start action
// await Tank.updateOne(
// { customerId, "connections.inputConnections.motor_id": motorId },
// { $set: { "connections.inputConnections.$.motor_stop_status": "2" } }
// );
// }
// // Check threshold settings if action is start
// if (action === "start") {
// if (req.body.threshold_type === "time") {
// // If threshold type is time, update threshold time
// // await Tank.updateOne(
// // { customerId, "connections.inputConnections.motor_id": motorId },
// // { $set: { "connections.inputConnections.$.manual_threshold_time": req.body.manual_threshold_time,startTime:req.body.startTime } }
// // );
// const receiver_tank_info7 = await Tank.findOne({ customerId, tankName: req.body.to, tankLocation: req.body.to_type.toLowerCase() });
// const newMotorData = new MotorData({
// customerId: customerId,
// motor_id: motorId,
// start_instance_id: start_instance_id,
// supplierTank: req.body.from,
// receiverTank: req.body.to,
// supplier_type: req.body.from_type,
// receiver_type: req.body.to_type,
// startTime: req.body.startTime,
// receiverInitialwaterlevel:parseInt(receiver_tank_info7.waterlevel, 10)
// });
// await newMotorData.save();
// for await (const tank of Tank.find({ "connections.inputConnections.motor_id": motorId })) {
// const index = tank.connections.inputConnections.findIndex(connection => connection.motor_id === motorId);
// if (index !== -1) {
// await Tank.updateOne(
// { customerId, "connections.inputConnections.motor_id": motorId },
// { $set: { [`connections.inputConnections.${index}.manual_threshold_time`]: req.body.manual_threshold_time, [`connections.inputConnections.${index}.startTime`]: req.body.startTime,[`connections.inputConnections.${index}.start_instance_id`]: start_instance_id } }
// );
// }
// }
// // Start monitoring water level based on threshold time
// const thresholdTime = moment().add(req.body.manual_threshold_time, 'minutes').toDate();
// const intervalId = setInterval(async () => {
// const splr_tank_info3 = await Tank.findOne({ customerId, tankName: req.body.from, tankLocation: req.body.from_type.toLowerCase() });
// const splr_tank_info3_waterlevel = parseInt(splr_tank_info3.waterlevel, 10);
// //console.log(splr_tank_info3_waterlevel,"splr_tank_info3_waterlevel")
// const splr_tank_info3_capacity = parseInt(splr_tank_info3.capacity.replace(/,/g, ''), 10);
// // const splr_tank_info3_capacity = parseInt(splr_tank_info3.capacity, 10);
// // console.log(splr_tank_info3.capacity,splr_tank_info3_capacity,"splr_tank_info3_capacity")
// const splr_tank_info3_percentage = (splr_tank_info3_waterlevel / splr_tank_info3_capacity) * 100;
// // console.log(splr_tank_info3_percentage, "percentage for less than 20");
// if (new Date() >= thresholdTime || splr_tank_info3_percentage <= 20) {
// console.log(splr_tank_info3_percentage,)
// await Tank.updateOne(
// { customerId, "connections.inputConnections.motor_id": motorId },
// {
// $set: {
// "connections.inputConnections.$.motor_stop_status": "1",
// "connections.inputConnections.$.threshold_type": null,
// "connections.inputConnections.$.manual_threshold_time": null,
// "connections.inputConnections.$.manual_threshold_percentage": null
// }
// }
// );
// clearInterval(intervalId);
// await delay(300000);
// const motorData = await MotorData.findOne({ customerId, motor_id: motorId, start_instance_id: start_instance_id });
// if (motorData) {
// const receiverTank = await Tank.findOne({ customerId, tankName: motorData.receiverTank, tankLocation: motorData.receiver_type.toLowerCase() });
// const receiverFinalWaterLevel = parseInt(receiverTank.waterlevel, 10);
// const quantityDelivered = receiverFinalWaterLevel - parseInt(motorData.receiverInitialwaterlevel, 10);
// const water_pumped_till_now = parseInt(receiverTank.total_water_added_from_midnight, 10);
// const totalwaterpumped = quantityDelivered + water_pumped_till_now
// await Tank.findOneAndUpdate({customerId, tankName: motorData.receiverTank, tankLocation: motorData.receiver_type.toLowerCase()}, { $set: { total_water_added_from_midnight: totalwaterpumped } })
// const stopTime = formatDate(new Date());
// await MotorData.updateOne(
// { customerId, motor_id: motorId, start_instance_id: start_instance_id },
// {
// $set: {
// stopTime:stopTime,
// receiverfinalwaterlevel: receiverFinalWaterLevel.toString(),
// quantity_delivered: quantityDelivered.toString()
// }
// }
// );
// }
// }
// }, 60000);
// } else if (req.body.threshold_type === "litres") {
// console.log("entered litres")
// const receiver_tank_info7 = await Tank.findOne({ customerId, tankName: req.body.to, tankLocation: req.body.to_type.toLowerCase() });
// const newMotorData = new MotorData({
// customerId: customerId,
// motor_id: motorId,
// start_instance_id: start_instance_id,
// supplierTank: req.body.from,
// receiverTank: req.body.to,
// supplier_type: req.body.from_type,
// receiver_type: req.body.to_type,
// startTime: req.body.startTime,
// receiverInitialwaterlevel:parseInt(receiver_tank_info7.waterlevel, 10)
// });
// await newMotorData.save();
// // If threshold type is percentage, calculate percentage threshold
// const receiver_tank_info = await Tank.findOne({ customerId, tankName: req.body.to, tankLocation: req.body.to_type.toLowerCase() });
// const supplier_tank_info = await Tank.findOne({ customerId, tankName: req.body.from, tankLocation: req.body.from_type.toLowerCase() });
// if (!receiver_tank_info) {
// throw new Error("Receiver tank not found.");
// }
// if (!supplier_tank_info) {
// throw new Error("Supplierr tank not found.");
// }
// const supplier_capacity = parseInt(supplier_tank_info.capacity, 10);
// const supplier_waterLevel = parseInt(supplier_tank_info.waterlevel, 10);
// const capacity = parseInt(receiver_tank_info.capacity, 10);
// const waterLevel = parseInt(receiver_tank_info.waterlevel, 10);
// const desired_percentage = parseInt(req.body.manual_threshold_litres.replace(/,/g, ''), 10);
// console.log(desired_percentage)
// const threshold_water_level = waterLevel+desired_percentage;
// const supplier_threshold = supplier_waterLevel-desired_percentage
// console.log(supplier_threshold,"supplier_threshold")
// for await (const tank of Tank.find({ "connections.inputConnections.motor_id": motorId })) {
// const index = tank.connections.inputConnections.findIndex(connection => connection.motor_id === motorId);
// if (index !== -1) {
// await Tank.updateOne(
// { customerId, "connections.inputConnections.motor_id": motorId },
// { $set: { [`connections.inputConnections.${index}.manual_threshold_percentage`]: supplier_threshold.toString(), [`connections.inputConnections.${index}.startTime`]: req.body.startTime } }
// );
// }
// }
// // Update water level threshold
// // Start monitoring water level based on threshold percentage
// const intervalId = setInterval(async () => {
// // Check if water level has reached the threshold percentage
// const supplier_tank_info1 = await Tank.findOne({ customerId, tankName: req.body.from, tankLocation: req.body.from_type.toLowerCase() });
// const current_water_level = parseInt(supplier_tank_info1.waterlevel, 10);
// if (current_water_level <= supplier_threshold) {
// // Stop the motor pump
// await Tank.updateOne(
// { customerId, "connections.inputConnections.motor_id": motorId },
// {
// $set: {
// "connections.inputConnections.$.motor_stop_status": "1",
// "connections.inputConnections.$.threshold_type": null,
// "connections.inputConnections.$.manual_threshold_time": null,
// "connections.inputConnections.$.manual_threshold_percentage": null
// }
// }
// );
// clearInterval(intervalId); // Stop monitoring water level
// await delay(300000);
// const motorData = await MotorData.findOne({ customerId, motor_id: motorId, start_instance_id: start_instance_id });
// if (motorData) {
// const receiverTank = await Tank.findOne({ customerId, tankName: motorData.receiverTank, tankLocation: motorData.receiver_type.toLowerCase() });
// const receiverFinalWaterLevel = parseInt(receiverTank.waterlevel, 10);
// const quantityDelivered = receiverFinalWaterLevel - parseInt(motorData.receiverInitialwaterlevel, 10);
// const stopTime = formatDate(new Date());
// await MotorData.updateOne(
// { customerId, motor_id: motorId, start_instance_id: start_instance_id },
// {
// $set: {
// stopTime:stopTime,
// receiverfinalwaterlevel: receiverFinalWaterLevel.toString(),
// quantity_delivered: quantityDelivered.toString()
// }
// }
// );
// }
// }
// }, 20000); // Check water level every minute
// }
// }
// // Respond with success message
// reply.code(200).send({ message: `Motor ${action === "start" ? "started" : "stopped"} successfully.` });
// } catch (err) {
// // Handle errors
// throw boom.boomify(err);
// }
// };
exports . motorAction = async ( req , reply ) => {
try {
const customerId = req . params . customerId ;
const action = req . body . action ;
const motorId = req . body . motor _id ;
const start _instance _id = req . body . start _instance _id
console . log ( req . body . startTime )
const start _instance _id = req . body . start _instance _id ;
// Define thresholds for water levels
const lowWaterThreshold = 20 ; // Low water level percentage threshold
const highWaterThreshold = 90 ; // High water level percentage threshold
// Ensure motor_id is provided
if ( ! motorId ) {
throw new Error ( "Motor ID is required." ) ;
}
const users = await User . find ( { customerId : customerId } ) ;
// Get user FCM tokens
const users = await User . find ( { customerId } ) ;
const fcmToken = users . map ( user => user . fcmId ) . filter ( fcmId => fcmId ) ;
console . log ( fcmToken )
const receiverTank = await Tank . findOne ( { customerId , tankName : req . body . to , tankLocation : req . body . to _type . toLowerCase ( ) } ) ;
console . log ( receiverTank )
const currentWaterLevel = parseInt ( receiverTank . waterlevel , 10 ) ;
// Determine the motor stop status based on the action
let motorStopStatus ;
if ( action === "start" ) {
motorStopStatus = "2" ; // If action is start, set stop status to "2"
emitWithTimestamp ( 'motorStart' , fcmToken , motorId , currentWaterLevel ) ;
} else if ( action === "stop" ) {
motorStopStatus = "1" ; // If action is stop, set stop status to "1"
emitWithTimestamp ( 'motorStop' , fcmToken , motorId , currentWaterLevel ) ;
} else {
throw new Error ( "Invalid action provided." ) ;
}
// Update the motor stop status immediately if action is stop
// If action is stop, immediately update motor status and perform stop operations
if ( action === "stop" ) {
// Update the motor stop status and other fields
await Tank . updateOne (
{ customerId , "connections.inputConnections.motor_id" : motorId } ,
{
@ -1432,7 +1782,6 @@ exports.motorAction = async (req, reply) => {
( async ( ) => {
await delay ( 300000 ) ;
// Update the existing motor data entry with stop details
const motorData = await MotorData . findOne ( { customerId , motor _id : motorId , start _instance _id : start _instance _id } ) ;
if ( motorData ) {
const receiverTank = await Tank . findOne ( { customerId , tankName : motorData . receiverTank , tankLocation : motorData . receiver _type . toLowerCase ( ) } ) ;
@ -1440,6 +1789,7 @@ exports.motorAction = async (req, reply) => {
const quantityDelivered = receiverFinalWaterLevel - parseInt ( motorData . receiverInitialwaterlevel , 10 ) ;
const water _pumped _till _now = parseInt ( receiverTank . total _water _added _from _midnight , 10 ) ;
const totalwaterpumped = quantityDelivered + water _pumped _till _now ;
await Tank . findOneAndUpdate (
{ customerId , tankName : motorData . receiverTank , tankLocation : motorData . receiver _type . toLowerCase ( ) } ,
{ $set : { total _water _added _from _midnight : totalwaterpumped } }
@ -1455,14 +1805,11 @@ exports.motorAction = async (req, reply) => {
}
}
) ;
}
} ) ( ) ;
// Return here to ensure the rest of the code is not executed for the stop action
return ;
return ; // Return early to avoid executing the start logic
} else {
// Update the motor stop status to "2" for start action
await Tank . updateOne (
{ customerId , "connections.inputConnections.motor_id" : motorId } ,
{ $set : { "connections.inputConnections.$.motor_stop_status" : "2" } }
@ -1472,15 +1819,10 @@ exports.motorAction = async (req, reply) => {
// Check threshold settings if action is start
if ( action === "start" ) {
if ( req . body . threshold _type === "time" ) {
// If threshold type is time, update threshold time
// await Tank.updateOne(
// { customerId, "connections.inputConnections.motor_id": motorId },
// { $set: { "connections.inputConnections.$.manual_threshold_time": req.body.manual_threshold_time,startTime:req.body.startTime } }
// );
const receiver _tank _info7 = await Tank . findOne ( { customerId , tankName : req . body . to , tankLocation : req . body . to _type . toLowerCase ( ) } ) ;
// Create a new MotorData entry
const receiverTank = await Tank . findOne ( { customerId , tankName : req . body . to , tankLocation : req . body . to _type . toLowerCase ( ) } ) ;
const newMotorData = new MotorData ( {
customerId : customerId ,
customerId ,
motor _id : motorId ,
start _instance _id : start _instance _id ,
supplierTank : req . body . from ,
@ -1488,35 +1830,36 @@ exports.motorAction = async (req, reply) => {
supplier _type : req . body . from _type ,
receiver _type : req . body . to _type ,
startTime : req . body . startTime ,
receiverInitialwaterlevel : parseInt ( receiver _tank _info7 . waterlevel , 10 )
receiverInitialwaterlevel : parseInt ( receiverTank . waterlevel , 10 )
} ) ;
await newMotorData . save ( ) ;
// Update the tank connections with start time and threshold time
for await ( const tank of Tank . find ( { "connections.inputConnections.motor_id" : motorId } ) ) {
this . publishMotorStopStatus ( motorId , motorStopStatus ) ;
for await ( const tank of Tank . find ( { "connections.inputConnections.motor_id" : motorId } ) ) {
const index = tank . connections . inputConnections . findIndex ( connection => connection . motor _id === motorId ) ;
if ( index !== - 1 ) {
await Tank . updateOne (
{ customerId , "connections.inputConnections.motor_id" : motorId } ,
{ $set : { [ ` connections.inputConnections. ${ index } .manual_threshold_time ` ] : req . body . manual _threshold _time , [ ` connections.inputConnections. ${ index } .startTime ` ] : req . body . startTime , [ ` connections.inputConnections. ${ index } .start_instance_id ` ] : start _instance _id } }
{
$set : {
[ ` connections.inputConnections. ${ index } .manual_threshold_time ` ] : req . body . manual _threshold _time ,
[ ` connections.inputConnections. ${ index } .startTime ` ] : req . body . startTime ,
[ ` connections.inputConnections. ${ index } .start_instance_id ` ] : start _instance _id
}
}
) ;
}
}
// Start monitoring water level based on threshold time
const thresholdTime = moment ( ) . add ( req . body . manual _threshold _time , 'minutes' ) . toDate ( ) ;
const intervalId = setInterval ( async ( ) => {
const splr _tank _info3 = await Tank . findOne ( { customerId , tankName : req . body . from , tankLocation : req . body . from _type . toLowerCase ( ) } ) ;
const splr _tank _info3 _waterlevel = parseInt ( splr _tank _info3 . waterlevel , 10 ) ;
//console.log(splr_tank_info3_waterlevel,"splr_tank_info3_waterlevel")
const splr _tank _info3 _capacity = parseInt ( splr _tank _info3 . capacity . replace ( /,/g , '' ) , 10 ) ;
// const splr_tank_info3_capacity = parseInt(splr_tank_info3.capacity, 10);
// console.log(splr_tank_info3.capacity,splr_tank_info3_capacity,"splr_tank_info3_capacity")
const splr _tank _info3 _percentage = ( splr _tank _info3 _waterlevel / splr _tank _info3 _capacity ) * 100 ;
// console.log(splr_tank_info3_percentage, "percentage for less than 20");
if ( new Date ( ) >= thresholdTime || splr _tank _info3 _percentage <= 20 ) {
console . log ( splr _tank _info3 _percentage , )
const supplierTank = await Tank . findOne ( { customerId , tankName : req . body . from , tankLocation : req . body . from _type . toLowerCase ( ) } ) ;
const currentWaterLevel = parseInt ( supplierTank . waterlevel , 10 ) ;
const currentWaterPercentage = ( currentWaterLevel / parseInt ( supplierTank . capacity . replace ( /,/g , '' ) , 10 ) ) * 100 ;
if ( new Date ( ) >= thresholdTime || currentWaterPercentage <= lowWaterThreshold ) {
await Tank . updateOne (
{ customerId , "connections.inputConnections.motor_id" : motorId } ,
{
@ -1528,27 +1871,29 @@ exports.motorAction = async (req, reply) => {
}
}
) ;
emitWithTimestamp ( 'lowWaterLevel' , fcmToken ) ; // Emit low water level notification
clearInterval ( intervalId ) ;
this . publishMotorStopStatus ( motorId , "1" ) ;
await delay ( 300000 ) ;
const motorData = await MotorData . findOne ( { customerId , motor _id : motorId , start _instance _id : start _instance _id } ) ;
if ( motorData ) {
const receiverTank = await Tank . findOne ( { customerId , tankName : motorData . receiverTank , tankLocation : motorData . receiver _type . toLowerCase ( ) } ) ;
const receiverFinalWaterLevel = parseInt ( receiverTank . waterlevel , 10 ) ;
const quantityDelivered = receiverFinalWaterLevel - parseInt ( motorData . receiverInitialwaterlevel , 10 ) ;
const water _pumped _till _now = parseInt ( receiverTank . total _water _added _from _midnight , 10 ) ;
const totalwaterpumped = quantityDelivered + water _pumped _till _now
await Tank . findOneAndUpdate ( { customerId , tankName : motorData . receiverTank , tankLocation : motorData . receiver _type . toLowerCase ( ) } , { $set : { total _water _added _from _midnight : totalwaterpumped } } )
const totalwaterpumped = quantityDelivered + water _pumped _till _now ;
const stopTime = formatDate ( new Date ( ) ) ;
await Tank . findOneAndUpdate (
{ customerId , tankName : motorData . receiverTank , tankLocation : motorData . receiver _type . toLowerCase ( ) } ,
{ $set : { total _water _added _from _midnight : totalwaterpumped } }
) ;
await MotorData . updateOne (
{ customerId , motor _id : motorId , start _instance _id : start _instance _id } ,
{
$set : {
stopTime : stopTime ,
stopTime : req . body . stopTime ,
receiverfinalwaterlevel : receiverFinalWaterLevel . toString ( ) ,
quantity _delivered : quantityDelivered . toString ( )
}
@ -1556,7 +1901,14 @@ exports.motorAction = async (req, reply) => {
) ;
}
}
} , 60000 ) ;
// Check for high water level and send notification
if ( currentWaterPercentage >= highWaterThreshold ) {
emitWithTimestamp ( 'highWaterLevel' , fcmToken ) ; // Emit high water level notification
}
} , 60000 ) ; // Check every minute
}
} else if ( req . body . threshold _type === "litres" ) {
console . log ( "entered litres" )
const receiver _tank _info7 = await Tank . findOne ( { customerId , tankName : req . body . to , tankLocation : req . body . to _type . toLowerCase ( ) } ) ;
@ -1653,14 +2005,13 @@ exports.motorAction = async (req, reply) => {
) ;
}
}
} , 20000 ) ; // Check water level every minute
} , 20000 ) ;
}
}
// Respond with success message
reply . code ( 200 ) . send ( { message : ` Motor ${ action === "start" ? "started" : "stopped" } successfully. ` } ) ;
} catch ( err ) {
// Handle errors
throw boom . boomify ( err ) ;
}
} ;
@ -3440,7 +3791,7 @@ const updatetotalConsumptiontillmidnight = async () => {
const formattedDate = moment ( ) . tz ( 'Asia/Kolkata' ) . format ( 'DD-MMM-YYYY - HH:mm' ) ;
// Check if the record already exists
const existingRecord = await TankConsumption Schema. findOne ( {
const existingRecord = await TankConsumption Original Schema. findOne ( {
customerId : tank . customerId ,
tankName : tank . tankName ,
tankLocation : tank . tankLocation ,
@ -3449,7 +3800,7 @@ const updatetotalConsumptiontillmidnight = async () => {
if ( ! existingRecord ) {
// Create and save the new document if it doesn't exist
const newTankConsumption = new TankConsumption Schema( {
const newTankConsumption = new TankConsumption Original Schema( {
customerId : tank . customerId ,
tankName : tank . tankName ,
tankLocation : tank . tankLocation ,
@ -3475,7 +3826,7 @@ const updatetotalConsumptiontillmidnight = async () => {
clearConsumptionSchedule ( ) ;
// Schedule the task to run every day at 12:49 PM IST and store the reference
consumptionTask = cron . schedule ( ' 32 1 5 * * *', updatetotalConsumptiontillmidnight , {
consumptionTask = cron . schedule ( ' 50 23 * * *', updatetotalConsumptiontillmidnight , {
timezone : "Asia/Kolkata"
} ) ;
@ -3925,7 +4276,7 @@ exports.getBlockData = async (req, reply) => {
const mqtt = require ( 'mqtt' ) ;
const client = mqtt . connect ( 'mqtt://35.207.198.4:188 3 ') ; // Connect to MQTT broker
const client = mqtt . connect ( 'mqtt://35.207.198.4:188 4 ') ; // Connect to MQTT broker
client . on ( 'connect' , ( ) => {
console . log ( 'Connected to MQTT broker' ) ;
@ -4057,5 +4408,74 @@ client.on('message', async (topic, message) => {
// };
exports . consumptionofparticulartank = async ( request , reply ) => {
try {
const { customerId } = request . params ;
const { startDate , stopDate , tankName , tankLocation , block } = request . body ;
// Ensure dates are formatted or parsed correctly for the query
const start = startDate ;
const end = stopDate ;
// Find the tank by customerId, tankLocation, and tankName
const tank = await Tank . findOne ( {
customerId ,
tankLocation : tankLocation || "overhead" , // Default to "overhead" if not provided
tankName ,
} ) ;
if ( ! tank ) {
return reply . status ( 404 ) . send ( {
status _code : 404 ,
message : "Tank not found" ,
} ) ;
}
const waterlevel _at _midnight = parseInt ( tank . waterlevel _at _midnight . replace ( /,/g , "" ) , 10 ) ;
const total _water _added _from _midnight = parseInt ( tank . total _water _added _from _midnight . replace ( /,/g , "" ) , 10 ) ;
const waterlevel = parseInt ( tank . waterlevel . replace ( /,/g , "" ) , 10 ) ;
// Find consumption records for the tank between the given dates
const tankConsumptions = await TankConsumptionOriginalSchema . find ( {
customerId ,
tankName ,
tankLocation : tankLocation ,
time : {
$gte : start ,
$lte : end ,
} ,
} ) ;
// Calculate total consumption from records
const total _consumption _from _records = tankConsumptions . reduce ( ( acc , record ) => {
return acc + parseInt ( record . consumption , 10 ) ;
} , 0 ) ;
// Calculate final consumption
const consumption = ( waterlevel _at _midnight + total _water _added _from _midnight ) - waterlevel + total _consumption _from _records ;
// Prepare response data
const tankData = {
tankname : tank . tankName ,
totalConsumption : consumption ,
block : tank . blockName ,
TypeofWater : tank . typeOfWater ,
location : tank . tankLocation ,
capacity : tank . capacity ,
waterlevel : tank . waterlevel ,
} ;
// Send the response, including both total consumption and tankConsumptions data
reply . send ( {
status _code : 200 ,
tankData ,
totalConsumption : consumption ,
consumptionRecords : tankConsumptions , // Add the consumption records here
} ) ;
} catch ( err ) {
throw boom . boomify ( err ) ;
}
} ;