Java微服务架构师—并发编程基础(下)

    科技2022-07-10  124

    Java并发编程基础

    并发(线程)安全

    线程安全概念

    多个线程访问同一个类(对象或方法)时,这个类始终都能表现出正确的行为,那么这个类(对象或方法)就是线程安全的

    多个线程访问myThread的run方法时,以排队方式进行处理(cpu分配的先后顺序而定),一个线程想要执行synchronized修饰的代码必须获取到锁,拿到锁则执行,否则这个线程会不断尝试获取锁,直到拿到锁,而且是多个线程同时去竞争这把锁(锁竞争问题)。

    synchronized: 可以在任意对象或方法上加锁,而加锁的这段代码称为‘互斥区或临界区’

    同步

    同步的概念就是共享,我们要牢记“共享”这两个字,如果不是共享的资源,就没有必要进行同步

    目的就是为了线程安全,其实对于线程安全来说需要同时满足两个特性

    原子性(同步)一致性

    异步

    异步的概念就是独立,相互之间不受到任何制约,就像Ajax

    线程间通信

    操作系统中将其经过特殊处理使其成一个整体的一种方式,当线程间存在通信,则系统间的交互性更强。对线程任务处理过程进行有效管控监督

    实现方式

    wait 释放锁notify 不释放锁配合synchronizedCountDownLatchvolatile

    案例

    集合遍历问题(list)共享成员变量

    线程安全的集合

    Collections.synchronized***

    编码规约

    考虑代码健壮性常见场景敏感性

    常用处理方法

    ThreadLocal

    线程局部变量,是一种多线程间并发访问变量的解决方案。与synchronized等加锁方式不同,ThreadLocal完全不提供锁,而是以空间换时间的手段,为每个线程提供变量的独立副本,以保障线程安全。从性能上说,ThreadLocal不具绝对优势,在并发不是很高的时候,加锁的性能会更好,但作为与锁完全无关的线程安全解决方案在高并发量或者锁竞争激烈的场景,使用ThreadLocal可以在一定程度上减少锁竞争问题。

    Volatile

    概念定义

    volatile关键字主要作用是使变量的更改在多个线程间即时可见。

    作用

    在多个线程间可进行变量的变更,使得线程间进行数据的共享可见。阻止指令重排序,happens-before语义

    内存模型JMM

    一个线程可以执行的操作有使用(use)、赋值(assign)、装载(load)、存储(store)、锁定()(lock)、解锁(unlock)而主内存可以执行的操作有读(read)、写(write)、锁定(lock)、解锁(unlock),每个操作都是原子性的操作volatile的作用就是强制线程到主内存(共享内存)里去读取变量,而不是线程工作内存里去读取数据,从而实现了多线程间的变量的可见,也就是满足了线程安全的可见性解决的主要问题是一个线程对共享变量的写入何时对另一个线程可见所有的变量都存储在主内存中,每一个线程都有一个私有的本地内存,本地内存中存储了该线程使用到的变量是主内存中的拷贝,线程对变量的所有操作(读取、赋值等)都必须在工作内存中进行,而不能直接读写主内存中的变量(volatile变量也不例外)

    内存屏障

    指令重排序

    程序指令的执行顺序可能与代码书写顺序不一致,这个过程就是指令重排序。

    意义

    JVM能根据处理器的特性,充分利用多级缓存、多核心等进行适当的指令重排序,是程序在保证业务正确运行的同时,充分利用CPU的执行特点,最大限度发挥机器性能

    有些场景不合适指令重排序(使用volatile解决)

    HappensBefore (JMM内存机制)

    代码中一个操作执行结果需要对另一个操作可见,那么这两个操作之间必须要存在happens-before关系,使用这个语义来阐述操作之间的内存可见性。

    相应规则

    一个线程的每一个操作,happens-before于该线程中后续任意操作(写操作happens-before于其后的读操作)对一个锁的解锁,happens-before与随后的对这个锁的加锁对一个volatile域的写,happens-before于任意后续对这个volatile域的读传递性规则:A h-b B 且 B h-b C 则 A h-b C

    应用案例

    停止线程

    Atomic

    概念定义

    封装了一系列的基础类型和对象操作,主要目的是为了实现原子性操作(多线程操作的线程安全操作)保证一个JVM中是原子性、(同时不能为复合操作,否则中间的操作读取复合操作部分可能不对 虽然最终结果是对的 可以通过加锁解决

    主要核心类

    AtomicIntegerAtomicLongAtomicBooleanAtomicIntegerArrayAtomicLongArrayAtomicReference

    同步类容器(串行)

    同步锁

    VectorHashTable

    底层机制

    都是由JDK的Collections.synchronized***等工厂方法去创建实现的使用synchronized关键字对每个公用的方法都进行同步使用Object mutex对象锁使得每次只能有一个线程访问容器的状态

    问题

    还可能出现(并发修改异常)并发处理性能差并发场景下,严重降低吞吐量锁竞争

    并发类容器(异步)jdk1.5开始

    ConcurrentHashMap

    基本使用

    size()

    消除伪共享(缓存行)分之思想volatile long value

    基本原理

    内部结构使用段(Segment)来表示不同的部分,每一段其实就是一个小的HashTable升级版,它们有自己的锁,降低了锁的粒度只要多个修改操作发生在不同的段上,就可以并发进行,默认16段,所以最高支持16个线程的并发修改操作减小锁的粒度达到降低锁竞争的方式,并且代码中大多共享变量采用volatile关键字声明,目的是第一时间获取修改的内容,性能非常好。

    CopyOnWrite

    基本使用

    jdk中常用的

    CopyOnWriteArrayListConyOnWriteArraySet

    最佳使用场景

    读多写少,元素不能太多

    基本原理

    往一个容器添加元素时,不直接往当前的容器添加,而是将当前容器进行Copy,复制出一个新容器,然后新容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。采用读写分离的思想,读和写不同的容器,实现对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前这个容器不会添加任何的元素。

    ConcurrentSkipListMap

    支持并发排序

    队列

    (顶层接口Queue)

    并发队列(ConcurrentLinkedQueue)

    适应高并发场景的队列,通过无锁的方式,实现高并发状态下的高性能,通常性能好于BlockingQueue,是一个基于链表的无界线程安全队列

    该队列遵循先进先出原则,头是最先加入,尾是最后加入,该队列不允许null元素。

    基本使用

    add()和offer()都是添加元素方法,在这里这两个方法没有任何区别poll()和peek()都是取头元素节点,区别在于前者是会删除元素,后者不会

    阻塞队列(BlockingQueue)

    基本使用

    offer(anObject) 将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳则返回true,否则返回false (不会阻塞线程)offer(E e, long timeout ,TimeUint uint ) 设置等待时间,如果指定时间内,还不能往队列加入则返回失败。put() 把anObject加入BlockingQueue里,如果BlockingQueue没有空间,调用此方法的线程会被阻塞直到队列里有空间为止。poll(long timeout,TimeUint uint) 从队列里取出一个队首元素,如果在指定时间内,队列一旦有元素可取,则立即返回队列中的元素,否则直到时间超时还没有元素可取,返回失败。take()取出BlockiQueue里排在队首的元素,若队列为空,阻塞进入等待状态直到队列有新的元素被加入。drainTo() 一次性从队列获取所有可用的元素(还可以指定获取的个数),通过该方法,可以提高获取元素的效率,不需要多次分批加锁或释放锁。

    常用实现

    基于数组的阻塞队列实现ArrayBlockingQueue

    内部维护一个定长数组,缓存队列中的数据元素,其内部没有实现读写分离,也就意味着生产和消费不能完全并行。长度需要定义,可以指定先进先出或者先进后出,由于是定长所以也叫有界队列。子主题 2

    LinkedBlockingDeque

    PriorityBlockingQueue

    基于优先级的阻塞队列,出队列是依据优先级优先级的判断是通过构造函数传入的Compator对象来决定,也就是说传入的队列的元素必须实现Comparable接口,在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁,也一个无界队列。元素不能为基本类型

    SynchronousQueue

    一种没有缓冲的阻塞队列生产者生产的数据直接会被消费者获取并消费

    DelayQueue

    带有延迟时间的队列其中的元素只有指定延迟时间到了,才能够从队列中获取到该元素,DelayQueue的元素必须实现Delayed接口,该队列是一个无界队列,应用场景:对缓存超时的数据进行移除,任务超时处理,空闲链接超时关闭等。

    阻塞队列手写模拟

    拥有固定长度的装载元素的容器计数器统计容器的容量大小当队列里面没有元素的时候需要执行线程等待当队列元素已满的时候执行的线程也需要等待合理的线程间通信

    常见锁

    同步锁LockLockSupport

    常见实现

    对象锁类锁

    JUC

    Unsafe

    概念定义

    由于Java不能直接访问操作系统底层,而是通过本地方法来访问。Unsafe类提供了硬件级别的原子操作

    主要提供的功能

    内存操作

    allocateMemory

    分配内存

    reallocateMemory

    扩充内存

    freeMemory

    释放内存

    字段的定位与修改

    可以定位对象某字段的内存位置也可以修改对象的字段值,即使它是私有的

    挂起与恢复

    将一个线程进行挂起是通过park方法实现的,调用park后,线程将一直阻塞直到超时或中断等条件出现unpark可以终止一个挂起的线程,使其恢复正常。整个并发框架中对线程的挂起操作被封装在LockSupport类中,这个类中有各种版本的pack方法,但最终都调用了Unsafe.park()方法

    CAS操作(乐观锁)

    Compare And Swap 简单来说就是比较并交换。CAS包含三个操作数,内存位置(V)、预期原值(A) 和 新值(B)如果内存位置的值与预期原值匹配,那么处理器会自动将该位置值更新为新值,否则,处理器不做任何处理,无论哪种情况,它都会在CAS指令之前返回该位置的值。CAS有效的说明:当某个位置v应该包含值A,如果包含该值,则将B放到这个位置,否则,不用更改该位置,值需告诉这个位置现在的值即可Java并发包(java.util.concurrent)中大量使用CAS操作,涉及到并发的方法都调用了sun.misc.Unsafe类方法进行CAS操作,在Unsafe中是通过compareAndSwapXXX方法实现的

    AQS

    AQS核心

    概念定义

    AbstractQueuedSynchronizer 抽象队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现底层依赖于它,比如:ReetrantLock/Semaphore/CountDownLatch

    内部设计

    它内部维护一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会放入此队列),这里volatile是核心关键字,具体volatile的语义,state的访问方式有三种

    getState()setState()compareAndSetState()

    AQS定义两者资源共享方式

    独占式Exclusive

    只有一个线程能执行(如:ReetrantLoack)

    共享式Share

    多个线程可同时执行,(如:Semaphore、CountDownLatch)不同的自定义同步器争用共享资源的方式也不同

    CLH队列(FIFO)锁)

    head、tail

    核心方法

    自定义同步器在实现时只需要实现共享资源state的获取和释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队),AQS已经在底层实现好了。自定义同步器实现主要实现如下几种方法

    isHeldExclusively()

    该线程是否正在独占资源,只有用到Condition才需要实现它

    tryAcquire()

    独占方式,尝试获取资源,成功则返回tru,失败则返回false

    tryRelease()

    独占方式,成功则返回true,失败则返回false

    tryAcquireShared(int)

    共享方式,尝试获取资源,负数表示失败,0表示成功,但没有剩余可以资源;正数表示成功,且有剩余资源

    tryRelaeseShared(int)

    共享方式,尝试获取资源,成功则返回true,失败则返回false

    锁方式

    公平锁

    队列排队,先进先服务

    非公平锁

    ReetrantLock重入锁

    state初始化为0,表示未锁定状态,A线程lock()时,会调用tryAcquire()独占该锁并将state+1,此后其他线程在tryAcquire()时就会失败,直到A线程unlock()到state=0(释放锁)为止,其他线程才能有机会获取该锁,当然释放锁之前,A线程自己是可以重新获取此锁的(state会累加)这就是可重入的概念,但要注意,获取多少次就要释放多少次,这样才能保证state能回到零态的

    重入锁,在需要进行同步的代码部分加上锁定,不要忘记最后一定要释放锁定,不然会造成锁无法释放,其他线程进不了的问题

    重载的有参构造可以选择公平锁和非公平锁实现方法

    应用场景

    重入锁需求阻塞唤醒多个线程

    Condition

    可配合ReetrantLock实现阻塞唤醒线程通信在使用Lock,可以使用一个等待、通知的类,它就是Condition,这个类一定是针对具体某一把锁的,也就是在只有锁的基础上才会产生Condition我们可以通过一个Lockd对象产生多个Condition进行多线程间的交互,可以使得部分需要唤醒的线程唤醒,其他线程则继续等待通知await、signal signalall

    ReadWriteLock读写锁

    ReetrantReadWriteLock

    其核心是实现读写分离的锁,在高并发访问下,读多写少的情况下,性能高于重入锁分成两把锁,读锁和写锁,在读锁下,多线程可以并发访问,但在写锁的时候只能一个一个顺序访问口诀:读读共享,写写互斥,读写互斥

    lockSupport(线程锁)

    LockSupport提供park()和unpark()方法实现阻塞线程和解除线程阻塞,实现的阻塞和解除阻塞是基于‘许可(permit)’ 作为关联,permit相当于一个信号量。默认为0,线程之间不再需要一个Object获取其他变量存储状态,不需要关心对方状态

    对象Object锁缺点:先阻塞再唤醒 配合synchronized一起使用

    LockSupport静态方法 park和unpark

    没有顺序限制针对具体某个线程不需要在同步代码块里,所以线程间也不需要维护一个共享的同步对象,实现线程间的解耦unpark函数可以先于park调用,所以无需担心线程的执行先后顺序

    底层调用Unsafe类的方法

    减少锁竞争

    避免死锁减少锁的持有时间减少锁的粒度锁的功能分离尽量使用无锁的操作,如原子操作(Atomic系列类),volatile关键字,CAS

    AQS源码分析

    acquire(int)

    此方法是独占模式下线程获取共享资源的顶层入口,如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,整个过程忽略中断的影响,这也正是lock()的语义,当然不仅仅只限制于lock(),获取资源后,线程就可以去执行其临界代码了AQS核心acquire() ,tryAcquire()尝试获取资源,如果成功则直接返回,addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;acquireQueue()使线程在等待队列中获取资源,一直获取到资源后才返回,如果整个等待过程被中断过,则返回tru,否则返回false。 如果线程在等待过程中被中断过,它是不响应的,只是获取资源后才再进行自我中断selfInterrupt将中断补上

    UML类绘制

    CountDownLatch

    任务分成n个子线程去执行,state也初始化n(注意n要与线程数一致)。这n个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1,等到所以子线程都执行完后(state=0),会unpark()调用线程,然后主调用线程就会从await()函数返回继续后续操作

    CAS

    Unsafe类原子性操作

    具体实现

    CountDownLatch

    概念定义

    用于监听某些初始化操作,并且线程进行阻塞,等初始化执行完毕后,通知主线程继续工作执行

    子主题 2

    CyclicBarrier

    概念定义

    栅栏的概念,多线程的进行阻塞。等待某一个临界条件满足后,再同时执行

    使用场景

    存在并行与同步的任务,可以采用以提高系统性能

    Future与Caller回调

    Future模式,也是非常经典的设计模式,这种模式主要就是利用空间换时间的概念,也就是说异步执行(需要开启一个新的线程)Future模式非常适合在处理耗时很长的业务逻辑中进行使用,可以有效的减少系统的响应时间,提供系统的吞吐量类比商品定单,提交订单后,当订单处理完成后,在家里等待商品送货上门即可,Ajax请求,页面是异步的进行后台处理的手写模拟Future模式

    Exchanger

    线程间数据交换,它提供了一个同步点,在这个同步点,两个线程可以交换彼此的数据。(案例:数据比对)两个线程通过exchange方法交换数据,如果一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange方法当两个线程都达到同步点时,这个两个线程可以进行交换数据,将本线程生产出的数据传递给对方。

    ForkJoin并行计算(jdk1.7开始)

    是Java7提供的用于并行执行任务的框架,是一个把大任务分成多个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

    提供两个重要方法

    ForkJoinTask

    使用该框架,需要创建一个ForkJoin任务,它提供在任务中执行fork和join操作的机制,一般情况下,我们并不需要直接继承ForkJoinTask类,只需要继承它的子类,它的子类有2个:

    RecursiveAction

    用于没有返回结果的任务

    RecursiveTask

    用于有返回结果的任务

    ForkJoinPool

    任务ForkJoinTask需要通过ForkJoin[Pool来执行

    实现方式

    设置任务阈值

    拆分规则

    二分法

    继承合理的ForkJoinTask

    Master-Worker模式

    常用的并行计算模式,它的核心是系统由两类进行协作工作的:Master进程和Worker进程Master负责接收和分配任务,Worker负责处理子任务,当各个Worker子进程处理完毕后,会将结果返回给Master,由Master做归纳和总结其好处是将一个大任务分解成多个小任务,并行执行,从而提高系统的吞吐量。手写并发组件模拟

    Semaphore

    相关概念

    pv

    网站的总访问量,用户没刷新一次就会被记录一次

    uv

    访问网站的一台电脑客户端为一个访客,一般指00-24之间相同ip的客户端记录

    qps

    每秒查询数,qps很多程度上代表系统业务上的繁忙程度,每次请求的背后,可能对应着多次磁盘IO。多次网络请求,多个cpu时间片。我们通过qps直观了解系统业务情况,一旦超过设置的警戒阈值,可以考虑增加机器对集群扩容,以免压力过大导致宕机,可以根据前期的压力测试预估值,结合后期的综合运维情况来估算出阈值。

    qts

    修改操作

    rt

    请求响应时间,这个指标直接说明前端用户的体验,任何系统设计师都想降低rt时间还涉及到cpu、内存、网络、磁盘等情况,更多细节的问题很多,如:select、update、delete/PS等数据库层面的统计容量评估:一般通过开发、运维、测试、以及业务人员综出系统的一系列阈值,然后我们根据关键阈值如qps rt 等,对系统进行有效的变更。

    限流策略

    多轮压测后峰值阈值 进行80/20原则,即80%的访问请求将在20%的时间内完成,这样我们可以根据系统对应的pv计算出峰值qps。峰值qps=(总pv80%)/ (60602424*20%)r然后再将总的峰值除以单台机器所能承受的最高qps值,就是所需机器数量。机器数量=总的峰值qps/压测得出的单机极限qps总峰值适当上调预留20%

    guava对应实现

    线程池

    作用

    管理控制

    管理线程的生命周期,对每个环节有一个把控

    系统资源

    可以合理控制线程数量,根据任务多少对线程个数增减,回收空闲线程,减少线程频繁创建于销毁,避免不必要的系统开销,节省系统资源,保证稳定性

    应用性能

    配合高并发容器的设置,对任务和工作项进行缓存,异步的多线程去处理任务,从而提高服务的吞吐量,消费性能,也提高了单个线程的利用率

    兜底策略

    从健壮性考虑,线程池提供多种拒绝策略,可以在任务过多处理不过来时,进行有效的拒绝策略、降级方案、以补偿形式进行任务处理,避免因为线程池的问题对系统产生较为严重的问题。

    jdk提供的实现

    Executors线程工厂

    newFixedThreadPoolnewSingleThreadExecutornewCacheThreadPoolnewScheduledThreadPool

    自定义线程池

    无法满足我们的需求,可以自己实现自定义线程池,其实Executor工厂类里面创建的线程池方法内部都是用了ThreadPoolExecutor这个类,这个类可以用来自定义线程池

    核心线程数最大线程数空闲时间时间单位任务阻塞队列线程工厂(自定义线程)拒绝策略

    队列类型参数和任务执行方式相关

    在使用有界队列时,若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程,若大于corePoolSize,则会将任务添加入队列,若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程,若线程数大于maximumPoolSize,则执行拒绝策略

    执行顺序:核心->max->队列

    在使用无界队列时,LinkedBlockingQueue。与有界队列相比,除非系统资源耗尽,否则无界队列不存在入队列失败的情况,但有新任务到来时,系统的线程数小于corePoolSize时,则新建线程执行任务。当达到corePoolSize后,就不会继续增加。若后续仍有新任务加入,而没有空闲的线程资源,则任务直接入队列等待,若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存。

    与corePoolSize相关,max不起作用执行顺序:核心->队列

    拒绝策略

    JDK默认提供

    AbortPolicy

    直接抛出异常阻止系统正常工作

    CallerRunsPolicy

    只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务

    DiscardOldestPolicy

    丢弃最老的一个请求,尝试再次提交当前任务。

    DiscardPolicy

    丢弃无法处理的任务,不做任何处理

    自定义策略

    如果需要自定义策略可以实现RejectedExecutionHandler接口

    合理使用线程池

    要点

    线程个数大小的设置

    计算密集型(多核)运算

    线程数=cup核数+1 、也可以cpu核数*2(超线程)参考有几处使用 线程池的数

    IO密集型(阻塞)

    网络传输、数据库、缓存都涉及IO,合理设线程数避免线程切换。线程数=cpu核数 /(1-阻塞系数)这个阻塞系数一般为0.8~0.9 (final int poolSize = (int)(cpuCore/(1-0.9))

    线程池相关参数设置

    不用选择没有上限的配置项

    不要使用没有上限的线程池和设置无界队列

    newCachedThreadPool无界队列和无限制线程数

    合理限制线程池数量和线程空闲回收时间,根据具体的任务执行周期和时间去设置,避免频繁回收和创建,在考虑系统性能和吞吐量,也要考虑系统稳定性

    根据实际场景选择合理的拒绝策略,使用自定义策略进行兜底。

    利用Hook嵌入你的行为

    记录线程的执行轨迹ThreadPoolExecutor提供protected类型可以被覆盖的钩子方法,允许用户在任务执行前和执行后进行一些处理,我们可以通过它实现初始化ThreadLocal、收集统计信息,如:日志记录等,这类Hook如beforeExecute和afterExecute,另外还有一个Hook可用来在任务执行完的时候让用户插入逻辑,如rermineted如果Hook方法执行失败则内部的工作线程的执行将会失败或中断

    线程池的关闭

    当线程池不在被引用并且工作线程数为0时候,线程池将被终止,我们可以调用shutdown来手动终止线程池。如果忘记调用,为了让线程资源被释放,我们还可以使用keepAliveTim和allowCoreThreadTimeOut来达到目的稳妥的方式是使用虚拟机Runtime.getRuntime().addShutdownHook方法手动调用线程池的关闭方法

    XMind - Trial Version

    Processed: 0.010, SQL: 8