手写高性能缓存

    科技2023-11-18  73

     


    缓存是在实际生产中非常常用的工具,用了缓存以后,我们可以避免重复计算,提高吞吐量。

    虽然缓存乍一看很简单,不就是一个Map吗?最初级的缓存确实可以用一个Map来实现,不过一个功能完备、性能强劲的缓存,需要考虑的点就非常多了


    1 最简单的缓存

    我们从最简单的HashMap入手,一步步提高我们缓存的性能

    /**
     * The simplest possible cache: a {@link HashMap} guarded by a single lock.
     *
     * <p>{@link #computer(String)} holds the instance monitor for the whole
     * lookup-and-compute sequence, so callers are serialized even when they
     * request different keys.
     */
    public class MyCache {

        private final HashMap<String, Integer> cache = new HashMap<>();

        /**
         * Returns the value for {@code userId}, computing and storing it on the first request.
         *
         * <p>The method is synchronized because {@link HashMap} is not safe for concurrent use.
         *
         * @param userId key to look up; must be parseable as a decimal integer
         * @return the cached or freshly computed value
         * @throws InterruptedException if the thread is interrupted while the value is computed
         */
        public synchronized Integer computer(String userId) throws InterruptedException {
            // First check whether an earlier call already stored the result.
            Integer result = cache.get(userId);
            if (result == null) {
                // Cache miss: compute now and remember the result for later calls.
                result = doCompute(userId);
                cache.put(userId, result);
            }
            return result;
        }

        /** Simulates an expensive computation: sleeps five seconds, then parses the key. */
        private Integer doCompute(String userId) throws InterruptedException {
            TimeUnit.SECONDS.sleep(5);
            // Integer.valueOf replaces the deprecated Integer(String) constructor.
            return Integer.valueOf(userId);
        }

        /** Demo: the second lookup of the same key is served from the cache. */
        public static void main(String[] args) throws InterruptedException {
            MyCache myCache = new MyCache();
            System.out.println("开始计算了");
            Integer result = myCache.computer("13");
            System.out.println("第一次计算结果:" + result);
            result = myCache.computer("13");
            System.out.println("第二次计算结果:" + result);
        }
    }

    用synchronized实现

    性能差,当多个线程同时想计算的时候,即便是查询的key不同也需要等待,严重时,性能甚至比不用缓存更差。代码复用能力差

    给HashMap加final关键字:属性被声明为final后,该变量则只能被赋值一次。且一旦被赋值,final的变量就不能再被改变。所以我们把它加上final关键字,增强安全性。

    /**
     * Demo: three threads hit the same cache concurrently, two of them with the same key.
     */
    public static void main(String[] args) {
        MyCache expensiveComputer = new MyCache();
        // Each row is {key, message prefix}; one thread is started per row, in order.
        String[][] jobs = {
            {"666", "第一次的计算结果:"},
            {"666", "第三次的计算结果:"},
            {"667", "第二次的计算结果:"},
        };
        for (String[] job : jobs) {
            Thread worker = new Thread(() -> {
                try {
                    Integer result = expensiveComputer.computer(job[0]);
                    System.out.println(job[1] + result);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            worker.start();
        }
    }

    2 装饰者模式解耦

    我们假设ExpensiveFunction类是耗时计算的实现类,实现了Computable接口,但是其本身不具备缓存功能,也不需要考虑缓存的事情

    /**
     * A computation from an argument to a value. Every calculator implements this
     * interface so that caching can be layered on top without touching the calculator.
     *
     * @param <A> argument type
     * @param <V> result type
     */
    public interface Computable<A, V> {
        /**
         * Computes the value for {@code arg}.
         *
         * @throws Exception if the computation fails
         */
        V compute(A arg) throws Exception;
    }

    /**
     * A slow calculator. It implements {@link Computable} but has no caching of its own.
     */
    public class ExpensiveFunction implements Computable<String, Integer> {
        /** Sleeps five seconds to simulate expensive work, then parses {@code arg} as an integer. */
        @Override
        public Integer compute(String arg) throws Exception {
            Thread.sleep(5000);
            return Integer.valueOf(arg);
        }
    }

    /**
     * Decorator that adds caching to any {@link Computable}.
     *
     * <p>The whole of {@link #compute(Object)} is synchronized, so calls are serialized.
     */
    public class MyCache<A, V> implements Computable<A, V> {

        // Diamond instead of the raw HashMap type, so the assignment is type-checked.
        private final Map<A, V> cache = new HashMap<>();
        private final Computable<A, V> c;

        /**
         * @param c the calculator whose results are cached
         */
        public MyCache(Computable<A, V> c) {
            this.c = c;
        }

        /**
         * Returns the cached value for {@code arg}, delegating to the wrapped calculator
         * on a miss and storing what it returns.
         *
         * @throws Exception whatever the wrapped calculator throws
         */
        @Override
        public synchronized V compute(A arg) throws Exception {
            System.out.println("进入缓存机制");
            V result = cache.get(arg);
            if (result == null) {
                result = c.compute(arg);
                cache.put(arg, result);
            }
            return result;
        }

        /** Demo: the second lookup of the same key is served from the cache. */
        public static void main(String[] args) throws Exception {
            MyCache<String, Integer> expensiveComputer = new MyCache<>(new ExpensiveFunction());
            Integer result = expensiveComputer.compute("666");
            System.out.println("第一次计算结果:" + result);
            result = expensiveComputer.compute("666");
            System.out.println("第二次计算结果:" + result);
        }
    }

    3 缩小锁的粒度

    虽然提高了并发效率,但是并不意味着就是线程安全的,还需要考虑到同时读写等情况。

    /**
     * Caching decorator with a narrower lock: only the write to the map is synchronized.
     *
     * <p>This improves throughput but is still NOT thread-safe: the read in
     * {@link #compute(Object)} touches the {@link HashMap} without holding the lock,
     * so it can run concurrently with a write.
     */
    public class MyCache<A, V> implements Computable<A, V> {

        // Diamond instead of the raw HashMap type, so the assignment is type-checked.
        private final Map<A, V> cache = new HashMap<>();
        private final Computable<A, V> c;

        /**
         * @param c the calculator whose results are cached
         */
        public MyCache(Computable<A, V> c) {
            this.c = c;
        }

        /**
         * Returns the cached value for {@code arg}, delegating to the wrapped calculator on a miss.
         *
         * @throws Exception whatever the wrapped calculator throws
         */
        @Override
        public V compute(A arg) throws Exception {
            System.out.println("进入缓存机制");
            // Unsynchronized read: this is the deliberate weakness of this variant.
            V result = cache.get(arg);
            if (result == null) {
                // The slow computation runs outside the lock so other callers are not blocked.
                result = c.compute(arg);
                synchronized (this) {
                    cache.put(arg, result);
                }
            }
            return result;
        }

        /** Demo: the second lookup of the same key is served from the cache. */
        public static void main(String[] args) throws Exception {
            MyCache<String, Integer> expensiveComputer = new MyCache<>(new ExpensiveFunction());
            Integer result = expensiveComputer.compute("666");
            System.out.println("第一次计算结果:" + result);
            result = expensiveComputer.compute("666");
            System.out.println("第二次计算结果:" + result);
        }
    }

    4 使用并发集合ConcurrentHashMap

    其实没必要自己实现线程安全的HashMap,也不应该加synchronized,因为我们自己实现的性能远不如现有的并发集合,我们来使用ConcurrentHashMap优化我们的缓存

    /**
     * Caching decorator backed by a {@link ConcurrentHashMap}; no explicit locking is used.
     *
     * <p>The lookup and the store are two separate map operations, so two callers that
     * miss on the same key at the same time will both run the computation.
     */
    public class MyCache<A, V> implements Computable<A, V> {

        private final Map<A, V> cache = new ConcurrentHashMap<>();
        private final Computable<A, V> c;

        /**
         * @param c the calculator whose results are cached
         */
        public MyCache(Computable<A, V> c) {
            this.c = c;
        }

        /**
         * Returns the cached value for {@code arg}, delegating to the wrapped calculator on a miss.
         *
         * @throws Exception whatever the wrapped calculator throws
         */
        @Override
        public V compute(A arg) throws Exception {
            System.out.println("进入缓存机制");
            V cached = cache.get(arg);
            if (cached != null) {
                return cached;
            }
            V computed = c.compute(arg);
            cache.put(arg, computed);
            return computed;
        }

        /** Demo: looks up the same key twice and prints both results. */
        public static void main(String[] args) throws Exception {
            MyCache<String, Integer> expensiveComputer = new MyCache<>(new ExpensiveFunction());
            String[] labels = {"第一次计算结果:", "第二次计算结果:"};
            for (String label : labels) {
                Integer result = expensiveComputer.compute("666");
                System.out.println(label + result);
            }
        }
    }

    5 用Future解决重复计算问题

    // Wrap a trivial Callable in a FutureTask and run it on a separate thread.
    Callable<String> task = () -> "success";
    FutureTask<String> futureTask = new FutureTask<>(task);
    Thread worker = new Thread(futureTask);
    worker.start();
    // get() waits for the task to finish and returns its result.
    System.out.println(futureTask.get());
    // The result can be fetched more than once.
    System.out.println(futureTask.get());

    5.1 重复计算演示

    缺点:在计算完成前,另一个要求计算相同值的请求到来,会导致计算两遍,这和缓存想避免多次计算的初衷恰恰相反,是不可接受的

     

    // 重复计算问题 public class MyCache<A, V> implements Computable<A, V> { private final Map<A, V> cache = new ConcurrentHashMap<>(); private final Computable<A, V> c; public MyCache(Computable<A, V> c) { this.c = c; } @Override public V compute(A arg) throws Exception { System.out.println("进入缓存机制"); V result = cache.get(arg); if (result == null) { result = c.compute(arg); cache.put(arg, result); } return result; } public static void main(String[] args) throws Exception { MyCache<String, Integer> expensiveComputer = new MyCache<>( new ExpensiveFunction()); new Thread(() -> { try { Integer result = expensiveComputer.compute("666"); System.out.println("第一次的计算结果:" + result); } catch (Exception e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { Integer result = expensiveComputer.compute("666"); System.out.println("第三次的计算结果:" + result); } catch (Exception e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { Integer result = expensiveComputer.compute("667"); System.out.println("第二次的计算结果:" + result); } catch (Exception e) { e.printStackTrace(); } }).start(); } }

    5.2 Callable看似解决了重复计算

    动机:现在不同的线程进来以后,确实可以同时计算,但是如果两个线程脚前脚后,也就是相差无几地进来请求同一个数据,会出现重复计算问题

    这个例子只有2个线程,并不可怕,但是如果是100个线程都请求同样的内容,却都需要重新计算,那么会造成巨大的浪费

    /**
     * Caches a {@link Future} per key so that later callers wait for an in-flight
     * computation instead of starting their own.
     *
     * <p>The problem is not fully solved: the lookup and the {@code put} are separate
     * steps, so two callers that both see {@code null} each create and run a task.
     */
    public class MyCache<A, V> implements Computable<A, V> {

        private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();
        private final Computable<A, V> c;

        /**
         * @param c the calculator whose results are cached
         */
        public MyCache(Computable<A, V> c) {
            this.c = c;
        }

        /**
         * Returns the value for {@code arg}, running the wrapped calculator in the calling
         * thread when no future is registered for the key yet.
         *
         * @throws Exception if waiting for or computing the value fails
         */
        @Override
        public V compute(A arg) throws Exception {
            Future<V> f = cache.get(arg);
            if (f != null) {
                return f.get();
            }
            Callable<V> callable = () -> c.compute(arg);
            FutureTask<V> ft = new FutureTask<>(callable);
            // Publish the task before running it so that later callers can wait on it.
            cache.put(arg, ft);
            System.out.println("从FutureTask调用了计算函数");
            ft.run();
            return ft.get();
        }

        /** Demo: three concurrent requests, two of them for the same key. */
        public static void main(String[] args) throws Exception {
            MyCache<String, Integer> expensiveComputer = new MyCache<>(new ExpensiveFunction());
            // Each row is {key, message prefix}; one thread is started per row, in order.
            String[][] jobs = {
                {"666", "第一次的计算结果:"},
                {"666", "第三次的计算结果:"},
                {"667", "第二次的计算结果:"},
            };
            for (String[] job : jobs) {
                Thread worker = new Thread(() -> {
                    try {
                        Integer result = expensiveComputer.compute(job[0]);
                        System.out.println(job[1] + result);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
                worker.start();
            }
        }
    }

    如果有两个同时计算666的线程,同时调用cache.get方法,那么返回的结果都为null,后面还是会创建两个任务去计算相同的值

    5.3 putIfAbsent&Callable解决重复计算

    原子操作putIfAbsent

    put在放入数据时,如果放入数据的key已经存在于Map中,最后放入的数据会覆盖之前存在的数据;putIfAbsent则不同:如果传入key对应的value已经存在,就返回存在的value,不进行替换;如果不存在,就添加key和value,返回null

    https://blog.51cto.com/hanchaohan/2130916

    /**
     * Caches a {@link Future} per key and registers it with the atomic
     * {@code putIfAbsent}, so only the caller whose task is actually stored runs the
     * computation; every other caller waits on the stored future.
     */
    public class MyCache<A, V> implements Computable<A, V> {

        private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();
        private final Computable<A, V> c;

        /**
         * @param c the calculator whose results are cached
         */
        public MyCache(Computable<A, V> c) {
            this.c = c;
        }

        /**
         * Returns the value for {@code arg}, computing it in the calling thread only if
         * this caller wins the race to register a task for the key.
         *
         * @throws Exception if waiting for or computing the value fails
         */
        @Override
        public V compute(A arg) throws Exception {
            Future<V> existing = cache.get(arg);
            if (existing != null) {
                return existing.get();
            }
            Callable<V> job = () -> c.compute(arg);
            FutureTask<V> task = new FutureTask<>(job);
            // putIfAbsent returns the previously stored future, or null if ours was stored.
            Future<V> winner = cache.putIfAbsent(arg, task);
            if (winner != null) {
                return winner.get();
            }
            System.out.println("从FutureTask调用了计算函数");
            task.run();
            return task.get();
        }

        /** Demo: three concurrent requests, two of them for the same key. */
        public static void main(String[] args) {
            MyCache<String, Integer> expensiveComputer = new MyCache<>(new ExpensiveFunction());
            // Each row is {key, message prefix}; one thread is started per row, in order.
            String[][] jobs = {
                {"666", "第一次的计算结果:"},
                {"666", "第三次的计算结果:"},
                {"667", "第二次的计算结果:"},
            };
            for (String[] job : jobs) {
                Thread worker = new Thread(() -> {
                    try {
                        Integer result = expensiveComputer.compute(job[0]);
                        System.out.println(job[1] + result);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
                worker.start();
            }
        }
    }


    6 计算中抛出异常

    计算过程并不是一帆风顺的,假设有一个计算类,它有一定概率计算失败,应该如何处理?

    /** * 耗时计算的实现类,有概率计算失败 */ public class MayFail implements Computable<String, Integer> { @Override public Integer compute(String arg) throws Exception { double random = Math.random(); if (random > 0.5) { throw new IOException("读取文件出错"); } Thread.sleep(3000); return Integer.valueOf(arg); } } public class MyCache<A, V> implements Computable<A, V> { private final Map<A, Future<V>> cache = new ConcurrentHashMap<>(); private final Computable<A, V> c; public MyCache(Computable<A, V> c) { this.c = c; } @Override public V compute(A arg) throws InterruptedException { while (true) { Future<V> f = cache.get(arg); if (f == null) { Callable<V> callable = () -> c.compute(arg); FutureTask<V> ft = new FutureTask<>(callable); f = cache.putIfAbsent(arg, ft); if (f == null) { f = ft; System.out.println("从FutureTask调用了计算函数"); ft.run(); } } try { return f.get(); } catch (CancellationException e) { // 执行过程被取消 System.out.println("被取消了"); cache.remove(arg); // 解决缓存 "污染" throw e; } catch (InterruptedException e) { // 被中断 cache.remove(arg); // 解决缓存 "污染",计算失败则移除Future,增加健壮性 throw e; } catch (ExecutionException e) { // 计算过程抛出异常,这里吞掉异常,再次尝试 System.out.println("计算错误,需要重试"); cache.remove(arg); // 解决缓存 "污染" } } } public static void main(String[] args) throws Exception { MyCache<String, Integer> expensiveComputer = new MyCache<>(new MayFail()); new Thread(() -> { try { Integer result = expensiveComputer.compute("666"); System.out.println("第一次的计算结果:" + result); } catch (Exception e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { Integer result = expensiveComputer.compute("666"); System.out.println("第三次的计算结果:" + result); } catch (Exception e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { Integer result = expensiveComputer.compute("667"); System.out.println("第二次的计算结果:" + result); } catch (Exception e) { e.printStackTrace(); } }).start(); //Thread.sleep(1); //Future<Integer> future = expensiveComputer.cache.get("666"); //future.cancel(true); } }

    这3种异常之所以用不同的catch块捕获,是因为它们的处理逻辑是不同的

    CancellationException和InterruptedException是人为取消的,那么我们应该立即终止任务;但是如果是计算错误,且我们明确知道多试几次就可以得到答案,那么我们的逻辑应该是重试,尝试多次直到正确的结果出现。在这里,我们加上while(true)来保证计算出错不会影响我们的逻辑:如果是计算错误,就进入下一个循环,重新计算,直到计算成功;如果是人为取消,那么就抛出异常然后结束运行。 Callable<String> task = () -> { throw new RuntimeException("异常"); }; FutureTask<String> futureTask = new FutureTask<>(task); new Thread(futureTask).start(); try { String value = futureTask.get(); System.out.println(value); } catch (Throwable e) { e.printStackTrace(); } // 可以多次获取 Thread.sleep(1000); try { String value = futureTask.get(); System.out.println(value); } catch (Throwable e) { e.printStackTrace(); }

    7 缓存过期功能

    为每个结果指定过期时间,并定期扫描过期的元素

    高并发访问时,如果同时过期,那么同时都拿不到缓存,导致打爆cpu和MySQL,造成缓存雪崩、缓存击穿等高并发下的缓存问题,缓存过期时间设置为随机就能解决这个问题

    /** * 出于安全性考虑,缓存需要设置有效期,到期自动失效,否则如果缓存一直不失效,那么带来缓存不一致等问题 */ public class MyCache<A, V> implements Computable<A, V> { private final Map<A, Future<V>> cache = new ConcurrentHashMap<>(); private final Computable<A, V> c; public MyCache(Computable<A, V> c) { this.c = c; } // 不过期 @Override public V compute(A arg) throws InterruptedException { while (true) { Future<V> f = cache.get(arg); if (f == null) { Callable<V> callable = () -> c.compute(arg); FutureTask<V> ft = new FutureTask<>(callable); f = cache.putIfAbsent(arg, ft); if (f == null) { f = ft; System.out.println("从FutureTask调用了计算函数"); ft.run(); } } try { return f.get(); } catch (CancellationException e) { System.out.println("被取消了"); cache.remove(arg); throw e; } catch (InterruptedException e) { cache.remove(arg); throw e; } catch (ExecutionException e) { System.out.println("计算错误,需要重试"); cache.remove(arg); } } } public final static ScheduledExecutorService executor = Executors.newScheduledThreadPool(5); // 时间不固定 public V computeRandomExpire(A arg) throws InterruptedException { long randomExpire = (long) (Math.random() * 10000); return compute(arg, randomExpire); } // 时间固定 public V compute(A arg, long expire) throws InterruptedException { if (expire > 0) { executor.schedule(() -> expire(arg), expire, TimeUnit.MILLISECONDS); } return compute(arg); } // 清除缓存 public synchronized void expire(A key) { Future<V> future = cache.get(key); if (future != null) { if (!future.isDone()) { System.out.println("Future任务被取消"); future.cancel(true); } System.out.println("过期时间到,缓存被清除"); cache.remove(key); } } public static void main(String[] args) throws Exception { MyCache<String, Integer> expensiveComputer = new MyCache<>(new MayFail()); new Thread(() -> { try { Integer result = expensiveComputer.compute("666", 5000L); System.out.println("第一次的计算结果:" + result); } catch (Exception e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { Integer result = expensiveComputer.compute("666"); System.out.println("第三次的计算结果:" + 
result); } catch (Exception e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { Integer result = expensiveComputer.compute("667"); System.out.println("第二次的计算结果:" + result); } catch (Exception e) { e.printStackTrace(); } }).start(); Thread.sleep(6000L); Integer result = expensiveComputer.compute("666"); System.out.println("主线程的计算结果:" + result); } }

    8 用线程池测试缓存性能

    模拟大量请求,观测缓存效果。用线程池创建大量线程get,用了缓存后,总体耗时大大减少,体现了缓存的作用

    public class MyCacheTest { static MyCache<String, Integer> expensiveComputer = new MyCache<>(new ExpensiveFunction()); public static CountDownLatch countDownLatch = new CountDownLatch(1); public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newFixedThreadPool(100); long start = System.currentTimeMillis(); for (int i = 0; i < 100; i++) { service.submit(() -> { Integer result = null; try { System.out.println(Thread.currentThread().getName() + "开始等待"); countDownLatch.await(); SimpleDateFormat dateFormat = ThreadSafeFormatter.dateFormatter.get(); String time = dateFormat.format(new Date()); System.out.println(Thread.currentThread().getName() + " " + time + "被放行"); result = expensiveComputer.compute("666"); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(result); }); } Thread.sleep(5000); countDownLatch.countDown(); service.shutdown(); //while (!service.isTerminated()) { //} //System.out.println("总耗时" + (System.currentTimeMillis()-start)); } } class ThreadSafeFormatter { public static ThreadLocal<SimpleDateFormat> dateFormatter = new ThreadLocal<SimpleDateFormat>() { //每个线程会调用本方法一次,用于初始化 @Override protected SimpleDateFormat initialValue() { return new SimpleDateFormat("mm:ss"); } //首次调用本方法时,会调用initialValue();后面的调用会返回第一次创建的值 @Override public SimpleDateFormat get() { return super.get(); } }; }

     

    Processed: 0.009, SQL: 8