001    //$HeadURL: svn+ssh://jwilden@svn.wald.intevation.org/deegree/base/branches/2.5_testing/src/org/deegree/io/datastore/wfs/CascadingWFSDatastore.java $
002    /*----------------------------------------------------------------------------
003     This file is part of deegree, http://deegree.org/
004     Copyright (C) 2001-2009 by:
005     Department of Geography, University of Bonn
006     and
007     lat/lon GmbH
008    
009     This library is free software; you can redistribute it and/or modify it under
010     the terms of the GNU Lesser General Public License as published by the Free
011     Software Foundation; either version 2.1 of the License, or (at your option)
012     any later version.
013     This library is distributed in the hope that it will be useful, but WITHOUT
014     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
015     FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
016     details.
017     You should have received a copy of the GNU Lesser General Public License
018     along with this library; if not, write to the Free Software Foundation, Inc.,
019     59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
020    
021     Contact information:
022    
023     lat/lon GmbH
024     Aennchenstr. 19, 53177 Bonn
025     Germany
026     http://lat-lon.de/
027    
028     Department of Geography, University of Bonn
029     Prof. Dr. Klaus Greve
030     Postfach 1147, 53001 Bonn
031     Germany
032     http://www.geographie.uni-bonn.de/deegree/
033    
034     e-mail: info@deegree.org
035     ----------------------------------------------------------------------------*/
036    package org.deegree.io.datastore.wfs;
037    
038    import java.io.ByteArrayOutputStream;
039    import java.io.IOException;
040    import java.io.InputStream;
041    import java.net.MalformedURLException;
042    import java.net.URL;
043    import java.util.ArrayList;
044    import java.util.HashMap;
045    import java.util.List;
046    import java.util.Map;
047    import java.util.concurrent.Callable;
048    import java.util.concurrent.CancellationException;
049    
050    import javax.xml.transform.TransformerException;
051    
052    import org.apache.commons.httpclient.HttpClient;
053    import org.apache.commons.httpclient.methods.PostMethod;
054    import org.apache.commons.httpclient.methods.StringRequestEntity;
055    import org.deegree.datatypes.QualifiedName;
056    import org.deegree.enterprise.WebUtils;
057    import org.deegree.framework.concurrent.ExecutionFinishedEvent;
058    import org.deegree.framework.concurrent.Executor;
059    import org.deegree.framework.log.ILogger;
060    import org.deegree.framework.log.LoggerFactory;
061    import org.deegree.framework.util.CharsetUtils;
062    import org.deegree.framework.xml.XMLFragment;
063    import org.deegree.framework.xml.XMLParsingException;
064    import org.deegree.framework.xml.XSLTDocument;
065    import org.deegree.i18n.Messages;
066    import org.deegree.io.datastore.Datastore;
067    import org.deegree.io.datastore.DatastoreException;
068    import org.deegree.io.datastore.DatastoreTransaction;
069    import org.deegree.io.datastore.schema.MappedFeatureType;
070    import org.deegree.model.crs.UnknownCRSException;
071    import org.deegree.model.feature.FeatureCollection;
072    import org.deegree.model.feature.FeatureFactory;
073    import org.deegree.model.feature.GMLFeatureCollectionDocument;
074    import org.deegree.ogcwebservices.OGCWebServiceException;
075    import org.deegree.ogcwebservices.OWSUtils;
076    import org.deegree.ogcwebservices.getcapabilities.InvalidCapabilitiesException;
077    import org.deegree.ogcwebservices.wfs.XMLFactory;
078    import org.deegree.ogcwebservices.wfs.capabilities.WFSCapabilities;
079    import org.deegree.ogcwebservices.wfs.capabilities.WFSCapabilitiesDocument;
080    import org.deegree.ogcwebservices.wfs.operation.GetFeature;
081    import org.deegree.ogcwebservices.wfs.operation.Query;
082    import org.xml.sax.SAXException;
083    
084    /**
085     * {@link Datastore} that uses a remote WFS instance as backend.
086     * 
087     * @author <a href="mailto:poth@lat-lon.de">Andreas Poth</a>
088     * @author <a href="mailto:mschneider@lat-lon.de">Markus Schneider</a>
089     * @author <a href="mailto:giovanni.zigliara@grupposistematica.it">Giovanni Zigliara</a>
090     * @author last edited by: $Author: apoth $
091     * 
092     * @version $Revision: 25030 $, $Date: 2010-06-23 09:14:37 +0200 (Mi, 23 Jun 2010) $
093     */
094    public class CascadingWFSDatastore extends Datastore {
095    
096        private ILogger LOG = LoggerFactory.getLogger( CascadingWFSDatastore.class );
097    
098        private static Map<URL, WFSCapabilities> wfsCapabilities;
099    
100        static {
101            if ( wfsCapabilities == null ) {
102                wfsCapabilities = new HashMap<URL, WFSCapabilities>();
103            }
104        }
105    
106        @Override
107        public CascadingWFSAnnotationDocument getAnnotationParser() {
108            return new CascadingWFSAnnotationDocument();
109        }
110    
111        @Override
112        public void close()
113                                throws DatastoreException {
114            // is already closed
115        }
116    
117        @Override
118        public FeatureCollection performQuery( Query query, MappedFeatureType[] rootFts, DatastoreTransaction context )
119                                throws DatastoreException, UnknownCRSException {
120            return performQuery( query, rootFts );
121        }
122    
123        @Override
124        public FeatureCollection performQuery( Query query, MappedFeatureType[] rootFts )
125                                throws DatastoreException, UnknownCRSException {
126    
127            GetFeature getFeature = GetFeature.create( "1.1.0", "ID", query.getResultType(), "text/xml; subtype=gml/3.1.1",
128                                                       "", query.getMaxFeatures(), query.getStartPosition(), -1, -1,
129                                                       new Query[] { query } );
130    
131            XMLFragment gfXML = null;
132            try {
133                gfXML = XMLFactory.export( getFeature );
134            } catch ( IOException e ) {
135                LOG.logError( e.getMessage(), e );
136                throw new DatastoreException( e.getMessage() );
137            } catch ( XMLParsingException e ) {
138                LOG.logError( e.getMessage(), e );
139                throw new DatastoreException( e.getMessage() );
140            }
141    
142            if ( LOG.isDebug() ) {
143                LOG.logDebug( "Request to cascaded WFS", gfXML.getAsPrettyString() );
144            }
145    
146            // get URL that is target of a GetFeature request
147            CascadingWFSDatastoreConfiguration config = (CascadingWFSDatastoreConfiguration) this.getConfiguration();
148            WFSDescription[] wfs = config.getWFSDescription();
149            List<Callable<FeatureCollection>> queryTasks = new ArrayList<Callable<FeatureCollection>>( wfs.length );
150            int timeout = 0;
151            for ( int i = 0; i < wfs.length; i++ ) {
152                LOG.logDebug( "Requesting to URL", wfs[i].getUrl() );
153                QueryTask task = new QueryTask( gfXML, wfs[i] );
154                queryTasks.add( task );
155                timeout += wfs[i].getTimeout();
156            }
157    
158            List<ExecutionFinishedEvent<FeatureCollection>> finishedEvents = null;
159            try {
160                finishedEvents = Executor.getInstance().performSynchronously( queryTasks, timeout );
161            } catch ( InterruptedException e ) {
162                LOG.logError( e.getMessage(), e );
163                throw new DatastoreException( Messages.getMessage( "WFS_CASCDS_PERFORM_GF" ), e );
164            }
165    
166            return mergeResults( getFeature.getId(), finishedEvents );
167        }
168    
169        /**
170         * Merges the results of the request subparts into one feature collection.
171         * 
172         * @param fcid
173         *            id of the new (result) feature collection
174         * @param finishedEvents
175         * @return feature collection containing all features from all responses
176         * @throws DatastoreException
177         */
178        private FeatureCollection mergeResults( String fcid, List<ExecutionFinishedEvent<FeatureCollection>> finishedEvents )
179                                throws DatastoreException {
180    
181            FeatureCollection result = null;
182            int numFeatures = 0;
183    
184            try {
185                for ( ExecutionFinishedEvent<FeatureCollection> event : finishedEvents ) {
186                    if ( result == null ) {
187                        result = event.getResult();
188                    } else {
189                        result.addAllUncontained( event.getResult() );
190                    }
191    
192                    if ( result.getAttribute( "numberOfFeatures" ) != null
193                         && result.getAttribute( "numberOfFeatures" ).length() > 0 ) {
194                        try {
195                            numFeatures += Integer.parseInt( result.getAttribute( "numberOfFeatures" ) );
196                        } catch ( Exception e ) {
197                            LOG.logWarning( "value of attribute numberOfFeatures is not a number: " +
198                                            result.getAttribute( "numberOfFeatures" ) );
199                            // fall back
200                            numFeatures += result.size();
201                        }
202                    } else {
203                        numFeatures += result.size();
204                    }
205                }
206            } catch ( CancellationException e ) {
207                LOG.logError( e.getMessage(), e );
208                String msg = Messages.getMessage( "WFS_GET_FEATURE_TIMEOUT", e.getMessage() );
209                throw new DatastoreException( msg, e );
210            } catch ( Throwable t ) {
211                LOG.logError( t.getMessage(), t );
212                String msg = Messages.getMessage( "WFS_GET_FEATURE_BACKEND", t.getMessage() );
213                throw new DatastoreException( msg, t );
214            }
215    
216            result.setId( fcid );
217            result.setAttribute( "numberOfFeatures", "" + numFeatures );
218            return result;
219        }
220    
221        /**
222         * Returns the {@link WFSCapabilities} for the WFS that can be reached at the given {@link URL}.
223         * 
224         * @param wfsBaseURL
225         *            base URL of the WFS
226         * @return the {@link WFSCapabilities} of the WFS
227         * @throws DatastoreException
228         */
229        WFSCapabilities getWFSCapabilities( URL wfsBaseURL )
230                                throws DatastoreException {
231    
232            String href = OWSUtils.validateHTTPGetBaseURL( wfsBaseURL.toExternalForm() );
233            href = href + "request=GetCapabilities&version=1.1.0&service=WFS";
234    
235            LOG.logDebug( "requested capabilities: ", href );
236    
237            URL requestURL = null;
238            try {
239                requestURL = new URL( href );
240            } catch ( MalformedURLException e1 ) {
241                e1.printStackTrace();
242            }
243    
244            WFSCapabilities caps = wfsCapabilities.get( requestURL );
245            if ( caps == null ) {
246                // access capabilities if not already has been loaded
247                WFSCapabilitiesDocument cd = new WFSCapabilitiesDocument();
248                try {
249                    cd.load( requestURL );
250                } catch ( IOException e ) {
251                    LOG.logError( e.getMessage(), e );
252                    throw new DatastoreException( e.getMessage() );
253                } catch ( SAXException e ) {
254                    LOG.logError( e.getMessage(), e );
255                    throw new DatastoreException( e.getMessage() );
256                }
257                try {
258                    caps = (WFSCapabilities) cd.parseCapabilities();
259                } catch ( InvalidCapabilitiesException e ) {
260                    LOG.logError( e.getMessage(), e );
261                    throw new DatastoreException( e.getMessage() );
262                }
263                wfsCapabilities.put( requestURL, caps );
264            }
265            return caps;
266        }
267    
268        // ///////////////////////////////////////////////////////////////////////////
269        // inner classes
270        // ///////////////////////////////////////////////////////////////////////////
271    
272        /**
273         * Inner class for performing queries on a datastore.
274         */
275        private class QueryTask implements Callable<FeatureCollection> {
276    
277            private XMLFragment getFeature;
278    
279            private WFSDescription wfs;
280    
281            /**
282             * 
283             * @param getFeature
284             * @param wfs
285             */
286            QueryTask( XMLFragment getFeature, WFSDescription wfs ) {
287                this.getFeature = getFeature;
288                this.wfs = wfs;
289            }
290    
291            /**
292             * Performs the associated {@link Query} and returns the result.
293             * 
294             * @return resulting feature collection
295             * @throws Exception
296             */
297            @SuppressWarnings("synthetic-access")
298            public FeatureCollection call()
299                                    throws Exception {
300    
301                try {
302                    URL url = OWSUtils.getHTTPPostOperationURL( getWFSCapabilities( wfs.getUrl() ), GetFeature.class );
303    
304                    // filter request if necessary
305                    XSLTDocument inFilter = wfs.getInFilter();
306                    if ( inFilter != null ) {
307                        try {
308                            getFeature = inFilter.transform( getFeature );
309    
310                            if ( LOG.isDebug() ) {
311                                LOG.logDebug( "Infilter from cascading WFS-transformed request",
312                                              getFeature.getAsPrettyString() );
313                            }
314                        } catch ( TransformerException e ) {
315                            LOG.logError( e.getMessage(), e );
316                            throw new DatastoreException( e.getMessage() );
317                        }
318                    }
319    
320                    if ( isFeatureTypeSupported( getFeature, wfs.getUrl() ) ) {
321    
322                        InputStream is = null;
323                        FeatureCollection fc = null;
324                        try {
325                            // perform GetFeature request against cascaded WFS
326                            HttpClient client = new HttpClient();
327                            client = WebUtils.enableProxyUsage( client, url );
328                            client.getHttpConnectionManager().getParams().setSoTimeout( wfs.getTimeout() );
329                            PostMethod post = new PostMethod( url.toExternalForm() );
330                            if ( LOG.isDebug() ) {
331                                LOG.logDebug( "Sending request", getFeature.getAsPrettyString() );
332                                LOG.logDebug( "To URL", url );
333                            }
334                            StringRequestEntity se = new StringRequestEntity( getFeature.getAsString(), "text/xml",
335                                                                              CharsetUtils.getSystemCharset() );
336                            post.setRequestEntity( se );
337                            client.executeMethod( post );
338                            is = post.getResponseBodyAsStream();
339                        } catch ( Exception e ) {
340                            throw new DatastoreException( Messages.getMessage( "DATASTORE_WFS_ACCESS", url ) );
341                        }
342    
343                        // read result as GMLFeatureColllection
344                        GMLFeatureCollectionDocument fcd = new GMLFeatureCollectionDocument( true );
345                        try {
346                            fcd.load( is, url.toExternalForm() );
347                        } catch ( Exception e ) {
348                            if ( LOG.getLevel() == ILogger.LOG_DEBUG ) {
349                                ByteArrayOutputStream bos = new ByteArrayOutputStream( 50000 );
350                                int c = 0;
351                                while ( c > -1 ) {
352                                    c = is.read();
353                                    bos.write( c );
354                                }
355                                byte[] b = bos.toByteArray();
356                                bos.close();
357                                if ( LOG.getLevel() == ILogger.LOG_DEBUG ) {
358                                    LOG.logDebug( new String( b ) );
359                                }
360                            }
361                            LOG.logError( e.getMessage(), e );
362                            throw new DatastoreException( e.getMessage() );
363                        } finally {
364                            try {
365                                is.close();
366                            } catch ( IOException shouldNeverHappen ) {
367                                // and is ignored
368                            }
369                        }
370    
371                        // filter result if necessary
372                        XSLTDocument outFilter = wfs.getOutFilter();
373                        if ( outFilter != null ) {
374                            try {
375                                XMLFragment xml = outFilter.transform( fcd );
376                                fcd = new GMLFeatureCollectionDocument();
377                                fcd.setRootElement( xml.getRootElement() );
378                            } catch ( TransformerException e ) {
379                                LOG.logError( e.getMessage(), e );
380                                throw new DatastoreException( e.getMessage() );
381                            }
382                        }
383                        try {
384                            fc = fcd.parse();
385                        } catch ( XMLParsingException e ) {
386                            LOG.logError( e.getMessage(), e );
387                            throw new DatastoreException( e.getMessage() );
388                        }
389    
390                        return fc;
391                    }
392                } catch ( Exception e ) {
393                    // don't do anything
394                    LOG.logError( e.getMessage(), e );
395                }
396                return FeatureFactory.createFeatureCollection( "ID", 1 );
397            }
398    
399            /**
400             * @return true if the WFS reachable through the passed URL supports all feature types targeted by the passed
401             *         GetFeature request.
402             * 
403             * @param getFeature
404             * @param url
405             * @throws OGCWebServiceException
406             * @throws DatastoreException
407             */
408            private boolean isFeatureTypeSupported( XMLFragment getFeature, URL url )
409                                    throws OGCWebServiceException, DatastoreException {
410    
411                WFSCapabilities caps = getWFSCapabilities( url );
412    
413                GetFeature gf = GetFeature.create( "ID" + System.currentTimeMillis(), getFeature.getRootElement() );
414                Query[] queries = gf.getQuery();
415                for ( int i = 0; i < queries.length; i++ ) {
416                    QualifiedName featureType = queries[i].getTypeNames()[0];
417                    if ( caps.getFeatureTypeList().getFeatureType( featureType ) == null ) {
418                        LOG.logWarning( "Feature type '" + featureType.getPrefixedName()
419                                        + "' is not supported by remote WFS!" );
420                        return false;
421                    }
422                }
423                return true;
424            }
425        }
426    }