001    //$HeadURL: https://svn.wald.intevation.org/svn/deegree/base/branches/2.3_testing/src/org/deegree/io/datastore/wfs/CascadingWFSDatastore.java $
002    /*----------------------------------------------------------------------------
003     This file is part of deegree, http://deegree.org/
004     Copyright (C) 2001-2009 by:
005       Department of Geography, University of Bonn
006     and
007       lat/lon GmbH
008    
009     This library is free software; you can redistribute it and/or modify it under
010     the terms of the GNU Lesser General Public License as published by the Free
011     Software Foundation; either version 2.1 of the License, or (at your option)
012     any later version.
013     This library is distributed in the hope that it will be useful, but WITHOUT
014     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
015     FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
016     details.
017     You should have received a copy of the GNU Lesser General Public License
018     along with this library; if not, write to the Free Software Foundation, Inc.,
019     59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
020    
021     Contact information:
022    
023     lat/lon GmbH
024     Aennchenstr. 19, 53177 Bonn
025     Germany
026     http://lat-lon.de/
027    
028     Department of Geography, University of Bonn
029     Prof. Dr. Klaus Greve
030     Postfach 1147, 53001 Bonn
031     Germany
032     http://www.geographie.uni-bonn.de/deegree/
033    
034     e-mail: info@deegree.org
035    ----------------------------------------------------------------------------*/
036    package org.deegree.io.datastore.wfs;
037    
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;

import javax.xml.transform.TransformerException;

import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.methods.StringRequestEntity;
import org.deegree.datatypes.QualifiedName;
import org.deegree.enterprise.WebUtils;
import org.deegree.framework.concurrent.ExecutionFinishedEvent;
import org.deegree.framework.concurrent.Executor;
import org.deegree.framework.log.ILogger;
import org.deegree.framework.log.LoggerFactory;
import org.deegree.framework.util.CharsetUtils;
import org.deegree.framework.xml.XMLFragment;
import org.deegree.framework.xml.XMLParsingException;
import org.deegree.framework.xml.XSLTDocument;
import org.deegree.i18n.Messages;
import org.deegree.io.datastore.Datastore;
import org.deegree.io.datastore.DatastoreException;
import org.deegree.io.datastore.DatastoreTransaction;
import org.deegree.io.datastore.schema.MappedFeatureType;
import org.deegree.model.crs.UnknownCRSException;
import org.deegree.model.feature.FeatureCollection;
import org.deegree.model.feature.FeatureFactory;
import org.deegree.model.feature.GMLFeatureCollectionDocument;
import org.deegree.ogcwebservices.OGCWebServiceException;
import org.deegree.ogcwebservices.OWSUtils;
import org.deegree.ogcwebservices.getcapabilities.InvalidCapabilitiesException;
import org.deegree.ogcwebservices.wfs.XMLFactory;
import org.deegree.ogcwebservices.wfs.capabilities.WFSCapabilities;
import org.deegree.ogcwebservices.wfs.capabilities.WFSCapabilitiesDocument;
import org.deegree.ogcwebservices.wfs.operation.GetFeature;
import org.deegree.ogcwebservices.wfs.operation.Query;
import org.xml.sax.SAXException;
083    
084    /**
085     * {@link Datastore} that uses a remote WFS instance as backend.
086     *
087     * @author <a href="mailto:poth@lat-lon.de">Andreas Poth</a>
088     * @author <a href="mailto:mschneider@lat-lon.de">Markus Schneider</a>
089     * @author <a href="mailto:giovanni.zigliara@grupposistematica.it">Giovanni Zigliara</a>
090     * @author last edited by: $Author: mschneider $
091     *
092     * @version $Revision: 18195 $, $Date: 2009-06-18 17:55:39 +0200 (Do, 18. Jun 2009) $
093     */
094    public class CascadingWFSDatastore extends Datastore {
095    
096        private ILogger LOG = LoggerFactory.getLogger( CascadingWFSDatastore.class );
097    
098        private static Map<URL, WFSCapabilities> wfsCapabilities;
099    
100        static {
101            if ( wfsCapabilities == null ) {
102                wfsCapabilities = new HashMap<URL, WFSCapabilities>();
103            }
104        }
105    
106        @Override
107        public CascadingWFSAnnotationDocument getAnnotationParser() {
108            return new CascadingWFSAnnotationDocument();
109        }
110    
111        @Override
112        public void close()
113                                throws DatastoreException {
114            // is already closed
115        }
116    
117        @Override
118        public FeatureCollection performQuery( Query query, MappedFeatureType[] rootFts, DatastoreTransaction context )
119                                throws DatastoreException, UnknownCRSException {
120            return performQuery( query, rootFts );
121        }
122    
123        @Override
124        public FeatureCollection performQuery( Query query, MappedFeatureType[] rootFts )
125                                throws DatastoreException, UnknownCRSException {
126    
127            GetFeature getFeature = GetFeature.create( "1.1.0", "ID", query.getResultType(), "text/xml; subtype=gml/3.1.1",
128                                                       "", query.getMaxFeatures(), query.getStartPosition(), -1, -1,
129                                                       new Query[] { query } );
130    
131            XMLFragment gfXML = null;
132            try {
133                gfXML = XMLFactory.export( getFeature );
134            } catch ( IOException e ) {
135                LOG.logError( e.getMessage(), e );
136                throw new DatastoreException( e.getMessage() );
137            } catch ( XMLParsingException e ) {
138                LOG.logError( e.getMessage(), e );
139                throw new DatastoreException( e.getMessage() );
140            }
141    
142            if ( LOG.isDebug() ) {
143                LOG.logDebug( "Request to cascaded WFS", gfXML.getAsPrettyString() );
144            }
145    
146            // get URL that is target of a GetFeature request
147            CascadingWFSDatastoreConfiguration config = (CascadingWFSDatastoreConfiguration) this.getConfiguration();
148            WFSDescription[] wfs = config.getWFSDescription();
149            List<Callable<FeatureCollection>> queryTasks = new ArrayList<Callable<FeatureCollection>>( wfs.length );
150            int timeout = 0;
151            for ( int i = 0; i < wfs.length; i++ ) {
152                LOG.logDebug( "Requesting to URL", wfs[i].getUrl() );
153                QueryTask task = new QueryTask( gfXML, wfs[i] );
154                queryTasks.add( task );
155                timeout += wfs[i].getTimeout();
156            }
157    
158            List<ExecutionFinishedEvent<FeatureCollection>> finishedEvents = null;
159            try {
160                finishedEvents = Executor.getInstance().performSynchronously( queryTasks, timeout );
161            } catch ( InterruptedException e ) {
162                LOG.logError( e.getMessage(), e );
163                throw new DatastoreException( Messages.getMessage( "WFS_CASCDS_PERFORM_GF" ), e );
164            }
165    
166            return mergeResults( getFeature.getId(), finishedEvents );
167        }
168    
169        /**
170         * Merges the results of the request subparts into one feature collection.
171         *
172         * @param fcid
173         *            id of the new (result) feature collection
174         * @param finishedEvents
175         * @return feature collection containing all features from all responses
176         * @throws DatastoreException
177         */
178        private FeatureCollection mergeResults( String fcid, List<ExecutionFinishedEvent<FeatureCollection>> finishedEvents )
179                                throws DatastoreException {
180    
181            FeatureCollection result = null;
182            int numFeatures = 0;
183    
184            try {
185                for ( ExecutionFinishedEvent<FeatureCollection> event : finishedEvents ) {
186                    if ( result == null ) {
187                        result = event.getResult();
188                    } else {
189                        result.addAll( event.getResult() );
190                    }
191    
192                    if ( result.getAttribute( "numberOfFeatures" ) != null ) {
193                        numFeatures += Integer.parseInt( result.getAttribute( "numberOfFeatures" ) );
194                    } else {
195                        numFeatures += result.size();
196                    }
197                }
198            } catch ( CancellationException e ) {
199                LOG.logError( e.getMessage(), e );
200                String msg = Messages.getMessage( "WFS_GET_FEATURE_TIMEOUT", e.getMessage() );
201                throw new DatastoreException( msg, e );
202            } catch ( Throwable t ) {
203                LOG.logError( t.getMessage(), t );
204                String msg = Messages.getMessage( "WFS_GET_FEATURE_BACKEND", t.getMessage() );
205                throw new DatastoreException( msg, t );
206            }
207    
208            result.setId( fcid );
209            result.setAttribute( "numberOfFeatures", "" + numFeatures );
210            return result;
211        }
212    
213        /**
214         * Returns the {@link WFSCapabilities} for the WFS that can be reached at the given {@link URL}.
215         *
216         * @param wfsBaseURL
217         *            base URL of the WFS
218         * @return the {@link WFSCapabilities} of the WFS
219         * @throws DatastoreException
220         */
221        WFSCapabilities getWFSCapabilities( URL wfsBaseURL )
222                                throws DatastoreException {
223    
224            String href = OWSUtils.validateHTTPGetBaseURL( wfsBaseURL.toExternalForm() );
225            href = href + "request=GetCapabilities&version=1.1.0&service=WFS";
226    
227            LOG.logDebug( "requested capabilities: ", href );
228    
229            URL requestURL = null;
230            try {
231                requestURL = new URL( href );
232            } catch ( MalformedURLException e1 ) {
233                e1.printStackTrace();
234            }
235    
236            WFSCapabilities caps = wfsCapabilities.get( requestURL );
237            if ( caps == null ) {
238                // access capabilities if not already has been loaded
239                WFSCapabilitiesDocument cd = new WFSCapabilitiesDocument();
240                try {
241                    cd.load( requestURL );
242                } catch ( IOException e ) {
243                    LOG.logError( e.getMessage(), e );
244                    throw new DatastoreException( e.getMessage() );
245                } catch ( SAXException e ) {
246                    LOG.logError( e.getMessage(), e );
247                    throw new DatastoreException( e.getMessage() );
248                }
249                try {
250                    caps = (WFSCapabilities) cd.parseCapabilities();
251                } catch ( InvalidCapabilitiesException e ) {
252                    LOG.logError( e.getMessage(), e );
253                    throw new DatastoreException( e.getMessage() );
254                }
255                wfsCapabilities.put( requestURL, caps );
256            }
257            return caps;
258        }
259    
260        // ///////////////////////////////////////////////////////////////////////////
261        // inner classes
262        // ///////////////////////////////////////////////////////////////////////////
263    
264        /**
265         * Inner class for performing queries on a datastore.
266         */
267        private class QueryTask implements Callable<FeatureCollection> {
268    
269            private XMLFragment getFeature;
270    
271            private WFSDescription wfs;
272    
273            /**
274             *
275             * @param getFeature
276             * @param wfs
277             */
278            QueryTask( XMLFragment getFeature, WFSDescription wfs ) {
279                this.getFeature = getFeature;
280                this.wfs = wfs;
281            }
282    
283            /**
284             * Performs the associated {@link Query} and returns the result.
285             *
286             * @return resulting feature collection
287             * @throws Exception
288             */
289            @SuppressWarnings("synthetic-access")
290            public FeatureCollection call()
291                                    throws Exception {
292    
293                try {
294                    URL url = OWSUtils.getHTTPPostOperationURL( getWFSCapabilities( wfs.getUrl() ), GetFeature.class );
295    
296                    // filter request if necessary
297                    XSLTDocument inFilter = wfs.getInFilter();
298                    if ( inFilter != null ) {
299                        try {
300                            getFeature = inFilter.transform( getFeature );
301    
302                            if ( LOG.isDebug() ) {
303                                LOG.logDebug( "Infilter from cascading WFS-transformed request",
304                                              getFeature.getAsPrettyString() );
305                            }
306                        } catch ( TransformerException e ) {
307                            LOG.logError( e.getMessage(), e );
308                            throw new DatastoreException( e.getMessage() );
309                        }
310                    }
311    
312                    if ( isFeatureTypeSupported( getFeature, wfs.getUrl() ) ) {
313    
314                        InputStream is = null;
315                        FeatureCollection fc = null;
316                        try {
317                            // perform GetFeature request against cascaded WFS
318                            HttpClient client = new HttpClient();
319                            client = WebUtils.enableProxyUsage( client, url );
320                            client.getHttpConnectionManager().getParams().setSoTimeout( wfs.getTimeout() );
321                            PostMethod post = new PostMethod( url.toExternalForm() );
322                            if ( LOG.isDebug() ) {
323                                LOG.logDebug( "Sending request", getFeature.getAsPrettyString() );
324                                LOG.logDebug( "To URL", url );
325                            }
326                            StringRequestEntity se = new StringRequestEntity( getFeature.getAsString(), "text/xml",
327                                                                              CharsetUtils.getSystemCharset() );
328                            post.setRequestEntity( se );
329                            client.executeMethod( post );
330                            is = post.getResponseBodyAsStream();
331                        } catch ( Exception e ) {
332                            throw new DatastoreException( Messages.getMessage( "DATASTORE_WFS_ACCESS", url ) );
333                        }
334    
335                        // read result as GMLFeatureColllection
336                        GMLFeatureCollectionDocument fcd = new GMLFeatureCollectionDocument( true );
337                        try {
338                            fcd.load( is, url.toExternalForm() );
339                        } catch ( Exception e ) {
340                            if ( LOG.getLevel() == ILogger.LOG_DEBUG ) {
341                                ByteArrayOutputStream bos = new ByteArrayOutputStream( 50000 );
342                                int c = 0;
343                                while ( c > -1 ) {
344                                    c = is.read();
345                                    bos.write( c );
346                                }
347                                byte[] b = bos.toByteArray();
348                                bos.close();
349                                if ( LOG.getLevel() == ILogger.LOG_DEBUG ) {
350                                    LOG.logDebug( new String( b ) );
351                                }
352                            }
353                            LOG.logError( e.getMessage(), e );
354                            throw new DatastoreException( e.getMessage() );
355                        } finally {
356                            try {
357                                is.close();
358                            } catch ( IOException shouldNeverHappen ) {
359                                // and is ignored
360                            }
361                        }
362    
363                        // filter result if necessary
364                        XSLTDocument outFilter = wfs.getOutFilter();
365                        if ( outFilter != null ) {
366                            try {
367                                XMLFragment xml = outFilter.transform( fcd );
368                                fcd = new GMLFeatureCollectionDocument();
369                                fcd.setRootElement( xml.getRootElement() );
370                            } catch ( TransformerException e ) {
371                                LOG.logError( e.getMessage(), e );
372                                throw new DatastoreException( e.getMessage() );
373                            }
374                        }
375                        try {
376                            fc = fcd.parse();
377                        } catch ( XMLParsingException e ) {
378                            LOG.logError( e.getMessage(), e );
379                            throw new DatastoreException( e.getMessage() );
380                        }
381    
382                        return fc;
383                    }
384                } catch ( Exception e ) {
385                    // don't do anything
386                    LOG.logError( e.getMessage(), e );
387                }
388                return FeatureFactory.createFeatureCollection( "ID", 1 );
389            }
390    
391            /**
392             * @return true if the WFS reachable through the passed URL supports all feature types targeted by the passed
393             *         GetFeature request.
394             *
395             * @param getFeature
396             * @param url
397             * @throws OGCWebServiceException
398             * @throws DatastoreException
399             */
400            private boolean isFeatureTypeSupported( XMLFragment getFeature, URL url )
401                                    throws OGCWebServiceException, DatastoreException {
402    
403                WFSCapabilities caps = getWFSCapabilities( url );
404    
405                GetFeature gf = GetFeature.create( "ID" + System.currentTimeMillis(), getFeature.getRootElement() );
406                Query[] queries = gf.getQuery();
407                for ( int i = 0; i < queries.length; i++ ) {
408                    QualifiedName featureType = queries[i].getTypeNames()[0];
409                    if ( caps.getFeatureTypeList().getFeatureType( featureType ) == null ) {
410                        LOG.logWarning( "Feature type '" + featureType.getPrefixedName()
411                                        + "' is not supported by remote WFS!" );
412                        return false;
413                    }
414                }
415                return true;
416            }
417        }
418    }