参见Eureka手撸微服务的限流器

限流器 artisan 255℃ 0评论

实现思路

Eureka官方实现参见:com.netflix.discovery.util.RateLimiter,该实现是基于令牌桶算法,主要思路如下:
1、每次进行acquire调用获取令牌时,会根据当前时间戳以及上次生成令牌时间戳的差值按照令牌频率生成对应的令牌。
2、将步骤1中的令牌弥补到已经消费令牌的计数器上(进行令牌数量的回血)
3、进行生成令牌之后调用consumeToken获取令牌,无论是否有可用令牌都会立即返回(比较简单,guava的限流器支持超时获取令牌)

并发控制
使用的是CAS乐观锁进行并发控制,必要时候会进行自旋操作,例如当上次生成时间戳设置成功之后,就可以要将新的令牌弥补到令牌计数器中

限流器代码实现:

package com.javartisan.app;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @author javartisan
 * @email javartisan@163.com
 */
public class IRateLimiter {

    /**
     * 当前已经消费的token数量
     */
    private AtomicInteger consumedTokenCnt = new AtomicInteger();

    private AtomicLong lastFillTokenTimeInMillSeconds = new AtomicLong();

    /**
     * 时间单位(构造器只支持秒或分)
     */
    private Long timeUnitInMillSecond;

    public IRateLimiter() {
        this(TimeUnit.SECONDS);
    }

    public IRateLimiter(TimeUnit unit) {
        switch (unit) {

            case SECONDS:
                timeUnitInMillSecond = 1000L;
                break;
            case MINUTES:
                timeUnitInMillSecond = 1000L * 60;
            default:
                throw new RuntimeException("TimeUnit not support!");
        }
    }

    /**
     * @param burstCnt
     * @param countPerTimeUnit 在{@link timeUnitInMillSecond}(秒或分)可以产生的token数量
     */
    public void refillToken(int burstCnt, int countPerTimeUnit) {

        long lastFillTime = lastFillTokenTimeInMillSeconds.get();
        long currentTimeMillis = System.currentTimeMillis();
        long timeDiff = currentTimeMillis - lastFillTime;
        // 产生token的数量
        long generateTokenCnt = timeDiff / timeUnitInMillSecond * countPerTimeUnit;
        if (generateTokenCnt > 0) {

            //产生这些token花费的时间
            long generateTokenCntCostTime = generateTokenCnt / countPerTimeUnit * timeUnitInMillSecond;
            /*
             * 注意:
             * long nextFillTime = currentTimeMillis + generateTokenCntCostTime 与 long nextFillTime = lastFillTime == 0 ? currentTimeMillis : currentTimeMillis + generateTokenCntCostTime;区别:
             * 前者表示生成的token是系统启动也就是0到 currentTimeMillis + generateTokenCntCostTime 时间段产生的token
             * 后者表示生成的token是系统启动也就是0到 currentTimeMillis   时间段产生的token
             * 差别在于:
             *  0到currentTimeMillis + generateTokenCntCostTime时间之间token数量的差别
             */
            long nextFillTime = lastFillTime == 0 ? currentTimeMillis : currentTimeMillis + generateTokenCntCostTime;
            if (lastFillTokenTimeInMillSeconds.compareAndSet(lastFillTime, nextFillTime)) { // 此处不需要使用while循环,如果失败了证明有其他线程在执行该操作因此可以由其他线程执行

                while (true) { // 一旦if (lastFillTokenTimeInMillSeconds.compareAndSet(lastFillTime, nextFillTime)) 执行成功,此时必须执行成功
                  // 自旋操作
                    int tokenConsumedCnt = consumedTokenCnt.get();
                    // 由于burstCnt是每次传参,因此burstCnt可能会变小,因此需要去min操作,得到的结果也就是最多可以弥补的token数量(最多可以回血数)
                    int maximumTokenToAdd = Math.min(tokenConsumedCnt, burstCnt);
                    // 最多消费数量为0,不允许为负数
                    int newConsumeCnt = Math.max(0, tokenConsumedCnt - maximumTokenToAdd);
                    if (consumedTokenCnt.compareAndSet(tokenConsumedCnt, newConsumeCnt)) {
                        return;
                    }
                }
            }

        }
    }

    public boolean acquire(int burstCnt, int countPerTimeUnit) {
        if (burstCnt < 0 || countPerTimeUnit < 0) {
            throw new RuntimeException("burstCnt and countPerTimeUnit must bigger zero.");
        }
        refillToken(burstCnt, countPerTimeUnit);
        return consumeToken(burstCnt);
    }

    private boolean consumeToken(int burstCnt) {
        while (true) { 
            int tokenConsumedCnt = consumedTokenCnt.get();
            if (tokenConsumedCnt >= burstCnt) {
                return false;
            }
            if (consumedTokenCnt.compareAndSet(tokenConsumedCnt, tokenConsumedCnt + 1)) { //此处会进行自旋转操作
                return true;
            }
        }
    }
}

测试代码:

package com.javartisan.app;


import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class IRateLimiterTests {

    public static void main(String[] args) {
        IRateLimiter rateLimiter = new IRateLimiter(TimeUnit.SECONDS);
        int SIZE = 200;
        CountDownLatch countDownLatch = new CountDownLatch(SIZE);
        for (int i = 0; i < SIZE; i++) {

            new Thread(() -> {
                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                boolean acquire = rateLimiter.acquire(100, 100);
                if (!acquire) {
                    System.out.println(acquire);
                } else {
                    System.out.println("acquire true");
                }

            }).start();
            countDownLatch.countDown();
        }

    }
}

转载请注明:Java工匠师 » 参见Eureka手撸微服务的限流器

喜欢 (4)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址