001/* DeDupFetchHTTP
002 * 
003 * Created on 10.04.2006
004 *
005 * Copyright (C) 2006 National and University Library of Iceland
006 * 
007 * This file is part of the DeDuplicator (Heritrix add-on module).
008 * 
009 * DeDuplicator is free software; you can redistribute it and/or modify
010 * it under the terms of the GNU Lesser Public License as published by
011 * the Free Software Foundation; either version 2.1 of the License, or
012 * any later version.
013 * 
014 * DeDuplicator is distributed in the hope that it will be useful, 
015 * but WITHOUT ANY WARRANTY; without even the implied warranty of
016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
017 * GNU Lesser Public License for more details.
018 * 
019 * You should have received a copy of the GNU Lesser Public License
020 * along with DeDuplicator; if not, write to the Free Software
021 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
022 */
023package is.hi.bok.deduplicator;
024
025import java.io.File;
026import java.io.IOException;
027import java.text.SimpleDateFormat;
028import java.util.List;
029
030import org.apache.commons.httpclient.HttpConnection;
031import org.apache.commons.httpclient.HttpMethod;
032import org.apache.lucene.document.Document;
033import org.apache.lucene.index.DirectoryReader;
034import org.apache.lucene.index.IndexReader;
035import org.apache.lucene.search.ConstantScoreQuery;
036import org.apache.lucene.search.IndexSearcher;
037import org.apache.lucene.search.Query;
038import org.apache.lucene.search.ScoreDoc;
039import org.apache.lucene.search.TermRangeFilter;
040import org.apache.lucene.store.FSDirectory;
041import org.apache.lucene.util.BytesRef;
042import org.archive.crawler.datamodel.CrawlURI;
043import org.archive.crawler.fetcher.FetchHTTP;
044import org.archive.crawler.frontier.AdaptiveRevisitAttributeConstants;
045import org.archive.crawler.settings.SimpleType;
046import org.archive.crawler.settings.Type;
047import org.archive.httpclient.HttpRecorderMethod;
048import org.archive.util.ArchiveUtils;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052import dk.netarkivet.common.utils.AllDocsCollector;
053
054/**
055 * An extension of Heritrix's {@link org.archive.crawler.fetcher.FetchHTTP} processor for downloading HTTP documents.
056 * This extension adds a check after the content header has been downloaded that compares the 'last-modified' and or
057 * 'last-etag' values from the header against information stored in an appropriate index.
058 *
059 * @author Kristinn Sigurðsson
060 * @author Søren Vejrup Carlsen
061 * @see is.hi.bok.deduplicator.DigestIndexer
062 * @see org.archive.crawler.fetcher.FetchHTTP
063 */
064
065public class DeDupFetchHTTP extends FetchHTTP implements AdaptiveRevisitAttributeConstants {
066
067    private static final long serialVersionUID = ArchiveUtils.classnameBasedUID(DeDupFetchHTTP.class, 1);
068
069    private static final Logger log = LoggerFactory.getLogger(FetchHTTP.class.getName());
070
071    protected IndexSearcher index;
072    protected IndexReader indexReader;
073    protected String mimefilter = DEFAULT_MIME_FILTER;
074    protected boolean blacklist = true;
075
076    SimpleDateFormat sdfLastModified;
077    SimpleDateFormat sdfIndexDate;
078
079    protected long processedURLs = 0;
080    protected long unchangedURLs = 0;
081
082    protected boolean useSparseRangeFilter = DEFAULT_USE_SPARSE_RANGE_FILTER;
083
084    // Settings.
085    public static final String ATTR_DECISION_SCHEME = "decision-scheme";
086    public static final String SCHEME_TIMESTAMP = "Timestamp only";
087    public static final String SCHEME_ETAG = "Etag only";
088    public static final String SCHEME_TIMESTAMP_AND_ETAG = "Timestamp AND Etag";
089    public static final String SCHEME_TIMESTAMP_OR_ETAG = "Timestamp OR Etag";
090    public static final String[] AVAILABLE_DECISION_SCHEMES = {SCHEME_TIMESTAMP, SCHEME_ETAG,
091            SCHEME_TIMESTAMP_AND_ETAG, SCHEME_TIMESTAMP_OR_ETAG};
092    public static final String DEFAULT_DECISION_SCHEME = SCHEME_TIMESTAMP;
093
094    public static final String ATTR_INDEX_LOCATION = "index-location";
095    public static final String DEFAULT_INDEX_LOCATION = "";
096
097    /**
098     * The filter on mime types. This is either a blacklist or whitelist depending on ATTR_FILTER_MODE.
099     */
100    public final static String ATTR_MIME_FILTER = "mime-filter";
101    public final static String DEFAULT_MIME_FILTER = "^text/.*";
102
103    /**
104     * Is the mime filter a blacklist (do not apply processor to what matches) or whitelist (apply processor only to
105     * what matches).
106     */
107    public final static String ATTR_FILTER_MODE = "filter-mode";
108    public final static String[] AVAILABLE_FILTER_MODES = {"Blacklist", "Whitelist"};
109    public final static String DEFAULT_FILTER_MODE = AVAILABLE_FILTER_MODES[0];
110
111    /** Should we use sparse queries (uses less memory at a cost to performance? * */
112    public final static String ATTR_USE_SPARSE_RANGE_FILTER = "use-sparse-range-filter";
113    public final static Boolean DEFAULT_USE_SPARSE_RANGE_FILTER = new Boolean(false);
114
115    public DeDupFetchHTTP(String name) {
116        super(name);
117        setDescription("Fetch HTTP processor that aborts downloading of "
118                + "unchanged documents. This processor extends the standard "
119                + "FetchHTTP processor, adding a check after the header is "
120                + "downloaded where the header information for 'last-modified' "
121                + "and 'etag' is compared against values stored in a Lucene "
122                + "index built using the DigestIndexer.\n Note that the index "
123                + "must have been built indexed by URL and the Timestamp "
124                + "and/or Etag info must have been included in the index!");
125        Type t;
126        t = new SimpleType(ATTR_DECISION_SCHEME, "The different schmes for deciding when to re-download a "
127                + "page given an old version of the same page (or rather " + "meta-data on it)\n "
128                + "Timestamp only: Download when a datestamp is missing "
129                + "in either the downloaded header or index or if the header "
130                + "datestamp is newer then the one in the index.\n "
131                + "Etag only: Download when the Etag is missing in either the"
132                + "header download or the index or the header Etag and the one " + "in the index differ.\n "
133                + "Timestamp AND Etag: When both datestamp and Etag are "
134                + "available in both the header download and the index, "
135                + "download if EITHER of them indicates change."
136                + "Timestamp OR Etag: When both datestamp and Etag are "
137                + "available in both the header download and the index, "
138                + "download only if BOTH of them indicate change.", DEFAULT_DECISION_SCHEME, AVAILABLE_DECISION_SCHEMES);
139        addElementToDefinition(t);
140        t = new SimpleType(ATTR_INDEX_LOCATION, "Location of index (full path). Can not be changed at run " + "time.",
141                DEFAULT_INDEX_LOCATION);
142        t.setOverrideable(false);
143        addElementToDefinition(t);
144        t = new SimpleType(ATTR_MIME_FILTER, "A regular expression that the mimetype of all documents "
145                + "will be compared against. Only those that pass will be " + "considered. Others are given a pass. "
146                + "\nIf the attribute filter-mode is " + "set to 'Blacklist' then all the documents whose mimetype "
147                + "matches will be ignored by this processor. If the filter-"
148                + "mode is set to 'Whitelist' only those documents whose " + "mimetype matches will be processed.",
149                DEFAULT_MIME_FILTER);
150        t.setOverrideable(false);
151        t.setExpertSetting(true);
152        addElementToDefinition(t);
153        t = new SimpleType(ATTR_FILTER_MODE, "Determines if the mime-filter acts as a blacklist (declares "
154                + "what should be ignored) or whitelist (declares what should " + "be processed).",
155                DEFAULT_FILTER_MODE, AVAILABLE_FILTER_MODES);
156        t.setOverrideable(false);
157        t.setExpertSetting(true);
158        addElementToDefinition(t);
159
160        t = new SimpleType(ATTR_USE_SPARSE_RANGE_FILTER, "If set to true, then Lucene queries use a custom 'sparse' "
161                + "range filter. This uses less memory at the cost of some "
162                + "lost performance. Suitable for very large indexes.", DEFAULT_USE_SPARSE_RANGE_FILTER);
163        t.setOverrideable(false);
164        t.setExpertSetting(true);
165        addElementToDefinition(t);
166    }
167
168    protected boolean checkMidfetchAbort(CrawlURI curi, HttpRecorderMethod method, HttpConnection conn) {
169        // We'll check for prerequisites here since there is no way to know
170        // if the super method returns false because of a prereq or because
171        // all filters accepeted.
172        if (curi.isPrerequisite()) {
173            return false;
174        }
175
176        // Run super to allow filters to also abort. Also this method has
177        // been pressed into service as a general 'stuff to do at this point'
178        boolean ret = super.checkMidfetchAbort(curi, method, conn);
179
180        // Ok, now check for duplicates.
181        if (isDuplicate(curi)) {
182            ret = true;
183            unchangedURLs++;
184            curi.putInt(A_CONTENT_STATE_KEY, CONTENT_UNCHANGED);
185            curi.addAnnotation("header-duplicate");
186
187        }
188
189        return ret;
190    }
191
192    /**
193     * Compare the header infomation for 'last-modified' and/or 'etag' against data in the index.
194     *
195     * @param curi The Crawl URI being processed.
196     * @return True if header infomation indicates that the document has not changed since the crawl that the index is
197     * based on was performed.
198     */
199    protected boolean isDuplicate(CrawlURI curi) {
200        boolean ret = false;
201        if (curi.getContentType() != null && curi.getContentType().matches(mimefilter) != blacklist) {
202            processedURLs++;
203            // Ok, passes mime-filter
204            HttpMethod method = (HttpMethod) curi.getObject(A_HTTP_TRANSACTION);
205            // Check the decision scheme.
206            String scheme = (String) getUncheckedAttribute(curi, ATTR_DECISION_SCHEME);
207
208            Document doc = lookup(curi);
209
210            if (doc != null) {
211                // Found a hit. Do the necessary evalution.
212                if (scheme.equals(SCHEME_TIMESTAMP)) {
213                    ret = datestampIndicatesNonChange(method, doc);
214                } else if (scheme.equals(SCHEME_ETAG)) {
215                    ret = etagIndicatesNonChange(method, doc);
216                } else {
217
218                    if (scheme.equals(SCHEME_TIMESTAMP_AND_ETAG)) {
219                        ret = datestampIndicatesNonChange(method, doc) && etagIndicatesNonChange(method, doc);
220                    } else if (scheme.equals(SCHEME_TIMESTAMP_OR_ETAG)) {
221                        ret = datestampIndicatesNonChange(method, doc) || etagIndicatesNonChange(method, doc);
222                    } else {
223                        log.error("Unknown decision sceme: {}",scheme);
224                    }
225                }
226            }
227        }
228        return ret;
229    }
230
231    /**
232     * Checks if the 'last-modified' in the HTTP header and compares it against the timestamp in the supplied Lucene
233     * document. If both dates are found and the header's date is older then the datestamp indicates non-change.
234     * Otherwise a change must be assumed.
235     *
236     * @param method HTTPMethod that allows access to the relevant HTTP header
237     * @param doc The Lucene document to compare against
238     * @return True if a the header and document data indicates a non-change. False otherwise.
239     */
240    protected boolean datestampIndicatesNonChange(HttpMethod method, Document doc) {
241        String headerDate = null;
242        if (method.getResponseHeader("last-modified") != null) {
243            headerDate = method.getResponseHeader("last-modified").getValue();
244        }
245        String indexDate = doc.get(DigestIndexer.FIELD_TIMESTAMP);
246
247        if (headerDate != null && indexDate != null) {
248            try {
249                // If both dates exist and last-modified is before the index
250                // date then we assume no change has occured.
251                return (sdfLastModified.parse(headerDate)).before(sdfIndexDate.parse(indexDate));
252            } catch (Exception e) {
253                // Any exceptions parsing the date should be interpreted as
254                // missing date information.
255                // ParseException and NumberFormatException are the most
256                // likely exceptions to occur.
257                return false;
258            }
259        }
260        return false;
261    }
262
263    /**
264     * Checks if the 'etag' in the HTTP header and compares it against the etag in the supplied Lucene document. If both
265     * dates are found and match then the datestamp indicate non-change. Otherwise a change must be assumed.
266     *
267     * @param method HTTPMethod that allows access to the relevant HTTP header
268     * @param doc The Lucene document to compare against
269     * @return True if a the header and document data indicates a non-change. False otherwise.
270     */
271    protected boolean etagIndicatesNonChange(HttpMethod method, Document doc) {
272        String headerEtag = null;
273        if (method.getResponseHeader("last-etag") != null) {
274            headerEtag = method.getResponseHeader("last-etag").getValue();
275        }
276        String indexEtag = doc.get(DigestIndexer.FIELD_ETAG);
277
278        if (headerEtag != null && indexEtag != null) {
279            // If both etags exist and are identical then we assume no
280            // change has occured.
281            return headerEtag.equals(indexEtag);
282        }
283        return false;
284    }
285
286    /**
287     * Searches the index for the URL of the given CrawlURI. If multiple hits are found the most recent one is returned
288     * if the index included the timestamp, otherwise a random one is returned. If no hit is found null is returned.
289     *
290     * @param curi The CrawlURI to search for
291     * @return the index Document matching the URI or null if none was found
292     */
293    protected Document lookup(CrawlURI curi) {
294        try {
295            Query query = null;
296
297            /** The least memory demanding query. */
298            BytesRef curiStringRef = new BytesRef(curi.toString().getBytes());
299            query = new ConstantScoreQuery(new TermRangeFilter(DigestIndexer.FIELD_URL, curiStringRef, curiStringRef,
300                    true, true));
301
302            /** The preferred solution, but it seems also more memory demanding */
303            // query = new ConstantScoreQuery(new FieldCacheTermsFilter(fieldName,
304            // value));
305
306            AllDocsCollector collectAllCollector = new AllDocsCollector();
307            index.search(query, collectAllCollector);
308
309            List<ScoreDoc> hits = collectAllCollector.getHits();
310            Document doc = null;
311            if (hits != null && hits.size() > 0) {
312                // If there are multiple hits, use the one with the most
313                // recent date.
314                Document docToEval = null;
315                for (ScoreDoc hit : hits) {
316                    int docId = hit.doc;
317                    doc = index.doc(docId);
318                    // The format of the timestamp ("yyyyMMddHHmmssSSS") allows
319                    // us to do a greater then (later) or lesser than (earlier)
320                    // comparison of the strings.
321                    String timestamp = doc.get(DigestIndexer.FIELD_TIMESTAMP);
322                    if (docToEval == null || timestamp == null
323                            || docToEval.get(DigestIndexer.FIELD_TIMESTAMP).compareTo(timestamp) > 0) {
324                        // Found a more recent hit or timestamp is null
325                        // NOTE: Either all hits should have a timestamp or
326                        // none. This implementation will cause the last
327                        // URI in the hit list to be returned if there is no
328                        // timestamp.
329                        docToEval = doc;
330                    }
331                }
332                return docToEval;
333            }
334        } catch (IOException e) {
335            log.error("Error accessing index.", e);
336        }
337        return null;
338    }
339
340    @Override
341    public void finalTasks() {
342        super.finalTasks();
343    }
344
345    @Override
346    public void initialTasks() {
347        super.initialTasks();
348        // Index location
349        try {
350            String indexLocation = (String) getAttribute(ATTR_INDEX_LOCATION);
351            FSDirectory indexDir = FSDirectory.open(new File(indexLocation));
352            // https://issues.apache.org/jira/browse/LUCENE-1566
353            // Reduce chunksize to avoid OOM to half the size of the default (=100 MB)
354            int chunksize = indexDir.getReadChunkSize();
355            indexDir.setReadChunkSize(chunksize / 2);
356            IndexReader reader = DirectoryReader.open(indexDir);
357            index = new IndexSearcher(reader);
358        } catch (Exception e) {
359            log.error("Unable to find/open index.", e);
360        }
361
362        // Mime filter
363        try {
364            mimefilter = (String) getAttribute(ATTR_MIME_FILTER);
365        } catch (Exception e) {
366            log.error("Unable to get attribute " + ATTR_MIME_FILTER, e);
367        }
368
369        // Filter mode (blacklist (default) or whitelist)
370        try {
371            blacklist = ((String) getAttribute(ATTR_FILTER_MODE)).equals(DEFAULT_FILTER_MODE);
372        } catch (Exception e) {
373            log.error("Unable to get attribute " + ATTR_FILTER_MODE, e);
374        }
375
376        // Date format of last-modified is EEE, dd MMM yyyy HH:mm:ss z
377        sdfLastModified = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z");
378        // Date format of indexDate is yyyyMMddHHmmssSSS
379        sdfIndexDate = new SimpleDateFormat("yyyyMMddHHmmssSSS");
380
381        // Range Filter type
382        try {
383            useSparseRangeFilter = ((Boolean) getAttribute(ATTR_USE_SPARSE_RANGE_FILTER)).booleanValue();
384        } catch (Exception e) {
385            log.error("Unable to get attribute " + ATTR_USE_SPARSE_RANGE_FILTER, e);
386            useSparseRangeFilter = DEFAULT_USE_SPARSE_RANGE_FILTER;
387        }
388    }
389
390    @Override
391    public String report() {
392        StringBuffer ret = new StringBuffer();
393        ret.append("Processor: is.hi.bok.deduplicator.DeDupFetchHTTP\n");
394        ret.append("  URLs compared against index: " + processedURLs + "\n");
395        ret.append("  URLs judged unchanged:       " + unchangedURLs + "\n");
396        ret.append("  processor extends (parent report)\n");
397        ret.append(super.report());
398        return ret.toString();
399    }
400
401}