开闭作为现在微服务项目中常用的灵活性举措,在复试中的确也是时常会被问及的,我在复试的这时候也时常讨厌问呵呵你对开闭演算法知道哪一些?有看完源代码吗?同时实现基本原理是什么?
第三部分先谈谈开闭演算法,最后再谈谈源代码的同时实现原理。
开闭演算法
有关开闭的演算法大体能分成五类:一般而言询问处计时器、翻转询问处计时器、漏桶(也有称棒状,英语Leaky bucket)、副本桶(英语Token bucket)。
一般而言询问处
一般而言询问处,较之其他的开闭演算法,这如果是最单纯的一种。
它单纯地对两个一般而言的天数询问处内的允诺数目展开算数,如果少于允诺数目的共振频率,将被间接弃置。
那个单纯的开闭演算法优劣都很明显。缺点不然是单纯,缺点举个范例而言。
比如说他们右图中的红色地区是一般而言天数询问处,预设天数覆盖范围是60s,开闭数目是100。
如图被除数内右图,前面一段天数都没网络流量,正好前面30秒迈盖了100个允诺,这时因为没少于开闭共振频率,因此允诺全数透过,接着下两个窗口的20秒内反之亦然透过了100个允诺。
因此隐性的相等于在那个括弧的40秒的天数内就透过了200个允诺,少于了他们开闭的共振频率。
翻转询问处
为了强化那个问题,只好有了翻转询问处演算法,简而言之,翻转询问处是天数询问处在随着天数推移时不时地终端。
翻转询问处把两个一般而言天数询问处再继续拆分成N个小询问处,接着对每一小询问处依次展开算数,大部份小询问处允诺之和不能少于他们预设的开闭共振频率。
以右图举范例而言,假定他们的询问处拆分成了3个小窗口,小询问处都是20s,反之亦然如前所述下面的范例,当在第二个20s的这时候来了100个允诺,能透过。
接着天数询问处翻转,下两个20s允诺又来了100个允诺,这时他们翻转询问处的60s覆盖范围内允诺数目的确就少于100了啊,因此允诺被拒绝。
漏桶Leaky bucket
漏桶演算法,人如其名,他是两个漏的桶,不管允诺的数目有多少,最终都会以一般而言的出口网络流量大小匀速流出,如果允诺的网络流量少于漏桶大小,那么超出的网络流量将会被弃置。
也是说网络流量流入的速度是不定的,但是流出的速度是恒定的。
那个和MQ削峰填谷的思想比较类似,在面对突然激增的网络流量的这时候,透过漏桶演算法能做到匀速排队,一般而言速度开闭。
漏桶演算法的优势是匀速,匀速是缺点也是缺点,很多人说漏桶不能处理突增网络流量,那个说法并不准确。
漏桶本来就如果是为了处理间歇性的突增网络流量,网络流量呵呵起来了,接着系统处理不过来,能在空闲的这时候去处理,防止了突增网络流量导致系统崩溃,保护了系统的稳定性。
但是,换两个思路来想,其实这些突增的网络流量对于系统而言完全没压力,你还在慢慢地匀速排队,其实是对系统性能的浪费。
因此,对于这种有场景而言,副本桶演算法比漏桶就更有优势。
副本桶token bucket
现在的副本桶演算法,像Guava和Sentinel的同时实现都有冷启动/预热的方式,为了避免在网络流量激增的同时把系统打挂,副本桶演算法会在最开始一段天数内冷启动,随着网络流量的增加,系统会根据网络流量大小动态地调整生成副本的速度,最终直到允诺达到系统的共振频率。
源代码举例
他们以sentinel举例,sentinel中统计用到了翻转询问处演算法,接着也有用到漏桶、副本桶演算法。
翻转询问处
sentinel中就使用到了翻转询问处演算法来展开统计,不过他的同时实现和我下面画的图有点不一样,实际上sentinel中的翻转询问处用两个圆形来描述更合理一点。
前期是创建节点,接着slot串起来是两个责任链模式,StatisticSlot透过翻转询问处来统算数据,FlowSlot是真正开闭的逻辑,还有一些降级、系统保护的举措,最终形成了整个sentinel的开闭方式。
翻转询问处的同时实现主要能看LeapArray的代码,预设不然定义了天数询问处的相关参数。
对于sentinel而言其实询问处分成秒和分钟两个级别,秒不然询问处数目是2,分钟则是60个询问处,每一询问处的天数长度是1s,总的天数周期是60s,分成60个询问处,这里他们就以分钟级别的统计而言。
public abstract class LeapArray<T> {
//询问处天数长度,毫秒数,预设1000ms
protected int windowLengthInMs;
//询问处数目,预设60
protected int sampleCount;
//毫秒天数周期,预设60*1000
protected int intervalInMs;
//秒级天数周期,预设60
private double intervalInSecond;
//天数询问处数组
protected final AtomicReferenceArray<WindowWrap<T>> array;
接着他们要看的是它是怎么计算出当前询问处的,其实源代码里写的听清楚的,但是如果你按照之前想象把他当做一条直线延伸去想不然估计不太好理解。
首先计算数组索引下标和天数询问处天数那个都比较单纯,难点如果大部分在于第三点询问处大于old那个是什么鬼,详细说下这几种情况。
数组中的天数询问处是是空的,那个说明天数走到了他们初始化的天数之后了,这时new两个新的询问处透过CAS的方式去更新,接着返回那个新的询问处就好了。第二种情况是正好天数询问处的天数相等,那么间接返回,没啥好说的第三种情况是比较难以理解的,能参看两条天数线的图,就比较好理解了,第三次天数询问处走完了达到1200,接着圆形天数询问处开始循环,新的天数起始位置还是1200,接着天数询问处的天数来到1676,B2的位置如果还是老的询问处那么是600,因此他们要重置之前的天数询问处的天数为当前的天数。最后一种一般情况不太可能发生,除非时钟回拨这样子从那个他们能发现是针对每一WindowWrap天数询问处都展开了统计,最后实际上在前面的几个地方都会用到天数询问处统计的QPS结果,这里就不再赘述了,知道即可。
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
// Calculate current index so we can map the timestamp to the leap array.
return (int) (timeId % array.length());
}
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
return timeMillis – timeMillis % windowLengthInMs;
}
public WindowWrap<T> currentWindow(long timeMillis) {
//当前天数如果小于0,返回空
if (timeMillis < 0) {
return null;
}
//计算天数询问处的索引
int idx = calculateTimeIdx(timeMillis);
// 计算当前天数询问处的开始天数
long windowStart = calculateWindowStart(timeMillis);
while (true) {
//在询问处数组中获得询问处
WindowWrap<T> old = array.get(idx);
if (old == null) {
/*
* B0 B1 B2 NULL B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* 比如说当前天数是888,根据计算得到的数组询问处位置是个空,因此间接创建两个新询问处就好了
*/
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
/*
* B0 B1 B2 B3 B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* 那个更好了,正好等于,间接返回就行
*/
return old;
} else if (windowStart > old.windowStart()) {
/*
* B0 B1 B2 B3 B4
* |_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* B0 B1 B2 NULL B4
* |_______||_______|_______|_______|_______|_______||___
* … 1200 1400 1600 1800 2000 2200 timestamp
* ^
* time=1676
* 那个要当成圆形理解就好了,之前如果是1200两个完整的圆形,接着继续从1200开始,如果现在天数是1676,落在在B2的位置,
* try {
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// 那个不太可能出现,嗯。。时钟回拨
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
漏桶
sentinel主要根据FlowSlot中的流控展开网络流量控制,其中RateLimiterController就是漏桶演算法的同时实现,那个同时实现较之其他几个还是单纯多了,稍微看呵呵如果就明白了。
首先计算出当前允诺平摊到1s内的天数花费,接着去计算这一次允诺预计天数如果小于当前天数不然,那么以当前天数为主,返回即可反之如果少于当前天数不然,这这时候就要展开排队等待了,等待的这时候要判断是否少于当前最大的等待天数,少于就间接弃置没少于就更新上一次的透过天数,接着再比较一次是否超时,还超时就重置天数,反之在等待天数覆盖范围之内不然就等待,如果都不是那就能透过了public class RateLimiterController implements TrafficShapingController {
//最大等待超时天数,预设500ms
private final int maxQueueingTimeMs;
//开闭数目
private final double count;
//上一次的透过天数
private final AtomicLong latestPassedTime = new AtomicLong(-1);
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// Pass when acquire count is less or equal than 0.
if (acquireCount <= 0) {
return true;
}
// Reject when count is less or equal than 0.
// Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
if (count <= 0) {
return false;
}
long currentTime = TimeUtil.currentTimeMillis();
//天数平摊到1s内的花费
long costTime = Math.round(1.0 * (acquireCount) / count * 1000); // 1 / 100 * 1000 = 10ms
//计算这一次允诺预计的天数
long expectedTime = costTime + latestPassedTime.get();
//花费天数小于当前天数,pass,最后透过天数 = 当前天数
if (expectedTime <= currentTime) {
latestPassedTime.set(currentTime);
return true;
} else {
//预计透过的天数少于当前天数,要展开排队等rentTimeMillis();
//等待天数少于最大等待天数,弃置
if (waitTime > maxQueueingTimeMs) {
return false;
} else {
//反之,能更新最后一次透过天数了
long oldTime = latestPassedTime.addAndGet(costTime);
try {
waitTime = oldTime – TimeUtil.currentTimeMillis();
//更新后再判断,还是少于最大超时天数,那么就弃置,天数重置
if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
//在天数覆盖范围之内不然,就等待
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
}
副本桶
最后是副本桶,那个不在于同时实现的复制,而是你看源代码会发现都算的些啥玩意儿。。。sentinel的副本桶同时实现如前所述Guava,代码在WarmUpController中。
那个演算法那些各种计算逻辑其实他们能不管(因为我也没看懂。。),但是流程上他们是清晰的就能了。
几个核心的参数看注释,构造方法里那些计算逻辑暂时不管他是怎么算的(我也没整明白,但是不影响他们理解),关键看canPass是怎么做的。
拿到当前询问处和上两个询问处的QPS填充副本,也是往桶里丢副本,接着他们先看填充副本的逻辑public class WarmUpController implements TrafficShapingController {
//开闭QPS
protected double count;
//冷启动系数,预设=3
private int coldFactor;
//警戒的副本数
protected int warningToken = 0;
//最大副本数
private int maxToken;
//斜率,产生副本的速度
protected double slope;
//存储的副本数目
protected AtomicLong storedTokens = new AtomicLong(0);
//最后一次填充副本天数
protected AtomicLong lastFilledTime = new AtomicLong(0);
public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
construct(count, warmUpPeriodInSec, coldFactor);
}
public WarmUpController(double count, int warmUpPeriodInSec) {
construct(count, warmUpPeriodInSec, 3);
}
private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
if (coldFactor <= 1) {
throw new IllegalArgumentException(“Cold factor should be larger than 1”);
}
this.count = count;
this.coldFactor = coldFactor;
//stableInterval 稳定产生副本的天数周期,1/QPS
//warmUpPeriodInSec 预热/冷启动天数 ,预设 10s
warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor – 1);
maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
//斜率的计算参考Guava,当做两个一般而言改的公式
slope = (coldFactor – 1.0) / count / (maxToken – warningToken);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
//当前天数询问处透过的QPS
long passQps = (long) node.passQps();
//上两个天数询问处QPS
long previousQps = (long) node.previousPassQps();
//填充副本
syncToken(previousQps);
// 开始计算它的斜率
// 如果进入了警戒线,开始调整他的qps
long restToken = storedTokens.get();
if (restToken >= warningToken) {
//当前的令牌少于警戒线,获得少于警戒线的副本数
long aboveToken = restToken – warningToken;
// 消耗的速度要比warning快,但是要比慢
// current interval = restToken*slope+1/count
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}
}
填充副本的逻辑如下:
拿到当前的天数,接着去掉毫秒数,得到的是秒级天数判断天数小于这里是为了控制每秒丢一次副本接着是coolDownTokens去计算他们的冷启动/预热是怎么计算填充副本的前面计算当前剩下的副本数那个就不说了,减去上一次消耗的是桶里剩下的副本protected void syncToken(long passQps) {
long currentTime = TimeUtil.currentTimeMillis();
//去掉当前天数的毫秒
currentTime = currentTime – currentTime % 1000;
long oldLastFillTime = lastFilledTime.get();
//控制每秒填充一次副本
if (currentTime <= oldLastFillTime) {
牌数目,包含添加副本的逻辑,这是预热的逻辑
long newValue = coolDownTokens(currentTime, passQps);
if (storedTokens.compareAndSet(oldValue, newValue)) {
//存储的副本数目当然要减去上一次消耗的副本
long currentValue = storedTokens.addAndGet(0 – passQps);
if (currentValue < 0) {
storedTokens.set(0L);
}
lastFilledTime.set(currentTime);
}
}
最开始的事实因为lastFilledTime和oldValue都是0,因此根据当前天数戳会得到两个非常大的数字,最后和maxToken取小不然就得到了最大的副本数,因此第三次初始化的这时候就会生成maxToken的副本之后他们假定系统的QPS一开始很低,接着突然飙高。因此开始的这时候回一直走到高于警戒线的逻辑里去,接着passQps又很低,因此会一直处于把副本桶填满的状态(currentTime – lastFilledTime.get()会一直都是1000,也是1秒),因此每次都会填充最大QPScount数目的副本接着突增网络流量来了,QPS瞬间很高,慢慢地副本数目就会消耗到警戒线之下,走到他们if的逻辑里去,接着去按照count数目增加副本private long coolDownTokens(long currentTime, long passQps) {
long oldValue = storedTokens.get();
long newValue = oldValue;
//水位低于警戒线,就生成副本
if (oldValue < warningToken) {
//如果桶中副本低于警戒线,根据上一次的天数差,得到新的副本数,因为去掉了毫秒,1秒生成的副本是共振频率count
//第三次都是0不然,会生成count数目的副本
newValue = (long)(oldValue + (currentTime – lastFilledTime.get()) * count / 1000);
} else if (oldValue > warningToken) {
//反之,如果是高于警戒线,要判断QPS。因为QPS越高,生成副本就要越慢,QPS低不然生成副本要越快
if (passQps < (int)count / coldFactor) {
newValue = (long)(oldValue + (currentTime – lastFilledTime.get()) * count / 1000);
}
}
//不要少于最大副本数
return Math.min(newValue, maxToken);
}
下面的逻辑理顺之后,他们就能继续看开闭的部分逻辑:
副本计算的逻辑完成,接着判断是不是少于警戒线,按照下面的说法,低QPS的状态的确是一直少于的,因此会根据斜率来计算出两个warningQps,因为他们处于冷启动的状态,因此那个阶段是要根据斜率来计算出两个QPS数目,让网络流量慢慢地达到系统能承受的峰值。举个范例,如果count是100,那么在QPS很低的情况下,副本桶一直处于满状态,但是系统会控制QPS,实际透过的QPS是warningQps,根据演算法可能只有10或者20(怎么算的不影响理解)。QPS主键提高的这时候,aboveToken再逐渐变小,整个warningQps就在逐渐变大,直到走到警戒线之下,到了else逻辑里。网络流量突增的情况,是else逻辑里低于警戒线的情况,他们副本桶在时不时地根据count去增加副本,这这时候消耗副本的速度少于他们生成副本的速度,可能就会导致一直处于警戒线之下,这这时候判断当然就需要根据最高QPS去判断开闭了。long restToken = storedTokens.get();
if (restToken >= warningToken) {
//当前的副本少于警戒线,获得少于警戒线的副本数
long aboveToken = restToken – warningToken;
// 消耗的速度要比warning快,但是要比慢
// current interval = restToken*slope+1/count
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
if (passQps + acquireCount <= count) {
return true;
}
}
因此,按照低QPS到突增高QPS的流程,来想象呵呵那个过程:
刚开始,系统的QPS非常低,初始化他们就间接把副本桶塞满了接着那个低QPS的状态持续了一段天数,因为他们一直会填充最大QPS数目的副本(因为取最小值,因此其实桶里副本基本不会有变化),因此副本桶一直处于满的状态,整个系统的开闭也处于两个比较低的水平这以上的部分一直处于警戒线之上,实际上是叫做冷启动/预热的过程。
接着系统的QPS突然激增,副本消耗速度太快,就算他们每次增加最大QPS数目的副本任然无法维持消耗,因此桶里的副本在不断低减少,那个这时候,冷启动阶段的限制QPS也在不断地提高,最后直到桶里的副本低于警戒线低于警戒线之后,系统就会按照最高QPS去开闭,那个过程是系统在逐渐达到最高开闭的过程那这样一来,实际就达到了他们处理突增网络流量的目的,整个系统在漫漫地适应突然飙高的QPS,接着最终达到系统的QPS共振频率。
开始的过程。
总结
因为演算法如果单独说不然都比较单纯,一说大家都能听明白,不需要几个字就能说明白,因此还是得弄点源代码看看别人是怎么玩的,因此尽管我很讨厌放源代码,但是还是不得不干。
光靠别人说一点其实有点看不明白,按照顺序读一遍不然心里就有数了。
那源代码不然最难以理解的是副本桶的同时实现了,说实话那几个计算的逻辑我看了好几遍不知道他算的什么鬼,但是思想他们理解就行了,其他的逻辑相对而言就比较容易理解。