master^2
Varun 9 months ago
parent 55b207f076
commit 65de89a44f

@ -2836,6 +2836,8 @@ async function calculateTotalPumpedWater(customerId, motorId, start_instance_id)
}
exports.motorAction = async (req, reply) => {
try {
const { customerId } = req.params;
@ -2861,121 +2863,12 @@ exports.motorAction = async (req, reply) => {
let motorStopStatus = action === "start" ? "2" : "1";
const blockName = req.body.from || "Unknown Block";
const tankName = req.body.to || "Unknown Tank";
const stopTime = req.body.stopTime
if (action === "start") {
motorStopStatus = "2";
const startTime = req.body.startTime;
await Tank.updateOne(
{ customerId, "connections.inputConnections.motor_id": motorId },
{ $set: { "connections.inputConnections.$.motor_stop_status": motorStopStatus } }
);
const thresholdTimeMs = req.body.manual_threshold_time * 60 * 1000; // Convert minutes to milliseconds
// const stopCriteria =
// motorOnType === "time"
// ? `${req.body.manual_threshold_time} minutes`
// : `${req.body.manual_threshold_litres} litres`;
try {
console.log("enter the start")
eventEmitter.emit(
"motorStart",
customerId,
fcmToken,
tankName,
blockName,
startTime,
"Mobile APP",
manual_threshold_time,
typeOfWater,
motorId,
loggedInUser.phone,
);
reply.code(200).send({ message: "Motor started successfully." });
} catch (error) {
console.error("Error in handleMotorStart:", error);
reply.code(500).send({ error: "Internal Server Error" });
}
// Start checking water level every 30 minutes
if (!waterLevelCheckInterval) {
waterLevelCheckInterval = setInterval(async () => {
await checkWaterLevel(customerId, motorId, fcmToken, receiverTank);
}, 30 * 60 * 1000); // 30 minutes
}
await Tank.updateOne(
{ customerId, "connections.inputConnections.motor_id": motorId },
{ $set: { "connections.inputConnections.$.motor_stop_status": "2",
"connections.inputConnections.$.manual_threshold_time": manual_threshold_time,
"connections.inputConnections.$.threshold_type": "time",
"connections.inputConnections.$.motor_on_type": "manual" } }
);
reply.code(200).send({ message: "Motor started successfully." });
} else if (action === "stop") {
motorStopStatus = "1"; // If action is stop, set stop status to "1"
try {
const totalWaterPumped = await calculateTotalPumpedWater(customerId, motorId, start_instance_id);
eventEmitter.emit("motorStop", customerId, fcmToken, tankName, blockName, stopTime, "Mobile APP", totalWaterPumped, typeOfWater, motorId,
loggedInUser.phone,);
reply.code(200).send({ message: "Motor stopped successfully." });
} catch (error) {
console.error("Error in handleMotorStop:", error);
reply.code(500).send({ error: "Internal Server Error" });
}
await Tank.updateOne(
{ customerId, "connections.inputConnections.motor_id": motorId },
{
$set: {
"connections.inputConnections.$.motor_stop_status": "1",
"connections.inputConnections.$.motor_on_type": motorOnType }
}
);
// Clear the interval when the motor is stopped
if (waterLevelCheckInterval) {
clearInterval(waterLevelCheckInterval);
waterLevelCheckInterval = null; // Reset the interval ID
}
} else {
throw new Error("Invalid action provided.");
}
// If action is stop, immediately update motor status and perform stop operations
if (action === "stop") {
console.log("enterted stop")
await Tank.updateOne(
{ customerId, "connections.inputConnections.motor_id": motorId },
{
$set: {
"connections.inputConnections.$.motor_stop_status": "1",
"connections.inputConnections.$.motor_on_type": "manual",
"connections.inputConnections.$.stopTime": req.body.stopTime,
"connections.inputConnections.$.threshold_type": null,
"connections.inputConnections.$.manual_threshold_time": null,
"connections.inputConnections.$.manual_threshold_percentage": null
}
}
);
if (action === "start") {
if (motorIntervals[motorId]) {
console.log(`🔄 Clearing all existing intervals for motorId: ${motorId}`);
// Clear and delete all intervals for the motorId
Object.keys(motorIntervals).forEach(key => {
if (key.startsWith(motorId)) {
clearInterval(motorIntervals[key]);
delete motorIntervals[key];
}
});
console.log(`✅ All intervals cleared for motorId: ${motorId}`);
clearInterval(motorIntervals[motorId]);
delete motorIntervals[motorId];
}
const startTime = moment().tz('Asia/Kolkata').format('DD-MMM-YYYY - HH:mm');
@ -3058,29 +2951,29 @@ exports.motorAction = async (req, reply) => {
const notificationKey = `${customerId}_${motorId}_threshold`;
// Check if the notification has already been sent
// if (!notificationTracker.get(notificationKey)) {
// console.log("Sending threshold time notification...");
if (!notificationTracker.get(notificationKey)) {
console.log("Sending threshold time notification...");
// eventEmitter.emit(
// "sendThresholdTimeNotification",
// customerId,
// fcmToken,
// manual_threshold_time,
// motorId,
// tankName,
// blockName
// );
eventEmitter.emit(
"sendThresholdTimeNotification",
customerId,
fcmToken,
manual_threshold_time,
motorId,
tankName,
blockName
);
// // Mark notification as sent
// notificationTracker.set(notificationKey, true);
// Mark notification as sent
notificationTracker.set(notificationKey, true);
// // Optionally, reset the flag after some time (e.g., 24 hours)
// setTimeout(() => {
// notificationTracker.delete(notificationKey);
// }, 24 * 60 * 60 * 1000); // Reset after 24 hours
// } else {
// console.log("Notification already sent, skipping...");
// }
// Optionally, reset the flag after some time (e.g., 24 hours)
setTimeout(() => {
notificationTracker.delete(notificationKey);
}, 24 * 60 * 60 * 1000); // Reset after 24 hours
} else {
console.log("Notification already sent, skipping...");
}
const currentTime = moment().tz('Asia/Kolkata').format('DD-MMM-YYYY - HH:mm');
await Tank.updateOne(
@ -3098,8 +2991,6 @@ exports.motorAction = async (req, reply) => {
);
clearInterval(motorIntervals[motorId]); // Stop the motor if condition met
delete motorIntervals[motorId]; // Remove from interval object
@ -3254,33 +3145,6 @@ exports.motorAction = async (req, reply) => {
} else if (action === "stop") {
await stopMotor(motorId, customerId, start_instance_id);
this.publishMotorStopStatus(motorId, motorStopStatus);
// Fetch motor data to check its threshold time
const motorData = await Tank.findOne(
{ customerId, "connections.inputConnections.motor_id": motorId },
{ "connections.inputConnections.$": 1 }
);
if (motorData) {
const motor = motorData.connections.inputConnections[0];
const thresholdTime = motor.manual_threshold_time; // Get threshold time
const hw_Id = motor.hw_Id; // Get hardware ID
// Check if motor was running for at least the threshold time
if (motor.runTime >= thresholdTime) {
console.log(`Sending threshold time notification for motor ${motorId}`);
eventEmitter.emit(
"sendThresholdTimeNotification",
customerId,
fcmToken, // Use correct FCM token list
thresholdTime,
hw_Id,
tankName,
blockName
);
}
}
reply.code(200).send({ message: "Motor stopped successfully." });
}
} catch (err) {
@ -3327,6 +3191,9 @@ async function stopMotor(motorId, customerId, start_instance_id) {
// exports.motorAction = async (req, reply) => {
// try {
// const customerId = req.params.customerId;
@ -5898,32 +5765,20 @@ exports.getBlockData = async (req, reply) => {
const mqtt = require('mqtt');
require('dotenv').config();
// **Persistent MQTT Connection**
const client = mqtt.connect('mqtt://35.207.198.4:1883', {
clientId: `mqtt_server_${Math.random().toString(16).substr(2, 8)}`,
clean: false, // Ensures MQTT retains subscriptions
reconnectPeriod: 2000, // Reconnect every 2 seconds
});
const subscribedTopics = new Set();
const activeDevices = new Set(); // Keep track of active devices
client.on('connect', () => {
console.log('✅ Connected to MQTT broker');
const activeClients = new Map(); // Store active clients per hw_Id
// **Ensure re-subscriptions after reconnect**
subscribedTopics.forEach(topic => {
client.subscribe(topic, { qos: 1 }, (err) => {
if (err) {
console.error(`❌ Error resubscribing to ${topic}:`, err);
} else {
console.log(`🔄 Resubscribed to ${topic}`);
}
});
});
// Temporary client to listen for device announcements
const tempClientId = `mqtt_temp_${Math.random().toString(16).substr(2, 8)}`;
const tempClient = mqtt.connect('mqtt://35.207.198.4:1883', {
clientId: tempClientId,
clean: false,
reconnectPeriod: 2000,
});
// **Subscribe to new device announcements**
client.subscribe('water/iot-data/announce', { qos: 1 }, (err) => {
// Subscribe only to device announcements
tempClient.on('connect', () => {
console.log(`✅ Connected to MQTT broker as ${tempClientId}`);
tempClient.subscribe('water/iot-data/announce', { qos: 1 }, (err) => {
if (err) {
console.error('❌ Error subscribing to announcement topic:', err);
} else {
@ -5932,13 +5787,12 @@ client.on('connect', () => {
});
});
client.on('message', async (topic, message) => {
tempClient.on('message', async (topic, message) => {
console.log(`📩 Message received on topic ${topic}: ${message.toString()}`);
try {
const data = JSON.parse(message.toString());
// **Handle device announcements**
if (topic === 'water/iot-data/announce') {
if (!data.objects || !data.objects.hw_Id) {
console.error("❌ Invalid announcement format. Missing hw_Id.");
@ -5946,45 +5800,66 @@ client.on('message', async (topic, message) => {
}
const hw_Id = data.objects.hw_Id;
const deviceTopic = `water/iot-data/${hw_Id}`;
console.log(`🔄 Received first hw_Id: ${hw_Id}`);
// If this hw_Id already has a client, skip creating a new one
if (!activeClients.has(hw_Id)) {
console.log(`🔄 Creating new MQTT client for hw_Id: ${hw_Id}`);
activeClients.set(hw_Id, createDeviceClient(hw_Id));
}
}
} catch (err) {
console.error('❌ Error processing message:', err.message);
}
});
// Function to create a client for each device after hw_Id is known
function createDeviceClient(hw_Id) {
const clientId = `mqtt_client_${hw_Id}`;
console.log(`🔄 Connecting to MQTT with clientId: ${clientId}`);
const client = mqtt.connect('mqtt://35.207.198.4:1883', {
clientId,
clean: false,
reconnectPeriod: 2000,
});
if (!subscribedTopics.has(deviceTopic)) {
client.on('connect', () => {
console.log(`✅ Connected to MQTT broker as ${clientId}`);
const deviceTopic = `water/iot-data/${hw_Id}`;
client.subscribe(deviceTopic, { qos: 1 }, (err) => {
if (err) {
console.error(`❌ Error subscribing to ${deviceTopic}:`, err);
} else {
console.log(`✅ Subscribed to ${deviceTopic}`);
subscribedTopics.add(deviceTopic);
activeDevices.add(hw_Id);
console.log('📡 Active Devices:', Array.from(activeDevices));
// ✅ **Now also process data**
processIotData(hw_Id, data);
}
});
} else {
console.log(`🔄 Already subscribed to ${deviceTopic}, processing data.`);
processIotData(hw_Id, data);
}
return;
}
});
client.on('message', async (topic, message) => {
console.log(`📩 Message received on topic ${topic}: ${message.toString()}`);
try {
const data = JSON.parse(message.toString());
// **Process IoT Data for device topics**
if (topic.startsWith('water/iot-data/')) {
setImmediate(() => {
console.log(`🚀 Entering processIotData() for topic: ${topic}`);
const hw_Id = topic.split('/')[2];
console.log(`🚀 Processing IoT Data for topic: ${topic}`);
processIotData(hw_Id, data);
});
}
} catch (err) {
console.error('❌ Error processing message:', err.message);
}
});
});
client.on('error', (err) => console.error('❌ MQTT Error:', err));
client.on('close', () => console.log('⚠️ MQTT Connection Closed.'));
client.on('offline', () => console.log('⚠️ MQTT Broker Offline.'));
client.on('error', (err) => console.error(`❌ MQTT Error (${clientId}):`, err));
client.on('close', () => console.log(`⚠️ MQTT Connection Closed for ${clientId}`));
client.on('offline', () => console.log(`⚠️ MQTT Broker Offline for ${clientId}`));
return client;
}
async function processIotData(hw_Id, data) {
try {

Loading…
Cancel
Save