Kafka 프로듀서 완벽 가이드
프로듀서란?
Kafka 프로듀서는 메시지를 Kafka 클러스터의 토픽에 발행하는 클라이언트입니다. 프로듀서는 메시지를 직렬화하고, 파티션을 선택하며, 배치로 묶어서 효율적으로 전송합니다.
핵심 구현: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
메시지 발행 방법
Fire and Forget
메시지를 전송하고 결과를 신경 쓰지 않는 방식입니다. 가장 빠르지만 메시지 손실 가능성이 있습니다.
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", "key", "value");
producer.send(record); // 결과를 무시
특징:
- 가장 높은 처리량
- 메시지 손실 가능
- 네트워크 오류나 브로커 오류 시 재시도 불가
동기적으로 전송
Future.get()을 호출하여 전송이 완료될 때까지 대기합니다.
try {
RecordMetadata metadata = producer.send(record).get();
System.out.printf("Topic: %s, Partition: %d, Offset: %d\n",
metadata.topic(), metadata.partition(), metadata.offset());
} catch (ExecutionException e) {
// 전송 실패 처리
System.err.println("Failed to send: " + e.getCause());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
특징:
- 전송 성공 확인 가능
- 처리량이 낮음 (블로킹)
- 순서 보장 필요 시 유용
동작 흐름 (KafkaProducer.java:1081-1182):
- 메타데이터 대기 (토픽/파티션 정보)
- Key/Value 직렬화
- 파티션 선택
- RecordAccumulator에 버퍼링
- Sender 스레드가 배치 전송
- 응답 대기 (블로킹)
비동기적으로 전송
Callback을 등록하여 전송 완료 시 비동기로 처리합니다. 저희 프로젝트에서도 실제로 이 방법을 통래 처리하고 있습니다.
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
// 실패 처리
System.err.println("Failed: " + exception.getMessage());
} else {
// 성공 처리
System.out.printf("Sent to partition %d, offset %d\n",
metadata.partition(), metadata.offset());
}
}
});
특징:
- 높은 처리량 (논블로킹)
- 전송 결과 확인 가능
- 가장 권장되는 방식
주의사항:
- Callback은 Sender I/O 스레드에서 실행되므로 빠르게 처리해야 함
- 무거운 작업은 별도 스레드로 위임
프로듀서 설정하기
주요 설정 값
필수 설정 (ProducerConfig.java):
| 설정 | 설명 | 기본값 |
|---|---|---|
bootstrap.servers |
브로커 주소 목록 | 필수 |
key.serializer |
Key 직렬화 클래스 | 필수 |
value.serializer |
Value 직렬화 클래스 | 필수 |
성능 설정:
| 설정 | 설명 | 기본값 |
|---|---|---|
batch.size |
배치 크기 (바이트) | 16384 (16KB) |
linger.ms |
배치 대기 시간 (밀리초) | 5ms |
buffer.memory |
전체 버퍼 메모리 | 33554432 (32MB) |
compression.type |
압축 타입 (none, gzip, snappy, lz4, zstd) | none |
max.request.size |
최대 요청 크기 | 1048576 (1MB) |
신뢰성 설정:
| 설정 | 설명 | 기본값 |
|---|---|---|
acks |
응답 대기 수준 (0, 1, all) | all |
retries |
재시도 횟수 | Integer.MAX_VALUE |
enable.idempotence |
멱등성 활성화 | true |
transactional.id |
트랜잭션 ID | null |
acks 설정:
0: 응답 대기 안 함 (빠르지만 손실 가능)1: Leader 기록 확인 (균형)all(또는-1): 모든 ISR 복제 확인 (가장 안전)
성능 튜닝 팁:
props.put("batch.size", 32768); // 배치 크기 증가 → 처리량 증가
props.put("linger.ms", 10); // 대기 시간 증가 → 배칭 효율 증가
props.put("compression.type", "lz4"); // 압축 활성화 → 네트워크 대역폭 절약
props.put("acks", "1"); // 응답 수준 낮춤 → 지연 시간 감소
시리얼라이저
시리얼라이저는 객체를 바이트 배열로 변환하는 역할을 합니다.
인터페이스: clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
public interface Serializer<T> extends Closeable {
// 설정 초기화
default void configure(Map<String, ?> configs, boolean isKey) {}
// 직렬화 수행
byte[] serialize(String topic, T data);
// 헤더 포함 직렬화
default byte[] serialize(String topic, Headers headers, T data) {
return serialize(topic, data);
}
// 리소스 정리
default void close() {}
}
내장 시리얼라이저
Kafka는 기본적으로 다음 시리얼라이저를 제공합니다:
StringSerializer: 문자열IntegerSerializer: IntegerLongSerializer: LongByteArraySerializer: byte[] (그대로)ByteBufferSerializer: ByteBuffer
커스텀 시리얼라이저
복잡한 객체를 전송하려면 커스텀 시리얼라이저를 구현합니다:
public class UserSerializer implements Serializer<User> {
@Override
public byte[] serialize(String topic, User user) {
if (user == null) return null;
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos);
out.writeInt(user.getId());
out.writeUTF(user.getName());
out.writeUTF(user.getEmail());
return baos.toByteArray();
} catch (IOException e) {
throw new SerializationException("Error serializing User", e);
}
}
}
파티셔너
파티셔너는 메시지를 어느 파티션으로 보낼지 결정합니다.
인터페이스: clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
public interface Partitioner extends Configurable, Closeable {
int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster);
void close();
}
파티션 선택 우선순위
KafkaProducer 내부 로직 (KafkaProducer.java:1576-1596):
1. ProducerRecord에 파티션 명시
→ 해당 파티션 사용
2. 커스텀 Partitioner 설정
→ 커스텀 로직 사용
3. Key 존재
→ BuiltInPartitioner.partitionForKey() 사용 (해시 기반)
4. Key 없음
→ RecordAccumulator가 Sticky Partitioning 적용
기본 파티셔너
Kafka 3.3+ 기본 파티셔너 (BuiltInPartitioner.java):
1) Key 기반 파티셔닝 (라인 329-331):
public static int partitionForKey(byte[] serializedKey, int numPartitions) {
return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
}
- MurmurHash2 알고리즘으로 해싱
- 동일한 key는 항상 같은 파티션
- 파티션 개수 변경 시 매핑 변경됨
2) Adaptive Sticky Partitioning (Key 없을 때):
batch.size만큼 레코드가 쌓이면 파티션 전환- 브로커 성능에 따라 파티션 선택 비율 자동 조정 (KIP-794)
partitionLoadStats기반 부하 분산
장점:
- 배칭 효율 극대화 (동일 파티션에 연속 전송) 순서보장이 필요할 때 사용
- 브로커 성능 불균형 시 자동 조정
- 처리량 향상
커스텀 파티셔너
특정 비즈니스 로직이 필요하면 커스텀 파티셔너를 구현합니다:
public class RegionPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
String region = extractRegion(value);
int numPartitions = cluster.partitionCountForTopic(topic);
// region별로 파티션 할당
return Math.abs(region.hashCode()) % numPartitions;
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
설정:
props.put("partitioner.class", "com.example.RegionPartitioner");
인터셉터
인터셉터는 메시지 전송 전후에 실행처리를 할 수 있는 인터페이스입니다.
인터페이스: clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {
// 전송 전 호출 (직렬화 전)
ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
// 전송 후 호출 (성공/실패)
void onAcknowledgement(RecordMetadata metadata, Exception exception);
void close();
}
사용 예시
메트릭 수집 인터셉터:
public class MetricsInterceptor implements ProducerInterceptor<String, String> {
private AtomicLong sentCount = new AtomicLong(0);
private AtomicLong ackCount = new AtomicLong(0);
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
sentCount.incrementAndGet();
// 타임스탬프 추가
record.headers().add("send-time",
String.valueOf(System.currentTimeMillis()).getBytes());
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception == null) {
ackCount.incrementAndGet();
}
// 메트릭 전송 (빠르게 처리!)
System.out.printf("Sent: %d, Acked: %d\n", sentCount.get(), ackCount.get());
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
설정:
props.put("interceptor.classes",
"com.example.MetricsInterceptor,com.example.AuditInterceptor");
인터셉터 체인
여러 인터셉터를 등록하면 순서대로 실행됩니다:
ProducerRecord 생성
↓
Interceptor1.onSend()
↓
Interceptor2.onSend()
↓
Serializer (Key/Value)
↓
Partitioner
↓
RecordAccumulator
↓
Sender 전송
↓
Interceptor2.onAcknowledgement()
↓
Interceptor1.onAcknowledgement()
↓
User Callback
주의사항:
onAcknowledgement()는 I/O 스레드에서 실행 → 빠르게 처리 필수- 예외 발생해도 프로듀서 동작은 계속됨
- 멀티스레드 안전성 보장 필요
전체 흐름 요약
┌──────────────────────────────────────────────────────┐
│ KafkaProducer 메시지 발행 흐름 │
├──────────────────────────────────────────────────────┤
│ │
│ 1. send() 호출 │
│ ├─ ProducerInterceptor.onSend() │
│ └─ 레코드 전처리 │
│ │
│ 2. 메타데이터 대기 │
│ └─ 토픽/파티션 정보 조회 │
│ │
│ 3. 직렬화 │
│ ├─ Key Serializer │
│ └─ Value Serializer │
│ │
│ 4. 파티션 선택 │
│ ├─ 레코드에 파티션 명시? │
│ ├─ 커스텀 Partitioner? │
│ ├─ Key 있음? → Hash 기반 │
│ └─ Key 없음? → Sticky Partitioning │
│ │
│ 5. RecordAccumulator 버퍼링 │
│ └─ 배치로 묶기 (batch.size, linger.ms) │
│ │
│ 6. Sender 스레드 전송 │
│ ├─ 브로커로 네트워크 전송 │
│ └─ 응답 대기 (acks 설정) │
│ │
│ 7. 완료 처리 │
│ ├─ ProducerInterceptor.onAcknowledgement() │
│ └─ User Callback 호출 │
│ │
└──────────────────────────────────────────────────────┘
참고 자료
'BE > 카프카' 카테고리의 다른 글
| Spring kafka의 배치 리스너와 레코드 리스너 비교 (0) | 2025.11.21 |
|---|---|
| 카프카 컨슈머 설정별 성능 측정해보기 (0) | 2025.11.19 |
| 카프카 컨슈머 프로토콜 비교하기 ClassicKafkaConsumer vs AsyncKafkaConsumer (0) | 2025.11.17 |