1.7 Flume load balancing (load balance) and fault tolerance (failover)
1.7.1 load balance
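Load balancing is a technique for situations where one machine (one process) cannot handle all requests on its own. The Load balancing Sink Processor provides load balancing. In the figure below, Agent1 is a routing node that distributes the Events buffered in its Channel across several Sink components, and each Sink connects to its own downstream Agent. A sample configuration follows: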
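Key step: define a sink group so that one agent can feed several downstream agents.
#Define the sink group
a1.sinkgroups = g1
#The group's members are sinks k1, k2 and k3
a1.sinkgroups.g1.sinks = k1 k2 k3
#Set the sink processor type to load_balance
a1.sinkgroups.g1.processor.type = load_balance
#If enabled, a failed sink is blacklisted for a timeout; if it is still unresponsive when the timeout ends, the timeout grows exponentially
a1.sinkgroups.g1.processor.backoff = true
#Select sinks with round_robin (round-robin: the sinks are used in turn)
#random (pick a sink at random) is also available
a1.sinkgroups.g1.processor.selector = round_robin
#Upper limit, in milliseconds, on the exponentially growing blacklist timeout (default 30000)
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000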
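To make the backoff behavior concrete, here is a minimal Python sketch of round-robin selection with an exponential-backoff blacklist. It is not Flume's implementation: the class name `RoundRobinBackoffSelector`, the 1000 ms base timeout and the doubling rule are illustrative assumptions; only the ideas of blacklisting failed sinks, taking turns among the rest, and capping the timeout at `maxTimeOut` come from the configuration above. Time is passed in explicitly (in milliseconds) so the behavior can be traced without waiting.

```python
class RoundRobinBackoffSelector:
    """Sketch: round-robin sink selection with an exponential-backoff blacklist.

    Assumption (not Flume's exact code): after its n-th consecutive failure a
    sink is blacklisted for base_ms * 2**(n - 1) ms, capped at max_timeout_ms.
    """

    def __init__(self, sinks, base_ms=1000, max_timeout_ms=10000):
        self.sinks = list(sinks)
        self.base_ms = base_ms
        self.max_timeout_ms = max_timeout_ms  # plays the role of selector.maxTimeOut
        self.next_index = 0                   # where the next round-robin scan starts
        self.failures = {s: 0 for s in self.sinks}    # consecutive failures per sink
        self.restore_at = {s: 0 for s in self.sinks}  # time (ms) a sink leaves the blacklist

    def select(self, now_ms):
        """Return the next sink in round-robin order that is not blacklisted, or None."""
        for offset in range(len(self.sinks)):
            idx = (self.next_index + offset) % len(self.sinks)
            sink = self.sinks[idx]
            if self.restore_at[sink] <= now_ms:
                self.next_index = (idx + 1) % len(self.sinks)
                return sink
        return None  # every sink is currently blacklisted

    def inform_failure(self, sink, now_ms):
        """Blacklist a sink; the timeout doubles per consecutive failure up to the cap."""
        self.failures[sink] += 1
        timeout = min(self.max_timeout_ms, self.base_ms * 2 ** (self.failures[sink] - 1))
        self.restore_at[sink] = now_ms + timeout
        return timeout

    def inform_success(self, sink):
        """A successful delivery clears the sink's failure history."""
        self.failures[sink] = 0
        self.restore_at[sink] = 0


if __name__ == "__main__":
    selector = RoundRobinBackoffSelector(["k1", "k2", "k3"])
    print([selector.select(0) for _ in range(4)])                # ['k1', 'k2', 'k3', 'k1']
    print([selector.inform_failure("k2", 0) for _ in range(5)])  # [1000, 2000, 4000, 8000, 10000]
    print([selector.select(5000) for _ in range(3)])             # ['k3', 'k1', 'k3']
```

Under these assumptions, with maxTimeOut=10000, a sink that keeps failing is blacklisted for 1000, 2000, 4000 and 8000 ms, then 10000 ms on every further failure, while the remaining sinks keep taking turns.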
Full configuration:
Server A: exec-memory-avro.conf
#Define the agent
exec-memory-avro.sources = exec-source
exec-memory-avro.channels = memory-channel
exec-memory-avro.sinks = avro-sink1 avro-sink2
#Define the source
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /root/bigdata/data/abc.log
#Define the sink group
exec-memory-avro.sinkgroups = g1
#Define group member avro-sink1, an Avro sink that connects to port 52020 on node-teach1
exec-memory-avro.sinks.avro-sink1.channel = memory-channel
exec-memory-avro.sinks.avro-sink1.type = avro
exec-memory-avro.sinks.avro-sink1.hostname = node-teach1
exec-memory-avro.sinks.avro-sink1.port = 52020
#Define group member avro-sink2, an Avro sink that connects to port 52020 on node-teach2
exec-memory-avro.sinks.avro-sink2.channel = memory-channel
exec-memory-avro.sinks.avro-sink2.type = avro
exec-memory-avro.sinks.avro-sink2.hostname = node-teach2
exec-memory-avro.sinks.avro-sink2.port = 52020
#Add the member sinks to the sink group
exec-memory-avro.sinkgroups.g1.sinks = avro-sink1 avro-sink2
#Configure the sink group: load_balance processor, backoff enabled, round_robin selection, 10000 ms backoff cap
exec-memory-avro.sinkgroups.g1.processor.type = load_balance
exec-memory-avro.sinkgroups.g1.processor.backoff = true
exec-memory-avro.sinkgroups.g1.processor.selector = round_robin
exec-memory-avro.sinkgroups.g1.processor.selector.maxTimeOut=10000
#Define the channel
exec-memory-avro.channels.memory-channel.type = memory
exec-memory-avro.channels.memory-channel.capacity = 1000
exec-memory-avro.channels.memory-channel.transactionCapacity = 100
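Because every key is prefixed with the agent name passed to `-n`, a typo in a sink or group name is easy to miss. The following hypothetical helper (`parse_properties` and `check_sink_groups` are illustrative names, not part of Flume) does a simplified parse of `key = value` lines, ignoring `#` comments and the more advanced Java properties syntax, and checks that every sink listed in a sink group is declared and bound to a declared channel.

```python
def parse_properties(text):
    """Very simplified .properties parser: one 'key = value' per line, '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, sep, value = line.partition("=")
        if not sep:
            continue  # skip lines without '='
        props[key.strip()] = value.strip()
    return props


def check_sink_groups(props, agent):
    """Return a list of problems found in the agent's sink and sink-group wiring."""
    problems = []
    sinks = props.get(f"{agent}.sinks", "").split()
    channels = props.get(f"{agent}.channels", "").split()
    # Every declared sink must read from a declared channel
    for sink in sinks:
        ch = props.get(f"{agent}.sinks.{sink}.channel")
        if ch not in channels:
            problems.append(f"sink {sink} is bound to undeclared channel {ch!r}")
    # Every sink group must list only declared sinks
    for group in props.get(f"{agent}.sinkgroups", "").split():
        members = props.get(f"{agent}.sinkgroups.{group}.sinks", "").split()
        if not members:
            problems.append(f"sink group {group} has no sinks")
        for member in members:
            if member not in sinks:
                problems.append(f"sink group {group} lists undeclared sink {member}")
    return problems
```

For the server A configuration above, `check_sink_groups(parse_properties(open("exec-memory-avro.conf").read()), "exec-memory-avro")` should return an empty list.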
Start:
bin/flume-ng agent -c conf -f exec-memory-avro.conf -n exec-memory-avro -Dflume.root.logger=INFO,console
Server B: avro-memory-logger.conf
#Define the agent
avro-memory-logger.sources = avro-source
avro-memory-logger.sinks = logger-sink
avro-memory-logger.channels = memory-channel
#Define the Avro source avro-source, listening on port 52020 of node-teach1
avro-memory-logger.sources.avro-source.type = avro
avro-memory-logger.sources.avro-source.channels = memory-channel
avro-memory-logger.sources.avro-source.bind = node-teach1
avro-memory-logger.sources.avro-source.port = 52020
#Define the sink
avro-memory-logger.sinks.logger-sink.type = logger
avro-memory-logger.sinks.logger-sink.channel = memory-channel
#Define the channel: capacity of 1000 events, transaction capacity of 100
avro-memory-logger.channels.memory-channel.type = memory
avro-memory-logger.channels.memory-channel.capacity = 1000
avro-memory-logger.channels.memory-channel.transactionCapacity = 100
Server C: avro-memory-logger.conf
Server C is configured the same way as server B; only the hostname changes.
#Define the agent
avro-memory-logger.sources = avro-source
avro-memory-logger.sinks = logger-sink
avro-memory-logger.channels = memory-channel
#Define the Avro source avro-source, listening on port 52020 of node-teach2
avro-memory-logger.sources.avro-source.type = avro
avro-memory-logger.sources.avro-source.channels = memory-channel
avro-memory-logger.sources.avro-source.bind = node-teach2
avro-memory-logger.sources.avro-source.port = 52020
#Define the sink
avro-memory-logger.sinks.logger-sink.type = logger
avro-memory-logger.sinks.logger-sink.channel = memory-channel
#Define the channel: capacity of 1000 events, transaction capacity of 100
avro-memory-logger.channels.memory-channel.type = memory
avro-memory-logger.channels.memory-channel.capacity = 1000
avro-memory-logger.channels.memory-channel.transactionCapacity = 100
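Since servers B and C differ only in the `bind` hostname, one option is to generate both files from a single template. This is a minimal sketch; the function name `render_logger_conf`, the constant `LOGGER_AGENT_TEMPLATE` and the script usage are hypothetical, and the keys are copied from the configurations above (comments omitted).

```python
import sys

# Hypothetical template for the logger agent on servers B and C.
LOGGER_AGENT_TEMPLATE = """\
avro-memory-logger.sources = avro-source
avro-memory-logger.sinks = logger-sink
avro-memory-logger.channels = memory-channel
avro-memory-logger.sources.avro-source.type = avro
avro-memory-logger.sources.avro-source.channels = memory-channel
avro-memory-logger.sources.avro-source.bind = {host}
avro-memory-logger.sources.avro-source.port = {port}
avro-memory-logger.sinks.logger-sink.type = logger
avro-memory-logger.sinks.logger-sink.channel = memory-channel
avro-memory-logger.channels.memory-channel.type = memory
avro-memory-logger.channels.memory-channel.capacity = 1000
avro-memory-logger.channels.memory-channel.transactionCapacity = 100
"""


def render_logger_conf(host, port=52020):
    """Return the avro-memory-logger.conf text for the given bind hostname."""
    return LOGGER_AGENT_TEMPLATE.format(host=host, port=port)


if __name__ == "__main__":
    # Hostname from the command line, defaulting to server B's hostname
    host = sys.argv[1] if len(sys.argv) > 1 else "node-teach1"
    print(render_logger_conf(host), end="")
```

For example, redirecting the output of the script run with `node-teach2` into avro-memory-logger.conf would produce server C's file.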
Start servers B and C:
flume-ng agent -f avro-memory-logger.conf -n avro-memory-logger -Dflume.root.logger=INFO,console
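Simulating incoming data:
Explanation of the script below:
while true: infinite loop
do echo "test">>/xxx/xxx.log: append the string "test" to a log file
sleep 0.5: sleep for 0.5 seconds
done: ends the loop body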
while true;do echo "add ..." >> /root/bigdata/data/abc.log;sleep 0.5;done
Start order: start B and C first, then A, so that A's Avro sinks connect to Avro sources that are already listening.
1.7.2 Failover
The Failover Sink Processor provides failover. The topology resembles the load-balancing setup, but the internal processing mechanism is completely different.
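The Failover Sink Processor maintains a prioritized list of Sink components and guarantees that, as long as at least one Sink is available, Events are delivered to an active Sink. The failover mechanism moves failed Sinks into a pool where each is assigned a cool-down period; the cool-down grows with consecutive failures before the Sink is retried. Once a Sink successfully sends an Event, it is restored to the live pool. Each Sink has a priority: the larger the number, the higher the priority. For example:
a Sink with priority 100 is activated before a Sink with priority 80. If a Sink fails while sending an Event, the Sink with the next-highest priority is tried next. If no priority is specified, priority is determined by the order in which the Sinks are listed in the configuration. A sample configuration follows: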
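#Define a sink group
a1.sinkgroups = g1
#The group's members are sinks k1, k2 and k3
a1.sinkgroups.g1.sinks = k1 k2 k3
#Set the group type to failover (fault tolerance)
a1.sinkgroups.g1.processor.type = failover
#Set each member's priority; the larger the absolute value, the higher the priority
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 7
a1.sinkgroups.g1.processor.priority.k3 = 6
#Maximum backoff (cool-down) period for a failed sink, in milliseconds (default 30000); the cool-down grows with consecutive failures up to this cap
a1.sinkgroups.g1.processor.maxpenalty = 20000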
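The priority and cool-down rules described above can be traced with a minimal Python sketch. It assumes positive priorities (as in the example), a 1000 ms base cool-down that doubles with each consecutive failure, and `max_penalty_ms` playing the role of `processor.maxpenalty`; the class `FailoverSketch` and these numbers are illustrative assumptions, not Flume's source code.

```python
class FailoverSketch:
    """Sketch: priority-based failover with a growing, capped cool-down.

    Assumption (not Flume's exact code): after its n-th consecutive failure a
    sink cools down for base_ms * 2**(n - 1) ms, capped at max_penalty_ms.
    """

    def __init__(self, priorities, base_ms=1000, max_penalty_ms=20000):
        # priorities: sink name -> priority; a larger value is tried first
        self.priorities = dict(priorities)
        self.base_ms = base_ms
        self.max_penalty_ms = max_penalty_ms  # plays the role of processor.maxpenalty
        self.failures = {s: 0 for s in self.priorities}  # consecutive failures per sink
        self.retry_at = {s: 0 for s in self.priorities}  # time (ms) the cool-down ends

    def active_sink(self, now_ms):
        """Return the highest-priority sink that is not cooling down, or None."""
        live = [s for s in self.priorities if self.retry_at[s] <= now_ms]
        if not live:
            return None
        return max(live, key=lambda s: self.priorities[s])

    def report_failure(self, sink, now_ms):
        """Move a sink to the cool-down pool; the penalty grows up to the cap."""
        self.failures[sink] += 1
        penalty = min(self.max_penalty_ms, self.base_ms * 2 ** (self.failures[sink] - 1))
        self.retry_at[sink] = now_ms + penalty
        return penalty

    def report_success(self, sink):
        """A successful send restores the sink to the live pool."""
        self.failures[sink] = 0
        self.retry_at[sink] = 0


if __name__ == "__main__":
    fo = FailoverSketch({"k1": 5, "k2": 7, "k3": 6}, max_penalty_ms=20000)
    print(fo.active_sink(0))      # k2
    fo.report_failure("k2", 0)    # k2 cools down for 1000 ms
    print(fo.active_sink(500))    # k3
    print(fo.active_sink(1000))   # k2
```

With the priorities above (k2=7, k3=6, k1=5), events go to k2; while k2 is cooling down they go to k3, and once k2's cool-down has expired it is tried first again.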
Full configuration:
Server A: exec-memory-avro.conf
#Define the agent
exec-memory-avro.channels = memory-channel
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink1 avro-sink2
#Define the sink group
exec-memory-avro.sinkgroups = g1
#Define an exec source
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /root/bigdata/data/bcd.log
#Define group member avro-sink1, connecting to port 52020 on node-teach1
exec-memory-avro.sinks.avro-sink1.channel = memory-channel
exec-memory-avro.sinks.avro-sink1.type = avro
exec-memory-avro.sinks.avro-sink1.hostname = node-teach1
exec-memory-avro.sinks.avro-sink1.port = 52020
#Define group member avro-sink2, connecting to port 52020 on node-teach2
exec-memory-avro.sinks.avro-sink2.channel = memory-channel
exec-memory-avro.sinks.avro-sink2.type = avro
exec-memory-avro.sinks.avro-sink2.hostname = node-teach2
exec-memory-avro.sinks.avro-sink2.port = 52020
#Add the member sinks to the sink group
exec-memory-avro.sinkgroups.g1.sinks = avro-sink1 avro-sink2
#Set the group type to failover (fault tolerance), plus the sink priorities and the maximum backoff time
exec-memory-avro.sinkgroups.g1.processor.type = failover
exec-memory-avro.sinkgroups.g1.processor.priority.avro-sink1 = 10
exec-memory-avro.sinkgroups.g1.processor.priority.avro-sink2 = 1
exec-memory-avro.sinkgroups.g1.processor.maxpenalty = 10000
#Define the channel
exec-memory-avro.channels.memory-channel.type = memory
exec-memory-avro.channels.memory-channel.capacity = 1000
exec-memory-avro.channels.memory-channel.transactionCapacity = 100
Start server A (as in 1.7.1, start B and C first):
flume-ng agent -f exec-memory-avro.conf -n exec-memory-avro -Dflume.root.logger=INFO,console
Server B: avro-memory-logger.conf
Receives the events sent by server A and prints them to the console.
#Define the agent
avro-memory-logger.sources = avro-source
avro-memory-logger.sinks = logger-sink
avro-memory-logger.channels = memory-channel
#Define an Avro source bound to port 52020 on node-teach1
avro-memory-logger.sources.avro-source.type = avro
avro-memory-logger.sources.avro-source.channels = memory-channel
avro-memory-logger.sources.avro-source.bind = node-teach1
avro-memory-logger.sources.avro-source.port = 52020
#Define a logger sink that prints events to the console
avro-memory-logger.sinks.logger-sink.type = logger
avro-memory-logger.sinks.logger-sink.channel = memory-channel
#Define the memory channel
avro-memory-logger.channels.memory-channel.type = memory
avro-memory-logger.channels.memory-channel.capacity = 1000
avro-memory-logger.channels.memory-channel.transactionCapacity = 100
Server C: avro-memory-logger.conf
#Define the agent
avro-memory-logger.sources = avro-source
avro-memory-logger.sinks = logger-sink
avro-memory-logger.channels = memory-channel
#Define an Avro source bound to port 52020 on node-teach2
avro-memory-logger.sources.avro-source.type = avro
avro-memory-logger.sources.avro-source.channels = memory-channel
avro-memory-logger.sources.avro-source.bind = node-teach2
avro-memory-logger.sources.avro-source.port = 52020
#Define a logger sink that prints events to the console
avro-memory-logger.sinks.logger-sink.type = logger
avro-memory-logger.sinks.logger-sink.channel = memory-channel
#Define the channel
avro-memory-logger.channels.memory-channel.type = memory
avro-memory-logger.channels.memory-channel.capacity = 1000
avro-memory-logger.channels.memory-channel.transactionCapacity = 100
Start servers B and C:
flume-ng agent -f avro-memory-logger.conf -n avro-memory-logger -Dflume.root.logger=INFO,console
Note: the command is identical on machines B and C.