首页  

kafka消息发送机制     所属分类 kafka 浏览量 110
发送数据过程涉及 序列化器Serializer 分区器Partitioner 消息缓存池Accumulator,还可能会涉及到拦截器Interceptor


//新版本Producer
org.apache.kafka.clients.producer.KafkaProducer<K,V>

//旧版本Producer
kafka.javaapi.producer.Producer<K,V>



旧版本连接 Zookeeper,新版本连接Broker


ProducerRecord
public class ProducerRecord< K, V> {
    private final String topic;  
    private final Integer partition;  
    private final Headers headers; 
    private final K key;    
    private final V value;  
    private final Long timestamp;  
}

headers 0.11.x 版本引入 , 可存储业务相关的信息

序列化与反序列化


props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

常见的序列化器

ByteArraySerializer  
ByteBufferSerializer  
BytesSerializer  
StringSerializer  
LongSerializer  
IntegerSerializer 
ShortSerializer  
DoubleSerializer 
FloatSerializer  


消息分区机制

分区器(Partitioner)
如果ProducerRecord中指定了partition字段,那就不需要分区器

默认分区器 
org.apache.kafka.clients.producer.internals.DefaultPartitioner 


    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on( or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);


The default partitioning strategy:
If a partition is specified in the record, use it
If no partition is specified but a key is present choose a partition based on a hash of the key
If no partition or key is present choose a partition in a round-robin fashion


partition 指定了直接使用
key 不为null   hash策略  Murmur2Hash
key 为null    轮询策略




    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

 
 
哈希策略与轮询策略
随机策略  

消息缓冲池
Accumulator   由Sender线程发往broker

缓冲池大小  buffer.memory  默认32M
消息发送过快导致buffer满了,将阻塞 max.block.ms 时间,超时抛异常

批量发送
批次大小 batch.size  16KB
linger.ms   空闲超过 该时间,也会被发送


kafka发送端核心参数说明

上一篇     下一篇
kafka之broker-list bootstrap-server 和 zookeeper

Kafka 和 RocketMQ 底层存储简单比较

kafka发送端核心参数说明

Kafka中的分区分配

linux OOM Killer 如何破

Kafka幂等和事务