限流基本原理
📦 本文已归档到:「blog」
限流可以认为是服务降级的一种。限流就是限制系统的输入和输出流量已达到保护系统的目的。一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。
限流方法
计数器
控制单位时间的请求数量。使用 AtomicInteger
进行统计。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class Counter {
private final int limit = 10;
private final long timeout = 1000;
private long time;
private AtomicInteger reqCount = new AtomicInteger(0);
public boolean limit() { long now = System.currentTimeMillis(); if (now < time + timeout) { reqCount.addAndGet(1); return reqCount.get() <= limit; } else { time = now; reqCount = new AtomicInteger(0); return true; } } }
|
滑动窗口
滑动窗口是对计数器方式的改进,增加一个时间粒度的度量单位 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
| import java.util.Iterator; import java.util.Random; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.IntStream;
public class TimeWindow { private ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<Long>();
private int seconds;
private int max;
public TimeWindow(int max, int seconds) { this.seconds = seconds; this.max = max;
new Thread(() -> { while (true) { try { Thread.sleep((seconds - 1) * 1000L); } catch (InterruptedException e) { e.printStackTrace(); } clean(); } }).start();
}
public static void main(String[] args) throws Exception {
final TimeWindow timeWindow = new TimeWindow(10, 1);
IntStream.range(0, 3).forEach((i) -> { new Thread(() -> {
while (true) {
try { Thread.sleep(new Random().nextInt(20) * 100); } catch (InterruptedException e) { e.printStackTrace(); } timeWindow.take(); }
}).start();
});
}
public void take() {
long start = System.currentTimeMillis(); try {
int size = sizeOfValid(); if (size > max) { System.err.println("超限");
} synchronized (queue) { if (sizeOfValid() > max) { System.err.println("超限"); System.err.println("queue中有 " + queue.size() + " 最大数量 " + max); } this.queue.offer(System.currentTimeMillis()); } System.out.println("queue中有 " + queue.size() + " 最大数量 " + max);
}
}
public int sizeOfValid() { Iterator<Long> it = queue.iterator(); Long ms = System.currentTimeMillis() - seconds * 1000; int count = 0; while (it.hasNext()) { long t = it.next(); if (t > ms) { count++; } }
return count; }
public void clean() { Long c = System.currentTimeMillis() - seconds * 1000;
Long tl = null; while ((tl = queue.peek()) != null && tl < c) { System.out.println("清理数据"); queue.poll(); } }
}
|
令牌桶(Token Bucket)
规定固定容量的桶,token 以固定速度往桶内填充,当桶满时 token 不会被继续放入,每过来一个请求把 token 从桶中移除,如果桶中没有 token 不能请求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public class TokenBucket {
private long time;
private Double total;
private Double rate;
private Double nowSize;
public boolean limit() { long now = System.currentTimeMillis(); nowSize = Math.min(total, nowSize + (now - time) * rate); time = now; if (nowSize < 1) { return false; } else { nowSize -= 1; return true; } } }
|
漏桶
规定固定容量的桶,有水进入,有水流出。对于流进的水我们无法估计进来的数量、速度,对于流出的水我们可以控制速度。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class LeakBucket {
private long time;
private Double total;
private Double rate;
private Double nowSize;
public boolean limit() { long now = System.currentTimeMillis(); nowSize = Math.max(0, (nowSize - (now - time) * rate)); time = now; if ((nowSize + 1) < total) { nowSize++; return true; } else { return false; }
} }
|
参考资料