728x90
토픽에 메시지 전송 - 토픽, 키, 값
- prperties - 프로듀서가 사용할 속성을 설정한다 (설정 정보)
- 브로커 목록, 키와 값 직렬화를 위한 serializer, ack, 배치 사이즈 등을 설정
- 카프카 프로듀서 객체는 send 메시지를 지원한다
- 프로듀서 레코드는 토픽 이름, 키, 값으로 생성한다 → 카프카에 저장될 메시지 = 레코드
프로듀서의 기본 흐름
- 프로듀서 기본 흐름 순서
- send() 메소드를 통해서 레코드를 전송한다
- Serializer를 통해 byte 배열로 변환하고, Partitioner를 통해 변환된 메시지를 보낼 파티션을 결정한다
- 이후 변환된 메시지를 배치로 묶어서 버퍼에 저장한다
- Sender가 묶인 배치를 카프카 브로커로 전송한다
Sender의 기본 동작
- sender는 별도 스레드로 동작한다
- sender는 배치를 차례대로 꺼내서 브로커로 보내는데, 배치가 찼는지 여부에 상관없이 읽어서 보낸다
- 배치에 메시지가 하나가 있건 여러개가 있건, 배치를 보낼수만 있다면 그냥 보내버린다
- sender가 배치를 브로커에 전송하는 동안, send() 메소드를 통해 들어온 레코드는 계속해서 배치에 누적되어 쌓인다
- 즉, sender와 send() 는 별도의 스레드로 동작한다
- sender가 메세지를 브로커에 보내는 동안 메시지가 쌓이지 않거나, 메시지가 쌓이는 동안은 브로커에 전송이 되지 않는 등의 문제가 발생하지 않는다
- 왜? 별도로 동작하니까!!
처리량 관련 주요 속성
- sender와 batch 관련 설정이 처리량에 영향을 준다
- batch.size
- 배치의 최대 크기
- 해당 크기만큼 배치에 메시지가 다 차면 바로 전송한다
- 배치 사이즈가 너무 작으면 한 번에 보낼 수 있는 메시지 수가 줄고 전송 횟수가 많아져서 처리량이 떨어진다
- liner.ms
- sender가 배치를 전송하는 대기 시간
- 대기 시간이 없으면 배치를 바로 전송한다 → 이전 배치를 전송한 후 바로 다음 배치를 전송한다
- 대기 시간이 있으면 그 시간만큼 기다렸다가 배치를 전송한다 → 기다리는 시간 동안 배치에 메시지가 쌓이고, 한번에 더 많은 메시지를 전송할 수 있다
- 즉, 대기 시간을 주면 한 번에 보내는 메시지가 많아져서 전반적인 처리량이 높아진다
전송 결과 확인 안함
producer.send(new ProducerRecord<>("simple", "value"));
- 전송 실패를 알 수 없다
- 실패에 대한 별도 처리가 필요없는 메시지 전송에 사용한다
전송 결과 확인함 : Future 사용
Future<RecordMetadata> f = producer.send(new ProducerRecord<>("topic", "value"));
try {
RecordMetadata meta = f.get(); // 블로킹
} catch(ExecutionExcetpion ex) {
...
}
- send 메소드가 제공하는 Future 결과를 get() 을 통해 확인한다
- 이 때 get()을 사용하면 그 시점에서 블로킹이 발생한다
- 만약 루프를 돌면서 위 코드를 실행하면 하나의 메시지를 보내고 블로킹되는 방식으로 동작한다
- 이 때문에 배치에 메시지가 쌓이지 않는다
- 즉, 배치 효과가 떨어져서 처리량이 저하된다
- 처리량이 낮아도 전송 결과를 확실하게 해야하는 경우에만 사용한다
전송 결과 확인함: Callback 사용
producer.send(new ProducerRecord<>("simple", "value"),
new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception ex) {
...
}
});
- send()에 callback 객체를 전달한다
- callback 객체는 전송이 완료되면 onCompletion() 메소드로 결과를 받는다
- 이 때 Exception 객체를 받으면 전송에 실패한 것이다
- 이는 블로킹 방식이 아니기 때문에 처리량 저하를 발생시키지 않는다
전송 보장과 ack
- 프로듀서는 전송 보장을 위해 ack 값을 사용한다
- ack = 0
- 서버 응답을 기다리지 않는다
- 전송 보장도 X
- 처리량은 높지만 메시지 유실 여부를 알 수 없다
- ack = 1
- 파티션의 리더에 저장되면 성공 응답을 받는다
- 리더 장애 시 메시지 유실 가능성이 있다
- 리더에 저장되고 팔로워에 복제되기 전 장애가 나면, 프로듀서는 성공이라고 판단하지만 다른 팔로워에는 복제 X
- 이후 다른 팔로워가 리더가 되면 해당 메시지는 유실된거임
- 엄격하게 전송 보장이 필요한 경우에는 사용 X
- ack = all (또는 -1)
- 모든 리플리카에 저장되는 성공 응답을 받는다
- 이 때 모든 리플리카의 기준은 브로커 min.insync.replicas 설정에 따라 달라진다
ack + min.insync.replicas
- min.insync.replicas (브로커 옵션)
- 프로듀서 ack 옵션이 all일 때 저장에 성공했다고 응답할 수 있는 동기화된 리플리카 최소 개수
- 예시
- 리플리카 개수 = 3 / ack = all / min.insync.replicas = 2
- 리더에 저장하고 팔로워 중 1개에 저장하면 성공 응답 (총 2개 저장이니까)
에러 유형
- 전송 과정에서 실패
- 전송 타임 아웃(일시적인 네트워크 오류 등)
- 리더 다운에 의한 새 리더 선출 진행 중 발생
- 브로커 설정 메시지 크기 한도 초과
- etc...
- 전송 전에 실패
- 직렬화 실패
- 프로듀서 자체 요청 크기 제한 초과
- 프로듀서 버퍼가 차서 기다린 시간이 최대 대기 시간 초과
- etc...
실패 대응 1 : 재시도
- 재시도
- 재시도 가능한 에러는 재시도 처리한다
- ex) 브로커 응답 타임 아웃, 일시적인 리더 없음 등
- 재시도 위치
- 프로듀서는 자체적으로 브로커 전송 과정에서 에러가 발생하면 재시도 가능한 에러에 대해서 재전송을 시도한다
- retries 속성
- send() 메서드에서 exception 발생 시, exception 타입에 따라 send() 재호출
- 콜백 메서드에서 exception 받으면 타입에 따라 send() 재호출
- 프로듀서는 자체적으로 브로커 전송 과정에서 에러가 발생하면 재시도 가능한 에러에 대해서 재전송을 시도한다
- 특별한 이유가 없다면 무한 재시도 X
실패 대응 2 : 기록
- 추후 처리를 위해 기록한다
- 실패한 메시지를 별도 파일이나 DB에 기록한다
- 추후에 수동(또는 자동) 보정 작업 진행
- 기록 위치
- send() 메서드에서 exception 발생 시
- send() 메서드에서 전달한 콜백에서 exception 받는 경우
- send() 메서드가 리턴한 Future의 get() 메서드에서 exception 발생 시
재시도와 메시지 중복 전송 가능성
- 브로커 응답이 늦게와서 재시도할 경우 중복 발송이 가능하다
- enable.idempotence 속성을 사용하면 중복 발송 방지가 가능할지도..?
재시도와 순서
→ max.in.flight.requests.per.connection=3
- max.in.flight.requests.per.connection
- 블로킹 없이 한 커넥션에서 전송할 수 있는 최대 전송중 요청 개수
- 만약 이 값이 1보다 크면 재시도 시점에 따라 메시지 순서가 발생할 수 있다 → 전송 순서가 중요하면 이 값을 1로 지정해야 한다
- 위의 예시에서 3으로 설정해서 3개를 순서에 상관없이 재전송함
- 따라서 사실상 카프카에 쌓이는 메시지 순서는 2 → 3 → 1 이 된다
https://www.youtube.com/watch?v=geMtm17ofPY
728x90
'야미스터디 > Backend' 카테고리의 다른 글
[Youtube] kafka 조금 아는 척 하기 1편 - 개발자용 (0) | 2024.08.21 |
---|---|
개발자가 알아야 할 인터페이스 개념💡 (0) | 2023.06.27 |
개발자로 성장하는 꿀팁 5가지🍯 (3) | 2022.11.07 |
[Etc] Docker 📌 (0) | 2022.10.11 |
[Etc] DTO 📌 (0) | 2022.09.28 |
댓글