master^2
Varun 8 months ago
parent a3480c5d67
commit 22bb6a4bc4

@ -5854,6 +5854,7 @@ exports.getBlockData = async (req, reply) => {
const mqtt = require('mqtt');
const brokerUrl = 'mqtt://35.207.198.4:1883';
const deviceLastSeen = new Map(); // Track last seen timestamps for devices
@ -5864,6 +5865,7 @@ const mqttClient = mqtt.connect(brokerUrl, {
reconnectPeriod: 5000, // Reconnect every 5 seconds
});
// Event: When the MQTT client connects
mqttClient.on('connect', () => {
console.log('🌎 Global MQTT client connected');
@ -5877,17 +5879,23 @@ mqttClient.on('connect', () => {
});
});
// Event: When a message is received
mqttClient.on('message', async (topic, message) => {
try {
console.log(`📩 Message received on topic ${topic}:`, message.toString());
// Parse the message
const data = JSON.parse(message.toString());
const { hw_Id, Motor_status, tanks } = data.objects;
// Log the device ID and current time
const currentTime = moment().tz('Asia/Kolkata').format('DD-MMM-YYYY - HH:mm');
console.log(`📅 Device ${hw_Id} last seen at: ${currentTime}`);
// Update the last seen timestamp for the device
deviceLastSeen.set(hw_Id, new Date().toISOString());
// Save IoT data
// Save IoT data to the database
const iotTankData = new IotData({
hardwareId: hw_Id,
Motor_status,
@ -5904,18 +5912,22 @@ mqttClient.on('message', async (topic, message) => {
await iotTankData.save();
console.log(`✅ Data saved for device: ${hw_Id}`);
// Keep only the latest 3 records
// Keep only the latest 3 records for the device
const records = await IotData.find({ hardwareId: hw_Id }).sort({ date: -1 });
if (records.length > 3) {
const recordsToDelete = records.slice(3); // Get older records
await Promise.all(recordsToDelete.map((record) => record.remove()));
console.log(`🧹 Deleted older records for device: ${hw_Id}`);
}
// Process tanks and update status
await Promise.all(
tanks.map(async (tank) => {
const existingTank = await Tank.findOne({ hardwareId: hw_Id, tankhardwareId: tank.Id });
if (!existingTank) return;
if (!existingTank) {
console.log(`⚠️ Tank not found for device: ${hw_Id}, tank: ${tank.Id}`);
return;
}
const tankHeightInCm = parseInt(existingTank.height.replace(/,/g, ''), 10) * 30.48;
const waterLevelHeight = tankHeightInCm - tank.level;
@ -5952,10 +5964,12 @@ mqttClient.on('message', async (topic, message) => {
}
});
// Event: When the MQTT client encounters an error
mqttClient.on('error', (err) => {
console.error('❌ MQTT Client Error:', err);
});
// Event: When the MQTT client disconnects
mqttClient.on('disconnect', () => {
console.log('⚠️ Global MQTT client disconnected');
});
@ -5970,10 +5984,16 @@ setInterval(() => {
console.log(`🚨 Device ${hw_Id} is offline. Last seen: ${lastSeen}`);
}
});
}, 60000);
}, 60000); // Check every 60 seconds
// Handle application shutdown gracefully
process.on('SIGINT', () => {
console.log('🛑 Application is shutting down...');
mqttClient.end(() => {
console.log('🚪 MQTT client disconnected');
process.exit(0);
});
});

Loading…
Cancel
Save