Kafka是一种高吞吐量的 分布式 发布订阅消息系统,它可以处理消费者规模的网站中所有动作流数据。Kafka的目的是通过Hadoop 并行加载机制统一线上和离线消息处理,并通过 集群 提供实时消息。本文内容较基础,主要围绕kafka的体系架构和功能展开。
正文开始之前,我们先了解一下Kafka中涉及的相关术语:
1、Broker——Kafka集群包含一个或多个服务器,这种服务器被称为broker ;
2、Topic——每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
3、Partition——Partition是物理上的概念,每个Topic包含一个或多个Partition.
4、Producer——负责发布消息到Kafka broker
5、Consumer——消息消费者,向Kafka broker读取消息的客户端。
6、Consumer Group——每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
Kafka的topic可以看做是一个记录流 ("/orders", "/user-signups"),每个topic都有一个日志,它存储在磁盘上。每个topic又被分成多个partition(区),每个partition在存储层面是append log文件,任何发布到partition的消息都会被直接追加到日志文件的尾部,Kafka Producer API用于生成数据记录流,Kafka Consumer API用于使用Kafka的记录流。
Kafka架构:Topic、Partition、Producer、Consumer
通常,一个普通的工作流程是Kafka的producer向topic写入消息,consumer从topic中读取消息。topic与日志相关联,日志是存储在磁盘上的数据结构,Kafka将producer的记录附加到topic日志的末尾。topic日志由分布在多个文件上的许多分区组成,这些文件可以分布在多个Kafka集群节点上。Kafka在集群的不同节点上分发topic日志分区,以实现具有水平可伸缩性的高性能。Spreading 分区有助于快速写入数据,Kafka将分区复制到许多节点以提供故障转移。
如果多个producer和consumer同时读取和写入相同的Kafka主题日志,Kafka如何扩展?首先,Kafka本身的写入速度很快,顺序写入文件系统本身就不需要太多时间;其次,在现代的快速驱动器上,Kafka可以轻松地每秒写入700 MB或更多字节的数据。
集群部署和测试
Kafka使用ZooKeeper管理集群,ZooKeeper用于协调服务器或集群拓扑,ZooKeeper是配置信息的一致性文件系统。你可以选择Kafka自带的Zookeeper,也可以选择单独部署,一台Linux主机开放三个端口即可构建一个简单的伪ZooKeeper集群。
ZooKeeper可以将拓扑更改发送到Kafka,如果集群中的某台服务器宕机或者某个topic被添加、删除,集群中的每个节点都可以知道新服务器何时加入,ZooKeeper提供Kafka Cluster配置的同步视图。Kafka和ZooKeeper的搭建都需要java环境,对于jdk的下载安装本文不过多赘述,可以自行网上查询,二者的安装包也可以在Apache官网自行下载。自建Zookeeper集群的配置过程如下:
创建目录 ZooKeeper:mkdir zookeeper
拷贝最少三个实例,进入ZooKeeper目录,其他实例进行同样的操作:
创建目录zkdata、zkdatalog
进入conf目录
拷贝zoo_sample.cfg 为zoo.cfg,详细配置如下:
Java代码
使用Kafka自带的ZooKeeper集群:
查看配置文件
进入Kafka的config的目录:
先建立zk集群,直接使用Kafka自带的ZooKeeper建立zk集群,修改zookeeper.properties文件:
Kafka服务器
Kafka集群由多个Kafka Brokers组成。每个Kafka Broker都有一个唯一的ID(编号)。Kafka Brokers包含主题日志分区,如果希望获得故障处理能力,需要保证至少有三到五个服务器,Kafka集群最大可同时存在10,100或1,000个服务器。Kafka将每个partition数据复制到多个server上,任何一个partition有一个leader和多个follower(可以没有);备份的个数可以通过broker配置文件来设定.当leader失效时,需在followers中选取出新的leader,可能此时follower落后于leader,因此需要选择一个"up-to-date"的follower。
例如,如果在AWS中运行kafka集群,其中一个Kafka Broker发生故障,作为ISR(同步副本)的Kafka Broker可以迅速提供数据。
请注意,对于如何设置Kafka集群本身并没有硬性规定。例如,可以在单个AZ中设置整个集群,以便使用AWS增强型网络和放置组获得更高的吞吐量,然后使用Mirror Maker将集群镜像到同一区域中的热灾备AZ。