master^2
Varun 7 months ago
parent b4ece5ec63
commit 47d6c5d702

@ -5850,149 +5850,147 @@ exports.getBlockData = async (req, reply) => {
// } // }
// }); // });
const mqtt = require('mqtt');
const client = mqtt.connect('mqtt://35.207.198.4:1883');
const subscribedTopics = new Set();
client.on('connect', () => {
console.log('Connected to MQTT broker');
client.subscribe('water/iot-data/+', (err) => {
if (err) {
console.error('Error subscribing to wildcard topic:', err);
} else {
console.log('Subscribed to water/iot-data/+ wildcard topic');
}
});
});
client.on('message', async (topic, message) => {
console.log(`Message received on topic ${topic}:`, message.toString());
if (!topic.startsWith('water/iot-data/')) return;
try {
const data = JSON.parse(message.toString());
const { hw_Id, Motor_status, tanks } = data.objects;
const deviceTopic = `water/iot-data/${hw_Id}`;
if (!subscribedTopics.has(deviceTopic)) {
client.subscribe(deviceTopic, (err) => {
if (err) console.error(`Error subscribing to topic ${deviceTopic}:`, err);
else {
console.log(`Subscribed to device-specific topic: ${deviceTopic}`);
subscribedTopics.add(deviceTopic);
}
});
}
const currentDate = moment().tz('Asia/Kolkata').format('DD-MMM-YYYY - HH:mm'); const mqtt = require('mqtt');
const currentTime = moment().tz('Asia/Kolkata').format('HH:mm');
// Create tank documents
const tankDocuments = tanks.map((tank) => ({
tankhardwareId: tank.Id,
tankHeight: tank.level,
date: currentDate,
time: currentTime,
})); const brokerUrl = 'mqtt://35.207.198.4:1883';
const clients = new Map(); // Store client instances dynamically per hw_Id
const deviceLastSeen = new Map(); // Track last seen timestamps for devices
// Save IoT data function createClient(hw_Id) {
const iotTankData = new IotData({ if (clients.has(hw_Id)) return clients.get(hw_Id); // Return existing client
hardwareId: hw_Id,
Motor_status,
tanks: tankDocuments,
date: currentDate,
time: currentTime,
const client = mqtt.connect(brokerUrl);
client.on('connect', () => {
console.log(`Client for ${hw_Id} connected to MQTT broker`);
const topic = `water/iot-data/${hw_Id}`;
client.subscribe(topic, { qos: 1 }, (err) => {
if (err) {
console.error(`Error subscribing to topic ${topic}:`, err);
} else {
console.log(`Subscribed to topic: ${topic}`);
}
}); });
});
await iotTankData.save(); client.on('message', async (topic, message) => {
console.log(`Message received on topic ${topic}:`, message.toString());
// Keep only the latest 3 records
const recordsToDelete = await IotData.find({ hardwareId: hw_Id })
.sort({ date: -1 })
.skip(3);
await Promise.all(recordsToDelete.map((record) => record.remove())); try {
const data = JSON.parse(message.toString());
const { hw_Id, Motor_status, tanks } = data.objects;
const currentTime = moment().tz('Asia/Kolkata').format('DD-MMM-YYYY - HH:mm');
deviceLastSeen.set(hw_Id, new Date().toISOString());
// Save IoT data
const iotTankData = new IotData({
hardwareId: hw_Id,
Motor_status,
tanks: tanks.map((tank) => ({
tankhardwareId: tank.Id,
tankHeight: tank.level,
date: currentTime,
time: moment().tz('Asia/Kolkata').format('HH:mm'),
})),
date: currentTime,
time: moment().tz('Asia/Kolkata').format('HH:mm'),
});
// Process each tank update await iotTankData.save();
await Promise.all( console.log(`Data saved for device: ${hw_Id}`);
tanks.map(async (tank) => {
const { Id: tankhardwareId, level: tankHeight } = tank;
const existingTank = await Tank.findOne({ hardwareId: hw_Id, tankhardwareId });
if (!existingTank) return;
const customerId = existingTank.customerId; // Keep only the latest 3 records
const tank_name = existingTank.tankName; const recordsToDelete = await IotData.find({ hardwareId: hw_Id }).sort({ date: -1 }).skip(3);
if (recordsToDelete.length > 0) {
await Promise.all(recordsToDelete.map((record) => record.remove()));
}
const tankHeightInCm = parseInt(existingTank.height.replace(/,/g, ''), 10) * 30.48; // Process tanks and update status
const waterLevelHeight = tankHeightInCm - tankHeight; await Promise.all(
const waterCapacityPerCm = parseInt(existingTank.waterCapacityPerCm.replace(/,/g, ''), 10); tanks.map(async (tank) => {
const waterLevel = parseInt(waterLevelHeight * waterCapacityPerCm, 10); const existingTank = await Tank.findOne({ hardwareId: hw_Id, tankhardwareId: tank.Id });
if (!existingTank) return;
console.log(tankHeight, "Processed in IoT Data MQTT Subscription"); const tankHeightInCm = parseInt(existingTank.height.replace(/,/g, ''), 10) * 30.48;
const waterLevelHeight = tankHeightInCm - tank.level;
const waterCapacityPerCm = parseInt(existingTank.waterCapacityPerCm.replace(/,/g, ''), 10);
const waterLevel = parseInt(waterLevelHeight * waterCapacityPerCm, 10);
if (tankHeight > 0 && waterLevel >= 0) {
existingTank.waterlevel = waterLevel; existingTank.waterlevel = waterLevel;
await existingTank.save(); await existingTank.save();
console.log(`Tank data saved for ${hw_Id}, tank: ${tank.Id}`);
})
);
// Update linked tanks // Update motor status
await Promise.all( const motorTank = await Tank.findOne({ "connections.inputConnections.motor_id": hw_Id });
existingTank.connections.outputConnections.map(async (outputConnection) => { if (motorTank) {
const linkedTank = await Tank.findOne({ const inputConnection = motorTank.connections.inputConnections.find((conn) => conn.motor_id === hw_Id);
customerId, if (inputConnection) {
tankName: outputConnection.outputConnections, inputConnection.motor_status = Motor_status;
tankLocation: outputConnection.output_type, if (inputConnection.motor_stop_status === "1" && Motor_status === 2) {
}); inputConnection.motor_stop_status = "2";
if (linkedTank) { inputConnection.motor_on_type = "forced_manual";
linkedTank.connections.inputConnections.forEach((inputConnection) => { inputConnection.startTime = moment().tz('Asia/Kolkata').format('HH:mm');
if (inputConnection.inputConnections === tank_name) { }
inputConnection.water_level = waterLevel; if (inputConnection.motor_stop_status === "2" && Motor_status === 1) {
} inputConnection.motor_stop_status = "1";
}); inputConnection.stopTime = moment().tz('Asia/Kolkata').format('HH:mm');
await linkedTank.save(); }
} await motorTank.save();
}) console.log(`Motor status updated for device: ${hw_Id}`);
);
} }
})
);
// Update motor status
const motorTank = await Tank.findOne({ "connections.inputConnections.motor_id": hw_Id });
if (!motorTank) {
console.log('Motor not found for motor_id:', hw_Id);
return;
}
const inputConnection = motorTank.connections.inputConnections.find((conn) => conn.motor_id === hw_Id);
if (inputConnection) {
inputConnection.motor_status = Motor_status;
if (inputConnection.motor_stop_status === "1" && Motor_status === 2 && inputConnection.motor_on_type !== "forced_manual") {
inputConnection.motor_stop_status = "2";
inputConnection.motor_on_type = "forced_manual";
inputConnection.startTime = currentTime;
} }
} catch (err) {
console.error(`Error processing message from ${hw_Id}:`, err.message);
}
});
if (inputConnection.motor_stop_status === "2" && Motor_status === 1) { clients.set(hw_Id, client);
inputConnection.motor_stop_status = "1"; return client;
inputConnection.stopTime = currentTime; }
}
await motorTank.save(); // Subscribe to wildcard topic to detect new devices
const globalClient = mqtt.connect(brokerUrl);
globalClient.on('connect', () => {
console.log('Connected to MQTT broker');
globalClient.subscribe('water/iot-data/+', { qos: 1 }, (err) => {
if (err) {
console.error('Error subscribing to wildcard topic:', err);
} else {
console.log('Subscribed to wildcard topic: water/iot-data/+');
} }
});
});
console.log('Successfully processed data for hardwareId:', hw_Id); globalClient.on('message', (topic, message) => {
try {
const data = JSON.parse(message.toString());
const { hw_Id } = data.objects;
if (!clients.has(hw_Id)) {
console.log(`Creating new MQTT client for device: ${hw_Id}`);
createClient(hw_Id);
}
} catch (err) { } catch (err) {
console.error('Error processing message:', err.message); console.error('Error parsing global message:', err.message);
} }
}); });
// Periodically check for offline devices
setInterval(() => {
const now = new Date();
deviceLastSeen.forEach((lastSeen, hw_Id) => {
const lastSeenDate = new Date(lastSeen);
const diffInSeconds = (now - lastSeenDate) / 1000;
if (diffInSeconds > 60) {
console.log(`🚨 Device ${hw_Id} is offline. Last seen: ${lastSeen}`);
}
});
}, 60000);

Loading…
Cancel
Save