View Javadoc
1   /*
2    * Copyright (C) 2015 The Guava Authors
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5    * in compliance with the License. You may obtain a copy of the License at
6    *
7    * http://www.apache.org/licenses/LICENSE-2.0
8    *
9    * Unless required by applicable law or agreed to in writing, software distributed under the License
10   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11   * or implied. See the License for the specific language governing permissions and limitations under
12   * the License.
13   */
14  
15  package com.google.common.util.concurrent;
16  
17  import static com.google.common.collect.Sets.newConcurrentHashSet;
18  import static java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater;
19  import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;
20  
21  import com.google.common.annotations.GwtCompatible;
22  import com.google.j2objc.annotations.ReflectionSupport;
23  import java.util.Set;
24  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
25  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
26  import java.util.logging.Level;
27  import java.util.logging.Logger;
28  
29  /**
30   * A helper which does some thread-safe operations for aggregate futures, which must be implemented
31   * differently in GWT. Namely:
32   * <ul>
33   * <li>Lazily initializes a set of seen exceptions
34   * <li>Decrements a counter atomically
35   * </ul>
36   */
37  @GwtCompatible(emulated = true)
38  @ReflectionSupport(value = ReflectionSupport.Level.FULL)
39  abstract class AggregateFutureState {
40    // Lazily initialized the first time we see an exception; not released until all the input futures
41    // & this future completes. Released when the future releases the reference to the running state
42    private volatile Set<Throwable> seenExceptions = null;
43  
44    private volatile int remaining;
45  
46    private static final AtomicHelper ATOMIC_HELPER;
47  
48    private static final Logger log = Logger.getLogger(AggregateFutureState.class.getName());
49  
50    static {
51      AtomicHelper helper;
52      try {
53        helper =
54            new SafeAtomicHelper(
55                newUpdater(AggregateFutureState.class, (Class) Set.class, "seenExceptions"),
56                newUpdater(AggregateFutureState.class, "remaining"));
57      } catch (Throwable reflectionFailure) {
58        // Some Android 5.0.x Samsung devices have bugs in JDK reflection APIs that cause
59        // getDeclaredField to throw a NoSuchFieldException when the field is definitely there.
60        // For these users fallback to a suboptimal implementation, based on synchronized. This will
61        // be a definite performance hit to those users.
62        log.log(Level.SEVERE, "SafeAtomicHelper is broken!", reflectionFailure);
63        helper = new SynchronizedAtomicHelper();
64      }
65      ATOMIC_HELPER = helper;
66    }
67  
68    AggregateFutureState(int remainingFutures) {
69      this.remaining = remainingFutures;
70    }
71  
72    final Set<Throwable> getOrInitSeenExceptions() {
73      /*
74       * The initialization of seenExceptions has to be more complicated than we'd like. The simple
75       * approach would be for each caller CAS it from null to a Set populated with its exception. But
76       * there's another race: If the first thread fails with an exception and a second thread
77       * immediately fails with the same exception:
78       *
79       * Thread1: calls setException(), which returns true, context switch before it can CAS
80       * seenExceptions to its exception
81       *
82       * Thread2: calls setException(), which returns false, CASes seenExceptions to its exception,
83       * and wrongly believes that its exception is new (leading it to logging it when it shouldn't)
84       *
85       * Our solution is for threads to CAS seenExceptions from null to a Set population with _the
86       * initial exception_, no matter which thread does the work. This ensures that seenExceptions
87       * always contains not just the current thread's exception but also the initial thread's.
88       */
89      Set<Throwable> seenExceptionsLocal = seenExceptions;
90      if (seenExceptionsLocal == null) {
91        seenExceptionsLocal = newConcurrentHashSet();
92        /*
93         * Other handleException() callers may see this as soon as we publish it. We need to populate
94         * it with the initial failure before we do, or else they may think that the initial failure
95         * has never been seen before.
96         */
97        addInitialException(seenExceptionsLocal);
98  
99        ATOMIC_HELPER.compareAndSetSeenExceptions(this, null, seenExceptionsLocal);
100       /*
101        * If another handleException() caller created the set, we need to use that copy in case yet
102        * other callers have added to it.
103        *
104        * This read is guaranteed to get us the right value because we only set this once (here).
105        */
106       seenExceptionsLocal = seenExceptions;
107     }
108     return seenExceptionsLocal;
109   }
110 
111   /** Populates {@code seen} with the exception that was passed to {@code setException}. */
112   abstract void addInitialException(Set<Throwable> seen);
113 
114   final int decrementRemainingAndGet() {
115     return ATOMIC_HELPER.decrementAndGetRemainingCount(this);
116   }
117 
118   private abstract static class AtomicHelper {
119     /** Atomic compare-and-set of the {@link AggregateFutureState#seenExceptions} field. */
120     abstract void compareAndSetSeenExceptions(
121         AggregateFutureState state, Set<Throwable> expect, Set<Throwable> update);
122 
123     /** Atomic decrement-and-get of the {@link AggregateFutureState#remaining} field. */
124     abstract int decrementAndGetRemainingCount(AggregateFutureState state);
125   }
126 
127   private static final class SafeAtomicHelper extends AtomicHelper {
128     final AtomicReferenceFieldUpdater<AggregateFutureState, Set<Throwable>> seenExceptionsUpdater;
129 
130     final AtomicIntegerFieldUpdater<AggregateFutureState> remainingCountUpdater;
131 
132     SafeAtomicHelper(
133         AtomicReferenceFieldUpdater seenExceptionsUpdater,
134         AtomicIntegerFieldUpdater remainingCountUpdater) {
135       this.seenExceptionsUpdater = seenExceptionsUpdater;
136       this.remainingCountUpdater = remainingCountUpdater;
137     }
138 
139     @Override
140     void compareAndSetSeenExceptions(
141         AggregateFutureState state, Set<Throwable> expect, Set<Throwable> update) {
142       seenExceptionsUpdater.compareAndSet(state, expect, update);
143     }
144 
145     @Override
146     int decrementAndGetRemainingCount(AggregateFutureState state) {
147       return remainingCountUpdater.decrementAndGet(state);
148     }
149   }
150 
151   private static final class SynchronizedAtomicHelper extends AtomicHelper {
152     @Override
153     void compareAndSetSeenExceptions(
154         AggregateFutureState state, Set<Throwable> expect, Set<Throwable> update) {
155       synchronized (state) {
156         if (state.seenExceptions == expect) {
157           state.seenExceptions = update;
158         }
159       }
160     }
161 
162     @Override
163     int decrementAndGetRemainingCount(AggregateFutureState state) {
164       synchronized (state) {
165         state.remaining--;
166         return state.remaining;
167       }
168     }
169   }
170 }