//$HeadURL: svn+ssh://jwilden@svn.wald.intevation.org/deegree/base/branches/2.5_testing/src/org/deegree/ogcwebservices/csw/manager/CatalogueHarvester.java $
/*----------------------------------------------------------------------------
 This file is part of deegree, http://deegree.org/
 Copyright (C) 2001-2009 by:
 Department of Geography, University of Bonn
 and
 lat/lon GmbH

 This library is free software; you can redistribute it and/or modify it under
 the terms of the GNU Lesser General Public License as published by the Free
 Software Foundation; either version 2.1 of the License, or (at your option)
 any later version.
 This library is distributed in the hope that it will be useful, but WITHOUT
 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
 details.
 You should have received a copy of the GNU Lesser General Public License
 along with this library; if not, write to the Free Software Foundation, Inc.,
 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

 Contact information:

 lat/lon GmbH
 Aennchenstr. 19, 53177 Bonn
 Germany
 http://lat-lon.de/

 Department of Geography, University of Bonn
 Prof. Dr. Klaus Greve
 Postfach 1147, 53001 Bonn
 Germany
 http://www.geographie.uni-bonn.de/deegree/

 e-mail: info@deegree.org
 ----------------------------------------------------------------------------*/
package org.deegree.ogcwebservices.csw.manager;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URL;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpException;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.methods.StringRequestEntity;
import org.deegree.enterprise.WebUtils;
import org.deegree.framework.log.ILogger;
import org.deegree.framework.log.LoggerFactory;
import org.deegree.framework.util.CharsetUtils;
import org.deegree.framework.util.FileUtils;
import org.deegree.framework.util.StringTools;
import org.deegree.framework.util.TimeTools;
import org.deegree.framework.xml.XMLException;
import org.deegree.framework.xml.XMLFragment;
import org.deegree.framework.xml.XMLParsingException;
import org.deegree.framework.xml.XMLTools;
import org.deegree.io.DBPoolException;
import org.deegree.ogcwebservices.OGCWebServiceException;
import org.deegree.ogcwebservices.csw.configuration.CatalogueConfigurationDocument;
import org.deegree.ogcwebservices.csw.manager.HarvestRepository.Record;
import org.deegree.ogcwebservices.csw.manager.HarvestRepository.ResourceType;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.xml.sax.SAXException;

/**
 * Harvester implementation for harvesting other catalogue services. Only the dataset, series
 * (datasetcollection), application and service metadata types will be harvested.
 * 
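 * <p>
 * A minimal usage sketch (for illustration only):
 * 
 * <pre>
 * CatalogueHarvester harvester = CatalogueHarvester.getInstance( "2.0.2" );
 * // performs one harvest iteration over all catalogue sources registered in the HarvestRepository
 * harvester.run();
 * </pre>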
 * 
 * @version $Revision: 19475 $
 * @author <a href="mailto:poth@lat-lon.de">Andreas Poth</a>
 * @author last edited by: $Author: lbuesching $
 * 
 * @version 1.0. $Revision: 19475 $, $Date: 2009-09-02 14:51:48 +0200 (Mi, 02 Sep 2009) $
 * 
 * @since 2.0
 */
public class CatalogueHarvester extends AbstractHarvester {

    static final ILogger LOG = LoggerFactory.getLogger( CatalogueHarvester.class );

    private static CatalogueHarvester ch = null;

    private enum HarvestOperation {
        /** the record has not been harvested before and will be inserted */
        insert,
        /** the record has been harvested before and has changed since */
        update,
        /** the record is not provided by the source any more and will be deleted */
        delete,
        /** no operation required, e.g. the record is not a known metadata format */
        nothing
    }

    /**
     * @param version
     *            the version of the CSW
     */
    private CatalogueHarvester( String version ) {
        super( version );
    }

    /**
     * returns the singleton instance of the CatalogueHarvester
     * 
     * @param version
     *            the version of the CSW
     * 
     * @return instance of CatalogueHarvester
     */
    public static CatalogueHarvester getInstance( String version ) {
        if ( ch == null ) {
            ch = new CatalogueHarvester( version );
        }
        return ch;
    }

    @Override
    public void run() {
        LOG.logDebug( "starting harvest iteration for CatalogueHarvester." );
        try {
            HarvestRepository repository = HarvestRepository.getInstance();

            List<URI> sources = repository.getSources();
            for ( Iterator<URI> iter = sources.iterator(); iter.hasNext(); ) {
                URI source = iter.next();
                try {
                    // determine if source shall be harvested
                    if ( shallHarvest( source, ResourceType.catalogue ) ) {
                        // mark source as currently being harvested
                        inProgress.add( source );
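                        // each source is harvested asynchronously by its own HarvestProcessor
                        // thread; the source is removed from 'inProgress' when that thread finishes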
                        HarvestProcessor processor = new HarvestProcessor( this, source );
                        processor.start();
                    }
                } catch ( Exception e ) {
                    e.printStackTrace();
                    LOG.logError( Messages.format( "CatalogueHarvester.exception1", source ), e );
                    informResponseHandlers( source, e );
                }
            }
        } catch ( Exception e ) {
            LOG.logError( Messages.getString( "CatalogueHarvester.exception2" ), e );
        }

    }

    /**
     * inner class for processing asynchronous harvesting of a catalogue
     * 
     * @version $Revision: 19475 $
     * @author <a href="mailto:poth@lat-lon.de">Andreas Poth</a>
     * @author last edited by: $Author: lbuesching $
     * 
     * @version 1.0. $Revision: 19475 $, $Date: 2009-09-02 14:51:48 +0200 (Mi, 02 Sep 2009) $
     * 
     * @since 2.0
     */
    protected class HarvestProcessor extends AbstractHarvestProcessor {

        private Map<String, Record> records = new HashMap<String, Record>( 10000 );

        private String sourceVersion = "2.0.0";

        /**
         * 
         * @param owner
         *            the harvester that owns this processor
         * @param source
         *            URI of the catalogue service to be harvested
         */
        HarvestProcessor( AbstractHarvester owner, URI source ) {
            super( owner, source );
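            // determine the most recent CSW version supported by the source from its capabilities
            // document; if the capabilities cannot be read, the default version 2.0.0 is kept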
            try {
                String capaRequest = source + "?REQUEST=GetCapabilities&service=CSW";
                CatalogueConfigurationDocument capa = new CatalogueConfigurationDocument();
                capa.load( new URL( capaRequest ) );
                List<String> versions = Arrays.asList( capa.getServiceIdentification().getServiceTypeVersions() );
                Collections.sort( versions );
                sourceVersion = versions.get( versions.size() - 1 );
            } catch ( IOException e ) {
                LOG.logError( Messages.format( "CatalogueHarvester.exception5", source ), e );
            } catch ( SAXException e ) {
                LOG.logError( Messages.format( "CatalogueHarvester.exception6", source ), e );
            } catch ( XMLParsingException e ) {
                LOG.logError( Messages.format( "CatalogueHarvester.exception7", source ), e );
            }
        }

        @Override
        public void run() {

            String[] typeNames = new String[] { "csw:dataset", "csw:datasetcollection", "csw:application",
                                               "csw:service" };
            records.clear();
            try {
                HarvestRepository repository = HarvestRepository.getInstance();
                XMLFragment metaData = null;
                Date harvestingTimestamp = repository.getNextHarvestingTimestamp( source );

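                // a CSW 2.0.2 source can be harvested in a single pass (empty type name), while
                // older versions are harvested once per supported type name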
                if ( "2.0.2".equals( sourceVersion ) ) {
                    runHarvest( "", metaData, repository );
                } else {
                    for ( int i = 0; i < typeNames.length; i++ ) {
                        runHarvest( typeNames[i], metaData, repository );
                    }
                }

                // delete all records that are not provided by the source any more from the
                // target catalogue and from the harvest cache
                deleteRecordsNoHostedAnymore( source );

                // update timestamps only if the transaction has been performed successfully
                writeLastHarvestingTimestamp( source, harvestingTimestamp );
                writeNextHarvestingTimestamp( source, harvestingTimestamp );
                // inform the handlers assigned to the harvest request about the successfully
                // harvested CSW. Even if harvesting a few records has failed, the harvest process
                // is declared as successful as long as it could be finished regularly
                informResponseHandlers( source );
                if ( repository.getHarvestInterval( source ) <= 0 ) {
                    repository.dropRequest( source );
                }
            } catch ( Exception e ) {
                LOG.logError( Messages.format( "CatalogueHarvester.exception4", source ), e );
                try {
                    e.printStackTrace();
                    owner.informResponseHandlers( source, e );
                } catch ( Exception ee ) {
                    ee.printStackTrace();
                }
            } finally {
                inProgress.remove( source );
            }

        }

        /**
         * harvests all records of the passed type from the source catalogue, record by record
         * 
         * @param typeName
         * @param metaData
         * @param repository
         * @throws XMLException
         * @throws IOException
         * @throws SAXException
         * @throws XMLParsingException
         */
        private void runHarvest( String typeName, XMLFragment metaData, HarvestRepository repository )
                                throws XMLException, IOException, SAXException, XMLParsingException {
            int index = 1;
            int hits = getNoOfMetadataRecord( source, typeName );
            LOG.logInfo( hits + " metadata sets to harvest ..." );
            for ( int j = 0; j < hits; j++ ) {
                try {
                    // read the index'th metadata set from the CSW
                    metaData = getNextMetadataRecord( source, index, typeName );
                    if ( metaData != null ) {
                        // read the record from the harvest database if the dataset has been
                        // harvested before, or create a new one
                        Record record = createOrGetRecord( source, metaData );
                        records.put( record.getFileIdentifier(), record );
                        String trans = null;
                        try {
                            // determine the harvest operation to perform
                            // insert: dataset has not been harvested before
                            // update: dataset has been harvested before but has changed
                            // nothing: e.g. dataset is not a known metadata format
                            HarvestOperation ho = getHarvestOperation( record, metaData );
                            if ( ho == HarvestOperation.insert ) {
                                trans = createInsertRequest( metaData );
                            } else if ( ho == HarvestOperation.update ) {
                                trans = createUpdateRequest( getID( metaData ),
                                                             getIdentifierXPathForUpdate( metaData ), metaData );
                            }
                            // perform harvesting for the current dataset; insert it or update the
                            // existing dataset in this CSW
                            if ( ho != HarvestOperation.nothing ) {
                                performTransaction( trans );
                                repository.storeRecord( record );
                            } else {
                                LOG.logInfo( "nothing to harvest" );
                            }
                        } catch ( Throwable e ) {
                            LOG.logError( Messages.format( "CatalogueHarvester.exception3", index, getID( metaData ),
                                                           source ), e );
                            try {
                                // inform the handlers assigned to the harvest request about the
                                // failure to harvest one specific dataset.
                                // notice: if harvesting one dataset fails, the complete harvest
                                // process does not fail; the process goes on with the next record
                                owner.informResponseHandlers( source, e );
                            } catch ( Exception ee ) {
                                ee.printStackTrace();
                            }
                            // remove the fileIdentifier of the current dataset from the list of
                            // inserted or updated datasets. After all available metadata records
                            // have been processed, this list is used to adjust the list of datasets
                            // assigned to a specific CSW in the harvest-metadata DB schema
                            records.remove( record.getFileIdentifier() );
                        }
                    } else {
                        LOG.logInfo( "harvesting will be stopped at index: " + index + " because metadata == null" );
                    }
                    LOG.logDebug( index + " metadata " + ( metaData == null ) );
                } catch ( Throwable e ) {
                    LOG.logError( Messages.format( "CatalogueHarvester.exception3", index, "not available", source ), e );
                    try {
                        e.printStackTrace();
                        // inform the handlers assigned to the harvest request about the failure
                        // to harvest one specific dataset.
                        // notice: if harvesting one dataset fails, the complete harvest
                        // process does not fail; the process goes on with the next record
                        owner.informResponseHandlers( source, e );
                    } catch ( Exception ee ) {
                        ee.printStackTrace();
                    }
                }
                index++;
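                // suggest a garbage collection run every 1000 records to keep the memory
                // footprint of large harvesting runs low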
                if ( index % 1000 == 0 ) {
                    System.gc();
                }

            }
        }

        /**
         * returns the XPath of the passed metadata record's dateStamp property
         * 
         * @param metaData
         * @return the XPath of the passed metadata record's dateStamp property
         */
        private String getDateStampXPath( XMLFragment metaData ) {
            String xpath = null;
            if ( metaData != null ) {
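                // the XPath is looked up from the message properties by the root element's
                // namespace, so each supported metadata format can define its own dateStamp location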
                String nspace = metaData.getRootElement().getNamespaceURI();
                nspace = StringTools.replace( nspace, "http://", "", true );
                xpath = Messages.getString( "dateStamp_" + nspace );
            }
            return xpath;
        }

        /**
         * returns the identifier of a metadata record to enable its update and deletion
         * 
         * @param metaData
         * @return the identifier of a metadata record to enable its update and deletion
         * @throws XMLParsingException
         */
        private String getID( XMLFragment metaData )
                                throws XMLParsingException {
            String xpath = getIdentifierXPath( metaData );
            String fileIdentifier = XMLTools.getRequiredNodeAsString( metaData.getRootElement(), xpath, nsc );
            return fileIdentifier;
        }

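        /**
         * creates a filter constraint matching the passed identifier by filling the
         * "Constraints_" + version template
         * 
         * @param identifier
         *            identifier of the record the constraint shall match
         * @param xPath
         *            XPath addressing the identifier property within a record
         * @return constraint fragment as string
         * @throws IOException
         *             if the template cannot be read
         */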
        @Override
        protected String createConstraint( String identifier, String xPath )
                                throws IOException {

            // read template from file
            URL url = Templates.getTemplate( "Constraints_" + version );
            String constraints = FileUtils.readTextFile( url ).toString();

            constraints = StringTools.replace( constraints, "$identifier$", identifier, false );
            return StringTools.replace( constraints, "$xPath$", xPath, false );
        }

        /**
         * checks for each record stored in the harvester cache whether it is still provided by
         * the harvested catalogue; if not, the record will be removed from the cache and deleted
         * from the harvesting catalogue.
         * 
         * @param source
         *            URI of the harvested catalogue
         * @throws IOException
         * @throws SQLException
         * @throws DBPoolException
         * @throws SAXException
         * @throws OGCWebServiceException
         * 
         */
        private void deleteRecordsNoHostedAnymore( URI source )
                                throws DBPoolException, SQLException, IOException, OGCWebServiceException, SAXException {
            HarvestRepository repository = HarvestRepository.getInstance();
            List<String> cache = repository.getAllRecords( source );
            int id = repository.getSourceID( source );
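            // 'records' contains all file identifiers inserted or updated during the current run;
            // cached records missing from it are no longer provided by the source and are
            // therefore removed from the cache and deleted from the catalogue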
            for ( int i = 0; i < cache.size(); i++ ) {
                String fid = cache.get( i );
                Record record = records.remove( fid );
                if ( record == null ) {
                    repository.dropRecord( repository.new Record( id, null, fid, source ) );
                    String trans = createDeleteRequest( fid );
                    performTransaction( trans );
                }
            }
        }

        /**
         * tries to read a record from the harvest repository. If the record is not already
         * stored in the repository, a new one will be created
         * 
         * @param source
         *            URI of the harvested catalogue
         * @param metaData
         * @return record from the harvest repository
         * @throws XMLParsingException
         * @throws IOException
         * @throws SQLException
         * @throws DBPoolException
         */
        private Record createOrGetRecord( URI source, XMLFragment metaData )
                                throws XMLParsingException, IOException, DBPoolException, SQLException {

            String xpath = getIdentifierXPath( metaData );
            String fileIdentifier = XMLTools.getRequiredNodeAsString( metaData.getRootElement(), xpath, nsc );

            HarvestRepository repository = HarvestRepository.getInstance();
            Record record = repository.getRecordByID( source, fileIdentifier );
            if ( record == null ) {
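                // the record has not been harvested before; create a transient record (source
                // id -1) carrying the dateStamp read from the metadata itself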
                xpath = getDateStampXPath( metaData );
                String s = XMLTools.getRequiredNodeAsString( metaData.getRootElement(), xpath, nsc );
                Date date = TimeTools.createCalendar( s ).getTime();
                record = repository.new Record( -1, date, fileIdentifier, source );
            }

            return record;
        }

        /**
         * determines what operation shall be performed on a metadata record read from a remote
         * catalogue
         * 
         * @param record
         *            record from the harvest cache, or a transient one if harvested for the first time
         * @param metaData
         * @return type of harvest operation to perform
         * @throws XMLParsingException
         */
        private HarvestOperation getHarvestOperation( Record record, XMLFragment metaData )
                                throws XMLParsingException {

            HarvestOperation ho = HarvestOperation.nothing;
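            // a source id < 0 marks a record that has just been created by createOrGetRecord( .. )
            // and is therefore not yet known to the harvest cache; it must be inserted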
            if ( record.getSourceId() < 0 ) {
                ho = HarvestOperation.insert;
            } else {
                String xpath = getDateStampXPath( metaData );
                String s = XMLTools.getRequiredNodeAsString( metaData.getRootElement(), xpath, nsc );
                Date date = TimeTools.createCalendar( s ).getTime();
                if ( !date.equals( record.getDatestamp() ) ) {
                    ho = HarvestOperation.update;
                }
            }
            return ho;
        }

        /**
         * reads the index'th metadata record of the passed type from the source catalogue
         * 
         * @param source
         * @param index
         *            index of the record to read (starting at 1)
         * @param type
         *            metadata type name, e.g. csw:dataset
         * @return metadata record, or <code>null</code> if no record could be read
         * @throws IOException
         * @throws HttpException
         * @throws SAXException
         * @throws XMLException
         * @throws XMLParsingException
         */
        private XMLFragment getNextMetadataRecord( URI source, int index, String type )
                                throws IOException, XMLException, SAXException, XMLParsingException {

            // read template from file
            URL url = Templates.getTemplate( "GetRecords_" + sourceVersion );
            String getRecords = FileUtils.readTextFile( url ).toString();
            getRecords = StringTools.replace( getRecords, "$index$", Integer.toString( index ), false );
            getRecords = StringTools.replace( getRecords, "$type$", type, false );

            StringRequestEntity re = new StringRequestEntity( getRecords, "text/xml", CharsetUtils.getSystemCharset() );
            PostMethod post = new PostMethod( source.toASCIIString() );
            post.setRequestEntity( re );
            HttpClient client = new HttpClient();
            int timeout = 30000;
            try {
                timeout = Integer.parseInt( Messages.getString( "harvest.source.timeout" ) );
            } catch ( Exception e ) {
                LOG.logInfo( "cannot read timeout from messages.properties because: " + e.getMessage()
                             + "; using 30 sec as default" );
            }
            client.getHttpConnectionManager().getParams().setSoTimeout( timeout );
            client = WebUtils.enableProxyUsage( client, source.toURL() );
            client.executeMethod( post );
            InputStream is = post.getResponseBodyAsStream();
            XMLFragment xml = new XMLFragment();
            xml.load( is, source.toURL().toExternalForm() );

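            // extract the metadata record contained in the search result of the GetRecords
            // response; if no record is contained, null is returned and the harvest loop stops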
            Node node = XMLTools.getNode( xml.getRootElement(), Messages.getString( "SearchResult.child_"
                                                                                    + sourceVersion ), nsc );
            if ( node != null ) {
                xml.setRootElement( (Element) node );
            } else {
                xml = null;
            }

            return xml;
        }

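        /**
         * determines the number of metadata records of the passed type offered by the source
         * catalogue by sending a request built from the "GetNoOfRecords_" + sourceVersion template
         * and evaluating the numberOfRecordsMatched value of the response
         * 
         * @param source
         *            catalogue to ask
         * @param type
         *            metadata type name, e.g. csw:dataset
         * @return number of matching records, or 0 if the value cannot be determined
         * @throws IOException
         * @throws XMLException
         * @throws SAXException
         * @throws XMLParsingException
         */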
        private int getNoOfMetadataRecord( URI source, String type )
                                throws IOException, XMLException, SAXException, XMLParsingException {

            // read template from file
            URL url = Templates.getTemplate( "GetNoOfRecords_" + sourceVersion );
            String getRecords = FileUtils.readTextFile( url ).toString();
            getRecords = StringTools.replace( getRecords, "$type$", type, false );
            StringRequestEntity re = new StringRequestEntity( getRecords, "text/xml", CharsetUtils.getSystemCharset() );
            PostMethod post = new PostMethod( source.toASCIIString() );
            post.setRequestEntity( re );
            HttpClient client = new HttpClient();
            client.getHttpConnectionManager().getParams().setSoTimeout( 30000 );
            client = WebUtils.enableProxyUsage( client, source.toURL() );
            client.executeMethod( post );
            InputStream is = post.getResponseBodyAsStream();
            XMLFragment xml = new XMLFragment();
            xml.load( is, source.toURL().toExternalForm() );

            return XMLTools.getNodeAsInt( xml.getRootElement(), Messages.getString( "NumberOfRecordsMatched_"
                                                                                    + sourceVersion ), nsc, 0 );
        }

    }

}