A Look at Storm's IWaitStrategy
Published: 2019-06-21


This article examines Storm's IWaitStrategy.

IWaitStrategy

storm-2.0.0/storm-client/src/jvm/org/apache/storm/policy/IWaitStrategy.java

    public interface IWaitStrategy {
        static IWaitStrategy createBackPressureWaitStrategy(Map<String, Object> topologyConf) {
            IWaitStrategy producerWaitStrategy =
                ReflectionUtils.newInstance((String) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));
            producerWaitStrategy.prepare(topologyConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
            return producerWaitStrategy;
        }

        void prepare(Map<String, Object> conf, WAIT_SITUATION waitSituation);

        /**
         * Implementations of this method should be thread-safe (preferably no side-effects and lock-free)
         * <p>
         * Supports static or dynamic backoff. Dynamic backoff relies on idleCounter to estimate how long caller has been idling.
         * <p>
         * <pre>
         *  int idleCounter = 0;
         *  int consumeCount = consumeFromQ();
         *  while (consumeCount==0) {
         *     idleCounter = strategy.idle(idleCounter);
         *     consumeCount = consumeFromQ();
         *  }
         * </pre>
         *
         * @param idleCounter managed by the idle method until reset
         * @return new counter value to be used on subsequent idle cycle
         */
        int idle(int idleCounter) throws InterruptedException;

        enum WAIT_SITUATION {SPOUT_WAIT, BOLT_WAIT, BACK_PRESSURE_WAIT}
    }
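  • The interface provides a factory method, createBackPressureWaitStrategy, which reads the topology.backpressure.wait.strategy setting, instantiates the configured class as producerWaitStrategy, and prepares it with WAIT_SITUATION.BACK_PRESSURE_WAIT
  • WAIT_SITUATION has three values: SPOUT_WAIT, BOLT_WAIT, and BACK_PRESSURE_WAIT
  • The interface defines int idle(int idleCounter), which supports static or dynamic backoff; the caller passes the returned counter back in on the next idle cycle and resets it once there is work again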
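
The calling pattern from the Javadoc can be tried outside Storm. What follows is a minimal sketch, not Storm code: IdleStrategy is a hypothetical stand-in that mirrors only the idle(int) signature, and IdleLoopDemo and idleCyclesUntilItem are names invented for illustration.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for IWaitStrategy; only the idle(int) contract is mirrored.
interface IdleStrategy {
    int idle(int idleCounter) throws InterruptedException;
}

class IdleLoopDemo {
    // Polls the queue until an element arrives and returns the number of idle cycles spent.
    static int idleCyclesUntilItem(Queue<String> queue, IdleStrategy strategy) {
        int idleCounter = 0;
        while (queue.poll() == null) {
            try {
                // The strategy owns the counter: feed back whatever it returned last time.
                idleCounter = strategy.idle(idleCounter);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop waiting
                break;
            }
        }
        return idleCounter;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        // Illustrative strategy: pretends a tuple arrives during the third idle cycle.
        IdleStrategy strategy = counter -> {
            if (counter == 2) {
                queue.add("tuple");
            }
            Thread.yield();
            return counter + 1;
        };
        System.out.println("idle cycles: " + idleCyclesUntilItem(queue, strategy));
    }
}
```

Because the counter lives in the caller, a single strategy instance carries no per-wait state, which fits the Javadoc's request for thread-safe, side-effect-free implementations.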

SpoutExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java

    public class SpoutExecutor extends Executor {

        private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class);

        private final IWaitStrategy spoutWaitStrategy;
        private final IWaitStrategy backPressureWaitStrategy;
        private final AtomicBoolean lastActive;
        private final MutableLong emittedCount;
        private final MutableLong emptyEmitStreak;
        private final SpoutThrottlingMetrics spoutThrottlingMetrics;
        private final boolean hasAckers;
        private final SpoutExecutorStats stats;
        private final BuiltinMetrics builtInMetrics;
        SpoutOutputCollectorImpl spoutOutputCollector;
        private Integer maxSpoutPending;
        private List<ISpout> spouts;
        private List<SpoutOutputCollector> outputCollectors;
        private RotatingMap<Long, TupleInfo> pending;
        private long threadId = 0;

        public SpoutExecutor(final WorkerState workerData, final List<Long> executorId, Map<String, String> credentials) {
            super(workerData, executorId, credentials, ClientStatsUtil.SPOUT);
            this.spoutWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
            this.spoutWaitStrategy.prepare(topoConf, WAIT_SITUATION.SPOUT_WAIT);
            this.backPressureWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));
            this.backPressureWaitStrategy.prepare(topoConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
            //......
        }

        //......
    }
  • Two wait strategies are created here: spoutWaitStrategy and backPressureWaitStrategy
  • spoutWaitStrategy is configured by topology.spout.wait.strategy, whose value in defaults.yaml is org.apache.storm.policy.WaitStrategyProgressive
  • backPressureWaitStrategy is configured by topology.backpressure.wait.strategy, whose value in defaults.yaml is org.apache.storm.policy.WaitStrategyProgressive

BoltExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java

    public class BoltExecutor extends Executor {

        private static final Logger LOG = LoggerFactory.getLogger(BoltExecutor.class);

        private final BooleanSupplier executeSampler;
        private final boolean isSystemBoltExecutor;
        private final IWaitStrategy consumeWaitStrategy;       // employed when no incoming data
        private final IWaitStrategy backPressureWaitStrategy;  // employed when outbound path is congested
        private final BoltExecutorStats stats;
        private final BuiltinMetrics builtInMetrics;
        private BoltOutputCollectorImpl outputCollector;

        public BoltExecutor(WorkerState workerData, List<Long> executorId, Map<String, String> credentials) {
            super(workerData, executorId, credentials, ClientStatsUtil.BOLT);
            this.executeSampler = ConfigUtils.mkStatsSampler(topoConf);
            this.isSystemBoltExecutor = (executorId == Constants.SYSTEM_EXECUTOR_ID);
            if (isSystemBoltExecutor) {
                this.consumeWaitStrategy = makeSystemBoltWaitStrategy();
            } else {
                this.consumeWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BOLT_WAIT_STRATEGY));
                this.consumeWaitStrategy.prepare(topoConf, WAIT_SITUATION.BOLT_WAIT);
            }
            this.backPressureWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));
            this.backPressureWaitStrategy.prepare(topoConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
            this.stats = new BoltExecutorStats(ConfigUtils.samplingRate(this.getTopoConf()),
                                               ObjectReader.getInt(this.getTopoConf().get(Config.NUM_STAT_BUCKETS)));
            this.builtInMetrics = new BuiltinBoltMetrics(stats);
        }

        private static IWaitStrategy makeSystemBoltWaitStrategy() {
            WaitStrategyPark ws = new WaitStrategyPark();
            Map<String, Object> conf = new HashMap<>();
            conf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 5000);
            ws.prepare(conf, WAIT_SITUATION.BOLT_WAIT);
            return ws;
        }

        //......
    }
  • Two IWaitStrategy instances are created here: consumeWaitStrategy and backPressureWaitStrategy
  • For a non-system bolt executor, consumeWaitStrategy is configured by topology.bolt.wait.strategy, whose value in defaults.yaml is org.apache.storm.policy.WaitStrategyProgressive; for the system bolt executor it is a WaitStrategyPark with a park time of 5000 microseconds
  • backPressureWaitStrategy is configured by topology.backpressure.wait.strategy, whose value in defaults.yaml is org.apache.storm.policy.WaitStrategyProgressive
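
Since each strategy class is looked up by name from the topology configuration, switching implementations is a matter of overriding those keys. The sketch below builds such overrides as a plain map. It rests on assumptions: the three *.wait.strategy keys are the ones quoted above, while the *.wait.park.microsec key strings are my reading of the Config.TOPOLOGY_*_WAIT_PARK_MICROSEC constants and should be checked against the Storm version in use. WaitStrategyConfSketch and parkEverywhere are illustrative names.

```java
import java.util.HashMap;
import java.util.Map;

class WaitStrategyConfSketch {
    // Builds conf overrides that select WaitStrategyPark for all three wait situations.
    static Map<String, Object> parkEverywhere(long parkMicros) {
        String park = "org.apache.storm.policy.WaitStrategyPark";
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.spout.wait.strategy", park);
        conf.put("topology.bolt.wait.strategy", park);
        conf.put("topology.backpressure.wait.strategy", park);
        // Assumed key names for the park duration of each wait situation.
        conf.put("topology.spout.wait.park.microsec", parkMicros);
        conf.put("topology.bolt.wait.park.microsec", parkMicros);
        conf.put("topology.backpressure.wait.park.microsec", parkMicros);
        return conf;
    }

    public static void main(String[] args) {
        parkEverywhere(100).forEach((key, value) -> System.out.println(key + ": " + value));
    }
}
```

In a real topology these entries would be merged into the configuration submitted with the topology rather than printed.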

WaitStrategyPark

storm-2.0.0/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyPark.java

    public class WaitStrategyPark implements IWaitStrategy {
        private long parkTimeNanoSec;

        public WaitStrategyPark() { // required for instantiation via reflection. must call prepare() thereafter
        }

        // Convenience alternative to prepare() for use in Tests
        public WaitStrategyPark(long microsec) {
            parkTimeNanoSec = microsec * 1_000;
        }

        @Override
        public void prepare(Map<String, Object> conf, WAIT_SITUATION waitSituation) {
            if (waitSituation == WAIT_SITUATION.SPOUT_WAIT) {
                parkTimeNanoSec = 1_000 * ObjectReader.getLong(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PARK_MICROSEC));
            } else if (waitSituation == WAIT_SITUATION.BOLT_WAIT) {
                parkTimeNanoSec = 1_000 * ObjectReader.getLong(conf.get(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC));
            } else if (waitSituation == WAIT_SITUATION.BACK_PRESSURE_WAIT) {
                parkTimeNanoSec = 1_000 * ObjectReader.getLong(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PARK_MICROSEC));
            } else {
                throw new IllegalArgumentException("Unknown wait situation : " + waitSituation);
            }
        }

        @Override
        public int idle(int idleCounter) throws InterruptedException {
            if (parkTimeNanoSec == 0) {
                return 1;
            }
            LockSupport.parkNanos(parkTimeNanoSec);
            return idleCounter + 1;
        }
    }
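  • This strategy is a static backoff: every idle call parks the thread with LockSupport.parkNanos(parkTimeNanoSec), where the park time comes from the *_WAIT_PARK_MICROSEC setting for the given wait situation; a park time of 0 returns immediately without parking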
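
To get a feel for what a fixed park costs in practice, the idle logic can be reproduced in a standalone class and timed. This is a sketch under assumptions: ParkIdleSketch is an illustrative name, not part of Storm, and the measured average depends on the OS and JVM, since parkNanos may return spuriously early or later than requested.

```java
import java.util.concurrent.locks.LockSupport;

class ParkIdleSketch {
    private final long parkTimeNanoSec;

    ParkIdleSketch(long microsec) {
        parkTimeNanoSec = microsec * 1_000;
    }

    // Same branch structure as the idle method quoted above.
    int idle(int idleCounter) {
        if (parkTimeNanoSec == 0) {
            return 1; // nothing to wait for
        }
        LockSupport.parkNanos(parkTimeNanoSec);
        return idleCounter + 1;
    }

    public static void main(String[] args) {
        ParkIdleSketch strategy = new ParkIdleSketch(100); // request 100 microseconds per idle call
        int iterations = 1_000;
        int idleCounter = 0;
        long start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            idleCounter = strategy.idle(idleCounter);
        }
        long averageMicros = (System.nanoTime() - start) / iterations / 1_000;
        System.out.println("idle cycles: " + idleCounter + ", average park: " + averageMicros + " us");
    }
}
```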

WaitStrategyProgressive

storm-2.0.0/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyProgressive.java

    /**
     * A Progressive Wait Strategy
     * <p> Has three levels of idling. Stays in each level for a configured number of iterations before entering the next level.
     * Level 1 - No idling. Returns immediately. Stays in this level for `level1Count` iterations. Level 2 - Calls LockSupport.parkNanos(1).
     * Stays in this level for `level2Count` iterations Level 3 - Calls Thread.sleep(). Stays in this level until wait situation changes.
     *
     * <p>
     * The initial spin can be useful to prevent downstream bolt from repeatedly sleeping/parking when the upstream component is a bit
     * relatively slower. Allows downstream bolt can enter deeper wait states only if the traffic to it appears to have reduced.
     * <p>
     */
    public class WaitStrategyProgressive implements IWaitStrategy {
        private int level1Count;
        private int level2Count;
        private long level3SleepMs;

        @Override
        public void prepare(Map<String, Object> conf, WAIT_SITUATION waitSituation) {
            if (waitSituation == WAIT_SITUATION.SPOUT_WAIT) {
                level1Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PROGRESSIVE_LEVEL1_COUNT));
                level2Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PROGRESSIVE_LEVEL2_COUNT));
                level3SleepMs = ObjectReader.getLong(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS));
            } else if (waitSituation == WAIT_SITUATION.BOLT_WAIT) {
                level1Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL1_COUNT));
                level2Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL2_COUNT));
                level3SleepMs = ObjectReader.getLong(conf.get(Config.TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS));
            } else if (waitSituation == WAIT_SITUATION.BACK_PRESSURE_WAIT) {
                level1Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL1_COUNT));
                level2Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL2_COUNT));
                level3SleepMs = ObjectReader.getLong(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS));
            } else {
                throw new IllegalArgumentException("Unknown wait situation : " + waitSituation);
            }
        }

        @Override
        public int idle(int idleCounter) throws InterruptedException {
            if (idleCounter < level1Count) {                     // level 1 - no waiting
                ++idleCounter;
            } else if (idleCounter < level1Count + level2Count) { // level 2 - parkNanos(1L)
                ++idleCounter;
                LockSupport.parkNanos(1L);
            } else {                                              // level 3 - longer idling with Thread.sleep()
                Thread.sleep(level3SleepMs);
            }
            return idleCounter;
        }
    }

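  • WaitStrategyProgressive is a progressive wait strategy with three levels of idling
  • Level 1 does no idling and returns immediately; after level1Count idle calls the strategy moves to level 2
  • Level 2 calls LockSupport.parkNanos(1); after level2Count idle calls the strategy moves to level 3
  • Level 3 calls Thread.sleep(level3SleepMs) and no longer increments the counter; the strategy stays there until the wait situation changes and the caller resets the counter
  • Each WAIT_SITUATION reads its own LEVEL1_COUNT, LEVEL2_COUNT, and LEVEL3_SLEEP_MILLIS settings. The defaults are 0, 0, 1 for the spout; 1, 1000, 1 for the bolt; and 1, 1000, 1 for back pressure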
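
The level transitions can be checked without parking or sleeping by replaying only the counter arithmetic. This is a minimal sketch that mirrors the branch structure of idle; ProgressiveLevelSketch and its method names are illustrative, not part of Storm.

```java
class ProgressiveLevelSketch {
    // Which level an idle call with this counter value would execute.
    static int levelFor(int idleCounter, int level1Count, int level2Count) {
        if (idleCounter < level1Count) {
            return 1; // no waiting
        }
        if (idleCounter < level1Count + level2Count) {
            return 2; // parkNanos(1)
        }
        return 3;     // Thread.sleep(level3SleepMs)
    }

    // Counter value idle would return: incremented in levels 1 and 2, unchanged in level 3.
    static int nextCounter(int idleCounter, int level1Count, int level2Count) {
        return levelFor(idleCounter, level1Count, level2Count) < 3 ? idleCounter + 1 : idleCounter;
    }

    // Number of consecutive idle calls made before the first level 3 sleep.
    static int callsBeforeSleep(int level1Count, int level2Count) {
        int counter = 0;
        int calls = 0;
        while (levelFor(counter, level1Count, level2Count) < 3) {
            counter = nextCounter(counter, level1Count, level2Count);
            calls++;
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println("spout defaults (0, 0): " + callsBeforeSleep(0, 0));
        System.out.println("bolt defaults (1, 1000): " + callsBeforeSleep(1, 1000));
    }
}
```

With the spout defaults the very first idle call already sleeps, whereas with the bolt and back pressure defaults a thread spins once and parks 1000 times before it starts sleeping.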

SpoutExecutor.call

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java

    @Override
    public Callable<Long> call() throws Exception {
        init(idToTask, idToTaskBase);
        return new Callable<Long>() {
            final int recvqCheckSkipCountMax = getSpoutRecvqCheckSkipCount();
            int recvqCheckSkips = 0;
            int swIdleCount = 0; // counter for spout wait strategy
            int bpIdleCount = 0; // counter for back pressure wait strategy
            int rmspCount = 0;

            @Override
            public Long call() throws Exception {
                int receiveCount = 0;
                if (recvqCheckSkips++ == recvqCheckSkipCountMax) {
                    receiveCount = receiveQueue.consume(SpoutExecutor.this);
                    recvqCheckSkips = 0;
                }
                long currCount = emittedCount.get();
                boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending);
                boolean isActive = stormActive.get();

                if (!isActive) {
                    inactiveExecute();
                    return 0L;
                }

                if (!lastActive.get()) {
                    lastActive.set(true);
                    activateSpouts();
                }
                boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
                boolean noEmits = true;
                long emptyStretch = 0;

                if (!reachedMaxSpoutPending && pendingEmitsIsEmpty) {
                    for (int j = 0; j < spouts.size(); j++) { // in critical path. don't use iterators.
                        spouts.get(j).nextTuple();
                    }
                    noEmits = (currCount == emittedCount.get());
                    if (noEmits) {
                        emptyEmitStreak.increment();
                    } else {
                        emptyStretch = emptyEmitStreak.get();
                        emptyEmitStreak.set(0);
                    }
                }
                if (reachedMaxSpoutPending) {
                    if (rmspCount == 0) {
                        LOG.debug("Reached max spout pending");
                    }
                    rmspCount++;
                } else {
                    if (rmspCount > 0) {
                        LOG.debug("Ended max spout pending stretch of {} iterations", rmspCount);
                    }
                    rmspCount = 0;
                }

                if (receiveCount > 1) {
                    // continue without idling
                    return 0L;
                }
                if (!pendingEmits.isEmpty()) { // then facing backpressure
                    backPressureWaitStrategy();
                    return 0L;
                }
                bpIdleCount = 0;
                if (noEmits) {
                    spoutWaitStrategy(reachedMaxSpoutPending, emptyStretch);
                    return 0L;
                }
                swIdleCount = 0;
                return 0L;
            }

            private void backPressureWaitStrategy() throws InterruptedException {
                long start = Time.currentTimeMillis();
                if (bpIdleCount == 0) { // check avoids multiple log msgs when in a idle loop
                    LOG.debug("Experiencing Back Pressure from downstream components. Entering BackPressure Wait.");
                }
                bpIdleCount = backPressureWaitStrategy.idle(bpIdleCount);
                spoutThrottlingMetrics.skippedBackPressureMs(Time.currentTimeMillis() - start);
            }

            private void spoutWaitStrategy(boolean reachedMaxSpoutPending, long emptyStretch) throws InterruptedException {
                emptyEmitStreak.increment();
                long start = Time.currentTimeMillis();
                swIdleCount = spoutWaitStrategy.idle(swIdleCount);
                if (reachedMaxSpoutPending) {
                    spoutThrottlingMetrics.skippedMaxSpoutMs(Time.currentTimeMillis() - start);
                } else {
                    if (emptyStretch > 0) {
                        LOG.debug("Ending Spout Wait Stretch of {}", emptyStretch);
                    }
                }
            }

            // returns true if pendingEmits is empty
            private boolean tryFlushPendingEmits() {
                for (AddressedTuple t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
                    if (executorTransfer.tryTransfer(t, null)) {
                        pendingEmits.poll();
                    } else { // to avoid reordering of emits, stop at first failure
                        return false;
                    }
                }
                return true;
            }
        };
    }
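  • The spout maintains a pendingEmits queue, holding tuples whose emit has not yet succeeded or that are still waiting to be emitted, and a pending RotatingMap, holding the ids and data of tuples awaiting ack
  • The spout reads TOPOLOGY_MAX_SPOUT_PENDING (topology.max.spout.pending) and computes maxSpoutPending = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size(); the setting defaults to null, so maxSpoutPending is 0
  • The spout calls nextTuple only when !reachedMaxSpoutPending && pendingEmitsIsEmpty. If pendingEmits is not empty afterwards, backPressureWaitStrategy is triggered; otherwise, if noEmits is true, spoutWaitStrategy is triggered. noEmits starts out true, so it also holds when nextTuple was skipped; and when receiveCount > 1, call returns without idling at all
  • On each call, currCount = emittedCount.get() is recorded before nextTuple is invoked; if nextTuple emits, emittedCount is updated in SpoutOutputCollectorImpl's emit or emitDirect methods, and noEmits = (currCount == emittedCount.get()) then tells whether anything was emitted
  • The spout maintains bpIdleCount and swIdleCount, which it passes to backPressureWaitStrategy.idle(bpIdleCount) and spoutWaitStrategy.idle(swIdleCount) respectively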
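
The order of the checks at the end of call decides which wait strategy runs. The sketch below restates that order as two pure functions so it can be exercised in isolation; SpoutIdleDecisionSketch, Action, noEmits, and decide are illustrative names, and metrics, logging, and counter resets are left out.

```java
class SpoutIdleDecisionSketch {
    enum Action { CONTINUE, BACK_PRESSURE_WAIT, SPOUT_WAIT }

    // noEmits stays true unless nextTuple actually ran and raised the emitted count.
    static boolean noEmits(boolean reachedMaxSpoutPending, boolean pendingEmitsFlushed,
                           long emittedBefore, long emittedAfter) {
        if (!reachedMaxSpoutPending && pendingEmitsFlushed) {
            return emittedBefore == emittedAfter;
        }
        return true; // nextTuple was skipped
    }

    // Mirrors the order of the final checks: receive queue activity, then back pressure, then spout wait.
    static Action decide(int receiveCount, boolean pendingEmitsEmpty, boolean noEmits) {
        if (receiveCount > 1) {
            return Action.CONTINUE;           // continue without idling
        }
        if (!pendingEmitsEmpty) {
            return Action.BACK_PRESSURE_WAIT; // downstream did not accept everything
        }
        if (noEmits) {
            return Action.SPOUT_WAIT;         // nothing was produced in this iteration
        }
        return Action.CONTINUE;
    }

    public static void main(String[] args) {
        boolean skipped = noEmits(true, true, 10L, 10L); // max spout pending reached
        System.out.println(decide(0, true, skipped));
    }
}
```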

BoltExecutor.call

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java

    @Override
    public Callable<Long> call() throws Exception {
        init(idToTask, idToTaskBase);

        return new Callable<Long>() {
            int bpIdleCount = 0;
            int consumeIdleCounter = 0;
            private final ExitCondition tillNoPendingEmits = () -> pendingEmits.isEmpty();

            @Override
            public Long call() throws Exception {
                boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
                if (pendingEmitsIsEmpty) {
                    if (bpIdleCount != 0) {
                        LOG.debug("Ending Back Pressure Wait stretch : {}", bpIdleCount);
                    }
                    bpIdleCount = 0;
                    int consumeCount = receiveQueue.consume(BoltExecutor.this, tillNoPendingEmits);
                    if (consumeCount == 0) {
                        if (consumeIdleCounter == 0) {
                            LOG.debug("Invoking consume wait strategy");
                        }
                        consumeIdleCounter = consumeWaitStrategy.idle(consumeIdleCounter);
                        if (Thread.interrupted()) {
                            throw new InterruptedException();
                        }
                    } else {
                        if (consumeIdleCounter != 0) {
                            LOG.debug("Ending consume wait stretch : {}", consumeIdleCounter);
                        }
                        consumeIdleCounter = 0;
                    }
                } else {
                    if (bpIdleCount == 0) { // check avoids multiple log msgs when spinning in a idle loop
                        LOG.debug("Experiencing Back Pressure. Entering BackPressure Wait. PendingEmits = {}", pendingEmits.size());
                    }
                    bpIdleCount = backPressureWaitStrategy.idle(bpIdleCount);
                }

                return 0L;
            }

            // returns true if pendingEmits is empty
            private boolean tryFlushPendingEmits() {
                for (AddressedTuple t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
                    if (executorTransfer.tryTransfer(t, null)) {
                        pendingEmits.poll();
                    } else { // to avoid reordering of emits, stop at first failure
                        return false;
                    }
                }
                return true;
            }
        };
    }
  • The bolt executor also maintains pendingEmits; when it cannot be flushed completely, backPressureWaitStrategy.idle(bpIdleCount) is triggered
  • When pendingEmits is empty, the bolt consumes from its receive queue with receiveQueue.consume(BoltExecutor.this, tillNoPendingEmits); if the returned consumeCount is 0, consumeWaitStrategy.idle(consumeIdleCounter) is triggered
  • The bolt executor maintains bpIdleCount and consumeIdleCounter, which it passes to backPressureWaitStrategy.idle(bpIdleCount) and consumeWaitStrategy.idle(consumeIdleCounter) respectively
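
The flush-then-decide pattern used by tryFlushPendingEmits can be reproduced with standard collections. In this sketch a bounded ArrayBlockingQueue stands in for the downstream receive queue and its non-blocking offer stands in for executorTransfer.tryTransfer; both substitutions are assumptions made for illustration, and PendingEmitsSketch is not a Storm class.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PendingEmitsSketch {
    // Returns true if pendingEmits was drained completely, false if the downstream queue filled up.
    static <T> boolean tryFlush(Queue<T> pendingEmits, BlockingQueue<T> downstream) {
        for (T t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
            if (downstream.offer(t)) { // non-blocking; fails when the queue is full
                pendingEmits.poll();
            } else {
                return false;          // stop at the first failure to keep emit order
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Queue<String> pendingEmits = new ArrayDeque<>(Arrays.asList("a", "b", "c"));
        BlockingQueue<String> downstream = new ArrayBlockingQueue<>(2);

        System.out.println("flushed: " + tryFlush(pendingEmits, downstream)
            + ", still pending: " + pendingEmits);
        downstream.poll(); // the consumer frees one slot
        System.out.println("flushed: " + tryFlush(pendingEmits, downstream)
            + ", still pending: " + pendingEmits);
    }
}
```

A false return is the signal that would lead the executor into backPressureWaitStrategy.idle; peeking before polling ensures a tuple leaves pendingEmits only after the downstream has accepted it.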

Summary

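  • Both the spout and bolt executors use backPressureWaitStrategy, configured by topology.backpressure.wait.strategy (for any producer (spout/bolt/transfer thread) when the downstream Q is full); the default implementation is org.apache.storm.policy.WaitStrategyProgressive. It is the back pressure policy applied when a downstream component's receive queue is full. The check is based on the pendingEmits queue: on every call, the spout or bolt runs tryFlushPendingEmits, which first tries to send the queued tuples, and pendingEmits ends up empty only if the downstream accepts them all. This is how downstream load is detected dynamically to decide whether to enter the back pressure wait
  • The spout uses spoutWaitStrategy, configured by topology.spout.wait.strategy (employed when there is no data to produce); the default implementation is org.apache.storm.policy.WaitStrategyProgressive. It applies when the spout has nothing to emit, which is determined from emittedCount
  • The bolt uses consumeWaitStrategy; for a non-system bolt executor it is configured by topology.bolt.wait.strategy (employed when there is no data in its receive buffer to process), and the default implementation is org.apache.storm.policy.WaitStrategyProgressive. It applies when the receive buffer has nothing to process, which is determined from the consumeCount returned by receiveQueue.consume(BoltExecutor.this, tillNoPendingEmits)
  • One further difference: besides pendingEmitsIsEmpty, the spout also checks reachedMaxSpoutPending to decide whether to keep producing data, whereas the bolt relies on pendingEmitsIsEmpty alone to decide whether it can keep consuming data
  • Besides WaitStrategyProgressive, IWaitStrategy has a WaitStrategyPark implementation, which BoltExecutor uses for the system bolt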


Reposted from: http://khtba.baihongyu.com/
