728x90
컨슈머
- 토픽 파티션에서 레코드를 조회한다
- 서버, group id 지정
- 역직렬화를 위한 deserializer 지정
- Consumer 객체 생성
- subscribe() 메소드 호출 ➡ 구독할 토픽 목록 전달
- poll() ➡️ 일정 시간동안 대기하다가 토픽을 읽어온다
토픽 파티션은 그룹 단위 할당
- 컨슈머 그룹 단위로 파티션이 할당된다
- 위에서 설정한 groupId 기준으로 할당
- 파티션 개수와 컨슈머 개수는 밀접한 관련이 있다
- 파티션 개수보다 컨슈머 그룹이 많으면 컨슈머는 논다!
- 예시
- 파티션 2개, 컨슈머 1개 ➡️ 컨슈머 1개가 2개의 파티션으로부터 토픽을 읽어온다
- 파티션 2개, 컨슈머 2개 ➡️ 컨슈머 : 파티션 = 1 : 1 연결
- 파티션 2개, 컨슈머 3개 ➡️ 나머지 컨슈머는 연결이 되지 않아 놀게 된다
- 컨슈머 개수가 파티션 개부소다 커지면 안된다
- 처리량을 높이려면 파티션 개수도 함께 늘려야 한다
커밋과 오프셋
- 컨슈머의 poll() 메서드 ➡️ 이전에 커밋한 오프셋이 있으면 그 오프셋 이후부터 읽어오고 다시 오프셋을 커밋한다
커밋된 오프셋이 없는 경우
- 처음 접근하거나 커밋된 오프셋이 없는 경우에는 auto.offset.reset 설정을 사용한다
- earliest : 맨 처음 오프셋을 사용한다
- latest : 가장 마지막 오프셋을 사용한다 (default)
- none : 컨슈머 그룹에 대한 이전 커밋이 없으면 exception이 발생한다
컨슈머 설정
- 조회에 영향을 주는 주요 설정
- fetch.min.bytes
- 조회 시 브로커가 전송할 최소 데이터 크기
- poll() 호출하면 해당 값만큼 기다렸다가 가져온다
- 이 값이 크면 대기 시간이 늘지만 처리량이 증가한다 (배치 사이즈가 커지니까)
- default = 1
- fetch.max.wait.ms
- 데이터가 최소 크기가 될 때까지 기다리는 시간
- 데이터가 최소 크기가 되지 않는다고 무한대로 기다릴 수 없으니까!
- 브로커가 리턴할 때까지 대기하는 시간으로 poll()의 대기 시간과 다르다
- default = 500
- max.partition.fetch.bytes
- 파티션 당 서버가 리턴할 수 있는 최대 크기
- default = 1MB (1048576)
- fetch.min.bytes
자동 커밋 / 수동 커밋
- enable.auto.commit 설정
- true : 일정 주기로 컨슈머가 읽은 오프셋을 커밋한다 (default)
- false : 수동으로 커밋을 실행한다
- auto.commit.intercals.ms
- 자동 커밋 주기
- default = 5000ms
- 자동 커밋은 poll(), close() 메서드 호출 시 실행된다
수동 커밋 : 동기 / 비동기 커밋
동기 커밋
- 커밋에 실패하면 exception이 발생한다
비동기 커밋
- 비동기여서 커밋의 성공 여부를 바로 알 수 없다
- 알려면 callback을 받아서 처리해야 한다
재처리와 순서
- 컨슈머가 동일한 메시지를 조회할 가능성에 대해 주의해야 한다
- 일시적으로 커밋에 실패하거나 리밸런스 등에 의해 발생할 수 있다
- 컨슈머는 멱등성(idempotence)을 고려해야 한다
- ex) 조회 수를 처리하는 경우
- 조회수 1 증가 ➡️ 좋아요 1 증가 ➡️ 조회수 1 증가
- 위를 재처리하면 조회수가 2가 아니라 4가 될 수 있다
- 데이터 특성에 따라 타임스탬프, 일련번호 등을 활용해야 한다
세션 타임아웃, 하트비트, 최대 poll 간격
- kafka는 컨슈머 그룹 유지를 위해서 몇가지 설정을 사용한다
- heartbeat
- 컨슈머는 하트비트를 전송해서 연결을 유지한다
- 브로커는 일정 시간 컨슈머로부터 하트비트가 없으면 컨슈머를 그룹에서 빼고 리밸런스를 진행한다
- 관련 설정
- session.timeout.ms
- 세션 타임아웃 시간 (default = 10초)
- 지정한 시간동안 heartbeat이 없으면 컨슈머를 제외한다
- heartbeat.interval.ms
- 하트비트 전송 주기 (default = 3초)
- 권장) session.timeout.ms의 1/3 이하
- session.timeout.ms
- max.poll.interval.ms
- poll() 메서드의 최대 호출 간격
- 이 시간이 지나도록 poll() 하지 않으면 컨슈머를 그룹에서 빼고 리밸런스 진행
종료 처리
- 컨슈머를 다 사용했으면 close()를 호출해서 종료해야 한다
- 그러나 보통은 무한루프를 돌면서 poll()을 호출해서 사용하는데 루프를 어떻게 벗어나지..?
- 다른 쓰레드에서 wakeup() 메서드를 호출한다
- poll() 메서드가 WakeupException 발생 ➡️ close() 메서드로 종료 처리
주의: non-threadSafe
- kafkaConsumer는 쓰레드에 안전하지 않다
- 여러 쓰레드에서 동시에 사용하지 말 것!
- 단, wakeup() 메서드는 예외
https://www.youtube.com/watch?v=xqrIDHbGjOY&t=1s
728x90
'야미스터디' 카테고리의 다른 글
TPS (Transaction Per Second) (1) | 2023.08.07 |
---|---|
MAU? 그게 몬디? (0) | 2023.06.07 |
댓글