设为首页 收藏本站
查看: 1854|回复: 1

[经验分享] Flume NG 学习笔记(七)Sink Processors(故障转移与负载均衡)测试

[复制链接]

尚未签到

发表于 2019-1-30 10:07:20 | 显示全部楼层 |阅读模式
版权声明:本文为博主原创文章,未经博主允许不得转载。
目录(?)[+]

  Sink groups允许组织多个sink到一个实体上。 Sink processors能够提供在组内所有Sink之间实现负载均衡的能力,而且在失败的情况下能够进行故障转移从一个Sink到另一个Sink。
  简单的说就是一个source 对应一个Sinkgroups,即多个sink,这里实际上与第六节的复用/复制情况差不多,只是这里考虑的是可靠性与性能,即故障转移与负载均衡的设置。
  下面是官方配置:
Property Name

Default

Description

sinks


Space-separated list of sinks that are participating in the group

processor.type

default

The component type name, needs to be default, failover or load_balance

  从参数类型上可以看出有3种Processors类型:default, failover(故障转移)和 load_balance(负载均衡),当然,官网上说目前自定义processors 还不支持。
  下面是官网例子
a1.sinkgroups=g1

a1.sinkgroups.g1.sinks=k1 k2

a1.sinkgroups.g1.processor.type=load_balance

一、Default Sink Processor
  DefaultSink Processor 接收单一的Sink,不强制用户为Sink创建Processor,前面举了很多例子。所以这个就不多说了。
二、Failover Sink Processor(故障转移)
  FailoverSink Processor会通过配置维护了一个优先级列表。保证每一个有效的事件都会被处理。
  故障转移的工作原理是将连续失败sink分配到一个池中,在那里被分配一个冷冻期,在这个冷冻期里,这个sink不会做任何事。一旦sink成功发送一个event,sink将被还原到live 池中。
  在这配置中,要设置sinkgroups processor为failover,需要为所有的sink分配优先级,所有的优先级数字必须是唯一的,这个得格外注意。此外,failover time的上限可以通过maxpenalty 属性来进行设置。
  下面是官网配置:
Property Name

Default

Description

sinks


Space-separated list of sinks that are participating in the group

processor.type

default

The component type name, needs to be failover

processor.priority.


must be one of the sink instances associated with the current sink group

processor.maxpenalty

30000

(in millis)

  下面是官网例子
a1.sinkgroups=g1

a1.sinkgroups.g1.sinks=k1 k2

a1.sinkgroups.g1.processor.type=failover

a1.sinkgroups.g1.processor.priority.k1=5

a1.sinkgroups.g1.processor.priority.k2=10

a1.sinkgroups.g1.processor.maxpenalty=10000

  这里首先要申明一个sinkgroups,然后再设置2个sink ,k1与k2,其中2个优先级是5和10,而processor的maxpenalty被设置为10秒,默认是30秒。‘
  下面是测试例子
  

  [html] view plain copy

  •   #配置文件:failover_sink_case13.conf  
  •   #Name the components on this agent  
  •   a1.sources= r1  
  •   a1.sinks= k1 k2  
  •   a1.channels= c1 c2  
  •      
  •   a1.sinkgroups= g1  
  •   a1.sinkgroups.g1.sinks= k1 k2  
  •   a1.sinkgroups.g1.processor.type= failover  
  •   a1.sinkgroups.g1.processor.priority.k1= 5  
  •   a1.sinkgroups.g1.processor.priority.k2= 10  
  •   a1.sinkgroups.g1.processor.maxpenalty= 10000  
  •      
  •   #Describe/configure the source  
  •   a1.sources.r1.type= syslogtcp  
  •   a1.sources.r1.port= 50000  
  •   a1.sources.r1.host= 192.168.233.128  
  •   a1.sources.r1.channels= c1 c2  
  •      
  •   #Describe the sink  
  •   a1.sinks.k1.type= avro  
  •   a1.sinks.k1.channel= c1  
  •   a1.sinks.k1.hostname= 192.168.233.129  
  •   a1.sinks.k1.port= 50000  
  •      
  •   a1.sinks.k2.type= avro  
  •   a1.sinks.k2.channel= c2  
  •   a1.sinks.k2.hostname= 192.168.233.130  
  •   a1.sinks.k2.port= 50000  
  •   # Usea channel which buffers events in memory  
  •   a1.channels.c1.type= memory  
  •   a1.channels.c1.capacity= 1000  
  •   a1.channels.c1.transactionCapacity= 100  
  

  

  这里设置了2个channels与2个sinks ,关于故障转移的设置直接复制官网的例子。我们还要配置2个sinks对于的代理。这里的2个接受代理我们沿用之前第六章复制的2个sink代理配置。
  下面是第一个接受复制事件代理配置
  

  [html] view plain copy

  •   #配置文件:replicate_sink1_case11.conf  
  •   # Name the components on this agent  
  •   a2.sources = r1  
  •   a2.sinks = k1  
  •   a2.channels = c1  
  •      
  •   # Describe/configure the source  
  •   a2.sources.r1.type = avro  
  •   a2.sources.r1.channels = c1  
  •   a2.sources.r1.bind = 192.168.233.129  
  •   a2.sources.r1.port = 50000  
  •      
  •   # Describe the sink  
  •   a2.sinks.k1.type = logger  
  •   a2.sinks.k1.channel = c1  
  •      
  •   # Use a channel which buffers events inmemory  
  •   a2.channels.c1.type = memory  
  •   a2.channels.c1.capacity = 1000  
  •   a2.channels.c1.transactionCapacity = 100  
  

  

  下面是第二个接受复制事件代理配置:
  

  [html] view plain copy

  •   #配置文件:replicate_sink2_case11.conf  
  •   # Name the components on this agent  
  •   a3.sources = r1  
  •   a3.sinks = k1  
  •   a3.channels = c1  
  •      
  •   # Describe/configure the source  
  •   a3.sources.r1.type = avro  
  •   a3.sources.r1.channels = c1  
  •   a3.sources.r1.bind = 192.168.233.130  
  •   a3.sources.r1.port = 50000  
  •      
  •   # Describe the sink  
  •   a3.sinks.k1.type = logger  
  •   a3.sinks.k1.channel = c1  
  •      
  •   # Use a channel which buffers events inmemory  
  •   a3.channels.c1.type = memory  
  •   a3.channels.c1.capacity = 1000  
  •   a3.channels.c1.transactionCapacity = 100  
  

  

  #敲命令
  首先先启动2个接受复制事件代理,如果先启动源发送的代理,会报他找不到sinks的绑定,因为2个接事件的代理还未起来。
  flume-ng agent -cconf -f conf/replicate_sink1_case11.conf -n a1 -Dflume.root.logger=INFO,console
  flume-ng agent -cconf -f conf/replicate_sink2_case11.conf -n a1 -Dflume.root.logger=INFO,console
  在启动源发送的代理
  flume-ng agent -cconf -f conf/failover_sink_case13.conf -n a1 -Dflume.root.logger=INFO,console
  启动成功后
  打开另一个终端输入,往侦听端口送数据
  echo "hello failoversink" | nc 192.168.233.128 50000
  #在启动源发送的代理终端查看console输出
  因为k1的优先级是5,K2是10因此当K2正常运行的时候,是发送到K2的。下面数据正常输出。


  

  然后我们中断K2的代理进程。


  

  再尝试往侦听端口送数据
  echo "hello close k2"| nc 192.168.233.128 50000
  我们发现源代理发生事件到K2失败,然后他将K2放入到failover list(故障列表)


  

  因为K1还是正常运行的,因此这个时候他会接收到数据。


  

  然后我们再打开K2的大理进程,我们继续往侦听端口送数据
  echo " hello open k2 again" | nc192.168.233.128 50000


  

  数据正常发生,Failover SinkProcessor测试完毕。
三、Load balancing SinkProcessor
  负载均衡片处理器提供在多个Sink之间负载平衡的能力。实现支持通过round_robin(轮询)或者random(随机)参数来实现负载分发,默认情况下使用round_robin,但可以通过配置覆盖这个默认值。还可以通过集成AbstractSinkSelector类来实现用户自己的选择机制。
  当被调用的时候,这选择器通过配置的选择规则选择下一个sink来调用。
  下面是官网配置
Property Name

Default

Description

processor.sinks


Space-separated list of sinks that are participating in the group

processor.type

default

The component type name, needs to be load_balance

processor.backoff

false

Should failed sinks be backed off exponentially.

processor.selector

round_robin

Selection mechanism. Must be either round_robin, random or FQCN of custom class that inherits from AbstractSinkSelector

processor.selector.maxTimeOut

30000

Used by backoff selectors to limit exponential backoff (in milliseconds)

  下面是官网的例子
a1.sinkgroups=g1

a1.sinkgroups.g1.sinks=k1 k2

a1.sinkgroups.g1.processor.type=load_balance

a1.sinkgroups.g1.processor.backoff=true

a1.sinkgroups.g1.processor.selector=random

  这个与故障转移的设置差不多。
  下面是测试例子
  

  [html] view plain copy

  •   #配置文件:load_sink_case14.conf  
  •   # Name the components on this agent  
  •   a1.sources = r1  
  •   a1.sinks = k1 k2  
  •   a1.channels = c1  
  •      
  •   a1.sinkgroups = g1  
  •   a1.sinkgroups.g1.sinks = k1 k2  
  •   a1.sinkgroups.g1.processor.type =load_balance  
  •   a1.sinkgroups.g1.processor.backoff = true  
  •   a1.sinkgroups.g1.processor.selector =round_robin  
  •      
  •   # Describe/configure the source  
  •   a1.sources.r1.type = syslogtcp  
  •   a1.sources.r1.port = 50000  
  •   a1.sources.r1.host = 192.168.233.128  
  •   a1.sources.r1.channels = c1  
  •      
  •   # Describe the sink  
  •   a1.sinks.k1.type = avro  
  •   a1.sinks.k1.channel = c1  
  •   a1.sinks.k1.hostname = 192.168.233.129  
  •   a1.sinks.k1.port = 50000  
  •      
  •   a1.sinks.k2.type = avro  
  •   a1.sinks.k2.channel = c1  
  •   a1.sinks.k2.hostname = 192.168.233.130  
  •   a1.sinks.k2.port = 50000  
  •   # Use a channel which buffers events inmemory  
  •   a1.channels.c1.type = memory  
  •   a1.channels.c1.capacity = 1000  
  •   a1.channels.c1.transactionCapacity = 100  
  

  这里要说明的是,因此测试的是负载均衡的例子,因此这边使用一个channel来作为数据传输通道。这里sinks的对应的接收数据的代理配置,我们沿用故障转移的接收代理配置。
  #敲命令
  首先先启动2个接受复制事件代理,如果先启动源发送的代理,会报他找不到sinks的绑定,因为2个接事件的代理还未起来。
  flume-ng agent -cconf -f conf/replicate_sink1_case11.conf -n a1
  -Dflume.root.logger=INFO,console
  flume-ng agent -cconf -f conf/replicate_sink2_case11.conf -n a1
  -Dflume.root.logger=INFO,console
  在启动源发送的代理
  flume-ng agent -cconf -f conf/load_sink_case14.conf -n a1
  -Dflume.root.logger=INFO,console
  
  启动成功后
  打开另一个终端输入,往侦听端口送数据
  echo "loadbanlancetest1" | nc 192.168.233.128 50000
  echo "loadbantest2" | nc 192.168.233.128 50000
  echo "loadban test3"| nc 192.168.233.128 50000
  echo "loadbantest4" | nc 192.168.233.128 50000
  echo "loadbantest5" | nc 192.168.233.128 50000
  #在启动源发送的代理终端查看console输出
  其中K1收到3条数据


  

  其中K1收到2条数据


  

因为我们负载均衡选择的类型是轮询,因此可以看出flume 让代理每次向一个sink发送2次事件数据后就换另一个sinks 发送。

Sink Processors测试完毕

  





运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669475-1-1.html 上篇帖子: flume应用开发、搜索引擎算法、Pipes、集群、PageRank算法 下篇帖子: Flume环境部署和配置详解及案例大全
累计签到:1949 天
连续签到:45 天
发表于 2019-2-28 20:49:37 | 显示全部楼层
感谢楼主的分享!

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表