View Javadoc
1   /*
2    * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3    *
4    * This code is free software; you can redistribute it and/or modify it
5    * under the terms of the GNU General Public License version 2 only, as
6    * published by the Free Software Foundation.  Oracle designates this
7    * particular file as subject to the "Classpath" exception as provided
8    * by Oracle in the LICENSE file that accompanied this code.
9    *
10   * This code is distributed in the hope that it will be useful, but WITHOUT
11   * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12   * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13   * version 2 for more details (a copy is included in the LICENSE file that
14   * accompanied this code).
15   *
16   * You should have received a copy of the GNU General Public License version
17   * 2 along with this work; if not, write to the Free Software Foundation,
18   * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19   *
20   * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21   * or visit www.oracle.com if you need additional information or have any
22   * questions.
23   */
24  
25  /*
26   * This file is available under and governed by the GNU General Public
27   * License version 2 only, as published by the Free Software Foundation.
28   * However, the following notice accompanied the original version of this
29   * file:
30   *
31   * Written by Doug Lea with assistance from members of JCP JSR-166
32   * Expert Group and released to the public domain, as explained at
33   * http://creativecommons.org/publicdomain/zero/1.0/
34   */
35  
36  package java.util.concurrent;
37  
38  import java.security.AccessControlContext;
39  import java.security.ProtectionDomain;
40  
41  /**
42   * A thread managed by a {@link ForkJoinPool}, which executes
43   * {@link ForkJoinTask}s.
44   * This class is subclassable solely for the sake of adding
45   * functionality -- there are no overridable methods dealing with
46   * scheduling or execution.  However, you can override initialization
47   * and termination methods surrounding the main task processing loop.
48   * If you do create such a subclass, you will also need to supply a
49   * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to
50   * {@linkplain ForkJoinPool#ForkJoinPool use it} in a {@code ForkJoinPool}.
51   *
52   * @since 1.7
53   * @author Doug Lea
54   */
55  public class ForkJoinWorkerThread extends Thread {
56      /*
57       * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
58       * ForkJoinTasks. For explanation, see the internal documentation
59       * of class ForkJoinPool.
60       *
61       * This class just maintains links to its pool and WorkQueue.  The
62       * pool field is set immediately upon construction, but the
63       * workQueue field is not set until a call to registerWorker
64       * completes. This leads to a visibility race, that is tolerated
65       * by requiring that the workQueue field is only accessed by the
66       * owning thread.
67       *
68       * Support for (non-public) subclass InnocuousForkJoinWorkerThread
69       * requires that we break quite a lot of encapulation (via Unsafe)
70       * both here and in the subclass to access and set Thread fields.
71       */
72  
73      final ForkJoinPool pool;                // the pool this thread works in
74      final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
75  
76      /**
77       * Creates a ForkJoinWorkerThread operating in the given pool.
78       *
79       * @param pool the pool this thread works in
80       * @throws NullPointerException if pool is null
81       */
82      protected ForkJoinWorkerThread(ForkJoinPool pool) {
83          // Use a placeholder until a useful name can be set in registerWorker
84          super("aForkJoinWorkerThread");
85          this.pool = pool;
86          this.workQueue = pool.registerWorker(this);
87      }
88  
89      /**
90       * Version for InnocuousForkJoinWorkerThread
91       */
92      ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup,
93                           AccessControlContext acc) {
94          super(threadGroup, null, "aForkJoinWorkerThread");
95          U.putOrderedObject(this, INHERITEDACCESSCONTROLCONTEXT, acc);
96          eraseThreadLocals(); // clear before registering
97          this.pool = pool;
98          this.workQueue = pool.registerWorker(this);
99      }
100 
101     /**
102      * Returns the pool hosting this thread.
103      *
104      * @return the pool
105      */
106     public ForkJoinPool getPool() {
107         return pool;
108     }
109 
110     /**
111      * Returns the unique index number of this thread in its pool.
112      * The returned value ranges from zero to the maximum number of
113      * threads (minus one) that may exist in the pool, and does not
114      * change during the lifetime of the thread.  This method may be
115      * useful for applications that track status or collect results
116      * per-worker-thread rather than per-task.
117      *
118      * @return the index number
119      */
120     public int getPoolIndex() {
121         return workQueue.poolIndex >>> 1; // ignore odd/even tag bit
122     }
123 
124     /**
125      * Initializes internal state after construction but before
126      * processing any tasks. If you override this method, you must
127      * invoke {@code super.onStart()} at the beginning of the method.
128      * Initialization requires care: Most fields must have legal
129      * default values, to ensure that attempted accesses from other
130      * threads work correctly even before this thread starts
131      * processing tasks.
132      */
133     protected void onStart() {
134     }
135 
136     /**
137      * Performs cleanup associated with termination of this worker
138      * thread.  If you override this method, you must invoke
139      * {@code super.onTermination} at the end of the overridden method.
140      *
141      * @param exception the exception causing this thread to abort due
142      * to an unrecoverable error, or {@code null} if completed normally
143      */
144     protected void onTermination(Throwable exception) {
145     }
146 
147     /**
148      * This method is required to be public, but should never be
149      * called explicitly. It performs the main run loop to execute
150      * {@link ForkJoinTask}s.
151      */
152     public void run() {
153         if (workQueue.array == null) { // only run once
154             Throwable exception = null;
155             try {
156                 onStart();
157                 pool.runWorker(workQueue);
158             } catch (Throwable ex) {
159                 exception = ex;
160             } finally {
161                 try {
162                     onTermination(exception);
163                 } catch (Throwable ex) {
164                     if (exception == null)
165                         exception = ex;
166                 } finally {
167                     pool.deregisterWorker(this, exception);
168                 }
169             }
170         }
171     }
172 
173     /**
174      * Erases ThreadLocals by nulling out Thread maps
175      */
176     final void eraseThreadLocals() {
177         U.putObject(this, THREADLOCALS, null);
178         U.putObject(this, INHERITABLETHREADLOCALS, null);
179     }
180 
181     /**
182      * Non-public hook method for InnocuousForkJoinWorkerThread
183      */
184     void afterTopLevelExec() {
185     }
186 
187     // Set up to allow setting thread fields in constructor
188     private static final sun.misc.Unsafe U;
189     private static final long THREADLOCALS;
190     private static final long INHERITABLETHREADLOCALS;
191     private static final long INHERITEDACCESSCONTROLCONTEXT;
192     static {
193         try {
194             U = sun.misc.Unsafe.getUnsafe();
195             Class<?> tk = Thread.class;
196             THREADLOCALS = U.objectFieldOffset
197                 (tk.getDeclaredField("threadLocals"));
198             INHERITABLETHREADLOCALS = U.objectFieldOffset
199                 (tk.getDeclaredField("inheritableThreadLocals"));
200             INHERITEDACCESSCONTROLCONTEXT = U.objectFieldOffset
201                 (tk.getDeclaredField("inheritedAccessControlContext"));
202 
203         } catch (Exception e) {
204             throw new Error(e);
205         }
206     }
207 
208     /**
209      * A worker thread that has no permissions, is not a member of any
210      * user-defined ThreadGroup, and erases all ThreadLocals after
211      * running each top-level task.
212      */
213     static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread {
214         /** The ThreadGroup for all InnocuousForkJoinWorkerThreads */
215         private static final ThreadGroup innocuousThreadGroup =
216             createThreadGroup();
217 
218         /** An AccessControlContext supporting no privileges */
219         private static final AccessControlContext INNOCUOUS_ACC =
220             new AccessControlContext(
221                 new ProtectionDomain[] {
222                     new ProtectionDomain(null, null)
223                 });
224 
225         InnocuousForkJoinWorkerThread(ForkJoinPool pool) {
226             super(pool, innocuousThreadGroup, INNOCUOUS_ACC);
227         }
228 
229         @Override // to erase ThreadLocals
230         void afterTopLevelExec() {
231             eraseThreadLocals();
232         }
233 
234         @Override // to always report system loader
235         public ClassLoader getContextClassLoader() {
236             return ClassLoader.getSystemClassLoader();
237         }
238 
239         @Override // to silently fail
240         public void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { }
241 
242         @Override // paranoically
243         public void setContextClassLoader(ClassLoader cl) {
244             throw new SecurityException("setContextClassLoader");
245         }
246 
247         /**
248          * Returns a new group with the system ThreadGroup (the
249          * topmost, parentless group) as parent.  Uses Unsafe to
250          * traverse Thread group and ThreadGroup parent fields.
251          */
252         private static ThreadGroup createThreadGroup() {
253             try {
254                 sun.misc.Unsafe u = sun.misc.Unsafe.getUnsafe();
255                 Class<?> tk = Thread.class;
256                 Class<?> gk = ThreadGroup.class;
257                 long tg = u.objectFieldOffset(tk.getDeclaredField("group"));
258                 long gp = u.objectFieldOffset(gk.getDeclaredField("parent"));
259                 ThreadGroup group = (ThreadGroup)
260                     u.getObject(Thread.currentThread(), tg);
261                 while (group != null) {
262                     ThreadGroup parent = (ThreadGroup)u.getObject(group, gp);
263                     if (parent == null)
264                         return new ThreadGroup(group,
265                                                "InnocuousForkJoinWorkerThreadGroup");
266                     group = parent;
267                 }
268             } catch (Exception e) {
269                 throw new Error(e);
270             }
271             // fall through if null as cannot-happen safeguard
272             throw new Error("Cannot create ThreadGroup");
273         }
274     }
275 
276 }
277