kafka消息发送机制
所属分类 kafka
浏览量 1116
发送数据过程涉及 序列化器Serializer 分区器Partitioner 消息缓存池Accumulator,还可能会涉及到拦截器Interceptor
//新版本Producer
org.apache.kafka.clients.producer.KafkaProducer
//旧版本Producer
kafka.javaapi.producer.Producer
旧版本连接 Zookeeper,新版本连接Broker
ProducerRecord
public class ProducerRecord< K, V> {
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
}
headers 0.11.x 版本引入 , 可存储业务相关的信息
序列化与反序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
常见的序列化器
ByteArraySerializer
ByteBufferSerializer
BytesSerializer
StringSerializer
LongSerializer
IntegerSerializer
ShortSerializer
DoubleSerializer
FloatSerializer
消息分区机制
分区器(Partitioner)
如果ProducerRecord中指定了partition字段,那就不需要分区器
默认分区器
org.apache.kafka.clients.producer.internals.DefaultPartitioner
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes The serialized key to partition on( or null if no key)
* @param value The value to partition on or null
* @param valueBytes The serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
The default partitioning strategy:
If a partition is specified in the record, use it
If no partition is specified but a key is present choose a partition based on a hash of the key
If no partition or key is present choose a partition in a round-robin fashion
partition 指定了直接使用
key 不为null hash策略 Murmur2Hash
key 为null 轮询策略
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
哈希策略与轮询策略
随机策略
消息缓冲池
Accumulator 由Sender线程发往broker
缓冲池大小 buffer.memory 默认32M
消息发送过快导致buffer满了,将阻塞 max.block.ms 时间,超时抛异常
批量发送
批次大小 batch.size 16KB
linger.ms 空闲超过 该时间,也会被发送
kafka发送端核心参数说明
上一篇
下一篇
kafka之broker-list bootstrap-server 和 zookeeper
Kafka 和 RocketMQ 底层存储简单比较
kafka发送端核心参数说明
Kafka中的分区分配
linux OOM Killer 如何破
Kafka幂等和事务