甚么是开闭
开闭能认为服务项目降班的一种,开闭就是管制控制系统的输入和输入网络流量已达至为保护控制系统 的目地。一般而言控制系统的客运量是能被估算的,为了确保控制系统的平衡运转,除非达至的须要管制的共振频率,就须要管制网络流量并采行一些举措以完成管制网络流量的目地。比如说:延后处置,婉拒处置,或是部分婉拒处置之类。
比如说情景:
某天小华忽然发觉的USB允诺忽然之间降到了原来的10倍,USB几乎不能采用,产生了一连串蝴蝶效应,导致了整个控制系统崩盘。这就比方说,老水表中都加装了稳压,除非采用高功率电子设备,稳压就会TNUMBERAP,确保各家用电器不被强电阻坏掉,控制系统也反之亦然加装稳压,避免非预期允诺极重,引起控制系统失去知觉。
开闭方法
常见的开闭演算法有:加减,翻转窗口加减,漏桶演算法和副本桶演算法。
漏桶演算法路子
水(允诺)进入到漏桶里,漏桶以一定的速度流入,当溪水的速度极重会直接外溢, 漏桶是私自管制了统计数据的传输率。
副本桶演算法
除了要能够管制统计数据的平均值传输率外,还须要容许某种意义的突发性允诺,副本桶更加最合适。
副本桶的路子是以两个静止的速度往桶里放副本,假如允诺须要被处置,则须要从桶里抽出两个副本,假如没有副本单单,那么就婉拒服务项目。
Google开放源码软件包Guava提供了开闭辅助工具类RateLimiter是如前所述副本桶演算法来实现的。
public double acquire() {
return acquire(1);
}
public double acquire(int permits) {
checkPermits(permits); //检查和模块与否不合法(与否小于0)
long microsToWait;
synchronized (mutex) { //应付mammalian情况须要并行
microsToWait = reserveNextTicket(permits, readSafeMicros()); //获得须要等待的时间
}
ticker.sleepMicrosUninterruptibly(microsToWait); //等待,当未达至管制时,microsToWait为0
return 1.0 * microsToWait / TimeUnit.SECONDS.toMicros(1L);
}
private long reserveNextTicket(double requiredPermits, long nowMicros) {
resync(nowMicros); //补充副本
long microsToNextFreeTicket = nextFreeTicketMicros – nowMicros数目
double freshPermits = requiredPermits – storedPermitsToSpend;
long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;
this.storedPermits -= storedPermitsToSpend; // 减去消耗的副本
return microsToNextFreeTicket;
}
private void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
if (nowMicros > nextFreeTicketMicros) {
storedPermits = Math.min(maxPermits,
storedPermits + (nowMicros – nextFreeTicketMicros) / stableIntervalMicros);
nextFreeTicketMicros = nowMicros;
}
}
计数器
控制单位时间内的允诺数量
import java.util.concurrent.atomic.AtomicInteger;
public class Counter {
/**
* 最大访问数量
*/
private final int limit = 10;
/**
* 访问时间差
*/
private final long timeout = 1000;
/**
* 允诺时间
*/
private long time;
/**
* 当前计数器
*/
private AtomicInteger reqCount = new AtomicInteger(0);
public boolean limit() {
long now = System.currentTimeMillis();
if (now < time + timeout) {
// 单位时间内
reqCount.getAndAdd(1);
return reqCount.get() <= limit;
} else {
// 超出单位时间
time = now;
reqCount = new AtomicInteger(0);
return true;
}
}
public static void main(String[] args) {
}
}
计数方式有没有问题?
假设每分钟允诺数量为 60 个,每秒钟控制系统能处置1个允诺,用户在00:59 发送了60 个允诺,然后在 1:00 发生了60个允诺,此时 2 秒内有120个允诺(每秒60)个允诺,这样的方式并没有实现管制网络流量,因为每分钟能处置60个,但是实际上这60个是一秒钟发过来的。
翻转询问处计数
翻转询问处是对计数方式对改进,增加两个时间粒度的度量单位。
把一分钟分成了若干等份,比如说分成6份, 每份10s, 在一份独立计数器上,在 00:00-00:09 之间计数器累加1, 当等份数量越大,开闭统计越详细。
package ratelimit;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.IntStream;
public class TimeWindow {
private ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<Long>();
/**a
* 间隔秒数
*/
private int seconds;
/**
* 最大开闭
*/
private int max;
public TimeWindow(int max, int seconds) {
this.seconds = seconds;
this.max = max;
new Thread(() -> {
while (true) {
try {
Thread.sleep((seconds – 1) * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
clean();
}
}).start();
}
public static void main(String[] args) {
final TimeWindow timeWindow = new TimeWindow(10, 1);
IntStream.range(0, 3).forEach((i) -> {
new Thread(() -> {
try {
while (true) {
Thread.sleep(new Random().nextInt(20) * 100);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
timeWindow.take();
}).start();
});
}
public void take() {
long start = System.currentTimeMillis();
int size = sizeOfValid();
if (size > max) {
System.out.println(“超限”);
}
synchronized (queue) {
if (sizeOfValid() > max) {
System.err.println(“超限”);
System.err.println(“queue 中有:” + queue.size() + “最大数量:” + max);
}
this.queue.offer(System.currentTimeMillis());
}
System.err.println(“queue 中有:” + queue.size() + “最大数量:” + max);
}
private int sizeOfValid() {
Iterator<Long> it = queue.iterator();
Long ms = System.currentTimeMillis() – seconds * 1000;
int count = 0;
while (it.hasNext()) {
long t = it.next();
if (t > ms) {
//在时间询问处范围内
count++;
}
}
return count;
}
public void clean() {
Long c = System.currentTimeMillis() – seconds * 1000;
Long t1 = null;
while ((t1 = queue.peek()) != null && t1 < c) {
System.out.println(“统计数据清理”);
queue.poll();
}
}
}
副本桶问题
副本桶演算法
当网络电子设备衡量网络流量与否超过额定带宽时,须要查看副本桶,而副本桶中会放置一定数量的副本,两个副本容许USB发送或接收1bit统计数据(有时是1 Byte统计数据),当USB通过1bit统计数据后,同时也要从桶中移除两个副本。当桶里没有副本的时候,任何网络流量都被视为超过额定带宽,只有当桶中有副本时,统计数据才能通过USB。副本桶中的副本不仅仅能被移除,反之亦然也能往里添加,所以为了确保USB随时有统计数据通过,就必须不停地往桶里加副本,由此可见,往桶里加副本的速度,就决定了统计数据通过USB的速度。 因此,我们通过控制往副本桶里加副本的速度从而控制用户网络流量的带宽。而设置的这个用户传输统计数据的速度被称为承诺信息速度(CIR),通常以秒为单位。比如说我们设置用户的带宽为1000 bit每秒,只要确保每秒钟往桶里添加1000个副本即可。
副本桶能用来为保护自己,主要用来对调用者频率进行开闭,为的是不让自己的控制系统垮掉。
副本桶演算法代码
package com.netease.datastream.util.flowcontrol;
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* <pre>
* Created by inter12 on 15-3-18.
* </pre>
*/
public class TokenBucket {
// 默认桶大小个数 即最大瞬间网络流量是64M
private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64;
// 两个桶的单位是1字节
private int everyTokenSize = 1;
// 瞬间最大网络流量
private int maxFlowRate;
// 平均值网络流量
private int avgFlowRate;
// 队列来缓存桶数量:最大的网络流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 *
// 1024 * 64
private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(
DEFAULT_BUCKET_SIZE);
private ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor();
private volatile boolean isStart = false;
private ReentrantLock lock = new ReentrantLock(true);
private static final byte A_CHAR = a;
public TokenBucket() {
}
public TokenBucket(int maxFlowRate, int avgFlowRate) {
this.maxFlowRate = maxFlowRate;
this.avgFlowRate = avgFlowRate;
}
public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) {
this.everyTokenSize = everyTokenSize;
this.maxFlowRate = maxFlowRate;
this.avgFlowRate = avgFlowRate;
}
public void addTokens(Integer tokenNum) {
// 若是桶已经满了,就不再家如新的副本
for (int i = 0; i < tokenNum; i++) {
tokenQueue.offer(Byte.valueOf(A_CHAR));
}
}
public TokenBucket build() {
start();
return tc boolean getTokens(byte[] dataSize) {
// Preconditions.checkNotNull(dataSize);
// Preconditions.checkArgument(isStart,
// “please invoke start method first !”);
int needTokenNum = dataSize.length / everyTokenSize + 1;// 传输内容大小对应的桶个数
final ReentrantLock lock = this.lock;
lock.lock();
try {
boolean result = needTokenNum <= tokenQueue.size(); // 与否存在足够的桶数量
if (!result) {
return false;
}
int tokenCount = 0;
for (int i = 0; i < needTokenNum; i++) {
Byte poll = tokenQueue.poll();
if (poll != null) {
tokenCount++;
}
}
return tokenCount == needTokenNum;
} finally {
lock.unlock();
}
}
public void start() {
// 初始化桶队列大小
if (maxFlowRate != 0) {
tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate);
}
// 初始化副本生产者
TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);
scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1,
TimeUnit.SECONDS);
isStart = true;
}
public void stop() {
isStart = false;
scheduledExecutorService.shutdown();
}
public boolean isStarted() {
return isStart;
}
class TokenProducer implements Runnable {
private int avgFlowRate;
private TokenBucket tokenBucket;
public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) {
this.avgFlowRate = avgFlowRate;
this.tokenBucket = tokenBucket;
}
@Override
public void run() {
tokenBucket.addTokens(avgFlowRate);
}
}
public static TokenBucket newBuilder() {
return new TokenBucket();
}
public TokenBucket everyTokenSize(int everyTokenSize) {
this.everyTokenSize = everyTokenSize;
return this;
}
public TokenBucket maxFlowRate(int maxFlowRate) {
this.maxFlowRate = maxFlowRate;
return this;
}
public TokenBucket avgFlowRate(int avgFlowRate) {
this.avgFlowRate = avgFlowRate;
return this;
}
private String stringCopy(String data, int copyNum) {
StringBuilder sbuilder = new StringBuilder(data.length() * copyNum);
for (int i = 0; i < copyNum; i++) {
sbuilder.append(data);
}
return sbuilder.toString();
}
public static void main(String[] args) throws IOException,
InterruptedException {
tokenTest();
}
private static void arrayTest() {
ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>(
10);
tokenQueue.offer(1);
tokenQueue.offer(1);
tokenQueue.offer(1);
System.out.println(tokenQueue.size());
System.out.println(tokenQueue.remainingCapacity());
}
private static void tokenTest() throws InterruptedException, IOException {
TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512)
.maxFlowRate(1024).build();
BufferedWriter bufferedWriter = new BufferedWriter(
new OutputStreamWriter(new FileOutputStream(“D:/ds_test”)));
String data = “xxxx”;// 四个字节
for (int i = 1; i <= 1000; i++) {
Random random = new Random();
int i1 = random.nextInt(100);
boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data,
i1).getBytes());
TimeUnit.MILLISECONDS.sleep(100);
if (tokens) {
bufferedWriter.write(“token pass — index:” + i1);
System.out.println(“token pass — index:” + i1);
} else {
bufferedWriter.write(“token rejuect — index” + i1);
System.out.println(“token rejuect — index” + i1);
}
bufferedWriter.newLine();
bufferedWriter.flush();
}
bufferedWriter.close();
}
}
副本桶和漏桶的选择问题
假如要让自己的控制系统不被打垮,用副本桶,假如确保别人的控制系统不被打垮,用漏桶演算法。
内推链接:https://job.toutiao.com/referral/mobile/spring-referral?token=MzsxNjIwMzgzNzA2MzYyOzY5Mzk2OTkyMjAwODk2NjkxNTE7MA
欢迎关注公众号:程序员财富自由之路
参考资料
https://www.cnblogs.com/xuwc/p/9123078.html