RabbitMQ에서 MQTT 데이터를 여러 Subscriber가 동시에 받아 중복 수신되는 문제를 방지하는 방법은 다음과 같습니다.
🚀 해결 방법 요약
해결 방법 설명 적용 대상 동시 수신 방지 효과
1. Exclusive Queue 사용 | 한 Consumer만 메시지를 소비 가능 | 단일 Consumer | ✅ 완벽 |
2. Shared Subscription (RabbitMQ 3.8+ 지원) | 여러 Subscriber가 한 그룹을 공유하여 로드 밸런싱 | RabbitMQ MQTT Plugin | ✅✅ |
3. Work Queue 방식 (Competing Consumers) | 여러 Subscriber가 동일한 Queue에서 메시지를 나눠 처리 | Node.js Consumer | ✅✅✅ |
4. 메시지 ID 기반 중복 방지 (Redis, DB 활용) | 메시지 ID를 저장하고 중복 메시지 무시 | Node.js 애플리케이션 | ✅✅✅ |
✅ 1. Exclusive Queue 사용 (단일 Consumer만 메시지 수신)
RabbitMQ에서 Exclusive Queue를 설정하면 단 하나의 Subscriber만 해당 Queue에서 메시지를 받을 수 있음.
📌 Node.js MQTT Consumer 예제
const amqp = require('amqplib');
async function startConsumer() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const queue = 'mqtt_queue';
// Exclusive Queue: 한 개의 Consumer만 메시지를 처리할 수 있도록 설정
const q = await channel.assertQueue(queue, { exclusive: true });
console.log(`Waiting for messages in ${q.queue}`);
channel.consume(q.queue, (msg) => {
if (msg !== null) {
console.log("Received:", msg.content.toString());
channel.ack(msg); // 메시지 처리 완료
}
});
}
startConsumer();
✅ 이 설정을 적용하면 동일한 메시지를 여러 Subscriber가 동시에 받지 않음.
❌ 하지만 하나의 Consumer만 존재할 수 있어 확장성이 떨어짐.
✅ 2. RabbitMQ Shared Subscription 사용 (MQTT 3.1.1 / 5.0)
RabbitMQ의 Shared Subscription 기능을 사용하면, 여러 Subscriber가 같은 그룹을 공유하여 메시지를 로드밸런싱할 수 있음.
📌 RabbitMQ 설정 (Shared Subscription 활성화)
rabbitmq-plugins enable rabbitmq_mqtt
rabbitmqctl set_parameter mqtt shared_subscription_policy '{"pattern": "^mqtt_topic/.*", "definition": {"group_by": "topic"}}'
- MQTT shared_subscription_policy를 설정하여 Subscriber가 동일한 그룹으로 메시지를 공유하도록 설정.
📌 Node.js MQTT Consumer 예제
const mqtt = require('mqtt');
const client = mqtt.connect('mqtt://localhost');
client.on('connect', () => {
client.subscribe('$share/group1/mqtt_topic/data'); // Shared Subscription 사용
console.log('Subscribed to MQTT topic');
});
client.on('message', (topic, message) => {
console.log(`Received from ${topic}: ${message.toString()}`);
});
✅ 이제 동일한 데이터를 여러 Subscriber가 받지 않고, 하나의 Subscriber만 가져가도록 분배됨.
📌 RabbitMQ 3.8+ 버전에서만 지원됨.
✅ 3. Work Queue 방식 (Competing Consumers)
RabbitMQ에서 일반적인 Work Queue 패턴을 사용하면 여러 개의 Subscriber가 동일한 Queue에서 메시지를 나눠 처리할 수 있음.
📌 Node.js Producer (RabbitMQ에 MQTT 메시지 전달)
const amqp = require('amqplib');
async function publishMessage(message) {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const queue = 'mqtt_work_queue';
await channel.assertQueue(queue, { durable: true });
channel.sendToQueue(queue, Buffer.from(message), { persistent: true });
console.log(`Sent: ${message}`);
setTimeout(() => {
connection.close();
}, 500);
}
publishMessage("MQTT Data Message");
📌 Node.js Consumer (Work Queue 방식)
const amqp = require('amqplib');
async function startConsumer() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const queue = 'mqtt_work_queue';
await channel.assertQueue(queue, { durable: true });
console.log(`Waiting for messages in ${queue}`);
channel.consume(queue, (msg) => {
if (msg !== null) {
console.log("Processing:", msg.content.toString());
channel.ack(msg); // 메시지 처리 완료
}
});
}
startConsumer();
✅ 여러 Consumer가 동일한 메시지를 받지 않고, 메시지를 나누어 처리하게 됨.
✅ 확장성이 뛰어나고, Consumer 수가 증가하면 자동으로 로드밸런싱.
✅ 4. 메시지 ID 기반 중복 방지 (Redis, DB 활용)
만약 RabbitMQ가 여러 번 메시지를 전송할 가능성이 있다면, 애플리케이션 레벨에서 중복 필터링을 추가할 수도 있음.
📌 Node.js + Redis 중복 필터링
const redis = require('redis');
const amqp = require('amqplib');
const redisClient = redis.createClient();
async function startConsumer() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const queue = 'mqtt_queue';
await channel.assertQueue(queue, { durable: true });
channel.consume(queue, async (msg) => {
if (msg !== null) {
const messageId = msg.properties.messageId || msg.content.toString().slice(0, 10); // 메시지 ID 추출
// Redis에서 중복 체크
redisClient.get(messageId, (err, reply) => {
if (!reply) {
// 메시지 처리
console.log("Processing:", msg.content.toString());
// 메시지 ID 저장 (TTL 10초)
redisClient.setex(messageId, 10, "processed");
// 메시지 ACK (처리 완료)
channel.ack(msg);
} else {
console.log("Duplicate message ignored:", messageId);
}
});
}
});
}
startConsumer();
✅ Redis를 활용하여 일정 시간 동안 동일한 메시지가 처리되지 않도록 방지.
✅ MQTT가 중복 메시지를 보낼 가능성이 있거나, 브로커 설정으로 방지할 수 없는 경우 유용.
🎯 최종 정리: 어떤 방법을 선택해야 할까?
해결 방법 추천 상황 구현 난이도 효과
Exclusive Queue | 하나의 Consumer만 메시지를 처리해야 할 경우 | 쉬움 | ✅ |
Shared Subscription (MQTT 3.1.1 / 5.0) | 여러 Consumer가 동일한 메시지를 받지 않고 로드 밸런싱해야 할 경우 | 중간 | ✅✅ |
Work Queue (Competing Consumers) | 여러 Consumer가 메시지를 나눠 처리해야 할 경우 | 쉬움 | ✅✅✅ |
Redis 기반 중복 필터링 | 동일한 메시지가 여러 번 처리되지 않도록 방지 | 중간 | ✅✅✅ |
🚀 결론
- RabbitMQ 3.8+ 사용 가능하면 "Shared Subscription" 활용 ($share/group/mqtt_topic/data)
- Work Queue 방식 적용하면 확장성이 뛰어나고 Consumer가 자동 분배됨
- Redis를 활용하여 중복 메시지를 필터링하면 더욱 안정적
- Exclusive Queue는 단일 Consumer만 사용할 경우 고려 가능
'AI' 카테고리의 다른 글
RabbitMQ는 MQTT 브로커 역할과 AMQP 역할을 동시에 할 수 있는가 (0) | 2025.02.22 |
---|---|
SageMaker : Bring your own Algorithms (0) | 2020.05.24 |
추천시스템 맛보기 4탄 (0) | 2019.10.15 |
추천시스템 맛보기 3탄 (0) | 2019.10.13 |
추천시스템 맛보기 2탄 (0) | 2019.10.07 |
추천시스템 맛보기 1탄 (0) | 2019.10.03 |
머신러닝 맛보기 8탄 (0) | 2019.05.27 |
머신러닝 맛보기 7탄 (0) | 2019.05.09 |
댓글