11.Python操作kafka
# 01.Python操作kafka
# 1.1 安装kafka-python
pip install kafka-python
# 1.2 生产者
# -*- coding: utf-8 -*-
from kafka import KafkaProducer
from kafka.errors import kafka_errors
import traceback
import json
def producer_demo():
    """Send one JSON-serialized key/value message to the ``kafka_demo`` topic.

    The key and the value are both encoded with ``json.dumps(...).encode()``.
    The function waits up to 10 seconds for the send to be acknowledged and
    prints the traceback if the send fails.
    """
    # Imported locally: ``kafka.errors.kafka_errors`` is a dict of broker
    # error classes, not an exception type, so it cannot be used in an
    # ``except`` clause. ``KafkaError`` is the base class to catch.
    from kafka.errors import KafkaError

    # The message is assumed to be a key/value pair (a key is not mandatory),
    # serialized as JSON.
    producer = KafkaProducer(
        bootstrap_servers=['192.168.56.65:9092'],
        key_serializer=lambda k: json.dumps(k).encode(),
        value_serializer=lambda v: json.dumps(v).encode(),
    )
    msg = {
        "username": "zhangsan",
        "password": "123456",
    }
    try:
        future = producer.send(
            'kafka_demo',    # topic to publish to
            key='userinfo',  # message key
            value=msg,       # message payload
        )
        future.get(timeout=10)  # block until the send is confirmed or fails
    except KafkaError:
        # print_exc() actually reports the failure; format_exc() only
        # returns a string, which the original code discarded.
        traceback.print_exc()
    finally:
        # Release the producer's sockets and background sender thread.
        producer.close()


if __name__ == "__main__":
    producer_demo()
# 1.3 消费者
# -*- coding: utf-8 -*-
from kafka import KafkaConsumer
import json
def consumer_demo():
    """Consume ``kafka_demo`` and ``kafka_demo2`` forever and print each record.

    Keys and values are expected to be JSON-encoded bytes. A record without
    a key (or without a value) is printed with ``None`` in that position
    instead of crashing the loop.
    """
    consumer = KafkaConsumer(
        # Topics to subscribe to (several may be listed).
        'kafka_demo',
        'kafka_demo2',
        # bootstrap_servers: Kafka node(s); separate multiple nodes with commas.
        bootstrap_servers='192.168.56.65:9092',
        # group_id: consumers sharing a group_id form one group, and each
        # record is delivered to only one consumer in that group. Optional.
        group_id='test',
    )
    for message in consumer:
        # Producers are not required to set a key, so guard against None
        # before decoding.
        key = None if message.key is None else json.loads(message.key.decode())
        value = None if message.value is None else json.loads(message.value.decode())
        # e.g. receive, key: userinfo, value: {'username': 'zhangsan', 'password': '123456'}
        print("receive, key: {}, value: {}".format(key, value))


if __name__ == "__main__":
    consumer_demo()
# 02.封装使用
# 2.1 kafka_message.py
# -*- coding: utf-8 -*-
from kafka import KafkaProducer, KafkaConsumer
import json
class Messenger(object):
    """Small wrapper that owns a JSON-serializing producer and builds consumers.

    Attributes:
        bootstrap_servers: Kafka bootstrap address(es) shared by the producer
            and by every consumer created through :meth:`get_consumer`.
        producer: ``KafkaProducer`` whose keys and values are encoded with
            ``json.dumps(...).encode()``.
    """

    def __init__(self, bootstrap_servers="192.168.56.65:9092"):
        """Create the producer.

        Args:
            bootstrap_servers: ``'host:port'`` string (or list of such
                strings). Defaults to the address the class previously
                hard-coded, so ``Messenger()`` behaves as before.
        """
        # Set before constructing the producer so __del__ stays safe even if
        # KafkaProducer(...) raises.
        self.producer = None
        self.bootstrap_servers = bootstrap_servers
        self.producer = KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            key_serializer=lambda k: json.dumps(k).encode(),
            value_serializer=lambda v: json.dumps(v).encode(),
        )

    def get_consumer(self, *topics, group_id="test"):
        """Return a new ``KafkaConsumer`` subscribed to ``topics``.

        Args:
            *topics: Topic names to subscribe to (several may be given).
            group_id: Consumer group name. Consumers sharing a group_id form
                one group, and each record is delivered to only one consumer
                in that group.
        """
        return KafkaConsumer(
            *topics,
            bootstrap_servers=self.bootstrap_servers,
            group_id=group_id,
        )

    def __del__(self):
        """Close the producer, if one was created, exactly once."""
        # getattr: __del__ also runs for instances whose __init__ never
        # completed (or never ran), where the attribute may be missing.
        producer = getattr(self, "producer", None)
        if producer is None:
            return
        # Drop the reference first so a repeated call is a no-op.
        self.producer = None
        # Close unconditionally: gating on bootstrap_connected() could skip
        # the close for a producer that is otherwise healthy.
        producer.close()
# 2.2 kfk_producer.py
# -*- coding: utf-8 -*-
from kafka_message import Messenger
from kafka.errors import kafka_errors
import traceback
import json
def producer_demo():
    """Send one JSON message to ``kafka_demo`` through a ``Messenger``.

    Prints the payload after handing it to the producer, then waits up to
    10 seconds for the send to be acknowledged. A failed send prints its
    traceback.
    """
    # Imported locally: ``kafka.errors.kafka_errors`` is a dict of broker
    # error classes, not an exception type, so it cannot be used in an
    # ``except`` clause. ``KafkaError`` is the base class to catch.
    from kafka.errors import KafkaError

    messenger = Messenger()
    # The message is assumed to be a key/value pair (a key is not mandatory),
    # serialized as JSON.
    msg = {
        "username": "zhangsan2",
        "password": "123456",
    }
    try:
        future = messenger.producer.send(
            'kafka_demo',
            key='userinfo',
            value=msg,
        )
        print("send msg=%s" % json.dumps(msg))
        future.get(timeout=10)  # block until the send is confirmed or fails
    except KafkaError:
        # print_exc() actually reports the failure; format_exc() only
        # returns a string, which the original code discarded.
        traceback.print_exc()


if __name__ == "__main__":
    producer_demo()
# 2.3 kfk_customer.py
# -*- coding: utf-8 -*-
from kafka_message import Messenger
import json
def consumer_demo():
    """Consume ``kafka_demo`` and ``kafka_demo2`` via ``Messenger`` and print records.

    Keys and values are expected to be JSON-encoded bytes. A record without
    a key (or without a value) is printed with ``None`` in that position
    instead of crashing the loop.
    """
    messenger = Messenger()
    consumer = messenger.get_consumer("kafka_demo", "kafka_demo2")
    for message in consumer:
        # Producers are not required to set a key, so guard against None
        # before decoding.
        key = None if message.key is None else json.loads(message.key.decode())
        value = None if message.value is None else json.loads(message.value.decode())
        # e.g. receive, key: userinfo, value: {'username': 'zhangsan', 'password': '123456'}
        print("receive, key: {}, value: {}".format(key, value))


if __name__ == "__main__":
    consumer_demo()
# 2.4 运行效果
上次更新: 2024/4/1 16:53:26