728x90
Kafka Consumer Thread(하나의 스레드)에 멀티 스레드를 추가하여 성능을 향상시키는 방법은 몇 가지가 있습니다. 일반적으로 Kafka의 poll() 메서드는 단일 스레드에서 실행해야 하지만, 그 후의 데이터 처리는 멀티 스레드로 확장할 수 있습니다.
1. 기본 개념
- Kafka Consumer Thread(단일 스레드): poll()을 실행하고 메시지를 가져옴.
- 멀티 스레드(Worker Threads): 가져온 메시지를 병렬로 처리.
💡 Kafka Consumer는 병렬 처리를 위해 poll()을 여러 개의 스레드에서 실행할 수 없음.
➡ 따라서 한 개의 Consumer Thread에서 poll()을 실행하고, 데이터를 Worker Threads로 분배하는 방식이 필요함.
2. 구현 방식
(1) Consumer Thread + Worker Threads 모델
- Kafka Consumer Thread (poll() 실행)
- Worker Threads (실제 메시지 처리 담당, 멀티 스레드로 실행)
구조
Kafka Consumer Thread 1개
├── Worker Thread 1 (데이터 처리)
├── Worker Thread 2 (데이터 처리)
├── Worker Thread 3 (데이터 처리)
├── ...
(2) 장점
✅ poll()은 단일 스레드로 유지 → Kafka Consumer 안정성 확보
✅ Worker Threads에서 병렬로 메시지 처리 → 성능 향상 가능
3. 예제 코드 (ExecutorService 활용)
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MultiThreadedKafkaConsumer {
private static final int WORKER_THREADS = 4; // Worker Thread 개수
public static void main(String[] args) {
// Kafka Consumer 설정
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "true");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
// Worker Thread Pool 생성
ExecutorService executor = Executors.newFixedThreadPool(WORKER_THREADS);
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
executor.submit(() -> processRecord(record)); // Worker Thread에서 처리
}
}
} finally {
consumer.close();
executor.shutdown();
}
}
// 실제 데이터를 처리하는 메서드
private static void processRecord(ConsumerRecord<String, String> record) {
System.out.println("Processing record: " + record.value() + " on thread " + Thread.currentThread().getName());
}
}
4. 동작 방식
- Kafka Consumer Thread (poll() 실행)
- consumer.poll()을 단일 스레드에서 실행하여 데이터를 가져옴.
- Worker Threads에서 데이터 처리
- 가져온 데이터를 ExecutorService의 Worker Threads에 분배하여 병렬 처리.
- Worker Threads에서 병렬 작업 수행
- 여러 Worker Threads가 데이터를 동시에 처리하여 성능을 향상.
5. 고려해야 할 점
(1) Consumer Commit 전략
- enable.auto.commit=true를 사용하면 poll()이 호출될 때 자동으로 오프셋을 커밋하므로, 일부 메시지가 유실될 가능성이 있음.
- 수동 커밋을 원할 경우:
props.put("enable.auto.commit", "false");
- Worker Thread에서 데이터 처리가 완료된 후 수동으로 consumer.commitSync(); 호출.
(2) 데이터 처리 순서 보장
- Kafka Consumer는 기본적으로 파티션 순서를 유지하지만, Worker Thread를 사용하면 순서가 뒤섞일 수 있음.
- 순서가 중요한 경우:
- 특정 파티션의 데이터를 동일한 Worker Thread에 할당하는 방법 사용.
- 배치 단위 처리를 고려.
(3) Worker Thread 개수 조정
- CPU 코어 개수에 맞춰 설정하는 것이 좋음.
- 예를 들어, CPU 코어 수가 4개라면 Worker Threads = 4가 적절.
6. 결론
✔ Kafka Consumer Thread에서 poll() 실행 후, Worker Thread에서 병렬 처리하는 방식이 최적.
✔ Consumer Thread를 여러 개 실행하는 것보다 안정적이며, Kafka의 동작 원리에 적합.
✔ Worker Thread 개수는 CPU 코어와 처리량을 고려하여 설정.
✔ 오프셋 커밋 방식(auto commit vs manual commit)과 데이터 처리 순서도 고려해야 함.
728x90
'AI' 카테고리의 다른 글
Trino(트리노) 설치 (1) | 2025.03.20 |
---|---|
온도 데이터를 활용한 예측 시스템 (0) | 2025.03.14 |
Precise Throughput Timer를 사용하여 TPS(초당 요청 수) 를 조절하는 방법 (0) | 2025.03.06 |
DB 데이터를 직접 머신러닝 모델 학습에 활용하는 방법 (1) | 2025.03.03 |
LSTM 모델로 이상 징후를 감지 (0) | 2025.02.27 |
LSTM 모델 (0) | 2025.02.27 |
약품 냉장고 온도 이상 증후 감지 및 예측 방법 (0) | 2025.02.27 |
JMeter 플러그인 - 동시 요청처리 (0) | 2025.02.25 |
댓글