Kafka是一款开源的、轻量级的、分布式、可分区和具有复制备份的(Replicated)、基于ZooKeeper协调管理的分布式流平台的功能强大的消息系统。作为一个流式处理平台,必须具备以下3个关键特性:
1) 能够允许发布和订阅流数据。
2) 存储流数据时提供相应的容错机制。
3) 当流数据到达时能够被及时处理。
消息流系统kafka的基本结构包括生产者和消费者,以及kafka集群。
生产者负责生产消息,将消息写入Kafka集群;消费者从Kafka集群中拉取消息。
消息是Kafka通信的基本单位 ,由一个 固定长度的消息头 和一个 可变长度的消息体 构成。
Kafka将 一组消息 抽象归纳为一个主题(Topic),也就是说,一个主题是对消息的一个分类。 生产者将消息指定主题发送到kafka集群,消费者订阅主题或主题的某些分区进行消费。
Kafka将一组消息归纳为一个主题,而 每个主题又被分成一个或多个分区(Partition) 。每个分区由一系列有序、不可变的消息组成,是一个有序队列。 每个分区在物理上对应为一个文件夹 ,分区的命名规则为主题名称后接“—”连接符,之后再接分区编号,分区编号从0开始,编号最大值为分区的总数减1。
分区使得Kafka在并发处理上变得更加容易,理论上来说,分区数越多吞吐量越高,但这要根据集群实际环境及业务场景而定。同时,分区也是Kafka保证消息被顺序消费以及对消息进行负载均衡的基础。
疑问和答案 :分区如何保证消息被顺序消费?每个分区内的消息是有序的,但不同分区间如何保证?猜测是分区从存储空间上比较大,分区个数少。顺序消费的主要因素在分区内的消息,分区间的可以忽略。高吞吐率顺序写磁盘估计也是这个原因。
Kafka只能保证一个分区之内消息的有序性,并不能保证跨分区消息的有序性。 每条消息被追加到相应的分区中,是顺序写磁盘,因此效率非常高,这是Kafka高吞吐率的一个重要保证 。同时与传统消息系统不同的是,Kafka并不会立即删除已被消费的消息,由于磁盘的限制消息也不会一直被存储,因此 Kafka提供两种删除老数据的策略 ,一是基于消息已存储的时间长度,二是基于分区的大小。这两种策略都能通过配置文件进行配置。
每个分区又有一至多个副本(Replica),分区的副本分布在集群的不同代理上,以提高可用性。
从存储角度上分析,分区的每个副本在逻辑上抽象为一个日志(Log)对象,即分区的副本与日志对象是一一对应的。每个主题对应的 分区数 可以在Kafka启动时所加载的配置文件中配置,也可以在创建主题时指定。当然,客户端还可以在主题创建后修改主题的分区数。
为什么副本要分Leader和Follower? 如果没有Leader副本,就需要所有的副本都同时负责读/写请求处理,同时还得保证这些副本之间数据的一致性,假设有n个副本则需要有n×n条通路来同步数据,这样数据的一致性和有序性就很难保证。
为解决这个问题,Kafka选择分区的一个副本为Leader,该分区其他副本为Follower,只有 Leader副本 才负责处理客户端 读/写请求 ,Follower副本从Leader副本同步数据。
引入Leader副本后客户端只需与Leader副本进行交互,这样数据一致性及顺序性就有了保证。Follower副本从Leader副本同步消息,对于n个副本只需n-1条通路即可,这样就使得系统更加简单而高效。
副本Follower与Leader的角色并不是固定不变的,如果Leader失效,通过相应的选举算法将从其他Follower副本中选出新的Leader副本。
疑问 :leader副本和follower副本是如何选出来的?通过zookeeper选举的嘛?
Kafka在ZooKeeper中动态维护了一个 ISR(In-sync Replica) ,即保存同步的副本列表,该列表中保存的是与Leader副本保持消息同步的所有副本对应的代理节点id。 如果一个Follower副本宕机或是落后太多 ,则该Follower副本节点将 从ISR列表中移除 。 本书用宕机 来特指某个代理失效的情景,包括但不限于代理被关闭,如代理被人为关闭或是发生物理故障、心跳检测过期、网络延迟、进程崩溃等。
任何发布到分区的消息会被直接追加到日志文件的尾部(分区目录下以“.log”为文件名后缀的数据文件),而每条 消息 在日志文件中的位置都会对应一个按序递增的 偏移量 。偏移量是一个分区下严格有序的 逻辑值 ,它并不表示消息在磁盘上的物理位置。由于Kafka几乎不允许对消息进行随机读写,因此Kafka并没有提供额外索引机制到存储偏移量。
消费者可以通过控制消息偏移量来对消息进行消费 ,如消费者可以指定消费的起始偏移量。 为了保证消息被顺序消费,消费者已消费的消息对应的偏移量也需要保存 。需要说明的是,消费者对消息偏移量的操作并不会影响消息本身的偏移量。旧版消费者将消费偏移量保存到ZooKeeper当中, 而新版消费者是将消费偏移量保存到Kafka内部一个主题当中。 当然,消费者也可以自己在外部系统保存消费偏移量,而无需保存到Kafka中。
推测 :一个主题有多个分区,一个分区有多个副本。一个主题(一类消息)有多个分区(消息被分段),一个分区(每段消息)有多个副本(每段消息的副本数)。消息一旦发给kafka,就会分配一个偏移量,在多个副本中的偏移量是一样的。这样的话,消费者通过偏移量消费时对于多个副本就没有差异性。
Kafka集群由一个或多个Kafka实例构成,每一个Kafka实例称为代理(Broker),通常也称代理为Kafka服务器(KafkaServer)。在生产环境中Kafka集群一般包括一台或多台服务器,我们可以在一台服务器上配置一个或多个代理。 每一个代理都有唯一的标识id,这个id是一个非负整数 。在一个Kafka集群中,每增加一个代理就需要为这个代理配置一个与该集群中其他代理不同的id, id值可以选择任意非负整数即可,只要保证它在整个Kafka集群中唯一,这个id就是代理的名字,也就是在启动代理时配置的broker.id对应的值。
生产者(Producer)负责将消息发送给代理,也就是向Kafka代理发送消息的客户端。
消费者(Comsumer)以拉取(pull)方式拉取数据,它是消费的客户端。在Kafka中 每一个消费者都属于一个特定消费组 (ConsumerGroup),可以为每个消费者指定一个消费组,以groupId代表消费组名称,通过group.id配置设置。 如果不指定消费组 ,则该消费者属于默认消费组test-consumer-group。
每个消费者有一个全局唯一的id ,通过配置项client.id指定, 如果客户端没有指定消费者的id, Kafka会自动为该消费者生成一个全局唯一的id,格式为${groupId}-${hostName}-${timestamp}-${UUID前8位字符}。 同一个主题的一条消息只能被同一个消费组下某一个消费者消费 ,但不同消费组的消费者可同时消费该消息。 消费组是Kafka用来实现对一个主题消息进行广播和单播的手段 ,实现消息广播只需指定各消费者均属于不同的消费组,消息单播则只需让各消费者属于同一个消费组。
推论: kafka消息是按照消息类型(主题),在一个消费者组中只能消费一次。也就是一个消费者组只消费一类型的消息。如果某个服务要消费一类消息,必须将自己置为不同的消费者组。
Kafka利用ZooKeeper保存相应元数据信息, Kafka元数据信息包括如代理节点信息、Kafka集群信息、旧版消费者信息及其消费偏移量信息、主题信息、分区状态信息、分区副本分配方案信息、动态配置信息等。 Kafka在启动或运行过程当中会在ZooKeeper上创建相应节点 来保存元数据信息, Kafka通过监听机制在这些节点注册相应监听器来监听节点元数据的变化 ,从而由ZooKeeper负责管理维护Kafka集群,同时通过ZooKeeper我们能够很方便地对Kafka集群进行水平扩展及数据迁移。
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。kafka 是一个高性能的消息队列,也是一个分布式流处理平台。
kafka中文网
kafka官网
Producer :Producer即生产者,消息的产生者,是消息的入口。
kafka cluster :
Broker :Broker是kafka实例,每个服务器上有一个或多个kafka的实例,姑且认为每个broker对应一台服务器。一个集群由多个broker组成,集群内的broker都有一个不重复的编号,如图中的broker-0、broker-1等……
Topic :消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。
Partition :Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。 同一个topic在不同的分区的数据是不重复的 ,partition的表现形式就是一个一个的文件夹!
Replication : 每一个分区都有多个副本 ,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
Message :每一条发送的消息主体。
Consumer :消费者,即消息的消费方,是消息的出口。
Consumer Group :将多个消费组成一个消费者组。在kafka的设计中 同一个分区的数据只能被同一消费者组中的某一个消费者消费 。Partition 的分配问题,即确定哪个 Partition 由哪个 Consumer 来消费。Kafka 有两种分配策略,一个是 RoundRobin,一个是 Range,默认为Range。
一个消费者组内也可以订阅多个topic
多个消费组可以订阅同一个topic 。
Zookeeper :kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。
使用brew进行安装,非常方便。
ZooKeeper是一个分布式的,开放源码的 分布式应用程序协调服务 ,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
kafka是基于zookeeper的,启动kafka之前,需要先启动zookeeper
查看启动是否成功
启动kafka
查看启动是否成功
查看topic列表
新起一个终端,作为生产者,用于发送消息,每一行算一条消息,将消息发送到kafka服务器
新起一个终端作为消费者,接收消息
服务关闭的顺序是先kafka,然后zookeeper
再过半小时,你就能明白kafka的工作原理了
Kafka架构原理,也就这么回事!
Kafka到底是个啥?用来干嘛的?
官方定义如下:
翻译过来,大致的意思就是,这是一个实时数据处理系统,可以横向扩展,并高可靠!
实时数据处理 ,从名字上看,很好理解,就是将数据进行实时处理,在现在流行的微服务开发中,最常用实时数据处理平台有 RabbitMQ、RocketMQ 等消息中间件。
这些中间件,最大的特点主要有两个:
在早期的 web 应用程序开发中,当请求量突然上来了时候,我们会将要处理的数据推送到一个队列通道中,然后另起一个线程来不断轮训拉取队列中的数据,从而加快程序的运行效率。
但是随着请求量不断的增大,并且队列通道的数据一致处于高负载,在这种情况下,应用程序的内存占用率会非常高,稍有不慎,会出现内存不足,造成程序内存溢出,从而导致服务不可用。
随着业务量的不断扩张,在一个应用程序内,使用这种模式已然无法满足需求,因此之后,就诞生了各种消息中间件,例如 ActiveMQ、RabbitMQ、RocketMQ等中间件。
采用这种模型,本质就是将要推送的数据,不在存放在当前应用程序的内存中,而是将数据存放到另一个专门负责数据处理的应用程序中,从而实现服务解耦。
消息中间件 :主要的职责就是保证能接受到消息,并将消息存储到磁盘,即使其他服务都挂了,数据也不会丢失,同时还可以对数据消费情况做好监控工作。
应用程序 :只需要将消息推送到消息中间件,然后启用一个线程来不断从消息中间件中拉取数据,进行消费确认即可!
引入消息中间件之后,整个服务开发会变得更加简单,各负其责。
Kafka 本质其实也是消息中间件的一种,Kafka 出自于 LinkedIn 公司,与 2010 年开源到 github。
LinkedIn 的开发团队,为了解决数据管道问题,起初采用了 ActiveMQ 来进行数据交换,大约是在 2010 年前后,那时的 ActiveMQ 还远远无法满足 LinkedIn 对数据传递系统的要求,经常由于各种缺陷而导致消息阻塞或者服务无法正常访问,为了能够解决这个问题,LinkedIn 决定研发自己的消息传递系统, Kafka 由此诞生 。
在 LinkedIn 公司,Kafka 可以有效地处理每天数十亿条消息的指标和用户活动跟踪,其强大的处理能力,已经被业界所认可,并成为大数据流水线的首选技术。
先来看一张图, 下面这张图就是 kafka 生产与消费的核心架构模型 !
如果你看不懂这些概念没关系,我会带着大家一起梳理一遍!
简而言之,kafka 本质就是一个消息系统,与大多数的消息系统一样,主要的特点如下:
与 ActiveMQ、RabbitMQ、RocketMQ 不同的地方在于,它有一个**分区 Partition **的概念。
这个分区的意思就是说,如果你创建的 topic 有5个分区,当你一次性向 kafka 中推 1000 条数据时,这 1000 条数据默认会分配到 5 个分区中,其中每个分区存储 200 条数据。
这样做的目的,就是方便消费者从不同的分区拉取数据,假如你启动 5 个线程同时拉取数据,每个线程拉取一个分区,消费速度会非常非常快!
这是 kafka 与其他的消息系统最大的不同!
和其他的中间件一样,kafka 每次发送数据都是向 Leader 分区发送数据,并顺序写入到磁盘,然后 Leader 分区会将数据同步到各个从分区 Follower ,即使主分区挂了,也不会影响服务的正常运行。
那 kafka 是如何将数据写入到对应的分区呢?kafka中有以下几个原则:
与生产者一样,消费者主动的去kafka集群拉取消息时,也是从 Leader 分区去拉取数据。
这里我们需要重点了解一个名词: 消费组 !
考虑到多个消费者的场景,kafka 在设计的时候,可以由多个消费者组成一个消费组,同一个消费组者的消费者可以消费同一个 topic 下不同分区的数据,同一个分区只会被一个消费组内的某个消费者所消费,防止出现重复消费的问题!
但是不同的组,可以消费同一个分区的数据!
你可以这样理解,一个消费组就是一个客户端,一个客户端可以由很多个消费者组成,以便加快消息的消费能力。
但是,如果一个组下的消费者数量大于分区数量,就会出现很多的消费者闲置。
如果分区数量大于一个组下的消费者数量,会出现一个消费者负责多个分区的消费,会出现消费性能不均衡的情况。
因此,在实际的应用中,建议消费者组的 consumer 的数量与 partition 的数量保持一致!
光说理论可没用,下面我们就以 centos7 为例,介绍一下 kafka 的安装和使用。
kafka 需要 zookeeper 来保存服务实例的元信息,因此在安装 kafka 之前,我们需要先安装 zookeeper。
zookeeper 安装环境依赖于 jdk,因此我们需要事先安装 jdk
下载zookeeper,并解压文件包
创建数据、日志目录
配置zookeeper
重新配置 dataDir 和 dataLogDir 的存储路径
最后,启动 Zookeeper 服务
到官网 http://kafka.apache.org/downloads.html 下载想要的版本,我这里下载是最新稳定版 2.8.0 。
按需修改配置文件 server.properties (可选)
server.properties 文件内容如下:
其中有四个重要的参数:
可根据自己需求修改对应的配置!
启动 kafka 服务
创建一个名为 testTopic 的主题,它只包含一个分区,只有一个副本:
运行 list topic 命令,可以看到该主题。
输出内容:
Kafka 附带一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到 Kafka 集群。默认情况下,每行将作为单独的消息发送。
运行生产者,然后在控制台中键入一些消息以发送到服务器。
输入两条内容并回车:
Kafka 还有一个命令行使用者,它会将消息转储到标准输出。
输出结果如下:
本文主要围绕 kafka 的架构模型和安装环境做了一些初步的介绍,难免会有理解不对的地方,欢迎网友批评、吐槽。
由于篇幅原因,会在下期文章中详细介绍 java 环境下 kafka 应用场景!
欢迎分享,转载请注明来源:夏雨云
评论列表(0条)