学堂 学堂 学堂公众号手机端

Kafka_2.10-0.10.0.0集群安装与配置(kafka3还是kafka2)

lewis 1年前 (2024-05-09) 阅读数 20 #技术


kafka_2.10-0.10 集群安装

上文已经讲过如何安装Zookeeper集群,因为Kafka集群需要依赖Zookeeper服务,虽然Kafka有内置Zookeeper,但是还是建议独立安装Zookeeper集群服务,此处不再赘述


kafka集群还是安装在192.168.20.178 、 192.168.20.179 、192.168.20.174三台机器上面

zookeeper集群

192.168.20.178 kafka1
192.168.20.179 kafka2
192.168.20.174 kafka3
192.168.20.37 zookeeper1
192.168.20.38 zookeeper2
192.168.20.39 zookeeper3

  

1、kafka官网下载kafka_2.10-0.10.0.0.tgz压缩包,解压缩

2、修改/appcom/kafka_2.10-0.10.0.0/config/server.properties文件:

192.168.20.178机器:

broker.id=1

listeners=PLAINTEXT://kafka1:9092

advertised.listeners=PLAINTEXT://kafka1:9092

log.dirs=/data/kafka/logs

需要配置较大 分片影响读写速度

zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181

其他配置暂时采取默认

192.168.20.179机器:

broker.id=2

listeners=PLAINTEXT://kafka2:9092

advertised.listeners=PLAINTEXT://kafka2:9092

log.dirs=/data/kafka/logs

需要配置较大 分片影响读写速度

zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181

其他配置暂时采取默认


192.168.20.39机器:

broker.id=3

listeners=PLAINTEXT://kafka3:9092

advertised.listeners=PLAINTEXT://kafka3:9092

log.dirs=/data/kafka/logs

需要配置较大 分片影响读写速度

zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181

其他配置暂时采取默认

3、在每台服务器分别启动kafka服务:

./kafka-server-start.sh /data/kafka/config/server.properties & (以这种方式启动kafka应用后,kafka总会莫名其妙的退出。后来改为这种命令启动:

./kafka-server-start.sh -daemon /data/kafka/config/server.properties 解决了问题)

4、任意一台机器上面,测试: 在kafka中创建名为“cmy_nbd_topic1”的topic,该topic切分为4份,每一份备份数为3

./kafka-topics.sh --create --zookeeper zookeeper1:2181 --replication-factor 3 --partitions 4 --topic cinyi_topic1

5、列出所有topic :

./kafka-topics.sh--list--zookeeperzookeeper1:2181,zookeeper2:2181,zookeeper3:2181

(1)建立一个主题

/data/kafka/bin/kafka-topics.sh --create --zookeeperzookeeper1:2181 --replication-factor 3 --partitions 1 --topic summer

#注意:factor大小不能超过broker数

(2) 增加partition数量

/data/kafka/bin/kafka-topics.sh --zookeeper zookeeper:2181 --alter -topic summer --partitions 4

能增加,不能减少partitions, --partitions 4 是增加后的值

(3)删除一个主题

/data/kafka/bin/kafka-topics.sh --delete --zookeeper zookeeper1:2181 --topic summer

...1. 在kafka配置文件中/data/kafka/config/server.properties 添加一行 delete.topic.enable = true

...2. 连接zookeeper

  删除zookeeper下/brokers/topics/test-topic节点

  删除zookeeper下/config/topics/test-topic节点

  删除zookeeper下/admin/delete_topics/test-topic节点

(4)查看有哪些主题已经创建

[root@kafka1 ~]# /data/kafka/bin/kafka-topics.sh --list --zookeeper zookeeper1:2181   #列出集群中所有的topic summer  #已经创建成功

(5)查看summer这个主题的详情

[root@kafka1 ~]# /data/kafka/bin/kafka-topics.sh --describe --zookeeper zookeeper1:2181--topic summer
Topic:summerPartitionCount:1 ReplicationFactor:3 Configs: Topic: summerPartition: 0 Leader: 2 Replicas: 2,4,3 Isr: 2,4,3 #主题名称:summer #Partition:只有一个,从0开始 #leader :id为2的broker #Replicas 副本存在于broker id为2,3,4的上面 #Isr:活跃状态的broker

(6)发送消息,这里使用的是生产者角色

[root@kafka1 ~]# /data/kafka/bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic summer
This is a messages
welcometo kafka

(7)接收消息,这里使用的是消费者角色

[root@kafka2 ~]# /data/kafka/bin/kafka-console-consumer.sh --zookeeper  zookeeper1:2181 --topic summer --from-beginning
# This is a messages welcometo kafka

(8)kafka集群的leader平衡机制
创建一个abctest3的topic,
1./data/kafka/bin/kafka-topics.sh --create --zookeeperzookeeper1:2181 --replication-factor 3 --partitions 1 --topic abctest32.显示testabc3 topic信息


3. kill 掉 brock2 节点后,查询topic信息


1    Partition: 分区
2 Leader : 负责读写指定分区的节点
3 Replicas : 复制该分区log的节点列表
4 Isr : "in-sync"
4.启动broker2 节点进程后, 3个节点不均衡。
5. 通过kafka-preferred-replica-election.sh 命令,是leader平衡


6.动态leader平衡,在配置文件中加入 auto.leader.rebalance.enable=true

8. 集群分区日志迁移,(1) broker1 上的topic abc_test ,迁移到 broker3上, (2) 把broker1上的topic abc_test的 分区,迁移到broker3上

1.编写json的文件

cat topics-to-move.json

{"topics":{"topic":abc_test},

"version":1

}

2. 生成迁移计划,并没有迁移  

/data/kafka/bin/kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --topics-to-move-json-file /root/topics-tomove.json --broker-list "3" --generate

Current partition replica assignment

{"version":1,"partitions":[]} ####备份回滚使用
Proposed partition reassignment configuration

{"version":1,"partitions":[]} #####迁移的json文件

  把Proposed partition reassignment configuration 生成的json保存到文件中

  cat /root/expand-cluster-reassignment.json

  {"version":1,"partitions":[]}

  3. 执行迁移

  /data/kafka/bin/kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file /root/expand-cluster-reassignment.json --execute

4.验证:

/data/kafka/bin/kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file /root/expand-cluster-reassignment.json--verify

 5.迁移某个topic的某些特定的partition数据到其他broker,步骤与上面一样,但是json文件如下面所示:
  cat custom-reassignment.json
  {"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}
  可以指定到topic的分区编号

(9) kafka集群的监控 kafka offset monitor 监控一个集群, kafka manager 可以监控多个集群

    kafka offset Monitor 介绍

    1. kafka集群当前存活broker集合

    2. kafka集群当前活动topic集合

    3. 消费者列表

    4.kafka集群当前consumer按组消费的offset lag数(即当前topic当前分区目前有多少消息积压而没有及时消费)

    kafka offset monitor 部署

    主页https://github.com/quantifind/KafkaOffsetMonitor  

    下载: wgethttps://github.com/quantifind/KafkaOffsetMonitor/releases/download/v0.2.1/KafkaOffsetMonitor-assembly-0.2.1.jar

    

  

​      java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk 192.168.20.153:2181,192.168.20.154:2181,192.168.20.155:2181 --port 8086 --refresh 10.seconds --retain 7.days 1>stdout.log 2>stderr.log &​

topic:创建时topic名称

partition:分区编号

offset:表示该parition已经消费了多少条message

logSize:表示该partition已经写了多少条message

Lag:表示有多少条message没有被消费。

Owner:表示消费者

Created:该partition创建时间

Last Seen:消费状态刷新最新时间。

######################
broker.id = 0 #当前集群中的唯一标识,和zookeeper myid功能相同
port = 9092
host.name= 192.168.20.178 本台机器的IP地址
num.network.threads = 2 broker网络处理的线程数
num.io.threads = 8 broker io处理的线程数,与 log.dirs 有关系
socket.send.buffer.bytes=1048576 socker发送buffer大小,先发送到缓存区,提高性能
socker.receive.buffer.bytes=1048576 接受缓存区,当大于数字时,序列化到磁盘
socker.request.max.bytes=10483421 向kafka 发送消息的请求数,不能超过java堆栈大小

log.dirs = /data/kafka/logs 状态消息持久化的地方
num.partitions = 2 默认的分区数,一个topic默认2个分区数
log.retention.hours = 168 发送到kafka的消息过期时间,7天
在 log.retention.hours=168下添加

message.max.byte = 5048576 kafka可以接收消息的最大大小是5M
default.replication.factor = 2 每个topic的每个patitions的消息默认只有一个副本,修改成保存2个副本数
replica.fetch.max.bytes = 5048576 取消息的最大字节数5M
log.segment.bytes = 452345245 日志文件大于数字,新启一个新的文件。
log.retention.check.interval.ms=646452 每隔646452毫秒,查看一下log.retention.hours是否失效
log.cleaner.enable = false 是否启用log压缩
zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
auto.leader.rebalance.enable=true leader自动平衡
消费者 consumer : 从消息队列中请求消息的客户端应用程序
生产者 producer: 向broker发布消息的客户端应用程序
AMPQ服务器端broker: 用来接收生产者发送的消息并将这些消息路由到服务器的队列。




主题(topic): 类似新闻的 体育,娱乐,教育标题
分区(partition)一个topic中的消息数据按照多个分区组织,分区是消息队列组织的最小单位,一个分区可以看作是一个FIFO的队列。



版权声明

本文仅代表作者观点,不代表博信信息网立场。

热门