036    package org.deegree.io.datastore.wfs;
038    import java.io.ByteArrayOutputStream;
039    import java.io.IOException;
040    import java.io.InputStream;
041    import java.net.MalformedURLException;
042    import java.net.URL;
043    import java.util.ArrayList;
044    import java.util.HashMap;
045    import java.util.List;
046    import java.util.Map;
047    import java.util.concurrent.Callable;
048    import java.util.concurrent.CancellationException;
050    import javax.xml.transform.TransformerException;
052    import org.apache.commons.httpclient.HttpClient;
053    import org.apache.commons.httpclient.methods.PostMethod;
054    import org.apache.commons.httpclient.methods.StringRequestEntity;
055    import org.deegree.datatypes.QualifiedName;
056    import org.deegree.enterprise.WebUtils;
057    import org.deegree.framework.concurrent.ExecutionFinishedEvent;
058    import org.deegree.framework.concurrent.Executor;
059    import org.deegree.framework.log.ILogger;
060    import org.deegree.framework.log.LoggerFactory;
061    import org.deegree.framework.util.CharsetUtils;
062    import org.deegree.framework.xml.XMLFragment;
063    import org.deegree.framework.xml.XMLParsingException;
064    import org.deegree.framework.xml.XSLTDocument;
065    import org.deegree.i18n.Messages;
066    import org.deegree.io.datastore.Datastore;
067    import org.deegree.io.datastore.DatastoreException;
068    import org.deegree.io.datastore.DatastoreTransaction;
069    import org.deegree.io.datastore.schema.MappedFeatureType;
070    import org.deegree.model.crs.UnknownCRSException;
071    import org.deegree.model.feature.FeatureCollection;
072    import org.deegree.model.feature.FeatureFactory;
073    import org.deegree.model.feature.GMLFeatureCollectionDocument;
074    import org.deegree.ogcwebservices.OGCWebServiceException;
075    import org.deegree.ogcwebservices.OWSUtils;
076    import org.deegree.ogcwebservices.getcapabilities.InvalidCapabilitiesException;
077    import org.deegree.ogcwebservices.wfs.XMLFactory;
078    import org.deegree.ogcwebservices.wfs.capabilities.WFSCapabilities;
079    import org.deegree.ogcwebservices.wfs.capabilities.WFSCapabilitiesDocument;
080    import org.deegree.ogcwebservices.wfs.operation.GetFeature;
081    import org.deegree.ogcwebservices.wfs.operation.Query;
082    import org.xml.sax.SAXException;
084    /**
085     * {@link Datastore} that uses a remote WFS instance as backend.
086     * 
087     * @author <a href="mailto:poth@lat-lon.de">Andreas Poth</a>
088     * @author <a href="mailto:mschneider@lat-lon.de">Markus Schneider</a>
089     * @author <a href="mailto:giovanni.zigliara@grupposistematica.it">Giovanni Zigliara</a>
090     * @author last edited by: $Author: apoth $
091     * 
092     * @version $Revision: 25030 $, $Date: 2010-06-23 09:14:37 +0200 (Mi, 23 Jun 2010) $
093     */
094    public class CascadingWFSDatastore extends Datastore {
096        private ILogger LOG = LoggerFactory.getLogger( CascadingWFSDatastore.class );
098        private static Map<URL, WFSCapabilities> wfsCapabilities;
100        static {
101            if ( wfsCapabilities == null ) {
102                wfsCapabilities = new HashMap<URL, WFSCapabilities>();
103            }
104        }
106        @Override
107        public CascadingWFSAnnotationDocument getAnnotationParser() {
108            return new CascadingWFSAnnotationDocument();
109        }
111        @Override
112        public void close()
113                                throws DatastoreException {
114            // is already closed
115        }
117        @Override
118        public FeatureCollection performQuery( Query query, MappedFeatureType[] rootFts, DatastoreTransaction context )
119                                throws DatastoreException, UnknownCRSException {
120            return performQuery( query, rootFts );
121        }
123        @Override
124        public FeatureCollection performQuery( Query query, MappedFeatureType[] rootFts )
125                                throws DatastoreException, UnknownCRSException {
127            GetFeature getFeature = GetFeature.create( "1.1.0", "ID", query.getResultType(), "text/xml; subtype=gml/3.1.1",
128                                                       "", query.getMaxFeatures(), query.getStartPosition(), -1, -1,
129                                                       new Query[] { query } );
131            XMLFragment gfXML = null;
132            try {
133                gfXML = XMLFactory.export( getFeature );
134            } catch ( IOException e ) {
135                LOG.logError( e.getMessage(), e );
136                throw new DatastoreException( e.getMessage() );
137            } catch ( XMLParsingException e ) {
138                LOG.logError( e.getMessage(), e );
139                throw new DatastoreException( e.getMessage() );
140            }
142            if ( LOG.isDebug() ) {
143                LOG.logDebug( "Request to cascaded WFS", gfXML.getAsPrettyString() );
144            }
146            // get URL that is target of a GetFeature request
147            CascadingWFSDatastoreConfiguration config = (CascadingWFSDatastoreConfiguration) this.getConfiguration();
148            WFSDescription[] wfs = config.getWFSDescription();
149            List<Callable<FeatureCollection>> queryTasks = new ArrayList<Callable<FeatureCollection>>( wfs.length );
150            int timeout = 0;
151            for ( int i = 0; i < wfs.length; i++ ) {
152                LOG.logDebug( "Requesting to URL", wfs[i].getUrl() );
153                QueryTask task = new QueryTask( gfXML, wfs[i] );
154                queryTasks.add( task );
155                timeout += wfs[i].getTimeout();
156            }
158            List<ExecutionFinishedEvent<FeatureCollection>> finishedEvents = null;
159            try {
160                finishedEvents = Executor.getInstance().performSynchronously( queryTasks, timeout );
161            } catch ( InterruptedException e ) {
162                LOG.logError( e.getMessage(), e );
163                throw new DatastoreException( Messages.getMessage( "WFS_CASCDS_PERFORM_GF" ), e );
164            }
166            return mergeResults( getFeature.getId(), finishedEvents );
167        }
169        /**
170         * Merges the results of the request subparts into one feature collection.
171         * 
172         * @param fcid
173         *            id of the new (result) feature collection
174         * @param finishedEvents
175         * @return feature collection containing all features from all responses
176         * @throws DatastoreException
177         */
178        private FeatureCollection mergeResults( String fcid, List<ExecutionFinishedEvent<FeatureCollection>> finishedEvents )
179                                throws DatastoreException {
181            FeatureCollection result = null;
182            int numFeatures = 0;
184            try {
185                for ( ExecutionFinishedEvent<FeatureCollection> event : finishedEvents ) {
186                    if ( result == null ) {
187                        result = event.getResult();
188                    } else {
189                        result.addAllUncontained( event.getResult() );
190                    }
192                    if ( result.getAttribute( "numberOfFeatures" ) != null
193                         && result.getAttribute( "numberOfFeatures" ).length() > 0 ) {
194                        try {
195                            numFeatures += Integer.parseInt( result.getAttribute( "numberOfFeatures" ) );
196                        } catch ( Exception e ) {
197                            LOG.logWarning( "value of attribute numberOfFeatures is not a number: " +
198                                            result.getAttribute( "numberOfFeatures" ) );
199                            // fall back
200                            numFeatures += result.size();
201                        }
202                    } else {
203                        numFeatures += result.size();
204                    }
205                }
206            } catch ( CancellationException e ) {
207                LOG.logError( e.getMessage(), e );
208                String msg = Messages.getMessage( "WFS_GET_FEATURE_TIMEOUT", e.getMessage() );
209                throw new DatastoreException( msg, e );
210            } catch ( Throwable t ) {
211                LOG.logError( t.getMessage(), t );
212                String msg = Messages.getMessage( "WFS_GET_FEATURE_BACKEND", t.getMessage() );
213                throw new DatastoreException( msg, t );
214            }
216            result.setId( fcid );
217            result.setAttribute( "numberOfFeatures", "" + numFeatures );
218            return result;
219        }
221        /**
222         * Returns the {@link WFSCapabilities} for the WFS that can be reached at the given {@link URL}.
223         * 
224         * @param wfsBaseURL
225         *            base URL of the WFS
226         * @return the {@link WFSCapabilities} of the WFS
227         * @throws DatastoreException
228         */
229        WFSCapabilities getWFSCapabilities( URL wfsBaseURL )
230                                throws DatastoreException {
232            String href = OWSUtils.validateHTTPGetBaseURL( wfsBaseURL.toExternalForm() );
233            href = href + "request=GetCapabilities&version=1.1.0&service=WFS";
235            LOG.logDebug( "requested capabilities: ", href );
237            URL requestURL = null;
238            try {
239                requestURL = new URL( href );
240            } catch ( MalformedURLException e1 ) {
241                e1.printStackTrace();
242            }
244            WFSCapabilities caps = wfsCapabilities.get( requestURL );
245            if ( caps == null ) {
246                // access capabilities if not already has been loaded
247                WFSCapabilitiesDocument cd = new WFSCapabilitiesDocument();
248                try {
249                    cd.load( requestURL );
250                } catch ( IOException e ) {
251                    LOG.logError( e.getMessage(), e );
252                    throw new DatastoreException( e.getMessage() );
253                } catch ( SAXException e ) {
254                    LOG.logError( e.getMessage(), e );
255                    throw new DatastoreException( e.getMessage() );
256                }
257                try {
258                    caps = (WFSCapabilities) cd.parseCapabilities();
259                } catch ( InvalidCapabilitiesException e ) {
260                    LOG.logError( e.getMessage(), e );
261                    throw new DatastoreException( e.getMessage() );
262                }
263                wfsCapabilities.put( requestURL, caps );
264            }
265            return caps;
266        }
268        // ///////////////////////////////////////////////////////////////////////////
269        // inner classes
270        // ///////////////////////////////////////////////////////////////////////////
272        /**
273         * Inner class for performing queries on a datastore.
274         */
275        private class QueryTask implements Callable<FeatureCollection> {
277            private XMLFragment getFeature;
279            private WFSDescription wfs;
281            /**
282             * 
283             * @param getFeature
284             * @param wfs
285             */
286            QueryTask( XMLFragment getFeature, WFSDescription wfs ) {
287                this.getFeature = getFeature;
288                this.wfs = wfs;
289            }
291            /**
292             * Performs the associated {@link Query} and returns the result.
293             * 
294             * @return resulting feature collection
295             * @throws Exception
296             */
297            @SuppressWarnings("synthetic-access")
298            public FeatureCollection call()
299                                    throws Exception {
301                try {
302                    URL url = OWSUtils.getHTTPPostOperationURL( getWFSCapabilities( wfs.getUrl() ), GetFeature.class );
304                    // filter request if necessary
305                    XSLTDocument inFilter = wfs.getInFilter();
306                    if ( inFilter != null ) {
307                        try {
308                            getFeature = inFilter.transform( getFeature );
310                            if ( LOG.isDebug() ) {
311                                LOG.logDebug( "Infilter from cascading WFS-transformed request",
312                                              getFeature.getAsPrettyString() );
313                            }
314                        } catch ( TransformerException e ) {
315                            LOG.logError( e.getMessage(), e );
316                            throw new DatastoreException( e.getMessage() );
317                        }
318                    }
320                    if ( isFeatureTypeSupported( getFeature, wfs.getUrl() ) ) {
322                        InputStream is = null;
323                        FeatureCollection fc = null;
324                        try {
325                            // perform GetFeature request against cascaded WFS
326                            HttpClient client = new HttpClient();
327                            client = WebUtils.enableProxyUsage( client, url );
328                            client.getHttpConnectionManager().getParams().setSoTimeout( wfs.getTimeout() );
329                            PostMethod post = new PostMethod( url.toExternalForm() );
330                            if ( LOG.isDebug() ) {
331                                LOG.logDebug( "Sending request", getFeature.getAsPrettyString() );
332                                LOG.logDebug( "To URL", url );
333                            }
334                            StringRequestEntity se = new StringRequestEntity( getFeature.getAsString(), "text/xml",
335                                                                              CharsetUtils.getSystemCharset() );
336                            post.setRequestEntity( se );
337                            client.executeMethod( post );
338                            is = post.getResponseBodyAsStream();
339                        } catch ( Exception e ) {
340                            throw new DatastoreException( Messages.getMessage( "DATASTORE_WFS_ACCESS", url ) );
341                        }
343                        // read result as GMLFeatureColllection
344                        GMLFeatureCollectionDocument fcd = new GMLFeatureCollectionDocument( true );
345                        try {
346                            fcd.load( is, url.toExternalForm() );
347                        } catch ( Exception e ) {
348                            if ( LOG.getLevel() == ILogger.LOG_DEBUG ) {
349                                ByteArrayOutputStream bos = new ByteArrayOutputStream( 50000 );
350                                int c = 0;
351                                while ( c > -1 ) {
352                                    c = is.read();
353                                    bos.write( c );
354                                }
355                                byte[] b = bos.toByteArray();
356                                bos.close();
357                                if ( LOG.getLevel() == ILogger.LOG_DEBUG ) {
358                                    LOG.logDebug( new String( b ) );
359                                }
360                            }
361                            LOG.logError( e.getMessage(), e );
362                            throw new DatastoreException( e.getMessage() );
363                        } finally {
364                            try {
365                                is.close();
366                            } catch ( IOException shouldNeverHappen ) {
367                                // and is ignored
368                            }
369                        }
371                        // filter result if necessary
372                        XSLTDocument outFilter = wfs.getOutFilter();
373                        if ( outFilter != null ) {
374                            try {
375                                XMLFragment xml = outFilter.transform( fcd );
376                                fcd = new GMLFeatureCollectionDocument();
377                                fcd.setRootElement( xml.getRootElement() );
378                            } catch ( TransformerException e ) {
379                                LOG.logError( e.getMessage(), e );
380                                throw new DatastoreException( e.getMessage() );
381                            }
382                        }
383                        try {
384                            fc = fcd.parse();
385                        } catch ( XMLParsingException e ) {
386                            LOG.logError( e.getMessage(), e );
387                            throw new DatastoreException( e.getMessage() );
388                        }
390                        return fc;
391                    }
392                } catch ( Exception e ) {
393                    // don't do anything
394                    LOG.logError( e.getMessage(), e );
395                }
396                return FeatureFactory.createFeatureCollection( "ID", 1 );
397            }
399            /**
400             * @return true if the WFS reachable through the passed URL supports all feature types targeted by the passed
401             *         GetFeature request.
402             * 
403             * @param getFeature
404             * @param url
405             * @throws OGCWebServiceException
406             * @throws DatastoreException
407             */
408            private boolean isFeatureTypeSupported( XMLFragment getFeature, URL url )
409                                    throws OGCWebServiceException, DatastoreException {
411                WFSCapabilities caps = getWFSCapabilities( url );
413                GetFeature gf = GetFeature.create( "ID" + System.currentTimeMillis(), getFeature.getRootElement() );
414                Query[] queries = gf.getQuery();
415                for ( int i = 0; i < queries.length; i++ ) {
416                    QualifiedName featureType = queries[i].getTypeNames()[0];
417                    if ( caps.getFeatureTypeList().getFeatureType( featureType ) == null ) {
418                        LOG.logWarning( "Feature type '" + featureType.getPrefixedName()
419                                        + "' is not supported by remote WFS!" );
420                        return false;
421                    }
422                }
423                return true;
424            }
425        }
426    }