4. Integrating Flume with Kafka
Requirement:
Use Flume to listen on a network port, deliver the collected data to a Kafka broker through a Kafka sink, and consume it with Kafka's built-in console consumer.
4.1 Configure Flume
Create kafka.conf in Flume's conf directory with the following contents:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Define the source: listen on a host and port for incoming network data
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop000
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
# Define the sink as KafkaSink so events are forwarded to a Kafka broker
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = topic1
a1.sinks.k1.brokerList = 127.0.0.1:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1
# Define the channel that buffers events between source and sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
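Note: topic, brokerList, requiredAcks, and batchSize are the legacy Kafka sink property names used by Flume 1.6 and earlier. On Flume 1.7 or later the sink block uses renamed properties instead; a minimal sketch, assuming the same single-broker setup:
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Renamed properties in Flume 1.7+
a1.sinks.k1.kafka.topic = topic1
a1.sinks.k1.kafka.bootstrap.servers = 127.0.0.1:9092
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.channel = c1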
4.2 Start Kafka
- Start ZooKeeper
bin/zkServer.sh start
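A quick sanity check before starting the broker: zkServer.sh status reports the node's mode (standalone here; leader/follower in an ensemble):
bin/zkServer.sh status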
- Start Kafka (the -daemon flag already backgrounds the process, so no trailing & is needed)
bin/kafka-server-start.sh -daemon config/server.properties
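To confirm the broker is up, jps (the JVM process lister bundled with the JDK) should show a Kafka process alongside ZooKeeper's QuorumPeerMain; the PIDs below are illustrative:
jps
# 12345 Kafka
# 12300 QuorumPeerMain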
- Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic1
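You can verify the topic exists and inspect its partition assignment before wiring Flume to it:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic1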
- Start Flume
bin/flume-ng agent --conf conf --conf-file conf/kafka.conf --name a1 -Dflume.root.logger=DEBUG,console
- Send data with telnet to the port the source is listening on (the host must match the bind address in kafka.conf)
telnet hadoop000 44444
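Each line typed into the telnet session becomes one Flume event, and the netcat source acknowledges every line with OK. A sample session (input lines are illustrative):
hello kafka
OK
flume to kafka test
OK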
- Start a console consumer to check the output
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic topic1
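If the pipeline is wired correctly, the lines typed into telnet should appear in the consumer output, e.g. (illustrative):
hello kafka
flume to kafka test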