一、序言
Scala 来同时实现的,在采用 Kafka 时,Client 是使用者种叠碰触到的部份,因而,他们从 Client 端已经开始,林美珠从 Producer 端已经开始,那时他们就来对 Producer 源标识符导出一番。
二、Producer 采用
具体内容来说他们先透过几段标识符来展现 KafkaProducer 的采用方式。在下面的实例中,他们采用 KafkaProducer 同时实现向 Kafka 推送最新消息的机能。在实例流程中,具体内容来说将 KafkaProduce 采用的实用性载入
到 Properties 中,每一项实用性的具体内容涵义在注解中展开说明。后以 Properties 第一类为模块内部结构 KafkaProducer 第一类,最终透过 send 方式顺利完成推送,标识符中包涵并行推送、触发器推送三种情形。从下面的标识符能窥见 Kafka 为使用者提供更多了十分简约方便快捷的 API,在采用时,只须要如下表所示三步:
初始化 KafkaProducer 实例初始化 send USB推送数据责任编辑主要就是紧紧围绕着初始化 KafkaProducer 实例与怎样同时实现 send USB推送数据而展开的。
三、KafkaProducer 实例化
介绍了 KafkaProducer 的基本上采用,接着他们来深入细致介绍下方式核心理念方法论:
public KafkaProducer(Properties properties) { this(Utils.propsToMap(properties), (Serializer)null, (Serializer)null, (ProducerMetadata)null, (KafkaClient)null, (ProducerInterceptors)null, Time.SYSTEM); }四、最新消息推送过程
使用者是直接采用 producer.send() 推送的数据,先看一下 send() USB的同时实现
// 触发器向一个 topic 推送数据 public Future<RecordMetadata> send(ProducerRecord<K, V> record) { return this.send(record, (Callback)null); } // 向 topic 触发器地推送数据,当推送确认后唤起回调函数 publicFuture<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { ProducerRecord<K, V> interceptedRecord =this.interceptors.onSend(record);return this.doSend(interceptedRecord, callback); }数据推送的最终同时实现还是初始化了 Producer 的 doSend() USB。
4.1 拦截器
具体内容来说方式林美珠进入拦截器集合 ProducerInterceptors , onSend 方式是遍历拦截器 onSend 方 法,拦截器的目的是将数据处理加工, Kafka 本身并没有给出默认的拦截器的同时实现。如果须要采用拦截器机能,必须自己同时实现USB。
4.1.1 拦截器标识符
4.1.2 拦截器核心理念方法论
ProducerInterceptor USB包括三个方式:
onSend(ProducerRecord var1):该方式封装进 KafkaProducer.send 方式中,即它运行在使用者主线程中的。 确保在最新消息被序列化以计算分区前初始化该方式。使用者能在该方式中对最新消息做任何操作,但最好保证不要修改最新消息所属的 topic 和分区,否则会影响目标分区的计算。onAcknowledgement(RecordMetadata var1, Exception var2):该方式会在最新消息被应答之前或最新消息推送失败时初始化,并且通常都是在 producer 回调方法论触发之前。onAcknowledgement 运行在 producer 的 IO 线程中,因而不要在该方式中放入很重的方法论,否则会拖慢 producer 的最新消息推送效率。close():关闭 interceptor,主要就用于执行一些资源清理工作。拦截器可能被运行在多个线程中,因而在具体内容同时实现时使用者须要自行确保线程安全。另外倘若指定了多个 interceptor,则 producer 将按照指定顺序初始化它们,并仅仅是捕获每个 interceptor 可能抛出的异常记录到错误日志中而非在向上传递。
4.2 Producer 的 doSend 同时实现
下面是 doSend() 的具体内容同时实现:
在 doSend() 方式的同时实现上,一条 Record 数据的推送,主要就分为以下五步:
确认数据要推送到的 topic 的 metadata 是可用的(如果该序列化 record 的 key 和 value;向 accumulator 中追加 record 数据,数据林美珠展开缓存;如果追加完数据后,对应的 RecordBatch 已经达到了 batch.size 的大小(或者 batch 的剩余空间不足以添加下一条 Record),则唤醒 sender 线程推送数据。数据的推送过程,能简单总结为以上五点,下面会这几部份的具体内容同时实现展开详细分析。
五、最新消息推送过程
Producer 透过 waitOnMetadata() 方式来获取对应 topic 的 metadata 信息,这块内容我下一篇再来讲。
5.2 key 和 value 的序列化
Producer 端对 record 的 key 和 value 值展开序列化操作,在 Consumer 端再展开相应的反序列化,Kafka 内部提供更多的序列化和反序列化算法如下表所示图所示:
当然他们也是能自定义序列化的具体内容同时实现,不过一般情形下,Kafka 内部提供更多的这些方式已经足够采用。
5.
指明 partition 的情形下,直接将指明的值直接作为 partiton 值;没有指明 partition 值但有 key 的情形下,将 key 的 hash 值与 topic 的 partition 数展开取余得到 partition 值;既没有 partition 值又没有 key 值的情形下,第一次初始化时随机生成一个整数(后面每次初始化在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法。具体内容同时实现如下表所示:
// 当 record 中有 partition 值时,直接返回,没有的情形下初始化 partitioner 的类的 partition 方式去计算(KafkaProducer.class) private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { Integer partition = record.partition(); return partition != null ? partition : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); }Producer 默认采用的 partitioner 是
org.apache.kafka.clients.producer.internals.DefaultPartitioner,使用者也能自定义 partition 的策略,下面是默认分区策略具体内容同时实现:public int partition(String topic, Object key, byte[] keyBytes, Objectvalue, byte[] valueBytes, Cluster cluster) { return this.partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size()); }public int partition(String topic, Object key,byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) { return keyBytes == null ? this.stickyPartitionCache.partition(topic, cluster) : Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; }下面这个默认算法核心理念就是粘着分区缓存
5.4 向 RecordAccmulator 中追加 record 数据
他们讲 RecordAccumulator 之前先看这张图,这样的话会对整个推送流程有个大局观。
RecordAccmulator 承担了缓冲区的角色。默认是 32 MB。
在 Kafka Producer 中,最新消息不是一条一条发给 broker 的,而是多条最新消息组成一个 ProducerBatch,接着由 Sender 一次性发出去,这里的 batch.size 并不是最新消息的条数(凑满多少条即推送),而是一个大小。默认是 16 KB,能根据具体内容情形来展开优化。
在 RecordAccumulator 中,最核心理念的模块就是:
private finalConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;它是一个 ConcurrentMap,key 是 TopicPartition 类,代表一个 topic 的一个 partition。value 是一个包涵 ProducerBatch 的双端队列。等待 Sender 线程推送给 broker。画张图来看下:
下面的标识符不知道大家有没有疑问?分配内存的标识符为啥不在 synchronized 并行块中分配?导致下面的 synchronized 并行块中还要 tryAppend 一下。
因为这时候可能其他线程已经创建好 ProducerBatch 了,造成多余的内存申请。
如果把分配内存放在 synchronized 并行块会有什么问题?
内存申请不到线程会一直等待,如果放在并行块中会造成一直不释放 Deque 队列的锁,那其他线程将无法对 Deque 队列展开线程安全的并行操作。
再跟下 tryAppend() 方式,这就比较简单了。
以上标识符见图解:
5.5 唤醒 sender 线程推送 ProducerBatch
当 record 载入成功后,如果发现 ProducerBatch 已满足推送的条件(通常是 queue 中有多个 batch,那么种叠添加的那些 batch 肯定是能推送了),那么就会唤醒 sender 线程,推送 ProducerBatch。
sender 线程对 ProducerBatch 的处理是在 run() 方式中展开的,该方式具体内容同时实现如下表所示:
其中比较核心理念的方式是 run() 方式中的
org.apache.kafka.clients.producer.internals.Sender#sendProducerData其中 pollTimeout 意思是最长阻塞到至少有一个通道在你注册的事件就绪了。返回 0 则表示走起发车了。
他们继续跟下:
org.apache.kafka.clients.producer.internals.RecordAccumulator#ready最终再来看下里面这个方式
org.apache.kafka.clients.producer.inte六、总结
最终为了让你对 Kafka Producer 有个宏观的架构理解,请看下图:
简要说明:
new KafkaProducer() 后创建一个后台线程 KafkaThread (实际运行线程是 Sender,KafkaThread 是对 Sender 的封装) 扫描 RecordAccumulator 中是否有最新消息。初始化 KafkaProducer.send() 推送最新消息,实际是将最新消息保存到 RecordAccumulator 中,实际上就是保存到一个 Map 中 (ConcurrentMap>),这条最新消息会被记录到同一个记录批次 (相同主题相同分区算同一个批次) 里面,这个批次的所有最新消息会被推送到相同的主题和分区上。后台的独立线程扫描到 RecordAccumulator 中有最新消息后,会将最新消息推送到 Kafka 集群中 (不是一有最新消息就推送,而是要看最新消息是否 ready)如果推送成功 (最新消息成功载入 Kafka), 就返回一个 RecordMetaData 第一类,它包括了主题和分区信息,以及记录在分区里的偏移量。如果载入失败,就会返回一个错误,生产者在收到错误后会尝试重新推送最新消息 (如果允许的话,此时会将最新消息在保存到 RecordAccumulator 中),几次后如果还是失败就返回错误最新消息。好了,责任编辑对 Kafka Producer 源标识符展开了导出,下一篇文章将会详细介绍 metadata 的内容以及在 Producer 端 metadata 的更新机制。敬请期待~