多线程并发在电商系统下的追本溯源-性能调优

    科技2023-11-12  70

    锁优化

    synchronized优化

    synchronized的使用很简单,从优化的角度,注意synchronized加锁的粒度,根据业务需要,粒度越小越好。 对象级别的粒度:

    public synchronized void test(){ // TODO } public void test(){ synchronized (this) { // TODO } }

    类级别的粒度:

    public static synchronized void test(){ // TODO } public static void test(){ synchronized (TestSynchronized.class) { // TODO } }

    下面我们看一个优化的案例。 初始代码如下:

    public class BadSync implements Runnable { long start = System.currentTimeMillis(); volatile AtomicLong totalTime = new AtomicLong(0); volatile int i; public void inc() { i++; } @Override public synchronized void run() { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } inc(); totalTime.getAndAdd(System.currentTimeMillis() - start); } public static void main(String[] args) throws InterruptedException { BadSync sync = new BadSync(); for (int i = 0; i < 5; i++) { new Thread(sync).start(); } Thread.sleep(3000); System.out.println("计数器:" + sync.i); System.out.println("总耗时:" + sync.totalTime); } }

    结果如下:

    上面的代码synchronized加在了run()方法上,但是我们分析业务,多线程并发写入操作在inc(),因此,synchronized加在run()上,这个锁的粒度大了。所以可以做如下的优化:

    public class BadSync implements Runnable { long start = System.currentTimeMillis(); volatile AtomicLong totalTime = new AtomicLong(0); volatile int i; public synchronized void inc() { i++; } @Override public void run() { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } inc(); totalTime.getAndAdd(System.currentTimeMillis() - start); } public static void main(String[] args) throws InterruptedException { BadSync sync = new BadSync(); for (int i = 0; i < 5; i++) { new Thread(sync).start(); } Thread.sleep(3000); System.out.println("计数器:" + sync.i); System.out.println("总耗时:" + sync.totalTime); } }

    或者:

    public class BadSync implements Runnable { long start = System.currentTimeMillis(); volatile AtomicLong totalTime = new AtomicLong(0); volatile int i; public void inc() { i++; } @Override public void run() { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (this) { inc(); } totalTime.getAndAdd(System.currentTimeMillis() - start); } public static void main(String[] args) throws InterruptedException { BadSync sync = new BadSync(); for (int i = 0; i < 5; i++) { new Thread(sync).start(); } Thread.sleep(3000); System.out.println("计数器:" + sync.i); System.out.println("总耗时:" + sync.totalTime); } }

    结果:

    Lock锁优化

    Lock锁优化可以从这基本维度来考虑优化方案:

    是否选择了合适的Lock锁是否可以使用并发容器而不用加锁Lock锁的粒度

    我们接下来看一个案例,电商系统中记录首页被用户浏览的次数,以及租后一次操作的时间(包含读或写)。

    public class TotalLock { // 开始时间 final long start = System.currentTimeMillis(); // 总耗时 AtomicLong totalTime = new AtomicLong(0); // 缓存变量,记录浏览次数和最后一次操作时间 private Map<String, Long> map = new HashMap(){{put("count", 0L);}}; // 定义锁 ReentrantLock LOCK = new ReentrantLock(); public Map<String, Long> read() { LOCK.lock(); // 加锁 long end = 0L; try { Thread.currentThread().sleep(100); end = System.currentTimeMillis(); // 记录最后一次操作时间 map.put("time", end); } catch (InterruptedException e) { e.printStackTrace(); } finally { LOCK.unlock(); // 解锁 } System.out.println(Thread.currentThread().getName() + " ,read=" + (end - start)); totalTime.addAndGet(end - start); return map; } public Map<String, Long> write() { LOCK.lock(); long end = 0L; try { Thread.currentThread().sleep(100); map.put("count", map.get("count") + 1); end = System.currentTimeMillis(); map.put("time", end); } catch (InterruptedException e) { e.printStackTrace(); } finally { LOCK.unlock(); } System.out.println(Thread.currentThread().getName() + " ,write=" + (end - start)); totalTime.addAndGet(end - start); return map; } public static void main(String[] args) throws InterruptedException { TotalLock count = new TotalLock(); for (int i = 0; i < 10; i++) { new Thread(() -> count.read()).start(); } for (int i = 0; i < 1; i++) { new Thread(() -> count.write()).start(); } Thread.sleep(5000); System.out.println("总耗时:" + count.totalTime.get()); } }

    结果: 这里代码是否有可以优化的空间,来提升性能呢? 分析业务,查看次数这里其实是可以并行读取的,我们关注的业务是写入次数,也就是count,至于读取发生的时间time的写入操作,只是一个put,不需要原子性保障,对这个加互斥锁没有必要。所以我们可以替换成读写锁。

    public class TotalLock { // 开始时间 final long start = System.currentTimeMillis(); // 总耗时 AtomicLong totalTime = new AtomicLong(0); // 因为,虽然write不行并行的,但是read()是并行的,所以这里使用ConcurrentHashMap private Map<String, Long> map = new ConcurrentHashMap(){{put("count", 0L);}}; // 定义锁 ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock(); public Map<String, Long> read() { LOCK.readLock().lock(); // 加读锁 try { Thread.currentThread().sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); // 记录最后一次操作时间 map.put("time", end); LOCK.readLock().unlock(); // 解锁 System.out.println(Thread.currentThread().getName() + ", read=" + (end - start)); totalTime.addAndGet(end - start); return map; } public Map<String, Long> write() { LOCK.writeLock().lock(); try { Thread.currentThread().sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } map.put("count", map.get("count") + 1); long end = System.currentTimeMillis(); map.put("time", end); LOCK.writeLock().unlock(); System.out.println(Thread.currentThread().getName() + ", write=" + (end - start)); totalTime.addAndGet(end - start); return map; } public static void main(String[] args) throws InterruptedException { TotalLock count = new TotalLock(); for (int i = 0; i < 10; i++) { new Thread(() -> count.read()).start(); } for (int i = 0; i < 1; i++) { new Thread(() -> count.write()).start(); } Thread.sleep(5000); System.out.println(count.map); System.out.println("总耗时:" + count.totalTime.get()); } }

    CAS乐观锁优化

    多线程并发的计数器

    public class NormalSync implements Runnable { Long start = System.currentTimeMillis(); int i = 0; public synchronized void run() { int j = i; //实际业务中可能会有一堆的耗时操作,这里等待100ms模拟 try { //做一系列操作 Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } //业务结束后,增加计数 i = j + 1; System.out.println(Thread.currentThread().getId() + " ok,time=" + (System.currentTimeMillis() - start)); } public static void main(String[] args) throws InterruptedException { NormalSync test = new NormalSync(); new Thread(test).start(); new Thread(test).start(); new Thread(test).start(); new Thread(test).start(); new Thread(test).start(); Thread.sleep(1000); System.out.println("last value=" + test.i); } }

    结果:

    这种使用synchronized的方案,虽然开启了多个线程,但是其实和串行没有区别。

    因此, 我们可以针对于这种在方法上暴力加synchronized的方案,使用CAS的思想进行优化。

    public class NormalSync implements Runnable { Long start = System.currentTimeMillis(); int i = 0; public void run() { int j = i; //实际业务中可能会有一堆的耗时操作,这里等待100ms模拟 try { //做一系列操作 Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } try { Field f = Unsafe.class.getDeclaredField("theUnsafe"); f.setAccessible(true); Unsafe unsafe = (Unsafe) f.get(null); long offset = unsafe.objectFieldOffset(NormalSync.class.getDeclaredField("i")); while (!unsafe.compareAndSwapInt(this, offset, j, j + 1)) { j = i; } } catch (Exception e) { e.printStackTrace(); } //业务结束后,增加计数 System.out.println(Thread.currentThread().getId() + " ok,time=" + (System.currentTimeMillis() - start)); } public static void main(String[] args) throws InterruptedException { NormalSync test = new NormalSync(); new Thread(test).start(); new Thread(test).start(); new Thread(test).start(); new Thread(test).start(); new Thread(test).start(); Thread.sleep(1000); System.out.println("last value=" + test.i); } }

    但是Unsafe这个类,我们在生产环境中不推荐使用,因为不安全,但是我们可以使用CAS的这种思想将上面的代码进一步优化。使用CAS + synchronized。

    public class NormalSync implements Runnable { Long start = System.currentTimeMillis(); int i = 0; public void run() { int j = i; //实际业务中可能会有一堆的耗时操作,这里等待100ms模拟 try { //做一系列操作 Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } // 这里加锁是为了保证原子性 synchronized (this) { //注意这里! while (j != i) { j = i; } i = j + 1; } //业务结束后,增加计数 System.out.println(Thread.currentThread().getId() + " ok,time=" + (System.currentTimeMillis() - start)); } public static void main(String[] args) throws InterruptedException { NormalSync test = new NormalSync(); new Thread(test).start(); new Thread(test).start(); new Thread(test).start(); new Thread(test).start(); new Thread(test).start(); Thread.sleep(1000); System.out.println("last value=" + test.i); } }

    结果: 当然在实际场景中,计数器使用原子类是最简单有效,性能也很好的。这里只是演示CAS的思想优化性能。

    总结

    减少锁的时间,不需要同步执行的代码,能不放在同步块中执行就不要放在同步块中,可以让锁尽快释放。减小锁的粒度,将物理上的一个锁,拆成逻辑上的多个锁,增加并行度,从而降低锁竞争,典型如分段锁。锁的粒度,拆锁的粒度不能无限拆,最多可以将一个锁拆成当前cpu数量相等。减少加减锁的次数,假如有一个循环,循环内的操作需要加锁,我们应该把锁防盗循环外面,否则每次进出循环,都要加锁。使用读写锁,业务细分,读操作加读锁,可以并发读,写操作使用写锁,只能单线程写,参考计数器案例。善用volatile,volatile的控制比synchronized更轻量化,在某些变量上可以加以运用,如单例模式中。

    线程池参数优化

    Executors工具创建的线程池详解

    newCachedThreadPool

    public static ExecutorService newCachedThreadPool() { /* * core = 0 * max = Integer.MAX_VALUE * timeout = 60s * queue = 1 * 只要线程不够用,就会一直创建线程, 不用就全部释放。 * 线程数:0 - Integer.MAX_VALUE之间弹性伸缩 * 注意:任务并发太高且耗时较长时,造成CPU高消耗,需要警惕OOM */ return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }

    newFixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads) { /* * core = max = 指定数量 * timeout = 0(不过期) * queue = 无界队列 * 线程数一直保持指定数量,不增不减,永不超时 * 如果线程不够用,任务就一直追加到队列中,排队等候 * 注意:并发太高时,容易造成长时间等待无响应,如果任务临时变量数据过多,容易OOM */ return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }

    newSingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor() { /* * core = max = 1 * timeout = 0 * queue = 无界队列 * 只有一个线程在慢悠悠的干活,可以任务是fix的特例 * 适用于任务零散提交,不紧急的情况 */ return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }

    newScheduledThreadPool

    public ScheduledThreadPoolExecutor(int corePoolSize) { /** * core = 指定数量 * max = Integer.MAX_VALUE * timeout = 0 * queue = DelayWorkQueue(重点!) * 用于任务调度,DelayWorkQueue限制了任务可被获取的时机,也就实现时间上的控制 */ super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }

    线程池调优的经验

    corePoolSize

    核心线程数,一旦有任务进来,在core范围内立刻就会创建线程执行任务。因此,这个值应该业务并发量在绝大多数时间内的并发情况。 1) 高并发,耗时短,减小core size,如配CPU数 + 1; 2) 并发不高,耗时长,又分成看两种情况:多IO,调大core size,如 2*CPU + 1,让CPU充分工作;多运算,调小core size,减少上下文切换时间。

    workQueue

    workQueue的确定,需要根据业务可接受的等待时间来定。 1)CPU资源紧张,内存够大,但是任务不紧急,可以接受稍长的延迟,减少core size,增大workQueue; 2)CPU资源够用,但是内存紧张,且任务紧急,要求快速响应,降低workQueue。

    maximumPoolSize

    maximumPoolSize常常需要与workQueue搭配使用,如果使用无界队列,maximumPoolSize就无意义。如果workQueue满了,同时达到max,后续任务可能会丢失。

    1)任务波动较大,波峰来临时,减小workQueue,增大maximumPoolSize,让尽可能多的线程执行任务; 2)如果任务不紧急,可以减小maximumPoolSize,增大woreQueue。

    keepAliveTime

    1)如果不缺CPU,且任务波峰波谷的间隔较短,且无法捉摸,应当适当增大keepAliveTime,避免频繁创建和销毁线程。 2)如果波峰波谷间隔时间较长,可以适当调小keepAliveTime,让闲置的线程尽快销毁,释放CPU资源。

    handler

    当线程池满负荷工作时,在来任务,会执行此策略。如果不处理默认抛出异常,记录日志。需要根据任务处理的数据的重要程度来定,是否可接受数据丢失。如果可接受,可不做任何处理;否则,可以记录日志,或者放入消息队列等等。

    并发容器的选择

    在大部分场景通常都是弱一致性的情况下,使用 ConcurrentHashMap 即可;如果数据量级很高,且存在大量增删改操作,则可以考虑使用ConcurrentSkipListMap。读多,写少的高并发场景下,ArrayList使用CopyOnWriteArrayList来替代

    上下文切换优化

    为什么要做上下文切换的优化? 当线程切换的时候,当前线程如果没有执行完任务时,如果发生线程切换,需要记录当前任务的状态,待此线程抢到执行权,再次任务的时候,需要将此任务状态恢复。这个过程就是上下文切换。上下文切换时需要消耗资源和时间,做好上下文切换的优化,对于性能提升有一定好处。

    竞争锁

    锁的持有时间越长,就意味着有越多的线程在等待资源释放,发生上下文切换的次数就越多,代价就越大。因此,在加锁的时候,还是要注意锁的粒度,将锁加到需要加锁的地方,越近越好。

    public void f(){ f1(); synchronized (this){ f2(); } f3(); }

    结论:类锁 < 静态锁 < 方法锁 < 代码块锁 , 能共享锁的地方尽量不要用独享锁。

    notify/wait

    我们在使用notify/wait的时候,一定要注意不要出现两种情况。1. 过时通知。2. 额外唤醒

    过时通知

    public class WaitInvalid { int total = 0; byte[] lock = new byte[0]; //计算1-100的和,算完后通知print public void count() { synchronized (lock) { for (int i = 1; i < 101; i++) { total += i; } lock.notify(); } System.out.println("count finish"); } //打印,等候count的通知 public void print() { synchronized (lock) { try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(total); } public static void main(String[] args) { WaitInvalid waitInvalid = new WaitInvalid(); new Thread(() -> { waitInvalid.count(); }).start(); new Thread(() -> { waitInvalid.print(); }).start(); } }

    结果: 结果是没有正确打印出total。 count先执行时,提前释放了notify通知,这时候,print还没进入wait,收不到这个信号。等print去wait的时候,再等通知等不到了,典型的通知过时现象。仅仅因为一行代码的顺序问题,如果不注意,造成整个程序卡死。解决方案:交换print线程和count线程的先后顺序。

    额外唤醒

    public class NotifyInvalid { List list = new ArrayList(); byte[] lock = new byte[0]; public void del() { synchronized (lock) { //没值就等,有值就删 if (list.isEmpty()) { try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } list.remove(0); } } public void add() { synchronized (lock) { //加个值后唤醒 list.add(0, 0); lock.notifyAll(); } } public static void main(String[] args) throws InterruptedException { NotifyInvalid notifyInvalid = new NotifyInvalid(); //启动两个线程等候删除 for (int i = 0; i < 2; i++) { new Thread(() -> { notifyInvalid.del(); }).start(); } //新线程添加一个 new Thread(() -> { notifyInvalid.add(); }).start(); Thread.sleep(1000); System.out.println(notifyInvalid.list.size()); } }

    结果: 分析: 出异常了!因为等候的两个线程第一个删除后,第二个唤醒时,等待前的状态已失效。 方案: 线程唤醒后,要警惕睡眠前后状态不一致,要二次判断

    线程池

    线程池的线程数量设置不宜过大,因为一旦线程池的工作线程总数超过系统所拥有的处理器数量,就会导致过多的上下文切换。慎用Executors,尤其如newCachedThreadPool。这个方法前面分析过。如果任务过多会无休止创建过多线程,增加了上下文的切换。最好根据业务情况,自己创建线程池参数。

    虚拟机

    合理搭配JVM内存调优,减少 JVM 垃圾回收的频率可以有效地减少上下文切换

    Processed: 0.015, SQL: 8