Java中的多线程你只要看这一篇就够了

如果对什么是线程、什么是进程仍存有疑惑,请先Google之,因为这两个概念不在本文的范围之内。

用多线程只有一个目的,那就是更好的利用cpu的资源,因为所有的多线程代码都可以用单线程来实现。说这个话其实只有一半对,因为反应“多角色”的程序代码,最起码每个角色要给他一个线程吧,否则连实际场景都无法模拟,当然也没法说能用单线程来实现:比如最常见的“生产者,消费者模型”。

很多人都对其中的一些概念不够明确,如同步、并发等等,让我们先建立一个数据字典,以免产生误会。

  • 多线程:指的是这个程序(一个进程)运行时产生了不止一个线程
  • 并行与并发:
    • 并行:多个cpu实例或者多台机器同时执行一段处理逻辑,是真正的同时。
    • 并发:通过cpu调度算法,让用户看上去同时执行,实际上从cpu操作层面不是真正的同时。并发往往在场景中有公用的资源,那么针对这个公用的资源往往产生瓶颈,我们会用TPS或者QPS来反应这个系统的处理能力。

并发与并行
  • 线程安全:经常用来描绘一段代码。指在并发的情况之下,该代码经过多线程使用,线程的调度顺序不影响任何结果。这个时候使用多线程,我们只需要关注系统的内存,cpu是不是够用即可。反过来,线程不安全就意味着线程的调度顺序会影响最终结果,如不加事务的转账代码:
    1
    2
    3
    4
    void transferMoney(User from, User to, float amount){
      to.setMoney(to.getBalance() + amount);
      from.setMoney(from.getBalance() - amount);
    }
  • 同步:Java中的同步指的是通过人为的控制和调度,保证共享资源的多线程访问成为线程安全,来保证结果的准确。如上面的代码简单加入@synchronized关键字。在保证结果准确的同时,提高性能,才是优秀的程序。线程安全的优先级高于性能。

好了,让我们开始吧。我准备分成几部分来总结涉及到多线程的内容:

  1. 扎好马步:线程的状态
  2. 内功心法:每个对象都有的方法(机制)
  3. 太祖长拳:基本线程类
  4. 九阴真经:高级多线程控制类

扎好马步:线程的状态

先来两张图:


线程状态

线程状态转换

各种状态一目了然,值得一提的是”blocked”这个状态:
线程在Running的过程中可能会遇到阻塞(Blocked)情况

  1. 调用join()和sleep()方法,sleep()时间结束或被打断,join()中断,IO完成都会回到Runnable状态,等待JVM的调度。
  2. 调用wait(),使该线程处于等待池(wait blocked pool),直到notify()/notifyAll(),线程被唤醒被放到锁定池(lock blocked pool ),释放同步锁使线程回到可运行状态(Runnable)
  3. 对Running状态的线程加同步锁(Synchronized)使其进入(lock blocked pool ),同步锁被释放进入可运行状态(Runnable)。

此外,在runnable状态的线程是处于被调度的线程,此时的调度顺序是不一定的。Thread类中的yield方法可以让一个running状态的线程转入runnable。

内功心法:每个对象都有的方法(机制)

synchronized, wait, notify 是任何对象都具有的同步工具。让我们先来了解他们


monitor

他们是应用于同步问题的人工线程调度工具。讲其本质,首先就要明确monitor的概念,Java中的每个对象都有一个监视器,来监测并发代码的重入。在非多线程编码时该监视器不发挥作用,反之如果在synchronized 范围内,监视器发挥作用。

wait/notify必须存在于synchronized块中。并且,这三个关键字针对的是同一个监视器(某对象的监视器)。这意味着wait之后,其他线程可以进入同步块执行。

当某代码并不持有监视器的使用权时(如图中5的状态,即脱离同步块)去wait或notify,会抛出java.lang.IllegalMonitorStateException。也包括在synchronized块中去调用另一个对象的wait/notify,因为不同对象的监视器不同,同样会抛出此异常。

再讲用法:

  • synchronized单独使用:
    • 代码块:如下,在多线程环境下,synchronized块中的方法获取了lock实例的monitor,如果实例相同,那么只有一个线程能执行该块内容
      1
      2
      3
      4
      5
      6
      7
      8
      public class Thread1 implements Runnable {
         Object lock;
         public void run() { 
             synchronized(lock){
               ..do something
             }
         }
      }
    • 直接用于方法: 相当于上面代码中用lock来锁定的效果,实际获取的是Thread1类的monitor。更进一步,如果修饰的是static方法,则锁定该类所有实例。
      1
      2
      3
      4
      5
      public class Thread1 implements Runnable {
         public synchronized void run() { 
              ..do something
         }
      }
  • synchronized, wait, notify结合:典型场景生产者消费者问题
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    /**
       * 生产者生产出来的产品交给店员
       */
      public synchronized void produce()
      {
          if(this.product >= MAX_PRODUCT)
          {
              try
              {
                  wait(); 
                  System.out.println("产品已满,请稍候再生产");
              }
              catch(InterruptedException e)
              {
                  e.printStackTrace();
              }
              return;
          }
          this.product++;
          System.out.println("生产者生产第" + this.product + "个产品.");
          notifyAll();   //通知等待区的消费者可以取出产品了
      }
      /**
       * 消费者从店员取产品
       */
      public synchronized void consume()
      {
          if(this.product <= MIN_PRODUCT)
          {
              try
              {
                  wait();
                  System.out.println("缺货,稍候再取");
              }
              catch (InterruptedException e)
              {
                  e.printStackTrace();
              }
              return;
          }
          System.out.println("消费者取走了第" + this.product + "个产品.");
          this.product--;
          notifyAll();   //通知等待去的生产者可以生产产品了
      }
    volatile

    多线程的内存模型:main memory(主存)、working memory(线程栈),在处理数据时,线程会把值从主存load到本地栈,完成操作后再save回去(volatile关键词的作用:每次针对该变量的操作都激发一次load and save)。


volatile

针对多线程使用的变量如果不是volatile或者final修饰的,很有可能产生不可预知的结果(另一个线程修改了这个值,但是之后在某线程看到的是修改之前的值)。其实道理上讲同一实例的同一属性本身只有一个副本。但是多线程是会缓存值的,本质上,volatile就是不去缓存,直接取值。在线程安全的情况下加volatile会牺牲性能。

太祖长拳:基本线程类

基本线程类指的是Thread类,Runnable接口,Callable接口
Thread 类实现了Runnable接口,启动一个线程的方法:

1
2
MyThread my = new MyThread();
  my.start();

Thread类相关方法:

1
2
3
4
5
6
7
8
//当前线程可转让cpu控制权,让别的就绪状态线程运行(切换)
public static Thread.yield()
//暂停一段时间
public static Thread.sleep() 
//在一个线程中调用other.join(),将等待other执行完后才继续本线程。    
public join()
//后两个函数皆可以被打断
public interrupte()

关于中断:它并不像stop方法那样会中断一个正在运行的线程。线程会不时地检测中断标识位,以判断线程是否应该被中断(中断标识值是否为true)。终端只会影响到wait状态、sleep状态和join状态。被打断的线程会抛出InterruptedException。
Thread.interrupted()检查当前线程是否发生中断,返回boolean
synchronized在获锁的过程中是不能被中断的。

中断是一个状态!interrupt()方法只是将这个状态置为true而已。所以说正常运行的程序不去检测状态,就不会终止,而wait等阻塞方法会去检查并抛出异常。如果在正常运行的程序中添加while(!Thread.interrupted()) ,则同样可以在中断后离开代码体

Thread类最佳实践
写的时候最好要设置线程名称 Thread.name,并设置线程组 ThreadGroup,目的是方便管理。在出现问题的时候,打印线程栈 (jstack -pid) 一眼就可以看出是哪个线程出的问题,这个线程是干什么的。

如何获取线程中的异常


不能用try,catch来获取线程中的异常

Runnable

与Thread类似

Callable

future模式:并发模式的一种,可以有两种形式,即无阻塞和阻塞,分别是isDone和get。其中Future对象用来存放该线程的返回值以及状态

1
2
3
4
5
ExecutorService e = Executors.newFixedThreadPool(3);
 //submit方法有多重参数版本,及支持callable也能够支持runnable接口类型.
Future future = e.submit(new myCallable());
future.isDone() //return true,false 无阻塞
future.get() // return 返回值,阻塞直到该线程运行结束

九阴真经:高级多线程控制类

以上都属于内功心法,接下来是实际项目中常用到的工具了,Java1.5提供了一个非常高效实用的多线程包:java.util.concurrent, 提供了大量高级工具,可以帮助开发者编写高效、易维护、结构清晰的Java多线程程序。

1.ThreadLocal类

用处:保存线程的独立变量。对一个线程类(继承自Thread)
当使用ThreadLocal维护变量时,ThreadLocal为每个使用该变量的线程提供独立的变量副本,所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本。常用于用户登录控制,如记录session信息。

实现:每个Thread都持有一个TreadLocalMap类型的变量(该类是一个轻量级的Map,功能与map一样,区别是桶里放的是entry而不是entry的链表。功能还是一个map。)以本身为key,以目标为value。
主要方法是get()和set(T a),set之后在map里维护一个threadLocal -> a,get时将a返回。ThreadLocal是一个特殊的容器。

2.原子类(AtomicInteger、AtomicBoolean……)

如果使用atomic wrapper class如atomicInteger,或者使用自己保证原子的操作,则等同于synchronized

1
2
//返回值为boolean
AtomicInteger.compareAndSet(int expect,int update)

该方法可用于实现乐观锁,考虑文中最初提到的如下场景:a给b付款10元,a扣了10元,b要加10元。此时c给b2元,但是b的加十元代码约为:

1
2
3
4
5
6
if(b.value.compareAndSet(old, value)){
   return ;
}else{
   //try again
   // if that fails, rollback and log
}

AtomicReference
对于AtomicReference 来讲,也许对象会出现,属性丢失的情况,即oldObject == current,但是oldObject.getPropertyA != current.getPropertyA。
这时候,AtomicStampedReference就派上用场了。这也是一个很常用的思路,即加上版本号

3.Lock类

lock: 在java.util.concurrent包内。共有三个实现:

  • ReentrantLock
  • ReentrantReadWriteLock.ReadLock
  • ReentrantReadWriteLock.WriteLock

主要目的是和synchronized一样, 两者都是为了解决同步问题,处理资源争端而产生的技术。功能类似但有一些区别。

区别如下:

  1. lock更灵活,可以自由定义多把锁的枷锁解锁顺序(synchronized要按照先加的后解顺序)
  2. 提供多种加锁方案,lock 阻塞式, trylock 无阻塞式, lockInterruptily 可打断式, 还有trylock的带超时时间版本。
  3. 本质上和监视器锁(即synchronized是一样的)
  4. 能力越大,责任越大,必须控制好加锁和解锁,否则会导致灾难。
  5. 和Condition类的结合。
  6. 性能更高,对比如下图:

synchronized和Lock性能对比

ReentrantLock
可重入的意义在于持有锁的线程可以继续持有,并且要释放对等的次数后才真正释放该锁。
使用方法是:

1.先new一个实例

1
static ReentrantLock r=new ReentrantLock();

2.加锁

1
r.lock()或r.lockInterruptibly();

此处也是个不同,后者可被打断。当a线程lock后,b线程阻塞,此时如果是lockInterruptibly,那么在调用b.interrupt()之后,b线程退出阻塞,并放弃对资源的争抢,进入catch块。(如果使用后者,必须throw interruptable exception 或catch)

3.释放锁

1
r.unlock()

必须做!何为必须做呢,要放在finally里面。以防止异常跳出了正常流程,导致灾难。这里补充一个小知识点,finally是可以信任的:经过测试,哪怕是发生了OutofMemoryError,finally块中的语句执行也能够得到保证。

ReentrantReadWriteLock

可重入读写锁(读写锁的一个实现)

1
2
3
ReentrantReadWriteLock lock = new ReentrantReadWriteLock()
  ReadLock r = lock.readLock();
  WriteLock w = lock.writeLock();

两者都有lock,unlock方法。写写,写读互斥;读读不互斥。可以实现并发读的高效线程安全代码

4.容器类

这里就讨论比较常用的两个:

  • BlockingQueue
  • ConcurrentHashMap

BlockingQueue
阻塞队列。该类是java.util.concurrent包下的重要类,通过对Queue的学习可以得知,这个queue是单向队列,可以在队列头添加元素和在队尾删除或取出元素。类似于一个管  道,特别适用于先进先出策略的一些应用场景。普通的queue接口主要实现有PriorityQueue(优先队列),有兴趣可以研究

BlockingQueue在队列的基础上添加了多线程协作的功能:


BlockingQueue

除了传统的queue功能(表格左边的两列)之外,还提供了阻塞接口put和take,带超时功能的阻塞接口offer和poll。put会在队列满的时候阻塞,直到有空间时被唤醒;take在队 列空的时候阻塞,直到有东西拿的时候才被唤醒。用于生产者-消费者模型尤其好用,堪称神器。

常见的阻塞队列有:

  • ArrayListBlockingQueue
  • LinkedListBlockingQueue
  • DelayQueue
  • SynchronousQueue

ConcurrentHashMap
高效的线程安全哈希map。请对比hashTable , concurrentHashMap, HashMap

5.管理类

管理类的概念比较泛,用于管理线程,本身不是多线程的,但提供了一些机制来利用上述的工具做一些封装。
了解到的值得一提的管理类:ThreadPoolExecutor和 JMX框架下的系统级管理类 ThreadMXBean
ThreadPoolExecutor
如果不了解这个类,应该了解前面提到的ExecutorService,开一个自己的线程池非常方便:

1
2
3
4
5
6
7
8
ExecutorService e = Executors.newCachedThreadPool();
    ExecutorService e = Executors.newSingleThreadExecutor();
    ExecutorService e = Executors.newFixedThreadPool(3);
    // 第一种是可变大小线程池,按照任务数来分配线程,
    // 第二种是单线程池,相当于FixedThreadPool(1)
    // 第三种是固定大小线程池。
    // 然后运行
    e.execute(new MyRunnableImpl());

该类内部是通过ThreadPoolExecutor实现的,掌握该类有助于理解线程池的管理,本质上,他们都是ThreadPoolExecutor类的各种实现版本。请参见javadoc:


ThreadPoolExecutor参数解释

翻译一下:
corePoolSize:池内线程初始值与最小值,就算是空闲状态,也会保持该数量线程。
maximumPoolSize:线程最大值,线程的增长始终不会超过该值。
keepAliveTime:当池内线程数高于corePoolSize时,经过多少时间多余的空闲线程才会被回收。回收前处于wait状态
unit
时间单位,可以使用TimeUnit的实例,如TimeUnit.MILLISECONDS
workQueue:待入任务(Runnable)的等待场所,该参数主要影响调度策略,如公平与否,是否产生饿死(starving)
threadFactory:线程工厂类,有默认实现,如果有自定义的需要则需要自己实现ThreadFactory接口并作为参数传入。

请注意:该类十分常用,作者80%的多线程问题靠他。

(面试必备)Java线程面试题 Top 50

不管你是新程序员还是老手,你一定在面试中遇到过有关线程的问题。Java语言一个重要的特点就是内置了对并发的支持,让Java大受企业和程序员的欢迎。大多数待遇丰厚的Java开发职位都要求开发者精通多线程技术并且有丰富的Java程序开发、调试、优化经验,所以线程相关的问题在面试中经常会被提到。

在典型的Java面试中, 面试官会从线程的基本概念问起, 如:为什么你需要使用线程, 如何创建线程,用什么方式创建线程比较好(比如:继承thread类还是调用Runnable接口),然后逐渐问到并发问题像在Java并发编程的过程中遇到了什么挑战,Java内存模型,JDK1.5引入了哪些更高阶的并发工具,并发编程常用的设计模式,经典多线程问题如生产者消费者,哲学家就餐,读写器或者简单的有界缓冲区问题。仅仅知道线程的基本概念是远远不够的, 你必须知道如何处理死锁竞态条件,内存冲突和线程安全等并发问题。掌握了这些技巧,你就可以轻松应对多线程和并发面试了。

许多Java程序员在面试前才会去看面试题,这很正常。因为收集面试题和练习很花时间,所以我从许多面试者那里收集了Java多线程和并发相关的50个热门问题。我只收集了比较新的面试题且没有提供全部答案。想必聪明的你对这些问题早就心中有数了, 如果遇到不懂的问题,你可以用Google找到答案。若你实在找不到答案,可以在文章的评论中向我求助。你也可以在这找到一些答案Java线程问答Top 12

50道Java线程面试题

下面是Java线程相关的热门面试题,你可以用它来好好准备面试。

1) 什么是线程?

线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。程序员可以通过它进行多处理器编程,你可以使用多线程对运算密集型任务提速。比如,如果一个线程完成一个任务要100毫秒,那么用十个线程完成改任务只需10毫秒。Java在语言层面对多线程提供了卓越的支持,它也是一个很好的卖点。欲了解更多详细信息请点击这里

2) 线程和进程有什么区别?

线程是进程的子集,一个进程可以有很多线程,每条线程并行执行不同的任务。不同的进程使用不同的内存空间,而所有的线程共享一片相同的内存空间。别把它和栈内存搞混,每个线程都拥有单独的栈内存用来存储本地数据。更多详细信息请点击这里

3) 如何在Java中实现线程?

在语言层面有两种方式。java.lang.Thread 类的实例就是一个线程但是它需要调用java.lang.Runnable接口来执行,由于线程类本身就是调用的Runnable接口所以你可以继承java.lang.Thread 类或者直接调用Runnable接口来重写run()方法实现线程。更多详细信息请点击这里.

4) 用Runnable还是Thread?

这个问题是上题的后续,大家都知道我们可以通过继承Thread类或者调用Runnable接口来实现线程,问题是,那个方法更好呢?什么情况下使用它?这个问题很容易回答,如果你知道Java不支持类的多重继承,但允许你调用多个接口。所以如果你要继承其他类,当然是调用Runnable接口好了。更多详细信息请点击这里

6) Thread 类中的start() 和 run() 方法有什么区别?

这个问题经常被问到,但还是能从此区分出面试者对Java线程模型的理解程度。start()方法被用来启动新创建的线程,而且start()内部调用了run()方法,这和直接调用run()方法的效果不一样。当你调用run()方法的时候,只会是在原来的线程中调用,没有新的线程启动,start()方法才会启动新线程。更多讨论请点击这里

7) Java中Runnable和Callable有什么不同?

Runnable和Callable都代表那些要在不同的线程中执行的任务。Runnable从JDK1.0开始就有了,Callable是在JDK1.5增加的。它们的主要区别是Callable的 call() 方法可以返回值和抛出异常,而Runnable的run()方法没有这些功能。Callable可以返回装载有计算结果的Future对象。我的博客有更详细的说明。

8) Java中CyclicBarrier 和 CountDownLatch有什么不同?

CyclicBarrier 和 CountDownLatch 都可以用来让一组线程等待其它线程。与 CyclicBarrier 不同的是,CountdownLatch 不能重新使用。点此查看更多信息和示例代码

9) Java内存模型是什么?

Java内存模型规定和指引Java程序在不同的内存架构、CPU和操作系统间有确定性地行为。它在多线程的情况下尤其重要。Java内存模型对一个线程所做的变动能被其它线程可见提供了保证,它们之间是先行发生关系。这个关系定义了一些规则让程序员在并发编程时思路更清晰。比如,先行发生关系确保了:

  • 线程内的代码能够按先后顺序执行,这被称为程序次序规则。
  • 对于同一个锁,一个解锁操作一定要发生在时间上后发生的另一个锁定操作之前,也叫做管程锁定规则。
  • 前一个对volatile的写操作在后一个volatile的读操作之前,也叫volatile变量规则。
  • 一个线程内的任何操作必需在这个线程的start()调用之后,也叫作线程启动规则。
  • 一个线程的所有操作都会在线程终止之前,线程终止规则。
  • 一个对象的终结操作必需在这个对象构造完成之后,也叫对象终结规则。
  • 可传递性

我强烈建议大家阅读《Java并发编程实践》第十六章来加深对Java内存模型的理解。

10) Java中的volatile 变量是什么?

volatile是一个特殊的修饰符,只有成员变量才能使用它。在Java并发程序缺少同步类的情况下,多线程对成员变量的操作对其它线程是透明的。volatile变量可以保证下一个读取操作会在前一个写操作之后发生,就是上一题的volatile变量规则。点击这里查看更多volatile的相关内容。

11) 什么是线程安全?Vector是一个线程安全类吗? (详见这里)

如果你的代码所在的进程中有多个线程在同时运行,而这些线程可能会同时运行这段代码。如果每次运行结果和单线程运行的结果是一样的,而且其他的变量的值也和预期的是一样的,就是线程安全的。一个线程安全的计数器类的同一个实例对象在被多个线程使用的情况下也不会出现计算失误。很显然你可以将集合类分成两组,线程安全和非线程安全的。Vector 是用同步方法来实现线程安全的, 而和它相似的ArrayList不是线程安全的。

12) Java中什么是竞态条件? 举个例子说明。

竞态条件会导致程序在并发情况下出现一些bugs。多线程对一些资源的竞争的时候就会产生竞态条件,如果首先要执行的程序竞争失败排到后面执行了,那么整个程序就会出现一些不确定的bugs。这种bugs很难发现而且会重复出现,因为线程间的随机竞争。一个例子就是无序处理,详见答案

13) Java中如何停止一个线程?

Java提供了很丰富的API但没有为停止线程提供API。JDK 1.0本来有一些像stop(), suspend() 和 resume()的控制方法但是由于潜在的死锁威胁因此在后续的JDK版本中他们被弃用了,之后Java API的设计者就没有提供一个兼容且线程安全的方法来停止一个线程。当run() 或者 call() 方法执行完的时候线程会自动结束,如果要手动结束一个线程,你可以用volatile 布尔变量来退出run()方法的循环或者是取消任务来中断线程。点击这里查看示例代码。

14) 一个线程运行时发生异常会怎样?

这是我在一次面试中遇到的一个很刁钻的Java面试题, 简单的说,如果异常没有被捕获该线程将会停止执行。Thread.UncaughtExceptionHandler是用于处理未捕获异常造成线程突然中断情况的一个内嵌接口。当一个未捕获异常将造成线程中断的时候JVM会使用Thread.getUncaughtExceptionHandler()来查询线程的UncaughtExceptionHandler并将线程和异常作为参数传递给handler的uncaughtException()方法进行处理。

15) 如何在两个线程间共享数据?

你可以通过共享对象来实现这个目的,或者是使用像阻塞队列这样并发的数据结构。这篇教程《Java线程间通信》(涉及到在两个线程间共享对象)用wait和notify方法实现了生产者消费者模型。

16) Java中notify 和 notifyAll有什么区别?

这又是一个刁钻的问题,因为多线程可以等待单监控锁,Java API 的设计人员提供了一些方法当等待条件改变的时候通知它们,但是这些方法没有完全实现。notify()方法不能唤醒某个具体的线程,所以只有一个线程在等待的时候它才有用武之地。而notifyAll()唤醒所有线程并允许他们争夺锁确保了至少有一个线程能继续运行。我的博客有更详细的资料和示例代码。

17) 为什么wait, notify 和 notifyAll这些方法不在thread类里面?

这是个设计相关的问题,它考察的是面试者对现有系统和一些普遍存在但看起来不合理的事物的看法。回答这些问题的时候,你要说明为什么把这些方法放在Object类里是有意义的,还有不把它放在Thread类里的原因。一个很明显的原因是JAVA提供的锁是对象级的而不是线程级的,每个对象都有锁,通过线程获得。如果线程需要等待某些锁那么调用对象中的wait()方法就有意义了。如果wait()方法定义在Thread类中,线程正在等待的是哪个锁就不明显了。简单的说,由于wait,notify和notifyAll都是锁级别的操作,所以把他们定义在Object类中因为锁属于对象。你也可以查看这篇文章了解更多。

18) 什么是ThreadLocal变量?

ThreadLocal是Java里一种特殊的变量。每个线程都有一个ThreadLocal就是每个线程都拥有了自己独立的一个变量,竞争条件被彻底消除了。它是为创建代价高昂的对象获取线程安全的好方法,比如你可以用ThreadLocal让SimpleDateFormat变成线程安全的,因为那个类创建代价高昂且每次调用都需要创建不同的实例所以不值得在局部范围使用它,如果为每个线程提供一个自己独有的变量拷贝,将大大提高效率。首先,通过复用减少了代价高昂的对象的创建个数。其次,你在没有使用高代价的同步或者不变性的情况下获得了线程安全。线程局部变量的另一个不错的例子是ThreadLocalRandom类,它在多线程环境中减少了创建代价高昂的Random对象的个数。查看答案了解更多。

19) 什么是FutureTask?

在Java并发程序中FutureTask表示一个可以取消的异步运算。它有启动和取消运算、查询运算是否完成和取回运算结果等方法。只有当运算完成的时候结果才能取回,如果运算尚未完成get方法将会阻塞。一个FutureTask对象可以对调用了Callable和Runnable的对象进行包装,由于FutureTask也是调用了Runnable接口所以它可以提交给Executor来执行。

20) Java中interrupted 和 isInterrupted方法的区别?

interrupted()isInterrupted()的主要区别是前者会将中断状态清除而后者不会。Java多线程的中断机制是用内部标识来实现的,调用Thread.interrupt()来中断一个线程就会设置中断标识为true。当中断线程调用静态方法Thread.interrupted()来检查中断状态时,中断状态会被清零。而非静态方法isInterrupted()用来查询其它线程的中断状态且不会改变中断状态标识。简单的说就是任何抛出InterruptedException异常的方法都会将中断状态清零。无论如何,一个线程的中断状态有有可能被其它线程调用中断来改变。

21) 为什么wait和notify方法要在同步块中调用?

主要是因为Java API强制要求这样做,如果你不这么做,你的代码会抛出IllegalMonitorStateException异常。还有一个原因是为了避免wait和notify之间产生竞态条件。

22) 为什么你应该在循环中检查等待条件?

处于等待状态的线程可能会收到错误警报和伪唤醒,如果不在循环中检查等待条件,程序就会在没有满足结束条件的情况下退出。因此,当一个等待线程醒来时,不能认为它原来的等待状态仍然是有效的,在notify()方法调用之后和等待线程醒来之前这段时间它可能会改变。这就是在循环中使用wait()方法效果更好的原因,你可以在Eclipse中创建模板调用wait和notify试一试。如果你想了解更多关于这个问题的内容,我推荐你阅读《Effective Java》这本书中的线程和同步章节。

23) Java中的同步集合与并发集合有什么区别?

同步集合与并发集合都为多线程和并发提供了合适的线程安全的集合,不过并发集合的可扩展性更高。在Java1.5之前程序员们只有同步集合来用且在多线程并发的时候会导致争用,阻碍了系统的扩展性。Java5介绍了并发集合像ConcurrentHashMap,不仅提供线程安全还用锁分离和内部分区等现代技术提高了可扩展性。更多内容详见答案

24) Java中堆和栈有什么不同?

为什么把这个问题归类在多线程和并发面试题里?因为栈是一块和线程紧密相关的内存区域。每个线程都有自己的栈内存,用于存储本地变量,方法参数和栈调用,一个线程中存储的变量对其它线程是不可见的。而堆是所有线程共享的一片公用内存区域。对象都在堆里创建,为了提升效率线程会从堆中弄一个缓存到自己的栈,如果多个线程使用该变量就可能引发问题,这时volatile 变量就可以发挥作用了,它要求线程从主存中读取变量的值。
更多内容详见答案

25) 什么是线程池? 为什么要使用它?

创建线程要花费昂贵的资源和时间,如果任务来了才创建线程那么响应时间会变长,而且一个进程能创建的线程数有限。为了避免这些问题,在程序启动的时候就创建若干线程来响应处理,它们被称为线程池,里面的线程叫工作线程。从JDK1.5开始,Java API提供了Executor框架让你可以创建不同的线程池。比如单线程池,每次处理一个任务;数目固定的线程池或者是缓存线程池(一个适合很多生存期短的任务的程序的可扩展线程池)。更多内容详见这篇文章

26) 如何写代码来解决生产者消费者问题?

在现实中你解决的许多线程问题都属于生产者消费者模型,就是一个线程生产任务供其它线程进行消费,你必须知道怎么进行线程间通信来解决这个问题。比较低级的办法是用wait和notify来解决这个问题,比较赞的办法是用Semaphore 或者 BlockingQueue来实现生产者消费者模型,这篇教程有实现它。

27) 如何避免死锁?


Java多线程中的死锁
死锁是指两个或两个以上的进程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。这是一个严重的问题,因为死锁会让你的程序挂起无法完成任务,死锁的发生必须满足以下四个条件:

  • 互斥条件:一个资源每次只能被一个进程使用。
  • 请求与保持条件:一个进程因请求资源而阻塞时,对已获得的资源保持不放。
  • 不剥夺条件:进程已获得的资源,在末使用完之前,不能强行剥夺。
  • 循环等待条件:若干进程之间形成一种头尾相接的循环等待资源关系。

避免死锁最简单的方法就是阻止循环等待条件,将系统中所有的资源设置标志位、排序,规定所有的进程申请资源必须以一定的顺序(升序或降序)做操作来避免死锁。这篇教程有代码示例和避免死锁的讨论细节。

28) Java中活锁和死锁有什么区别?

这是上题的扩展,活锁和死锁类似,不同之处在于处于活锁的线程或进程的状态是不断改变的,活锁可以认为是一种特殊的饥饿。一个现实的活锁例子是两个人在狭小的走廊碰到,两个人都试着避让对方好让彼此通过,但是因为避让的方向都一样导致最后谁都不能通过走廊。简单的说就是,活锁和死锁的主要区别是前者进程的状态可以改变但是却不能继续执行。

29) 怎么检测一个线程是否拥有锁?

我一直不知道我们竟然可以检测一个线程是否拥有锁,直到我参加了一次电话面试。在java.lang.Thread中有一个方法叫holdsLock(),它返回true如果当且仅当当前线程拥有某个具体对象的锁。你可以查看这篇文章了解更多。

30) 你如何在Java中获取线程堆栈?

对于不同的操作系统,有多种方法来获得Java进程的线程堆栈。当你获取线程堆栈时,JVM会把所有线程的状态存到日志文件或者输出到控制台。在Windows你可以使用Ctrl + Break组合键来获取线程堆栈,Linux下用kill -3命令。你也可以用jstack这个工具来获取,它对线程id进行操作,你可以用jps这个工具找到id。

31) JVM中哪个参数是用来控制线程的栈堆栈小的

这个问题很简单, -Xss参数用来控制线程的堆栈大小。你可以查看JVM配置列表来了解这个参数的更多信息。

32) Java中synchronized 和 ReentrantLock 有什么不同?

Java在过去很长一段时间只能通过synchronized关键字来实现互斥,它有一些缺点。比如你不能扩展锁之外的方法或者块边界,尝试获取锁时不能中途取消等。Java 5 通过Lock接口提供了更复杂的控制来解决这些问题。 ReentrantLock 类实现了 Lock,它拥有与 synchronized 相同的并发性和内存语义且它还具有可扩展性。你可以查看这篇文章了解更多

33) 有三个线程T1,T2,T3,怎么确保它们按顺序执行?

在多线程中有多种方法让线程按特定顺序执行,你可以用线程类的join()方法在一个线程中启动另一个线程,另外一个线程完成该线程继续执行。为了确保三个线程的顺序你应该先启动最后一个(T3调用T2,T2调用T1),这样T1就会先完成而T3最后完成。你可以查看这篇文章了解更多。

34) Thread类中的yield方法有什么作用?

Yield方法可以暂停当前正在执行的线程对象,让其它有相同优先级的线程执行。它是一个静态方法而且只保证当前线程放弃CPU占用而不能保证使其它线程一定能占用CPU,执行yield()的线程有可能在进入到暂停状态后马上又被执行。点击这里查看更多yield方法的相关内容。

35) Java中ConcurrentHashMap的并发度是什么?

ConcurrentHashMap把实际map划分成若干部分来实现它的可扩展性和线程安全。这种划分是使用并发度获得的,它是ConcurrentHashMap类构造函数的一个可选参数,默认值为16,这样在多线程情况下就能避免争用。欲了解更多并发度和内部大小调整请阅读我的文章How ConcurrentHashMap works in Java

36) Java中Semaphore是什么?

Java中的Semaphore是一种新的同步类,它是一个计数信号。从概念上讲,从概念上讲,信号量维护了一个许可集合。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release()添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore只对可用许可的号码进行计数,并采取相应的行动。信号量常常用于多线程的代码中,比如数据库连接池。更多详细信息请点击这里

37)如果你提交任务时,线程池队列已满。会时发会生什么?

这个问题问得很狡猾,许多程序员会认为该任务会阻塞直到线程池队列有空位。事实上如果一个任务不能被调度执行那么ThreadPoolExecutor’s submit()方法将会抛出一个RejectedExecutionException异常。

38) Java线程池中submit() 和 execute()方法有什么区别?

两个方法都可以向线程池提交任务,execute()方法的返回类型是void,它定义在Executor接口中, 而submit()方法可以返回持有计算结果的Future对象,它定义在ExecutorService接口中,它扩展了Executor接口,其它线程池类像ThreadPoolExecutor和ScheduledThreadPoolExecutor都有这些方法。更多详细信息请点击这里

39) 什么是阻塞式方法?

阻塞式方法是指程序会一直等待该方法完成期间不做其他事情,ServerSocket的accept()方法就是一直等待客户端连接。这里的阻塞是指调用结果返回之前,当前线程会被挂起,直到得到结果之后才会返回。此外,还有异步和非阻塞式方法在任务完成前就返回。更多详细信息请点击这里

40) Swing是线程安全的吗? 为什么?

你可以很肯定的给出回答,Swing不是线程安全的,但是你应该解释这么回答的原因即便面试官没有问你为什么。当我们说swing不是线程安全的常常提到它的组件,这些组件不能在多线程中进行修改,所有对GUI组件的更新都要在AWT线程中完成,而Swing提供了同步和异步两种回调方法来进行更新。点击这里查看更多swing和线程安全的相关内容。

41) Java中invokeAndWait 和 invokeLater有什么区别?

这两个方法是Swing API 提供给Java开发者用来从当前线程而不是事件派发线程更新GUI组件用的。InvokeAndWait()同步更新GUI组件,比如一个进度条,一旦进度更新了,进度条也要做出相应改变。如果进度被多个线程跟踪,那么就调用invokeAndWait()方法请求事件派发线程对组件进行相应更新。而invokeLater()方法是异步调用更新组件的。更多详细信息请点击这里

42) Swing API中那些方法是线程安全的?

这个问题又提到了swing和线程安全,虽然组件不是线程安全的但是有一些方法是可以被多线程安全调用的,比如repaint(), revalidate()。 JTextComponent的setText()方法和JTextArea的insert() 和 append() 方法也是线程安全的。

43) 如何在Java中创建Immutable对象?

这个问题看起来和多线程没什么关系, 但不变性有助于简化已经很复杂的并发程序。Immutable对象可以在没有同步的情况下共享,降低了对该对象进行并发访问时的同步化开销。可是Java没有@Immutable这个注解符,要创建不可变类,要实现下面几个步骤:通过构造方法初始化所有成员、对变量不要提供setter方法、将所有的成员声明为私有的,这样就不允许直接访问这些成员、在getter方法中,不要直接返回对象本身,而是克隆对象,并返回对象的拷贝。我的文章how to make an object Immutable in Java有详细的教程,看完你可以充满自信。

44) Java中的ReadWriteLock是什么?

一般而言,读写锁是用来提升并发程序性能的锁分离技术的成果。Java中的ReadWriteLock是Java 5 中新增的一个接口,一个ReadWriteLock维护一对关联的锁,一个用于只读操作一个用于写。在没有写线程的情况下一个读锁可能会同时被多个读线程持有。写锁是独占的,你可以使用JDK中的ReentrantReadWriteLock来实现这个规则,它最多支持65535个写锁和65535个读锁。

45) 多线程中的忙循环是什么?

忙循环就是程序员用循环让一个线程等待,不像传统方法wait(), sleep() 或 yield() 它们都放弃了CPU控制,而忙循环不会放弃CPU,它就是在运行一个空循环。这么做的目的是为了保留CPU缓存,在多核系统中,一个等待线程醒来的时候可能会在另一个内核运行,这样会重建缓存。为了避免重建缓存和减少等待重建的时间就可以使用它了。你可以查看这篇文章获得更多信息。

46)volatile 变量和 atomic 变量有什么不同?

这是个有趣的问题。首先,volatile 变量和 atomic 变量看起来很像,但功能却不一样。Volatile变量可以确保先行关系,即写操作会发生在后续的读操作之前, 但它并不能保证原子性。例如用volatile修饰count变量那么 count++ 操作就不是原子性的。而AtomicInteger类提供的atomic方法可以让这种操作具有原子性如getAndIncrement()方法会原子性的进行增量操作把当前值加一,其它数据类型和引用变量也可以进行相似操作。

47) 如果同步块内的线程抛出异常会发生什么?

这个问题坑了很多Java程序员,若你能想到锁是否释放这条线索来回答还有点希望答对。无论你的同步块是正常还是异常退出的,里面的线程都会释放锁,所以对比锁接口我更喜欢同步块,因为它不用我花费精力去释放锁,该功能可以在finally block里释放锁实现。

48) 单例模式的双检锁是什么?

这个问题在Java面试中经常被问到,但是面试官对回答此问题的满意度仅为50%。一半的人写不出双检锁还有一半的人说不出它的隐患和Java1.5是如何对它修正的。它其实是一个用来创建线程安全的单例的老方法,当单例实例第一次被创建时它试图用单个锁进行性能优化,但是由于太过于复杂在JDK1.4中它是失败的,我个人也不喜欢它。无论如何,即便你也不喜欢它但是还是要了解一下,因为它经常被问到。你可以查看how double checked locking on Singleton works这篇文章获得更多信息。

49) 如何在Java中创建线程安全的Singleton?

这是上面那个问题的后续,如果你不喜欢双检锁而面试官问了创建Singleton类的替代方法,你可以利用JVM的类加载和静态变量初始化特征来创建Singleton实例,或者是利用枚举类型来创建Singleton,我很喜欢用这种方法。你可以查看这篇文章获得更多信息。

50) 写出3条你遵循的多线程最佳实践

这种问题我最喜欢了,我相信你在写并发代码来提升性能的时候也会遵循某些最佳实践。以下三条最佳实践我觉得大多数Java程序员都应该遵循:

  • 给你的线程起个有意义的名字。
    这样可以方便找bug或追踪。OrderProcessor, QuoteProcessor or TradeProcessor 这种名字比 Thread-1. Thread-2 and Thread-3 好多了,给线程起一个和它要完成的任务相关的名字,所有的主要框架甚至JDK都遵循这个最佳实践。
  • 避免锁定和缩小同步的范围
    锁花费的代价高昂且上下文切换更耗费时间空间,试试最低限度的使用同步和锁,缩小临界区。因此相对于同步方法我更喜欢同步块,它给我拥有对锁的绝对控制权。
  • 多用同步类少用wait 和 notify
    首先,CountDownLatch, Semaphore, CyclicBarrier 和 Exchanger 这些同步类简化了编码操作,而用wait和notify很难实现对复杂控制流的控制。其次,这些类是由最好的企业编写和维护在后续的JDK中它们还会不断优化和完善,使用这些更高等级的同步工具你的程序可以不费吹灰之力获得优化。
  • 多用并发集合少用同步集合
    这是另外一个容易遵循且受益巨大的最佳实践,并发集合比同步集合的可扩展性更好,所以在并发编程时使用并发集合效果更好。如果下一次你需要用到map,你应该首先想到用ConcurrentHashMap。我的文章Java并发集合有更详细的说明。

51) 如何强制启动一个线程?

这个问题就像是如何强制进行Java垃圾回收,目前还没有觉得方法,虽然你可以使用System.gc()来进行垃圾回收,但是不保证能成功。在Java里面没有办法强制启动一个线程,它是被线程调度器控制着且Java没有公布相关的API。

52) Java中的fork join框架是什么?

fork join框架是JDK7中出现的一款高效的工具,Java开发人员可以通过它充分利用现代服务器上的多处理器。它是专门为了那些可以递归划分成许多子模块设计的,目的是将所有可用的处理能力用来提升程序的性能。fork join框架一个巨大的优势是它使用了工作窃取算法,可以完成更多任务的工作线程可以从其它线程中窃取任务来执行。你可以查看这篇文章获得更多信息。

53) Java多线程中调用wait() 和 sleep()方法有什么不同?

Java程序中wait 和 sleep都会造成某种形式的暂停,它们可以满足不同的需要。wait()方法用于线程间通信,如果等待条件为真且其它线程被唤醒时它会释放锁,而sleep()方法仅仅释放CPU资源或者让当前线程停止执行一段时间,但不会释放锁。你可以查看这篇文章获得更多信息。

以上就是50道热门Java多线程和并发面试题啦。我没有分享所有题的答案但给未来的阅读者提供了足够的提示和线索来寻找答案。如果你真的找不到某题的答案,联系我吧,我会加上去的。这篇文章不仅可以用来准备面试,还能检查你对多线程、并发、设计模式和竞态条件、死锁和线程安全等线程问题的理解。我打算把这篇文章的问题弄成所有Java多线程问题的大合集,但是没有你的帮助恐怖是不能完成的,你也可以跟我分享其它任何问题,包括那些你被问到却还没有找到答案的问题。这篇文章对初学者或者是经验丰富的Java开发人员都很有用,过两三年甚至五六年你再读它也会受益匪浅。它可以扩展初学者尤其有用因为这个可以扩展他们的知识面,我会不断更新这些题,大家可以在文章后面的评论中提问,分享和回答问题一起把这篇面试题完善。

原文链接: javarevisited 翻译: ImportNew.com 李 广
译文链接: http://www.importnew.com/12773.html
[ 转载请保留原文出处、译者和译文链接。]

(转)分布式锁1 Java常用技术方案

前言:

      由于在平时的工作中,线上服务器是分布式多台部署的,经常会面临解决分布式场景下数据一致性的问题,那么就要利用分布式锁来解决这些问题。所以自己结合实际工作中的一些经验和网上看到的一些资料,做一个讲解和总结。希望这篇文章可以方便自己以后查阅,同时要是能帮助到他人那也是很好的。

===============================================================长长的分割线====================================================================

正文:

第一步,自身的业务场景:

在我日常做的项目中,目前涉及了以下这些业务场景:

场景一: 比如分配任务场景。在这个场景中,由于是公司的业务后台系统,主要是用于审核人员的审核工作,并发量并不是很高,而且任务的分配规则设计成了通过审核人员每次主动的请求拉取,然后服务端从任务池中随机的选取任务进行分配。这个场景看到这里你会觉得比较单一,但是实际的分配过程中,由于涉及到了按用户聚类的问题,所以要比我描述的复杂,但是这里为了说明问题,大家可以把问题简单化理解。那么在使用过程中,主要是为了避免同一个任务同时被两个审核人员获取到的问题。我最终使用了基于数据库资源表的分布式锁来解决的问题。

场景二: 比如支付场景。在这个场景中,我提供给用户三个用于保护用户隐私的手机号码(这些号码是从运营商处获取的,和真实手机号码看起来是一样的),让用户选择其中一个进行购买,用户购买付款后,我需要将用户选择的号码分配给用户使用,同时也要将没有选择的释放掉。在这个过程中,给用户筛选的号码要在一定时间内(用户筛选正常时间范围内)让当前用户对这个产品具有独占性,以便保证付款后是100%可以拿到;同时由于产品资源池的资源有限,还要保持资源的流动性,即不能让资源长时间被某个用户占用着。对于服务的设计目标,一期项目上线的时候至少能够支持峰值qps为300的请求,同时在设计的过程中要考虑到用户体验的问题。我最终使用了memecahed的add()方法和基于数据库资源表的分布式锁来解决的问题。

场景三: 我有一个数据服务,每天调用量在3亿,每天按86400秒计算的qps在4000左右,由于服务的白天调用量要明显高于晚上,所以白天下午的峰值qps达到6000的,一共有4台服务器,单台qps要能达到3000以上。我最终使用了redis的setnx()和expire()的分布式锁解决的问题。

       场景四:场景一和场景二的升级版。在这个场景中,不涉及支付。但是由于资源分配一次过程中,需要保持涉及一致性的地方增加,而且一期的设计目标要达到峰值qps500,所以需要我们对场景进一步的优化。我最终使用了redis的setnx()、expire()和基于数据库表的分布式锁来解决的问题。

看到这里,不管你觉得我提出的业务场景qps是否足够大,都希望你能继续看下去,因为无论你身处一个什么样的公司,最开始的工作可能都需要从最简单的做起。不要提阿里和腾讯的业务场景qps如何大,因为在这样的大场景中你未必能亲自参与项目,亲自参与项目未必能是核心的设计者,是核心的设计者未必能独自设计。如果能真能满足以上三条,关闭页面可以不看啦,如果不是的话,建议还是看完,我有说的不足的地方欢迎提出建议,我说的好的地方,也希望给我点个赞或者评论一下,算是对我最大的鼓励哈。

  第二步,分布式锁的解决方式:

1. 首先明确一点,有人可能会问是否可以考虑采用ReentrantLock来实现,但是实际上去实现的时候是有问题的,ReentrantLock的lock和unlock要求必须是在同一线程进行,而分布式应用中,lock和unlock是两次不相关的请求,因此肯定不是同一线程,因此导致无法使用ReentrantLock。

2. 基于数据库表做乐观锁,用于分布式锁。

3. 使用memcached的add()方法,用于分布式锁。

4. 使用memcached的cas()方法,用于分布式锁。(不常用)

5. 使用redis的setnx()、expire()方法,用于分布式锁。

6. 使用redis的setnx()、get()、getset()方法,用于分布式锁。

7. 使用redis的watch、multi、exec命令,用于分布式锁。(不常用)

8. 使用zookeeper,用于分布式锁。(不常用)

      第三步,基于数据库资源表做乐观锁,用于分布式锁:

1. 首先说明乐观锁的含义:

大多数是基于数据版本(version)的记录机制实现的。何谓数据版本号?即为数据增加一个版本标识,在基于数据库表的版本解决方案中,一般是通过为数据库表添加一个 “version”字段来实现读取出数据时,将此版本号一同读出,之后更新时,对此版本号加1。

在更新过程中,会对版本号进行比较,如果是一致的,没有发生改变,则会成功执行本次操作;如果版本号不一致,则会更新失败。

2. 对乐观锁的含义有了一定的了解后,结合具体的例子,我们来推演下我们应该怎么处理:

(1). 假设我们有一张资源表,如下图所示: t_resource , 其中有6个字段id, resoource,  state, add_time, update_time, version,分别表示表主键、资源、分配状态(1未分配  2已分配)、资源创建时间、资源更新时间、资源数据版本号。

(4). 假设我们现在我们对id=5780这条数据进行分配,那么非分布式场景的情况下,我们一般先查询出来state=1(未分配)的数据,然后从其中选取一条数据可以通过以下语句进行,如果可以更新成功,那么就说明已经占用了这个资源

update t_resource set state=2 where state=1 and id=5780。

(5). 如果在分布式场景中,由于数据库的update操作是原子是原子的,其实上边这条语句理论上也没有问题,但是这条语句如果在典型的“ABA”情况下,我们是无法感知的。有人可能会问什么是“ABA”问题呢?大家可以网上搜索一下,这里我说简单一点就是,如果在你第一次select和第二次update过程中,由于两次操作是非原子的,所以这过程中,如果有一个线程,先是占用了资源(state=2),然后又释放了资源(state=1),实际上最后你执行update操作的时候,是无法知道这个资源发生过变化的。也许你会说这个在你说的场景中应该也还好吧,但是在实际的使用过程中,比如银行账户存款或者扣款的过程中,这种情况是比较恐怖的。

(6). 那么如果使用乐观锁我们如何解决上边的问题呢?

a. 先执行select操作查询当前数据的数据版本号,比如当前数据版本号是26:

select id, resource, state,version from t_resource  where state=1 and id=5780;

b. 执行更新操作:

update t_resoure set state=2, version=27, update_time=now() where resource=xxxxxx and state=1 and version=26

c. 如果上述update语句真正更新影响到了一行数据,那就说明占位成功。如果没有更新影响到一行数据,则说明这个资源已经被别人占位了。

3. 通过2中的讲解,相信大家已经对如何基于数据库表做乐观锁有有了一定的了解了,但是这里还是需要说明一下基于数据库表做乐观锁的一些缺点:

(1). 这种操作方式,使原本一次的update操作,必须变为2次操作: select版本号一次;update一次。增加了数据库操作的次数。

(2). 如果业务场景中的一次业务流程中,多个资源都需要用保证数据一致性,那么如果全部使用基于数据库资源表的乐观锁,就要让每个资源都有一张资源表,这个在实际使用场景中肯定是无法满足的。而且这些都基于数据库操作,在高并发的要求下,对数据库连接的开销一定是无法忍受的。

(3). 乐观锁机制往往基于系统中的数据存储逻辑,因此可能会造成脏数据被更新到数据库中。在系统设计阶段,我们应该充分考虑到这些情况出现的可能性,并进行相应调整,如将乐观锁策略在数据库存储过程中实现,对外只开放基于此存储过程的数据更新途径,而不是将数据库表直接对外公开。

4. 讲了乐观锁的实现方式和缺点,是不是会觉得不敢使用乐观锁了呢???当然不是,在文章开头我自己的业务场景中,场景1和场景2的一部分都使用了基于数据库资源表的乐观锁,已经很好的解决了线上问题。所以大家要根据的具体业务场景选择技术方案,并不是随便找一个足够复杂、足够新潮的技术方案来解决业务问题就是好方案?!比如,如果在我的场景一中,我使用zookeeper做锁,可以这么做,但是真的有必要吗???答案觉得是没有必要的!!!

      第四步,使用memcached的add()方法,用于分布式锁:

      对于使用memcached的add()方法做分布式锁,这个在互联网公司是一种比较常见的方式,而且基本上可以解决自己手头上的大部分应用场景。在使用这个方法之前,只要能搞明白memcached的add()和set()的区别,并且知道为什么能用add()方法做分布式锁就好。如果还不知道add()和set()方法,请直接百度吧,这个需要自己了解一下。

我在这里想说明的是另外一个问题,人们在关注分布式锁设计的好坏时,还会重点关注这样一个问题,那就是是否可以避免死锁问题???!!!

      如果使用memcached的add()命令对资源占位成功了,那么是不是就完事儿了呢?当然不是!我们需要在add()的使用指定当前添加的这个key的有效时间,如果不指定有效时间,正常情况下,你可以在执行完自己的业务后,使用delete方法将这个key删除掉,也就是释放了占用的资源。但是,如果在占位成功后,memecached或者自己的业务服务器发生宕机了,那么这个资源将无法得到释放。所以通过对key设置超时时间,即便发生了宕机的情况,也不会将资源一直占用,可以避免死锁的问题。

第五步,使用memcached的cas()方法,用于分布式锁:

下篇文章我们再细说!

第六步,使用redis的setnx()、expire()方法,用于分布式锁:

      对于使用redis的setnx()、expire()来实现分布式锁,这个方案相对于memcached()的add()方案,redis占优势的是,其支持的数据类型更多,而memcached只支持String一种数据类型。除此之外,无论是从性能上来说,还是操作方便性来说,其实都没有太多的差异,完全看你的选择,比如公司中用哪个比较多,你就可以用哪个。

首先说明一下setnx()命令,setnx的含义就是SET if Not Exists,其主要有两个参数 setnx(key, value)。该方法是原子的,如果key不存在,则设置当前key成功,返回1;如果当前key已经存在,则设置当前key失败,返回0。但是要注意的是setnx命令不能设置key的超时时间,只能通过expire()来对key设置。

具体的使用步骤如下:

1. setnx(lockkey, 1)  如果返回0,则说明占位失败;如果返回1,则说明占位成功

2. expire()命令对lockkey设置超时时间,为的是避免死锁问题。

3. 执行完业务代码后,可以通过delete命令删除key。

这个方案其实是可以解决日常工作中的需求的,但从技术方案的探讨上来说,可能还有一些可以完善的地方。比如,如果在第一步setnx执行成功后,在expire()命令执行成功前,发生了宕机的现象,那么就依然会出现死锁的问题,所以如果要对其进行完善的话,可以使用redis的setnx()、get()和getset()方法来实现分布式锁。   

第七步,使用redis的setnx()、get()、getset()方法,用于分布式锁:

      这个方案的背景主要是在setnx()和expire()的方案上针对可能存在的死锁问题,做了一版优化。

那么先说明一下这三个命令,对于setnx()和get()这两个命令,相信不用再多说什么。那么getset()命令?这个命令主要有两个参数 getset(key,newValue)。该方法是原子的,对key设置newValue这个值,并且返回key原来的旧值。假设key原来是不存在的,那么多次执行这个命令,会出现下边的效果:

1. getset(key, “value1″)  返回nil   此时key的值会被设置为value1

2. getset(key, “value2″)  返回value1   此时key的值会被设置为value2

3. 依次类推!

介绍完要使用的命令后,具体的使用步骤如下:

1. setnx(lockkey, 当前时间+过期超时时间) ,如果返回1,则获取锁成功;如果返回0则没有获取到锁,转向2。

2. get(lockkey)获取值oldExpireTime ,并将这个value值与当前的系统时间进行比较,如果小于当前系统时间,则认为这个锁已经超时,可以允许别的请求重新获取,转向3。

3. 计算newExpireTime=当前时间+过期超时时间,然后getset(lockkey, newExpireTime) 会返回当前lockkey的值currentExpireTime。

4. 判断currentExpireTime与oldExpireTime 是否相等,如果相等,说明当前getset设置成功,获取到了锁。如果不相等,说明这个锁又被别的请求获取走了,那么当前请求可以直接返回失败,或者继续重试。

5. 在获取到锁之后,当前线程可以开始自己的业务处理,当处理完毕后,比较自己的处理时间和对于锁设置的超时时间,如果小于锁设置的超时时间,则直接执行delete释放锁;如果大于锁设置的超时时间,则不需要再锁进行处理。

第八步,使用redis的watch、multi、exec命令,用于分布式锁:

下篇文章我们再细说!

第九步,使用zookeeper,用于分布式锁:

下篇文章我们再细说!

第十步,总结:

综上,关于分布式锁的第一篇文章我就写到这儿了,在文章中主要说明了日常项目中会比较常用到四种方案,大家掌握了这四种方案,其实在日常的工作中就可以解决很多业务场景下的分布式锁的问题。从文章开头我自己的实际使用中,也可以看到,这么说完全是有一定的依据。对于另外那三种方案,我会在下一篇关于分布式锁的文章中,和大家再探讨一下。

 常用的四种方案:

1. 基于数据库表做乐观锁,用于分布式锁。

2. 使用memcached的add()方法,用于分布式锁。

3. 使用redis的setnx()、expire()方法,用于分布式锁。

4. 使用redis的setnx()、get()、getset()方法,用于分布式锁。

不常用但是可以用于技术方案探讨的:

1. 使用memcached的cas()方法,用于分布式锁。

2. 使用redis的watch、multi、exec命令,用于分布式锁。

3. 使用zookeeper,用于分布式锁。

SimpleDateFormat的线程安全问题与解决方案

1. 原因

SimpleDateFormat(下面简称sdf)类内部有一个Calendar对象引用,它用来储存和这个sdf相关的日期信息,例如sdf.parse(dateStr), sdf.format(date) 诸如此类的方法参数传入的日期相关String, Date等等, 都是交友Calendar引用来储存的.这样就会导致一个问题,如果你的sdf是个static的, 那么多个thread 之间就会共享这个sdf, 同时也是共享这个Calendar引用, 并且, 观察 sdf.parse() 方法,你会发现有如下的调用:

复制代码
Date parse() {

  calendar.clear(); // 清理calendar

  ... // 执行一些操作, 设置 calendar 的日期什么的

  calendar.getTime(); // 获取calendar的时间

}
复制代码

这里会导致的问题就是, 如果 线程A 调用了 sdf.parse(), 并且进行了 calendar.clear()后还未执行calendar.getTime()的时候,线程B又调用了sdf.parse(), 这时候线程B也执行了sdf.clear()方法, 这样就导致线程A的的calendar数据被清空了(实际上A,B的同时被清空了). 又或者当 A 执行了calendar.clear() 后被挂起, 这时候B 开始调用sdf.parse()并顺利i结束, 这样 A 的 calendar内存储的的date 变成了后来B设置的calendar的date

 

2. 问题重现

复制代码
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author zhenwei.liu created on 2013 13-8-29 下午5:35
 * @version $Id$
 */
public class DateFormatTest extends Thread {
    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");

    private String name;
    private String dateStr;
    private boolean sleep;

    public DateFormatTest(String name, String dateStr, boolean sleep) {
        this.name = name;
        this.dateStr = dateStr;
        this.sleep = sleep;
    }

    @Override
    public void run() {

        Date date = null;

        if (sleep) {
            try {
                TimeUnit.MILLISECONDS.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        try {
            date = sdf.parse(dateStr);
        } catch (ParseException e) {
            e.printStackTrace();
        }

        System.out.println(name + " : date: " + date);
    }

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executor = Executors.newCachedThreadPool();

        // A 会sleep 2s 后开始执行sdf.parse()
        executor.execute(new DateFormatTest("A", "1991-09-13", true));
        // B 打了断点,会卡在方法中间
        executor.execute(new DateFormatTest("B", "2013-09-13", false));

        executor.shutdown();
    }
}
复制代码

使用Debug模式执行这段代码,并在sdf.parse()方法里打上断点

复制代码
parse() {

    calendar.clear()

    // 这里打一个断点

    calendar.getTime()

}
复制代码

过程:

1) 首先A线程跑起来以后会进入sleep

2) B线程跑起来, 卡在断点处

3) A线程醒过来, 执行 calendar.clear(), 并将设置sdf.calendar的date为1991-09-13, 此时 A B 的 calendar 都为 1991-09-13

4) 让断点继续执行, 输出如下

A : date: Fri Sep 13 00:00:00 CDT 1991
B : date: Fri Sep 13 00:00:00 CDT 1991

这并不是我们期待的结果

 

 

3. 解决方案

最简单的解决方案我们可以把static去掉,这样每个新的线程都会有一个自己的sdf实例,从而避免线程安全的问题

然而,使用这种方法,在高并发的情况下会大量的new sdf以及销毁sdf,这样是非常耗费资源的

在并发情况下,网站的请求任务与线程执行情况大概可以理解为如下

例如Tomcat的线程池的最大Thread数为4, 现在需要执行的任务有1000个(理解为有1000个用户点了你的网站的某个功能),

而这1000个任务都会用到我们写的日期函数处理类

A) 假如说日期函数处理类使用的是new SimpleDateFormat的方法,那么这里就会有1000次sdf的创建和销毁

B) Java中提供了一种ThreadLocal的解决方案,它的工作方式是,每个线程只会有一个实例,也就是说我们执行完这1000个任务,总共只会实例化4个sdf.

而且,它并不会有多线程的并发问题,因为,单个线程执行任务肯定是顺序的,例如Thread #1负责执行Task #1-#250, 那么他是顺序而执行Task #1-#250

而Thread #2拥有自己的sdf实例,他也是顺序执行任务 Task #251-#500, 以此类推

下面是一个使用ThreadLocal解决sdf多线程问题的例子

复制代码
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * @author zhenwei.liu created on 2013 13-8-29 下午5:35
 * @version $Id$
 */
public class DateUtil {

    /** 锁对象 */
    private static final Object lockObj = new Object();

    /** 存放不同的日期模板格式的sdf的Map */
    private static Map<String, ThreadLocal<SimpleDateFormat>> sdfMap = new HashMap<String, ThreadLocal<SimpleDateFormat>>();

    /**
     * 返回一个ThreadLocal的sdf,每个线程只会new一次sdf
     * 
     * @param pattern
     * @return
     */
    private static SimpleDateFormat getSdf(final String pattern) {
        ThreadLocal<SimpleDateFormat> tl = sdfMap.get(pattern);

        // 此处的双重判断和同步是为了防止sdfMap这个单例被多次put重复的sdf
        if (tl == null) {
            synchronized (lockObj) {
                tl = sdfMap.get(pattern);
                if (tl == null) {
                    // 只有Map中还没有这个pattern的sdf才会生成新的sdf并放入map
                    System.out.println("put new sdf of pattern " + pattern + " to map");

                    // 这里是关键,使用ThreadLocal<SimpleDateFormat>替代原来直接new SimpleDateFormat
                    tl = new ThreadLocal<SimpleDateFormat>() {

                        @Override
                        protected SimpleDateFormat initialValue() {
                            System.out.println("thread: " + Thread.currentThread() + " init pattern: " + pattern);
                            return new SimpleDateFormat(pattern);
                        }
                    };
                    sdfMap.put(pattern, tl);
                }
            }
        }

        return tl.get();
    }

    /**
     * 是用ThreadLocal<SimpleDateFormat>来获取SimpleDateFormat,这样每个线程只会有一个SimpleDateFormat
     * 
     * @param date
     * @param pattern
     * @return
     */
    public static String format(Date date, String pattern) {
        return getSdf(pattern).format(date);
    }

    public static Date parse(String dateStr, String pattern) throws ParseException {
        return getSdf(pattern).parse(dateStr);
    }

}
复制代码

测试类

复制代码
import java.text.ParseException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author zhenyu.nie created on 2013 13-8-26 下午2:13
 * @version 1.0.0
 */
public class Test {

    public static void main(String[] args) {
        final String patten1 = "yyyy-MM-dd";
        final String patten2 = "yyyy-MM";

        Thread t1 = new Thread() {

            @Override
            public void run() {
                try {
                    DateUtil.parse("1992-09-13", patten1);
                } catch (ParseException e) {
                    e.printStackTrace();
                }
            }
        };

        Thread t2 = new Thread() {

            @Override
            public void run() {
                try {
                    DateUtil.parse("2000-09", patten2);
                } catch (ParseException e) {
                    e.printStackTrace();
                }
            }
        };

        Thread t3 = new Thread() {

            @Override
            public void run() {
                try {
                    DateUtil.parse("1992-09-13", patten1);
                } catch (ParseException e) {
                    e.printStackTrace();
                }
            }
        };

        Thread t4 = new Thread() {

            @Override
            public void run() {
                try {
                    DateUtil.parse("2000-09", patten2);
                } catch (ParseException e) {
                    e.printStackTrace();
                }
            }
        };

        Thread t5 = new Thread() {

            @Override
            public void run() {
                try {
                    DateUtil.parse("2000-09-13", patten1);
                } catch (ParseException e) {
                    e.printStackTrace();
                }
            }
        };

        Thread t6 = new Thread() {

            @Override
            public void run() {
                try {
                    DateUtil.parse("2000-09", patten2);
                } catch (ParseException e) {
                    e.printStackTrace();
                }
            }
        };

        System.out.println("单线程执行: ");
        ExecutorService exec = Executors.newFixedThreadPool(1);
        exec.execute(t1);
        exec.execute(t2);
        exec.execute(t3);
        exec.execute(t4);
        exec.execute(t5);
        exec.execute(t6);
        exec.shutdown();

        sleep(1000);

        System.out.println("双线程执行: ");
        ExecutorService exec2 = Executors.newFixedThreadPool(2);
        exec2.execute(t1);
        exec2.execute(t2);
        exec2.execute(t3);
        exec2.execute(t4);
        exec2.execute(t5);
        exec2.execute(t6);
        exec2.shutdown();
    }

    private static void sleep(long millSec) {
        try {
            TimeUnit.MILLISECONDS.sleep(millSec);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
复制代码

输出

单线程执行:
put new sdf of pattern yyyy-MM-dd to map
thread: Thread[pool-1-thread-1,5,main] init pattern: yyyy-MM-dd
put new sdf of pattern yyyy-MM to map
thread: Thread[pool-1-thread-1,5,main] init pattern: yyyy-MM
双线程执行:
thread: Thread[pool-2-thread-1,5,main] init pattern: yyyy-MM-dd
thread: Thread[pool-2-thread-2,5,main] init pattern: yyyy-MM
thread: Thread[pool-2-thread-1,5,main] init pattern: yyyy-MM
thread: Thread[pool-2-thread-2,5,main] init pattern: yyyy-MM-dd

从输出我们可以看出:

1) 1个线程执行这6个任务的时候,这个线程首次使用过的时候会new一个新的sdf,并且以后都一直用这个sdf,而不是每次处理任务都新建一个新的sdf

2) 2个线程执行6个任务的时候也是同理,但是2个线程的sdf是分开的,每个线程都有自己的”yyyy-MM-dd”, “yyyy-MM”的sdf,所以他们不会有线程安全安全问题

试想,如果使用的是new的实现方法,那么不管是用1个线程去执行,还是用2个线程去执行这6个任务,都需要new 6个sdf

Java并发编程:CountDownLatch、CyclicBarrier和Semaphore

摘要  在java 1.5中,提供了一些非常有用的辅助类来帮助我们进行并发编程,比如CountDownLatch,CyclicBarrier和Semaphore,今天我们就来学习一下这三个辅助类的用法。   以下是本文目录大纲:   一.CountDownLatch用法   二.CyclicBarri…

  在java 1.5中,提供了一些非常有用的辅助类来帮助我们进行并发编程,比如CountDownLatch,CyclicBarrier和Semaphore,今天我们就来学习一下这三个辅助类的用法。

以下是本文目录大纲:

一.CountDownLatch用法

二.CyclicBarrier用法

三.Semaphore用法

若有不正之处请多多谅解,并欢迎批评指正。

请尊重作者劳动成果,转载请标明原文链接:

http://www.cnblogs.com/dolphin0520/p/3920397.html

 

一.CountDownLatch用法

CountDownLatch类位于java.util.concurrent包下,利用它可以实现类似计数器的功能。比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。

CountDownLatch类只提供了一个构造器:

1
public CountDownLatch(int count) {  };  //参数count为计数值

然后下面这3个方法是CountDownLatch类中最重要的方法:

1
2
3
public void await() throws InterruptedException { };   //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };  //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public void countDown() { };  //将count值减1

下面看一个例子大家就清楚CountDownLatch的用法了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class Test {
     public static void main(String[] args) {   
         final CountDownLatch latch = new CountDownLatch(2);
         
         new Thread(){
             public void run() {
                 try {
                     System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
                    Thread.sleep(3000);
                    System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
                    latch.countDown();
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
             };
         }.start();
         
         new Thread(){
             public void run() {
                 try {
                     System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
                     Thread.sleep(3000);
                     System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
                     latch.countDown();
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
             };
         }.start();
         
         try {
             System.out.println("等待2个子线程执行完毕...");
            latch.await();
            System.out.println("2个子线程已经执行完毕");
            System.out.println("继续执行主线程");
        catch (InterruptedException e) {
            e.printStackTrace();
        }
     }
}

执行结果:

复制代码
线程Thread-0正在执行
线程Thread-1正在执行
等待2个子线程执行完毕...
线程Thread-0执行完毕
线程Thread-1执行完毕
2个子线程已经执行完毕
继续执行主线程
复制代码

二.CyclicBarrier用法

字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做barrier,当调用await()方法之后,线程就处于barrier了。

CyclicBarrier类位于java.util.concurrent包下,CyclicBarrier提供2个构造器:

1
2
3
4
5
public CyclicBarrier(int parties, Runnable barrierAction) {
}
public CyclicBarrier(int parties) {
}

参数parties指让多少个线程或者任务等待至barrier状态;参数barrierAction为当这些线程都达到barrier状态时会执行的内容。

然后CyclicBarrier中最重要的方法就是await方法,它有2个重载版本:

1
2
public int await() throws InterruptedException, BrokenBarrierException { };
public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException { };

第一个版本比较常用,用来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务;

第二个版本是让这些线程等待至一定的时间,如果还有线程没有到达barrier状态就直接让到达barrier的线程执行后续任务。

下面举几个例子就明白了:

假若有若干个线程都要进行写数据操作,并且只有所有线程都完成写数据操作之后,这些线程才能继续做后面的事情,此时就可以利用CyclicBarrier了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Test {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N);
        for(int i=0;i<N;i++)
            new Writer(barrier).start();
    }
    static class Writer extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run() {
            System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
            try {
                Thread.sleep(5000);      //以睡眠来模拟写入数据操作
                System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
                cyclicBarrier.await();
            catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
            System.out.println("所有线程写入完毕,继续处理其他任务...");
        }
    }
}

执行结果:

复制代码
线程Thread-0正在写入数据...
线程Thread-3正在写入数据...
线程Thread-2正在写入数据...
线程Thread-1正在写入数据...
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
复制代码

从上面输出结果可以看出,每个写入线程执行完写数据操作之后,就在等待其他线程写入操作完毕。

当所有线程线程写入操作完毕之后,所有线程就继续进行后续的操作了。

如果说想在所有线程写入操作完之后,进行额外的其他操作可以为CyclicBarrier提供Runnable参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class Test {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N,new Runnable() {
            @Override
            public void run() {
                System.out.println("当前线程"+Thread.currentThread().getName());   
            }
        });
        
        for(int i=0;i<N;i++)
            new Writer(barrier).start();
    }
    static class Writer extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run() {
            System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
            try {
                Thread.sleep(5000);      //以睡眠来模拟写入数据操作
                System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
                cyclicBarrier.await();
            catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
            System.out.println("所有线程写入完毕,继续处理其他任务...");
        }
    }
}

运行结果:

复制代码
线程Thread-0正在写入数据...
线程Thread-1正在写入数据...
线程Thread-2正在写入数据...
线程Thread-3正在写入数据...
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
当前线程Thread-3
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
复制代码

从结果可以看出,当四个线程都到达barrier状态后,会从四个线程中选择一个线程去执行Runnable。

下面看一下为await指定时间的效果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class Test {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N);
        
        for(int i=0;i<N;i++) {
            if(i<N-1)
                new Writer(barrier).start();
            else {
                try {
                    Thread.sleep(5000);
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
                new Writer(barrier).start();
            }
        }
    }
    static class Writer extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run() {
            System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
            try {
                Thread.sleep(5000);      //以睡眠来模拟写入数据操作
                System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
                try {
                    cyclicBarrier.await(2000, TimeUnit.MILLISECONDS);
                catch (TimeoutException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"所有线程写入完毕,继续处理其他任务...");
        }
    }
}

执行结果:

复制代码
线程Thread-0正在写入数据...
线程Thread-2正在写入数据...
线程Thread-1正在写入数据...
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
线程Thread-3正在写入数据...
java.util.concurrent.TimeoutException
Thread-1所有线程写入完毕,继续处理其他任务...
Thread-0所有线程写入完毕,继续处理其他任务...
    at java.util.concurrent.CyclicBarrier.dowait(Unknown Source)
    at java.util.concurrent.CyclicBarrier.await(Unknown Source)
    at com.cxh.test1.Test$Writer.run(Test.java:58)
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(Unknown Source)
    at java.util.concurrent.CyclicBarrier.await(Unknown Source)
    at com.cxh.test1.Test$Writer.run(Test.java:58)
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(Unknown Source)
    at java.util.concurrent.CyclicBarrier.await(Unknown Source)
    at com.cxh.test1.Test$Writer.run(Test.java:58)
Thread-2所有线程写入完毕,继续处理其他任务...
java.util.concurrent.BrokenBarrierException
线程Thread-3写入数据完毕,等待其他线程写入完毕
    at java.util.concurrent.CyclicBarrier.dowait(Unknown Source)
    at java.util.concurrent.CyclicBarrier.await(Unknown Source)
    at com.cxh.test1.Test$Writer.run(Test.java:58)
Thread-3所有线程写入完毕,继续处理其他任务...
复制代码

上面的代码在main方法的for循环中,故意让最后一个线程启动延迟,因为在前面三个线程都达到barrier之后,等待了指定的时间发现第四个线程还没有达到barrier,就抛出异常并继续执行后面的任务。

另外CyclicBarrier是可以重用的,看下面这个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class Test {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N);
        
        for(int i=0;i<N;i++) {
            new Writer(barrier).start();
        }
        
        try {
            Thread.sleep(25000);
        catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("CyclicBarrier重用");
        
        for(int i=0;i<N;i++) {
            new Writer(barrier).start();
        }
    }
    static class Writer extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run() {
            System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
            try {
                Thread.sleep(5000);      //以睡眠来模拟写入数据操作
                System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
            
                cyclicBarrier.await();
            catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"所有线程写入完毕,继续处理其他任务...");
        }
    }
}

执行结果:

复制代码
线程Thread-0正在写入数据...
线程Thread-1正在写入数据...
线程Thread-3正在写入数据...
线程Thread-2正在写入数据...
线程Thread-1写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-0写入数据完毕,等待其他线程写入完毕
Thread-0所有线程写入完毕,继续处理其他任务...
Thread-3所有线程写入完毕,继续处理其他任务...
Thread-1所有线程写入完毕,继续处理其他任务...
Thread-2所有线程写入完毕,继续处理其他任务...
CyclicBarrier重用
线程Thread-4正在写入数据...
线程Thread-5正在写入数据...
线程Thread-6正在写入数据...
线程Thread-7正在写入数据...
线程Thread-7写入数据完毕,等待其他线程写入完毕
线程Thread-5写入数据完毕,等待其他线程写入完毕
线程Thread-6写入数据完毕,等待其他线程写入完毕
线程Thread-4写入数据完毕,等待其他线程写入完毕
Thread-4所有线程写入完毕,继续处理其他任务...
Thread-5所有线程写入完毕,继续处理其他任务...
Thread-6所有线程写入完毕,继续处理其他任务...
Thread-7所有线程写入完毕,继续处理其他任务...
复制代码

从执行结果可以看出,在初次的4个线程越过barrier状态后,又可以用来进行新一轮的使用。而CountDownLatch无法进行重复使用。

三.Semaphore用法

Semaphore翻译成字面意思为 信号量,Semaphore可以控同时访问的线程个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。

Semaphore类位于java.util.concurrent包下,它提供了2个构造器:

1
2
3
4
5
6
public Semaphore(int permits) {          //参数permits表示许可数目,即同时可以允许多少线程进行访问
    sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {    //这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可
    sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
}

下面说一下Semaphore类中比较重要的几个方法,首先是acquire()、release()方法:

1
2
3
4
public void acquire() throws InterruptedException {  }     //获取一个许可
public void acquire(int permits) throws InterruptedException { }    //获取permits个许可
public void release() { }          //释放一个许可
public void release(int permits) { }    //释放permits个许可

acquire()用来获取一个许可,若无许可能够获得,则会一直等待,直到获得许可。

release()用来释放许可。注意,在释放许可之前,必须先获获得许可。

这4个方法都会被阻塞,如果想立即得到执行结果,可以使用下面几个方法:

1
2
3
4
public boolean tryAcquire() { };    //尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { };  //尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
public boolean tryAcquire(int permits) { }; //尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }; //尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false

另外还可以通过availablePermits()方法得到可用的许可数目。

下面通过一个例子来看一下Semaphore的具体使用:

假若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。那么我们就可以通过Semaphore来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Test {
    public static void main(String[] args) {
        int N = 8;            //工人数
        Semaphore semaphore = new Semaphore(5); //机器数目
        for(int i=0;i<N;i++)
            new Worker(i,semaphore).start();
    }
    
    static class Worker extends Thread{
        private int num;
        private Semaphore semaphore;
        public Worker(int num,Semaphore semaphore){
            this.num = num;
            this.semaphore = semaphore;
        }
        
        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println("工人"+this.num+"占用一个机器在生产...");
                Thread.sleep(2000);
                System.out.println("工人"+this.num+"释放出机器");
                semaphore.release();           
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

执行结果:

复制代码
工人0占用一个机器在生产...
工人1占用一个机器在生产...
工人2占用一个机器在生产...
工人4占用一个机器在生产...
工人5占用一个机器在生产...
工人0释放出机器
工人2释放出机器
工人3占用一个机器在生产...
工人7占用一个机器在生产...
工人4释放出机器
工人5释放出机器
工人1释放出机器
工人6占用一个机器在生产...
工人3释放出机器
工人7释放出机器
工人6释放出机器
复制代码

下面对上面说的三个辅助类进行一个总结:

1)CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:

CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;

而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;

另外,CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的。

2)Semaphore其实和锁有点类似,它一般用于控制对某组资源的访问权限。
云栖社区站内文章,如需转载,请保留作者和出处(云栖社区),并邮件通知云栖社区(yqeditor@list.alibaba-inc.com)。

线程池调整真的很重要

来源:blog.bramp.net

翻译:ImportNew

链接:http://www.importnew.com/17633.html

知道吗,你的Java web应用其实是使用线程池来处理请求的。这一实现细节被许多人忽略,但是你迟早都需要理解线程池如何使用,以及如何正确地根据应用调整线程池配置。这篇文章的目的是为了解释线程模型——什么是线程池、以及怎样正确地配置线程池。

单线程模型

让我们从一些基础的线程模型开始,然后再随着线程模型的演变进行更深一步的学习。你使用的任何应用服务器或框架,如Tomcat、Dropwizard、Jetty等,它们的基本原理其实是相同的。Web服务器的最底层实际上是一个socket。这个socket监听并接受到达的TCP连接。一旦一个连接被建立,就可以通过这个新建立的连接读取、解析信息,然后将这些信息包装成一个HTTP请求。这个HTTP请求还将被移交至web应用程序,来完成请求的动作。

我们将通过一个简单的服务器程序来展示线程在其中所起到的作用。这个服务器程序展示了大部分应用服务器的底层实现细节。让我们以一个简单的单线程web服务器程序开始,它的代码像下面这样:

ServerSocket listener = new ServerSocket(8080);

try {

while (true) {

Socket socket = listener.accept();

try {

handleRequest(socket);

} catch (IOException e) {

e.printStackTrace();

}

}

} finally {

listener.close();

}

这段代码在8080端口上创建了一个ServerSocket,紧接着通过循环来监听和接受新到达的连接。一旦连接建立,会将socket传递给handleRequest方法。这个方法可能会读取该HTTP请求,处理这个请求,然后写回一个响应。在这个简单的例子中,handleRequest方法从socket中读取简单的一行数据,然后返回一个简短的HTTP响应。但是,handleRequest有可能需要处理一些更复杂的任务,例如读数据库或者执行其它一些IO操作。

final static String response =

“HTTP/1.0 200 OKrn” +

“Content-type: text/plainrn” +

“rn” +

“Hello Worldrn”;

 

public static void handleRequest(Socket socket) throws IOException {

// Read the input stream, and return “200 OK”

try {

BufferedReader in = new BufferedReader(

new InputStreamReader(socket.getInputStream()));

 

log.info(in.readLine());

 

OutputStream out = socket.getOutputStream();

out.write(response.getBytes(StandardCharsets.UTF_8));

} finally {

socket.close();

}

}

因为只有一个线程处理所有的socket,因此只有在完全处理好一个请求后,才能再接受下一个请求。在实际的应用中,handleRequest方法可能需要经过100毫秒才能返回,那么这个服务器程序在一秒中,只能按顺序处理10个请求。

多线程模型

尽管handleRequest可能会被IO操作阻塞,CPU却可能是空闲的,它可以处理其它更多请求,但这对单线程模型来说是不能实现的。因此,通过创建多个线程,可以使服务器程序实现并发操作:

public static class HandleRequestRunnable implements Runnable {

final Socket socket;

 

public HandleRequestRunnable(Socket socket) {

this.socket = socket;

}

 

public void run() {

try {

handleRequest(socket);

} catch (IOException e) {

e.printStackTrace();

}

}

}

 

// Main loop here

ServerSocket listener = new ServerSocket(8080);

try {

while (true) {

Socket socket = listener.accept();

new Thread( new HandleRequestRunnable(socket) ).start();

}

} finally {

listener.close();

}

上面这段代码中,accept()方法仍然是在一个单线程循环中被调用。但是当TCP连接建立,socket创建时,服务器就创建一个新的线程。这个新生的线程将执行和单线程模型中一样的handleRequest方法。

新线程的建立使调用accept方法的线程能够处理更多的TCP连接,这样服务器就能并发地处理请求了。这一技术被称为“thread per request”(一个线程处理一个请求),也是现在最流行的服务器技术。值得注意的是,还有一些其它的服务器技术,如NGINX和Node.js采用的事件驱动异步模型,它们都没有使用线程池。因此,它们都不在本文的讨论范围内。

“thread per request”方式里创建新线程(稍后销毁这个线程)的操作是昂贵的,因为Java虚拟机和操作系统都需要为这一操作分配资源。另外,在上面那段代码的中,可以创建的线程数量是不受限制的。这么做的隐患很大,因为它可能导致服务器资源迅速枯竭。

资源枯竭

每个线程都需要一定的内存空间来作为自己的栈空间。在最近的64位虚拟机版本中,默认的栈空间是1024KB。如果server收到很多请求,或者handleRequest方法的执行时间变得比较长,就会造成服务器产生很多并发线程。如果要维护1000个线程,仅就栈空间而言,虚拟机就必须耗费1GB的RAM空间。另外,为处理请求,每个线程都会在堆上产生许多对象,这就有可能导致虚拟机的堆空间被迅速占满,给虚拟机的垃圾收集器带来很大压力,造成频繁的垃圾回收,最终导致OutOfMemoryErrors。

线程消耗的不仅是RAM资源,这些线程还可能消耗其它有限的资源,例如文件句柄、数据库连接等。过多地消耗这类资源可能导致一些其它的错误或造成系统崩溃。因此,要防止系统资源被线程耗尽,就必须对服务器产生的线程数量做出限制。

通过使用-Xss参数来调整每个线程的栈空间,可以在一定程度上解决资源枯竭的问题,但它绝不是灵丹妙药。一个小的栈空间可以使得每个线程占用的内存减小,但这样可能会造成StackOverflowErrors栈溢出错误。栈空间的调整方式不尽相同,但是对许多应用来说,1024KB过于浪费了,而256KB或512KB会更加合适。Java所允许的最小栈空间的大小是160KB。

线程池

可以通过一个简单的线程池来避免持续地创建新线程,限制最大线程数量。线程池跟踪着所有线程,在线程数量达到上限前,它会创建新的线程,当有空闲线程时,它会使用空闲线程。

ServerSocket listener = new ServerSocket(8080);

ExecutorService executor = Executors.newFixedThreadPool(4);

try {

while (true) {

Socket socket = listener.accept();

executor.submit( new HandleRequestRunnable(socket) );

}

} finally {

listener.close();

}

上面这段代码使用了ExecutorService类来提交任务(Runnable)。提交的任务将会被线程池中的线程执行,而不是通过新创建的线程执行。在这个例子中,所有的请求都通过一个线程数量固定为4的线程池来完成。这个线程池限制了并发执行的请求数量,从而限制了系统资源的使用。

除了newFixedThreadPool方法创建的线程池外,Executors类还提供了newCachedThreadPool 方法来创建线程池。这种线程池同样有无法限制线程数量的问题,但是它会优先使用线程池中已创建的空闲线程来处理请求。这种类型的线程池特别适用于执行短期任务的请求,因为它们不会长时间的阻塞外部资源。

ThreadPoolExecutors 类也可以直接创建,这样就可以对它进行一些个性化的配置。例如可以配置线程池内最小线程数和最大线程数,也可以配置线程创建和销毁的策略。稍后,本文将介绍这样的例子。

工作队列

对于线程数量固定的线程池,善于观察的读者可能会提出这样的一个疑问:当线程池中的线程都在工作时,一个新的请求到达,会发生什么呢?当线程池中的线程都在工作时,ThreadPoolExecutor可能会使用一个队列来组织新到达的请求,直到线程池中有空闲的线程可以使用。Executors.nexFixedThreadPool方法会默认创建一个没有长度限制的LinkedList。这个LinkedList也可能会产生系统资源耗尽的问题,虽然这个过程会比较缓慢,因为队列中的请求所占用的资源比线程占用的资源要少得多。但是在我们的例子中,队列中的每个请求都保持着一个socket,而每一个socket都需要打开一个文件句柄,操作系统对同时打开的文件句柄数量是有限制的,所以队列中保持socket并不是一个好的方式,除非必须这么做。因此,限制工作队列的长度也是有意义的。

public static ExecutorService newBoundedFixedThreadPool(int nThreads, int capacity) {

return new ThreadPoolExecutor(nThreads, nThreads,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue<Runnable>(capacity),

new ThreadPoolExecutor.DiscardPolicy());

}

 

public static void boundedThreadPoolServerSocket() throws IOException {

ServerSocket listener = new ServerSocket(8080);

ExecutorService executor = newBoundedFixedThreadPool(4, 16);

try {

while (true) {

Socket socket = listener.accept();

executor.submit( new HandleRequestRunnable(socket) );

}

} finally {

listener.close();

}

}

我们再一次创建一个线程池,这一次我们没有使用Executors.newFixedThreadPool方法,而是自定义了一个ThreadPoolExecutor,在构造方法中传递了一个大小限制为16个元素的LinkedBlockingQueue。同样的,类ArrayBlockingQueue也可以被用来限制队列的长度。

如果所有的线程都在执行任务,而且工作队列也被请求填满了,此时对于新到达请求的处理方式,取决于ThreadPoolExecutor构造方法的最后一个参数。在我们这个例子中,我们使用的是DiscardPolicy,这个参数会让线程池丢弃新到达的请求。还有一些其它的处理策略,例如AbortPolicy会让Executor抛出一个异常,CallerRunsPolicy会使任务在它的调用端线程池中执行。CallerRunsPolicy策略提供了一个简单的方式来限制任务提交的速度。但是这样做可能是有害的,因为它会阻塞一个原本不应被阻塞的线程。

一个好的默认策略应该是Discard或Abort,它们都会使线程池丢弃新到达的任务。这样服务器就能容易地向客户端响应一个错误,例如HTTP的503错误“Service unavailable”。有的人可能会认为,队列的长度应该是允许增长的,这样所有的任务最终都能被执行。但是用户是不愿意长时间等待的,而且若任务到达的速度超过任务处理的速度,队列将会无限地增长。队列是被用来缓冲突然爆发的请求,或者处理短期任务的,通常情况下,队列应该是空的。

多少线程合适呢?

现在,我们知道了如何创建一个线程池。但是有一个更困难的问题,线程池里应该创建多少个线程呢?我们已经知道了线程池中的最大线程数量应该被限制,才不会导致系统资源耗尽。这些系统资源包括了内存(堆栈)、打开的文件句柄、打开的TCP连接、打开的数据库连接以及其它有限的系统资源。相反的,如果线程执行的是CPU密集型任务而不是IO密集型任务,服务器的物理内核数就应该被视为是有限的资源,这样创建的线程数就不应该超过系统的内核数。

系统应创建多少线程取决于这个应用执行的任务。开发人员应使用现实的请求来对系统进行负载测试,测试不同的线程池大小配置对系统的影响。每次测试都增加线程池的大小,直到系统达到崩溃的临界点。这个方法使你可以发现线程池线程数量的上限。超过这个上限,系统的资源将耗尽。在某些情况下,可以谨慎地增加系统的资源,例如分配更多的RAM空间给JVM,或者调整操作系统使其支持同时打开更多的文件句柄。然而,在某些情况下创建的线程数量会达到我们测试出的理论上限,这非常值得我们注意。稍后还会看到这方面的内容。

利特尔法则

排队论,特别的,Little’s Law,可以用来帮助我们理解线程池的一些特性。简单地说,利特尔法则解释了这三种变量的关系:L—系统里的请求数量、λ—请求到达的速率和W—每个请求的处理时间。例如,如果每秒10个请求到达,处理一个请求需要1秒,那么系统在每个时刻都有10个请求在处理。如果处理每个请求的时间翻倍,那么系统每时刻需要处理的请求数也翻倍为20,因此需要20个线程。

任务的执行时间对于系统中正在处理的请求数量有着很大的影响,一些后端资源的迟延,例如数据库,通常会使得请求的处理时间被延长,从而导致线程池中的线程被迅速用尽。因此,理论上测出的线程数上限对于这种情况就不是很合适,这个上限值还应该考虑到线程的执行时间,并结合理论上的上限值。

例如,假设JVM最多能同时处理的请求数为1000。如果我们预计每个请求需要耗费的时间不超过30秒,那么,在最坏的情况下我们每秒能同时处理的请求数不会超过33 ⅓个。但是,如果一切都很顺利,每个请求只需使用500ms就可以完成,那么通过1000个线程应用每秒就可以处理2000个请求。当系统突然出现短暂的任务执行迟延的问题时,通过使用一个队列来减缓这一问题是可行的。

为什么线程数配置不当会带来麻烦?

如果线程池的线程数量过少,我们就无法充分利用系统资源,这使得用户需要花费很长时间来等待请求的响应。但是,如果允许创建过多的线程,系统的资源又会被耗尽,这会对系统造成更大的破坏。

不仅仅是本地的资源被耗尽,其它一些应用也会受到影响。例如,许多应用都使用同一个后端数据库进行查询等操作。数据库有并发连接数量的限制。如果一个应用不加限制地占用了所有数据库连接,其它获取数据库连接的应用都将被阻塞。这将导致许多应用运行中断。

更糟的是,资源耗尽还会引发一些连锁故障。设想这样一个场景,一个应用有许多个实例,这些实例都运行在一个负载均衡器之后。如果一个实例因为过多的请求而占用了过多内存,JVM就需要花更多的时间进行垃圾收集工作,那么JVM处理请求的时间就减少了。这样一来,这个应用实例处理请求的能力降低了,系统中的其它实例就必须处理更多的请求。其它的实例也会因为请求数过多以及线程池大小没有限制的原因产生资源枯竭等问题。这些实例用尽了内存资源,导致虚拟机进行频繁地内存收集操作。这样的恶性循环会在这些实例中产生,直到整个系统奔溃。

我见过许多没有进行负载测试的应用,这些应用能够创建任意多的线程。通常情况下,这些应用只要很少数量的线程就能处理好以一定速率到达的请求。但是,如果应用需要使用其它的一些远程服务来处理用户请求,而这个远程服务的处理能力突然降低了,这将增加【大】W的值(应用处理请求的平均时间)。这样,线程池的线程就会被迅速用尽。如果对应用进行线程数量的负载测试,那么资源枯竭问题就会在测试中显现出来。

多少个线程池合适?

对于微服务架构和面向服务的架构(SOA)来说,它们通常需要请求一些后端服务。线程池的配置非常容易导致程序失败,因此必须谨慎地配置线程池。如果远程服务的性能下降,系统中的线程数量就会迅速达到线程池的上限,其它后续到达的服务就会被丢弃。这些后续的请求也许并不是要使用性能出现故障的服务,但是它们都只能被丢弃了。

针对不同的后端服务请求,设置不同的线程池可以解决这一问题。在这个模式中,仍然使用同一个线程池来处理用户的请求,但是当用户的请求需要调用一个远程服务时,这个任务就被传递给一个指定的后端线程池。这样处理用户请求的主线程池就不会因为调用后端服务而产生很大的负担。当后端服务出现故障时,只有调用这个服务的线程池才会受到影响。

使用多个线程池还有一个好处,就是它能帮助避免出现死锁问题。如果每个空闲线程都因为一个尚未处理完毕的请求阻塞,就会发生死锁,没有一个线程可以继续往下执行。如果使用多个线程池,理解好每个线程池应负责的工作,那么死锁的问题就能在一定程度上避免。

截止时间和一些最佳实践

一个最佳实践是给需要远程调用的请求规定一个截止时间。如果远程服务在规定的时间内没有响应,就丢弃这个请求。这样的技术也可以用在线程池中,如果线程处理某个请求的时间超过了规定时间,那么这个线程就应被停止,为新到达的请求腾出资源,这样也就给W(处理请求的平均时间)规定了上限。虽然这样的做法看起来有些浪费,但是如果一个用户(特别是当用户在使用浏览器时),在等待请求的响应,那么30秒以后,浏览器无论如何也会放弃这个请求,或者更有可能的是:用户不会耐心地等待这个请求响应,而是进行其它操作去了。

快速失败也是一个可以用来处理后端请求的线程池方案。如果后端服务失效了,线程池中的线程数会迅速到达上限,这些线程都在等待没有响应的后端服务。如果使用快速失败机制,当后端服务被标记为失效时,所有的后续请求都会迅速失败,而不是进行不必要的等待。当然,它也需要一种机制来判断后端何时恢复为可用的。

最后,如果一个请求需要独立地调用多个后端服务,那么这个请求就应能并行地调用这些后端服务,而不是顺序地进行。这样就能降低请求的等待时间,但这是以增加线程数为代价的。

幸运的是,有一个非常好的库hystrix,这个库封装了许多很好的线程策略,然后以非常简单和友好的方式将这些借口暴露出来。

总结

我希望这篇文章能改进你对线程池的理解。一个合适的线程池配置需要理解应用的需求,还需要考虑这几个因素,系统允许的最大线程数、处理用户请求所需的时间。好的线程池配置不仅可以避免系统出现连锁故障,还能帮助计划和提供服务。

即使你的应用没有直接地使用一个线程池,它们也间接地通过应用服务器或其它更高级的抽象形式使用了线程池。Tomcat、JBoss、Undertow、Dropwizard 都提供了多种可配置的线程池(这些线程池正是你编写的Servlet运行的地方)。

深入理解Java之线程池(下)

接上文

下面是这三个静态方法的具体实现;

public static ExecutorService newFixedThreadPool(int nThreads) {

return new ThreadPoolExecutor(nThreads, nThreads,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue<Runnable>());

}

public static ExecutorService newSingleThreadExecutor() {

return new FinalizableDelegatedExecutorService

(new ThreadPoolExecutor(1, 1,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue<Runnable>()));

}

public static ExecutorService newCachedThreadPool() {

return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

60L, TimeUnit.SECONDS,

new SynchronousQueue<Runnable>());

}

从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。

newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;

newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。

实际中,如果Executors提供的三个静态方法能满足要求,就尽量使用它提供的三个方法,因为自己去手动配置ThreadPoolExecutor的参数有点麻烦,要根据实际任务的类型和数量来进行配置。

另外,如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类进行重写。

四.如何合理配置线程池的大小

本节来讨论一个比较重要的话题:如何合理配置线程池大小,仅供参考。

一般需要根据任务的类型来配置线程池大小:

如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1

如果是IO密集型任务,参考值可以设置为2*NCPU

当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。

参考资料:

http://ifeve.com/java-threadpool/

http://blog.163.com/among_1985/blog/static/275005232012618849266/

http://developer.51cto.com/art/201203/321885.htm

http://blog.csdn.net/java2000_wl/article/details/22097059

http://blog.csdn.net/cutesource/article/details/6061229

http://blog.csdn.net/xieyuooo/article/details/8718741

《JDK API 1.6》

深入理解Java之线程池(中)

接上文

下面我们看一下Worker类的实现:

private final class Worker implements Runnable {

private final ReentrantLock runLock = new ReentrantLock();

private Runnable firstTask;

volatile long completedTasks;

Thread thread;

Worker(Runnable firstTask) {

this.firstTask = firstTask;

}

boolean isActive() {

return runLock.isLocked();

}

void interruptIfIdle() {

final ReentrantLock runLock = this.runLock;

if (runLock.tryLock()) {

try {

if (thread != Thread.currentThread())

thread.interrupt();

} finally {

runLock.unlock();

}

}

}

void interruptNow() {

thread.interrupt();

}

 

private void runTask(Runnable task) {

final ReentrantLock runLock = this.runLock;

runLock.lock();

try {

if (runState < STOP &&

Thread.interrupted() &&

runState >= STOP)

boolean ran = false;

beforeExecute(thread, task);   //beforeExecute方法是ThreadPoolExecutor类的一个方法,没有具体实现,用户可以根据

//自己需要重载这个方法和后面的afterExecute方法来进行一些统计信息,比如某个任务的执行时间等

try {

task.run();

ran = true;

afterExecute(task, null);

++completedTasks;

} catch (RuntimeException ex) {

if (!ran)

afterExecute(task, ex);

throw ex;

}

} finally {

runLock.unlock();

}

}

 

public void run() {

try {

Runnable task = firstTask;

firstTask = null;

while (task != null || (task = getTask()) != null) {

runTask(task);

task = null;

}

} finally {

workerDone(this);   //当任务队列中没有任务时,进行清理工作

}

}

}

它实际上实现了Runnable接口,因此上面的Thread t = threadFactory.newThread(w);效果跟下面这句的效果基本一样:

Thread t = new Thread(w);

相当于传进去了一个Runnable任务,在线程t中执行这个Runnable。

既然Worker实现了Runnable接口,那么自然最核心的方法便是run()方法了:

public void run() {

try {

Runnable task = firstTask;

firstTask = null;

while (task != null || (task = getTask()) != null) {

runTask(task);

task = null;

}

} finally {

workerDone(this);

}

}

从run方法的实现可以看出,它首先执行的是通过构造器传进来的任务firstTask,在调用runTask()执行完firstTask之后,在while循环里面不断通过getTask()去取新的任务来执行,那么去哪里取呢?自然是从任务缓存队列里面去取,getTask是ThreadPoolExecutor类中的方法,并不是Worker类中的方法,下面是getTask方法的实现:

Runnable getTask() {

for (;;) {

try {

int state = runState;

if (state > SHUTDOWN)

return null;

Runnable r;

if (state == SHUTDOWN)  // Help drain queue

r = workQueue.poll();

else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果线程数大于核心池大小或者允许为核心池线程设置空闲时间,

//则通过poll取任务,若等待一定的时间取不到任务,则返回null

r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);

else

r = workQueue.take();

if (r != null)

return r;

if (workerCanExit()) {    //如果没取到任务,即r为null,则判断当前的worker是否可以退出

if (runState >= SHUTDOWN) // Wake up others

interruptIdleWorkers();   //中断处于空闲状态的worker

return null;

}

// Else retry

} catch (InterruptedException ie) {

// On interruption, re-check runState

}

}

}

在getTask中,先判断当前线程池状态,如果runState大于SHUTDOWN(即为STOP或者TERMINATED),则直接返回null。

如果runState为SHUTDOWN或者RUNNING,则从任务缓存队列取任务。

如果当前线程池的线程数大于核心池大小corePoolSize或者允许为核心池中的线程设置空闲存活时间,则调用poll(time,timeUnit)来取任务,这个方法会等待一定的时间,如果取不到任务就返回null。

然后判断取到的任务r是否为null,为null则通过调用workerCanExit()方法来判断当前worker是否可以退出,我们看一下workerCanExit()的实现:

private boolean workerCanExit() {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

boolean canExit;

//如果runState大于等于STOP,或者任务缓存队列为空了

//或者  允许为核心池线程设置空闲存活时间并且线程池中的线程数目大于1

try {

canExit = runState >= STOP ||

workQueue.isEmpty() ||

(allowCoreThreadTimeOut &&

poolSize > Math.max(1, corePoolSize));

} finally {

mainLock.unlock();

}

return canExit;

}

也就是说如果线程池处于STOP状态、或者任务队列已为空或者允许为核心池线程设置空闲存活时间并且线程数大于1时,允许worker退出。如果允许worker退出,则调用interruptIdleWorkers()中断处于空闲状态的worker,我们看一下interruptIdleWorkers()的实现:

void interruptIdleWorkers() {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

for (Worker w : workers)  //实际上调用的是worker的interruptIfIdle()方法

w.interruptIfIdle();

} finally {

mainLock.unlock();

}

}

从实现可以看出,它实际上调用的是worker的interruptIfIdle()方法,在worker的interruptIfIdle()方法中:

void interruptIfIdle() {

final ReentrantLock runLock = this.runLock;

if (runLock.tryLock()) {    //注意这里,是调用tryLock()来获取锁的,因为如果当前worker正在执行任务,锁已经被获取了,是无法获取到锁的

//如果成功获取了锁,说明当前worker处于空闲状态

try {

if (thread != Thread.currentThread())

thread.interrupt();

} finally {

runLock.unlock();

}

}

}

这里有一个非常巧妙的设计方式,假如我们来设计线程池,可能会有一个任务分派线程,当发现有线程空闲时,就从任务缓存队列中取一个任务交给空闲线程执行。但是在这里,并没有采用这样的方式,因为这样会要额外地对任务分派线程进行管理,无形地会增加难度和复杂度,这里直接让执行完任务的线程去任务缓存队列里面取任务来执行。

我们再看addIfUnderMaximumPoolSize方法的实现,这个方法的实现思想和addIfUnderCorePoolSize方法的实现思想非常相似,唯一的区别在于addIfUnderMaximumPoolSize方法是在线程池中的线程数达到了核心池大小并且往任务队列中添加任务失败的情况下执行的:

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {

Thread t = null;

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

if (poolSize < maximumPoolSize && runState == RUNNING)

t = addThread(firstTask);

} finally {

mainLock.unlock();

}

if (t == null)

return false;

t.start();

return true;

}

看到没有,其实它和addIfUnderCorePoolSize方法的实现基本一模一样,只是if语句判断条件中的poolSize < maximumPoolSize不同而已。

到这里,大部分朋友应该对任务提交给线程池之后到被执行的整个过程有了一个基本的了解,下面总结一下:

1)首先,要清楚corePoolSize和maximumPoolSize的含义;

2)其次,要知道Worker是用来起到什么作用的;

3)要知道任务提交给线程池之后的处理策略,这里总结一下主要有4点:

  • 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
  • 如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
  • 如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
  • 如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

3.线程池中的线程初始化

默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。

在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:

  • prestartCoreThread():初始化一个核心线程;
  • prestartAllCoreThreads():初始化所有核心线程

下面是这2个方法的实现:

public boolean prestartCoreThread() {

return addIfUnderCorePoolSize(null); //注意传进去的参数是null

}

 

public int prestartAllCoreThreads() {

int n = 0;

while (addIfUnderCorePoolSize(null))//注意传进去的参数是null

++n;

return n;

}

注意上面传进去的参数是null,根据第2小节的分析可知如果传进去的参数为null,则最后执行线程会阻塞在getTask方法中的

r = workQueue.take();

即等待任务队列中有任务。

4.任务缓存队列及排队策略

在前面我们多次提到了任务缓存队列,即workQueue,它用来存放等待执行的任务。

workQueue的类型为BlockingQueue<Runnable>,通常可以取下面三种类型:

1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;

2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;

3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。

5.任务拒绝策略

当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。

ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。

ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

6.线程池的关闭

ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:

  • shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
  • shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

7.线程池容量的动态调整

ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

  • setCorePoolSize:设置核心池大小
  • setMaximumPoolSize:设置线程池最大能创建的线程数目大小

当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。

三.使用示例

前面我们讨论了关于线程池的实现原理,这一节我们来看一下它的具体使用:

public class Test {

public static void main(String[] args) {

ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,

new ArrayBlockingQueue<Runnable>(5));

 

for(int i=0;i<15;i++){

MyTask myTask = new MyTask(i);

executor.execute(myTask);

System.out.println(“线程池中线程数目:”+executor.getPoolSize()+”,队列中等待执行的任务数目:”+

executor.getQueue().size()+”,已执行玩别的任务数目:”+executor.getCompletedTaskCount());

}

executor.shutdown();

}

}

 

class MyTask implements Runnable {

private int taskNum;

 

public MyTask(int num) {

this.taskNum = num;

}

 

@Override

public void run() {

System.out.println(“正在执行task “+taskNum);

try {

Thread.currentThread().sleep(4000);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println(“task “+taskNum+”执行完毕”);

}

}

执行结果:

正在执行task 0

线程池中线程数目:1,队列中等待执行的任务数目:0,已执行玩别的任务数目:0

线程池中线程数目:2,队列中等待执行的任务数目:0,已执行玩别的任务数目:0

正在执行task 1

线程池中线程数目:3,队列中等待执行的任务数目:0,已执行玩别的任务数目:0

正在执行task 2

线程池中线程数目:4,队列中等待执行的任务数目:0,已执行玩别的任务数目:0

正在执行task 3

线程池中线程数目:5,队列中等待执行的任务数目:0,已执行玩别的任务数目:0

正在执行task 4

线程池中线程数目:5,队列中等待执行的任务数目:1,已执行玩别的任务数目:0

线程池中线程数目:5,队列中等待执行的任务数目:2,已执行玩别的任务数目:0

线程池中线程数目:5,队列中等待执行的任务数目:3,已执行玩别的任务数目:0

线程池中线程数目:5,队列中等待执行的任务数目:4,已执行玩别的任务数目:0

线程池中线程数目:5,队列中等待执行的任务数目:5,已执行玩别的任务数目:0

线程池中线程数目:6,队列中等待执行的任务数目:5,已执行玩别的任务数目:0

正在执行task 10

线程池中线程数目:7,队列中等待执行的任务数目:5,已执行玩别的任务数目:0

正在执行task 11

线程池中线程数目:8,队列中等待执行的任务数目:5,已执行玩别的任务数目:0

正在执行task 12

线程池中线程数目:9,队列中等待执行的任务数目:5,已执行玩别的任务数目:0

正在执行task 13

线程池中线程数目:10,队列中等待执行的任务数目:5,已执行玩别的任务数目:0

正在执行task 14

task 3执行完毕

task 0执行完毕

task 2执行完毕

task 1执行完毕

正在执行task 8

正在执行task 7

正在执行task 6

正在执行task 5

task 4执行完毕

task 10执行完毕

task 11执行完毕

task 13执行完毕

task 12执行完毕

正在执行task 9

task 14执行完毕

task 8执行完毕

task 5执行完毕

task 7执行完毕

task 6执行完毕

task 9执行完毕

从执行结果可以看出,当线程池中线程的数目大于5时,便将任务放入任务缓存队列里面,当任务缓存队列满了之后,便创建新的线程。如果上面程序中,将for循环中改成执行20个任务,就会抛出任务拒绝异常了。

不过在java doc中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池:

Executors.newCachedThreadPool();        //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE

Executors.newSingleThreadExecutor();   //创建容量为1的缓冲池

Executors.newFixedThreadPool(int);    //创建固定容量大小的缓冲池

接下文

深入理解Java之线程池(上)

来源:海 子

链接:http://www.cnblogs.com/dolphin0520/p/3932921.html

在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:

如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?

在Java中可以通过线程池来达到这样的效果。今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPoolExecutor类中的方法讲起,然后再讲述它的实现原理,接着给出了它的使用示例,最后讨论了一下如何合理配置线程池的大小。

以下是本文的目录大纲:

一.Java中的ThreadPoolExecutor类

二.深入剖析线程池实现原理

三.使用示例

四.如何合理配置线程池的大小

若有不正之处请多多谅解,并欢迎批评指正。

一.Java中的ThreadPoolExecutor类

java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。下面我们来看一下ThreadPoolExecutor类的具体实现源码。

在ThreadPoolExecutor类中提供了四个构造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {

…..

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,

BlockingQueue<Runnable> workQueue);

 

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,

BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);

 

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,

BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);

 

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,

BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);

}

从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。

下面解释下一下构造器中各个参数的含义:

  • corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
  • maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
  • unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:

TimeUnit.DAYS;               //天

TimeUnit.HOURS;             //小时

TimeUnit.MINUTES;           //分钟

TimeUnit.SECONDS;           //秒

TimeUnit.MILLISECONDS;      //毫秒

TimeUnit.MICROSECONDS;      //微妙

TimeUnit.NANOSECONDS;       //纳秒

  • workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

ArrayBlockingQueue;

LinkedBlockingQueue;

SynchronousQueue;

ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。

  • threadFactory:线程工厂,主要用来创建线程;
  • handler:表示当拒绝处理任务时的策略,有以下四种取值:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。

ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。

ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

具体参数的配置与线程池的关系将在下一节讲述。

从上面给出的ThreadPoolExecutor类的代码可以知道,ThreadPoolExecutor继承了AbstractExecutorService,我们来看一下AbstractExecutorService的实现:

public abstract class AbstractExecutorService implements ExecutorService {

 

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };

public Future<?> submit(Runnable task) {};

public <T> Future<T> submit(Runnable task, T result) { };

public <T> Future<T> submit(Callable<T> task) { };

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,

boolean timed, long nanos)

throws InterruptedException, ExecutionException, TimeoutException {

};

public <T> T invokeAny(Collection<? extends Callable<T>> tasks)

throws InterruptedException, ExecutionException {

};

public <T> T invokeAny(Collection<? extends Callable<T>> tasks,

long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException {

};

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

throws InterruptedException {

};

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,

long timeout, TimeUnit unit)

throws InterruptedException {

};

}

AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。

我们接着看ExecutorService接口的实现:

public interface ExecutorService extends Executor {

 

void shutdown();

boolean isShutdown();

boolean isTerminated();

boolean awaitTermination(long timeout, TimeUnit unit)

throws InterruptedException;

<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

throws InterruptedException;

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,

long timeout, TimeUnit unit)

throws InterruptedException;

 

<T> T invokeAny(Collection<? extends Callable<T>> tasks)

throws InterruptedException, ExecutionException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks,

long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException;

}

而ExecutorService又是继承了Executor接口,我们看一下Executor接口的实现:

public interface Executor {

void execute(Runnable command);

}

到这里,大家应该明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系了。

Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;

然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;

然后ThreadPoolExecutor继承了类AbstractExecutorService。

在ThreadPoolExecutor类中有几个非常重要的方法:

execute()

submit()

shutdown()

shutdownNow()

execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。

shutdown()和shutdownNow()是用来关闭线程池的。

还有很多其他的方法:

比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法,有兴趣的朋友可以自行查阅API。

二.深入剖析线程池实现原理

在上一节我们从宏观上介绍了ThreadPoolExecutor,下面我们来深入解析一下线程池的具体实现原理,将从下面几个方面讲解:

1.线程池状态

 

2.任务的执行

3.线程池中的线程初始化

 

4.任务缓存队列及排队策略

 

5.任务拒绝策略

 

6.线程池的关闭

 

7.线程池容量的动态调整

1.线程池状态

在ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:

volatile int runState;

static final int RUNNING    = 0;

static final int SHUTDOWN   = 1;

static final int STOP       = 2;

static final int TERMINATED = 3;

runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性;

下面的几个static final变量表示runState可能的几个取值。

当创建线程池后,初始时,线程池处于RUNNING状态;

如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;

如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;

当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。

2.任务的执行

在了解将任务提交给线程池到任务执行完毕整个过程之前,我们先来看一下ThreadPoolExecutor类中其他的一些比较重要成员变量:

private final BlockingQueue<Runnable> workQueue;              //任务缓存队列,用来存放等待执行的任务

private final ReentrantLock mainLock = new ReentrantLock();   //线程池的主要状态锁,对线程池状态(比如线程池大小

//、runState等)的改变都要使用这个锁

private final HashSet<Worker> workers = new HashSet<Worker>();  //用来存放工作集

 

private volatile long  keepAliveTime;    //线程存货时间

private volatile boolean allowCoreThreadTimeOut;   //是否允许为核心线程设置存活时间

private volatile int   corePoolSize;     //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)

private volatile int   maximumPoolSize;   //线程池最大能容忍的线程数

 

private volatile int   poolSize;       //线程池中当前的线程数

 

private volatile RejectedExecutionHandler handler; //任务拒绝策略

 

private volatile ThreadFactory threadFactory;   //线程工厂,用来创建线程

 

private int largestPoolSize;   //用来记录线程池中曾经出现过的最大线程数

 

private long completedTaskCount;   //用来记录已经执行完毕的任务个数

每个变量的作用都已经标明出来了,这里要重点解释一下corePoolSize、maximumPoolSize、largestPoolSize三个变量。

corePoolSize在很多地方被翻译成核心池大小,其实我的理解这个就是线程池的大小。举个简单的例子:

假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务。

因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;

当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;

如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措施,比如重新招4个临时工人进来;

然后就将任务也分配给这4个临时工人做;

如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新的任务或者抛弃前面的一些任务了。

当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的。

这个例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。

也就是说corePoolSize就是线程池大小,maximumPoolSize在我看来是线程池的一种补救措施,即任务量突然过大时的一种补救措施。

不过为了方便理解,在本文后面还是将corePoolSize翻译成核心池大小。

largestPoolSize只是一个用来起记录作用的变量,用来记录线程池中曾经有过的最大线程数目,跟线程池的容量没有任何关系。

下面我们进入正题,看一下任务从提交到最终执行完毕经历了哪些过程。

在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法,所以我们只需要研究execute()方法的实现原理即可:

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {

if (runState == RUNNING && workQueue.offer(command)) {

if (runState != RUNNING || poolSize == 0)

ensureQueuedTaskHandled(command);

}

else if (!addIfUnderMaximumPoolSize(command))

reject(command); // is shutdown or saturated

}

}

上面的代码可能看起来不是那么容易理解,下面我们一句一句解释:

首先,判断提交的任务command是否为null,若是null,则抛出空指针异常;

接着是这句,这句要好好理解一下:

if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))

由于是或条件运算符,所以先计算前半部分的值,如果线程池中当前线程数不小于核心池大小,那么就会直接进入下面的if语句块了。

如果线程池中当前线程数小于核心池大小,则接着执行后半部分,也就是执行

addIfUnderCorePoolSize(command)

如果执行完addIfUnderCorePoolSize这个方法返回false,则继续执行下面的if语句块,否则整个方法就直接执行完毕了。

如果执行完addIfUnderCorePoolSize这个方法返回false,然后接着判断:

if (runState == RUNNING && workQueue.offer(command))

如果当前线程池处于RUNNING状态,则将任务放入任务缓存队列;如果当前线程池不处于RUNNING状态或者任务放入缓存队列失败,则执行:

addIfUnderMaximumPoolSize(command)

如果执行addIfUnderMaximumPoolSize方法失败,则执行reject()方法进行任务拒绝处理。

回到前面:

if (runState == RUNNING && workQueue.offer(command))

这句的执行,如果说当前线程池处于RUNNING状态且将任务放入任务缓存队列成功,则继续进行判断:

if (runState != RUNNING || poolSize == 0)

这句判断是为了防止在将此任务添加进任务缓存队列的同时其他线程突然调用shutdown或者shutdownNow方法关闭了线程池的一种应急措施。如果是这样就执行:

ensureQueuedTaskHandled(command)

进行应急处理,从名字可以看出是保证 添加到任务缓存队列中的任务得到处理。

我们接着看2个关键方法的实现:addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:

private boolean addIfUnderCorePoolSize(Runnable firstTask) {

Thread t = null;

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

if (poolSize < corePoolSize && runState == RUNNING)

t = addThread(firstTask);        //创建线程去执行firstTask任务

} finally {

mainLock.unlock();

}

if (t == null)

return false;

t.start();

return true;

}

这个是addIfUnderCorePoolSize方法的具体实现,从名字可以看出它的意图就是当低于核心吃大小时执行的方法。下面看其具体实现,首先获取到锁,因为这地方涉及到线程池状态的变化,先通过if语句判断当前线程池中的线程数目是否小于核心池大小,有朋友也许会有疑问:前面在execute()方法中不是已经判断过了吗,只有线程池当前线程数目小于核心池大小才会执行addIfUnderCorePoolSize方法的,为何这地方还要继续判断?原因很简单,前面的判断过程中并没有加锁,因此可能在execute方法判断的时候poolSize小于corePoolSize,而判断完之后,在其他线程中又向线程池提交了任务,就可能导致poolSize不小于corePoolSize了,所以需要在这个地方继续判断。然后接着判断线程池的状态是否为RUNNING,原因也很简单,因为有可能在其他线程中调用了shutdown或者shutdownNow方法。然后就是执行

t = addThread(firstTask);

这个方法也非常关键,传进去的参数为提交的任务,返回值为Thread类型。然后接着在下面判断t是否为空,为空则表明创建线程失败(即poolSize>=corePoolSize或者runState不等于RUNNING),否则调用t.start()方法启动线程。

我们来看一下addThread方法的实现:

private Thread addThread(Runnable firstTask) {

Worker w = new Worker(firstTask);

Thread t = threadFactory.newThread(w);  //创建一个线程,执行任务

if (t != null) {

w.thread = t;            //将创建的线程的引用赋值为w的成员变量

workers.add(w);

int nt = ++poolSize;     //当前线程数加1

if (nt > largestPoolSize)

largestPoolSize = nt;

}

return t;

}

在addThread方法中,首先用提交的任务创建了一个Worker对象,然后调用线程工厂threadFactory创建了一个新的线程t,然后将线程t的引用赋值给了Worker对象的成员变量thread,接着通过workers.add(w)将Worker对象添加到工作集当中。

接下文

Java并发编程:Lock(下)

来源:海子

链接:http://www.cnblogs.com/dolphin0520/

三.锁的相关概念介绍

在前面介绍了Lock的基本使用,这一节来介绍一下与锁相关的几个概念。

1.可重入锁

如果锁具备可重入性,则称作为可重入锁。像synchronized和ReentrantLock都是可重入锁,可重入性在我看来实际上表明了锁的分配机制:基于线程的分配,而不是基于方法调用的分配。举个简单的例子,当一个线程执行到某个synchronized方法时,比如说method1,而在method1中会调用另外一个synchronized方法method2,此时线程不必重新去申请锁,而是可以直接执行方法method2。

看下面这段代码就明白了:

class MyClass {

public synchronized void method1() {

method2();

}

 

public synchronized void method2() {

 

}

}

上述代码中的两个方法method1和method2都用synchronized修饰了,假如某一时刻,线程A执行到了method1,此时线程A获取了这个对象的锁,而由于method2也是synchronized方法,假如synchronized不具备可重入性,此时线程A需要重新申请锁。但是这就会造成一个问题,因为线程A已经持有了该对象的锁,而又在申请获取该对象的锁,这样就会线程A一直等待永远不会获取到的锁。

而由于synchronized和Lock都具备可重入性,所以不会发生上述现象。

2.可中断锁

可中断锁:顾名思义,就是可以相应中断的锁。

在Java中,synchronized就不是可中断锁,而Lock是可中断锁。

如果某一线程A正在执行锁中的代码,另一线程B正在等待获取该锁,可能由于等待时间过长,线程B不想等待了,想先处理其他事情,我们可以让它中断自己或者在别的线程中中断它,这种就是可中断锁。

在前面演示lockInterruptibly()的用法时已经体现了Lock的可中断性。

3.公平锁

公平锁即尽量以请求锁的顺序来获取锁。比如同是有多个线程在等待一个锁,当这个锁被释放时,等待时间最久的线程(最先请求的线程)会获得该所,这种就是公平锁。

非公平锁即无法保证锁的获取是按照请求锁的顺序进行的。这样就可能导致某个或者一些线程永远获取不到锁。

在Java中,synchronized就是非公平锁,它无法保证等待的线程获取锁的顺序。

而对于ReentrantLock和ReentrantReadWriteLock,它默认情况下是非公平锁,但是可以设置为公平锁。

看一下这2个类的源代码就清楚了:

在ReentrantLock中定义了2个静态内部类,一个是NotFairSync,一个是FairSync,分别用来实现非公平锁和公平锁。

我们可以在创建ReentrantLock对象时,通过以下方式来设置锁的公平性:

ReentrantLock lock = new ReentrantLock(true);

如果参数为true表示为公平锁,为fasle为非公平锁。默认情况下,如果使用无参构造器,则是非公平锁。

另外在ReentrantLock类中定义了很多方法,比如:

isFair()        //判断锁是否是公平锁

isLocked()    //判断锁是否被任何线程获取了

isHeldByCurrentThread()   //判断锁是否被当前线程获取了

hasQueuedThreads()   //判断是否有线程在等待该锁

在ReentrantReadWriteLock中也有类似的方法,同样也可以设置为公平锁和非公平锁。不过要记住,ReentrantReadWriteLock并未实现Lock接口,它实现的是ReadWriteLock接口。

4.读写锁

读写锁将对一个资源(比如文件)的访问分成了2个锁,一个读锁和一个写锁。

正因为有了读写锁,才使得多个线程之间的读操作不会发生冲突。

ReadWriteLock就是读写锁,它是一个接口,ReentrantReadWriteLock实现了这个接口。

可以通过readLock()获取读锁,通过writeLock()获取写锁。

上面已经演示过了读写锁的使用方法,在此不再赘述。

参考资料:

http://blog.csdn.net/ns_code/article/details/17487337

http://houlinyan.iteye.com/blog/1112535

http://ifeve.com/locks/

http://ifeve.com/read-write-locks/

http://blog.csdn.net/fancyerii/article/details/6783224

http://blog.csdn.net/ghsau/article/details/7461369/

http://blog.csdn.net/zhaozhenzuo/article/details/37109015