-
[카프카(Kafka) 어플리케이션 제작 ] #2. 컨슈머Kafka 2019. 3. 7. 17:50
[카프카(Kafka) 어플리케이션 제작 ] #1. 프로듀서
이전 글에서는 프로듀서 내부 동작 확인 및 어플리케이션을 제작하였다.
이번에는 컨슈머 어플리케이션을 제작해본다.
컨슈머
카프카 컨슈머 내부 동작 및 컨슈머 어플리케이션에서 메시지 소비하는 과정을 알아보자.컨슈머 내부 동작
컨슈머의 전체적인 내부 동작을 이해하면 컨슈머 어플리케이션을 디버깅할 때 도움이 많이 되며, 올바른 결정을 하도록 도와준다.카프카 컨슈머의 역활토픽 구독
- 컨슈머 동작의 시작은 토픽의 구독임
오프셋 위치
- 카프카는 다른 큐와는 다르게 메시지 오프셋을 저장 안함
- 오프셋은 각자의 컨슈머들이 유지해야함(컨슈머 API를 사용)
재연/되감기/메시지 스킵
- 상황에 따라 커스텀하게 오프셋을 변경할 수 있음
- 재연/되감기/메시지 스킵 가능
하트비트 - 컨슈머의 지정된 파티션의 멤버쉽과 소유권을 확인하기 위해 하트비트를 이용하여 주기적으로 체크
- 하트비트를 수신하지 못하면 컨슈머 그룹의 컨슈머 재할당(리벨런싱)이 일어난다.
오프셋 커밋
- 리벨런싱이 일어날 때 이미 읽은 오프셋을 다시 읽을 수 있으므로 오프셋 커밋을 함
역직렬화
- 프로듀서에서는 카프카에 메시지를 보낼 때 어떤 직렬화를 했는지 명시하고 컨슈머에서는 역직렬화를 명시함 (프로듀서 글의 key.serializer 참고)
메시지 수신(Subscribe)을 위한 동작 순서
1. 토픽 구독 ↓2. 카프카 서버 조사(폴 루프) : 서버 조정, 파티션 리벨런스, 컨슈머 하트피트 체크 등↓3. 새로운 레코드가 있는 지 체크하고 있으면 가져옴↓4. 역직렬화 ↓5. 데이터 유효성 검증 ↓6. 다른 시스템에 이관 및 저장컨슈머 어플리케이션
프로듀서 API와 마찬가지로 컨슈머도 풍부한 API 세트를 제공한다.컨슈머 기본 설정 및 코드bootstrap.servers
- 브로커 주소 목록
- hostname:port 형식
- 프로듀서 설정과 동일
key.deserializer
- 프로듀서에서 key.serializer와 동일한 클래스로 deserializer 한다.
- 다른 클래스로 deserializer 하면 예외 발생
- ByteArraySerializer, StringSerializer, IntegerSerializer 클래스 선택
value.deserializer
- key.deserializer와 비슷하며 메시지 값 정보에 대한 역직렬화를 명시
group.id
- 컨슈머 그룹 정의
- 이전 버전에서는 필수가 아니었지만 최근 버전에서는 필수로 변경 사용하지 않으면 아래와 같은 에러 발생(Attempt to join group failed due to fatal error: The configured groupId is invalid)
Properties producerProps = new Properties(); producerProps.put("bootstrap.servers","192.168.100.173:9092"); producerProps.put("group.id", "hirang_test02"); producerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); producerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); return producerProps; KafkaConsumer
consumer = new KafkaConsumer(setConsumerProperty()); 구독
컨슈머는 메시지 데이터를 받기 위해 토픽을 구독한다. (KafkaConsumer.subscribe 사용)
구독 메소드는 입력 파라미터에 따라 다양한 구독이 가능하다.
subscribe(Collection<String> topics)
: 원하는 토픽 목록을 전달 / 기본 리밸런서 사용
subscribe(Pattern pattern, ConsumerRebalanceListener listener)
: 원하는 토픽을 찾기 위해 정규식을 전달 / 정규식에 대응하는 새로운 토픽의 추가가 삭제 발생시 리밸런서 트리거됨
subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
: 토픽의 목록을 전달 / 리밸런서 트리거됨
오프셋 커밋 처리
오프셋에 대한 커밋이 일어나는 방법은 3가지가 존재하며 각각 특성이 존재한다.
자동커밋
- 컨슈머의 기본 설정값
- enable.auto.commit=true
- auto.commit.interval.ms=1000 이 값을 길게 잡으면 장애 발생시 중복 읽기가 발생할 수 있음
- 예) 커밋 주기가 10초인데 7초가 지난 후 컨슈머 장애가 발생하면 이전 10초 전에 커밋한 오프셋을 가져오므로 중복 발생
현재 오프셋 커밋
- 상황에 따라 필요할 때 커밋 제어시 사용
- enable.auto.commit=false
- commitSync() 메서드를 사용해 커밋할 컨슈머 오프셋을 호출한다.
- ConsumerRecord의 인스턴스를 처리 후 사용 권장하며 그렇게 안하면 컨슈머 장애기 발생할 경우 레코드 손실 발생
비동기 커밋
- 동기 방식의 커밋은 Ack 수신이 없는 경우, 컨슈머는 대기 상태가 되므로 결과적으로 처리 속도가 좋지 못하다.
- 비동기도 메시지 중복은 발생할 수 있다.
- 예) 메시지 오프셋 10을 오프셋 5 이전에 커밋 했다면, 카프카는 5부터 10까지 다시 읽으므로 중복 발생
추가 설정
설정 작업 이전에 데이터를 처리하기 위해 컨슈머가 얼마의 시간이 필요한지 확인 및 테스트가 필요하다.
enable.auto.commit
- true/false
- 오프셋 자동 커밋
fetch.min.bytes
- 데이터 읽기 요청에 대한 카프카 서버의 회신을 요구하는 데이터 바이트의 최소 크기
request.timeout.ms
- 응답을 기다리는 최대 시간
auto.offset.reset
- 유효한 오프셋이 없을 때 자동처리됨
lastest : 파티션에서 가장 최근 메시지부터 시작
earliest : 파티션 맨 처음부터 시작
none : 예외가 발생됨
session.timeout.ms
- "컨슈머 동작 중이다"라고 알리기 위한 하트비트 전송 주기
- 리밸런서 트리거 하는걸 막기 위함
max.partition.fetch.bytes
- 파티션마다 할당할 최대 데이터 크기
- ConsumerRecord 객체에 대해 컨슈머가 필요로 하는 메모리는 파티션수 * 설정값보다 커야함
- 예) 파티션 10개 1개의 컨슈머이고, max.partiton.fetch.bytes가 2MB면 10 * 2MB = 20MB가 메모리로 잡혀 있어야함
컨슈머 전체 코드package com.example.kafkatest; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.Future; @SpringBootApplication public class KafkaConsumerApplication { private static final Logger log = LoggerFactory.getLogger(KafkaTestApplication.class); public static void main(String[] args) { SpringApplication.run(KafkaConsumerApplication.class, args); subscribe(); } private static Properties setConsumerProperty(){ Properties producerProps = new Properties(); producerProps.put("bootstrap.servers","192.168.100.173:9092"); producerProps.put("group.id", "hirang_test02"); producerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); producerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); producerProps.put("enable.auto.commit", "true"); producerProps.put("auto.commit.interval.ms", "1000"); producerProps.put("session.timeout.ms", "30000"); return producerProps; } private static void subscribe(){ String topic = "hirang"; List
topicList = new ArrayList<>(); topicList.add(topic); KafkaConsumer consumer = new KafkaConsumer(setConsumerProperty()); consumer.subscribe(topicList); log.info("Subscribed to topic :\t" + KafkaStatic.DEFAULT_TOPIC); int i = 0; try { while (true) { ConsumerRecords records = consumer.poll(500); for (ConsumerRecord record : records) log.info("offset = " + record.offset() + "\tkey =" + record.key() + "\tvalue =" + record.value()); //TODO : Do processing for data here consumer.commitAsync(new OffsetCommitCallback() { public void onComplete(Map map, Exception e) { } }); } } catch (Exception ex) { //TODO : Log Exception Here } finally { try { consumer.commitSync(); } finally { consumer.close(); } } } } 컨슈머 콘솔 로그
읽은 메시지 데이터를 로그로 확인하였다.
. ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v2.1.3.RELEASE) 2019-03-08 11:30:52.369 INFO 4556 --- [ main] c.e.kafkatest.KafkaConsumerApplication : Starting KafkaConsumerApplication on ts-hirang with PID 4556 (D:\workspace\github\kafka-test\target\classes started by DESKTOP in D:\workspace\github\kafka-test) 2019-03-08 11:30:52.379 INFO 4556 --- [ main] c.e.kafkatest.KafkaConsumerApplication : No active profile set, falling back to default profiles: default 2019-03-08 11:30:53.178 INFO 4556 --- [ main] c.e.kafkatest.KafkaConsumerApplication : Started KafkaConsumerApplication in 1.766 seconds (JVM running for 3.564) 2019-03-08 11:30:53.192 INFO 4556 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: auto.commit.interval.ms = 1000 auto.offset.reset = latest bootstrap.servers = [192.168.100.173:9092] check.crcs = true client.id = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = hirang_test02 heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 session.timeout.ms = 30000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = https ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 2019-03-08 11:30:53.238 INFO 4556 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 2.0.1 2019-03-08 11:30:53.239 INFO 4556 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : fa14705e51bd2ce5 2019-03-08 11:30:53.240 INFO 4556 --- [ main] c.e.kafkatest.KafkaTestApplication : Subscribed to topic : hirang 2019-03-08 11:30:53.318 INFO 4556 --- [ main] org.apache.kafka.clients.Metadata : Cluster ID: 6ejl7iMZQASnPqRCdv13wA 2019-03-08 11:30:53.319 INFO 4556 --- [ main] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=hirang_test02] Discovered group coordinator 192.168.100.173:9092 (id: 2147483646 rack: null) 2019-03-08 11:30:53.321 INFO 4556 --- [ main] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=hirang_test02] Revoking previously assigned partitions [] 2019-03-08 11:30:53.321 INFO 4556 --- [ main] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=hirang_test02] (Re-)joining group 2019-03-08 11:30:53.347 INFO 4556 --- [ main] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=hirang_test02] Successfully joined group with generation 12 2019-03-08 11:30:53.348 INFO 4556 --- [ main] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=hirang_test02] Setting newly assigned partitions [hirang-0] 2019-03-08 11:31:03.104 INFO 4556 --- [ main] c.e.kafkatest.KafkaTestApplication : offset = 277494 key =null value =0-----87d9bcc5-3b75-4032-9427-e02b75a9eed7 2019-03-08 11:31:03.107 INFO 4556 --- [ main] c.e.kafkatest.KafkaTestApplication : offset = 277495 key =null value =1-----9c6e8e14-dd8e-4ddf-b810-4cf1f3c80e5c 2019-03-08 11:31:03.108 INFO 4556 --- [ main] c.e.kafkatest.KafkaTestApplication : offset = 277496 key =null value =2-----5f8eac28-06ed-4c53-a0c0-a15721a6ede2 2019-03-08 11:31:03.108 INFO 4556 --- [ main] c.e.kafkatest.KafkaTestApplication : offset = 277497 key =null value =3-----e6dd3658-5bc2-4bdb-ab7b-b127f8ca176a 2019-03-08 11:31:03.108 INFO 4556 --- [ main] c.e.kafkatest.KafkaTestApplication : offset = 277498 key =null value =4-----e077a534-d95e-4aca-adb3-1c2c1a29fce1 2019-03-08 11:31:03.108 INFO 4556 --- [ main] c.e.kafkatest.KafkaTestApplication : offset = 277499 key =null value =5-----5b38908b-ec60-43e8-98cc-76fa66451483 2019-03-08 11:31:03.112 INFO 4556 --- [ main] c.e.kafkatest.KafkaTestApplication : offset = 277500 key =null value =6-----d049b157-9370-433e-9cee-7101918eb449 2019-03-08 11:31:03.112 INFO 4556 --- [ main] c.e.kafkatest.KafkaTestApplication : offset = 277501 key =null value =7-----8786e6a2-c757-4a3f-be59-130d5c393bfc 2019-03-08 11:31:03.116 INFO 4556 --- [ main] c.e.kafkatest.KafkaTestApplication : offset = 277502 key =null value =8-----30b82719-abc5-4e1e-81d1-d8e47e53cb7d 2019-03-08 11:31:03.122 INFO 4556 --- [ main] c.e.kafkatest.KafkaTestApplication : offset = 277503 key =null value =9-----687f0bdb-3280-415f-9fb8-8d5734380755
컨슈머 모범 사례
카프카 컨슈머의 내부 동작과 토픽의 메시지 소비(Subscribe)하는 방법을 알아보았다.컨슈머 구성 요소를 정의할 때 좀 더 바람직한 가이드라인에 대해서 알아보자.예외처리를 하자!모든 어플리케이션은 예외처리...리벨런스 관리!새로운 컨슈머가 컨슈머 그룹에 합류할 때마다, 혹은 예전 컨슈머가 종료되면 파티션 리밸런스가 트리거 된다.(컨슈머 그룹의 컨슈머 개수와 파티션이 같을 때 새로운 컨슈머 추가는 리밸런스가 안일어남)컨슈머가 파티션 소유권을 잃을 때마다 가장 최근에 수신한 오프셋을 반드시 커밋해 줘야 한다.제때 오프셋 커밋하기!메시지 오프셋에 대한 커밋은 처리 상황에 따라 잘 제때 수행해야 한다.그렇지 않으면 중복 및 소실이 발생할 수 있다.오프셋 주기에 따른 성능과 중복에 대한 문제간의 트레이드 오프가 필요한다.예) 장애 발생시 데이터 중복 처리에 대한 심각한 문제가 발생할 경우 오프셋을 커밋하는 시간을 가능한 짧게 한다.자동 오프셋 커밋!중복 처리에 대한 문제가 없거나, 컨슈머가 자동으로 오프셋을 커밋하도록 관리하기를 바랄경우 사용한다.일반적으로 대부분 사용마치며
카프카의 프로듀서 및 컨슈머에 대한 내부 동작에 대해서 이해하였고 어플리케이션도 만들어 보았다.
프로덕션 환경에 적용할 때 컨슈머 그룹안에서 오프셋 커밋 전략,하트비트, 리벨런스 등 여러 성능 테스트가 필요하다.
'Kafka' 카테고리의 다른 글
[카프카(Kafka)] 주키퍼의 역활 정리 (0) 2019.03.11 [카프카(Kafka)] 성능 관련 고찰 (0) 2019.03.11 [카프카(Kafka) 어플리케이션 제작 ] #1. 프로듀서 (0) 2019.03.06 댓글