master^2
Varun 7 months ago
parent 5d54fce8f7
commit a3480c5d67

@ -5855,35 +5855,32 @@ exports.getBlockData = async (req, reply) => {
const mqtt = require('mqtt'); const mqtt = require('mqtt');
const brokerUrl = 'mqtt://35.207.198.4:1883'; const brokerUrl = 'mqtt://35.207.198.4:1883';
const clients = new Map(); // Store client instances dynamically per hw_Id
const deviceLastSeen = new Map(); // Track last seen timestamps for devices const deviceLastSeen = new Map(); // Track last seen timestamps for devices
function createClient(hw_Id) { // Connect a single global MQTT client
if (clients.has(hw_Id)) return clients.get(hw_Id); // Return existing client const mqttClient = mqtt.connect(brokerUrl, {
clientId: 'global_subscriber',
const client = mqtt.connect(brokerUrl, { clean: true,
clientId: `client_${hw_Id}`, reconnectPeriod: 5000, // Reconnect every 5 seconds
clean: false, // Ensures session persistence });
reconnectPeriod: 5000, // Attempt reconnect every 5 seconds
});
client.on('connect', () => { mqttClient.on('connect', () => {
console.log(`✅ Client for ${hw_Id} connected to MQTT broker`); console.log('🌎 Global MQTT client connected');
const topic = `water/iot-data/${hw_Id}`; // Subscribe to all IoT data topics
client.subscribe(topic, { qos: 1 }, (err) => { mqttClient.subscribe('water/iot-data/+', { qos: 1 }, (err) => {
if (err) { if (err) {
console.error(`❌ Error subscribing to topic ${topic}:`, err); console.error('❌ Error subscribing to wildcard topic:', err);
} else { } else {
console.log(`📡 Subscribed to topic: ${topic}`); console.log('📡 Subscribed to wildcard topic: water/iot-data/+');
} }
}); });
}); });
client.on('message', async (topic, message) => { mqttClient.on('message', async (topic, message) => {
try {
console.log(`📩 Message received on topic ${topic}:`, message.toString()); console.log(`📩 Message received on topic ${topic}:`, message.toString());
try {
const data = JSON.parse(message.toString()); const data = JSON.parse(message.toString());
const { hw_Id, Motor_status, tanks } = data.objects; const { hw_Id, Motor_status, tanks } = data.objects;
@ -5927,7 +5924,7 @@ function createClient(hw_Id) {
existingTank.waterlevel = waterLevel; existingTank.waterlevel = waterLevel;
await existingTank.save(); await existingTank.save();
console.log(`✅ Tank data saved for ${hw_Id}, tank: ${tank.Id}`); console.log(`✅ Tank data updated for ${hw_Id}, tank: ${tank.Id}`);
}) })
); );
@ -5951,50 +5948,16 @@ function createClient(hw_Id) {
} }
} }
} catch (err) { } catch (err) {
console.error(`❌ Error processing message from ${hw_Id}:`, err.message); console.error('❌ Error processing message:', err.message);
} }
});
client.on('error', (err) => {
console.error(`❌ MQTT Error for ${hw_Id}:`, err);
});
client.on('disconnect', () => {
console.log(`⚠️ Client for ${hw_Id} disconnected`);
});
clients.set(hw_Id, client);
return client;
}
// Subscribe to wildcard topic to detect new devices
const globalClient = mqtt.connect(brokerUrl, {
clientId: 'global_subscriber',
clean: false,
}); });
globalClient.on('connect', () => { mqttClient.on('error', (err) => {
console.log('🌎 Global client connected to MQTT broker'); console.error('❌ MQTT Client Error:', err);
globalClient.subscribe('water/iot-data/+', { qos: 1 }, (err) => {
if (err) {
console.error('❌ Error subscribing to wildcard topic:', err);
} else {
console.log('📡 Subscribed to wildcard topic: water/iot-data/+');
}
});
}); });
globalClient.on('message', (topic, message) => { mqttClient.on('disconnect', () => {
try { console.log('⚠️ Global MQTT client disconnected');
const data = JSON.parse(message.toString());
const { hw_Id } = data.objects;
if (!clients.has(hw_Id)) {
console.log(` Creating new MQTT client for device: ${hw_Id}`);
createClient(hw_Id);
}
} catch (err) {
console.error('❌ Error parsing global message:', err.message);
}
}); });
// Periodically check for offline devices // Periodically check for offline devices
@ -6012,30 +5975,6 @@ setInterval(() => {
exports.getPendingAndCompletedsurveyOfparticularInstaller = async (request, reply) => {
try {
const { installationId } = request.params;
const survey_status = request.body;
const surveydata = await User.find({
installationId,
survey_status,
});
// Send the response, including both total consumption and filtered consumption records
reply.send({
status_code: 200,
surveydata,
});
} catch (err) {
throw boom.boomify(err);
}
};

Loading…
Cancel
Save