001    // $HeadURL: svn+ssh://jwilden@svn.wald.intevation.org/deegree/base/branches/2.5_testing/src/org/deegree/framework/concurrent/Executor.java $
002    /*----------------------------------------------------------------------------
003     This file is part of deegree, http://deegree.org/
004     Copyright (C) 2001-2009 by:
005       Department of Geography, University of Bonn
006     and
007       lat/lon GmbH
008    
009     This library is free software; you can redistribute it and/or modify it under
010     the terms of the GNU Lesser General Public License as published by the Free
011     Software Foundation; either version 2.1 of the License, or (at your option)
012     any later version.
013     This library is distributed in the hope that it will be useful, but WITHOUT
014     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
015     FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
016     details.
017     You should have received a copy of the GNU Lesser General Public License
018     along with this library; if not, write to the Free Software Foundation, Inc.,
019     59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
020    
021     Contact information:
022    
023     lat/lon GmbH
024     Aennchenstr. 19, 53177 Bonn
025     Germany
026     http://lat-lon.de/
027    
028     Department of Geography, University of Bonn
029     Prof. Dr. Klaus Greve
030     Postfach 1147, 53001 Bonn
031     Germany
032     http://www.geographie.uni-bonn.de/deegree/
033    
034     e-mail: info@deegree.org
035    ----------------------------------------------------------------------------*/
036    package org.deegree.framework.concurrent;
037    
038    import java.util.ArrayList;
039    import java.util.List;
040    import java.util.concurrent.Callable;
041    import java.util.concurrent.CancellationException;
042    import java.util.concurrent.ExecutionException;
043    import java.util.concurrent.ExecutorService;
044    import java.util.concurrent.Executors;
045    import java.util.concurrent.Future;
046    import java.util.concurrent.TimeUnit;
047    
048    /**
049     * The <code>Executor</code> is deegree's central place to:
050     * <ul>
051     * <li>Perform a task asynchronously (in an independent thread) optionally with a maximum execution
052     * time.</li>
053     * <li>Perform a task synchronously with a maximum execution time.</li>
054     * <li>Perform several task synchronously (but in parallel threads) with a maximum execution time.</li>
055     * </ul>
056     * <p>
057     * The <code>Executor</code> class is realized as a singleton and uses a cached thread pool
058     * internally to minimize overhead for acquiring the necessary {@link Thread} instances and to
059     * manage the number of concurrent threads.
060     * </p>
061     *
062     * @author <a href="mailto:poth@lat-lon.de">Andreas Poth</a>
063     * @author <a href="mailto:schmitz@lat-lon.de">Andreas Schmitz</a>
064     * @author <a href="mailto:bezema@lat-lon.de">Rutger Bezema</a>
065     * @author <a href="mailto:schneider@lat-lon.de">Markus Schneider</a>
066     * @author last edited by: $Author: mschneider $
067     *
068     * @version $Revision: 18195 $, $Date: 2009-06-18 17:55:39 +0200 (Do, 18 Jun 2009) $
069     * @see java.util.concurrent.ExecutorService
070     * @see org.deegree.framework.concurrent.ExecutionFinishedListener
071     */
072    public class Executor {
073    
074        private static Executor exec;
075    
076        private ExecutorService execService;
077    
078        /**
079         * Private constructor required for singleton pattern.
080         */
081        private Executor() {
082            this.execService = Executors.newCachedThreadPool();
083        }
084    
085        /**
086         * Returns the only instance of this class (singleton pattern).
087         *
088         * @return the only instance of this class
089         */
090        public synchronized static Executor getInstance() {
091    
092            if ( exec == null ) {
093                exec = new Executor();
094            }
095            return exec;
096        }
097    
098        /**
099         * Performs a task asynchronously (in an independent thread) without any time limit.
100         *
101         * @param <T>
102         *            type of return value
103         * @param task
104         *            task to be performed (specified in the {@link Callable#call()} method)
105         * @param finishedListener
106         *            notified when the method call finishes (succesfully or abnormally), may be null
107         */
108        public <T> void performAsynchronously( Callable<T> task, ExecutionFinishedListener<T> finishedListener ) {
109    
110            AsyncPerformer<T> runner = new AsyncPerformer<T>( task, finishedListener );
111            this.execService.execute( runner );
112        }
113    
114        /**
115         * Performs a task asynchronously (in an independent thread) with a given time limit.
116         *
117         * @param <T>
118         *            type of return value
119         * @param task
120         *            task to be performed (specified in the {@link Callable#call()}} method)
121         * @param finishedListener
122         *            notified when the method call finishes (succesfully or abnormally), may be null
123         * @param timeout
124         *            maximum time allowed for execution in milliseconds
125         */
126        public <T> void performAsynchronously( Callable<T> task, ExecutionFinishedListener<T> finishedListener, long timeout ) {
127    
128            AsyncPerformer<T> runner = new AsyncPerformer<T>( task, finishedListener, timeout );
129            this.execService.execute( runner );
130        }
131    
132        /**
133         * Performs a task synchronously with a given timeout.
134         *
135         * @param <T>
136         *            type of return value
137         * @param task
138         *            tasks to be performed (specified in the {@link Callable#call()} method)
139         * @param timeout
140         *            maximum time allowed for execution in milliseconds
141         * @return result value of the called method
142         * @throws CancellationException
143         *             if the execution time exceeds the specified timeout / thread has been cancelled
144         * @throws InterruptedException
145         *             if interrupted while waiting, in which case unfinished tasks are cancelled
146         * @throws Throwable
147         *             if the tasks throws an exception itself
148         */
149        public <T> T performSynchronously( Callable<T> task, long timeout )
150                                throws CancellationException, InterruptedException, Throwable {
151    
152            T result;
153            List<Callable<T>> tasks = new ArrayList<Callable<T>>( 1 );
154            tasks.add( task );
155    
156            try {
157                List<Future<T>> futures = this.execService.invokeAll( tasks, timeout, TimeUnit.MILLISECONDS );
158                Future<T> future = futures.get( 0 );
159                result = future.get();
160            } catch ( ExecutionException e ) {
161                throw ( e.getCause() );
162            }
163            return result;
164        }
165    
166        /**
167         * Performs several tasks synchronously in parallel threads.
168         * <p>
169         * This method does not return before all tasks are finished (successfully or abnormally). For
170         * each given {@link Callable}, an independent thread is used. For each task, an
171         * {@link ExecutionFinishedEvent} is generated and returned.
172         *
173         * @param <T>
174         *            the result type of the callables
175         *
176         * @param tasks
177         *            tasks to be performed (specified in the {@link Callable#call()} methods)
178         * @return ExecutionFinishedEvents for all tasks
179         * @throws InterruptedException
180         *             if the current thread was interrupted while waiting
181         */
182        public <T> List<ExecutionFinishedEvent<T>> performSynchronously( List<Callable<T>> tasks )
183                                throws InterruptedException {
184    
185            List<ExecutionFinishedEvent<T>> results = new ArrayList<ExecutionFinishedEvent<T>>( tasks.size() );
186            List<Future<T>> futures = this.execService.invokeAll( tasks );
187    
188            for ( int i = 0; i < tasks.size(); i++ ) {
189    
190                ExecutionFinishedEvent<T> finishedEvent = null;
191                Callable<T> task = tasks.get( i );
192                Future<T> future = futures.get( i );
193    
194                try {
195                    T result = future.get();
196                    finishedEvent = new ExecutionFinishedEvent<T>( task, result );
197                } catch ( ExecutionException e ) {
198                    finishedEvent = new ExecutionFinishedEvent<T>( e.getCause(), task );
199                } catch ( CancellationException e ) {
200                    finishedEvent = new ExecutionFinishedEvent<T>( e, task );
201                }
202                results.add( finishedEvent );
203            }
204            return results;
205        }
206    
207        /**
208         * Performs several tasks synchronously with a given timeout in parallel threads.
209         * <p>
210         * This method does not return before all tasks are finished (successfully or abnormally). For
211         * each given {@link Callable}, an independent thread is used. For each task, an
212         * {@link ExecutionFinishedEvent} is generated and returned.
213         *
214         * @param <T>
215         *            the result type of the tasks
216         *
217         * @param tasks
218         *            tasks to be performed (specified in the {@link Callable#call()} methods)
219         * @param timeout
220         *            maximum time allowed for execution (in milliseconds)
221         * @return ExecutionFinishedEvents for all tasks
222         * @throws InterruptedException
223         *             if the current thread was interrupted while waiting
224         */
225        public <T> List<ExecutionFinishedEvent<T>> performSynchronously( List<Callable<T>> tasks, long timeout )
226                                throws InterruptedException {
227    
228            List<ExecutionFinishedEvent<T>> results = new ArrayList<ExecutionFinishedEvent<T>>( tasks.size() );
229            List<Future<T>> futures = this.execService.invokeAll( tasks, timeout, TimeUnit.MILLISECONDS );
230    
231            for ( int i = 0; i < tasks.size(); i++ ) {
232    
233                ExecutionFinishedEvent<T> finishedEvent = null;
234                Callable<T> task = tasks.get( i );
235                Future<T> future = futures.get( i );
236    
237                try {
238                    T result = future.get();
239                    finishedEvent = new ExecutionFinishedEvent<T>( task, result );
240                } catch ( ExecutionException e ) {
241                    finishedEvent = new ExecutionFinishedEvent<T>( e.getCause(), task );
242                } catch ( CancellationException e ) {
243                    finishedEvent = new ExecutionFinishedEvent<T>( e, task );
244                }
245                results.add( finishedEvent );
246            }
247            return results;
248        }
249    
250        // ///////////////////////////////////////////////////////////////////////////
251        // inner classes //
252        // ///////////////////////////////////////////////////////////////////////////
253    
254        /**
255         * Inner class for performing task asynchronously.
256         *
257         * @author <a href="mailto:poth@lat-lon.de">Andreas Poth</a>
258         * @author <a href="mailto:schmitz@lat-lon.de">Andreas Schmitz</a>
259         * @author <a href="mailto:bezema@lat-lon.de">Rutger Bezema</a>
260         * @author <a href="mailto:schneider@lat-lon.de">Markus Schneider</a>
261         * @author last edited by: $Author: mschneider $
262         * @version $Revision: 18195 $
263         */
264        private class AsyncPerformer<T> implements Runnable {
265    
266            private Callable<T> task;
267    
268            private ExecutionFinishedListener<T> finishedListener;
269    
270            private long timeout = -1;
271    
272            AsyncPerformer( Callable<T> task, ExecutionFinishedListener<T> finishedListener ) {
273                this.task = task;
274                this.finishedListener = finishedListener;
275            }
276    
277            AsyncPerformer( Callable<T> task, ExecutionFinishedListener<T> finishedListener, long timeout ) {
278                this.task = task;
279                this.finishedListener = finishedListener;
280                this.timeout = timeout;
281            }
282    
283            /**
284             * Performs the task using {@link Executor#performSynchronously(Callable, long)}.
285             *
286             * @see java.lang.Runnable#run()
287             */
288            public void run() {
289                ExecutionFinishedEvent<T> finishedEvent = null;
290                try {
291                    T result = null;
292                    if ( this.timeout < 0 ) {
293                        result = this.task.call();
294                    } else {
295                        result = Executor.getInstance().performSynchronously( task, timeout );
296                    }
297                    finishedEvent = new ExecutionFinishedEvent<T>( this.task, result );
298                } catch ( Throwable t ) {
299                    finishedEvent = new ExecutionFinishedEvent<T>( t, this.task );
300                }
301                if ( this.finishedListener != null ) {
302                    this.finishedListener.executionFinished( finishedEvent );
303                }
304            }
305        }
306    
307    }