Kafka

(Kafka) 클러스터에서 파티션이 호스트별로 분배되는 이유, 알고 보니?

주누 2024. 8. 25. 19:26

 

Apache Kafka에서 특정 토픽에 3개의 파티션이 있다고 가정해 봅시다.
이 토픽을 구독하는 컨슈머가 있으며, 컨슈머의 동시성 설정(setConcurrency(3))을 통해 각각 3개의 컨슈머 인스턴스를 생성합니다.
이 경우, 3개의 호스트에서 각각 3개의 컨슈머를 실행하면 총 9개의 컨슈머가 생성됩니다.

이 상황에서 어떤 호스트가 각 파티션을 점유하게 될까요? 일반적인 우려는 특정 인스턴스가 모든 파티션을 점유하여 부하가 집중되고 장애가 발생할 수 있다는 것입니다.

하지만 Kafka는 이러한 문제를 해결하기 위해 설계되었습니다. 각 인스턴스가 균등하게 파티션을 분배받도록 보장하여, 모든 호스트에서 파티션이 골고루 분포되도록 합니다. 이 글에서는 이러한 분배 방식이 어떻게 이루어지는지 살펴보겠습니다.

 


아래 그림은 파티션을 구독하기 시작한 첫 번째 인스턴스가 파티션을 점유하게 되는 상황을 나타냅니다.
이 그림은 특정 인스턴스가 모든 파티션을 점유하여 다른 인스턴스에 대한 처리가 지연될 수 있는 우려를 시각화한 것입니다. 이러한 상황에서 부하가 특정 인스턴스에 집중되면서 장애가 발생할 수 있다는 걱정이 생길 수 있습니다.

 

 

하지만 Kafka는 이러한 상황을 고려하여 설계되었습니다. 아래의 그림은 여러 개의 인스턴스를 실행하는 경우, 각 인스턴스가 어떻게 균등하게 파티션을 분배받는지를 보여줍니다.

Kafka는 각 인스턴스에 대해 파티션을 균등하게 할당하여, 특정 인스턴스에 부하가 집중되는 상황을 방지합니다. 이 과정에서 Kafka는 파티션을 효과적으로 분배하기 위한 여러 가지 전략을 사용합니다. 그림은 이러한 분배 과정이 실제로 어떻게 이루어지는지를 시각적으로 설명하고 있습니다.

그림을 보면, 각 호스트가 할당받은 파티션이 어떻게 균등하게 분포되는지를 확인할 수 있습니다. 이와 같은 균형 잡힌 분배는 Kafka의 파티션 할당 전략 덕분에 가능하며, 이를 통해 모든 인스턴스가 비슷한 부하를 가지도록 합니다. 결과적으로 시스템 전체의 안정성과 성능이 향상됩니다.

 


 

얼핏 생각해보면 당연히 카프카가 이러한 상황을 고려했다는 생각을 할 수 있지만 이러한 과정이 어떻게 이루어지는지를 샘플코드와 래퍼런스를 통해 정확하게 알아보도록 하겠습니다.

 

예시

Topic 생성

Kafka Setting

  • Topic: topic1
  • ConsumerGroup: test-topic-group
  • concurrency 설정: 3
  • 파티션 할당 전략 : RangeAssignor (default)
@Bean
public ConcurrentMessageListenerContainer<String, String> kafkaListenerContainer(ConsumerFactory<String, String> consumerFactory) {
    String[] topics = {"topic1"};
    ContainerProperties containerProps = new ContainerProperties(topics);
    containerProps.setGroupId("test-topic-group");

    ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProps);
    container.setConcurrency(3);

    return container;
}

docker-compose.yml

  • 어플리케이션를 별도의 호스트로 구동하기 위해 인스턴스 별 고정 IP 사용
    • Instance1: 172.28.1.1
    • Instance2: 172.28.1.2
    • Instance3: 172.28.1.3
version: "3.8"

services:
  app1:
    image: your-application-image
    container_name: app1
    volumes:
      - ./config/application-host1.properties:/config/application.properties
    ports:
      - "8081:8080"
    environment:
      - SPRING_CONFIG_LOCATION=/config/application.properties
      - SPRING_KAFKA_BOOTSTRAP_SERVERS=kafka1:9092,kafka2:9092,kafka3:9092
    networks:
      my_network:
        ipv4_address: 172.28.1.1

  app2:
    image: your-application-image
    container_name: app2
    volumes:
      - ./config/application-host2.properties:/config/application.properties
    ports:
      - "8082:8080"
    environment:
      - SPRING_CONFIG_LOCATION=/config/application.properties
      - SPRING_KAFKA_BOOTSTRAP_SERVERS=kafka1:9092,kafka2:9092,kafka3:9092
    networks:
      my_network:
        ipv4_address: 172.28.1.2

  app3:
    image: your-application-image
    container_name: app3
    volumes:
      - ./config/application-host3.properties:/config/application.properties
    ports:
      - "8083:8080"
    environment:
      - SPRING_CONFIG_LOCATION=/config/application.properties
      - SPRING_KAFKA_BOOTSTRAP_SERVERS=kafka1:9092,kafka2:9092,kafka3:9092
    networks:
      my_network:
        ipv4_address: 172.28.1.3

networks:
  my_network:
    driver: bridge
    ipam:
      config:
        - subnet: 172.28.0.0/16

 

Publish

  • 1000 개의 String Message 발행
@Component
public class KafkaPublisherRunner implements CommandLineRunner {

    private final KafkaMessagePublisher kafkaMessagePublisher;

    public KafkaPublisherRunner(KafkaMessagePublisher kafkaMessagePublisher) {
        this.kafkaMessagePublisher = kafkaMessagePublisher;
    }

    @Override
    public void run(String... args) throws Exception {
        for (int i = 1; i <= 1000; i++) {
            String jsonMessage = String.format("{\"message\":\"Hello Kafka %d\"}", i);
            kafkaMessagePublisher.sendMessage(jsonMessage);
        }
    }

}

 

Consume

  • inputChannel을 통해 발행된 메세지 컨슘 (Spring Integration 사용, @KafkaListener 로 대체가능)
@ServiceActivator(inputChannel = "testQueueChannel")
public void handleKafkaMessage(Message<EventDomain> message) throws InterruptedException {
    EventDomain payload = message.getPayload();
    logger.info("Received message: {}", payload.getMessage());
}

실행결과


파티션의 균등 분배: 카프카의 내부 작동 방식

예제를 통해 확인할 수 있듯이, 각 애플리케이션에서 3개의 컨슈머를 생성하여 총 9개의 컨슈머가 생성되지만, 파티션은 호스트별로 균등하게 분배됩니다. 이 균형 잡힌 분배는 Kafka의 파티션 할당 전략인 RangeAssignor 덕분에 가능합니다.

 

Kafka의 기본 파티션 할당 전략

아파치 카프카 공식 문서를 살펴보면 아래와 같은 설명이 적혀있습니다.

범위 할당자는 주제별로 작동합니다. 각 토픽에 대해 사용 가능한 파티션을 순서대로 정렬하고, 컨슈머들을 사전순으로 배치합니다.

 

 

해당 내용을 살펴보았을때 최종적으로 파티션을 할당할때 카프카 기본 할당 전략인 RangeAssignor 는 파티션에 대한 컨슈머를 할당 할때

정렬 이후 사전순으로 배치한다는 것이 핵심입니다.

 

 

Kafka의 내부 코드와 동작 방식

 

  • Client ID 생성: Kafka는 ConsumerConfig.java에서 클라이언트 ID를 생성합니다. 이 ID는 순서대로 넘버링되며,
    AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE를 사용하여 관리됩니다.
  • Member ID 생성: GroupMetadata.scala에서 생성된 Member ID는 clientId와 UUID를 조합하여 만들어집니다.
  • 정렬 및 할당: AbstractPartitionAssignor.MemberInfo.java에서는 Member ID를 기준으로 문자열 정렬을 수행합니다. 최종적으로, 이 정렬된 Member ID를 기반으로 파티션이 균등하게 분배됩니다.

 

ConsumerConfig.java - maybeOverrideClientId

  • clientId 생성
    생성된 순서대로 넘버링 (getAndIncrement)

GroupMetadata.scala - generateMemberId

  • memberId 생성
    clientId + ":" + UUID

Abstractpartitionassignor.memberinfo.java - compareTo

  • memberId를 기준으로 문자열 정렬을 수행

최종적으로, Member ID를 기준으로 정렬된 컨슈머들 덕분에 특정 인스턴스에 부하가 집중되는 쏠림 현상 없이, 모든 인스턴스에 파티션이 균등하게 분배됩니다. 이 방식은 시스템 전체의 균형을 유지하며, 각 인스턴스가 비슷한 부하를 가지도록 보장합니다.


References

https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html

 

RangeAssignor (kafka 2.4.0 API)

The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order and the consumers in lexicographic order. We then divide the number of partitions by the total number of consumers to determine the number o

kafka.apache.org