博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Flume源码阅读记录(3)flume启动组件Application代码分析
阅读量:4171 次
发布时间:2019-05-26

本文共 6534 字,大约阅读时间需要 21 分钟。

public class Application {  private static final Logger logger = LoggerFactory      .getLogger(Application.class);  public static final String CONF_MONITOR_CLASS = "flume.monitoring.type";  public static final String CONF_MONITOR_PREFIX = "flume.monitoring.";  private final List
components; private final LifecycleSupervisor supervisor; private MaterializedConfiguration materializedConfiguration; private MonitorService monitorServer; //类持有的锁 private final ReentrantLock lifecycleLock = new ReentrantLock(); public Application() { this(new ArrayList
(0)); } public Application(List
components) { this.components = components; supervisor = new LifecycleSupervisor(); } //启动方法 public void start() { lifecycleLock.lock(); try { for (LifecycleAware component : components) { supervisor.supervise(component, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } } finally { lifecycleLock.unlock(); } } //更新配置文件时,执行以下方法。 @Subscribe public void handleConfigurationEvent(MaterializedConfiguration conf) { try { lifecycleLock.lockInterruptibly(); //先停止,再启动所有组件。 stopAllComponents(); startAllComponents(conf); } catch (InterruptedException e) { logger.info("Interrupted while trying to handle configuration event"); return; } finally { // If interrupted while trying to lock, we don't own the lock, so must not attempt to unlock if (lifecycleLock.isHeldByCurrentThread()) { lifecycleLock.unlock(); } } } //停止方法 public void stop() { //先加锁 lifecycleLock.lock(); stopAllComponents(); try { supervisor.stop(); if (monitorServer != null) { monitorServer.stop(); } } finally { lifecycleLock.unlock(); } } //停止所有组件 停止组件的顺序是 Source---Sink---Channel private void stopAllComponents() { if (this.materializedConfiguration != null) { logger.info("Shutting down configuration: {}", this.materializedConfiguration); //停止所有Source for (Entry
entry : this.materializedConfiguration.getSourceRunners().entrySet()) { try { logger.info("Stopping Source " + entry.getKey()); supervisor.unsupervise(entry.getValue()); } catch (Exception e) { logger.error("Error while stopping {}", entry.getValue(), e); } } //停止Sink for (Entry
entry : this.materializedConfiguration.getSinkRunners().entrySet()) { try { logger.info("Stopping Sink " + entry.getKey()); supervisor.unsupervise(entry.getValue()); } catch (Exception e) { logger.error("Error while stopping {}", entry.getValue(), e); } } //停止Channel for (Entry
entry : this.materializedConfiguration.getChannels().entrySet()) { try { logger.info("Stopping Channel " + entry.getKey()); supervisor.unsupervise(entry.getValue()); } catch (Exception e) { logger.error("Error while stopping {}", entry.getValue(), e); } } } if (monitorServer != null) { monitorServer.stop(); } } //启动方法 启动的顺序是 Channel--> Sink -->Source private void startAllComponents(MaterializedConfiguration materializedConfiguration) { logger.info("Starting new configuration:{}", materializedConfiguration); this.materializedConfiguration = materializedConfiguration; for (Entry
entry : materializedConfiguration.getChannels().entrySet()) { try { logger.info("Starting Channel " + entry.getKey()); supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } catch (Exception e) { logger.error("Error while starting {}", entry.getValue(), e); } } /* * Wait for all channels to start. */ for (Channel ch : materializedConfiguration.getChannels().values()) { while (ch.getLifecycleState() != LifecycleState.START && !supervisor.isComponentInErrorState(ch)) { try { logger.info("Waiting for channel: " + ch.getName() + " to start. Sleeping for 500 ms"); Thread.sleep(500); } catch (InterruptedException e) { logger.error("Interrupted while waiting for channel to start.", e); Throwables.propagate(e); } } } for (Entry
entry : materializedConfiguration.getSinkRunners().entrySet()) { try { logger.info("Starting Sink " + entry.getKey()); supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } catch (Exception e) { logger.error("Error while starting {}", entry.getValue(), e); } } for (Entry
entry : materializedConfiguration.getSourceRunners().entrySet()) { try { logger.info("Starting Source " + entry.getKey()); supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } catch (Exception e) { logger.error("Error while starting {}", entry.getValue(), e); } } this.loadMonitoring(); } //监控加载,监控可以检测启动的配置文件,当修改配置文件并保存时,会被监控检测到。 @SuppressWarnings("unchecked") private void loadMonitoring() { Properties systemProps = System.getProperties(); Set
keys = systemProps.stringPropertyNames(); try { if (keys.contains(CONF_MONITOR_CLASS)) { String monitorType = systemProps.getProperty(CONF_MONITOR_CLASS); Class
klass; try { //Is it a known type? klass = MonitoringType.valueOf( monitorType.toUpperCase(Locale.ENGLISH)).getMonitorClass(); } catch (Exception e) { //Not a known type, use FQCN klass = (Class
) Class.forName(monitorType); } this.monitorServer = klass.newInstance(); Context context = new Context(); for (String key : keys) { if (key.startsWith(CONF_MONITOR_PREFIX)) { context.put(key.substring(CONF_MONITOR_PREFIX.length()), systemProps.getProperty(key)); } } monitorServer.configure(context); monitorServer.start(); } } catch (Exception e) { logger.warn("Error starting monitoring. " + "Monitoring might not be available.", e); } }}

对于Application的源代码,主要有几点需要注意:

1.startAllComponents()与stopAllComponents()方法中,组件的启动与关闭顺序正好相反。其中启动的顺序是Channel-->Sink-->Source,而关闭的顺序是Source-->Sink-->Channel。代码中的SinkRunner,SourceRunner分别控制Sink,Source的启动与关闭。

2.各组件都实现LifecycleAware接口,并由LifecycleSupervisor控制组件的状态变化。

3.loadMonitoring()方法用于加载监控。读取启动时的flume.monitoring.type加载对应的监控器类型。

4.handleConfigurationEvent()方法更新配置,若修改启动命令行所指定的配置文件,会被检测到并重启所有组件。

 

PollingZooKeeperConfigurationProvider类

provider负责各个组件的启动停止。

那么该类跟Application中的startAllComponents有何关联?

这里LifecycleSupervisor的对象调用supervise方法,并传入LifecycleState.START作为启动参数。

上面说到,各组件都实现LifecycleAware接口,LifecycleAware接口如下:

而PollingZooKeeperConfigurationProvider也实现了LifecycleAware接口,那么这里实际上是用的provider的start方法 ,修改组件的启动状态,并加入监听器监听组件的状态。

 

 

转载地址:http://lfkai.baihongyu.com/

你可能感兴趣的文章
pthread线程使用小结
查看>>
A Game of Thrones(59)
查看>>
2018.3.19
查看>>
A Game of Thrones(97)
查看>>
A Game of Thrones(98)
查看>>
2018.3.20
查看>>
2018.3.21
查看>>
2018.3.22
查看>>
2018.3.23
查看>>
A Game of Thrones(102)
查看>>
2018.4.29
查看>>
2018.4.30
查看>>
2018.4.31
查看>>
2018.4.32
查看>>
2018.4.33
查看>>
《python基础教程》答案(第一章)
查看>>
2018.4.34
查看>>
2018.4.35
查看>>
2018.4.36
查看>>
我为什么要写博客
查看>>