一般在具体的业务中,平台方会发布秒杀席位个数,秒杀的时间段,让各个商家报名,将自己的产品参与秒杀活动。这里将同事画的一张图放上来,大致是这么一个流程。关于秒杀原理可以参考单��秒杀…

                                                                                                                                                                                    一般在具体的业务中,平台方会发布秒杀席位个数,秒杀的时间段,让各个商家报名,将自己的产品参与秒杀活动。这里将同事画的一张图放上来,大致是这么一个流程。关于秒杀原理可以参考单机秒杀系统样例  

Test
这里面关于Nginx的限流可以参考高并发之Nginx的限流
我们这里在秒杀的前一天将所有的批准参加秒杀的商家商品数据导入到redis的商品队列中,这里我们使用当当的elastic-job来实现,关于elastic-job的使用,可以参考分布式调度Elastic-Job攻略
这里是以商家的服务来当成一件商品来处理的,所以service可以理解成商品。这里有一个区位的概念,即秒杀当天商家可以报名的所有时间段,我们称之为区位,比如0点~2点,2点~4点等等。区位的实体类字段如下。
/**

  • 秒杀区位
    /
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class Location {
    private Long id; //区位id
    private LocalDate date; //日期
    private TimeSegment timeSegment; //时间段
    private int locationNumber; //区位数量,即可以容纳多少商家报名
    private BigDecimal baseCost; //商家参与活动的最低缴纳金
    private double sellPercent; //销售百分比缴纳率
    private boolean releaseLocation; //是否发布
    }
    /
    *

  • 秒杀服务
    /
    @RequiredArgsConstructor
    @NoArgsConstructor
    public class Service {
    @Getter
    @Setter
    @NonNull
    private Long id; //服务id
    @Getter
    @Setter
    @NonNull
    private Long signUpId; //报名id
    @Getter
    @Setter
    @NonNull
    private String serviceCode; //服务编码
    @Getter
    @Setter
    @NonNull
    private String serviceName; //服务名称
    @Getter
    @Setter
    @NonNull
    private String serviceLevel; //服务分类
    @Getter
    @Setter
    @NonNull
    private Price price; //价格,包含普通价格和秒杀价格
    @Getter
    @Setter
    @NonNull
    private int totalCount; //参与秒杀的总数
    @Getter
    @Setter
    @NonNull
    private int limitCount; //单个用户可以购买该服务的数量
    @Getter
    @Setter
    @NonNull
    private TimeSegment timeSegment; //时间段
    @Getter
    @Setter
    @NonNull
    private CheckStatus checkStatus; //平台对该秒杀的审核状态
    @Getter
    private Lock lock = new ReentrantLock(); //重入锁
    @Getter
    private Condition condition = lock.newCondition(); //重入锁条件
    }
    商家商品(服务)数据导入到redis的商品队列中
    /
    *

  • 在每天的0点开始处理,查看后一天是否有秒杀活动
    */
    @Slf4j
    @Component
    @ElasticSimpleJob(cron=”0 0 0 * * ?”,jobName=”loadRedis”,shardingTotalCount=2,jobParameter=”日期”,shardingItemParameters=”0=Load0,1=Load1”)
    public class LoadDataToRedisJob implements SimpleJob {
    @Autowired
    private RedisService redisService;
    @Autowired
    private DataDao dataDao;

    @Override
    public void execute(ShardingContext shardingContext) {
    //获取后一天所有的秒杀区位
    ListlocationNextDayList = dataDao.findJobToday(LocalDate.now().plusDays(1));
    if (!CollectionUtils.isEmpty(locationNextDayList)) {
    //获取一天所有的秒杀区位时间段
    ListsegmentList = locationNextDayList.stream().map(location -> location.getTimeSegment())
    .collect(Collectors.toList());
    switch (shardingContext.getShardingItem()) {
    case 0:
    //获取每个时间段内的所有参与秒杀的服务
    segmentList.stream().map(timeSegment -> {
    ListserviceInSegment = dataDao.findServiceInSegment(timeSegment);
    serviceInSegment.stream().forEach(service -> service.setTimeSegment(timeSegment));
    return serviceInSegment;
    //扁平化所有服务,统一为一组List
    }).flatMap(services -> services.stream()).forEach(service -> {
    //以服务id以及秒杀时间段组合为主键
    String key = service.getId() + service.getTimeSegment().toString();
    //如果redis中存在该主键的队列,则清空队列
    if (redisService.exists(key)) {
    for (int i = 0; i < redisService.llen(key); i++) {
    redisService.rpop(key);
    }
    }
    //清空后,根据每个服务的参与总数,将服务按总数量推送到该主键队列中
    for (int i = 0; i < service.getTotalCount(); i++) {
    redisService.lpush(key, JSONObject.toJSONString(service));
    }
    log.info(service.getId() + service.getTimeSegment().toString());
    //以服务id+”:count”组合成该服务的总数键,如果redis中存在该键,则删除
    String countKey = service.getId() + “:count”;
    if (redisService.exists(countKey)) {
    redisService.del(countKey);
    }
    //重新将总数放入该键的redis中存储
    redisService.set(countKey, String.valueOf(service.getTotalCount()));
    });
    break;
    case 1:
    break;
    default:
    break;
    }
    }
    }

}
将服务导入到redis队列后,我们需要设立一个秒杀活动开始的标识,让秒杀下单只能在秒杀活动进行中开启,不在秒杀时间内不允许下单。
/**

  • 给秒杀时间点设立开启标识,每天0点开始,每2小时执行一次
    */
    @Slf4j
    @Component
    @ElasticSimpleJob(cron=”0 0 0/2 * * ?”,jobName=”openTimeSeg”,shardingTotalCount=1,jobParameter=”日期”,shardingItemParameters=”0=Open0”)
    public class OpenTimeSegmentJob implements SimpleJob {
    @Autowired
    private RedisService redisService;
    @Autowired
    private DataDao dataDao;

    @Override
    public void execute(ShardingContext shardingContext) {
    //获取当天的所有秒杀区位
    ListlocationToDayList = dataDao.findJobToday(LocalDate.now());
    //如果当天有秒杀活动
    if (!CollectionUtils.isEmpty(locationToDayList)) {
    //获取当前时间点,当前时间点不一定是准点
    LocalDateTime now = LocalDateTime.now();
    int year = now.getYear();
    int month = now.getMonthValue();
    int day = now.getDayOfMonth();
    int hour = now.getHour();
    //将当前时间拼装成整点
    LocalDateTime beginDate = LocalDateTime.of(year, month, day, hour, 0, 0);
    //以整点时间为基准,在redis中放入开启秒杀时间段,119分钟后消失(每个时间段段为1小时59分钟,2小时的最后一分钟结束该时间段秒杀)
    redisService.set(“TimeStart:” + new TimeSegment(beginDate, beginDate.plusMinutes(119)).toString(),
    “opened”,7140);
    log.info(beginDate.toString() + “至” + beginDate.plusMinutes(119).toString() + “秒杀开始”);
    }
    }

}
到了秒杀时间,我们就可以开始下单了,先定义一个秒杀单的接口
public interface SecOrder {
/**
* 秒杀下单
* @param secOrder
* @return
*/
public String makeOrder(SecOrder secOrder);

/**
 * 是否存在该订单编号的秒杀单
 * @param orderNo
 * @return
 */
public boolean exitsOrder(String orderNo);

/**
 * 修改支付状态
 * @param orderNo
 */
public void changePayStatus(String orderNo);

}
SecOrder的实现类的各属性如下。
/**

  • 服务秒杀单
    */
    @Slf4j
    @AllArgsConstructor
    @NoArgsConstructor
    @ServiceSecOrderVersion(value = 1)
    public class ServiceSecOrder implements SecOrder {
    @Getter
    @Setter
    private Long id; //订单id
    @Getter
    @Setter
    private String orderNo; //订单编号
    @Getter
    @Setter
    private Service service; //订单服务内容
    @Getter
    @Setter
    private TimeSegment timeSegment; //秒杀时间段
    @Getter
    @Setter
    private int number; //订单数量
    @Getter
    @Setter
    private BigDecimal amount; //订单金额
    @Getter
    @Setter
    private AppUser user; //下单人
    @Getter
    @Setter
    private int orderStatus; //订单状态
    @Getter
    @Setter
    private LocalDateTime createDate; //创建日期

然后开始下秒杀订单
@Override
public String makeOrder(SecOrder secOrder) {
RedisService redisService = SpringBootUtil.getBean(RedisService.class);
IdService idService = SpringBootUtil.getBean(IdService.class);
MessageSender messageSender = SpringBootUtil.getBean(MessageSender.class);
//如果当前时间在秒杀时间段之外,不允许秒杀下单
if (LocalDateTime.now().isBefore(((ServiceSecOrder)secOrder).getTimeSegment().getBeginTime())
|| LocalDateTime.now().isAfter(((ServiceSecOrder)secOrder).getTimeSegment().getEndTime())) {
throw new RuntimeException(“不在秒杀时间段内”);
}
//由于测试时间的问题,此处需要屏蔽,等到实际部署时需要恢复
LocalDateTime now = LocalDateTime.now();
int year = now.getYear();
int month = now.getMonthValue();
int day = now.getDayOfMonth();
int hour = now.getHour();
LocalDateTime beginDate = LocalDateTime.of(year,month,day,hour,0,0);
//从redis中检查是否有开启秒杀时间段
if (!redisService.exists(“TimeStart:” + new TimeSegment(beginDate,beginDate.plusMinutes(119)).toString())) {
throw new RuntimeException(“当前时间段无秒杀”);
}
((ServiceSecOrder)secOrder).setId(idService.genId());
if (((ServiceSecOrder)secOrder).getNumber() > ((ServiceSecOrder)secOrder).getService().getLimitCount()) {
throw new RuntimeException(“秒杀数量超出限购”);
}
AppUser loginAppUser = AppUserUtil.getLoginAppUser();
AppUser user = new AppUser();
user.setId(loginAppUser.getId());
((ServiceSecOrder)secOrder).setOrderNo(getCode(idService));
user.setUsername(loginAppUser.getUsername());
((ServiceSecOrder)secOrder).setUser(user);
//设置订单状态0表示未支付状态
((ServiceSecOrder)secOrder).setOrderStatus(0);
((ServiceSecOrder)secOrder).setCreateDate(LocalDateTime.now());
//设置用户秒杀队列键名(每一种服务都有独立的用户秒杀队列)
//队列名由User:+服务id+时间段组成
String key = “User:” + ((ServiceSecOrder)secOrder).getService().getId()
+ ((ServiceSecOrder)secOrder).getTimeSegment().toString();
String serviceKey = ((ServiceSecOrder)secOrder).getService().getId()
+ ((ServiceSecOrder)secOrder).getService().getTimeSegment().toString();
//如果服务队列还有数据,则推送用户进队列,否则直接返回秒杀失败
if (redisService.llen(serviceKey) > 0) {
//建立每个服务对每个用户对限制数量对键,格式为——“服务id:用户id”
String limitUserKey = ((ServiceSecOrder) secOrder).getService().getId() + “:”
+ ((ServiceSecOrder) secOrder).getUser().getId();
//如果该键存在,获取该键的值(这里需要考虑分布式的并发问题的可能)
//但是有幂等,所以此处不会出现一个用户同时秒杀一个商品(服务)的多个并发线程存在
if (redisService.exists(limitUserKey)) {
String limitCount = redisService.get(limitUserKey);
//如果该键的值达到服务商品的限制数,返回秒杀失败
if (Integer.valueOf(limitCount) == ((ServiceSecOrder) secOrder).getService().getLimitCount()) {
return ((ServiceSecOrder) secOrder).getUser().getUsername() + “秒杀服务”
+ ((ServiceSecOrder) secOrder).getService().getServiceName() + “失败”;
}
}else { //如果不存在,设置该键的值为0
redisService.set(limitUserKey,”0”);
}
//将秒杀用户id推送到该队列中
redisService.lpush(key, ((ServiceSecOrder) secOrder).getUser().getId() + “”);
我们将用户id推送到redis队列后就要开始匹配秒杀结果了,因为商品队列早已经在前一天就推送进去了。
/**

  • 秒杀结果匹配任务,每天0点开始,每2小时执行一次
    */
    @Slf4j
    @Component
    @RabbitListener(queues = {SecendKillMq.SECENDKILL_QUEUE + “_1”,
    SecendKillMq.SECENDKILL_QUEUE + “_2”,
    SecendKillMq.SECENDKILL_QUEUE + “_3”,
    SecendKillMq.SECENDKILL_QUEUE + “_4”,
    SecendKillMq.SECENDKILL_QUEUE + “_5”,
    SecendKillMq.SECENDKILL_QUEUE + “_6”,
    SecendKillMq.SECENDKILL_QUEUE + “_7”,
    SecendKillMq.SECENDKILL_QUEUE + “_8”,
    SecendKillMq.SECENDKILL_QUEUE + “_9”,
    SecendKillMq.SECENDKILL_QUEUE + “_10”})

@ElasticSimpleJob(cron=”0 0 0/2 * * ?”,jobName=”secResult”,shardingTotalCount=2,jobParameter=”日期”,shardingItemParameters=”0=SecKill0,1=SecKill1”)
public class SecendKillResultJob implements SimpleJob {
@Autowired
private RedisService redisService;
@Autowired
private DataDao dataDao;
//为了跟MQ搭配,唤醒中断用
private ListserviceList;

@Override
public void execute(ShardingContext shardingContext) {
    //获取当天所有的秒杀区位
    List<Location> locationTodayList = dataDao.findJobToday(LocalDate.now());
    //如果当天有秒杀活动
    if (!CollectionUtils.isEmpty(locationTodayList)) {
        //获取一天所有的秒杀区位时间段
        List<TimeSegment> segmentList = locationTodayList.stream().map(location -> location.getTimeSegment())
                .collect(Collectors.toList());
        switch (shardingContext.getShardingItem()) {
            case 0:
                //从所有秒杀区位时间段过滤当前秒杀时间段
                this.serviceList = segmentList.stream().filter(timeSegment -> LocalDateTime.now().isAfter(timeSegment.getBeginTime()) &&
                        LocalDateTime.now().isBefore(timeSegment.getEndTime()))
                        //将时间段转化成时间段内的秒杀服务
                        .map(timeSegment -> {
                            List<Service> serviceInSegment = dataDao.findServiceInSegment(timeSegment);
                            serviceInSegment.stream().forEach(service -> service.setTimeSegment(timeSegment));
                            return serviceInSegment;
                            //扁平化所有的秒杀服务,将所有当前时间段内的服务放入serviceList属性中
                            //就是拿出当前时间段内所有参与秒杀的服务
                        }).flatMap(services -> services.stream()).collect(Collectors.toList());
                //并行化处理所有的秒杀服务
                int lism = 0;
                if (serviceList.size() > Runtime.getRuntime().availableProcessors() * 2) {
                    lism = serviceList.size();
                }else {
                    lism = Runtime.getRuntime().availableProcessors() * 2;
                }
                ForkJoinPool forkJoinPool = new ForkJoinPool(lism);
                try {
                    forkJoinPool.submit(() ->
                    this.serviceList.parallelStream().forEach(service -> {
                        while (true) {
                            try {
                                service.getLock().lock();
                                String userKey = "User:" + service.getId() + service.getTimeSegment().toString();
                                String serviceKey = service.getId() + service.getTimeSegment().toString();
                                String countKey = service.getId() + ":count";
                                //如果下秒杀时间内没有用户下单该服务,则中断该服务的并行线程
                                //如果有用户下单则唤醒该并行线程
                                while (redisService.llen(userKey).equals(0L)) {
                                    try {
                                        log.info("用户队列无数据,开始中断");
                                        service.getCondition().await();
                                        LocalDateTime now = LocalDateTime.now();
                                        if (now.isAfter(service.getTimeSegment().getEndTime())) {
                                            break;
                                        }
                                    } catch (InterruptedException e) {
                                        e.printStackTrace();
                                    }
                                } 

到这里,如果没有用户下单,则会进行线程中断,不会去执行while (true)的无限循环。SecendKillResultJob同时又是RabbitMQ的一个消费者,同时监听了10个消息队列,监听后进行如下处理
@RabbitHandler
public void receice(byte[] data, Channel channel, Message message) throws IOException {
try {
//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
//收到服务id
Long serviceId = unSerialize(data);
log.info(serviceId + “”);
//如果当前秒杀服务列表不为空
if (!CollectionUtils.isEmpty(this.serviceList)) {
//从服务列表中过滤出id为MQ收取的服务ID的服务
this.serviceList.stream().filter(service -> service.getId().equals(serviceId))
.forEach(service -> {
log.info(“存在” + service.getId());
try {
service.getLock().lock();
//对该服务所在线程进行唤醒
service.getCondition().signalAll();
} finally {
service.getLock().unlock();
}
});
}
} catch (IOException e) {
e.printStackTrace();
//丢弃这条消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
log.info(“receiver fail”);
}
}

/**

  • 反序列化

  • @param data

  • @return
    /
    private Long unSerialize(byte[] data) {
    Input input = null;
    try {
    Kryo kryo = new Kryo();
    input = new Input(new ByteArrayInputStream(data));
    return kryo.readObject(input,Long.class);
    }
    finally {
    input.close();
    }
    }
    这样我们再回到ServiceSecOrder的makeOrder下单方法中,将用户下单的服务id异步发送到MQ中,去唤醒秒杀结果匹配任务继续执行。
    /
    *

  • 服务秒杀单
    */
    @Slf4j
    @AllArgsConstructor
    @NoArgsConstructor
    @ServiceSecOrderVersion(value = 1)
    public class ServiceSecOrder implements SecOrder {
    @Getter
    @Setter
    private Long id; //订单id
    @Getter
    @Setter
    private String orderNo; //订单编号
    @Getter
    @Setter
    private Service service; //订单服务内容
    @Getter
    @Setter
    private TimeSegment timeSegment; //秒杀时间段
    @Getter
    @Setter
    private int number; //订单数量
    @Getter
    @Setter
    private BigDecimal amount; //订单金额
    @Getter
    @Setter
    private AppUser user; //下单人
    @Getter
    @Setter
    private int orderStatus; //订单状态
    @Getter
    @Setter
    private LocalDateTime createDate; //创建日期

    @Override
    public String makeOrder(SecOrder secOrder) {
    RedisService redisService = SpringBootUtil.getBean(RedisService.class);
    IdService idService = SpringBootUtil.getBean(IdService.class);
    MessageSender messageSender = SpringBootUtil.getBean(MessageSender.class);
    //如果当前时间在秒杀时间段之外,不允许秒杀下单
    if (LocalDateTime.now().isBefore(((ServiceSecOrder)secOrder).getTimeSegment().getBeginTime())
    || LocalDateTime.now().isAfter(((ServiceSecOrder)secOrder).getTimeSegment().getEndTime())) {
    throw new RuntimeException(“不在秒杀时间段内”);
    }
    //由于测试时间的问题,此处需要屏蔽,等到实际部署时需要恢复
    LocalDateTime now = LocalDateTime.now();
    int year = now.getYear();
    int month = now.getMonthValue();
    int day = now.getDayOfMonth();
    int hour = now.getHour();
    LocalDateTime beginDate = LocalDateTime.of(year,month,day,hour,0,0);
    //从redis中检查是否有开启秒杀时间段
    if (!redisService.exists(“TimeStart:” + new TimeSegment(beginDate,beginDate.plusMinutes(119)).toString())) {
    throw new RuntimeException(“当前时间段无秒杀”);
    }
    ((ServiceSecOrder)secOrder).setId(idService.genId());
    if (((ServiceSecOrder)secOrder).getNumber() > ((ServiceSecOrder)secOrder).getService().getLimitCount()) {
    throw new RuntimeException(“秒杀数量超出限购”);
    }
    AppUser loginAppUser = AppUserUtil.getLoginAppUser();
    AppUser user = new AppUser();
    user.setId(loginAppUser.getId());
    ((ServiceSecOrder)secOrder).setOrderNo(getCode(idService));
    user.setUsername(loginAppUser.getUsername());
    ((ServiceSecOrder)secOrder).setUser(user);
    //设置订单状态0表示未支付状态
    ((ServiceSecOrder)secOrder).setOrderStatus(0);
    ((ServiceSecOrder)secOrder).setCreateDate(LocalDateTime.now());
    //设置用户秒杀队列键名(每一种服务都有独立的用户秒杀队列)
    //队列名由User:+服务id+时间段组成
    String key = “User:” + ((ServiceSecOrder)secOrder).getService().getId()
    + ((ServiceSecOrder)secOrder).getTimeSegment().toString();
    String serviceKey = ((ServiceSecOrder)secOrder).getService().getId()
    + ((ServiceSecOrder)secOrder).getService().getTimeSegment().toString();
    //如果服务队列还有数据,则推送用户进队列,否则直接返回秒杀失败
    if (redisService.llen(serviceKey) > 0) {
    //建立每个服务对每个用户对限制数量对键,格式为——“服务id:用户id”
    String limitUserKey = ((ServiceSecOrder) secOrder).getService().getId() + “:”
    + ((ServiceSecOrder) secOrder).getUser().getId();
    //如果该键存在,获取该键的值(这里需要考虑分布式的并发��题的可能)
    //但是有幂等,所以此处不会出现一个用户同时秒杀一个商品(服务)的多个并发线程存在
    if (redisService.exists(limitUserKey)) {
    String limitCount = redisService.get(limitUserKey);
    //如果该键的值达到服务商品的限制数,返回秒杀失败
    if (Integer.valueOf(limitCount) == ((ServiceSecOrder) secOrder).getService().getLimitCount()) {
    return ((ServiceSecOrder) secOrder).getUser().getUsername() + “秒杀服务”
    + ((ServiceSecOrder) secOrder).getService().getServiceName() + “失败”;
    }
    }else { //如果不存在,设置该键的值为0
    redisService.set(limitUserKey,”0”);
    }
    //将秒杀用户id推送到该队列中
    redisService.lpush(key, ((ServiceSecOrder) secOrder).getUser().getId() + “”);
    //唤醒秒杀结果匹配任务继续执行。
    CompletableFuture.runAsync(() -> {
    messageSender.send(SecendKillMq.MQ_EXCHANGE_SECENDKILL,
    SecendKillMq.ROUTING_KEY_SECENDKILL,
    ((ServiceSecOrder) secOrder).getService().getId());
    });

这里我们需要先看一段redis-lua代码,表示如果用户队列,服务队列均有数据,则将其弹出,放入一个hash中作为匹配的结果,同时扣减服务队列总数。其意义可以参考用户金额的终极解决方案–Redis Lua
@Override
public Object secondKill(String userKey, String serviceKey,String userResult, String countKey,String serviceId) {
String script = “if redis.call(‘llen’,KEYS[1]) > 0 and redis.call(‘llen’,KEYS[2]) > 0 “ +
“and tonumber(redis.call(‘get’,KEYS[4])) > 0 then “ +
“local userid = redis.call(‘rpop’,KEYS[1])” +
“redis.call(‘hset’,KEYS[3],userid,redis.call(‘rpop’,KEYS[2])) “ +
“redis.call(‘decr’,KEYS[4]) “ +
“return redis.call(‘incr’,KEYS[5]..’:’..userid) else return 0 end”;
return execute(jedis -> jedis.eval(script,5,userKey,serviceKey,userResult,countKey,serviceId));
}
秒杀服务线程唤醒后,继续执行
/**

  • 秒杀结果匹配任务,每天0点开始,每2小时执行一次
    */
    @Slf4j
    @Component
    @RabbitListener(queues = {SecendKillMq.SECENDKILL_QUEUE + “_1”,
    SecendKillMq.SECENDKILL_QUEUE + “_2”,
    SecendKillMq.SECENDKILL_QUEUE + “_3”,
    SecendKillMq.SECENDKILL_QUEUE + “_4”,
    SecendKillMq.SECENDKILL_QUEUE + “_5”,
    SecendKillMq.SECENDKILL_QUEUE + “_6”,
    SecendKillMq.SECENDKILL_QUEUE + “_7”,
    SecendKillMq.SECENDKILL_QUEUE + “_8”,
    SecendKillMq.SECENDKILL_QUEUE + “_9”,
    SecendKillMq.SECENDKILL_QUEUE + “_10”})

@ElasticSimpleJob(cron=”0 0 0/2 * * ?”,jobName=”secResult”,shardingTotalCount=2,jobParameter=”日期”,shardingItemParameters=”0=SecKill0,1=SecKill1”)
public class SecendKillResultJob implements SimpleJob {
@Autowired
private RedisService redisService;
@Autowired
private DataDao dataDao;
//为了跟MQ搭配,唤醒中断用
private ListserviceList;

@Override
public void execute(ShardingContext shardingContext) {
    //获取当天所有的秒杀区位
    List<Location> locationTodayList = dataDao.findJobToday(LocalDate.now());
    //如果当天有秒杀活动
    if (!CollectionUtils.isEmpty(locationTodayList)) {
        //获取一天所有的秒杀区位时间段
        List<TimeSegment> segmentList = locationTodayList.stream().map(location -> location.getTimeSegment())
                .collect(Collectors.toList());
        switch (shardingContext.getShardingItem()) {
            case 0:
                //从所有秒杀区位时间段过滤当前秒杀时间段
                this.serviceList = segmentList.stream().filter(timeSegment -> LocalDateTime.now().isAfter(timeSegment.getBeginTime()) &&
                        LocalDateTime.now().isBefore(timeSegment.getEndTime()))
                        //将时间段转化成时间段内的秒杀服务
                        .map(timeSegment -> {
                            List<Service> serviceInSegment = dataDao.findServiceInSegment(timeSegment);
                            serviceInSegment.stream().forEach(service -> service.setTimeSegment(timeSegment));
                            return serviceInSegment;
                            //扁平化所有的秒杀服务,将所有当前时间段内的服务放入serviceList属性中
                            //就是拿出当前时间段内所有参与秒杀的服务
                        }).flatMap(services -> services.stream()).collect(Collectors.toList());
                //并行化处理所有的秒杀服务
                int lism = 0;
                if (serviceList.size() > Runtime.getRuntime().availableProcessors() * 2) {
                    lism = serviceList.size();
                }else {
                    lism = Runtime.getRuntime().availableProcessors() * 2;
                }
                ForkJoinPool forkJoinPool = new ForkJoinPool(lism);
                try {
                    forkJoinPool.submit(() ->
                    this.serviceList.parallelStream().forEach(service -> {
                        while (true) {
                            try {
                                service.getLock().lock();
                                String userKey = "User:" + service.getId() + service.getTimeSegment().toString();
                                String serviceKey = service.getId() + service.getTimeSegment().toString();
                                String countKey = service.getId() + ":count";
                                //如果下秒杀时间内没有用户下单该服务,则中断该服务的并行线程
                                //如果有用户下单则唤醒该并行线程
                                while (redisService.llen(userKey).equals(0L)) {
                                    try {
                                        log.info("用户队列无数据,开始中断");
                                        service.getCondition().await();
                                        LocalDateTime now = LocalDateTime.now();
                                        if (now.isAfter(service.getTimeSegment().getEndTime())) {
                                            break;
                                        }
                                    } catch (InterruptedException e) {
                                        e.printStackTrace();
                                    }
                                }
                                log.info("中断被唤醒,继续运行");
                                //如果用户队列和服务队列均有数据
                                while (redisService.llen(userKey) > 0 && redisService.llen(serviceKey) > 0) {
                                    //匹配出秒杀结果,并扣减服务数量,增加用户秒杀过该服务的数量
                                    redisService.secondKill(userKey, serviceKey, "UserResult" + service.getId(), countKey,String.valueOf(service.getId()));
                                }
                                //如果服务队列为空,表示被秒杀完了,从用户队列弹出用户,告知秒杀失败
                                while (redisService.llen(serviceKey).equals(0L)) {
                                    redisService.hset("UserResult" + service.getId(),redisService.rpop(userKey),"秒杀失败");
                                    if (redisService.llen(userKey).equals(0L)) {
                                        break;
                                    }
                                }
                                //当前时间已经超出了秒杀时间段,结束while(true)无限循环
                                LocalDateTime now = LocalDateTime.now();
                                if (now.isAfter(service.getTimeSegment().getEndTime())) {
                                    break;
                                }
                            } finally {
                                service.getLock().unlock();
                            }
                        }
                    })).get();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
                break;
            case 1:
                break;
            default:
                break;
        }
    }
} 

当有用户秒杀到服务时,或者服务被秒杀完,用户的下单需要知道自己是否秒杀成功或者秒杀失败。
/**

  • 服务秒杀单
    */
    @Slf4j
    @AllArgsConstructor
    @NoArgsConstructor
    @ServiceSecOrderVersion(value = 1)
    public class ServiceSecOrder implements SecOrder {
    @Getter
    @Setter
    private Long id; //订单id
    @Getter
    @Setter
    private String orderNo; //订单编号
    @Getter
    @Setter
    private Service service; //订单服务内容
    @Getter
    @Setter
    private TimeSegment timeSegment; //秒杀时间段
    @Getter
    @Setter
    private int number; //订单数量
    @Getter
    @Setter
    private BigDecimal amount; //订单金额
    @Getter
    @Setter
    private AppUser user; //下单人
    @Getter
    @Setter
    private int orderStatus; //订单状态
    @Getter
    @Setter
    private LocalDateTime createDate; //创建日期

    @Override
    public String makeOrder(SecOrder secOrder) {
    RedisService redisService = SpringBootUtil.getBean(RedisService.class);
    IdService idService = SpringBootUtil.getBean(IdService.class);
    MessageSender messageSender = SpringBootUtil.getBean(MessageSender.class);
    //如果当前时间在秒杀时间段之外,不允许秒杀下单
    if (LocalDateTime.now().isBefore(((ServiceSecOrder)secOrder).getTimeSegment().getBeginTime())
    || LocalDateTime.now().isAfter(((ServiceSecOrder)secOrder).getTimeSegment().getEndTime())) {
    throw new RuntimeException(“不在秒杀时间段内”);
    }
    //由于测试时间的问题,此处需要屏蔽,等到实际部署时需要恢复
    LocalDateTime now = LocalDateTime.now();
    int year = now.getYear();
    int month = now.getMonthValue();
    int day = now.getDayOfMonth();
    int hour = now.getHour();
    LocalDateTime beginDate = LocalDateTime.of(year,month,day,hour,0,0);
    //从redis中检查是否有开启秒杀时间段
    if (!redisService.exists(“TimeStart:” + new TimeSegment(beginDate,beginDate.plusMinutes(119)).toString())) {
    throw new RuntimeException(“当前时间段无秒杀”);
    }
    ((ServiceSecOrder)secOrder).setId(idService.genId());
    if (((ServiceSecOrder)secOrder).getNumber() > ((ServiceSecOrder)secOrder).getService().getLimitCount()) {
    throw new RuntimeException(“秒杀数量超出限购”);
    }
    AppUser loginAppUser = AppUserUtil.getLoginAppUser();
    AppUser user = new AppUser();
    user.setId(loginAppUser.getId());
    ((ServiceSecOrder)secOrder).setOrderNo(getCode(idService));
    user.setUsername(loginAppUser.getUsername());
    ((ServiceSecOrder)secOrder).setUser(user);
    //设置订单状态0表示未支付状态
    ((ServiceSecOrder)secOrder).setOrderStatus(0);
    ((ServiceSecOrder)secOrder).setCreateDate(LocalDateTime.now());
    //设置用户秒杀队列键名(每一种服务都有独立的用户秒杀队列)
    //队列名由User:+服务id+时间段组成
    String key = “User:” + ((ServiceSecOrder)secOrder).getService().getId()
    + ((ServiceSecOrder)secOrder).getTimeSegment().toString();
    String serviceKey = ((ServiceSecOrder)secOrder).getService().getId()
    + ((ServiceSecOrder)secOrder).getService().getTimeSegment().toString();
    //如果服务队列还有数据,则推送用户进队列,否则直接返回秒杀失败
    if (redisService.llen(serviceKey) > 0) {
    //建立每个服务对每个用户对限制数量对键,格式为——“服务id:用户id”
    String limitUserKey = ((ServiceSecOrder) secOrder).getService().getId() + “:”
    + ((ServiceSecOrder) secOrder).getUser().getId();
    //如果该键存在,获取该键的值(这里需要考虑分布式的并发问题的可能)
    //但是有幂等,所以此处不会出现一个用户同时秒杀一个商品(服务)的多个并发线程存在
    if (redisService.exists(limitUserKey)) {
    String limitCount = redisService.get(limitUserKey);
    //如果该键的值达到服务商品的限制数,返回秒杀失败
    if (Integer.valueOf(limitCount) == ((ServiceSecOrder) secOrder).getService().getLimitCount()) {
    return ((ServiceSecOrder) secOrder).getUser().getUsername() + “秒杀服务”
    + ((ServiceSecOrder) secOrder).getService().getServiceName() + “失败”;
    }
    }else { //如果不存在,设置该键的值为0
    redisService.set(limitUserKey,”0”);
    }
    //将秒杀用户id推送到该队列中
    redisService.lpush(key, ((ServiceSecOrder) secOrder).getUser().getId() + “”);
    //唤醒秒杀结果匹配任务继续执行。
    CompletableFuture.runAsync(() -> {
    messageSender.send(SecendKillMq.MQ_EXCHANGE_SECENDKILL,
    SecendKillMq.ROUTING_KEY_SECENDKILL,
    ((ServiceSecOrder) secOrder).getService().getId());
    });
    LocalDateTime start = LocalDateTime.now();
    //从redis的匹配结果获取当前用户的秒杀结果
    Futurefuture = CompletableFuture.supplyAsync(() -> {
    while (true) {
    if (redisService.hexists(“UserResult” + ((ServiceSecOrder) secOrder).getService().getId(),
    ((ServiceSecOrder) secOrder).getUser().getId() + “”)) {
    return redisService.hget(“UserResult” + ((ServiceSecOrder) secOrder).getService().getId(),
    ((ServiceSecOrder) secOrder).getUser().getId() + “”);
    }
    if (LocalDateTime.now().isAfter(start.plusSeconds(3))) {
    return “秒杀失败”;
    }
    }
    });
    try {
    if (future.get(3000, TimeUnit.MILLISECONDS).equals(“秒杀失败”)) {
    log.info(((ServiceSecOrder) secOrder).getUser().getUsername() + “秒杀服务”
    + ((ServiceSecOrder) secOrder).getService().getServiceName() + “失败”);
    return ((ServiceSecOrder) secOrder).getUser().getUsername() + “秒杀服务”
    + ((ServiceSecOrder) secOrder).getService().getServiceName() + “失败”;
    } else {
    log.info(((ServiceSecOrder) secOrder).getUser().getUsername() + “秒杀服务”
    + ((ServiceSecOrder) secOrder).getService().getServiceName() + “成功”);
    SecOrderDao secOrderDao = SpringBootUtil.getBean(SecOrderDao.class);
    //秒杀成功的将秒杀单存入数据库
    secOrderDao.saveServiceSecOrder((ServiceSecOrder) secOrder);
    //等待支付结果,5分钟后根据支付与否进行处理
    //如果未支付,则将被秒杀到的服务重新入服务队列,并增加服务总数
    ServicePayBackSender servicePayBackSender = SpringBootUtil.getBean(ServicePayBackSender.class);
    CompletableFuture.runAsync(() -> {
    servicePayBackSender.send(SecendKillMq.MQ_EXCHANGE_DEAD,SecendKillMq.ROURING_KEY_DEAD,(ServiceSecOrder) secOrder);
    });
    return ((ServiceSecOrder) secOrder).getUser().getUsername() + “秒杀服务”
    + ((ServiceSecOrder) secOrder).getService().getServiceName() + “成功”;
    }
    } catch (Exception e) {
    e.printStackTrace();
    log.info(((ServiceSecOrder) secOrder).getUser().getUsername() + “秒杀服务”
    + ((ServiceSecOrder) secOrder).getService().getServiceName() + “失败”);
    return ((ServiceSecOrder) secOrder).getUser().getUsername() + “秒杀服务”
    + ((ServiceSecOrder) secOrder).getService().getServiceName() + “失败”;
    }
    } else {
    return ((ServiceSecOrder) secOrder).getUser().getUsername() + “秒杀服务”
    + ((ServiceSecOrder) secOrder).getService().getServiceName() + “失败”;
    }
    }

这里使用了RabbitMQ的延迟队列,配置如下
/**

  • rabbitmq配置

  • */
    @Configuration
    public class RabbitmqConfig {

    @Bean
    public ListsecendKillQueues() {
    Listqueues = new ArrayList<>();
    for (int i = 1;i < 11;i++) {
    Queue queue = new Queue(SecendKillMq.SECENDKILL_QUEUE + “_” + i);
    queues.add(queue);
    }
    return queues;
    }

    @Bean
    public TopicExchange secendKillExchange() {
    return new TopicExchange(SecendKillMq.MQ_EXCHANGE_SECENDKILL);
    }

// @Bean
// public FanoutExchange secondKillExchange() {
// return new FanoutExchange(SecendKillMq.MQ_EXCHANGE_SECENDKILL);
// }

@Bean
public ListbingingSecondKill(){
Listbindings = new ArrayList<>();
for (int i = 1;i < 11;i++) {
Binding binding = BindingBuilder.bind(secendKillQueues().get(i - 1)).to(secendKillExchange())
.with(SecendKillMq.ROUTING_KEY_SECENDKILL + “_” + i);
bindings.add(binding);
}
return bindings;
}

@Bean
public Queue deadQueue() {
Map<String,Object> arguments = new HashMap<>();
arguments.put(“x-dead-letter-exchange”,SecendKillMq.MQ_EXCHANGE_DEAD);
arguments.put(“x-dead-letter-routing-key”,SecendKillMq.ROUTING_KEY_PAYBACK);
return new Queue(SecendKillMq.DEAD_QUEUE,true,false,false,arguments);
}

@Bean
public DirectExchange deadExchange() {
return new DirectExchange(SecendKillMq.MQ_EXCHANGE_DEAD);
}

@Bean
public Binding bindingDeadExchange() {
return BindingBuilder.bind(deadQueue()).to(deadExchange())
.with(SecendKillMq.ROURING_KEY_DEAD);
}

@Bean
public Queue payBackQueue() {
return new Queue(SecendKillMq.PAYBACK_QUEUE,true,false,false);
}

@Bean
public Binding bindingPayBack() {
return BindingBuilder.bind(payBackQueue()).to(deadExchange())
.with(SecendKillMq.ROUTING_KEY_PAYBACK);
}

}
消息生产者ServicePayBackSender如下
@Slf4j
@Component
public class ServicePayBackSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;

public void send(String exchange,String routingKey,Object content) {
    log.info("send content=" + content);
    this.rabbitTemplate.setMandatory(true);
    this.rabbitTemplate.setConfirmCallback(this);
    this.rabbitTemplate.setReturnCallback(this);
    MessagePostProcessor processor = message -> {
        message.getMessageProperties().setExpiration(300000 + "");
        return message;
    };
    this.rabbitTemplate.convertAndSend(exchange,routingKey,serialize(content),processor);
}

/**
 * 确认后回调:
 * @param correlationData
 * @param ack
 * @param cause
 */
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    if (!ack) {
        log.info("send ack fail, cause = " + cause);
    } else {
        log.info("send ack success");
    }
}

/**
 * 失败后return回调:
 *
 * @param message
 * @param replyCode
 * @param replyText
 * @param exchange
 * @param routingKey
 */
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    log.info("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);
}

/**
 * 对消息对象进行二进制序列化
 * @param o
 * @return
 */
private byte[] serialize(Object o) {
    Kryo kryo = new Kryo();
    ByteArrayOutputStream stream = new ByteArrayOutputStream();
    Output output = new Output(stream);
    kryo.writeObject(output, o);
    output.close();
    return stream.toByteArray();
}

}
消费者如下
@Slf4j
@Component
@RabbitListener(queues = SecendKillMq.PAYBACK_QUEUE)
public class ServicePayBackDeal {
@Autowired
private SecOrderDao secOrderDao;
@Autowired
private RedisService redisService;

@RabbitHandler
public void receice(byte[] data, Channel channel, Message message) throws IOException {
    try {
        //告诉服务器收到这条消��� 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        ServiceSecOrder order = unSerialize(data);
        if (secOrderDao.countServiceSecOrderHasPay(order) == 0) {
            String key = order.getService().getId() + order.getService().getTimeSegment().toString();
            String countKey = order.getService().getId() + ":count";
            String countLimit = order.getService().getId() + ":" + order.getUser().getId();
            redisService.unPayedBack(key, JSONObject.toJSONString(order.getService()),countKey,countLimit);
        }
    } catch (IOException e) {
        e.printStackTrace();
        //丢弃这条消息
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
        log.info("receiver fail");
    }
}

/**
 * 反序列化
 * @param data
 * @return
 */
private ServiceSecOrder unSerialize(byte[] data) {
    Input input = null;
    try {
        Kryo kryo = new Kryo();
        input = new Input(new ByteArrayInputStream(data));
        return kryo.readObject(input,ServiceSecOrder.class);
    }
    finally {
        input.close();
    }
}

}
unPayedBack也是一段redis-lua,代码如下
@Override
public Object unPayedBack(String serviceKey,String serviceValue, String countKey,String limitCountKey) {
String script = “redis.call(‘lpush’,KEYS[1],ARGV[1]) “ +
“redis.call(‘incr’,KEYS[2]) “ +
“return redis.call(‘decr’,KEYS[3])”;
return execute(jedis -> jedis.eval(script,3,serviceKey,countKey,limitCountKey,serviceValue));
}
在秒杀结束后唤醒所有的秒杀中断,退出while(true)的无限循环
/**

  • 在每个秒杀段最后一分钟唤醒所有秒杀中断,每天的1点59分开始,每2小时执行一次
    */
    @Component
    @ElasticSimpleJob(cron=”0 59 1/2 * * ?”,jobName=”signal”,shardingTotalCount=2,jobParameter=”日期”,shardingItemParameters=”0=Signal0,1=Signal1”)
    public class SignalJob implements SimpleJob {
    @Autowired
    private DataDao dataDao;
    @Autowired
    private MessageSender messageSender;

    @Override
    public void execute(ShardingContext shardingContext) {
    //获取当天所有的秒杀区位
    ListlocationTodayList = dataDao.findJobToday(LocalDate.now());
    if (!CollectionUtils.isEmpty(locationTodayList)) {
    //获取一天所有的秒杀区位时间段
    ListsegmentList = locationTodayList.stream().map(location -> location.getTimeSegment())
    .collect(Collectors.toList());
    switch (shardingContext.getShardingItem()) {
    case 0:
    segmentList.stream().filter(timeSegment -> LocalDateTime.now().plusMinutes(-2L).isAfter(timeSegment.getBeginTime()) &&
    LocalDateTime.now().plusMinutes(-2L).isBefore(timeSegment.getEndTime()))
    //将时间段转化成时间段内的秒杀服务
    .map(timeSegment -> {
    ListserviceInSegment = dataDao.findServiceInSegment(timeSegment);
    serviceInSegment.stream().forEach(service -> service.setTimeSegment(timeSegment));
    return serviceInSegment;
    //扁平化所有的秒杀服务
    }).flatMap(services -> services.stream()).forEach(service -> {
    CompletableFuture.runAsync(() -> {
    messageSender.send(SecendKillMq.MQ_EXCHANGE_SECENDKILL,
    SecendKillMq.ROUTING_KEY_SECENDKILL,
    service.getId());
    });
    });
    break;
    case 1:
    break;
    default:
    break;
    }
    }
    }

}
现在我们来建立下单的Controller
先建立一个秒杀单工厂接口
public interface SecOrderFactory {
/**
* 创建秒杀订单
* @return
*/
public SecOrder createSecOrder();

/**
 * 获取秒杀订单
 * @return
 */
public SecOrder getSecOrder();

}
Service版本标签
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ServiceSecOrderVersion {
int value();
}
Service工厂实现类
@Component
public class ServiceSecOrderFactory implements SecOrderFactory {
private Set<Class<?>> classes = ClassUtil.getClassSet(“com.cloud.secondkill.domain”);
private SecOrder createdSecOrder;

@PostConstruct
private void init() {
    this.createdSecOrder = createSecOrder();
}

@Override
public SecOrder createSecOrder() {
    Object instance = null;
    try {
        //过滤有@ServiceSecOrderVersion标签的类
        instance = classes.stream().filter(clazz -> clazz.isAnnotationPresent(ServiceSecOrderVersion.class))
                //过滤实现了SecOrder接口的类
                .filter(clazz -> SecOrder.class.isAssignableFrom(clazz))
                //找出版本号大的类,并实例化为对象
                .max(Comparator.comparingInt(clazz -> clazz.getAnnotation(ServiceSecOrderVersion.class).value()))
                .get().newInstance();
    } catch (InstantiationException e) {
        e.printStackTrace();
    } catch (IllegalAccessException e) {
        e.printStackTrace();
    }
    return (SecOrder) instance;
}

@Override
public SecOrder getSecOrder() {
    return createdSecOrder;
}

}
秒杀单的Bean
@Component
public class SecOrderBean {
@Getter
private Map<String,Class> secOrderFactoryMap = new HashMap<>();
@Getter
private Map<String,Class> secOrderMap = new HashMap<>();

@PostConstruct
private void init() {
    Set<Class<?>> classes = ClassUtil.getClassSet("com.cloud.secondkill.domain");
    classes.stream().filter(clazz -> SecOrderFactory.class.isAssignableFrom(clazz))
            .forEach(clazz -> secOrderFactoryMap.put(clazz.getSimpleName(),clazz));
    classes.stream().filter(clazz -> SecOrder.class.isAssignableFrom(clazz))
            .forEach(clazz -> secOrderMap.put(clazz.getSimpleName(),clazz));
}

}
下单Controller
@Slf4j
@RestController
public class SecOrderController {
private ThreadLocalsecOrderFactory = new ThreadLocal<>();
private ThreadLocalsecOrderService = new ThreadLocal<>();
@Autowired
private SecOrderBean secOrderBean;

@SuppressWarnings("unchecked")
@Transactional
@PostMapping("/makesecorder")
@LxRateLimit(perSecond = 500,timeOut = 500) //此处为接口限流,数字可以根据实际清空改写
public Result<String> makeSecOrder(@RequestBody String secOrderStr, @RequestParam("type") String type) throws Exception {
    log.info(secOrderStr);
    try {
        SecOrder secOrder = setSecOrderFactory(secOrderStr, type);
        String secResult = this.secOrderService.get().makeOrder(secOrder);
        return Result.success(secResult);
    } finally {
        secOrderFactory.remove();
        secOrderService.remove();
    }
}

private SecOrder setSecOrderFactory(String secOrderStr,String type) {
    Class classType = secOrderBean.getSecOrderMap().get(type);
    Object secOrder = JSONObject.parseObject(secOrderStr, classType);
    setSecOrderFactory(type);
    return (SecOrder) secOrder;
}

private void setSecOrderFactory(String type) {
    Class classFactoryType = secOrderBean.getSecOrderFactoryMap().get(type + "Factory");
    this.secOrderFactory.set((SecOrderFactory) SpringBootUtil.getBean(classFactoryType));
    this.secOrderService.set(this.secOrderFactory.get().getSecOrder());
}

}
以上不明白的可以参考用工厂方法模式来下不同订单 以及使用简单工厂加接口加适配器模式来遵守开闭原则
@LxRateLimit为接口限流
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface LxRateLimit {
/**
*
* @return
*/
String value() default “”;

/**
 * 每秒向桶中放入令牌的数量   默认最大即不做限流
 * @return
 */
double perSecond() default Double.MAX_VALUE;

/**
 * 获取令牌的等待时间  默认0
 * @return
 */
int timeOut() default 0;

/**
 * 超时时间单位
 * @return
 */
TimeUnit timeOutUnit() default TimeUnit.MILLISECONDS;

}
AOP拦截
@Slf4j
@Aspect
@Component
public class LxRateLimitAspect {
private RateLimiter rateLimiter = RateLimiter.create(Double.MAX_VALUE);

/**
 * 带有指定注解切入
 */
@ResponseBody
@Around(value = "@annotation(com.cloud.secondkill.annotion.LxRateLimit)")
public Object aroundNotice(ProceedingJoinPoint pjp) throws Throwable {
    log.info("拦截到了{}方法...", pjp.getSignature().getName());
    Signature signature = pjp.getSignature();
    MethodSignature methodSignature = (MethodSignature)signature;
    //获取目标方法
    Method targetMethod = methodSignature.getMethod();
    if (targetMethod.isAnnotationPresent(LxRateLimit.class)) {
        //获取目标方法的@LxRateLimit注解
        LxRateLimit lxRateLimit = targetMethod.getAnnotation(LxRateLimit.class);
        rateLimiter.setRate(lxRateLimit.perSecond());
        if (!rateLimiter.tryAcquire(lxRateLimit.timeOut(), lxRateLimit.timeOutUnit()))
            return "服务器繁忙,请稍后再试!";
    }
    return pjp.proceed();
}

}
具体原理可以参考Guava RateLimiter限流源码解析和实例应用
现在要加入秒杀的幂等,来防止工具秒杀
token Controller
@RestController
public class TokenController {
@Autowired
private RedisService redisService;

@GetMapping("/gettoken")
public Map getToken(@RequestParam("url") String url) {
    Map<String,String> tokenMap = new HashMap<>();
    String tokenValue = UUID.randomUUID().toString();
    AppUser user = AppUserUtil.getLoginAppUser();
    String key = url + user.getId();
    tokenMap.put(key,tokenValue);
    redisService.set(key,tokenValue);
    return tokenMap;
}

}
配置幂等Spring MVC拦截器
@Component
@Slf4j
public class TokenInterceptor implements HandlerInterceptor {
@Autowired
private RedisService redisService;

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
    AppUser user = AppUserUtil.getLoginAppUser();
    String tokenName = request.getRequestURI() + user.getId();
    String tokenValue = request.getParameter("token_value");
    if (tokenValue != null && !tokenValue.equals("")) {
        log.info("tokenName:{},tokenValue:{}",tokenName,tokenValue);
        return handleToken(request,response,handler,user);
    }
    return false;
}

@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, @Nullable ModelAndView modelAndView) throws Exception {
    if (redisService.exists(request.getParameter("token_value"))) {
        RedisTool.releaseDistributedLock(redisService, request.getParameter("token_value"), request.getParameter("token_value"));
    }
}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, @Nullable Exception ex) throws Exception {

}

/**
 * 分布式锁处理
 * @param request
 * @param response
 * @param handler
 * @return
 * @throws Exception
 */
private boolean handleToken(HttpServletRequest request, HttpServletResponse response, Object handler,AppUser user) throws Exception {
    //当大量高并发下所有带token参数的请求进来时,进行分布式锁定,允许某一台服务器的一个线程进入,锁定时间3分钟
    if (RedisTool.tryGetDistributedLock(redisService,request.getParameter("token_value"),request.getParameter("token_value"),180)) {
        if (redisService.exists(request.getRequestURI() + user.getId())) {
            //当请求的url与token与redis中的存储相同时
            if (redisService.get(request.getRequestURI() + user.getId()).equals(request.getParameter("token_value"))) {
                //放行的该线程删除redis中存储的token
                redisService.del(request.getRequestURI() + user.getId());
                log.info("放行");
                RedisTool.releaseDistributedLock(redisService,request.getParameter("token_value"),request.getParameter("token_value"));
                //放行
                return true;
            }
        }
        log.info("拦截");
        //当请求的url与token与redis中的存储不相同时,解除锁定
        RedisTool.releaseDistributedLock(redisService,request.getParameter("token_value"),request.getParameter("token_value"));
        //进行拦截
        return false;
    }
    return false;
}

}
@SpringBootConfiguration
public class TokenInterceptorConfig extends WebMvcConfigurerAdapter {
@Autowired
private TokenInterceptor tokenInterceptor;

@Override
public void addInterceptors(InterceptorRegistry registry) {
    registry.addInterceptor(tokenInterceptor).addPathPatterns("/makesecorder");
}

}    

本文标题: 推荐系列-分布式秒杀

本文作者: OSChina

发布时间: 2021年04月15日 09:19

最后更新: 2023年06月29日 07:10

原始链接: https://haoxiang.eu.org/42f9643e/

版权声明: 本文著作权归作者所有,均采用CC BY-NC-SA 4.0许可协议,转载请注明出处!

× 喜欢就赞赏一下呗!
打赏二维码