synchronized的使用很简单,从优化的角度,注意synchronized加锁的粒度,根据业务需要,粒度越小越好。 对象级别的粒度:
public synchronized void test(){ // TODO } public void test(){ synchronized (this) { // TODO } }类级别的粒度:
public static synchronized void test(){ // TODO } public static void test(){ synchronized (TestSynchronized.class) { // TODO } }下面我们看一个优化的案例。 初始代码如下:
public class BadSync implements Runnable { long start = System.currentTimeMillis(); volatile AtomicLong totalTime = new AtomicLong(0); volatile int i; public void inc() { i++; } @Override public synchronized void run() { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } inc(); totalTime.getAndAdd(System.currentTimeMillis() - start); } public static void main(String[] args) throws InterruptedException { BadSync sync = new BadSync(); for (int i = 0; i < 5; i++) { new Thread(sync).start(); } Thread.sleep(3000); System.out.println("计数器:" + sync.i); System.out.println("总耗时:" + sync.totalTime); } }结果如下:
上面的代码synchronized加在了run()方法上,但是我们分析业务,多线程并发写入操作在inc(),因此,synchronized加在run()上,这个锁的粒度大了。所以可以做如下的优化:
public class BadSync implements Runnable { long start = System.currentTimeMillis(); volatile AtomicLong totalTime = new AtomicLong(0); volatile int i; public synchronized void inc() { i++; } @Override public void run() { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } inc(); totalTime.getAndAdd(System.currentTimeMillis() - start); } public static void main(String[] args) throws InterruptedException { BadSync sync = new BadSync(); for (int i = 0; i < 5; i++) { new Thread(sync).start(); } Thread.sleep(3000); System.out.println("计数器:" + sync.i); System.out.println("总耗时:" + sync.totalTime); } }或者:
public class BadSync implements Runnable { long start = System.currentTimeMillis(); volatile AtomicLong totalTime = new AtomicLong(0); volatile int i; public void inc() { i++; } @Override public void run() { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (this) { inc(); } totalTime.getAndAdd(System.currentTimeMillis() - start); } public static void main(String[] args) throws InterruptedException { BadSync sync = new BadSync(); for (int i = 0; i < 5; i++) { new Thread(sync).start(); } Thread.sleep(3000); System.out.println("计数器:" + sync.i); System.out.println("总耗时:" + sync.totalTime); } }结果:
Lock锁优化可以从这基本维度来考虑优化方案:
是否选择了合适的Lock锁是否可以使用并发容器而不用加锁Lock锁的粒度我们接下来看一个案例,电商系统中记录首页被用户浏览的次数,以及租后一次操作的时间(包含读或写)。
public class TotalLock { // 开始时间 final long start = System.currentTimeMillis(); // 总耗时 AtomicLong totalTime = new AtomicLong(0); // 缓存变量,记录浏览次数和最后一次操作时间 private Map<String, Long> map = new HashMap(){{put("count", 0L);}}; // 定义锁 ReentrantLock LOCK = new ReentrantLock(); public Map<String, Long> read() { LOCK.lock(); // 加锁 long end = 0L; try { Thread.currentThread().sleep(100); end = System.currentTimeMillis(); // 记录最后一次操作时间 map.put("time", end); } catch (InterruptedException e) { e.printStackTrace(); } finally { LOCK.unlock(); // 解锁 } System.out.println(Thread.currentThread().getName() + " ,read=" + (end - start)); totalTime.addAndGet(end - start); return map; } public Map<String, Long> write() { LOCK.lock(); long end = 0L; try { Thread.currentThread().sleep(100); map.put("count", map.get("count") + 1); end = System.currentTimeMillis(); map.put("time", end); } catch (InterruptedException e) { e.printStackTrace(); } finally { LOCK.unlock(); } System.out.println(Thread.currentThread().getName() + " ,write=" + (end - start)); totalTime.addAndGet(end - start); return map; } public static void main(String[] args) throws InterruptedException { TotalLock count = new TotalLock(); for (int i = 0; i < 10; i++) { new Thread(() -> count.read()).start(); } for (int i = 0; i < 1; i++) { new Thread(() -> count.write()).start(); } Thread.sleep(5000); System.out.println("总耗时:" + count.totalTime.get()); } }结果: 这里代码是否有可以优化的空间,来提升性能呢? 分析业务,查看次数这里其实是可以并行读取的,我们关注的业务是写入次数,也就是count,至于读取发生的时间time的写入操作,只是一个put,不需要原子性保障,对这个加互斥锁没有必要。所以我们可以替换成读写锁。
public class TotalLock { // 开始时间 final long start = System.currentTimeMillis(); // 总耗时 AtomicLong totalTime = new AtomicLong(0); // 因为,虽然write不行并行的,但是read()是并行的,所以这里使用ConcurrentHashMap private Map<String, Long> map = new ConcurrentHashMap(){{put("count", 0L);}}; // 定义锁 ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock(); public Map<String, Long> read() { LOCK.readLock().lock(); // 加读锁 try { Thread.currentThread().sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); // 记录最后一次操作时间 map.put("time", end); LOCK.readLock().unlock(); // 解锁 System.out.println(Thread.currentThread().getName() + ", read=" + (end - start)); totalTime.addAndGet(end - start); return map; } public Map<String, Long> write() { LOCK.writeLock().lock(); try { Thread.currentThread().sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } map.put("count", map.get("count") + 1); long end = System.currentTimeMillis(); map.put("time", end); LOCK.writeLock().unlock(); System.out.println(Thread.currentThread().getName() + ", write=" + (end - start)); totalTime.addAndGet(end - start); return map; } public static void main(String[] args) throws InterruptedException { TotalLock count = new TotalLock(); for (int i = 0; i < 10; i++) { new Thread(() -> count.read()).start(); } for (int i = 0; i < 1; i++) { new Thread(() -> count.write()).start(); } Thread.sleep(5000); System.out.println(count.map); System.out.println("总耗时:" + count.totalTime.get()); } }多线程并发的计数器
public class NormalSync implements Runnable { Long start = System.currentTimeMillis(); int i = 0; public synchronized void run() { int j = i; //实际业务中可能会有一堆的耗时操作,这里等待100ms模拟 try { //做一系列操作 Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } //业务结束后,增加计数 i = j + 1; System.out.println(Thread.currentThread().getId() + " ok,time=" + (System.currentTimeMillis() - start)); } public static void main(String[] args) throws InterruptedException { NormalSync test = new NormalSync(); new Thread(test).start(); new Thread(test).start(); new Thread(test).start(); new Thread(test).start(); new Thread(test).start(); Thread.sleep(1000); System.out.println("last value=" + test.i); } }结果:
这种使用synchronized的方案,虽然开启了多个线程,但是其实和串行没有区别。
因此, 我们可以针对于这种在方法上暴力加synchronized的方案,使用CAS的思想进行优化。
public class NormalSync implements Runnable { Long start = System.currentTimeMillis(); int i = 0; public void run() { int j = i; //实际业务中可能会有一堆的耗时操作,这里等待100ms模拟 try { //做一系列操作 Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } try { Field f = Unsafe.class.getDeclaredField("theUnsafe"); f.setAccessible(true); Unsafe unsafe = (Unsafe) f.get(null); long offset = unsafe.objectFieldOffset(NormalSync.class.getDeclaredField("i")); while (!unsafe.compareAndSwapInt(this, offset, j, j + 1)) { j = i; } } catch (Exception e) { e.printStackTrace(); } //业务结束后,增加计数 System.out.println(Thread.currentThread().getId() + " ok,time=" + (System.currentTimeMillis() - start)); } public static void main(String[] args) throws InterruptedException { NormalSync test = new NormalSync(); new Thread(test).start(); new Thread(test).start(); new Thread(test).start(); new Thread(test).start(); new Thread(test).start(); Thread.sleep(1000); System.out.println("last value=" + test.i); } }但是Unsafe这个类,我们在生产环境中不推荐使用,因为不安全,但是我们可以使用CAS的这种思想将上面的代码进一步优化。使用CAS + synchronized。
public class NormalSync implements Runnable { Long start = System.currentTimeMillis(); int i = 0; public void run() { int j = i; //实际业务中可能会有一堆的耗时操作,这里等待100ms模拟 try { //做一系列操作 Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } // 这里加锁是为了保证原子性 synchronized (this) { //注意这里! while (j != i) { j = i; } i = j + 1; } //业务结束后,增加计数 System.out.println(Thread.currentThread().getId() + " ok,time=" + (System.currentTimeMillis() - start)); } public static void main(String[] args) throws InterruptedException { NormalSync test = new NormalSync(); new Thread(test).start(); new Thread(test).start(); new Thread(test).start(); new Thread(test).start(); new Thread(test).start(); Thread.sleep(1000); System.out.println("last value=" + test.i); } }结果: 当然在实际场景中,计数器使用原子类是最简单有效,性能也很好的。这里只是演示CAS的思想优化性能。
newCachedThreadPool
public static ExecutorService newCachedThreadPool() { /* * core = 0 * max = Integer.MAX_VALUE * timeout = 60s * queue = 1 * 只要线程不够用,就会一直创建线程, 不用就全部释放。 * 线程数:0 - Integer.MAX_VALUE之间弹性伸缩 * 注意:任务并发太高且耗时较长时,造成CPU高消耗,需要警惕OOM */ return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { /* * core = max = 指定数量 * timeout = 0(不过期) * queue = 无界队列 * 线程数一直保持指定数量,不增不减,永不超时 * 如果线程不够用,任务就一直追加到队列中,排队等候 * 注意:并发太高时,容易造成长时间等待无响应,如果任务临时变量数据过多,容易OOM */ return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { /* * core = max = 1 * timeout = 0 * queue = 无界队列 * 只有一个线程在慢悠悠的干活,可以任务是fix的特例 * 适用于任务零散提交,不紧急的情况 */ return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }newScheduledThreadPool
public ScheduledThreadPoolExecutor(int corePoolSize) { /** * core = 指定数量 * max = Integer.MAX_VALUE * timeout = 0 * queue = DelayWorkQueue(重点!) * 用于任务调度,DelayWorkQueue限制了任务可被获取的时机,也就实现时间上的控制 */ super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }corePoolSize
核心线程数,一旦有任务进来,在core范围内立刻就会创建线程执行任务。因此,这个值应该业务并发量在绝大多数时间内的并发情况。 1) 高并发,耗时短,减小core size,如配CPU数 + 1; 2) 并发不高,耗时长,又分成看两种情况:多IO,调大core size,如 2*CPU + 1,让CPU充分工作;多运算,调小core size,减少上下文切换时间。
workQueue
workQueue的确定,需要根据业务可接受的等待时间来定。 1)CPU资源紧张,内存够大,但是任务不紧急,可以接受稍长的延迟,减少core size,增大workQueue; 2)CPU资源够用,但是内存紧张,且任务紧急,要求快速响应,降低workQueue。
maximumPoolSize
maximumPoolSize常常需要与workQueue搭配使用,如果使用无界队列,maximumPoolSize就无意义。如果workQueue满了,同时达到max,后续任务可能会丢失。
1)任务波动较大,波峰来临时,减小workQueue,增大maximumPoolSize,让尽可能多的线程执行任务; 2)如果任务不紧急,可以减小maximumPoolSize,增大woreQueue。
keepAliveTime
1)如果不缺CPU,且任务波峰波谷的间隔较短,且无法捉摸,应当适当增大keepAliveTime,避免频繁创建和销毁线程。 2)如果波峰波谷间隔时间较长,可以适当调小keepAliveTime,让闲置的线程尽快销毁,释放CPU资源。
handler
当线程池满负荷工作时,在来任务,会执行此策略。如果不处理默认抛出异常,记录日志。需要根据任务处理的数据的重要程度来定,是否可接受数据丢失。如果可接受,可不做任何处理;否则,可以记录日志,或者放入消息队列等等。
为什么要做上下文切换的优化? 当线程切换的时候,当前线程如果没有执行完任务时,如果发生线程切换,需要记录当前任务的状态,待此线程抢到执行权,再次任务的时候,需要将此任务状态恢复。这个过程就是上下文切换。上下文切换时需要消耗资源和时间,做好上下文切换的优化,对于性能提升有一定好处。
锁的持有时间越长,就意味着有越多的线程在等待资源释放,发生上下文切换的次数就越多,代价就越大。因此,在加锁的时候,还是要注意锁的粒度,将锁加到需要加锁的地方,越近越好。
public void f(){ f1(); synchronized (this){ f2(); } f3(); }结论:类锁 < 静态锁 < 方法锁 < 代码块锁 , 能共享锁的地方尽量不要用独享锁。
我们在使用notify/wait的时候,一定要注意不要出现两种情况。1. 过时通知。2. 额外唤醒
结果: 结果是没有正确打印出total。 count先执行时,提前释放了notify通知,这时候,print还没进入wait,收不到这个信号。等print去wait的时候,再等通知等不到了,典型的通知过时现象。仅仅因为一行代码的顺序问题,如果不注意,造成整个程序卡死。解决方案:交换print线程和count线程的先后顺序。
结果: 分析: 出异常了!因为等候的两个线程第一个删除后,第二个唤醒时,等待前的状态已失效。 方案: 线程唤醒后,要警惕睡眠前后状态不一致,要二次判断
合理搭配JVM内存调优,减少 JVM 垃圾回收的频率可以有效地减少上下文切换