Apache Kafka에서 특정 토픽에 3개의 파티션이 있다고 가정해 봅시다.
이 토픽을 구독하는 컨슈머가 있으며, 컨슈머의 동시성 설정(setConcurrency(3))을 통해 각각 3개의 컨슈머 인스턴스를 생성합니다.
이 경우, 3개의 호스트에서 각각 3개의 컨슈머를 실행하면 총 9개의 컨슈머가 생성됩니다.
이 상황에서 어떤 호스트가 각 파티션을 점유하게 될까요? 일반적인 우려는 특정 인스턴스가 모든 파티션을 점유하여 부하가 집중되고 장애가 발생할 수 있다는 것입니다.
하지만 Kafka는 이러한 문제를 해결하기 위해 설계되었습니다. 각 인스턴스가 균등하게 파티션을 분배받도록 보장하여, 모든 호스트에서 파티션이 골고루 분포되도록 합니다. 이 글에서는 이러한 분배 방식이 어떻게 이루어지는지 살펴보겠습니다.
아래 그림은 파티션을 구독하기 시작한 첫 번째 인스턴스가 파티션을 점유하게 되는 상황을 나타냅니다.
이 그림은 특정 인스턴스가 모든 파티션을 점유하여 다른 인스턴스에 대한 처리가 지연될 수 있는 우려를 시각화한 것입니다. 이러한 상황에서 부하가 특정 인스턴스에 집중되면서 장애가 발생할 수 있다는 걱정이 생길 수 있습니다.
하지만 Kafka는 이러한 상황을 고려하여 설계되었습니다. 아래의 그림은 여러 개의 인스턴스를 실행하는 경우, 각 인스턴스가 어떻게 균등하게 파티션을 분배받는지를 보여줍니다.
Kafka는 각 인스턴스에 대해 파티션을 균등하게 할당하여, 특정 인스턴스에 부하가 집중되는 상황을 방지합니다. 이 과정에서 Kafka는 파티션을 효과적으로 분배하기 위한 여러 가지 전략을 사용합니다. 그림은 이러한 분배 과정이 실제로 어떻게 이루어지는지를 시각적으로 설명하고 있습니다.
그림을 보면, 각 호스트가 할당받은 파티션이 어떻게 균등하게 분포되는지를 확인할 수 있습니다. 이와 같은 균형 잡힌 분배는 Kafka의 파티션 할당 전략 덕분에 가능하며, 이를 통해 모든 인스턴스가 비슷한 부하를 가지도록 합니다. 결과적으로 시스템 전체의 안정성과 성능이 향상됩니다.
얼핏 생각해보면 당연히 카프카가 이러한 상황을 고려했다는 생각을 할 수 있지만 이러한 과정이 어떻게 이루어지는지를 샘플코드와 래퍼런스를 통해 정확하게 알아보도록 하겠습니다.
예시
Topic 생성
Kafka Setting
- Topic: topic1
- ConsumerGroup: test-topic-group
- concurrency 설정: 3
- 파티션 할당 전략 : RangeAssignor (default)
@Bean
public ConcurrentMessageListenerContainer<String, String> kafkaListenerContainer(ConsumerFactory<String, String> consumerFactory) {
String[] topics = {"topic1"};
ContainerProperties containerProps = new ContainerProperties(topics);
containerProps.setGroupId("test-topic-group");
ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProps);
container.setConcurrency(3);
return container;
}
docker-compose.yml
- 어플리케이션를 별도의 호스트로 구동하기 위해 인스턴스 별 고정 IP 사용
- Instance1: 172.28.1.1
- Instance2: 172.28.1.2
- Instance3: 172.28.1.3
version: "3.8"
services:
app1:
image: your-application-image
container_name: app1
volumes:
- ./config/application-host1.properties:/config/application.properties
ports:
- "8081:8080"
environment:
- SPRING_CONFIG_LOCATION=/config/application.properties
- SPRING_KAFKA_BOOTSTRAP_SERVERS=kafka1:9092,kafka2:9092,kafka3:9092
networks:
my_network:
ipv4_address: 172.28.1.1
app2:
image: your-application-image
container_name: app2
volumes:
- ./config/application-host2.properties:/config/application.properties
ports:
- "8082:8080"
environment:
- SPRING_CONFIG_LOCATION=/config/application.properties
- SPRING_KAFKA_BOOTSTRAP_SERVERS=kafka1:9092,kafka2:9092,kafka3:9092
networks:
my_network:
ipv4_address: 172.28.1.2
app3:
image: your-application-image
container_name: app3
volumes:
- ./config/application-host3.properties:/config/application.properties
ports:
- "8083:8080"
environment:
- SPRING_CONFIG_LOCATION=/config/application.properties
- SPRING_KAFKA_BOOTSTRAP_SERVERS=kafka1:9092,kafka2:9092,kafka3:9092
networks:
my_network:
ipv4_address: 172.28.1.3
networks:
my_network:
driver: bridge
ipam:
config:
- subnet: 172.28.0.0/16
Publish
- 1000 개의 String Message 발행
@Component
public class KafkaPublisherRunner implements CommandLineRunner {
private final KafkaMessagePublisher kafkaMessagePublisher;
public KafkaPublisherRunner(KafkaMessagePublisher kafkaMessagePublisher) {
this.kafkaMessagePublisher = kafkaMessagePublisher;
}
@Override
public void run(String... args) throws Exception {
for (int i = 1; i <= 1000; i++) {
String jsonMessage = String.format("{\"message\":\"Hello Kafka %d\"}", i);
kafkaMessagePublisher.sendMessage(jsonMessage);
}
}
}
Consume
- inputChannel을 통해 발행된 메세지 컨슘 (Spring Integration 사용, @KafkaListener 로 대체가능)
@ServiceActivator(inputChannel = "testQueueChannel")
public void handleKafkaMessage(Message<EventDomain> message) throws InterruptedException {
EventDomain payload = message.getPayload();
logger.info("Received message: {}", payload.getMessage());
}
실행결과
파티션의 균등 분배: 카프카의 내부 작동 방식
예제를 통해 확인할 수 있듯이, 각 애플리케이션에서 3개의 컨슈머를 생성하여 총 9개의 컨슈머가 생성되지만, 파티션은 호스트별로 균등하게 분배됩니다. 이 균형 잡힌 분배는 Kafka의 파티션 할당 전략인 RangeAssignor 덕분에 가능합니다.
Kafka의 기본 파티션 할당 전략
아파치 카프카 공식 문서를 살펴보면 아래와 같은 설명이 적혀있습니다.
범위 할당자는 주제별로 작동합니다. 각 토픽에 대해 사용 가능한 파티션을 순서대로 정렬하고, 컨슈머들을 사전순으로 배치합니다.
해당 내용을 살펴보았을때 최종적으로 파티션을 할당할때 카프카 기본 할당 전략인 RangeAssignor 는 파티션에 대한 컨슈머를 할당 할때
정렬 이후 사전순으로 배치한다는 것이 핵심입니다.
Kafka의 내부 코드와 동작 방식
- Client ID 생성: Kafka는 ConsumerConfig.java에서 클라이언트 ID를 생성합니다. 이 ID는 순서대로 넘버링되며,
AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE를 사용하여 관리됩니다. - Member ID 생성: GroupMetadata.scala에서 생성된 Member ID는 clientId와 UUID를 조합하여 만들어집니다.
- 정렬 및 할당: AbstractPartitionAssignor.MemberInfo.java에서는 Member ID를 기준으로 문자열 정렬을 수행합니다. 최종적으로, 이 정렬된 Member ID를 기반으로 파티션이 균등하게 분배됩니다.
ConsumerConfig.java - maybeOverrideClientId
- clientId 생성
생성된 순서대로 넘버링 (getAndIncrement)
GroupMetadata.scala - generateMemberId
- memberId 생성
clientId + ":" + UUID
Abstractpartitionassignor.memberinfo.java - compareTo
- memberId를 기준으로 문자열 정렬을 수행
최종적으로, Member ID를 기준으로 정렬된 컨슈머들 덕분에 특정 인스턴스에 부하가 집중되는 쏠림 현상 없이, 모든 인스턴스에 파티션이 균등하게 분배됩니다. 이 방식은 시스템 전체의 균형을 유지하며, 각 인스턴스가 비슷한 부하를 가지도록 보장합니다.
References
https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html