01.Kafka安装
Kafka 是一种高吞吐的分布式发布订阅消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等
同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。
Kafka 支持Java 及多种其它语言客户端,可与Hadoop、Storm、Spark等其它大数据工具结合使用。
本教程主要介绍Kafka 在Centos 7上的安装和使用,包括功能验证和集群的简单配置。
# 01.Linux安装
# 1、安装JDK
- 下载并解压到 /opt路径下
[root@k8s-node1 opt]# cd /opt
[root@k8s-node1 opt]# tar -xvf jdk-8u301-linux-x64.tar.gz
[root@k8s-node1 opt]# mv jdk1.8.0_301 java
1
2
3
2
3
- 设置Java 环境变量
[root@k8s-node1 opt]# vim /etc/profile # 对所有用户生效
JAVA_HOME=/opt/java
PATH=$PATH:$JAVA_HOME/bin
export JAVA_HOME PATH
[root@k8s-node1 opt]# source /etc/profile
1
2
3
4
5
2
3
4
5
# 2、安装Kafka
[root@k8s-node1 opt]# wget https://mirrors.bfsu.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
[root@k8s-node1 opt]# tar -xvf kafka_2.13-2.8.0.tgz
[root@k8s-node1 opt]# mv kafka_2.13-2.8.0 kafka
[root@k8s-node1 opt]# cd kafka
1
2
3
4
2
3
4
# 3、基本配置
[root@k8s-node1 kafka]# cd /opt/kafka
[root@k8s-node1 kafka]# vim config/server.properties
# 消费者、生产者和broker之间通信的地址,如果不设置,用的是listeners配置的地址
advertised.listeners=PLAINTEXT://192.168.56.65:9092
# 设置zookeeper的地址,如果用的是集群则配置的时候用逗号分割
zookeeper.connect=192.168.56.65:2181
broker.id=0
num.network.threads=3
# 设置broker处理磁盘io的线程数
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
# 设置每个topic的分区个数
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 4、功能验证
# 1、启动Zookeeper
- 使用安装包中的脚本启动单节点Zookeeper实例
[root@k8s-node1 opt]# cd /opt/kafka
[root@k8s-node1 kafka]# bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
[root@k8s-node1 kafka]# ps -ef|grep zookeeper # 确定当前zookeeper已经启动
1
2
3
2
3
# 2、启动Kafka 服务
[root@k8s-node1 kafka]# bin/kafka-server-start.sh config/server.properties
[2015-10-26 04:28:56,115] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2015-10-26 04:28:56,141] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
1
2
3
2
3
# 3、创建topic
- 使用kafka-topics.sh 创建单分区单副本的topic test:
[root@k8s-node1 ~]# cd /opt/kafka
[root@k8s-node1 kafka]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
1
2
2
- 查看topic
[root@k8s-node1 kafka]# bin/kafka-topics.sh --list --zookeeper localhost:2181
test
1
2
2
# 4、产生消息
- 使用kafka-console-producer.sh 发送消息
[root@k8s-node1 kafka]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hello tom
>hello jack
1
2
3
2
3
# 5、消费消息
- 使用kafka-console-consumer.sh 接收消息并在终端打印
[root@k8s-node1 kafka]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
hello tom
hello jack
1
2
3
2
3
上次更新: 2024/4/1 16:53:26