3、Kafka Python API

3.1 环境安装

  • 下载安装kafka-python

    pip install kafka-python

  • 验证是否安装成功

    import kafka

3.2 发送消息

1.创建producer

  • 1,命令行方式:普通的发送方式

    导入KafkaProducer,创建连接到node-teach:9092这个Broker的Producer,循环向test_topic这个Topic发送100个消息,消息内容都是'some_message_bytes'。

    进入到python的交互式命令行,进行代码编写:

    >>> from kafka import KafkaProducer#导入KafkaProducer类,用于生产数据
    >>> producer = KafkaProducer(bootstrap_servers='node-teach:9092')#创建生产者与broker的连接
    #发送100次相同的数据
    >>> for _ in range(100):
    ...     producer.send('test_topic',b'some_message_bytes')#指定的topic如果不存在,则会创建新的topic,如果存在,使用原来的
    

    注:kafka发送的数据是字节,需要将字符串转换为字节

  • 2,命令行方式:发送json字符串

    #建立KafkaProducer的实例
    #value_serializer:将数据进行序列化的操作
    #lambda v: json.dumps(v).encode('utf-8'):将传入的数据转换为json,并进行utf-8编码
    >>> producer = KafkaProducer(bootstrap_servers='node-teach:9092',value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    #发送一个字典数据
    >>> producer.send('test_topic', {'key1': 'value1'})
    <kafka.producer.future.FutureRecordMetadata object at 0x2a9ebd0>
    >>>
    
  • 3,命令行方式:发送普通字符串,将数据进行压缩

    compression_type:压缩类型,'gzip'

    #建立生产者和broker进程的连接,指定要锁方式为gzip
    >>> producer = KafkaProducer(bootstrap_servers='node-teach:9092',compression_type='gzip')
    #发送数据
    >>> producer.send('test_topic', b'msg')
    

2.利用producer将某个目录下的所有文件名发送到指定topic,并由consumer来接收

创建producer

from kafka import KafkaProducer
import json
import os
import time
from sys import argv
#建立Producer与Kafka进程的连接
producer = KafkaProducer(bootstrap_servers='node-teach:9092')

#定义一个日志输出方法,进行日志格式化输出
def log(str):
    t = time.strftime(r"%Y-%m-%d_%H-%M-%S",time.localtime())
    print("[%s]%s"%(t,str))
#查看目录下的所有文件并利用生产者进行数据生产。
def list_file(path):
    dir_list = os.listdir(path);
    for f in dir_list:
         producer.send('test_topic',f.encode())
         producer.flush()
         log('send: %s' % (f))    

list_file(argv[1])
#关闭生产者的连接
producer.close()
log('done')

创建consumer

from kafka import KafkaConsumer
import time

#格式化输出日志
def log(str):
        t = time.strftime(r"%Y-%m-%d_%H-%M-%S",time.localtime())
        print("[%s]%s"%(t,str))

log('start consumer')
#消费node-teach:9092上的test_topic 这个Topic
consumer=KafkaConsumer('test_topic',bootstrap_servers=['node-teach:9092'])
for msg in consumer:
        recv = "%s:%d:%d: key=%s value=%s" %(msg.topic,msg.partition,msg.offset,msg.key,msg.value)
        log(recv)

当前的consumer只能接收自身启动后的数据,如果需要启动后将之前的数据全部接收,需要设置consumer的group_id,kafka内部会将数据广播到分组上,组会将数据转接到对应的consumer上。

consumer=KafkaConsumer('test_topic',group_id='test_group_id',bootstrap_servers=['node-teach:9092'])

results matching ""

    No results matching ""