Java并发

最初的计算机同一时间只能运行一个程序,即一个程序占用了整个计算机的资源,这样必然导致三个问题:资源利用率,公平性(用户和程序共享计算机资源),便利性(原本要在一个程序里解决所有问题,现在可以分解到多个程序里去解决),这些问题促使了多进程操作系统的出现。多线程也是基于类似的原因而产生的。

多线程的优劣

多线程的主要优势:

  1. 提高资源利用率
  2. 提高吞吐量,如单线程因为IO阻塞而导致吞吐率低下,而在多线程下能提高吞吐率
  3. 通过多任务形式可以简化复杂任务的处理

多线程引入的问题:

  1. 线程安全问题,出现的原因是需要对共享的(Shared)和可变的(Mutable)状态访问
  2. 活跃性问题,如死锁、活锁、饥饿
  3. 性能问题,频繁的上下文切换,同步机制抑制编译器优化。

要玩好线程,就必须懂得很好地解决以上三个问题

线程安全问题关注的是“永远不发生糟糕的事情”,活跃性问题关注的是“某个正确的事情最终会发生”,性能问题关注的是“正确的事能尽快发生”

线程安全问题

如果一个类是线程安全的,那么当多个线程访问这个类时,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为(满足规范中定义的各种不变性条件以及后验条件)。这就是线程安全的定义了。

导致线程安全问题的最根本原因是需要对共享的(Shared)和可变的(Mutable)状态访问。“共享”意味着变量可以由多个线程同时访问,而“可变”则意味着变量的值在其生命周期内可以发生变化。要令对象是线程安全,有三种方式:

  1. 不在线程之间共享该状态变量。这种方式在技术上称为线程封闭,实现方式有串行线程封闭方式、ThreadLocal和局部变量,使用享元模式复制一份对象镜像。
  2. 将状态变量修改为不可变的变量。不可变对象本身就是线程安全的
  3. 使用同步机制来协同对对象共享可变状态的访问,同步机制包括Synchronized内置锁、volatile类型的变量、显式锁(Explict Lock)以及原子变量。

何时应该使用同步机制来确保线程安全呢?

答:同步机制主要应对线程安全所引起的两类问题:竞态条件(Race Condition)和不变性条件的一致性。竞态条件描述的是通过一个可能失效的观测结果来决定下一步的动作,即“先检查后执行(Check-Then-Act)”,竞态条件会破坏线程安全,而不变性条件的一致性是指在任何时刻,线程观察到的不变性条件都不能被破坏。引起竞态条件(Race Condition)和不变性条件的一致性问题的都是一些复合操作,因此,当复合操作会引起竞态条件(Race Condition)和不变性条件的一致性问题时,就应该使用同步机制。

为什么同步机制能保证线程安全呢?

答:同步机制保证了复合操作的原子性和内存可见性。原子性是指一个线程在进行复合操作的过程中,不会受到其他线程的影响,即同一时刻,线程独占复合操作。可见性是指一个线程对共享状态变量的修改能被其他线程获取。

相同竞态条件应使用同一个锁来保护

相同的不变性条件应该使用同一个锁来保护

每个Java对象都可以用做一个实现同步的锁,这些锁称为内置锁或监视器锁,是一种互斥锁。

对象封装得越好,越容易实现对象的线程安全性。但面向对象的抽象和封装会降低程序的性能,有时可以打破封装以提高性能,但前提是先保证代码正确运行。

内存可见性

一般我们只会注意到同步机制提供了原子性特性,往往忽略了其很重要的另一个特性——内存可见性。

当一个线程需要写入状态变量,而另一个线程需要读取该状态变量,在缺少同步机制的情况下,读取到的状态变量可能存在多种情况,可能是读取到之前的值,也可能是其他奇怪的值。如下面的代码:

public class NoVisibility {
	private static boolean ready;
	
	private static int number;
	
	private static class ReaderThread extends Thread {
		public void run(){
			while (!ready){
				Thread.yield();
				System.out.println(number);
			}
		}
	}
	
	public static void main(String[] args){
		new ReaderThread().start();
		number=42;
		ready=true;
	}
}

上面的代码可能出现多种不可预测的情况:

  • 读线程会无限循环下去,因为读线程可能永远都看不到ready的值。
  • 可能会输出0,因为读线程可能看到了写入ready的值,却没有看到之后写入number的值,这种现象称为“重排序”

因此,只要有数据在多个线程之间共享,就要使用正确的同步机制

加锁机制既可以确保可见性又可以确保原子性,但volatile变量只能确保可见性

何时才应该使用volatile变量呢?

  • 对变量的写入操作不依赖变量的当前值,或者你能确保只有单个线程更新变量的值。——即不会发生竞态条件
  • 该变量不会与其他状态变量一起纳入不变性条件中。——即不属于不变性条件范围
  • 在访问变量时不需要加锁。

volatile的使用是有一定的局限性的,虽然正确使用volatile可以使线程安全,但线程安全并不意味着合理,看下面的例子:

volatile boolean shutdownRequested;

public void log(String msg) throws InterruptedException {
	if(!shutdownRequested){
		queue.put(msg);
	}else{
		throw new IllegalStateException("logger is shutdown");
	}
}

上面代码是日志服务生产日志的代码实现,虽然整个代码是线程安全的,但意图通过shutdownRequested来停止生产,却会出问题。shutdownRequested和queue.put()出现了竞态条件,可能因为queue已满而导致put操作阻塞,这样生产者线程将一直阻塞。虽然这里可以通过中断生产者线程来退出阻塞,但一般这样的生产者线程是很难中断的,因为中断线程的操作应该由线程的所有者发起的(请看任务取消与线程中断一节),出现这个问题的原因是volatile只保证了可见性,而这里还需要依赖原子性操作。

不可变对象

满足下列条件的对象才是不可变对象:

  • 对象创建以后其状态就不能修改
  • 对象的所有域都是final类型的
  • 对象是正确创建的(在对象的创建期间,this引用没有逸出)

final域有两层含义:1. final域是不能修改的;2. final域能确保初始化过程的安全性,从而可以不受限制地访问不可变对象,并在共享这些对象时无须同步。

即使某个对象的引用对其他线程是可见的,也并不意味着对象内部状态对于使用该对象的线程来说一定是可见的,为了保证一致性视图就需要使用到同步机制。但是,不可变对象的final域初始化安全性能保证对象内部状态的可见性,因此,任何线程都可以在不需要额外同步的情况下安全地访问不可变对象。

不可变对象有点类似于DDD中值对象的概念,或者也可以理解为一种新的基本类型,就像int,boolean类型一样,因此,不可变对象+volatile也可以实现线程安全。

下面给出一个示例,说明如何通过不可变对象和volatile来消除锁:

@Immutable
class OneValueCache{
	private final BigInteger lastNumber;
	private final BigInteger[] lastFactors;
	
	public OneValueCache(BigInteger i,BigInteger[] factors){
		lastNumber=i;
		lastFactors=Arrays.copyOf(factors,factors.length);
	}
	
	public BigInteger[] getFactors(BigInteger i){
		if(lastNumber == null || !lastNumber.equals(i)){
			return null;
		}else{
			return Arrays.copyOf(lastFactors,lastFactors.length);
		}
		
	}
}

public class VolatileCachedFactorizer implements Servlet {
	private volatile OneValueCache cache=new OneValueCache(null,null);
	
	public void service(ServletRequest req,ServletResponse resp){
		BigInteger i = extractFromRequest(req);
		// -- 从这里开始没有实现原子性 -- //
		BigInteger[] factors=cache.getFactors(i);
		if(factors==null){
			factors=factor(i);
			cache=new OneValueCache(i,factors);
		}
		// -- 到这里结束没有实现原子性 -- //
		encodeIntoResponse(resp,factors);
	}
}

正如volatile的使用限制所说的一样,要使用不可变对象+volatile的方式消除锁,只适用于不可变对象的替换不依赖于原有不可变对象当前值,并且不可变对象不再与其他变量组成不变性条件。在该示例中,数字与其因子列表出成了不变性条件封装在一个不可变对象OneValueCache中,而OneValueCache的替换并不依赖于原有的OneValueCache的值,所以这里的实现消除了锁还保证了线程安全。

串行线程封闭

串行线程封闭是指一个对象总是只能被单个线程拥有,在原线程不再需要对其访问时能通过安全的方式,将该对象的所有权转移到另一个线程。这里“安全的方式”需要一个大前提和一个中间件,大前提是所有权转移之后,原线程不再对该对象访问,而中间件则包括有阻塞队列,对象池,ConcurrentMap.remove(),AtomicReference.compareAndSet(),通过这些中间件来安全地传递。

串行线程封闭是线程封闭技术中的一种,它其实就是通过不在线程间共享对象的方式来实现线程安全的。

安全发布

在这里,安全发布的含义是在对象首次发布时,对象的引用以及对象的状态必须同时对其他线程可见。因为存在可见性问题,因此才需要关注对象安全发布。

一个对象可通过以下方式安全地发布:

  • 在静态初始化函数中初始化一个对象引用。
  • 将对象的引用保存到volatile类型的域或者AtomicReferance对象中。
  • 将对象的引用保存到某个正确构造对象的final类型域中
  • 将对象的引用保存到一个由锁保护的域中
  • 通过线程安全容器发布

不可变对象可以通过任意机制来发布

事实不可变对象必须通过安全方式来发布。

可变对象必须通过安全方式来发布,并且必须是线程安全的或者由某个锁保护起来。

安全暴露1内部状态

一个线程安全类使用以下方式来暴露其内部状态时,将可能使其变得不再安全:

  • 通过非private的方式在域中发布可变的共享对象
  • 通过非private的方法,返回(发布)可变的共享对象

以上问题可以使用以下方式来解决:

  • 使用享元模式,复制一份可变的共享对象的拷贝再进行发布,这里实际使用了线程封闭技术中其中一种方式,即令对象不共享
  • 通过封装,令可变的共享对象成为不可变对象,如Map unmodifyMap=Collections.unmodifiableMap(map);,在这种方式下,不能基于unmodifyMap实例去修改它的结构,但可通过map去修改unmodifyMap的结构,而且在这种方式下unmodifyMap中存储的对象也应该是不可变对象。即令对象不可变
  • 如果可变的共享对象自身是线程安全的,并且没有任何不变性条件来约束它的值,在操作上也不存在任何不允许的状态转换时,就可以安全暴露

当一个类只由多个线程安全类组合而成的时候,并不意味着这个类就是线程安全的,除非这些状态变量是相互独立的(即它们之间不存在关联的不变性条件)

安全扩展——扩展现有线程安全类

当我们需要在现有的线程安全类基础上添加新的功能,而新的功能是基于该线程安全类的复合操作之上时,就会涉及到原子操作的问题,如在一个线程安全的链表上添加“若没有则添加”的功能。

在进行安全扩展之前,必须要清晰地了解原有线程安全类的同步策略(使用了哪些锁,有哪些不变性条件和后验条件),并且在扩展时要保证与原线程安全类使用相同的同步策略(相同的锁)。安全扩展一般有以下方式:

  • 直接在源码上修改,因为使用的是同一份代码,所以很容易保证使用的是相同的同步策略。
  • 通过类继承的方式扩展。这种方式的缺点是必须知道原有线程安全类的同步策略,并保证使用相同的同步策略,即使这样,也可能会因为原线程安全类的同步策略变更,而导致使用了不同的同步策略。
  • 客户端加锁,即在客户端使用时才对其进行加锁。这种方式与类继承的方式有点类似,缺点也是一样的。
  • 通过组合/装饰模式,使用一个新的锁来重新封装原线程安全类。优点是不需要了解原线程安全类的同步策略,缺点是会引入新的锁,会有微小的性能损耗。

线程安全的工具类

同步容器

同步容器类包括Vector和Hashtable,以及通过Collections.synchronizedXxx等的同步封装器类。这些类实现线程安全的方式是:将它们的状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能访问容器的状态。

虽然同步容器类都是线程安全的,但要在其上执行复合操作时,也需要同步机制来保证原子性。迭代操作(遍历元素)、跳转(通过当前元素找下一个元素)以及条件运算(若没有则添加),这些操作都属于复合操作,当执行这些复合操作过程中,有其他线程并发地修改容器,将出现意料之外的行为,如迭代过程中并发修改容器会导致ConcurrentModificationException异常。抛出异常并不意味着同步容器类不是线程安全的,只是对于调用者来说,这些简单的操作抛出异常不是人们所期望的。要解决这些复合操作的原子性问题,就需要通过同步机制来保证原子性。

迭代器使用了“及时失败”的处理机制:当它们发现在迭代过程中被修改了,就抛出ConcurrentModificationException异常。如果不希望通过加锁来保证迭代过程中的原子性,可以通过“克隆”容器的方式来处理迭代。

并发容器

Java 5.0中提供了多种并发容器,它们是为了改善同步容器的性能问题而设计的。如:

  • ConcurrentHashMap是用来替代同步且基于散列的Map(Hashtable)
  • CopyOnWriteArrayList用于替代同步的List(Vector),遍历操作为主要操作的场景
  • LinkedBlockingQueue和ArrayBlockingQueue与LinkedList和ArrayList类似,但比同步List有更好的并发性能。
  • PriorityBlockingQueue则是按优先级排序的队列
  • ConrrentSkipListMap替代同步的SortedMap
  • ConrrentSkipListSet替代同步的SortedSet

并发容器类同时还增强了同步容器类:

  • 并发容器类不会抛出ConcurrentModificationException,因此不需要在迭代过程中加锁。ConcurrentHashMap返回的迭代器具有弱一致性,而非“及时失败”,弱一致性的迭代器可以容忍并发修改,当创建迭代器时会遍历已有的元素,并可以(但是不保证)在迭代器被构造后将修改操作反映给容器。
  • 提供了一些便利的复合操作,如putIfAbsent

ConcurrentHashMap使用了分段锁(Lock Striping)技术,使得锁的粒度更细(每个锁只控制一个散列桶),提高了并发性和伸缩性。

CopyOnWriteArrayList在每次修改时,都会创建并重新发布一个新的容器副本,迭代器保留一个指向底层基础数组的引用,由于这个数组不会被修改,因此对其进行同步时只需确保数组内容的可见性,这样迭代过程不会受其他线程修改容器的干扰,因此不会抛出ConcurrentModificationException。

仅当迭代操作远远多于修改操作时,才应该使用“写时复制”容器。否则每次的修改操作都要复制底层数组数据,造成很大的性能开销。

LinkedBlockingQueue、ArrayBlockingQueue和PriorityBlockingQueue作为阻塞队列的实现,主要应用于消费者模式,put/take方法是永久阻塞方法,offer/poll方法是定时阻塞方法,使用offer/poll方法能使用更灵活的策略来处理负荷过载的情况,如将过多的工作项序列化并写入磁盘,减少生产者线程的数量,或者通过某种方法来抑制生产者线程。

SynchronousQueue也是BlockingQueue的实现,但它并不是一个真正的队列,可以说它根本就不存在中间队列,它有点类似手把手传递物件的情形,路人甲想把物件传递到路人乙,但此时路人乙腾不出手来接收物件,路人甲只能等待路人乙腾出手时才能传递下一个物件。仅当有足够多的消费者,并且总是有一个消费者准备好获取交付的工作时,才适合使用同步队列。

ArrayDeque和LinkedBlockingDeque是实现了Deque或BlockingDeque的双端队列。双端队列适用于工作密取(Work Stealing)模式。在消费者模式中,所有消费者共享同一个工作队列,而在工作密取模式则每个消费者都有一个双端队列,这样可以减少消费者都在同一个工作队列内的竞争,具有更高的可伸缩性。工作密取模式非常适用于既是消费者也是生产者问题。

阻塞和中断

线程阻塞或暂停执行的原因有多种:

  • 等待IO操作结束
  • 等待获得一个锁
  • 等待从Thread.sleep方法中醒来
  • 等待另一个线程的计算结果

阻塞操作与执行时间很长的普通操作的差别在于,被阻塞的线程必须等待某个不受它控制的事件发生后才能继续执行,例如等待IO操作完成,等待某个锁变成可用或者等待外部计算结束。

Thread提供了interrupte方法用于中断线程,但这种中断仅仅是一种协作机制,它不能强制停止正在运行的线程,只是仅仅要求线程在执行到某个可以暂停的地方停止正在执行的操作——前提是该线程愿意停止下来。当某个方法抛出InterruptedException时,表示该方法是一个阻塞方法(可中断的方法),如果这个方法被中断,那么它将努力提前结束阻塞状态。

处理InterruptedException异常的方式一般有两种:

  • 传递InterruptedException,让高层代码去处理中断异常
  • 恢复中断。在Runnable中是不能抛出InterruptedException的,因此在捕获InterruptedException异常时,调用Thread.currentThread().interrupt();恢复异常状态,让高层代码去处理中断异常

同步工具类

同步工具类可以是任何一个对象,只要它根据其自身的状态来协调线程的控制流。同步工具类在类库中包括阻塞队列,信号量(Semaphore)、栅栏(Barrier)以及闭锁(Latch)。

闭锁

闭锁描述的是在某个结束状态到来之前,所有线程都不能通过这个锁,一旦达到结束状态,所有线程都允许通过该锁,而且该锁的状态不再发生改变。闭锁的例子有CountDownLatch以及FutureTask。

从FutureTask实现的接口Runnable和Future可以看出它有两层语义:

  • 可计算/可执行,基于Runnable接口
  • 会有结果的,基于Future接口

因此,FutureTask表达了一种抽象的可生成结果的计算。

FutureTask有三种状态:等待运行,正在运行和运行完成。运行完成表示计算的所有可能结束方式:正常结束、由于取消而结束和由于异常而结束。当FutureTask进入完成状态后,它会永远停止在这个状态上。

基于FutureTask的这些特性,FutureTask可用于提前计算,延迟计算和缓存计算结果的场景。

信号量

信号量用来控制同时访问某个特定资源的操作数量,或者执行某个指定操作的数量,可以用来实现某种资源池,或者对容器施加边界。

Semaphore中管理着一组虚拟的许可(permit),许可的初始数量可通过构造函数来指定。在执行操作时可以首先获得许可,并在使用以后释放许可。如果没有许可,那么acquire将阻塞直到有许可。

栅栏

栅栏类似于闭锁,它能阻塞一组线程直到某个事件发生。栅栏与闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。

CyclicBarrier可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用。

Exchanger是一种两方(Two-Party)栅栏,各方在栅栏位置上交换数据。Exchanger的实现实际上使用了串行线程封装技术中的CAS方式,在线程间安全地交换对象的。

任务执行

多线程优劣一节中提到,使用多线程有几大好处:提高资源利用率、提高吞吐量、简化复杂任务,但这些优势都建立在“多线程合理运用”这一大前提下的,为什么这样说呢?如果我们通过无限创建线程的方式来执行任务的话,将会引起资源消耗过大,导致系统不稳定等问题。

“多线程合理运用”包含两个方面,一方面是需要找出清晰的任务边界,另一方面使用Executor框架作为任务执行策略。

在理想情况下,各个任务之间是相互独立的:任务并不依赖于其他任务的状态、结果或边界效应。

在java中,线程池基于Executor框架。虽然Executor是个简单的接口,但它为灵活且强大的异步任务执行框架提供了基础,该框架能支持多种不同类型的任务执行策略。它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable表示任务。Executor的实现还提供了对生命周期的支持,以及统计信息收集、应用程序管理机制和性能监视等机制。

延伸阅读——什么样的对象需要池化呢?

答:对象创建/销毁的开销大,对象资源消耗大,对象存在无限增长的可能性并最终会影响到系统稳定。

Executor框架基于生产者-消费者模式,提交任务的操作相当于生产者,执行任务的线程则相当于消费者。

Executor框架

虽然我们在日常工作中都会使用到Executor框架,但真正能把它用好的人估计并不多,下面通过解答Executor框架的常见问题的方式来深入认识它吧。

线程、工作队列、Reject策略之间的关系是怎么样的呢?

线程池有几大关键参数,这些参数可以从ThreadPoolExecutor的构造函数中看出:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  • corePoolSize:线程池的核心线程数
  • maximumPoolSize:线程池线程上限,maximumPoolSize >= corePoolSize
  • keepAliveTime:空闲线程的存活时间
  • unit:时间单位
  • workQueue:工作队列
  • threadFactory:线程池工厂
  • handler:Reject策略处理器

一个新创建的Executor,它内部并不存在任何线程。当向Executor提交一个任务,如果此时活跃的线程数小于corePoolSize2,将创建新的工作线程并立即执行任务,当活跃线程数大于或等于corePoolSize的时候,Executor将把任务通过Queue.offer()的方式把任务提交至工作队列,等待空闲的core线程从工作队列中获取任务并执行。如果Queue.offer()失败,并且当前活动线程数小于maximumPoolSize时,将创建新的工作线程执行任务。当Queue.offer()失败而且活动线程数达到了最大值后,将执行Reject策略(另一种导致执行Reject策略的情况是Executor处于关闭状态)。

基于以上对Executor任务提交过程的描述,Executors.newCachedThreadPool()实现使用的工作队列是SynchronousQueue:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

SynchronousQueue的特性是当没有消费者在等待poll()操作的时候,offer()操作是会失败的。因此,当Executor没有空闲的线程时(所有线程都在执行任务),将自动地创建新的工作线程来执行任务,这样CachedThreadPool就实现了线程的弹性扩展了。

Executor的三个生命周期所描述的状态是什么?

Executor的生命周期包括:运行、关闭(shutdown)、终止(terminate)。通过调用ExecutorService的shutdown()或shutdownNow()方法进入关闭状态,在进入关闭状态之后,Executor内所有任务都执行完成或被中断之后,进入终止状态。

如何优雅地关闭Executor?

如果不能很好地控制Executor的关闭过程,将有可能使系统处于糟糕的状态。Executor存在着以下几种关闭过程:

  • 平缓地关闭(shutdown())。只关闭任务提交的入口,并平缓地等待已提交任务的执行完成,但可能出现剩余任务执行时间长或死锁导致应用关闭的响应时间太长。这种方式只能通过保证任务粒度尽可能小并确保不会出现死锁,尽快结束线程占用,以快速响应结束。
  • 中断式关闭(shutdownNow())。取消掉还未开始执行的任务,并尝试中断正在执行的任务。这种方式要求任务实现良好的中断策略。
  • 强制终止(kill -9或断电方式)。这种方式必须保证未执行完成的任务可以重新执行,并保证重复执行的幂等。

线程池执行异构任务会出现什么问题?

假如在一个线程池中同时执行IO密集型任务和计算密集型任务,两类任务的执行响应时间相距较大(假如IO密集型任务响应时间远大于计算密集型任务),在高负载的情况下将使得线程池中所有线程都阻塞在IO操作中,而无法处理计算密集型任务,导致计算密集型任务响应慢。另外,当异构任务存在依赖关系时,将会导致饥饿死锁的发生。因此,为了使并发性能最好和避免饥饿死锁,应当尽量避免在同一个线程池中执行异构任务或有依赖关系的任务。

同一个线程池中执行有依赖关系的任务时,会出现什么问题?如何解决?

在一个有界的线程池中执行有依赖关系的任务时,可能会出现池中所有线程都在执行有依赖的任务,而被依赖的任务没有空闲线程执行,而导致池中所有线程都在等待被依赖任务的执行结果,这种现象称为饥饿死锁。解决这个问题的方法要么线程池是无界的,要么将有依赖关系的任务隔离到不同的线程池中执行。

在线程池中调度使用了ThreadLocal的任务,如何避免在多个任务之间传递值?

ThreadLocal使每个线程都可以拥有某个变量的一个私有“版本”,由于线程池中的线程是可以复用的,即意味着一个线程可能会执行多个任务,这就会导致ThreadLocal在多个任务之间传递值了。要避免这个问题,则要求ThreadLocal的生命周期受限于任务的生命周期了。

如何为某个任务确定线程池的大小?

线程池配置过大,将消耗更多内存和引入不必要的竞争,配置过小,又无法充分利用资源和降低了吞吐率。下面给出按CPU分配线程池大小的计算公式:

  • $N_{cpu}$:cpu的核心数量,可通过Runtime.getRuntime().availableProcessors()获取
  • $U_{cpu}$:期望的cpu利用率,介于0~1之间
  • $W$:无并发时,任务执行期间花在阻塞上的时间
  • $C$:无并发时,任务执行期间花在计算上的时间

除了CPU资源之外,其他资源也会影响线程池的大小,如内存、文件句柄、套接字句柄、资源池(如数据库连接池),这些资源的计算就相对简单一些了,只需要用资源总量除以每个任务的需求量即可。

Java库中提供的饱和策略有哪些?

  • AbortPolicy:默认策略。当饱和时直接抛出RejectedExecutionException
  • DiscardOldestPolicy:丢弃下一个将要执行的任务,并把当前任务推送到任务队列。
  • DiscardPolicy:什么也不做,只是静悄悄地丢弃任务而已
  • CallerRunsPolicy:不在线程池中执行该任务,而是在调用者线程中执行。

任务队列有哪些?何时使用哪种队列呢?

对于任务队列来说,大致分为三种:无界队列、有界队列和同步移交(Synchronous Handoff)。

无界队列意味着将可能消耗无限大的内存资源,而实际上根本不可能存在无限大的内存资源,因此无界队列在正式环境很少使用,或者说如果要用到无界队列,那么将是糟糕的设计。

有界队列一般用在可预测的流量场景,并且不希望使用过多的线程以免造成剧烈竞争或资源消耗时(不需要太大的吞吐量时),可以使用有界队列。即使流量超过有界队列时,也可以通过饱和策略来处理。

同步队列则用在期望任务能尽快执行,而且线程池的大小能达到一定的量级(需要较大的吞吐量时),就应该使用同步队列。

因此,在正式环境中,一般只使用有界队列和同步队列。当不需要太大的吞吐量时,适宜使用有界队列,而吞吐量是主要考虑要素时,则应使用同步队列。

ThreadPoolExecutor和AbstractExecutorService有哪些可扩展的方法?这些扩展方法如何使用呢?

AbstractExecutorService提供了newTaskFor()方法的扩展,可以让用户自定义RunnableFuture的实现。

ThreadPoolExecutor提供了三个扩展方法:beforeExecute、afterExecute和terminated。分别用于在任务执行前、执行后、以及Executor处于终止状态时回调的。

Executor框架存在的必要性

Executor框架最大的作用是提供了任务执行策略,如任务在哪个线程中执行、任务执行的顺序、允许多少个任务并发执行、允许多少任务等待执行、在执行任务前后应该进行哪些操作等等,使得开发者能更容易做到合理地使用多线程并发技术。

Timer和ScheduledThreadPoolExecutor的区别与实现

一个Timer实例下执行的所有定时任务都只会在一个线程下执行,如果某个任务的执行时间过长,会影响到下一个任务的正确执行时间,另外,如果某个定时任务抛出了未检查异常,Timer线程并不捕获异常,最终导致定时线程终止,而其他等待执行的定时任务将无法执行了。而ScheduledThreadPoolExecutor正是用于取替Timer,解决Timer中的问题的。ScheduledThreadPoolExecutor支持多个定时任务并发执行,而且提供了对未检查异常的处理,在线程终止之后重新创建线程进行任务调度。

如何理解Future?

Future表示一个任务的生命周期:待执行、执行中、执行完成。有三个原因可能导致任务执行完成:正常完成、因取消导致中断而完成、因异常而完成。另外,Future也隐含了任务的生命周期只能前进,不能后退,当某个任务完成后,它就永远停留在“完成”状态上。如果Future.get()抛出了ExecutionException,可通过getCause()获取初始异常。

ExecutorCompletionService的实现

ExecutorCompletionService可以将已经执行完成的任务以Future的形式保存到阻塞队列中,方便获取任务执行结果。这里消费线程既是消费者,也是生产者了,因为消费完任务后又产生了新的结果到队列中了。ExecutorCompletionService的关键实现代码:

    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

	public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

只有当任务都是同类型的并且相互独立时,线程池的性能才能达到最佳。如果将运行时间较长的与运行时间较短的任务混合在一起,那么除非线程池很大,否则将可能造成“拥塞”。如果提交的任务依赖于其他任务,那么除非线程池无限大,否则将可能造成死锁。

取消任务与关闭线程

线程启动以及任务的提交执行都很容易,而且在大多数时候都让它们运行直至结束。但如果需要提前结束任务或线程,使其能安全、快速、可靠地停止下来呢?这就并不是那么容易了,而且这个非常重要的程序设计要素往往被忽略了。

一个行为良好的软件与勉强运行的软件之间的最主要区别是,行为良好的软件能很完善地处理失败、关闭和取消等过程。

在Java中提供了中断协作机制,使一个线程能够终止另一个线程当前的工作。这种协作机制是合理的,协作机制不会立刻强制停止一个线程或任务的执行,否则很可能导致数据结构处于不一致的状态,它只是简单地修改线程为中断状态,在该线程上执行的任务可以在某个时刻去检测中断状态,并决定如何取消任务的执行,另外,线程的所有者也可以根据中断状态或中断异常来执行线程中断策略。

任务取消策略

一个可取消的任务必须拥有取消策略(Cancellation Policy),在这个策略中将详细地定义取消操作的“How”、“When”以及“What”,即其他代码如何(How)请求取消该任务,任务在何时(When)检查是否已经请求了取消,以及在响应取消请求时应该执行哪些(What)操作。

线程中断机制是实现任务取消的最合理方式,这完全是因为Java可中断的阻塞操作都是基于线程中断机制(约定俗成)。自定义的取消机制无法触发Java可中断阻塞操作的响应,除非自定义的取消机制包含有线程中断机制。

当一个任务检测到中断请求或中断异常时,在完成必要的中断数据恢复操作之后,要么传递中断异常,要么恢复中断状态,这是因为线程的所有者也需要线程的中断。只有实现了线程中断策略的代码(线程所有者)才可以屏蔽中断请求或中断异常,在常规任务中绝对不能屏蔽中断请求或中断异常。

线程中断策略

线程中断策略规定了线程所有者如何解释某个中断请求——当发现中断请求时,应该做哪些工作,哪些工作单元对于中断来说是原子操作,以及以多快的速度来响应中断。

中断请求也应该只能由线程所有者发起(除非你知道中断对该线程的含义),因为除了线程所有者,其他代码很难或根本不可能知道中断对线程来说意味着什么,例如,非线程所有者希望中断在某个线程上正在执行的任务而发起了中断请求,但这时候任务可能已经结束,而该线程已经在执行其他任务了。所有者可以将线程的中断策略信息封装到某个合适的取消机制中,例如关闭(shutdown)方法。

可能会有同学会问,既然中断请求只能由线程所有者发起,但Future.cancel(true)方法也能发起中断请求啊,难道Future是线程的所有者吗?好像不是啊,线程的所有者应该是线程池对象才对啊,那么为什么我们可以通过Future来发起中断请求呢?这个问题的答案是Future不是线程的所有者,而且只有在使用Executor框架下返回的Future或者使用者很清楚FutureTask所在线程的中断策略的情况才可以调用Future.cancel(true)方法发起中断请求,否则,其他情况下都不应该调用Future.cancel(true)来发起中断请求。至于为什么有这样的答案,我将会在另一篇文章《Future深度剖解》中解答。

在这里“线程所有者”的含义并不是简单地指持有线程的对象,而应该是创建并持有线程的对象,如线程池,它既负责线程的创建,同时也持有线程。

不可中断的阻塞

在Java库中,许多可阻塞的方法都是通过提前返回(TimeoutException)或者抛出InterruptedException来响应中断请求的,但并不是所有的可阻塞方法或阻塞机制都能响应中断,下面列出这些不可响应中断的阻塞以及如何让这些阻塞操作快速中断的办法:

  • 同步的Socket I/O操作,可以通过关闭底层套接字(close())使执行read或write等方法而被阻塞的线程抛出SocketException
  • InterruptibleChannel,中断一个正在InterruptibleChannel上等待的线程时,将抛出ClosedByInterruptException并关闭链路,而且其他在这条链路上阻塞的线程同样抛出ClosedByInterruptException。当关闭一个InterruptibleChannel时,将导致所有在链路操作上阻塞的线程都抛出AsynchronousCloseException。大多数标准的Channel都实现了InterruptibleChannel。
  • Selector的异步I/O,如果线程在调用Selector.select方法阻塞了,那么调用close或wakeup方法会使线程抛出ClosedSelectorException并提前返回。
  • 内置锁,如果一个线程由于等待某个内置锁而阻塞,那么将无法响应中断,但Lock类中提供了lockInterruptibly方法,该方法允许在等待一个锁的同时仍能响应中断。

如果要使以上不可中断的阻塞也能响应中断,有两种封装办法:

  1. 扩展Thread的interrupt方法,在interrupt方法中处理不可中断的阻塞(如调用Socket.close)
  2. 扩展ThreadPoolExecutor.newTaskFor()和Future.cancel()方法,在Future.cancel()方法中能处理不可叫断的阻塞(如调用Socket.close)

线程所有者

封装线程的正确原则是——只有线程所有者才能操控该线程,如中断线程或者修改线程的优先级!!!与其他封装对象一样,线程的所有权是不可传递的:应用程序可以拥有服务,服务也可以拥有工作线程,但应用程序并不能拥有工作线程,因此应用程序不能直接停止工作者线程。相反,服务应该提供生命周期方法(Lifecycle Method)来关闭它自己以及它所拥有的线程。这样,当应用程序关闭该服务时,服务就可以关闭所有的线程了。在ExecutorService中提供了shutdown和shutdownNow方法。同样,在其他拥有线程的服务中也应该提供类似的关闭机制。

上面这段描述可以理解为完全基于面向对象特性——封装而提出的。封装要求隐藏对象的内部属性(隐藏线程的操控)并只暴露必要的功能(暴露关闭功能)。因此,从这里也可以领悟到不但是线程只能由线程所有者操控,所有的对象都应该只能由对象所有者操控!!!

“毒丸”对象

“毒丸”对象可应用于终止生产者和消费者,生产者通过投放“毒丸”对象以终止生产,而消费者在收到“毒丸”对象后也终止消费。在生产者和消费斱数量都已知的情况下,就可以使用“毒丸”对象来终止生产/消费。

线程异常终止

在并发程序中,当任务抛出了未捕获的异常时(通常是RuntimeException),将导致线程提前死亡。这种未捕获的异常往往意味着某种编程错误或其他不可修复的错误,处理这种异常其中的一种办法就是在try-catch代码块中调用任务,这样就能捕获未检查的异常,并可以在try-finally代码块中通知框架线程因未检查异常而终止,让框架作出响应,如:

	public void run(){
		Throwable thrown=null;
		try{
			while(!isInterrupted()){
				runTask(getTaskFromWorkQueue());
			} 
		}catch(Throwable e){
			thrown=e;
		}finally{
			threadExited(this,thrown);//通知框架,线程因异常终止,并作相应的处理
		}
	}

另一个处理办法是,在Thread API中提供了UncaughtExceptionHandler,线程由于未捕获的异常而终结时会回调该接口,让程序代码介入异常终止的处理。另外,还需要强调的是UncaughtExceptionHandler依然只有线程所有者才能修改的。UncaughtExceptionHandler与Executor框架结合使用的时候,通过execute提交的任务所抛出的未捕获异常能被UncaughtExceptionHandler处理,但通过submit提交的任务所抛出的未捕获异常,都被封装为ExecutionException,并作为Future异常完成状态。

JVM关闭

JVM关闭方式可以分为两种,一种是正常关闭,另一种则是强行关闭。正常关闭的方式有最后一个非守护线程结束,程序调用了System.exit时,或特定于平台的方法关闭(如发送了SIGINT信号或键入CTRL-C)。而强行关闭的方式则有在代码中执行Runtime.halt或通过操作系统“杀死”进程(kill -9)

关闭钩子

在正常关闭时,JVM首先调用所有已注册的关闭钩子(Shutdown Hook)。关闭钩子是指通过Runtime.addShutdownHook注册的但尚未开始的线程。每个关闭钩子将在独立线程中运行,并与其他未执行结束的线程并发执行,当所有普通线程都终止之后,JVM才会结束。

守护线程

线程分为两种:普通线程和守护线程。普通线程与守护线程之间的差异仅在于当线程退出时发生的操作。当一个线程退出时,JVM会检查其他正在运行的线程,如果这些线程都是守护线程,那么JVM会正常退出。当JVM停止时,所有仍然存在的守护线程都将被抛弃——既不会执行finally代码块,也不会执行回卷栈,而JVM只是直接退出。

活跃性问题

按活跃性问题细分,又可以分为四类:死锁、饥饿、活锁、丢失信号。

死锁

有两种常见类型的死锁:顺序死锁(Lock-Ordering Deadlock)资源死锁(Resource Deadlock)

顺序死锁描述的这样的场景:程序会以不同的顺序获取两个或以上的锁,在并发环境下将会造成死锁。

资源死锁的场景有两类:

1. 以不同的顺序从两个或以上资源池获取资源,在并发环境下将会造成死锁。
2. 在同一个线程池中执行具有依赖关系的任务时,将会发生死锁。极端的情况就是在一个任务在与其相同的单线程Executor中提交另一个任务,并需要等待被提交任务在单线程的Executor中执行完成才能继续执行。这种死锁方式也称为**线程饥饿死锁(Thread-Starvation Deadlock)**。

顺序死锁和资源死锁的描述非常相似,事实上我们可以把锁看成是只有一个资源的资源池特例,另外,我们也可以把资源池看作是一个锁,因为仅剩一个资源的资源池也会表现出与锁一样的特性。因此,在分析死锁问题的时候,把资源池作为锁来看待就对了。

如果细心地分析死锁发生的三个场景,可以得出死锁发生的根本原因是两个线程可能在某一个时刻相互依赖于对方持有的资源才能继续执行,这时就会发生死锁。例如,顺序死锁就是相互依赖于对方释放锁资源;线程饥饿死锁,A依赖于B的执行结果(资源),而B又依赖于A释放线程资源。概括一下就是如果存在着循环依赖,必将发生死锁。基于此,可以用有向图来分析死锁,如把每个线程假想为有向图中的一个节点,节点之间的边表示线程A等待线程B所占有的资源,如果这个有向图形成了一条环路,那么就存在着一个死锁。

现在已经对死锁有理论和概念上的认识了,那么在实际编码时,又如何防范死锁的发生呢?

在实际编码中,很容易就会在一个调用链上获取两个以上的锁,特别是在持有某个锁的方法中再去调用外部方法时,这个外部方法很可能也会持有其他的锁,在这种情况下,将很容易导致死锁的发生,而且对顺序死锁的分析将变得异常复杂。在这种情况下,就需要我们遵从开放调用原则——在调用外部方法时,尽量不要持有锁。

如何实施开放调用原则呢?首先,对象封装特性要求对象内部属性组成不变性条件,如果外部方法调用不属于不变性条件范围内,则只需要对内部属性的复合操作进行加锁同步,外部方法调用完全可以不包含在同步块中。其次,如果外部方法调用也在不变性条件范围内时,则应构造一些协议(而不是通过加锁)来防止其他线程进入代码同步临界区,如:

	public void shutdown(){
		if(state.compareAndSet(false,true)){//这里可以防止其他线程进入临界区
			// 调用其他对象的关闭方法,这些外部方法可能存在持有锁的操作
		}
	}

如果不使用上面的实现,shutdown方法也使用同步,将潜在死锁的风险(因为这个过程持有两个锁了),下面给出反面代码:

	public synchronized void shutdown(){
		state=true;
		// 调用其他对象的关闭方法,这些外部方法可能存在持有锁的操作
	}

除了始终贯彻开放调用原则之外,还可以使用支持定时的锁(如Lock.tryLock),使发生死锁时也能恢复过来。

在发生死锁时,可以通过kill -3导出线程堆栈,来分析死锁原因。

饥饿

当线程由于无法访问它所需要的资源而不能继续执行时,就发生了“饥饿(Starvation)”。引发饥饿最常见资源就是CPU时钟周期。如果在Java应用程序中对线程的优先级使用不当。或者在持有锁时执行一些无法结束的结构(例如无限循环,或者无限制地等待某个资源)。

要避免使用线程优先级,因为这会增加平台依赖性,并可能导致活跃性问题。

活锁

活锁的发生不会阻塞线程,但也不能继续执行,因为线程将不断重复执行相同的操作,而且总会失败。

活锁发生的场景如由于代码缺陷导致处理事务消息异常时,又重新放回队列,重新消费也必然再次异常,从而使消息消费无法继续下去。又如两台机器使用相同载波来发送数据包,这些数据包将发生冲突,如果使用相同的时间间隔进行重试,又将重复发生冲突。

性能问题

对性能的理解

提升性能意味着用更少的资源做更多的事情。这里的“资源”含义包括了CPU时钟周期、处理时间、内存、网络带宽、I/O带宽等。当性能由于某种特定的资源而受到限制时,我们通常将该操作称为相应的资源密集型操作,如CPU密集型。

虽然引入线程的主要目的是提高性能,但引入多个线程总会引入一些额外的开销,如线程之间的协调(同步加锁)上下文切换线程的创建和销毁等。

要想通过并发来获得更好的性能,需要努力做好两件事:

* 更有效地利用现有处理资源
* 出现新的处理资源时使程序尽可能地利用这些新资源。——保证有高的可伸缩性

程序的性能可以用两类指标来衡量:多快多少。“多快”描述的是完成一次操作需要多少时间,而“多少”描述的是单位时间内能完成多少次操作。“多快”可以使用服务时间、延迟时间这些指标来衡量,而“多少”则可以使用吞吐量、可伸缩性来衡量。

可伸缩性指的是:当增加计算资源时(如CPU、内存、存储容量或I/O带宽),程序的吞吐量或处理能力相应地增加。

多快多少使用的调优方法是截然不同的。要使程序更“快”,一般是用更小的代价完成相同的工作,如缓存之前计算的结果、用时间复杂度更低的算法。而要让程序能处理更“多”的问题,则要设法将问题的计算并行化,从而能利用更多的计算资源来完成更多的工作。

因此,在讨论并发性能的时候,更侧重于“多少”方面的性能调优,即如何提高程序的可伸缩性。

对于性能优化,必须认清一个事实:优化必然是有代价的。优化通常是通过增加某种形式的成本来降低另一种形式的开销,如增加内存使用量以降低服务时间,增加开销来换取安全性,降低可读性和可维护性换取更快算法,又如增加服务时间来换取更高的可伸缩性(MVC分层架构)。

在进行性能优化之前,必须明确优化的需求。可通过下面这些问题来明确优化需求:

* 优化的衡量指标是什么?是“多快”方面的指标还是“多少”方面的指标
* 优化到什么程度才能满足要求呢?
* 优化的场景是什么呢?是低负载还是高负载?大数据集还是小数据集?
* 要优化的场景发生的频率如何呢?
* 在优化之外的场景是否能使用这些代码?
* 要实现性能代码需要付出哪些隐含的代价呢?

Amdahl定律

Amdahl定律描述的是,程序中串行组件所占的比重,将决定通过增加计算资源所能提高的最高加速比。公式如下:

上面公式中,$Speedup$是提高的加速比,$F$是串行组件的占比,$N$是指计算资源数量(即CPU核心数量)。从公式中可以看出以下几点:

  • 当$N$无穷大时,增加计算资源所能提高的最大加速比为$\cfrac {1}{F}$,即当串行部分占比为10%,则最大加速比为10,即最高只能提高10倍速度。
  • 当$F$无穷小时,可伸缩性将达到无限扩展
  • CPU的平均利用率可通过$\cfrac {Speedup}{N}$计算得出
  • 可伸缩性将受限于程序的串行部分代码,串行操作将降低可伸缩性。

上下文切换和同步阻塞是如何影响性能的

上下文切换过程中,需要保存当前线程执行上下文,以及设置新调度线程的执行上下文,这里就存在一定的开销。为了保证切换上下文之后,应用程序还有可用的cpu执行时间以获取更大的吞吐量,调度器会为每个可运行线程分配一个最小执行时间,如果线程因为剧烈的竞争而发生频繁的线程挂起、唤醒,使线程无法使用完整的最小执行调试时间片,降低了吞吐量之余,还加剧了线程上下文切换带来的开销。

vmstat命令工具能报告上下文切换次数以及在内核中执行时间所占比例,处于可运行状态但并没有运行的线程数量等信息

减少锁的竞争

同步锁必然会导致串行代码的增多,从而降低程序的可伸缩性(Amdahl定律),另外,因为对同步锁的竞争,而导致产生上下文的切换开销影响程序性能。因此,减少锁的竞争能提高程序的可伸缩性和降低上下文切换的开销

** 在并发程序中,对可伸缩性的最主要威胁就是独占方式的资源锁。**

有两个因素将影响在锁上发生竞争的可能性:锁的请求频率,以及每次持有该锁的时间。两者乘积越大,发生竞争的可能性越大。

下面三种方式可以降低锁的竞争程序:

1. 减少锁的持有时间。 即缩小锁的范围,使锁仅保护最小必要代码块

2. 降低锁的请求频率。 通过锁分解或锁分段技术实现。锁分解是指当一个锁在保护多个相互独立的状态变量时(即一个锁保护多个不变性条件),那么可以将这个锁按不变性条件分解为多个锁,从而降低了单个锁的请求频率,减少了竞争的可能性,从而提高伸缩性。

3. 使用带有协调机制的独占锁,这些机制允许更高的并发性。例如使用并发容器、读-写锁、不可变对象以及原子变量。

避免热点域(Hot Field)

热点域是指那些会被并发访问的区域。引入热点域必然会或多或少地引入串行处理代码,因此热点域也必然限制了可伸缩性。

如果再仔细思考一下,热点域才是限制可伸缩性的罪魁祸首,为了解决并发共享状态变量访问安全问题而使用了同步锁的,等待其他线程的执行结果才能继续执行,这里的执行结果也成了线程协调里的热点域

热点域的概念可以很广泛,可以是一个内存区块,可以是一个服务接口,也可以是一个应用,一台计算机,甚至一个集群。当一个热点域无法通过减少其粒度,降低热点域的访问频率时,这就是应用可以达到的最大可伸缩性的状态了。例如,数据库的垂直切分和水平切分,数据库读写分离,redis集群,nginx的负载均衡都可以看作降低热点域的访问频率,以提高可伸缩性的实践。对于一个单一内存区块热点域(如一个全局计数器状态)而言,只能通过垂直扩展方式(加cpu/内存)来提高吞吐量,这是无法通过横向扩展节点的方式提高吞吐量的,因为这个热点域已经是最小的粒度了,无法通过一些有效手段来降低它的访问频率了。另外,对于一些多节点应用,虽然它们号称提供无状态服务(即无热点域),具有很高的可伸缩性,但通常它们都得依赖于一些存储中间件,这些存储中间件往往会成为了热点域(如redis缓存了全局状态变量),从而导致这些无状态的服务可伸缩性会受限于存储中间件的可伸缩性。

基于此,一个程序能达到的最大可伸缩性的办法是让一个节点只维护一个不变性条件的状态变量!!

谨慎使用对象池

对象池本身存在着热点域(队列就是它的热点域),因此对它的访问必然会产生竞争带来的开销,而这些开销往往高于对象创建/回收的开销,因此除非对象的创建开销比较大,否则不建议使用对象池。

  1. 安全暴露的概念是自己发明的,它用于区别安全发布概念。安全暴露描述的是一个线程安全的类如何可以安全地暴露其内部状态,而不影响其自身的线程安全。而安全发布关注的是对象首次发布时,对象对其他线程的可见性问题。 

  2. 当corePoolSize配置为0时,难道要等到工作队列堆满之后,才会启动线程执行任务吗?其实不是的。因为在Executor框架中做了特殊处理,当corePoolSize为0(maximumPoolSize>0)时,会创建出1个工作线程来消费工作队列中的任务,至到工作队列堆满之后,才会创建第二、第三…个线程,直到活跃线程达到最大值。