不做大哥好多年 不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核

逍遥子

不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核
  • MySQL

  • Redis

  • Elasticsearch

  • Kafka

    • 01.Kafka安装
    • 02.kafka基础 ✅
    • 03.kafka原理 ✅
    • 04.Kafka集群 ✅
    • 11.Python操作kafka
      • 01.Python操作kafka
        • 1.1 安装kafka-python
        • 1.2 生产者
        • 1.3 消费者
      • 02.封装使用
        • 2.1 kafka_message.py
        • 2.2 kfk_producer.py
        • 2.3 kfk_customer.py
        • 2.4 运行效果
    • 12.golang操作kafka
  • Etcd

  • MongoDB

  • TiDB

  • RabbitMQ

  • 数据库
  • Kafka
xiaonaiqiang
2021-08-08
目录

11.Python操作kafka

# 01.Python操作kafka

# 1.1 安装kafka-python

  • 安装kafka客户端
    • kafka-python github (opens new window)
    • kafka-python 官网 (opens new window)
pip install kafka-python
1

# 1.2 生产者

# -*- coding: utf-8 -*-
from kafka import KafkaProducer
from kafka.errors import kafka_errors
import traceback
import json

def producer_demo():
    # 假设生产的消息为键值对(不是一定要键值对),且序列化方式为json
    producer = KafkaProducer(
        bootstrap_servers=['192.168.56.65:9092'],
        key_serializer=lambda k: json.dumps(k).encode(),
        value_serializer=lambda v: json.dumps(v).encode()
	)
    msg = {
        "username": "zhangsan",
        "password": "123456"
    }
    future = producer.send(
        'kafka_demo',   # 指定向kafka_demo这个Topic发数据
        key='userinfo',  # 发送消息中的key值
        value=msg        # 发送消息中的数据
    )
    try:
        future.get(timeout=10) # 监控是否发送成功
    except kafka_errors:  # 发送失败抛出kafka_errors
        traceback.format_exc()

if __name__ == "__main__":
    producer_demo()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

# 1.3 消费者

# -*- coding: utf-8 -*-
from kafka import KafkaConsumer
import json

def consumer_demo():
    consumer = KafkaConsumer(
        'kafka_demo',
        'kafka_domo2',      # 指定获取kafka_demo这个Topic中的数据
        # bootstrap_servers:kafka的节点,多个节点使用逗号分隔
        bootstrap_servers='192.168.56.65:9092',
        # group_id:多个拥有相同group_id的消费者被判定为一组,一条数据记录只会被同一个组中的一个消费者消费
        group_id='test'    # 指定此消费者实例属于的组名,可以不指定
    )
    for message in consumer:
        # receive, key: userinfo, value: {'username': 'zhangsan', 'password': '123456'}
        print("receive, key: {}, value: {}".format(
                json.loads(message.key.decode()),
                json.loads(message.value.decode())
            )
        )

if __name__ == "__main__":
    consumer_demo()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# 02.封装使用

# 2.1 kafka_message.py

# -*- coding: utf-8 -*-
from kafka import KafkaProducer, KafkaConsumer
import json


class Messenger(object):
	def __init__(self):
		self.bootstrap_servers = "192.168.56.65:9092"
		self.producer = KafkaProducer(
			bootstrap_servers=[self.bootstrap_servers],
			key_serializer=lambda k: json.dumps(k).encode(),
			value_serializer=lambda v: json.dumps(v).encode()
		)

	def get_consumer(self, *topics , group_id="test"):
		consumer = KafkaConsumer(
			# 指定获取的Topic(可以订阅多个topic)
			*topics,
			# bootstrap_servers:kafka的节点,多个节点使用逗号分隔
			bootstrap_servers=self.bootstrap_servers,
			# group_id:多个拥有相同group_id的消费者被判定为一组,一条数据记录只会被同一个组中的一个消费者消费
			group_id=group_id  # 指定此消费者实例属于的组名,可以不指定
		)
		return consumer

	def __del__(self):
		if self.producer and self.producer.bootstrap_connected():
			self.producer.close()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# 2.2 kfk_producer.py

# -*- coding: utf-8 -*-
from kafka_message import Messenger
from kafka.errors import kafka_errors
import traceback
import json


def producer_demo():
	KFK = Messenger()
	# 假设生产的消息为键值对(不是一定要键值对),且序列化方式为json
	msg = {
		"username": "zhangsan2",
		"password": "123456"
	}
	future = KFK.producer.send(
		'kafka_demo',
		key='userinfo',
		value=msg
	)
	print("send msg=%s"%json.dumps(msg))
	try:
		future.get(timeout=10)  # 监控是否发送成功
	except kafka_errors:  # 发送失败抛出kafka_errors
		traceback.format_exc()


if __name__ == "__main__":
	producer_demo()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# 2.3 kfk_customer.py

# -*- coding: utf-8 -*-
from kafka_message import Messenger
import json

def consumer_demo():
    KFK = Messenger()
    consumer = KFK.get_consumer("kafka_demo","kafka_demo2")
    for message in consumer:
        # receive, key: userinfo, value: {'username': 'zhangsan', 'password': '123456'}
        print("receive, key: {}, value: {}".format(
                json.loads(message.key.decode()),
                json.loads(message.value.decode())
            )
        )

if __name__ == "__main__":
    consumer_demo()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 2.4 运行效果

上次更新: 2024/4/1 16:53:26
04.Kafka集群 ✅
12.golang操作kafka

← 04.Kafka集群 ✅ 12.golang操作kafka→

最近更新
01
04.数组双指针排序_子数组
03-25
02
08.动态规划
03-25
03
06.回溯算法
03-25
更多文章>
Theme by Vdoing | Copyright © 2019-2025 逍遥子 技术博客 京ICP备2021005373号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式