3、Kafka Python API
3.1 环境安装
下载安装kafka-python
pip install kafka-python
验证是否安装成功
import kafka
3.2 发送消息
1.创建producer
1,命令行方式:普通的发送方式
导入KafkaProducer,创建连接到node-teach:9092这个Broker的Producer,循环向test_topic这个Topic发送100个消息,消息内容都是'some_message_bytes'。
进入到python的交互式命令行,进行代码编写:
>>> from kafka import KafkaProducer#导入KafkaProducer类,用于生产数据 >>> producer = KafkaProducer(bootstrap_servers='node-teach:9092')#创建生产者与broker的连接 #发送100次相同的数据 >>> for _ in range(100): ... producer.send('test_topic',b'some_message_bytes')#指定的topic如果不存在,则会创建新的topic,如果存在,使用原来的
注:kafka发送的数据是字节,需要将字符串转换为字节
2,命令行方式:发送json字符串
#建立KafkaProducer的实例 #value_serializer:将数据进行序列化的操作 #lambda v: json.dumps(v).encode('utf-8'):将传入的数据转换为json,并进行utf-8编码 >>> producer = KafkaProducer(bootstrap_servers='node-teach:9092',value_serializer=lambda v: json.dumps(v).encode('utf-8')) #发送一个字典数据 >>> producer.send('test_topic', {'key1': 'value1'}) <kafka.producer.future.FutureRecordMetadata object at 0x2a9ebd0> >>>
3,命令行方式:发送普通字符串,将数据进行压缩
compression_type:压缩类型,'gzip'
#建立生产者和broker进程的连接,指定要锁方式为gzip >>> producer = KafkaProducer(bootstrap_servers='node-teach:9092',compression_type='gzip') #发送数据 >>> producer.send('test_topic', b'msg')
2.利用producer将某个目录下的所有文件名发送到指定topic,并由consumer来接收
创建producer
from kafka import KafkaProducer
import json
import os
import time
from sys import argv
#建立Producer与Kafka进程的连接
producer = KafkaProducer(bootstrap_servers='node-teach:9092')
#定义一个日志输出方法,进行日志格式化输出
def log(str):
t = time.strftime(r"%Y-%m-%d_%H-%M-%S",time.localtime())
print("[%s]%s"%(t,str))
#查看目录下的所有文件并利用生产者进行数据生产。
def list_file(path):
dir_list = os.listdir(path);
for f in dir_list:
producer.send('test_topic',f.encode())
producer.flush()
log('send: %s' % (f))
list_file(argv[1])
#关闭生产者的连接
producer.close()
log('done')
创建consumer
from kafka import KafkaConsumer
import time
#格式化输出日志
def log(str):
t = time.strftime(r"%Y-%m-%d_%H-%M-%S",time.localtime())
print("[%s]%s"%(t,str))
log('start consumer')
#消费node-teach:9092上的test_topic 这个Topic
consumer=KafkaConsumer('test_topic',bootstrap_servers=['node-teach:9092'])
for msg in consumer:
recv = "%s:%d:%d: key=%s value=%s" %(msg.topic,msg.partition,msg.offset,msg.key,msg.value)
log(recv)
当前的consumer只能接收自身启动后的数据,如果需要启动后将之前的数据全部接收,需要设置consumer的group_id,kafka内部会将数据广播到分组上,组会将数据转接到对应的consumer上。
consumer=KafkaConsumer('test_topic',group_id='test_group_id',bootstrap_servers=['node-teach:9092'])