master^2
Varun 9 months ago
parent 47d6c5d702
commit 5d54fce8f7

@ -5854,7 +5854,6 @@ exports.getBlockData = async (req, reply) => {
const mqtt = require('mqtt'); const mqtt = require('mqtt');
const brokerUrl = 'mqtt://35.207.198.4:1883'; const brokerUrl = 'mqtt://35.207.198.4:1883';
const clients = new Map(); // Store client instances dynamically per hw_Id const clients = new Map(); // Store client instances dynamically per hw_Id
const deviceLastSeen = new Map(); // Track last seen timestamps for devices const deviceLastSeen = new Map(); // Track last seen timestamps for devices
@ -5862,22 +5861,27 @@ const deviceLastSeen = new Map(); // Track last seen timestamps for devices
function createClient(hw_Id) { function createClient(hw_Id) {
if (clients.has(hw_Id)) return clients.get(hw_Id); // Return existing client if (clients.has(hw_Id)) return clients.get(hw_Id); // Return existing client
const client = mqtt.connect(brokerUrl); const client = mqtt.connect(brokerUrl, {
clientId: `client_${hw_Id}`,
clean: false, // Ensures session persistence
reconnectPeriod: 5000, // Attempt reconnect every 5 seconds
});
client.on('connect', () => { client.on('connect', () => {
console.log(`Client for ${hw_Id} connected to MQTT broker`); console.log(`✅ Client for ${hw_Id} connected to MQTT broker`);
const topic = `water/iot-data/${hw_Id}`; const topic = `water/iot-data/${hw_Id}`;
client.subscribe(topic, { qos: 1 }, (err) => { client.subscribe(topic, { qos: 1 }, (err) => {
if (err) { if (err) {
console.error(`Error subscribing to topic ${topic}:`, err); console.error(`Error subscribing to topic ${topic}:`, err);
} else { } else {
console.log(`Subscribed to topic: ${topic}`); console.log(`📡 Subscribed to topic: ${topic}`);
} }
}); });
}); });
client.on('message', async (topic, message) => { client.on('message', async (topic, message) => {
console.log(`Message received on topic ${topic}:`, message.toString()); console.log(`📩 Message received on topic ${topic}:`, message.toString());
try { try {
const data = JSON.parse(message.toString()); const data = JSON.parse(message.toString());
@ -5901,11 +5905,12 @@ function createClient(hw_Id) {
}); });
await iotTankData.save(); await iotTankData.save();
console.log(`Data saved for device: ${hw_Id}`); console.log(`Data saved for device: ${hw_Id}`);
// Keep only the latest 3 records // Keep only the latest 3 records
const recordsToDelete = await IotData.find({ hardwareId: hw_Id }).sort({ date: -1 }).skip(3); const records = await IotData.find({ hardwareId: hw_Id }).sort({ date: -1 });
if (recordsToDelete.length > 0) { if (records.length > 3) {
const recordsToDelete = records.slice(3); // Get older records
await Promise.all(recordsToDelete.map((record) => record.remove())); await Promise.all(recordsToDelete.map((record) => record.remove()));
} }
@ -5922,7 +5927,7 @@ function createClient(hw_Id) {
existingTank.waterlevel = waterLevel; existingTank.waterlevel = waterLevel;
await existingTank.save(); await existingTank.save();
console.log(`Tank data saved for ${hw_Id}, tank: ${tank.Id}`); console.log(`Tank data saved for ${hw_Id}, tank: ${tank.Id}`);
}) })
); );
@ -5942,27 +5947,39 @@ function createClient(hw_Id) {
inputConnection.stopTime = moment().tz('Asia/Kolkata').format('HH:mm'); inputConnection.stopTime = moment().tz('Asia/Kolkata').format('HH:mm');
} }
await motorTank.save(); await motorTank.save();
console.log(`Motor status updated for device: ${hw_Id}`); console.log(`Motor status updated for device: ${hw_Id}`);
} }
} }
} catch (err) { } catch (err) {
console.error(`Error processing message from ${hw_Id}:`, err.message); console.error(`Error processing message from ${hw_Id}:`, err.message);
} }
}); });
client.on('error', (err) => {
console.error(`❌ MQTT Error for ${hw_Id}:`, err);
});
client.on('disconnect', () => {
console.log(`⚠️ Client for ${hw_Id} disconnected`);
});
clients.set(hw_Id, client); clients.set(hw_Id, client);
return client; return client;
} }
// Subscribe to wildcard topic to detect new devices // Subscribe to wildcard topic to detect new devices
const globalClient = mqtt.connect(brokerUrl); const globalClient = mqtt.connect(brokerUrl, {
clientId: 'global_subscriber',
clean: false,
});
globalClient.on('connect', () => { globalClient.on('connect', () => {
console.log('Connected to MQTT broker'); console.log('🌎 Global client connected to MQTT broker');
globalClient.subscribe('water/iot-data/+', { qos: 1 }, (err) => { globalClient.subscribe('water/iot-data/+', { qos: 1 }, (err) => {
if (err) { if (err) {
console.error('Error subscribing to wildcard topic:', err); console.error('Error subscribing to wildcard topic:', err);
} else { } else {
console.log('Subscribed to wildcard topic: water/iot-data/+'); console.log('📡 Subscribed to wildcard topic: water/iot-data/+');
} }
}); });
}); });
@ -5972,11 +5989,11 @@ globalClient.on('message', (topic, message) => {
const data = JSON.parse(message.toString()); const data = JSON.parse(message.toString());
const { hw_Id } = data.objects; const { hw_Id } = data.objects;
if (!clients.has(hw_Id)) { if (!clients.has(hw_Id)) {
console.log(`Creating new MQTT client for device: ${hw_Id}`); console.log(` Creating new MQTT client for device: ${hw_Id}`);
createClient(hw_Id); createClient(hw_Id);
} }
} catch (err) { } catch (err) {
console.error('Error parsing global message:', err.message); console.error('Error parsing global message:', err.message);
} }
}); });

Loading…
Cancel
Save