본문 바로가기
AI

Kafka Consumer 성능향상

by david100gom 2025. 3. 21.
728x90

Kafka Consumer Thread(하나의 스레드)에 멀티 스레드를 추가하여 성능을 향상시키는 방법은 몇 가지가 있습니다. 일반적으로 Kafka의 poll() 메서드는 단일 스레드에서 실행해야 하지만, 그 후의 데이터 처리는 멀티 스레드로 확장할 수 있습니다.


1. 기본 개념

  • Kafka Consumer Thread(단일 스레드): poll()을 실행하고 메시지를 가져옴.
  • 멀티 스레드(Worker Threads): 가져온 메시지를 병렬로 처리.

💡 Kafka Consumer는 병렬 처리를 위해 poll()을 여러 개의 스레드에서 실행할 수 없음.
➡ 따라서 한 개의 Consumer Thread에서 poll()을 실행하고, 데이터를 Worker Threads로 분배하는 방식이 필요함.


2. 구현 방식

(1) Consumer Thread + Worker Threads 모델

  • Kafka Consumer Thread (poll() 실행)
  • Worker Threads (실제 메시지 처리 담당, 멀티 스레드로 실행)

구조

Kafka Consumer Thread 1개
   ├── Worker Thread 1 (데이터 처리)
   ├── Worker Thread 2 (데이터 처리)
   ├── Worker Thread 3 (데이터 처리)
   ├── ...

(2) 장점

poll()은 단일 스레드로 유지 → Kafka Consumer 안정성 확보
Worker Threads에서 병렬로 메시지 처리 → 성능 향상 가능


3. 예제 코드 (ExecutorService 활용)

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MultiThreadedKafkaConsumer {
    private static final int WORKER_THREADS = 4; // Worker Thread 개수

    public static void main(String[] args) {
        // Kafka Consumer 설정
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "true");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));

        // Worker Thread Pool 생성
        ExecutorService executor = Executors.newFixedThreadPool(WORKER_THREADS);

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    executor.submit(() -> processRecord(record)); // Worker Thread에서 처리
                }
            }
        } finally {
            consumer.close();
            executor.shutdown();
        }
    }

    // 실제 데이터를 처리하는 메서드
    private static void processRecord(ConsumerRecord<String, String> record) {
        System.out.println("Processing record: " + record.value() + " on thread " + Thread.currentThread().getName());
    }
}

4. 동작 방식

  1. Kafka Consumer Thread (poll() 실행)
    • consumer.poll()을 단일 스레드에서 실행하여 데이터를 가져옴.
  2. Worker Threads에서 데이터 처리
    • 가져온 데이터를 ExecutorService의 Worker Threads에 분배하여 병렬 처리.
  3. Worker Threads에서 병렬 작업 수행
    • 여러 Worker Threads가 데이터를 동시에 처리하여 성능을 향상.

5. 고려해야 할 점

(1) Consumer Commit 전략

  • enable.auto.commit=true를 사용하면 poll()이 호출될 때 자동으로 오프셋을 커밋하므로, 일부 메시지가 유실될 가능성이 있음.
  • 수동 커밋을 원할 경우:
    props.put("enable.auto.commit", "false");
    
    • Worker Thread에서 데이터 처리가 완료된 후 수동으로 consumer.commitSync(); 호출.

(2) 데이터 처리 순서 보장

  • Kafka Consumer는 기본적으로 파티션 순서를 유지하지만, Worker Thread를 사용하면 순서가 뒤섞일 수 있음.
  • 순서가 중요한 경우:
    • 특정 파티션의 데이터를 동일한 Worker Thread에 할당하는 방법 사용.
    • 배치 단위 처리를 고려.

(3) Worker Thread 개수 조정

  • CPU 코어 개수에 맞춰 설정하는 것이 좋음.
  • 예를 들어, CPU 코어 수가 4개라면 Worker Threads = 4가 적절.

6. 결론

Kafka Consumer Thread에서 poll() 실행 후, Worker Thread에서 병렬 처리하는 방식이 최적.
Consumer Thread를 여러 개 실행하는 것보다 안정적이며, Kafka의 동작 원리에 적합.
Worker Thread 개수는 CPU 코어와 처리량을 고려하여 설정.
오프셋 커밋 방식(auto commit vs manual commit)과 데이터 처리 순서도 고려해야 함.

728x90

댓글