Kafka生产者

Kafka生产者,第1张

当我们使用Kafka作为异步消息队列之后,首当其冲的两个问题:消息的来源和消息的去除,映入我们的脑海。进而延伸出很多我们需要考虑的问题,这里引用《Kafka权威指南》的原文:

这里就需要我们着手对Kafka生产者组件进行研究,学着怎么自己用生产者API构造一个Java Client类。

一次数据的生产大概要经历这么一个流程:

1.创建ProducerRecord对象,对象内指定目标主题和发送内容,同时还可以指定键和要发送到的分区

2.创建定制的序列化器或使用现有的序列化器,Kafka支持的序列化协议有JSON、Protobuf、arvo等,其中arvo是Kafka本身支持的定制化协议格式。

3.对象通过序列化器序列化后,会被发往指定分区。如果ProducerRecord对象未指定分区,则交由分区器根据对象的键来选择一个分区。

4.接着数据会被传到一个记录批次里,这个批次的数据会被发往相同的topic和Partition

5.Kafka服务器处理上面的发送请求,同时抛出成功与否的响应消息,如果失败则重试。

1.1 bootstrap.servers:broker 的地址清单,地址的格式为host:port,至少提供两个broker信息

1.2 key.serializer 和 value.serializer:key和value的序列化方式,必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer接口的类.Kafka 客户端默认提供了ByteArraySerializer(这个只做很少的事情)、StringSerializer和IntegerSerializer,因此,如果你只使用常见的几种Java 对象类型,那么就没必要实现自己的序列化器。

1.3 acks:acks=0,表示生产者在成功写入悄息之前不会等待任何来自服务器的响应。acks=1,表示只要集群的首领节点收到消息,生产者就会收到 一个来自服务器的成功响应。acks=all ,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。

延迟比较:acks=0<acks=1<acks=ALL

安全比较:acks=ALL>acks=1>acks=0

1.4 buffer.memory:生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。

1.5 compresision.type:使用哪种压缩算法

1.6 retries:重试次数

1.7 batch.size:当有多个消息需要被发送到同一个分区时,生产者会把它们放在罔一个批次里。该参数指定了一个批次可以使用的内存大小。

1.8 max.in.flight.requests.per.connection:该参数指定了生产者在收到服务器晌应之前可以发送多少个批次的消息。一般设置为1,可以保证批次写入消息是有序的

2.1 同步发送:发送并等待服务端响应。

2.2 异步发送:这里还实现了一个回调函数,等待服务端响应时处理。

2.3 发送并忘记:最简单的同步发送方式。

第二段我们提到了如果使用Kafka默认的序列化器,比如JSON 、Avro 、Thrift或Protobuf,则没必要实现序列化器。但是默认的序列化器并不能满足大部分场景的需求,我们可以实现自定义的序列化器类。

此前我们提到,一个ProducerRecord除了包含目标主题和发送内容,还可以设置键值。这个键值既可以作为附加信息,又可以决定消息被发送到哪个分区。对于分区策略,Kafka默认的分区策略是轮询算法,当然也可以实现自定义分区策略。

参考资料:《Kafka权威指南》

一.概念&原理

[if !supportLists]1. [endif]主题(topic):主题是对消息的分类。

[if !supportLists]2. [endif]消息(message):消息是kafka通信的基本单位。

[if !supportLists]3. [endif]分区(partition): 一组 消息对应 一个 主题, 一个 主题对应 一个或多个 分区。每个分区为一系列有序消息组成的 有序队列 ;每个分区在物理上对应一个文件夹。

[if !supportLists]4. [endif]副本(replica):每个分区有 一个或多个 副本,分区的副本分布在集群的 不同 代理(机器)上,以提高可用性;分区的副本与日志对象是一一对应的。

[if !supportLists]5. [endif]Kafka只保证一个 分区内 的消息 有序性 ,不保证跨分区消息的有序性。消息被追加到相应分区中, 顺序写入磁盘 ,效率非常高。

[if !supportLists]6. [endif]Kafka选取某个某个分区的 一个 副本作为leader副本,该分区的 其他 副本为follower副本。 只有leader副本负责处理客户端读/写请求 ,follower副本从leader副本同步数据。

[if !supportLists]7. [endif]任何发布到分区的消息都会追加到日志文件的尾部, 每条消息 在日志文件中的 位置 都对应一个 按序递增的偏移量 ;偏移量在一个分区下严格有序。

[if !supportLists]8. [endif]Kafka不允许对消息进行随机读写。

[if !supportLists]9. [endif]新版消费者将 消费偏移量 保存到kafka内部的一个主题中。

[if !supportLists]10. [endif]Kafka集群由 一个或多个代理 (Broker,也称为kafka实例)构成。可以在 一台 服务器上配置 一个或多个代理 ,每个代理具有唯一标识broker.id。

[if !supportLists]11. [endif]生产者将消息 发送给代理 (Broker)。

[if !supportLists]12. [endif]消费者以 拉取 (pull)方式拉取数据,每个消费者都属于一个消费组。

[if !supportLists]13. [endif]同一个主题的一条消息只能被 同一个消费组 下的某一个消费者消费,但 不同消费组 的消费者可以 同时 消费该消息。

[if !supportLists]14. [endif]消息 广播 :指定各消费者属于不同消费组;消息 单播 :指定各消费者属于同一个消费组。

[if !supportLists]15. [endif]Kafka启动时在Zookeeper上创建相应节点来保存 元数据 ,元数据包括:代理节点信息、集群信息、主题信息、分区状态信息、分区副本分配方案、动态配置等;

[if !supportLists]16. [endif]Kafka通过 监听 机制在节点注册监听器来监听节点元数据变化;

[if !supportLists]17. [endif]Kafka将数据写入 磁盘 ,以文件系统来存数据;

[if !supportLists]18. [endif]生产环境一般将zookeeper集群和kafka集群 分机架 部署;

[if !supportLists]二.[endif] Kafka Producer

配置:

/**

 * xTestProxy——KafkaConfigConstant

 *

 * @author  ZhangChi

 * @date  2018年6月20日---下午5:50:44

 * @version  1.0

 */

public   class  KafkaConfigConstant {

public   static   final  String KAFKA_CLUSTER  = "fa-common1.hangzhou-1.kafka.internal.lede.com:9200,fa-common2.hangzhou-1.kafka.internal.lede.com:9200,fa-common3.hangzhou-1.kafka.internal.lede.com:9200"

}

生产者配置:

/**

 * xTestProxy——HttpKafkaProducerFactory

 *

 * @author  ZhangChi

 * @date  2018年6月11日---下午2:37:51

 * @version  1.0

 */

public   class  HttpKafkaProducerFactory {

// 真正的KafkaProducer仅有一份

private   static  KafkaProducer kafkaProducer  = null

private   static  Properties property

public   static  KafkaProducer getKafkaProducer() {

if  ( kafkaProducer  == null ) {

synchronized  (HttpKafkaProducerFactory. class ) {

if  ( kafkaProducer  == null ) {

property  = buildKafkaProperty ()

kafkaProducer  = new  KafkaProducer( property )

}

}

}

return   kafkaProducer

}

public   static  Properties buildKafkaProperty() {

Properties props = new  Properties()

props.put(ProducerConfig. BOOTSTRAP_SERVERS_CONFIG , KafkaConfigConstant. KAFKA_CLUSTER )

props.put(ProducerConfig. ACKS_CONFIG , "all")

props.put(ProducerConfig. RETRIES_CONFIG , 0)

props.put(ProducerConfig. BATCH_SIZE_CONFIG , 16384)

props.put(ProducerConfig. BUFFER_MEMORY_CONFIG , 33554432)

props.put(ProducerConfig. LINGER_MS_CONFIG , 1)

props.put(ProducerConfig. KEY_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer")

props.put(ProducerConfig. VALUE_SERIALIZER_CLASS_CONFIG ,

"org.apache.kafka.common.serialization.StringSerializer")

return  props

}

}

生产者线程组:

/**

 * xTestProxy——HttpKafkaProducerThread

 * 多线程每次new一个实例

 *

 * @author  ZhangChi

 * @date  2018年6月25日---下午2:09:39

 * @version  1.0

 */

public   class  HttpKafkaProducerThread implements  Runnable {

private   static  Logger logger  = LoggerFactory. getLogger ("HttpKafkaProducerThread")

private   final  String KAFKA_TOPIC = KafkaConstant. HTTP_REQ_RESP_TOPIC

private  String kafkaMessageJson

private  KafkaProducer producer

public  String messageType

public  String originalMessage

private   static  KafkaMessage kafkaMessage  = new  KafkaMessage()

public  HttpKafkaProducerThread(KafkaProducer producer, String messageType, String originalMessage) {

this .producer = producer

this .messageType = messageType

this .originalMessage = originalMessage

}

@Override

public   void  run() {

// TODO  Auto-generated method stub

/* 1.构建kafka消息*/

kafkaMessageJson = generateKafkaMessage( this .messageType, this .originalMessage)

/* 2.发送kafka消息*/

if  (kafkaMessageJson != null  &&!StringUtils. isEmpty (kafkaMessageJson)) {

logger .info("create message start:" + kafkaMessageJson)

producer.send( new  ProducerRecord( this .KAFKA_TOPIC, kafkaMessageJson))

} else  {

logger .info("kafkaMessageJson is null!")

}

}

private  String generateKafkaMessage(String messageType, String originalMessage) {

if  (StringUtils. isBlank (messageType) || StringUtils. isBlank (originalMessage)) {

return   null

}

kafkaMessage .setMessageId(KafkaMessageUtils. generateId ())

kafkaMessage .setMessageTime(KafkaMessageUtils. generateTime ())

kafkaMessage .setMessageType(messageType)

kafkaMessage .setMessage(originalMessage)

String kafkaMessageToJson = null

try  {

kafkaMessageToJson = KafkaMessageUtils. objectToJson ( kafkaMessage )

} catch  (JsonProcessingException e) {

// TODO  Auto-generated catch block

e.printStackTrace()

}

kafkaMessageJson = kafkaMessageToJson

return  kafkaMessageToJson

}

}

[if !supportLists]三.[endif] Kafka Consumer

消费者配置:

private   static  Properties buildKafkaProperty() {

Properties properties = new  Properties()

// 测试环境kafka的端口号是9200

properties.put(ConsumerConfig. BOOTSTRAP_SERVERS_CONFIG , KafkaConfigConstant. KAFKA_CLUSTER )

// 消费组名称

properties.put(ConsumerConfig. GROUP_ID_CONFIG , KafkaConfigConstant. GROUP_ID )

properties.put(ConsumerConfig. CLIENT_ID_CONFIG , "test")

// 从头消费

properties.put(ConsumerConfig. AUTO_OFFSET_RESET_CONFIG , "earliest")

// 自动提交偏移量

properties.put(ConsumerConfig. ENABLE_AUTO_COMMIT_CONFIG , "true")

// 时间间隔1s

properties.put(ConsumerConfig. AUTO_COMMIT_INTERVAL_MS_CONFIG , "1000")

properties.put(ConsumerConfig. KEY_DESERIALIZER_CLASS_CONFIG ,

"org.apache.kafka.common.serialization.StringDeserializer")

properties.put(ConsumerConfig. VALUE_DESERIALIZER_CLASS_CONFIG ,

"org.apache.kafka.common.serialization.StringDeserializer")

return  properties

}

消费者线程组:

/**

 * AnalysisEngine——HttpKafkaConsumerGroup

 *

 * @author  ZhangChi

 * @date  2018年6月11日---下午6:20:47

 * @version  1.0

 */

@Service("httpKafkaConsumerGroup")

public   class  HttpKafkaConsumerGroup {

@Autowired

private  RequestAnalyzer requestAnalyzer

@Autowired

private  EsDocumentServiceImpl esDocumentServiceImpl

@Autowired

private  AnalysisEngineClient analysisEngineClient

@Autowired

private  MongoTemplate mongoTemplate

private  List httpKafkaConsumerList = new  ArrayList()

public   void  initHttpKafkaConsumerGroup( int  consumerNumber, RunModeEnum mode) {

for  ( int  i = 0i <consumerNumberi++) {

/**

 * 将注入的服务当做构造参数,这样保证每个子线程都能拿到服务实例而不是空指针!

 */

HttpKafkaConsumer consumerThread = new  HttpKafkaConsumer(requestAnalyzer, esDocumentServiceImpl, mode, analysisEngineClient, mongoTemplate)

httpKafkaConsumerList.add(consumerThread)

}

}

public   void  consumeGroupStart() {

for  (HttpKafkaConsumer item : httpKafkaConsumerList) {

LogConstant. runLog .info("httpKafkaConsumerList size : " + httpKafkaConsumerList.size())

Thread consumerThread = new  Thread(item)

consumerThread.start()

}

}

}

先逐个初始化消费者实例,然后将这些消费者加入到消费组列表中。消费组启动后,会循环产生消费者线程。

 

Kafka最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大特性就是可以实时处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低时延的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务水平和最关键指标之一。

基本工作流程如上图所示,其中:

我们看上面的架构图中,producer就是生产者,是数据的入口。注意看图中的红色箭头,Producer在写入数据的时候 永远的找leader ,不会直接将数据写入follower!那leader怎么找呢?写入的流程又是什么样的呢?我们看下图:

发送的流程就在图中已经说明了,就不单独在文字列出来了!需要注意的一点是,消息写入leader后,follower是主动的去leader进行同步的!producer采用push模式将数据发布到broker,每条消息追加到分区中,顺序写入磁盘,所以保证 同一分区 内的数据是有序的!写入示意图如下:

上面说到数据会写入到不同的分区,那kafka为什么要做分区呢?相信大家应该也能猜到,分区的主要目的是:

熟悉负载均衡的朋友应该知道,当我们向某个服务器发送请求的时候,服务端可能会对请求做一个负载,将流量分发到不同的服务器,那在kafka中,如果某个topic有多个partition,producer又怎么知道该将数据发往哪个partition呢?kafka中有几个原则:

保证消息不丢失是一个消息队列中间件的基本保证,那producer在向kafka写入消息的时候,怎么保证消息不丢失呢?其实上面的写入流程图中有描述出来,那就是通过ACK应答机制!在生产者向队列写入数据的时候可以设置参数来确定是否确认kafka接收到数据,这个参数可设置的值为 0 1 all

最后要注意的是,如果往不存在的topic写数据,能不能写入成功呢?kafka会自动创建topic,分区和副本的数量根据默认配置都是1。

Producer将数据写入kafka后,集群就需要对数据进行保存了!kafka将数据保存在磁盘,可能在我们的一般的认知里,写入磁盘是比较耗时的操作,不适合这种高并发的组件。Kafka初始会单独开辟一块磁盘空间,顺序写入数据(效率比随机写入高)。

前面说过了每个topic都可以分为一个或多个partition,如果你觉得topic比较抽象,那partition就是比较具体的东西了!Partition在服务器上的表现形式就是一个一个的文件夹,每个partition的文件夹下面会有多组segment文件,每组segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中没有)三个文件, log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。

上面说到log文件就实际是存储message的地方,我们在producer往kafka写入的也是一条一条的message,那存储在log中的message是什么样子的呢?消息主要包含消息体、消息大小、offset、压缩类型……等等!我们重点需要知道的是下面三个:

无论消息是否被消费,kafka都会保存所有的消息。那对于旧数据有什么删除策略呢?

需要注意的是,kafka读取特定消息的时间复杂度是O(1),所以这里删除过期的文件并不会提高kafka的性能!

消息存储在log文件后,消费者就可以进行消费了。在讲消息队列通信的两种模式的时候讲到过点对点模式和发布订阅模式。Kafka采用的是点对点的模式,消费者主动的去kafka集群拉取消息,与producer相同的是,消费者在拉取消息的时候也是 找leader 去拉取。

多个消费者可以组成一个消费者组(consumer group),每个消费者组都有一个组id!同一个消费组者的消费者可以消费同一topic下不同分区的数据,但是不会组内多个消费者消费同一分区的数据!!!如下图:

图示是消费者组内的消费者小于partition数量的情况,所以会出现某个消费者消费多个partition数据的情况,消费的速度也就不及只处理一个partition的消费者的处理速度!如果是消费者组的消费者多于partition的数量,那会不会出现多个消费者消费同一个partition的数据呢?上面已经提到过不会出现这种情况!多出来的消费者不消费任何partition的数据。所以在实际的应用中,建议 消费者组的consumer的数量与partition的数量一致

kafka使用文件存储消息(append only log),这就直接决定kafka在性能上严重依赖文件系统的本身特性.且无论任何OS下,对文件系统本身的优化是非常艰难的.文件缓存/直接内存映射等是常用的手段.因为kafka是对日志文件进行append操作,因此磁盘检索的开支是较小的同时为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数.对于kafka而言,较高性能的磁盘,将会带来更加直接的性能提升.

除磁盘IO之外,我们还需要考虑网络IO,这直接关系到kafka的吞吐量问题.kafka并没有提供太多高超的技巧对于producer端,可以将消息buffer起来,当消息的条数达到一定阀值时,批量发送给broker对于consumer端也是一样,批量fetch多条消息.不过消息量的大小可以通过配置文件来指定.对于kafka broker端,似乎有个sendfile系统调用可以潜在的提升网络IO的性能:将文件的数据映射到系统内存中,socket直接读取相应的内存区域即可,而无需进程再次copy和交换(这里涉及到"磁盘IO数据"/"内核内存"/"进程内存"/"网络缓冲区",多者之间的数据copy).

其实对于producer/consumer/broker三者而言,CPU的开支应该都不大,因此启用消息压缩机制是一个良好的策略压缩需要消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑.可以将任何在网络上传输的消息都经过压缩.kafka支持gzip/snappy等多种压缩方式

kafka集群中的任何一个broker,都可以向producer提供metadata信息,这些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息(请参看zookeeper中的节点信息). 当producer获取到metadata信息之后, producer将会和Topic下所有partition leader保持socket连接消息由producer直接通过socket发送到broker,中间不会经过任何"路由层".

异步发送,将多条消息暂且在客户端buffer起来,并将他们批量发送到broker小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率不过这也有一定的隐患,比如当producer失效时,那些尚未发送的消息将会丢失。

其他JMS实现,消息消费的位置是有prodiver保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态.这就要求JMS broker需要太多额外的工作.在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafka broker端是相当轻量级的.当消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并间歇性的向zookeeper注册offset.由此可见,consumer客户端也很轻量级。

kafka中consumer负责维护消息的消费记录,而broker则不关心这些,这种设计不仅提高了consumer端的灵活性,也适度的减轻了broker端设计的复杂度这是和众多JMS prodiver的区别.此外,kafka中消息ACK的设计也和JMS有很大不同,kafka中的消息是批量(通常以消息的条数或者chunk的尺寸为单位)发送给consumer,当消息消费成功后,向zookeeper提交消息的offset,而不会向broker交付ACK.或许你已经意识到,这种"宽松"的设计,将会有"丢失"消息/"消息重发"的危险.

Kafka提供3种消息传输一致性语义:最多1次,最少1次,恰好1次。

最少1次:可能会重传数据,有可能出现数据被重复处理的情况

最多1次:可能会出现数据丢失情况

恰好1次:并不是指真正只传输1次,只不过有一个机制。确保不会出现“数据被重复处理”和“数据丢失”的情况。

at most once: 消费者fetch消息,然后保存offset,然后处理消息当client保存offset之后,但是在消息处理过程中consumer进程失效(crash),导致部分消息未能继续处理.那么此后可能其他consumer会接管,但是因为offset已经提前保存,那么新的consumer将不能fetch到offset之前的消息(尽管它们尚没有被处理),这就是"at most once".

at least once: 消费者fetch消息,然后处理消息,然后保存offset.如果消息处理成功之后,但是在保存offset阶段zookeeper异常或者consumer失效,导致保存offset操作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是"at least once".

"Kafka Cluster"到消费者的场景中可以采取以下方案来得到“恰好1次”的一致性语义:

最少1次+消费者的输出中额外增加已处理消息最大编号:由于已处理消息最大编号的存在,不会出现重复处理消息的情况。

kafka中,replication策略是基于partition,而不是topickafka将每个partition数据复制到多个server上,任何一个partition有一个leader和多个follower(可以没有)备份的个数可以通过broker配置文件来设定。leader处理所有的read-write请求,follower需要和leader保持同步.Follower就像一个"consumer",消费消息并保存在本地日志中leader负责跟踪所有的follower状态,如果follower"落后"太多或者失效,leader将会把它从replicas同步列表中删除.当所有的follower都将一条消息保存成功,此消息才被认为是"committed",那么此时consumer才能消费它,这种同步策略,就要求follower和leader之间必须具有良好的网络环境.即使只有一个replicas实例存活,仍然可以保证消息的正常发送和接收,只要zookeeper集群存活即可.

选择follower时需要兼顾一个问题,就是新leader server上所已经承载的partition leader的个数,如果一个server上有过多的partition leader,意味着此server将承受着更多的IO压力.在选举新leader,需要考虑到"负载均衡",partition leader较少的broker将会更有可能成为新的leader.

每个log entry格式为"4个字节的数字N表示消息的长度" + "N个字节的消息内容"每个日志都有一个offset来唯一的标记一条消息,offset的值为8个字节的数字,表示此消息在此partition中所处的起始位置..每个partition在物理存储层面,有多个log file组成(称为segment).segment file的命名为"最小offset".kafka.例如"00000000000.kafka"其中"最小offset"表示此segment中起始消息的offset.

获取消息时,需要指定offset和最大chunk尺寸,offset用来表示消息的起始位置,chunk size用来表示最大获取消息的总长度(间接的表示消息的条数).根据offset,可以找到此消息所在segment文件,然后根据segment的最小offset取差值,得到它在file中的相对位置,直接读取输出即可.

kafka使用zookeeper来存储一些meta信息,并使用了zookeeper watch机制来发现meta信息的变更并作出相应的动作(比如consumer失效,触发负载均衡等)

Broker node registry: 当一个kafka broker启动后,首先会向zookeeper注册自己的节点信息(临时znode),同时当broker和zookeeper断开连接时,此znode也会被删除.

Broker Topic Registry: 当一个broker启动时,会向zookeeper注册自己持有的topic和partitions信息,仍然是一个临时znode.

Consumer and Consumer group: 每个consumer客户端被创建时,会向zookeeper注册自己的信息此作用主要是为了"负载均衡".一个group中的多个consumer可以交错的消费一个topic的所有partitions简而言之,保证此topic的所有partitions都能被此group所消费,且消费时为了性能考虑,让partition相对均衡的分散到每个consumer上.

Consumer id Registry: 每个consumer都有一个唯一的ID(host:uuid,可以通过配置文件指定,也可以由系统生成),此id用来标记消费者信息.

Consumer offset Tracking: 用来跟踪每个consumer目前所消费的partition中最大的offset.此znode为持久节点,可以看出offset跟group_id有关,以表明当group中一个消费者失效,其他consumer可以继续消费.

Partition Owner registry: 用来标记partition正在被哪个consumer消费.临时znode。此节点表达了"一个partition"只能被group下一个consumer消费,同时当group下某个consumer失效,那么将会触发负载均衡(即:让partitions在多个consumer间均衡消费,接管那些"游离"的partitions)

当consumer启动时,所触发的操作:

A) 首先进行"Consumer id Registry"

B) 然后在"Consumer id Registry"节点下注册一个watch用来监听当前group中其他consumer的"leave"和"join"只要此znode path下节点列表变更,都会触发此group下consumer的负载均衡.(比如一个consumer失效,那么其他consumer接管partitions).

C) 在"Broker id registry"节点下,注册一个watch用来监听broker的存活情况如果broker列表变更,将会触发所有的groups下的consumer重新balance.

总结:

Kafka的核心是日志文件,日志文件在集群中的同步是分布式数据系统最基础的要素。

如果leaders永远不会down的话我们就不需要followers了!一旦leader down掉了,需要在followers中选择一个新的leader.但是followers本身有可能延时太久或者crash,所以必须选择高质量的follower作为leader.必须保证,一旦一个消息被提交了,但是leader down掉了,新选出的leader必须可以提供这条消息。大部分的分布式系统采用了多数投票法则选择新的leader,对于多数投票法则,就是根据所有副本节点的状况动态的选择最适合的作为leader.Kafka并不是使用这种方法。

Kafka动态维护了一个同步状态的副本的集合(a set of in-sync replicas),简称ISR,在这个集合中的节点都是和leader保持高度一致的,任何一条消息必须被这个集合中的每个节点读取并追加到日志中了,才回通知外部这个消息已经被提交了。因此这个集合中的任何一个节点随时都可以被选为leader.ISR在ZooKeeper中维护。ISR中有f+1个节点,就可以允许在f个节点down掉的情况下不会丢失消息并正常提供服。ISR的成员是动态的,如果一个节点被淘汰了,当它重新达到“同步中”的状态时,他可以重新加入ISR.这种leader的选择方式是非常快速的,适合kafka的应用场景。

一个邪恶的想法:如果所有节点都down掉了怎么办?Kafka对于数据不会丢失的保证,是基于至少一个节点是存活的,一旦所有节点都down了,这个就不能保证了。

实际应用中,当所有的副本都down掉时,必须及时作出反应。可以有以下两种选择:

这是一个在可用性和连续性之间的权衡。如果等待ISR中的节点恢复,一旦ISR中的节点起不起来或者数据都是了,那集群就永远恢复不了了。如果等待ISR意外的节点恢复,这个节点的数据就会被作为线上数据,有可能和真实的数据有所出入,因为有些数据它可能还没同步到。Kafka目前选择了第二种策略,在未来的版本中将使这个策略的选择可配置,可以根据场景灵活的选择。

这种窘境不只Kafka会遇到,几乎所有的分布式数据系统都会遇到。

以上仅仅以一个topic一个分区为例子进行了讨论,但实际上一个Kafka将会管理成千上万的topic分区.Kafka尽量的使所有分区均匀的分布到集群所有的节点上而不是集中在某些节点上,另外主从关系也尽量均衡这样每个几点都会担任一定比例的分区的leader.

优化leader的选择过程也是很重要的,它决定了系统发生故障时的空窗期有多久。Kafka选择一个节点作为“controller”,当发现有节点down掉的时候它负责在游泳分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系。如果controller down掉了,活着的节点中的一个会备切换为新的controller.

对于某个分区来说,保存正分区的"broker"为该分区的"leader",保存备份分区的"broker"为该分区的"follower"。备份分区会完全复制正分区的消息,包括消息的编号等附加属性值。为了保持正分区和备份分区的内容一致,Kafka采取的方案是在保存备份分区的"broker"上开启一个消费者进程进行消费,从而使得正分区的内容与备份分区的内容保持一致。一般情况下,一个分区有一个“正分区”和零到多个“备份分区”。可以配置“正分区+备份分区”的总数量,关于这个配置,不同主题可以有不同的配置值。注意,生产者,消费者只与保存正分区的"leader"进行通信。

Kafka允许topic的分区拥有若干副本,这个数量是可以配置的,你可以为每个topic配置副本的数量。Kafka会自动在每个副本上备份数据,所以当一个节点down掉时数据依然是可用的。

Kafka的副本功能不是必须的,你可以配置只有一个副本,这样其实就相当于只有一份数据。

创建副本的单位是topic的分区,每个分区都有一个leader和零或多个followers.所有的读写操作都由leader处理,一般分区的数量都比broker的数量多的多,各分区的leader均匀的分布在brokers中。所有的followers都复制leader的日志,日志中的消息和顺序都和leader中的一致。followers向普通的consumer那样从leader那里拉取消息并保存在自己的日志文件中。

许多分布式的消息系统自动的处理失败的请求,它们对一个节点是否着(alive)”有着清晰的定义。Kafka判断一个节点是否活着有两个条件:

符合以上条件的节点准确的说应该是“同步中的(in sync)”,而不是模糊的说是“活着的”或是“失败的”。Leader会追踪所有“同步中”的节点,一旦一个down掉了,或是卡住了,或是延时太久,leader就会把它移除。至于延时多久算是“太久”,是由参数replica.lag.max.messages决定的,怎样算是卡住了,怎是由参数replica.lag.time.max.ms决定的。

只有当消息被所有的副本加入到日志中时,才算是“committed”,只有committed的消息才会发送给consumer,这样就不用担心一旦leader down掉了消息会丢失。Producer也可以选择是否等待消息被提交的通知,这个是由参数acks决定的。

Kafka保证只要有一个“同步中”的节点,“committed”的消息就不会丢失。


欢迎分享,转载请注明来源:夏雨云

原文地址:https://www.xiayuyun.com/zonghe/709031.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-08-06
下一篇2023-08-06

发表评论

登录后才能评论

评论列表(0条)

    保存