본문 바로가기
야미스터디/Backend

[Youtube] kafka 조금 아는 척하기 2편 - 프로듀서

by 의정부핵꿀밤 2024. 9. 4.
728x90

토픽에 메시지 전송 - 토픽, 키, 값

  • prperties - 프로듀서가 사용할 속성을 설정한다 (설정 정보)
    • 브로커 목록, 키와 값 직렬화를 위한 serializer, ack, 배치 사이즈 등을 설정
  • 카프카 프로듀서 객체는 send 메시지를 지원한다
  • 프로듀서 레코드는 토픽 이름, 키, 값으로 생성한다 → 카프카에 저장될 메시지 = 레코드

 

프로듀서의 기본 흐름

 

  • 프로듀서 기본 흐름 순서
    1. send() 메소드를 통해서 레코드를 전송한다
    2. Serializer를 통해 byte 배열로 변환하고, Partitioner를 통해 변환된 메시지를 보낼 파티션을 결정한다
    3. 이후 변환된 메시지를 배치로 묶어서 버퍼에 저장한다
    4. Sender가 묶인 배치를 카프카 브로커로 전송한다

 

Sender의 기본 동작

  • sender는 별도 스레드로 동작한다
  • sender는 배치를 차례대로 꺼내서 브로커로 보내는데, 배치가 찼는지 여부에 상관없이 읽어서 보낸다
    • 배치에 메시지가 하나가 있건 여러개가 있건, 배치를 보낼수만 있다면 그냥 보내버린다
  • sender가 배치를 브로커에 전송하는 동안, send() 메소드를 통해 들어온 레코드는 계속해서 배치에 누적되어 쌓인다
  • 즉, sender와 send() 는 별도의 스레드로 동작한다
    • sender가 메세지를 브로커에 보내는 동안 메시지가 쌓이지 않거나, 메시지가 쌓이는 동안은 브로커에 전송이 되지 않는 등의 문제가 발생하지 않는다
    • 왜? 별도로 동작하니까!!

 

처리량 관련 주요 속성

  • sender와 batch 관련 설정이 처리량에 영향을 준다
  • batch.size
    • 배치의 최대 크기
    • 해당 크기만큼 배치에 메시지가 다 차면 바로 전송한다
    • 배치 사이즈가 너무 작으면 한 번에 보낼 수 있는 메시지 수가 줄고 전송 횟수가 많아져서 처리량이 떨어진다
  • liner.ms
    • sender가 배치를 전송하는 대기 시간
    • 대기 시간이 없으면 배치를 바로 전송한다 → 이전 배치를 전송한 후 바로 다음 배치를 전송한다
    • 대기 시간이 있으면 그 시간만큼 기다렸다가 배치를 전송한다 → 기다리는 시간 동안 배치에 메시지가 쌓이고, 한번에 더 많은 메시지를 전송할 수 있다
    • 즉, 대기 시간을 주면 한 번에 보내는 메시지가 많아져서 전반적인 처리량이 높아진다

 

전송 결과 확인 안함

producer.send(new ProducerRecord<>("simple", "value"));
  • 전송 실패를 알 수 없다
  • 실패에 대한 별도 처리가 필요없는 메시지 전송에 사용한다

 

전송 결과 확인함 : Future 사용

Future<RecordMetadata> f = producer.send(new ProducerRecord<>("topic", "value"));
try {
    RecordMetadata meta = f.get(); // 블로킹
} catch(ExecutionExcetpion ex) {
    ...
}
  • send 메소드가 제공하는 Future 결과를 get() 을 통해 확인한다
  • 이 때 get()을 사용하면 그 시점에서 블로킹이 발생한다
    • 만약 루프를 돌면서 위 코드를 실행하면 하나의 메시지를 보내고 블로킹되는 방식으로 동작한다
    • 이 때문에 배치에 메시지가 쌓이지 않는다
  • 즉, 배치 효과가 떨어져서 처리량이 저하된다
  • 처리량이 낮아도 전송 결과를 확실하게 해야하는 경우에만 사용한다

 

전송 결과 확인함: Callback 사용

producer.send(new ProducerRecord<>("simple", "value"),
    new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception ex) {
            ...
        }
    });
  • send()에 callback 객체를 전달한다
  • callback 객체는 전송이 완료되면 onCompletion() 메소드로 결과를 받는다
  • 이 때 Exception 객체를 받으면 전송에 실패한 것이다
  • 이는 블로킹 방식이 아니기 때문에 처리량 저하를 발생시키지 않는다

 

전송 보장과 ack

  • 프로듀서는 전송 보장을 위해 ack 값을 사용한다
  • ack = 0
    • 서버 응답을 기다리지 않는다
    • 전송 보장도 X
    • 처리량은 높지만 메시지 유실 여부를 알 수 없다
  • ack = 1
    • 파티션의 리더에 저장되면 성공 응답을 받는다
    • 리더 장애 시 메시지 유실 가능성이 있다
      • 리더에 저장되고 팔로워에 복제되기 전 장애가 나면, 프로듀서는 성공이라고 판단하지만 다른 팔로워에는 복제 X
      • 이후 다른 팔로워가 리더가 되면 해당 메시지는 유실된거임
    •   엄격하게 전송 보장이 필요한 경우에는 사용 X
  • ack = all (또는 -1)
    • 모든 리플리카에 저장되는 성공 응답을 받는다
    • 이 때 모든 리플리카의 기준은 브로커 min.insync.replicas 설정에 따라 달라진다

 

ack + min.insync.replicas

  • min.insync.replicas (브로커 옵션)
    • 프로듀서 ack 옵션이 all일 때 저장에 성공했다고 응답할 수 있는 동기화된 리플리카 최소 개수
  • 예시
    • 리플리카 개수 = 3 / ack = all / min.insync.replicas = 2
    • 리더에 저장하고 팔로워 중 1개에 저장하면 성공 응답 (총 2개 저장이니까)

 

에러 유형

  • 전송 과정에서 실패
    • 전송 타임 아웃(일시적인 네트워크 오류 등)
    • 리더 다운에 의한 새 리더 선출 진행 중 발생
    • 브로커 설정 메시지 크기 한도 초과
    • etc...
  • 전송 전에 실패
    • 직렬화 실패
    • 프로듀서 자체 요청 크기 제한 초과
    • 프로듀서 버퍼가 차서 기다린 시간이 최대 대기 시간 초과
    • etc...

 

실패 대응 1 : 재시도

  • 재시도
    • 재시도 가능한 에러는 재시도 처리한다
    • ex) 브로커 응답 타임 아웃, 일시적인 리더 없음 등
  • 재시도 위치
    • 프로듀서는 자체적으로 브로커 전송 과정에서 에러가 발생하면 재시도 가능한 에러에 대해서 재전송을 시도한다
      • retries 속성
    • send() 메서드에서 exception 발생 시, exception 타입에 따라 send() 재호출
    • 콜백 메서드에서 exception 받으면 타입에 따라 send() 재호출
  • 특별한 이유가 없다면 무한 재시도 X

 

실패 대응 2 : 기록

  • 추후 처리를 위해 기록한다
    • 실패한 메시지를 별도 파일이나 DB에 기록한다
    • 추후에 수동(또는 자동) 보정 작업 진행
  • 기록 위치
    • send() 메서드에서 exception 발생 시
    • send() 메서드에서 전달한 콜백에서 exception 받는 경우
    • send() 메서드가 리턴한 Future의 get() 메서드에서 exception 발생 시

 

재시도와 메시지 중복 전송 가능성

  • 브로커 응답이 늦게와서 재시도할 경우 중복 발송이 가능하다
  • enable.idempotence 속성을 사용하면 중복 발송 방지가 가능할지도..?

 

재시도와 순서

 max.in.flight.requests.per.connection=3

  • max.in.flight.requests.per.connection
    • 블로킹 없이 한 커넥션에서 전송할 수 있는 최대 전송중 요청 개수
    • 만약 이 값이 1보다 크면 재시도 시점에 따라 메시지 순서가 발생할 수 있다 → 전송 순서가 중요하면 이 값을 1로 지정해야 한다
  • 위의 예시에서 3으로 설정해서 3개를 순서에 상관없이 재전송함
    • 따라서 사실상 카프카에 쌓이는 메시지 순서는 2 → 3 → 1 이 된다

 


https://www.youtube.com/watch?v=geMtm17ofPY

 

728x90

댓글