天天看点

深入Guava源码之Stripe

sun在java5中引入了concurrent包,它对java的并发编程提供了强大的支持。首先,它提供了lock接口,可用了更细粒度的控制锁的区域,它的实现类有reentrantlock,readlock,writelock,其中readlock和writelock共同用于实现reetrantreadwritelock(它继承自readwritelock,但是没有实现lock接口,readwritelock接口也没有继承lock接口)。而且,它还提供了一些常用并发场景下的类工具:semaphore、countdownlatch和cyclicbarrier。它们个字的应用场景:

semaphore(信号量)

有n个非线程安全的资源(资源池),这些资源使用一个semaphore(计数信号量)保护,每个线程在使用这些资源时需要首先获得一个信号量(acquire)表示当前资源池还有可用资源,然后线程从该资源池中获取并移除一个资源,在使用完后,将该资源交回给资源池,并释放已经获得信号量(release)(这里的“移除”、“交回”并不一定需要显示操作,只是一种形象的描述,之所以这么描述是应为这里的各个资源是一样的,因而对一个线程它每次拿到的资源不一定是同一个资源,用于区分stripe的使用场景),其中pool是一种典型的应用。

countdownlatch(闭锁)

有n个task,它们执行完成后需要执行另外一个收尾的task(aggregated task),比如在做report计算中,有n个report要计算,而在所有report计算完成后需要生成一个基于所有report结果的一个总的report,而这个总的report需要等到所有report计算出结果后才能开始,此时就可以定义一个countdownlatch,其初始值是n,在总的report计算前调用countdownlatch的await方法等待其他report执行完成,而其他report在完成后都会调用countdownlatch中的countdown方法。

cyclicbarrier(关卡)

每个线程执行完成后需要等待,直到n个线程都执行完成后,才能继续执行,在n个线程执行完成之后,而下一次执行开始之前可以添加自定义逻辑(通过构建cyclicbarrier实例时传入一个runnable实例自定义逻辑),即在每个线程执行完成后调用cyclicbarrier的await方法并等待(即所谓的关卡),当n个线程都完成后,自定义的runnable实例会自动被执行(如果存在这样的runnable实例的话),然后所有线程继续下一次执行。这个现实中的例子没有想到比较合适的。。。。

exchanger(交换者)

exchanger是一种特殊的cyclicbarrier,它只有两个线程参与,一个生产者,一个消费者,有两个队列共同参与,生产者和消费者各自有一个队列,其中生产者向它的队列添加数据,而消费者从它包含的队列中拿数据,当生产者中的队列满时调用exchange方法,传入自己原有的队列,期待交换得到消费者中空的队列;而当消费者中的队列满时同样调用exchange方法,传入自己的原有队列,期待获取到生产者中已经填满的队列。这样,生产者和消费者可以和谐的生产消费,并且它们的步骤是一致的(不管哪一方比另一方快都会等待另一方)。

最后,java5中还提供了一些atomic类以实现简单场景下高效非lock方式的线程安全,以及blockingqueue、synchronizer、completionservice、concurrenthashmap等工具类。

在实际应用中,我们有一个cache系统,它包含key和payload的键值对(map),在cache中map的实现已经是线程安全了,然而我们不仅仅是向cache中写数据要保证线程安全,在操作payload时,也需要保证线程安全。因为我们在cache中的数据量很大,为每个payload配置一个单独的锁显然不现实,也不需要因为它们没有那么高的并发行,因而我们需要一种机制将key分成不同的group,而每个group共享一个锁(这就是concurrenthashmap的实现思路)。通过key即可获得一个锁,并且每个相同的key获得的锁实例是相同的(获得相同锁实例的key它们不一定相等,因为这是一对多的关系)。

根据以上应用场景,stripe的实现很简单,只需要内部保存一个lock数组,对每个给定的key,计算其hash值,根据hash值计算其锁对应的数组下标,而该下标下的lock实例既是和该key关联的lock实例。这里通过hash值把key和lock实例关联起来,为了扩展性,在实现时还可以把计算数组下标的逻辑抽象成一个接口,用户可以通过传入自定义该接口的实现类实例加入用户自定义的关联逻辑,默认采用hash值关联方式。

在guava中,stripe以抽象类的形式存在,它定义了通过给定key或index获得相应lock/semaphore/readwritelock实例:

public abstract class striped<l> {

  /**

   * returns the stripe that corresponds to the passed key. it is always guaranteed that if

   * {@code key1.equals(key2)}, then {@code get(key1) == get(key2)}.

   *

   * @param key an arbitrary, non-null key

   * @return the stripe that the passed key corresponds to

   */

  public abstract l get(object key);

   * returns the stripe at the specified index. valid indexes are 0, inclusively, to

   * {@code size()}, exclusively.

   * @param index the index of the stripe to return; must be in {@code [0

深入Guava源码之Stripe

size())}

   * @return the stripe at the specified index

  public abstract l getat(int index);

   * returns the index to which the given key is mapped, so that getat(indexfor(key)) == get(key).

  abstract int indexfor(object key);

   * returns the total number of stripes in this instance.

  public abstract int size();

   * returns the stripes that correspond to the passed objects, in ascending (as per

   * {@link #getat(int)}) order. thus, threads that use the stripes in the order returned

   * by this method are guaranteed to not deadlock each other.

   * <p>it should be noted that using a {@code striped<l>} with relatively few stripes, and

   * {@code bulkget(keys)} with a relative large number of keys can cause an excessive number

   * of shared stripes (much like the birthday paradox, where much fewer than anticipated birthdays

   * are needed for a pair of them to match). please consider carefully the implications of the

   * number of stripes, the intended concurrency level, and the typical number of keys used in a

   * {@code bulkget(keys)} operation. see <a href="http://www.mathpages.com/home/kmath199.htm">balls

   * in bins model</a> for mathematical formulas that can be used to estimate the probability of

   * collisions.

   * @param keys arbitrary non-null keys

   * @return the stripes corresponding to the objects (one per each object, derived by delegating

   *         to {@link #get(object)}; may contain duplicates), in an increasing index order.

  public iterable<l> bulkget(iterable<?> keys);

}

可以使用一下几个静态工厂方法创建相应的striped实例,其中lazyweakxxx创建的striped实例中锁以弱引用的方式存在(在什么样的场景中使用呢?):

/**

 * creates a {@code striped<lock>} with eagerly initialized, strongly referenced locks.

 * every lock is reentrant.

 *

 * @param stripes the minimum number of stripes (locks) required

 * @return a new {@code striped<lock>}

 */

public static striped<lock> lock(int stripes);

 * creates a {@code striped<lock>} with lazily initialized, weakly referenced locks.

public static striped<lock> lazyweaklock(int stripes);

 * creates a {@code striped<semaphore>} with eagerly initialized, strongly referenced semaphores,

 * with the specified number of permits.

 * @param stripes the minimum number of stripes (semaphores) required

 * @param permits the number of permits in each semaphore

 * @return a new {@code striped<semaphore>}

public static striped<semaphore> semaphore(int stripes, final int permits);

 * creates a {@code striped<semaphore>} with lazily initialized, weakly referenced semaphores,

public static striped<semaphore> lazyweaksemaphore(int stripes, final int permits);

 * creates a {@code striped<readwritelock>} with eagerly initialized, strongly referenced

 * read-write locks. every lock is reentrant.

 * @return a new {@code striped<readwritelock>}

public static striped<readwritelock> readwritelock(int stripes);

 * creates a {@code striped<readwritelock>} with lazily initialized, weakly referenced

public static striped<readwritelock> lazyweakreadwritelock(int stripes);

striped有两个具体实现类,compactstriped和lazystriped,他们都继承自poweroftwostriped(用于表达内部保存的stripes值是2的指数值)。poweroftwostriped实现了indexfor()方法,它使用hash值做映射函数:

  private abstract static class poweroftwostriped<l> extends striped<l> {

    /** capacity (power of two) minus one, for fast mod evaluation */

    final int mask;

    @override final int indexfor(object key) {

      int hash = smear(key.hashcode());

      return hash & mask;

    }

  }

  private static int smear(int hashcode) {

    hashcode ^= (hashcode >>> 20) ^ (hashcode >>> 12);

    return hashcode ^ (hashcode >>> 7) ^ (hashcode >>> 4);

compactstriped类使用一个数组保存所有的lock/semaphore/readwritelock实例,在初始化时就建立所有的锁实例;而lazystriped类使用一个值为weakreference的concurrentmap做为数据结构,index值为key,lock/semaphore/readwritelock的weakreference为值,所有锁实例在用到时动态创建。在compactstriped中创建锁实例时对reentrantlock/semaphore创建采用paddedxxx版本,不知道为何要做pad。

striped类实现的类图如下:

深入Guava源码之Stripe