设为首页 收藏本站
查看: 1521|回复: 0

[经验分享] Flume架构与源码分析-核心组件分析-2

[复制链接]

尚未签到

发表于 2017-5-22 07:04:39 | 显示全部楼层 |阅读模式

4、整体流程

  从以上部分我们可以看出,不管是Source还是Sink都依赖Channel,那么启动时应该先启动Channel然后再启动Source或Sink即可。
  Flume有两种启动方式:使用EmbeddedAgent内嵌在Java应用中或使用Application单独启动一个进程,此处我们已Application分析为主。
  首先进入org.apache.flume.node.Application的main方法启动:

//1、设置默认值启动参数、参数是否必须的
Options options = new Options();
Option option = new Option("n", "name", true, "the name of this agent");
option.setRequired(true);
options.addOption(option);
option = new Option("f", "conf-file", true,
"specify a config file (required if -z missing)");
option.setRequired(false);
options.addOption(option);
//2、接着解析命令行参数
CommandLineParser parser = new GnuParser();
CommandLine commandLine = parser.parse(options, args);
String agentName = commandLine.getOptionValue('n');
boolean reload = !commandLine.hasOption("no-reload-conf");
if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
isZkConfigured = true;
}
if (isZkConfigured) {
//3、如果是通过ZooKeeper配置,则使用ZooKeeper参数启动,此处忽略,我们以配置文件讲解
} else {
//4、打开配置文件,如果不存在则快速失败
File configurationFile = new File(commandLine.getOptionValue('f'));
if (!configurationFile.exists()) {
throw new ParseException(
"The specified configuration file does not exist: " + path);
}
List<LifecycleAware> components = Lists.newArrayList();
if (reload) { //5、如果需要定期reload配置文件,则走如下方式
//5.1、此处使用Guava提供的事件总线
EventBus eventBus = new EventBus(agentName + "-event-bus");
//5.2、读取配置文件,使用定期轮训拉起策略,默认30s拉取一次
PollingPropertiesFileConfigurationProvider configurationProvider =
new PollingPropertiesFileConfigurationProvider(
agentName, configurationFile, eventBus, 30);
components.add(configurationProvider);
application = new Application(components); //5.3、向Application注册组件
//5.4、向事件总线注册本应用,EventBus会自动注册Application中使用@Subscribe声明的方法
eventBus.register(application);
} else { //5、配置文件不支持定期reload
PropertiesFileConfigurationProvider configurationProvider =
new PropertiesFileConfigurationProvider(
agentName, configurationFile);
application = new Application();
//6.2、直接使用配置文件初始化Flume组件
application.handleConfigurationEvent(configurationProvider
.getConfiguration());
}
}
//7、启动Flume应用
application.start();
//8、注册虚拟机关闭钩子,当虚拟机关闭时调用Application的stop方法进行终止
final Application appReference = application;
Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
@Override
public void run() {
appReference.stop();
}
});

  以上流程只提取了核心代码中的一部分,比如ZK的实现直接忽略了,而Flume启动大体流程如下:
  1、读取命令行参数;
  2、读取配置文件;
  3、根据是否需要reload使用不同的策略初始化Flume;如果需要reload,则使用Guava的事件总线实现,Application的handleConfigurationEvent是事件订阅者,PollingPropertiesFileConfigurationProvider是事件发布者,其会定期轮训检查文件是否变更,如果变更则重新读取配置文件,发布配置文件事件变更,而handleConfigurationEvent会收到该配置变更重新进行初始化;
  4、启动Application,并注册虚拟机关闭钩子。
   
handleConfigurationEvent方法比较简单,首先调用了stopAllComponents停止所有组件,接着调用startAllComponents使用配置文件初始化所有组件: 

@Subscribe
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
stopAllComponents();
startAllComponents(conf);

  MaterializedConfiguration存储Flume运行时需要的组件:Source、Channel、Sink、SourceRunner、SinkRunner等,其是通过ConfigurationProvider进行初始化获取,比如PollingPropertiesFileConfigurationProvider会读取配置文件然后进行组件的初始化。
  对于startAllComponents实现大体如下: 

//1、首先启动Channel
supervisor.supervise(Channels,
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
//2、确保所有Channel是否都已启动
for(Channel ch: materializedConfiguration.getChannels().values()){
while(ch.getLifecycleState() != LifecycleState.START
&& !supervisor.isComponentInErrorState(ch)){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Throwables.propagate(e);
}
}
}
//3、启动SinkRunner
supervisor.supervise(SinkRunners,  
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
//4、启动SourceRunner
supervisor.supervise(SourceRunner,
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
//5、初始化监控服务
this.loadMonitoring(); 
  从如下代码中可以看到,首先要准备好Channel,因为Source和Sink会操作它,对于Channel如果初始化失败则整个流程是失败的;然后启动SinkRunner,先准备好消费者;接着启动SourceRunner开始进行采集日志。此处我们发现有两个单独的组件LifecycleSupervisor和MonitorService,一个是组件守护哨兵,一个是监控服务。守护哨兵对这些组件进行守护,假设出问题了默认策略是自动重启这些组件。
  对于stopAllComponents实现大体如下:

//1、首先停止SourceRunner
supervisor.unsupervise(SourceRunners);
//2、接着停止SinkRunner
supervisor.unsupervise(SinkRunners);
//3、然后停止Channel
supervisor.unsupervise(Channels);
//4、最后停止MonitorService
monitorServer.stop(); 
  此处可以看出,停止的顺序是Source、Sink、Channel,即先停止生产,再停止消费,最后停止管道。
  Application中的start方法代码实现如下:

public synchronized void start() {
for(LifecycleAware component : components) {
supervisor.supervise(component,
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
}

  其循环Application注册的组件,然后守护哨兵对它进行守护,默认策略是出现问题会自动重启组件,假设我们支持reload配置文件,则之前启动Application时注册过PollingPropertiesFileConfigurationProvider组件,即该组件会被守护哨兵守护着,出现问题默认策略自动重启。
  而Application关闭执行了如下动作:    

public synchronized void stop() {
supervisor.stop();
if(monitorServer != null) {
monitorServer.stop();
}

  即关闭守护哨兵和监控服务。
  到此基本的Application分析结束了,我们还有很多疑问,守护哨兵怎么实现的。 
  整体流程可以总结为:
  1、首先初始化命令行配置;
  2、接着读取配置文件;
  3、根据是否需要reload初始化配置文件中的组件;如果需要reload会使用Guava事件总线进行发布订阅变化;
  4、接着创建Application,创建守护哨兵,并先停止所有组件,接着启动所有组件;启动顺序:Channel、SinkRunner、SourceRunner,并把这些组件注册给守护哨兵、初始化监控服务;停止顺序:SourceRunner、SinkRunner、Channel;
  5、如果配置文件需要定期reload,则需要注册Polling***ConfigurationProvider到守护哨兵;
  6、最后注册虚拟机关闭钩子,停止守护哨兵和监控服务。
   
轮训实现的SourceRunner 和SinkRunner会创建一个线程进行工作,之前已经介绍了其工作方式。接下来我们看下守护哨兵的实现。
 
首先创建LifecycleSupervisor:

  //1、用于存放被守护的组件
supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>();
//2、用于存放正在被监控的组件
monitorFutures = new HashMap<LifecycleAware, ScheduledFuture<?>>();
//3、创建监控服务线程池
monitorService = new ScheduledThreadPoolExecutor(10,
new ThreadFactoryBuilder().setNameFormat(
"lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")
.build());
monitorService.setMaximumPoolSize(20);
monitorService.setKeepAliveTime(30, TimeUnit.SECONDS);
//4、定期清理被取消的组件
purger = new Purger();
//4.1、默认不进行清理
needToPurge = false; 
LifecycleSupervisor启动时会进行如下操作:

public synchronized void start() {
monitorService.scheduleWithFixedDelay(purger, 2, 2, TimeUnit.HOURS);
lifecycleState = LifecycleState.START;

首先每隔两个小时执行清理组件,然后改变状态为启动。而LifecycleSupervisor停止时直接停止了监控服务,然后更新守护组件状态为STOP:

  //1、首先停止守护监控服务
if (monitorService != null) {
monitorService.shutdown();
try {
monitorService.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error("Interrupted while waiting for monitor service to stop");
}
}
//2、更新所有守护组件状态为STOP,并调用组件的stop方法进行停止
for (final Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses.entrySet()) {
if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) {
entry.getValue().status.desiredState = LifecycleState.STOP;
entry.getKey().stop();
}
}
//3、更新本组件状态
if (lifecycleState.equals(LifecycleState.START)) {
lifecycleState = LifecycleState.STOP;
}
//4、最后的清理
supervisedProcesses.clear();
monitorFutures.clear(); 
 
接下来就是调用supervise进行组件守护了:

  if(this.monitorService.isShutdown() || this.monitorService.isTerminated()
|| this.monitorService.isTerminating()){
//1、如果哨兵已停止则抛出异常,不再接收任何组件进行守护
}
//2、初始化守护组件
Supervisoree process = new Supervisoree();
process.status = new Status();
//2.1、默认策略是失败重启
process.policy = policy;
//2.2、初始化组件默认状态,大多数组件默认为START
process.status.desiredState = desiredState;
process.status.error = false;
//3、组件监控器,用于定时获取组件的最新状态,或者重新启动组件
MonitorRunnable monitorRunnable = new MonitorRunnable();
monitorRunnable.lifecycleAware = lifecycleAware;
monitorRunnable.supervisoree = process;
monitorRunnable.monitorService = monitorService;
supervisedProcesses.put(lifecycleAware, process);
//4、定期的去执行组件监控器,获取组件最新状态,或者重新启动组件
ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
monitorRunnable, 0, 3, TimeUnit.SECONDS);
monitorFutures.put(lifecycleAware, future);
}

 
如果不需要守护了,则需要调用unsupervise:

public synchronized void unsupervise(LifecycleAware lifecycleAware) {
synchronized (lifecycleAware) {
Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware);
//1.1、设置守护组件的状态为被丢弃
supervisoree.status.discard = true;
//1.2、设置组件盼望的最新生命周期状态为STOP
this.setDesiredState(lifecycleAware, LifecycleState.STOP);
//1.3、停止组件
lifecycleAware.stop();
}
//2、从守护组件中移除
supervisedProcesses.remove(lifecycleAware);
//3、取消定时监控组件服务
monitorFutures.get(lifecycleAware).cancel(false);
//3.1、通知Purger需要进行清理,Purger会定期的移除cancel的组件
needToPurge = true;
monitorFutures.remove(lifecycleAware);
}

 
接下来我们再看下MonitorRunnable的实现,其负责进行组件状态迁移或组件故障恢复:

public void run() {
long now = System.currentTimeMillis();
try {
if (supervisoree.status.firstSeen == null) {
supervisoree.status.firstSeen = now; //1、记录第一次状态查看时间
}
supervisoree.status.lastSeen = now; //2、记录最后一次状态查看时间
synchronized (lifecycleAware) {
//3、如果守护组件被丢弃或出错了,则直接返回
if (supervisoree.status.discard || supervisoree.status.error) {
return;
}
//4、更新最后一次查看到的状态
supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();
//5、如果组件的状态和守护组件看到的状态不一致,则以守护组件的状态为准,然后进行初始化
if (!lifecycleAware.getLifecycleState().equals(
supervisoree.status.desiredState)) {
switch (supervisoree.status.desiredState) {
case START: //6、如果是启动状态,则启动组件
try {
lifecycleAware.start();
} catch (Throwable e) {
if (e instanceof Error) {
supervisoree.status.desiredState = LifecycleState.STOP;
try {
lifecycleAware.stop();
} catch (Throwable e1) {
supervisoree.status.error = true;
if (e1 instanceof Error) {
throw (Error) e1;
}
}
}
supervisoree.status.failures++;
}
break;
case STOP: //7、如果是停止状态,则停止组件
try {
lifecycleAware.stop();
} catch (Throwable e) {
if (e instanceof Error) {
throw (Error) e;
}
supervisoree.status.failures++;
}
break;
default:
}
} catch(Throwable t) {
}
}
}

 
如上代码进行了一些简化,整体逻辑即定时去采集组件的状态,如果发现守护组件和组件的状态不一致,则可能需要进行启动或停止。即守护监视器可以用来保证组件如能失败后自动启动。默认策略是总是失败后重启,还有一种策略是只启动一次。 
  
 

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379686-1-1.html 上篇帖子: Flume架构与源码分析-核心组件分析-1 下篇帖子: 【Spark五十二】Spark Streaming整合Flume-NG一
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表