注册

喝了100杯酱香拿铁,我顿悟了锁的精髓

大家好,我是哪吒。


上一篇提到了锁粒度的问题,使用“越细粒度的锁越好”,真的是这样吗?会不会产生一些其它问题?


先说结论,可能会产生死锁问题。


下面还是以购买酱香拿铁为例:



1、定义咖啡实体类Coffee


@Data
public class Coffee {
// 酱香拿铁
private String name;

// 库存
public Integer inventory;

public ReentrantLock lock = new ReentrantLock();
}

2、初始化数据


private static List<Coffee> coffeeList = generateCoffee();

public static List<Coffee> generateCoffee(){
List<Coffee> coffeeList = new ArrayList<>();
coffeeList.add(new Coffee("酱香拿铁1", 100));
coffeeList.add(new Coffee("酱香拿铁2", 100));
coffeeList.add(new Coffee("酱香拿铁3", 100));
coffeeList.add(new Coffee("酱香拿铁4", 100));
coffeeList.add(new Coffee("酱香拿铁5", 100));
return coffeeList;
}

3、随机获取n杯咖啡


// 随机获取n杯咖啡
private static List<Coffee> getCoffees(int n) {
if(n >= coffeeList.size()){
return coffeeList;
}

List<Coffee> randomList = Stream.iterate(RandomUtils.nextInt(n), i -> RandomUtils.nextInt(coffeeList.size()))
.distinct()// 去重
.map(coffeeList::get)// 跟据上面取得的下标获取咖啡
.limit(n)// 截取前面 需要随机获取的咖啡
.collect(Collectors.toList());
return randomList;
}

4、购买咖啡


private static boolean buyCoffees(List<Coffee> coffees) {
//存放所有获得的锁
List<ReentrantLock> locks = new ArrayList<>();
for (Coffee coffee : coffees) {
try {
// 获得锁3秒超时
if (coffee.lock.tryLock(3, TimeUnit.SECONDS)) {
// 拿到锁之后,扣减咖啡库存
locks.add(coffee.lock);
coffeeList = coffeeList.stream().map(x -> {
// 购买了哪个,就减哪个
if (coffee.getName().equals(x.getName())) {
x.inventory--;
}
return x;
}).collect(Collectors.toList());
} else {
locks.forEach(ReentrantLock::unlock);
return false;
}
} catch (InterruptedException e) {
}
}
locks.forEach(ReentrantLock::unlock);
return true;
}

3、通过parallel并行流,购买100次酱香拿铁,一次买2杯,统计成功次数


public static void main(String[] args){
StopWatch stopWatch = new StopWatch();
stopWatch.start();

// 通过parallel并行流,购买100次酱香拿铁,一次买2杯,统计成功次数
long success = IntStream.rangeClosed(1, 100).parallel()
.mapToObj(i -> {
List<Coffee> getCoffees = getCoffees(2);
//Collections.sort(getCoffees, Comparator.comparing(Coffee::getName));
return buyCoffees(getCoffees);
})
.filter(result -> result)
.count();

stopWatch.stop();
System.out.println("成功次数:"+success);
System.out.println("方法耗时:"+stopWatch.getTotalTimeSeconds()+"秒");
for (Coffee coffee : coffeeList) {
System.out.println(coffee.getName()+"-剩余:"+coffee.getInventory()+"杯");
}
}


耗时有点久啊,20多秒。


数据对不对?



  • 酱香拿铁1卖了53杯;
  • 酱香拿铁2卖了57杯;
  • 酱香拿铁3卖了20杯;
  • 酱香拿铁4卖了22杯;
  • 酱香拿铁5卖了19杯;
  • 一共卖了171杯。

数量也对不上,应该卖掉200杯才对,哪里出问题了?


4、使用visualvm测一下:


果不其然,出问题了,产生了死锁。


线程 m 在等待的一个锁被线程 n 持有,线程 n 在等待的另一把锁被线程 m 持有。



  1. 比如美杜莎买了酱香拿铁1和酱香拿铁2,小医仙买了酱香拿铁2和酱香拿铁1;
  2. 美杜莎先获得了酱香拿铁1的锁,小医仙获得了酱香拿铁2的锁;
  3. 然后美杜莎和小医仙接下来要分别获取 酱香拿铁2 和 酱香拿铁1 的锁;
  4. 这个时候锁已经被对方获取了,只能相互等待一直到 3 秒超时。


5、如何解决呢?


让大家都先拿一样的酱香拿铁不就好了。让所有线程都先获取酱香拿铁1的锁,然后再获取酱香拿铁2的锁,这样就不会出问题了。


也就是在随机获取n杯咖啡后,对其进行排序即可。


// 通过parallel并行流,购买100次酱香拿铁,一次买2杯,统计成功次数
long success = IntStream.rangeClosed(1, 100).parallel()
.mapToObj(i -> {
List<Coffee> getCoffees = getCoffees(2);
// 根据咖啡名称进行排序
Collections.sort(getCoffees, Comparator.comparing(Coffee::getName));
return buyCoffees(getCoffees);
})
.filter(result -> result)
.count();

6、再测试一下



  • 成功次数100;
  • 咖啡卖掉了200杯,数量也对得上。
  • 代码执行速度也得到了质的飞跃,因为不用没有循环等待锁的时间了。


看来真的不是越细粒度的锁越好,真的会产生死锁问题。通过对酱香拿铁进行排序,解决了死锁问题,避免循环等待,效率也得到了提升。


作者:哪吒编程
来源:juejin.cn/post/7287429638020005944

0 个评论

要回复文章请先登录注册