본문 바로가기
AI

Node.js에서 RabbitMQ를 통해 MQTT 데이터 Subscribe 시 동일한 데이터가 여러 Subscriber에게 수신되지 않도록 하는 방법

by david100gom 2025. 2. 22.

RabbitMQ에서 MQTT 데이터를 여러 Subscriber가 동시에 받아 중복 수신되는 문제를 방지하는 방법은 다음과 같습니다.


🚀 해결 방법 요약

해결 방법 설명 적용 대상 동시 수신 방지 효과

1. Exclusive Queue 사용 한 Consumer만 메시지를 소비 가능 단일 Consumer ✅ 완벽
2. Shared Subscription (RabbitMQ 3.8+ 지원) 여러 Subscriber가 한 그룹을 공유하여 로드 밸런싱 RabbitMQ MQTT Plugin ✅✅
3. Work Queue 방식 (Competing Consumers) 여러 Subscriber가 동일한 Queue에서 메시지를 나눠 처리 Node.js Consumer ✅✅✅
4. 메시지 ID 기반 중복 방지 (Redis, DB 활용) 메시지 ID를 저장하고 중복 메시지 무시 Node.js 애플리케이션 ✅✅✅

✅ 1. Exclusive Queue 사용 (단일 Consumer만 메시지 수신)

RabbitMQ에서 Exclusive Queue를 설정하면 단 하나의 Subscriber만 해당 Queue에서 메시지를 받을 수 있음.

📌 Node.js MQTT Consumer 예제

const amqp = require('amqplib');

async function startConsumer() {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();

    const queue = 'mqtt_queue';

    // Exclusive Queue: 한 개의 Consumer만 메시지를 처리할 수 있도록 설정
    const q = await channel.assertQueue(queue, { exclusive: true });

    console.log(`Waiting for messages in ${q.queue}`);

    channel.consume(q.queue, (msg) => {
        if (msg !== null) {
            console.log("Received:", msg.content.toString());
            channel.ack(msg); // 메시지 처리 완료
        }
    });
}

startConsumer();

이 설정을 적용하면 동일한 메시지를 여러 Subscriber가 동시에 받지 않음.
❌ 하지만 하나의 Consumer만 존재할 수 있어 확장성이 떨어짐.


✅ 2. RabbitMQ Shared Subscription 사용 (MQTT 3.1.1 / 5.0)

RabbitMQ의 Shared Subscription 기능을 사용하면, 여러 Subscriber가 같은 그룹을 공유하여 메시지를 로드밸런싱할 수 있음.

📌 RabbitMQ 설정 (Shared Subscription 활성화)

rabbitmq-plugins enable rabbitmq_mqtt
rabbitmqctl set_parameter mqtt shared_subscription_policy '{"pattern": "^mqtt_topic/.*", "definition": {"group_by": "topic"}}'
  • MQTT shared_subscription_policy를 설정하여 Subscriber가 동일한 그룹으로 메시지를 공유하도록 설정.

📌 Node.js MQTT Consumer 예제

const mqtt = require('mqtt');
const client = mqtt.connect('mqtt://localhost');

client.on('connect', () => {
    client.subscribe('$share/group1/mqtt_topic/data');  // Shared Subscription 사용
    console.log('Subscribed to MQTT topic');
});

client.on('message', (topic, message) => {
    console.log(`Received from ${topic}: ${message.toString()}`);
});

이제 동일한 데이터를 여러 Subscriber가 받지 않고, 하나의 Subscriber만 가져가도록 분배됨.

📌 RabbitMQ 3.8+ 버전에서만 지원됨.


✅ 3. Work Queue 방식 (Competing Consumers)

RabbitMQ에서 일반적인 Work Queue 패턴을 사용하면 여러 개의 Subscriber가 동일한 Queue에서 메시지를 나눠 처리할 수 있음.

📌 Node.js Producer (RabbitMQ에 MQTT 메시지 전달)

const amqp = require('amqplib');

async function publishMessage(message) {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();
    
    const queue = 'mqtt_work_queue';

    await channel.assertQueue(queue, { durable: true });

    channel.sendToQueue(queue, Buffer.from(message), { persistent: true });
    console.log(`Sent: ${message}`);

    setTimeout(() => {
        connection.close();
    }, 500);
}

publishMessage("MQTT Data Message");

📌 Node.js Consumer (Work Queue 방식)

const amqp = require('amqplib');

async function startConsumer() {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();
    
    const queue = 'mqtt_work_queue';

    await channel.assertQueue(queue, { durable: true });

    console.log(`Waiting for messages in ${queue}`);

    channel.consume(queue, (msg) => {
        if (msg !== null) {
            console.log("Processing:", msg.content.toString());
            channel.ack(msg); // 메시지 처리 완료
        }
    });
}

startConsumer();

여러 Consumer가 동일한 메시지를 받지 않고, 메시지를 나누어 처리하게 됨.
확장성이 뛰어나고, Consumer 수가 증가하면 자동으로 로드밸런싱.


✅ 4. 메시지 ID 기반 중복 방지 (Redis, DB 활용)

만약 RabbitMQ가 여러 번 메시지를 전송할 가능성이 있다면, 애플리케이션 레벨에서 중복 필터링을 추가할 수도 있음.

📌 Node.js + Redis 중복 필터링

const redis = require('redis');
const amqp = require('amqplib');

const redisClient = redis.createClient();

async function startConsumer() {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();
    const queue = 'mqtt_queue';

    await channel.assertQueue(queue, { durable: true });

    channel.consume(queue, async (msg) => {
        if (msg !== null) {
            const messageId = msg.properties.messageId || msg.content.toString().slice(0, 10); // 메시지 ID 추출

            // Redis에서 중복 체크
            redisClient.get(messageId, (err, reply) => {
                if (!reply) {
                    // 메시지 처리
                    console.log("Processing:", msg.content.toString());

                    // 메시지 ID 저장 (TTL 10초)
                    redisClient.setex(messageId, 10, "processed");

                    // 메시지 ACK (처리 완료)
                    channel.ack(msg);
                } else {
                    console.log("Duplicate message ignored:", messageId);
                }
            });
        }
    });
}

startConsumer();

Redis를 활용하여 일정 시간 동안 동일한 메시지가 처리되지 않도록 방지.
MQTT가 중복 메시지를 보낼 가능성이 있거나, 브로커 설정으로 방지할 수 없는 경우 유용.


🎯 최종 정리: 어떤 방법을 선택해야 할까?

해결 방법 추천 상황 구현 난이도 효과

Exclusive Queue 하나의 Consumer만 메시지를 처리해야 할 경우 쉬움
Shared Subscription (MQTT 3.1.1 / 5.0) 여러 Consumer가 동일한 메시지를 받지 않고 로드 밸런싱해야 할 경우 중간 ✅✅
Work Queue (Competing Consumers) 여러 Consumer가 메시지를 나눠 처리해야 할 경우 쉬움 ✅✅✅
Redis 기반 중복 필터링 동일한 메시지가 여러 번 처리되지 않도록 방지 중간 ✅✅✅

🚀 결론

  1. RabbitMQ 3.8+ 사용 가능하면 "Shared Subscription" 활용 ($share/group/mqtt_topic/data)
  2. Work Queue 방식 적용하면 확장성이 뛰어나고 Consumer가 자동 분배됨
  3. Redis를 활용하여 중복 메시지를 필터링하면 더욱 안정적
  4. Exclusive Queue는 단일 Consumer만 사용할 경우 고려 가능

 

댓글