View Javadoc
1   /*
2    * Copyright (C) 2011 The Guava Authors
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5    * in compliance with the License. You may obtain a copy of the License at
6    *
7    * http://www.apache.org/licenses/LICENSE-2.0
8    *
9    * Unless required by applicable law or agreed to in writing, software distributed under the License
10   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11   * or implied. See the License for the specific language governing permissions and limitations under
12   * the License.
13   */
14  
15  package com.google.common.util.concurrent;
16  
17  import com.google.common.annotations.Beta;
18  import com.google.common.annotations.GwtIncompatible;
19  import com.google.common.annotations.VisibleForTesting;
20  import com.google.common.base.MoreObjects;
21  import com.google.common.base.Preconditions;
22  import com.google.common.base.Supplier;
23  import com.google.common.collect.ImmutableList;
24  import com.google.common.collect.Iterables;
25  import com.google.common.collect.MapMaker;
26  import com.google.common.math.IntMath;
27  import com.google.common.primitives.Ints;
28  import java.lang.ref.Reference;
29  import java.lang.ref.ReferenceQueue;
30  import java.lang.ref.WeakReference;
31  import java.math.RoundingMode;
32  import java.util.Arrays;
33  import java.util.Collections;
34  import java.util.List;
35  import java.util.concurrent.ConcurrentMap;
36  import java.util.concurrent.Semaphore;
37  import java.util.concurrent.atomic.AtomicReferenceArray;
38  import java.util.concurrent.locks.Lock;
39  import java.util.concurrent.locks.ReadWriteLock;
40  import java.util.concurrent.locks.ReentrantLock;
41  import java.util.concurrent.locks.ReentrantReadWriteLock;
42  
43  /**
44   * A striped {@code Lock/Semaphore/ReadWriteLock}. This offers the underlying lock striping similar
45   * to that of {@code ConcurrentHashMap} in a reusable form, and extends it for semaphores and
46   * read-write locks. Conceptually, lock striping is the technique of dividing a lock into many
47   * <i>stripes</i>, increasing the granularity of a single lock and allowing independent operations
48   * to lock different stripes and proceed concurrently, instead of creating contention for a single
49   * lock.
50   *
51   * <p>The guarantee provided by this class is that equal keys lead to the same lock (or semaphore),
52   * i.e. {@code if (key1.equals(key2))} then {@code striped.get(key1) == striped.get(key2)} (assuming
53   * {@link Object#hashCode()} is correctly implemented for the keys). Note that if {@code key1} is
54   * <strong>not</strong> equal to {@code key2}, it is <strong>not</strong> guaranteed that
55   * {@code striped.get(key1) != striped.get(key2)}; the elements might nevertheless be mapped to the
56   * same lock. The lower the number of stripes, the higher the probability of this happening.
57   *
58   * <p>There are three flavors of this class: {@code Striped<Lock>}, {@code Striped<Semaphore>}, and
59   * {@code Striped<ReadWriteLock>}. For each type, two implementations are offered:
60   * {@linkplain #lock(int) strong} and {@linkplain #lazyWeakLock(int) weak} {@code Striped<Lock>},
61   * {@linkplain #semaphore(int, int) strong} and {@linkplain #lazyWeakSemaphore(int, int) weak}
62   * {@code Striped<Semaphore>}, and {@linkplain #readWriteLock(int) strong} and
63   * {@linkplain #lazyWeakReadWriteLock(int) weak} {@code Striped<ReadWriteLock>}. <i>Strong</i> means
64   * that all stripes (locks/semaphores) are initialized eagerly, and are not reclaimed unless
65   * {@code Striped} itself is reclaimable. <i>Weak</i> means that locks/semaphores are created
66   * lazily, and they are allowed to be reclaimed if nobody is holding on to them. This is useful, for
67   * example, if one wants to create a {@code Striped<Lock>} of many locks,
68   * but worries that in most cases only a small portion of these would
69   * be in use.
70   *
71   * <p>Prior to this class, one might be tempted to use {@code Map<K, Lock>}, where {@code K}
72   * represents the task. This maximizes concurrency by having each unique key mapped to a unique
73   * lock, but also maximizes memory footprint. On the other extreme, one could use a single lock for
74   * all tasks, which minimizes memory footprint but also minimizes concurrency. Instead of choosing
75   * either of these extremes, {@code Striped} allows the user to trade between required concurrency
76   * and memory footprint. For example, if a set of tasks are CPU-bound, one could easily create a
77   * very compact {@code Striped<Lock>} of {@code availableProcessors() * 4} stripes, instead of
78   * possibly thousands of locks which could be created in a {@code Map<K, Lock>} structure.
79   *
80   * @author Dimitris Andreou
81   * @since 13.0
82   */
83  @Beta
84  @GwtIncompatible
85  public abstract class Striped<L> {
86    /**
87     * Stripe-count cutoff used by {@code lazy}: below it a lazy striped is array-backed, at or above
88     * it the stripes live in a ConcurrentMap, on the assumption that the map uses less memory than a
89     * large array when most stripes are unused. (If many stripes are in use, prefer a non-lazy one.)
90     */
91    private static final int LARGE_LAZY_CUTOFF = 1024;
92  
93    private Striped() {} // private: only classes nested in this file can subclass Striped
94  
95    /**
96     * Returns the stripe for the passed key. Equal keys always map to the same stripe: if
97     * {@code key1.equals(key2)}, then {@code get(key1) == get(key2)}.
98     *
99     * @param key an arbitrary, non-null key
100    * @return the stripe that the passed key corresponds to
101    */
102   public abstract L get(Object key);
103 
104   /**
105    * Returns the stripe at the specified index. Valid indexes run from 0 (inclusive) to
106    * {@code size()} (exclusive).
107    *
108    * @param index the index of the stripe to return; must be in {@code [0..size())}
109    * @return the stripe at the specified index
110    */
111   public abstract L getAt(int index);
112 
113   /**
114    * Returns the index the given key maps to, so that {@code getAt(indexFor(key)) == get(key)}.
115    */
116   abstract int indexFor(Object key);
117 
118   /**
119    * Returns the total number of stripes in this instance (the bound for {@link #getAt(int)}).
120    */
121   public abstract int size();
122 
123   /**
124    * Returns the stripes that correspond to the passed objects, in ascending (as per
125    * {@link #getAt(int)}) order. Thus, threads that use the stripes in the order returned by this
126    * method are guaranteed to not deadlock each other.
127    *
128    * <p>It should be noted that using a {@code Striped<L>} with relatively few stripes, and
129    * {@code bulkGet(keys)} with a relative large number of keys can cause an excessive number of
130    * shared stripes (much like the birthday paradox, where much fewer than anticipated birthdays are
131    * needed for a pair of them to match). Please consider carefully the implications of the number
132    * of stripes, the intended concurrency level, and the typical number of keys used in a
133    * {@code bulkGet(keys)} operation. See <a href="http://www.mathpages.com/home/kmath199.htm">Balls
134    * in Bins model</a> for mathematical formulas that can be used to estimate the probability of
135    * collisions.
136    *
137    * @param keys arbitrary non-null keys
138    * @return the stripes corresponding to the objects (one per each object, derived by delegating to
139    *     {@link #get(Object)}; may contain duplicates), in an increasing index order.
140    */
141   public Iterable<L> bulkGet(Iterable<?> keys) {
142     // Initially using the array to store the keys, then reusing it to store the respective L's
143     final Object[] array = Iterables.toArray(keys, Object.class);
144     if (array.length == 0) {
145       return ImmutableList.of();
146     }
147     int[] stripes = new int[array.length];
148     for (int i = 0; i < array.length; i++) {
149       stripes[i] = indexFor(array[i]);
150     }
151     Arrays.sort(stripes);
152     // optimize for runs of identical stripes
153     int previousStripe = stripes[0];
154     array[0] = getAt(previousStripe);
155     for (int i = 1; i < array.length; i++) {
156       int currentStripe = stripes[i];
157       if (currentStripe == previousStripe) {
158         array[i] = array[i - 1];
159       } else {
160         array[i] = getAt(currentStripe);
161         previousStripe = currentStripe;
162       }
163     }
164     /*
165      * Note that the returned Iterable holds references to the returned stripes, to avoid
166      * error-prone code like:
167      *
168      * Striped<Lock> stripedLock = Striped.lazyWeakXXX(...)'
169      * Iterable<Lock> locks = stripedLock.bulkGet(keys);
170      * for (Lock lock : locks) {
171      *   lock.lock();
172      * }
173      * operation();
174      * for (Lock lock : locks) {
175      *   lock.unlock();
176      * }
177      *
178      * If we only held the int[] stripes, translating it on the fly to L's, the original locks might
179      * be garbage collected after locking them, ending up in a huge mess.
180      */
181     @SuppressWarnings("unchecked") // we carefully replaced all keys with their respective L's
182     List<L> asList = (List<L>) Arrays.asList(array);
183     return Collections.unmodifiableList(asList);
184   }
185 
186   // Static factories
187 
188   /**
189    * Creates a {@code Striped<Lock>} with eagerly initialized, strongly referenced locks. Every lock
190    * is reentrant.
191    *
192    * @param stripes the minimum number of stripes (locks) required
193    * @return a new {@code Striped<Lock>}
194    */
195   public static Striped<Lock> lock(int stripes) {
196     return new CompactStriped<>(
197         stripes,
198         new Supplier<Lock>() {
199           @Override
200           public Lock get() {
201             return new PaddedLock();
202           }
203         });
204   }
205 
206   /**
207    * Creates a {@code Striped<Lock>} with lazily initialized, weakly referenced locks. Every lock is
208    * reentrant.
209    *
210    * @param stripes the minimum number of stripes (locks) required
211    * @return a new {@code Striped<Lock>}
212    */
213   public static Striped<Lock> lazyWeakLock(int stripes) {
214     return lazy(
215         stripes,
216         new Supplier<Lock>() {
217           @Override
218           public Lock get() {
219             return new ReentrantLock(false);
220           }
221         });
222   }
223 
224   private static <L> Striped<L> lazy(int stripes, Supplier<L> supplier) {
225     return stripes < LARGE_LAZY_CUTOFF
226         ? new SmallLazyStriped<L>(stripes, supplier)
227         : new LargeLazyStriped<L>(stripes, supplier);
228   }
229 
230   /**
231    * Creates a {@code Striped<Semaphore>} with eagerly initialized, strongly referenced semaphores,
232    * with the specified number of permits.
233    *
234    * @param stripes the minimum number of stripes (semaphores) required
235    * @param permits the number of permits in each semaphore
236    * @return a new {@code Striped<Semaphore>}
237    */
238   public static Striped<Semaphore> semaphore(int stripes, final int permits) {
239     return new CompactStriped<>(
240         stripes,
241         new Supplier<Semaphore>() {
242           @Override
243           public Semaphore get() {
244             return new PaddedSemaphore(permits);
245           }
246         });
247   }
248 
249   /**
250    * Creates a {@code Striped<Semaphore>} with lazily initialized, weakly referenced semaphores,
251    * with the specified number of permits.
252    *
253    * @param stripes the minimum number of stripes (semaphores) required
254    * @param permits the number of permits in each semaphore
255    * @return a new {@code Striped<Semaphore>}
256    */
257   public static Striped<Semaphore> lazyWeakSemaphore(int stripes, final int permits) {
258     return lazy(
259         stripes,
260         new Supplier<Semaphore>() {
261           @Override
262           public Semaphore get() {
263             return new Semaphore(permits, false);
264           }
265         });
266   }
267 
268   /**
269    * Creates a {@code Striped<ReadWriteLock>} with eagerly initialized, strongly referenced
270    * read-write locks. Every lock is reentrant.
271    *
272    * @param stripes the minimum number of stripes (locks) required
273    * @return a new {@code Striped<ReadWriteLock>}
274    */
275   public static Striped<ReadWriteLock> readWriteLock(int stripes) {
276     return new CompactStriped<>(stripes, READ_WRITE_LOCK_SUPPLIER);
277   }
278 
279   /**
280    * Creates a {@code Striped<ReadWriteLock>} with lazily initialized, weakly referenced read-write
281    * locks. Every lock is reentrant.
282    *
283    * @param stripes the minimum number of stripes (locks) required
284    * @return a new {@code Striped<ReadWriteLock>}
285    */
286   public static Striped<ReadWriteLock> lazyWeakReadWriteLock(int stripes) {
287     return lazy(stripes, READ_WRITE_LOCK_SUPPLIER);
288   }
289 
290   // Used by both read-write factories. ReentrantReadWriteLock is probably big enough without padding
291   private static final Supplier<ReadWriteLock> READ_WRITE_LOCK_SUPPLIER =
292       new Supplier<ReadWriteLock>() {
293         @Override
294         public ReadWriteLock get() {
295           return new ReentrantReadWriteLock();
296         }
297       };
298 
299   private abstract static class PowerOfTwoStriped<L> extends Striped<L> {
300     /** Capacity (power of two) minus one, for fast mod evaluation */
301     final int mask;
302 
303     PowerOfTwoStriped(int stripes) {
304       Preconditions.checkArgument(stripes > 0, "Stripes must be positive");
305       this.mask = stripes > Ints.MAX_POWER_OF_TWO ? ALL_SET : ceilToPowerOfTwo(stripes) - 1;
306     }
307 
308     @Override
309     final int indexFor(Object key) {
310       int hash = smear(key.hashCode());
311       return hash & mask;
312     }
313 
314     @Override
315     public final L get(Object key) {
316       return getAt(indexFor(key));
317     }
318   }
319 
320   /**
321    * Implementation of Striped where 2^k stripes are represented as an array of the same length,
322    * eagerly initialized.
323    */
324   private static class CompactStriped<L> extends PowerOfTwoStriped<L> {
325     /** Size is a power of two. */
326     private final Object[] array;
327 
328     private CompactStriped(int stripes, Supplier<L> supplier) {
329       super(stripes);
330       Preconditions.checkArgument(stripes <= Ints.MAX_POWER_OF_TWO, "Stripes must be <= 2^30)");
331 
332       this.array = new Object[mask + 1];
333       for (int i = 0; i < array.length; i++) {
334         array[i] = supplier.get();
335       }
336     }
337 
338     @SuppressWarnings("unchecked") // we only put L's in the array
339     @Override
340     public L getAt(int index) {
341       return (L) array[index];
342     }
343 
344     @Override
345     public int size() {
346       return array.length;
347     }
348   }
349 
350   /**
351    * Implementation of Striped where up to 2^k stripes can be represented, using an
352    * AtomicReferenceArray of size 2^k. To map a user key into a stripe, we take a k-bit slice of the
353    * user key's (smeared) hashCode(). The stripes are lazily initialized and are weakly referenced.
354    */
355   @VisibleForTesting
356   static class SmallLazyStriped<L> extends PowerOfTwoStriped<L> {
357     final AtomicReferenceArray<ArrayReference<? extends L>> locks;
358     final Supplier<L> supplier;
359     final int size;
360     final ReferenceQueue<L> queue = new ReferenceQueue<L>();
361 
362     SmallLazyStriped(int stripes, Supplier<L> supplier) {
363       super(stripes);
364       this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1;
365       this.locks = new AtomicReferenceArray<>(size);
366       this.supplier = supplier;
367     }
368 
369     @Override
370     public L getAt(int index) {
371       if (size != Integer.MAX_VALUE) {
372         Preconditions.checkElementIndex(index, size());
373       } // else no check necessary, all index values are valid
374       ArrayReference<? extends L> existingRef = locks.get(index);
375       L existing = existingRef == null ? null : existingRef.get();
376       if (existing != null) {
377         return existing;
378       }
379       L created = supplier.get();
380       ArrayReference<L> newRef = new ArrayReference<L>(created, index, queue);
381       while (!locks.compareAndSet(index, existingRef, newRef)) {
382         // we raced, we need to re-read and try again
383         existingRef = locks.get(index);
384         existing = existingRef == null ? null : existingRef.get();
385         if (existing != null) {
386           return existing;
387         }
388       }
389       drainQueue();
390       return created;
391     }
392 
393     // N.B. Draining the queue is only necessary to ensure that we don't accumulate empty references
394     // in the array. We could skip this if we decide we don't care about holding on to Reference
395     // objects indefinitely.
396     private void drainQueue() {
397       Reference<? extends L> ref;
398       while ((ref = queue.poll()) != null) {
399         // We only ever register ArrayReferences with the queue so this is always safe.
400         ArrayReference<? extends L> arrayRef = (ArrayReference<? extends L>) ref;
401         // Try to clear out the array slot, n.b. if we fail that is fine, in either case the
402         // arrayRef will be out of the array after this step.
403         locks.compareAndSet(arrayRef.index, arrayRef, null);
404       }
405     }
406 
407     @Override
408     public int size() {
409       return size;
410     }
411 
412     private static final class ArrayReference<L> extends WeakReference<L> {
413       final int index;
414 
415       ArrayReference(L referent, int index, ReferenceQueue<L> queue) {
416         super(referent, queue);
417         this.index = index;
418       }
419     }
420   }
421 
422   /**
423    * Implementation of Striped where up to 2^k stripes can be represented, using a ConcurrentMap
424    * where the key domain is [0..2^k). To map a user key into a stripe, we take a k-bit slice of the
425    * user key's (smeared) hashCode(). The stripes are lazily initialized and are weakly referenced.
426    */
427   @VisibleForTesting
428   static class LargeLazyStriped<L> extends PowerOfTwoStriped<L> {
429     final ConcurrentMap<Integer, L> locks;
430     final Supplier<L> supplier;
431     final int size;
432 
433     LargeLazyStriped(int stripes, Supplier<L> supplier) {
434       super(stripes);
435       this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1;
436       this.supplier = supplier;
437       this.locks = new MapMaker().weakValues().makeMap();
438     }
439 
440     @Override
441     public L getAt(int index) {
442       if (size != Integer.MAX_VALUE) {
443         Preconditions.checkElementIndex(index, size());
444       } // else no check necessary, all index values are valid
445       L existing = locks.get(index);
446       if (existing != null) {
447         return existing;
448       }
449       L created = supplier.get();
450       existing = locks.putIfAbsent(index, created);
451       return MoreObjects.firstNonNull(existing, created);
452     }
453 
454     @Override
455     public int size() {
456       return size;
457     }
458   }
459 
460   /**
461    * A bit mask where all bits are set.
462    */
463   private static final int ALL_SET = ~0;
464 
465   private static int ceilToPowerOfTwo(int x) {
466     return 1 << IntMath.log2(x, RoundingMode.CEILING);
467   }
468 
469   /*
470    * This method was written by Doug Lea with assistance from members of JCP JSR-166 Expert Group
471    * and released to the public domain, as explained at
472    * http://creativecommons.org/licenses/publicdomain
473    *
474    * As of 2010/06/11, this method is identical to the (package private) hash method in OpenJDK 7's
475    * java.util.HashMap class.
476    */
477   // Copied from java/com/google/common/collect/Hashing.java
478   private static int smear(int hashCode) {
479     hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12);
480     return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4);
481   }
482 
483   private static class PaddedLock extends ReentrantLock {
484     /*
485      * The unused fields below pad the object from 40 to 64 bytes, the size of a cache line. A
486      * fourth long might further reduce interference between consecutive locks, but no benefit
487      * from that could be observed.
488      */
489     long unused1;
490     long unused2;
491     long unused3;
492 
493     PaddedLock() {
494       super(false);
495     }
496   }
497 
498   private static class PaddedSemaphore extends Semaphore {
499     // Padding fields; see the comment in PaddedLock for the rationale
500     long unused1;
501     long unused2;
502     long unused3;
503 
504     PaddedSemaphore(int permits) {
505       super(permits, false);
506     }
507   }
508 }