Spring Kafka에서는 컨슈머가 가져온 메시지를 처리하는 방식에 따라 배치 리스너와 레코드 리스너로 구분할 수 있습니다.
레코드 리스너 (Record Listener)
하나의 메시지를 받아서 처리하는 방식
- poll()로 가져온 배치를 내부적으로 루프 돌며 개별 처리하는 방식
@KafkaListener(topics = "my-topic", groupId = "my-group") public void listen(String message) { // 메시지 하나씩 처리 System.out.println("받은 메시지: " + message); }
배치 리스너 (Batch Listener)
여러 메시지를 한 번에 받아서 처리하는 방식
poll()로 가져온 배치를 그대로 변환해 처리하는 방식
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(List<String> messages) {
// 배치 전체 처리
System.out.println("배치 크기: " + messages.size());
messages.forEach(msg -> System.out.println("메시지: " + msg));
}
레코드 리스너 내부 동작 흐름
KafkaMessageListenerContainer.run() 메서드에 내부 동작을 까라가다 보면 해당 메서드를 발견할 수 있습니다.
private void invokeListener(final ConsumerRecords<K, V> records) {
if (this.isBatchListener) {
invokeBatchListener(records);
}
else {
invokeRecordListener(records);
}
}레코드 리스너 먼저 살펴보면 이후에 트랜잭션 유무로 한번 더 분기문을 타게되고 결과 적으로는 아래 처럼 while 문을 돌면서 인터셉터와 리스너를 통해 메시지가 하나씩 처리 되는것을 확인할 수 있습니다.
private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
while (iterator.hasNext()) {
if (this.stopImmediate && !isRunning()) {
break;
}
final ConsumerRecord<K, V> cRecord = checkEarlyIntercept(iterator.next());
if (cRecord == null) {
continue;
}
this.logger.trace(() -> "Processing " + KafkaUtils.format(cRecord));
doInvokeRecordListener(cRecord, iterator);배치 리스너 내부 동작 흐름
private void invokeBatchListener(final ConsumerRecords<K, V> recordsArg) {
ConsumerRecords<K, V> records = checkEarlyIntercept(recordsArg); // 인터셉터 처리
if (records == null || records.count() == 0) {
return;
}
List<ConsumerRecord<K, V>> recordList = new ArrayList<>();
if (!this.wantsFullRecords) { // 리스너의 메서드 시그니처가 List<Message>, ConsumerRecords 에 따라 레코드 변환
recordList = createRecordList(records);
}
if (this.wantsFullRecords || !recordList.isEmpty()) {
if (this.transactionTemplate != null) {
invokeBatchListenerInTx(records, recordList, this.transactionTemplate);
}
else {
doInvokeBatchListener(records, recordList);
}
}
}BatchInterceptor 전처리 (checkEarlyIntercept) -> 리스너 타입 확인 -> 트랜잭션 설정 확인 -> 리스너에서 메시지 처리 과정을 통해 메시지가 처리됨을 알 수 있습니다.
doInvokeBatchListener 메서드를 좀 더 들여다 보면
private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> records,
List<ConsumerRecord<K, V>> recordList) {
try {
// 배치 전체를 한 번에 처리
invokeBatchOnMessage(records, recordList);
if (this.batchFailed) {
this.batchFailed = false;
if (this.commonErrorHandler != null) {
this.commonErrorHandler.clearThreadState();
}
getAfterRollbackProcessor().clearThreadState();
}
// 배치 전체 커밋
if (!this.autoCommit && !this.isRecordAck) {
processCommits();
}
}
catch (RuntimeException e) {
// 에러 핸들러가 없으면 예외 던짐
if (this.commonErrorHandler == null) {
throw e;
}
try {
// 배치 에러 핸들러 호출
invokeBatchErrorHandler(records, recordList, e);
commitOffsetsIfNeededAfterHandlingError(records);
}
catch (RecordInRetryException rire) {
this.logger.info("Record in retry and not yet recovered");
return rire; // 재시도 필요
}
catch (KafkaException ke) {
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
return ke;
}
catch (RuntimeException ee) {
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
return ee;
}
catch (Error er) {
this.logger.error(er, "Error handler threw an error");
throw er;
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return null;
}예외 처리 메커니즘
레코드 리스너: 개별 레코드 에러 핸들링
// doInvokeRecordListener() 내부
try {
invokeOnMessage(cRecord); // 리스너 메서드 호출
successTimer(sample, cRecord);
recordInterceptAfter(cRecord, null);
}
catch (RuntimeException e) {
failureTimer(sample, cRecord, e);
recordInterceptAfter(cRecord, e);
if (!isListenerAdapterObservationAware()) {
observation.error(e);
}
if (this.commonErrorHandler == null) {
throw e; // 에러 핸들러 없으면 예외 전파
}
try {
// 개별 레코드 에러 핸들러
invokeErrorHandler(cRecord, iterator, e);
commitOffsetsIfNeededAfterHandlingError(cRecord);
}invokeErrorHandler(cRecord, iterator, e) 을 살펴보면 seeksAfterHandling() = true | false에 따라 Seek 후 재처리(DLT or 재시도)를 하거나 개별 레코드 에러 핸들링을 진행하게 됩니다. (코드가 길어서 코드는 생략합니다.)
포인트
- 개별 레코드 실패가 전체 배치를 막지 않음
- 성공한 레코드는 먼저 커밋
- 실패한 레코드만 재시도
- 나머지 레코드는 다음 사이클에 처리
배치 리스너: 전체 배치 에러 핸들링
// invokeBatchErrorHandler
if (Objects.requireNonNull(this.commonErrorHandler).seeksAfterHandling()
|| this.transactionManager != null
|| rte instanceof CommitFailedException) {
// 전체 배치 재처리 (seek back)
this.commonErrorHandler.handleBatch(
rte,
records,
this.consumer,
KafkaMessageListenerContainer.this.thisOrParentContainer,
() -> invokeBatchOnMessageWithRecordsOrList(records, list) // 재시도 콜백
);
}
else {
// 실패한 레코드부터 재처리
ConsumerRecords<K, V> afterHandling =
this.commonErrorHandler.handleBatchAndReturnRemaining(
rte,
records,
this.consumer,
KafkaMessageListenerContainer.this.thisOrParentContainer,
() -> invokeBatchOnMessageWithRecordsOrList(records, list)
);
if (afterHandling != null && !afterHandling.isEmpty()) {
this.remainingRecords = afterHandling; // 남은 레코드 저장
this.pauseForPending = true; // 다음 사이클에 재처리
}
}포인트:
- 하나의 레코드 실패가 전체 배치에 영향
- 성공한 레코드도 재시도마다 중복 처리
- 중복 처리 방지 로직 필요 (멱등성)
- 처리 순서가 중요한 경우 문제 발생 가능
참고 자료
'BE > 카프카' 카테고리의 다른 글
| 카프카 컨슈머 설정별 성능 측정해보기 (0) | 2025.11.19 |
|---|---|
| Kafka Producer 파헤치기 (0) | 2025.11.18 |
| 카프카 컨슈머 프로토콜 비교하기 ClassicKafkaConsumer vs AsyncKafkaConsumer (0) | 2025.11.17 |