본문 바로가기
야미스터디

[Youtube] kafka 조금 아는 척하기 3편 - 컨슈머

by 의정부핵꿀밤 2024. 9. 16.
728x90

컨슈머

  • 토픽 파티션에서 레코드를 조회한다

  • 서버, group id 지정
  • 역직렬화를 위한 deserializer 지정
  • Consumer 객체 생성
    • subscribe() 메소드 호출 ➡ 구독할 토픽 목록 전달
  • poll() ➡️ 일정 시간동안 대기하다가 토픽을 읽어온다

 

토픽 파티션은 그룹 단위 할당

  • 컨슈머 그룹 단위로 파티션이 할당된다
    • 위에서 설정한 groupId 기준으로 할당
  • 파티션 개수와 컨슈머 개수는 밀접한 관련이 있다
    • 파티션 개수보다 컨슈머 그룹이 많으면 컨슈머는 논다!
    • 예시
      • 파티션 2개, 컨슈머 1개 ➡️ 컨슈머 1개가 2개의 파티션으로부터 토픽을 읽어온다
      • 파티션 2개, 컨슈머 2개 ➡️ 컨슈머 : 파티션 = 1 : 1 연결
      • 파티션 2개, 컨슈머 3개 ➡️ 나머지 컨슈머는 연결이 되지 않아 놀게 된다
  • 컨슈머 개수가 파티션 개부소다 커지면 안된다
  • 처리량을 높이려면 파티션 개수도 함께 늘려야 한다

 

커밋과 오프셋

  • 컨슈머의 poll() 메서드 ➡️ 이전에 커밋한 오프셋이 있으면 그 오프셋 이후부터 읽어오고 다시 오프셋을 커밋한다

 

커밋된 오프셋이 없는 경우

  • 처음 접근하거나 커밋된 오프셋이 없는 경우에는 auto.offset.reset 설정을 사용한다
    • earliest : 맨 처음 오프셋을 사용한다
    • latest : 가장 마지막 오프셋을 사용한다 (default)
    • none : 컨슈머 그룹에 대한 이전 커밋이 없으면 exception이 발생한다

 

컨슈머 설정

  • 조회에 영향을 주는 주요 설정
    • fetch.min.bytes
      • 조회 시 브로커가 전송할 최소 데이터 크기
      • poll() 호출하면 해당 값만큼 기다렸다가 가져온다
      • 이 값이 크면 대기 시간이 늘지만 처리량이 증가한다 (배치 사이즈가 커지니까)
      • default = 1
    • fetch.max.wait.ms
      • 데이터가 최소 크기가 될 때까지 기다리는 시간
      • 데이터가 최소 크기가 되지 않는다고 무한대로 기다릴 수 없으니까!
      • 브로커가 리턴할 때까지 대기하는 시간으로 poll()의 대기 시간과 다르다
      • default = 500
    • max.partition.fetch.bytes
      • 파티션 당 서버가 리턴할 수 있는 최대 크기
      • default = 1MB (1048576)

 

 

자동 커밋 / 수동 커밋

  • enable.auto.commit 설정
    • true : 일정 주기로 컨슈머가 읽은 오프셋을 커밋한다 (default)
    • false : 수동으로 커밋을 실행한다
  • auto.commit.intercals.ms
    • 자동 커밋 주기
    • default = 5000ms
  • 자동 커밋은 poll(), close() 메서드 호출 시 실행된다

 

수동 커밋 : 동기 / 비동기 커밋

동기 커밋

  • 커밋에 실패하면 exception이 발생한다

 

비동기 커밋

  • 비동기여서 커밋의 성공 여부를 바로 알 수 없다
  • 알려면 callback을 받아서 처리해야 한다

 

재처리와 순서

  • 컨슈머가 동일한 메시지를 조회할 가능성에 대해 주의해야 한다
    • 일시적으로 커밋에 실패하거나 리밸런스 등에 의해 발생할 수 있다
  • 컨슈머는 멱등성(idempotence)을 고려해야 한다
    • ex) 조회 수를 처리하는 경우
    • 조회수 1 증가 ➡️ 좋아요 1 증가 ➡️ 조회수 1 증가
    • 위를 재처리하면 조회수가 2가 아니라 4가 될 수 있다
  • 데이터 특성에 따라 타임스탬프, 일련번호 등을 활용해야 한다

 

세션 타임아웃, 하트비트, 최대 poll 간격

  • kafka는 컨슈머 그룹 유지를 위해서 몇가지 설정을 사용한다
  • heartbeat
    • 컨슈머는 하트비트를 전송해서 연결을 유지한다
    • 브로커는 일정 시간 컨슈머로부터 하트비트가 없으면 컨슈머를 그룹에서 빼고 리밸런스를 진행한다
    • 관련 설정
      • session.timeout.ms
        • 세션 타임아웃 시간 (default = 10초)
        • 지정한 시간동안 heartbeat이 없으면 컨슈머를 제외한다
      • heartbeat.interval.ms
        • 하트비트 전송 주기 (default = 3초)
        • 권장) session.timeout.ms의 1/3 이하
  • max.poll.interval.ms
    • poll() 메서드의 최대 호출 간격
    • 이 시간이 지나도록 poll() 하지 않으면 컨슈머를 그룹에서 빼고 리밸런스 진행

 

종료 처리

  • 컨슈머를 다 사용했으면 close()를 호출해서 종료해야 한다
    • 그러나 보통은 무한루프를 돌면서 poll()을 호출해서 사용하는데 루프를 어떻게 벗어나지..?
  • 다른 쓰레드에서 wakeup() 메서드를 호출한다
    • poll() 메서드가 WakeupException 발생 ➡️ close() 메서드로 종료 처리

 

주의: non-threadSafe

  • kafkaConsumer는 쓰레드에 안전하지 않다
  • 여러 쓰레드에서 동시에 사용하지 말 것!
  • 단, wakeup() 메서드는 예외

 


https://www.youtube.com/watch?v=xqrIDHbGjOY&t=1s

 

728x90

'야미스터디' 카테고리의 다른 글

TPS (Transaction Per Second)  (1) 2023.08.07
MAU? 그게 몬디?  (0) 2023.06.07

댓글