/* DeDuplicator
 *
 * Created on 10.04.2006
 *
 * Copyright (C) 2006 National and University Library of Iceland
 *
 * This file is part of the DeDuplicator (Heritrix add-on module).
 *
 * DeDuplicator is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or
 * any later version.
 *
 * DeDuplicator is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with DeDuplicator; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
package is.hi.bok.deduplicator;

import java.io.File;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.commons.httpclient.HttpMethod;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TermRangeFilter;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.BytesRef;
import org.archive.crawler.datamodel.CoreAttributeConstants;
import org.archive.crawler.datamodel.CrawlOrder;
import org.archive.crawler.datamodel.CrawlURI;
import org.archive.crawler.framework.Processor;
import org.archive.crawler.frontier.AdaptiveRevisitAttributeConstants;
import org.archive.crawler.settings.SimpleType;
import org.archive.crawler.settings.Type;
import org.archive.util.ArchiveUtils;
import org.archive.util.Base32;

import dk.netarkivet.common.utils.AllDocsCollector;

/**
 * Heritrix compatible processor.
 * <p>
 * Will abort the processing (skip to post processor chain) of CrawlURIs that are deemed <i>duplicates</i>.
 * <p>
 * Duplicate detection can only be performed <i>after</i> the fetch processors have run.
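 * <p>
 * An illustrative configuration sketch (an assumption, not verified against a live crawl: it follows the usual
 * Heritrix 1.x order.xml settings syntax, and the index path is a placeholder):
 *
 * <pre>
 * &lt;newObject name="DeDuplicator" class="is.hi.bok.deduplicator.DeDuplicator"&gt;
 *   &lt;string name="index-location"&gt;/path/to/lucene/index&lt;/string&gt;
 *   &lt;string name="matching-method"&gt;By URL&lt;/string&gt;
 * &lt;/newObject&gt;
 * </pre>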
 *
 * @author Kristinn Sigur&eth;sson
 * @author Søren Vejrup Carlsen
 */
public class DeDuplicator extends Processor implements AdaptiveRevisitAttributeConstants {

    private static final long serialVersionUID = ArchiveUtils.classnameBasedUID(DeDuplicator.class, 1);

    private static final Logger logger = Logger.getLogger(DeDuplicator.class.getName());

    protected IndexSearcher index = null;
    protected IndexReader indexReader = null;
    protected boolean lookupByURL = true;
    protected boolean equivalent = DEFAULT_EQUIVALENT.booleanValue();
    protected String mimefilter = DEFAULT_MIME_FILTER;
    protected boolean blacklist = true;
    protected boolean doTimestampAnalysis = false;
    protected boolean doETagAnalysis = false;
    protected boolean statsPerHost = DEFAULT_STATS_PER_HOST.booleanValue();
    protected boolean changeContentSize = DEFAULT_CHANGE_CONTENT_SIZE.booleanValue();
    protected boolean useOrigin = false;
    protected boolean useOriginFromIndex = false;
    protected boolean useSparseRangeFilter = DEFAULT_USE_SPARSE_RANGE_FILTER;

    protected Statistics stats = null;
    protected HashMap<String, Statistics> perHostStats = null;
    protected boolean skipWriting = DEFAULT_SKIP_WRITE.booleanValue();

    /*
     * Configurable parameters:
     * - Index location
     * - Matching mode (By URL (default) or By Content Digest)
     * - Try equivalent matches
     * - Mime filter
     * - Filter mode (blacklist (default) or whitelist)
     * - Analysis (None (default), Timestamp only or Timestamp and ETag)
     * - Log level
     * - Track per host stats
     * - Origin
     * - Skip writing
     */
    /** Location of Lucene Index to use for lookups */
    public final static String ATTR_INDEX_LOCATION = "index-location";
    public final static String DEFAULT_INDEX_LOCATION = "";

    /** The matching method in use (by URL or content digest) */
    public final static String ATTR_MATCHING_METHOD = "matching-method";
    public final static String[] AVAILABLE_MATCHING_METHODS = {"By URL", "By content digest"};
    public final static String DEFAULT_MATCHING_METHOD = AVAILABLE_MATCHING_METHODS[0];

    /**
     * If an exact match is not made, should the processor try to find an equivalent match?
     */
    public final static String ATTR_EQUIVALENT = "try-equivalent";
    public final static Boolean DEFAULT_EQUIVALENT = Boolean.FALSE;

    /**
     * The filter on mime types. This is either a blacklist or whitelist depending on ATTR_FILTER_MODE.
     */
    public final static String ATTR_MIME_FILTER = "mime-filter";
    public final static String DEFAULT_MIME_FILTER = "^text/.*";

    /**
     * Is the mime filter a blacklist (do not apply the processor to what matches) or a whitelist (apply the processor
     * only to what matches)?
     */
    public final static String ATTR_FILTER_MODE = "filter-mode";
    public final static String[] AVAILABLE_FILTER_MODES = {"Blacklist", "Whitelist"};
    public final static String DEFAULT_FILTER_MODE = AVAILABLE_FILTER_MODES[0];

    /** Set analysis mode. */
    public final static String ATTR_ANALYSIS_MODE = "analysis-mode";
    public final static String[] AVAILABLE_ANALYSIS_MODES = {"None", "Timestamp", "Timestamp and ETag"};
    public final static String DEFAULT_ANALYSIS_MODE = AVAILABLE_ANALYSIS_MODES[0];

    /**
     * Should the content size information be set to zero when a duplicate is found?
     */
    public final static String ATTR_CHANGE_CONTENT_SIZE = "change-content-size";
    public final static Boolean DEFAULT_CHANGE_CONTENT_SIZE = Boolean.TRUE;

    /** What to write to a log file */
    public final static String ATTR_LOG_LEVEL = "log-level";
    public final static String[] AVAILABLE_LOG_LEVELS = {Level.SEVERE.toString(), Level.INFO.toString(),
            Level.FINEST.toString()};
    public final static String DEFAULT_LOG_LEVEL = AVAILABLE_LOG_LEVELS[0];

    /** Should statistics be tracked per host? */
    public final static String ATTR_STATS_PER_HOST = "stats-per-host";
    public final static Boolean DEFAULT_STATS_PER_HOST = Boolean.FALSE;

    /** How should 'origin' be handled? */
    public final static String ATTR_ORIGIN_HANDLING = "origin-handling";
    public final static String ORIGIN_HANDLING_NONE = "No origin information";
    public final static String ORIGIN_HANDLING_PROCESSOR = "Use processor setting";
    public final static String ORIGIN_HANDLING_INDEX = "Use index information";
    public final static String[] AVAILABLE_ORIGIN_HANDLING = {ORIGIN_HANDLING_NONE, ORIGIN_HANDLING_PROCESSOR,
            ORIGIN_HANDLING_INDEX};
    public final static String DEFAULT_ORIGIN_HANDLING = ORIGIN_HANDLING_NONE;

    /** Origin of duplicate URLs */
    public final static String ATTR_ORIGIN = "origin";
    public final static String DEFAULT_ORIGIN = "";

    /** Should the writer processor chain be skipped? */
    public final static String ATTR_SKIP_WRITE = "skip-writing";
    public final static Boolean DEFAULT_SKIP_WRITE = Boolean.TRUE;

    /** Should we use sparse queries (uses less memory at a cost to performance)? */
    public final static String ATTR_USE_SPARSE_RANGE_FILTER = "use-sparse-range-filter";
    public final static Boolean DEFAULT_USE_SPARSE_RANGE_FILTER = Boolean.FALSE;

    public DeDuplicator(String name) {
        super(name, "Aborts the processing of URIs (skips to post processing "
                + "chain) if a duplicate is found in the specified index. "
                + "Note that any changes made to this processor's configuration "
                + "at run time will be ignored unless otherwise stated.");
        Type t = new SimpleType(ATTR_INDEX_LOCATION,
                "Location of index (full path). Cannot be changed at run time.", DEFAULT_INDEX_LOCATION);
        t.setOverrideable(false);
        addElementToDefinition(t);
        t = new SimpleType(ATTR_MATCHING_METHOD, "Select whether we should look up by URL "
                + "or by content digest (counts mirror matches).", DEFAULT_MATCHING_METHOD, AVAILABLE_MATCHING_METHODS);
        t.setOverrideable(false);
        addElementToDefinition(t);
        t = new SimpleType(ATTR_EQUIVALENT, "If an exact match of URI and content digest is not found "
                + "then an equivalent URI (i.e. one with any www[0-9]*, "
                + "trailing slashes and parameters removed) can be checked. "
                + "If an equivalent URI has an identical content digest then "
                + "enabling this feature will cause the processor to consider "
                + "this a duplicate. Equivalent matches are noted in the "
                + "crawl log and their number is tracked separately.", DEFAULT_EQUIVALENT);
        t.setOverrideable(false);
        addElementToDefinition(t);
        t = new SimpleType(ATTR_MIME_FILTER, "A regular expression that the mimetype of all documents "
                + "will be compared against. \nIf the attribute filter-mode is "
                + "set to 'Blacklist' then all the documents whose mimetype "
                + "matches will be ignored by this processor. If the filter-"
                + "mode is set to 'Whitelist' only those documents whose "
                + "mimetype matches will be processed.", DEFAULT_MIME_FILTER);
        t.setOverrideable(false);
        t.setExpertSetting(true);
        addElementToDefinition(t);
        t = new SimpleType(ATTR_FILTER_MODE, "Determines if the mime-filter acts as a blacklist (declares "
                + "what should be ignored) or whitelist (declares what should be processed).",
                DEFAULT_FILTER_MODE, AVAILABLE_FILTER_MODES);
        t.setOverrideable(false);
        t.setExpertSetting(true);
        addElementToDefinition(t);
        t = new SimpleType(ATTR_ANALYSIS_MODE, "If enabled, the processor can analyse the timestamp (last-"
                + "modified) and ETag info of the HTTP headers and compare "
                + "their predictions as to whether or not the document had "
                + "changed against the result of the index lookup. This is "
                + "ONLY for the purpose of gathering statistics about the "
                + "usefulness and accuracy of the HTTP header information in "
                + "question and has no effect on the processing of documents. "
                + "Analysis is only possible if "
                + "the relevant data was included in the index.", DEFAULT_ANALYSIS_MODE, AVAILABLE_ANALYSIS_MODES);
        t.setOverrideable(false);
        t.setExpertSetting(true);
        addElementToDefinition(t);

        t = new SimpleType(ATTR_LOG_LEVEL, "Adjust the verbosity of the processor. By default, it only "
                + "reports serious (Java runtime) errors. By setting the log level "
                + "higher, various additional data can be logged. "
                + "* Severe - Default logging level, only serious errors. "
                + "Note that it is possible that a more permissive default "
                + "logging level has been set via the heritrix.properties "
                + "file. This setting (severe) will not affect that.\n"
                + "* Info - Records some anomalies, such as information "
                + "on URIs for which the HTTP header info falsely predicts no-change.\n"
                + "* Finest - Full logging of all URIs processed. For debugging purposes only!",
                DEFAULT_LOG_LEVEL, AVAILABLE_LOG_LEVELS);
        t.setOverrideable(false);
        t.setExpertSetting(true);
        addElementToDefinition(t);
        t = new SimpleType(ATTR_STATS_PER_HOST, "If enabled the processor will keep track of the number of "
                + "processed uris, duplicates found etc. per host. The listing "
                + "will be added to the processor report (not the host-report).", DEFAULT_STATS_PER_HOST);
        t.setOverrideable(false);
        t.setExpertSetting(true);
        addElementToDefinition(t);
        t = new SimpleType(ATTR_CHANGE_CONTENT_SIZE, "If set to true then the processor will set the content size "
                + "of the CrawlURI to zero when a duplicate is discovered.", DEFAULT_CHANGE_CONTENT_SIZE);
        t.setOverrideable(false);
        addElementToDefinition(t);

        t = new SimpleType(ATTR_ORIGIN_HANDLING, "The origin of duplicate URLs can be handled a few different "
                + "ways. It is important to note that the 'origin' information "
                + "is malleable and may be anything from an ARC name and offset "
                + "to a simple ID of a particular crawl. It is entirely at the operator's discretion.\n "
                + ORIGIN_HANDLING_NONE + " - No origin information is associated with the URLs.\n "
                + ORIGIN_HANDLING_PROCESSOR + " - Duplicate URLs are all given "
                + "the same origin, specified by the 'origin' setting of this processor.\n "
                + ORIGIN_HANDLING_INDEX + " - The origin of each duplicate URL "
                + "is read from the index. If the index does not contain any "
                + "origin information for a URL, the processor setting is used as a fallback!",
                DEFAULT_ORIGIN_HANDLING, AVAILABLE_ORIGIN_HANDLING);
        t.setOverrideable(false);
        addElementToDefinition(t);

        t = new SimpleType(ATTR_ORIGIN, "The origin of duplicate URLs.", DEFAULT_ORIGIN);
        addElementToDefinition(t);

        t = new SimpleType(ATTR_SKIP_WRITE, "If set to true, then processing of duplicate URIs will be "
                + "skipped directly to the post processing chain. If false, "
                + "processing of duplicates will skip directly to the writer "
                + "chain that precedes the post processing chain.", DEFAULT_SKIP_WRITE);
        t.setOverrideable(true);
        addElementToDefinition(t);

        t = new SimpleType(ATTR_USE_SPARSE_RANGE_FILTER, "If set to true, then Lucene queries use a custom 'sparse' "
                + "range filter. This uses less memory at the cost of some "
                + "lost performance. Suitable for very large indexes.", DEFAULT_USE_SPARSE_RANGE_FILTER);
        t.setOverrideable(false);
        t.setExpertSetting(true);
        addElementToDefinition(t);
    }

    /*
     * (non-Javadoc)
     *
     * @see org.archive.crawler.framework.Processor#initialTasks()
     */
    @Override
    protected void initialTasks() {
        // Read settings and set appropriate class variables.

        // Index location
        String indexLocation = (String) readAttribute(ATTR_INDEX_LOCATION, "");
        try {
            FSDirectory indexDir = FSDirectory.open(new File(indexLocation));
            // https://issues.apache.org/jira/browse/LUCENE-1566
            // Halve the read chunk size (default 100 MB) to avoid OOM errors.
            int chunksize = indexDir.getReadChunkSize();
            indexDir.setReadChunkSize(chunksize / 2);
            IndexReader reader = DirectoryReader.open(indexDir);
            index = new IndexSearcher(reader);
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Unable to find/open index.", e);
        }

        // Matching method
        String matchingMethod = (String) readAttribute(ATTR_MATCHING_METHOD, DEFAULT_MATCHING_METHOD);
        lookupByURL = matchingMethod.equals(DEFAULT_MATCHING_METHOD);

        // Try equivalent matches
        equivalent = ((Boolean) readAttribute(ATTR_EQUIVALENT, DEFAULT_EQUIVALENT)).booleanValue();

        // Mime filter
        mimefilter = (String) readAttribute(ATTR_MIME_FILTER, DEFAULT_MIME_FILTER);

        // Filter mode (blacklist (default) or whitelist)
        blacklist = ((String) readAttribute(ATTR_FILTER_MODE, DEFAULT_FILTER_MODE)).equals(DEFAULT_FILTER_MODE);

        // Analysis (None (default), Timestamp only or Timestamp and ETag)
        String analysisMode = (String) readAttribute(ATTR_ANALYSIS_MODE, DEFAULT_ANALYSIS_MODE);
        if (analysisMode.equals(AVAILABLE_ANALYSIS_MODES[1])) {
            // Timestamp only
            doTimestampAnalysis = true;
        } else if (analysisMode.equals(AVAILABLE_ANALYSIS_MODES[2])) {
            // Both timestamp and ETag
            doTimestampAnalysis = true;
            doETagAnalysis = true;
        }

        // Log file/level
        String lev = (String) readAttribute(ATTR_LOG_LEVEL, DEFAULT_LOG_LEVEL);
        if (lev.equals(Level.FINEST.toString())) {
            logger.setLevel(Level.FINEST);
        } else if (lev.equals(Level.INFO.toString())) {
            logger.setLevel(Level.INFO);
        } // Severe effectively means the default level.

        // Track per host stats
        statsPerHost = ((Boolean) readAttribute(ATTR_STATS_PER_HOST, DEFAULT_STATS_PER_HOST)).booleanValue();

        // Change content size
        changeContentSize = ((Boolean) readAttribute(ATTR_CHANGE_CONTENT_SIZE, DEFAULT_CHANGE_CONTENT_SIZE))
                .booleanValue();

        // Origin handling.
        String originHandling = (String) readAttribute(ATTR_ORIGIN_HANDLING, DEFAULT_ORIGIN_HANDLING);
        if (originHandling.equals(ORIGIN_HANDLING_NONE) == false) {
            useOrigin = true;
            if (originHandling.equals(ORIGIN_HANDLING_INDEX)) {
                useOriginFromIndex = true;
            }
        }

        // Range filter type
        useSparseRangeFilter = ((Boolean) readAttribute(ATTR_USE_SPARSE_RANGE_FILTER, DEFAULT_USE_SPARSE_RANGE_FILTER))
                .booleanValue();

        // Initialize some internal variables:
        stats = new Statistics();
        if (statsPerHost) {
            perHostStats = new HashMap<String, Statistics>();
        }
    }

    /**
     * A utility method for reading attributes. If not found, an error is logged and the defaultValue is returned.
     *
     * @param name The name of the attribute
     * @param defaultValue A default value to return if an error occurs
     * @return The value of the attribute or the default value if an error occurs
     */
    protected Object readAttribute(String name, Object defaultValue) {
        try {
            return getAttribute(name);
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Unable to read " + name + " attribute", e);
            return defaultValue;
        }
    }

    protected void innerProcess(CrawlURI curi) throws InterruptedException {
        if (curi.isSuccess() == false) {
            // Early return. No point in doing comparison on failed downloads.
            logger.finest("Not handling " + curi.toString() + ", did not succeed.");
            return;
        }
        if (curi.isPrerequisite()) {
            // Early return. Prerequisites are exempt from checking.
            logger.finest("Not handling " + curi.toString() + ", prerequisite.");
            return;
        }
        if (curi.toString().startsWith("http") == false) {
            // Early return. Non-http documents are not handled at present.
            // (Success and prerequisite status were already checked above.)
            logger.finest("Not handling " + curi.toString() + ", non-http.");
            return;
        }
        if (curi.getContentType() == null) {
            // No content type means we can not handle it.
            logger.finest("Not handling " + curi.toString() + ", missing content (mime) type");
            return;
        }
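        // Note on the check below (a reading aid, not new logic):
        // "matches(mimefilter) == blacklist" skips the URI on a match when
        // blacklisting, and on a non-match when whitelisting.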
        if (curi.getContentType().matches(mimefilter) == blacklist) {
            // Early return. Does not pass the mime filter.
            logger.finest("Not handling " + curi.toString() + ", excluded by mimefilter (" + curi.getContentType()
                    + ").");
            return;
        }
        if (curi.containsKey(A_CONTENT_STATE_KEY) && curi.getInt(A_CONTENT_STATE_KEY) == CONTENT_UNCHANGED) {
            // Early return. A previous processor or filter has judged this
            // CrawlURI as having unchanged content.
            logger.finest("Not handling " + curi.toString() + ", already flagged as unchanged.");
            return;
        }
        logger.finest("Processing " + curi.toString() + " (" + curi.getContentType() + ")");

        stats.handledNumber++;
        stats.totalAmount += curi.getContentSize();
        Statistics currHostStats = null;
        if (statsPerHost) {
            synchronized (perHostStats) {
                String host = getController().getServerCache().getHostFor(curi).getHostName();
                currHostStats = perHostStats.get(host);
                if (currHostStats == null) {
                    currHostStats = new Statistics();
                    perHostStats.put(host, currHostStats);
                }
            }
            currHostStats.handledNumber++;
            currHostStats.totalAmount += curi.getContentSize();
        }

        Document duplicate = null;

        if (lookupByURL) {
            duplicate = lookupByURL(curi, currHostStats);
        } else {
            duplicate = lookupByDigest(curi, currHostStats);
        }

        if (duplicate != null) {
            // Perform tasks common to when a duplicate is found.
            // Increment statistics counters
            stats.duplicateAmount += curi.getContentSize();
            stats.duplicateNumber++;
            if (statsPerHost) {
                currHostStats.duplicateAmount += curi.getContentSize();
                currHostStats.duplicateNumber++;
            }
            // Duplicate. Abort further processing of URI.
            if (((Boolean) readAttribute(ATTR_SKIP_WRITE, DEFAULT_SKIP_WRITE)).booleanValue()) {
                // Skip writing, go directly to post processing chain
                curi.skipToProcessorChain(getController().getPostprocessorChain());
            } else {
                // Do not skip writing, go to writer processors
                curi.skipToProcessorChain(getController().getProcessorChainList().getProcessorChain(
                        CrawlOrder.ATTR_WRITE_PROCESSORS));
            }

            // Record origin?
            String annotation = "duplicate";
            if (useOrigin) {
                // TODO: Save origin in the CrawlURI so that other processors
                // can make use of it. (Future: WARC)
                if (useOriginFromIndex && duplicate.get(DigestIndexer.FIELD_ORIGIN) != null) {
                    // Index contains origin, use it.
                    annotation += ":\"" + duplicate.get(DigestIndexer.FIELD_ORIGIN) + "\"";
                } else {
                    String tmp = (String) getUncheckedAttribute(curi, ATTR_ORIGIN);
                    // Check if an origin value is actually available
                    if (tmp != null && tmp.trim().length() > 0) {
                        // It is available, add it to the log line.
                        annotation += ":\"" + tmp + "\"";
                    }
                }
            }
            // Make note in log
            curi.addAnnotation(annotation);

            if (changeContentSize) {
                // Set content size to zero, we are not planning to
                // 'write it to disk'
                // TODO: Reconsider this
                curi.setContentSize(0);
            }
            // Mark as duplicate for other processors
            curi.putInt(A_CONTENT_STATE_KEY, CONTENT_UNCHANGED);
        }

        if (doTimestampAnalysis) {
            doAnalysis(curi, currHostStats, duplicate != null);
        }
    }

    /**
     * Process a CrawlURI, looking it up in the index by URL.
     *
     * @param curi The CrawlURI to process
     * @param currHostStats A statistics object for the current host. If per host statistics tracking is enabled this
     * must be non-null and the method will increment appropriate counters on it.
     * @return The result of the lookup (a Lucene document). If a duplicate is not found, null is returned.
     */
    protected Document lookupByURL(CrawlURI curi, Statistics currHostStats) {
        // Look the CrawlURI's URL up in the index.
        try {
            Query query = queryField(DigestIndexer.FIELD_URL, curi.toString());
            AllDocsCollector collectAllCollector = new AllDocsCollector();
            index.search(query, collectAllCollector);

            List<ScoreDoc> hits = collectAllCollector.getHits();
            Document doc = null;
            String currentDigest = getDigestAsString(curi);
            if (hits != null && hits.size() > 0) {
                // Typically there should be only one hit, but we allow for
                // multiple. Multiple hits on the same exact URL should be
                // rare. See if any have matching content digests.
                for (ScoreDoc hit : hits) {
                    int docId = hit.doc;
                    doc = index.doc(docId);
                    String oldDigest = doc.get(DigestIndexer.FIELD_DIGEST);

                    if (oldDigest.equalsIgnoreCase(currentDigest)) {
                        stats.exactURLDuplicates++;
                        if (statsPerHost) {
                            currHostStats.exactURLDuplicates++;
                        }

                        logger.finest("Found exact match for " + curi.toString());

                        // If we found a hit, no need to look at other hits.
                        return doc;
                    }
                }
            }
            if (equivalent) {
                // No exact hits. Let's try lenient matching.
                String normalizedURL = DigestIndexer.stripURL(curi.toString());
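                // Illustrative example, per the normalization described in the
                // 'try-equivalent' setting: "http://www.example.com/page/?s=1"
                // is stripped of www*, the trailing slash and the parameters.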
                query = queryField(DigestIndexer.FIELD_URL_NORMALIZED, normalizedURL);
                collectAllCollector.reset(); // reset the collector before reuse
                index.search(query, collectAllCollector);
                hits = collectAllCollector.getHits();

                for (ScoreDoc hit : hits) {
                    int docId = hit.doc;
                    Document doc1 = index.doc(docId);
                    String indexDigest = doc1.get(DigestIndexer.FIELD_DIGEST);
                    if (indexDigest.equals(currentDigest)) {
                        // Make note in log
                        String equivURL = doc1.get(DigestIndexer.FIELD_URL);
                        curi.addAnnotation("equivalent to " + equivURL);
                        // Increment statistics counters
                        stats.equivalentURLDuplicates++;
                        if (statsPerHost) {
                            currHostStats.equivalentURLDuplicates++;
                        }
                        logger.finest("Found equivalent match for " + curi.toString() + ". Normalized: "
                                + normalizedURL + ". Equivalent to: " + equivURL);

                        // If we found a hit, no need to look at more.
                        return doc1;
                    }
                }
            }
        } catch (IOException e) {
            logger.log(Level.SEVERE, "Error accessing index.", e);
        }
        // If we make it here then this is not a duplicate.
        return null;
    }

    /**
     * Process a CrawlURI, looking it up in the index by content digest.
     *
     * @param curi The CrawlURI to process
     * @param currHostStats A statistics object for the current host. If per host statistics tracking is enabled this
     * must be non-null and the method will increment appropriate counters on it.
     * @return The result of the lookup (a Lucene document). If a duplicate is not found, null is returned.
     */
    protected Document lookupByDigest(CrawlURI curi, Statistics currHostStats) {
        Document duplicate = null;
        String currentDigest = null;
        Object digest = curi.getContentDigest();
        if (digest != null) {
            currentDigest = Base32.encode((byte[]) digest);
        } else {
            logger.warning("Digest received from CrawlURI is null; returning null Document.");
            return null;
        }

        Query query = queryField(DigestIndexer.FIELD_DIGEST, currentDigest);
        try {
            AllDocsCollector collectAllCollector = new AllDocsCollector();
            index.search(query, collectAllCollector);

            List<ScoreDoc> hits = collectAllCollector.getHits();

            StringBuffer mirrors = new StringBuffer();
            mirrors.append("mirrors: ");
            if (hits != null && hits.size() > 0) {
                // There can definitely be more than one hit.
                // Note: We may find an equivalent match before we find an
                // (existing) exact match.
                // TODO: Ensure that an exact match is recorded if it exists.
                Iterator<ScoreDoc> hitsIterator = hits.iterator();
                while (hitsIterator.hasNext() && duplicate == null) {
                    ScoreDoc hit = hitsIterator.next();
                    int docId = hit.doc;
                    Document doc = index.doc(docId);
                    String indexURL = doc.get(DigestIndexer.FIELD_URL);
                    // See if the current hit is an exact match.
                    if (curi.toString().equals(indexURL)) {
                        duplicate = doc;
                        stats.exactURLDuplicates++;
                        if (statsPerHost) {
                            currHostStats.exactURLDuplicates++;
                        }
                        logger.finest("Found exact match for " + curi.toString());
                    }

                    // If not, then check if it is an equivalent match (if
                    // equivalent matches are allowed).
                    if (duplicate == null && equivalent) {
                        String normalURL = DigestIndexer.stripURL(curi.toString());
                        String indexNormalURL = doc.get(DigestIndexer.FIELD_URL_NORMALIZED);
                        if (normalURL.equals(indexNormalURL)) {
                            duplicate = doc;
                            stats.equivalentURLDuplicates++;
                            if (statsPerHost) {
                                currHostStats.equivalentURLDuplicates++;
                            }
                            curi.addAnnotation("equivalent to " + indexURL);
                            logger.finest("Found equivalent match for " + curi.toString() + ". Normalized: "
                                    + normalURL + ". Equivalent to: " + indexURL);
                        }
                    }

                    if (duplicate == null) {
                        // Will only be used if no exact (or equivalent) match
                        // is found.
                        mirrors.append(indexURL + " ");
                    }
                }
                if (duplicate == null) {
                    stats.mirrorNumber++;
                    if (statsPerHost) {
                        currHostStats.mirrorNumber++;
                    }
                    logger.log(Level.FINEST, "Found mirror URLs for " + curi.toString() + ". " + mirrors);
                }
            }
        } catch (IOException e) {
            logger.log(Level.SEVERE, "Error accessing index.", e);
        }
        return duplicate;
    }

    public String report() {
        StringBuffer ret = new StringBuffer();
        ret.append("Processor: is.hi.bok.deduplicator.DeDuplicator\n");
        ret.append("  Function:          Abort processing of duplicate records\n");
        ret.append("                     - Lookup by " + (lookupByURL ? "url" : "digest") + " in use\n");
        ret.append("  Total handled:     " + stats.handledNumber + "\n");
        ret.append("  Duplicates found:  " + stats.duplicateNumber + " "
                + getPercentage(stats.duplicateNumber, stats.handledNumber) + "\n");
        ret.append("  Bytes total:       " + stats.totalAmount + " ("
                + ArchiveUtils.formatBytesForDisplay(stats.totalAmount) + ")\n");
        ret.append("  Bytes discarded:   " + stats.duplicateAmount + " ("
                + ArchiveUtils.formatBytesForDisplay(stats.duplicateAmount) + ") "
                + getPercentage(stats.duplicateAmount, stats.totalAmount) + "\n");

        ret.append("  New (no hits):     "
                + (stats.handledNumber - (stats.mirrorNumber + stats.exactURLDuplicates + stats.equivalentURLDuplicates))
                + "\n");
        ret.append("  Exact hits:        " + stats.exactURLDuplicates + "\n");
        ret.append("  Equivalent hits:   " + stats.equivalentURLDuplicates + "\n");
        if (lookupByURL == false) {
            ret.append("  Mirror hits:       " + stats.mirrorNumber + "\n");
        }

        if (doTimestampAnalysis) {
            ret.append("  Timestamp predicts: (Where exact URL existed in the index)\n");
            ret.append("  Change correctly:  " + stats.timestampChangeCorrect + "\n");
            ret.append("  Change falsely:    " + stats.timestampChangeFalse + "\n");
            ret.append("  Non-change correct:" + stats.timestampNoChangeCorrect + "\n");
            ret.append("  Non-change falsely:" + stats.timestampNoChangeFalse + "\n");
            ret.append("  Missing timestamp: " + stats.timestampMissing + "\n");
        }

        if (statsPerHost) {
            ret.append("  [Host] [total] [duplicates] [bytes] [bytes discarded] [new] [exact] [equiv]");
            if (lookupByURL == false) {
                ret.append(" [mirror]");
            }
            if (doTimestampAnalysis) {
                ret.append(" [change correct] [change falsely]");
                ret.append(" [non-change correct] [non-change falsely]");
                ret.append(" [no timestamp]");
            }
            ret.append("\n");
            synchronized (perHostStats) {
                Iterator<String> it = perHostStats.keySet().iterator();
                while (it.hasNext()) {
                    String key = it.next();
                    Statistics curr = perHostStats.get(key);
                    ret.append("  " + key);
                    ret.append(" ");
                    ret.append(curr.handledNumber);
                    ret.append(" ");
                    ret.append(curr.duplicateNumber);
                    ret.append(" ");
                    ret.append(curr.totalAmount);
                    ret.append(" ");
                    ret.append(curr.duplicateAmount);
                    ret.append(" ");
                    ret.append(curr.handledNumber
                            - (curr.mirrorNumber + curr.exactURLDuplicates + curr.equivalentURLDuplicates));
                    ret.append(" ");
                    ret.append(curr.exactURLDuplicates);
                    ret.append(" ");
                    ret.append(curr.equivalentURLDuplicates);

                    if (lookupByURL == false) {
                        ret.append(" ");
                        ret.append(curr.mirrorNumber);
                    }
                    if (doTimestampAnalysis) {
                        ret.append(" ");
                        ret.append(curr.timestampChangeCorrect);
                        ret.append(" ");
                        ret.append(curr.timestampChangeFalse);
                        ret.append(" ");
                        ret.append(curr.timestampNoChangeCorrect);
                        ret.append(" ");
                        ret.append(curr.timestampNoChangeFalse);
                        ret.append(" ");
                        ret.append(curr.timestampMissing);
                    }
                    ret.append("\n");
                }
            }
        }

        ret.append("\n");
        return ret.toString();
    }

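    /*
     * Example (illustrative): getPercentage(1, 3) returns "33.33%". Note that
     * there is no guard against total being zero.
     */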
    protected static String getPercentage(double portion, double total) {
        double value = portion / total;
        value = value * 100;
        String ret = Double.toString(value);
        int dot = ret.indexOf('.');
        if (dot + 3 < ret.length()) {
            ret = ret.substring(0, dot + 3);
        }
        return ret + "%";
    }

    private static String getDigestAsString(CrawlURI curi) {
        // The CrawlURI now has a method for this, but for backwards
        // compatibility with older Heritrix versions it is not used here.
        Object digest = curi.getContentDigest();
        if (digest != null) {
            return Base32.encode((byte[]) digest);
        }
        return null;
    }

    protected void doAnalysis(CrawlURI curi, Statistics currHostStats, boolean isDuplicate) {
        try {
            Query query = queryField(DigestIndexer.FIELD_URL, curi.toString());
            AllDocsCollector collectAllCollector = new AllDocsCollector();
            index.search(query, collectAllCollector);
            List<ScoreDoc> hits = collectAllCollector.getHits();

            Document doc = null;
            if (hits != null && hits.size() > 0) {
                // If there are multiple hits, use the one with the most
                // recent date.
                Document docToEval = null;

                for (ScoreDoc hit : hits) {
                    int docId = hit.doc;
                    doc = index.doc(docId);
                    // The format of the timestamp ("yyyyMMddHHmmssSSS") allows
                    // us to do a greater-than (later) or less-than (earlier)
                    // comparison of the strings.
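                    // E.g. "20060410120000000".compareTo("20051231235959999")
                    // is positive because the first timestamp is later.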
                    String timestamp = doc.get(DigestIndexer.FIELD_TIMESTAMP);
                    if (docToEval == null || docToEval.get(DigestIndexer.FIELD_TIMESTAMP).compareTo(timestamp) < 0) {
                        // Found a more recent hit.
                        docToEval = doc;
                    }
                }
                doTimestampAnalysis(curi, docToEval, currHostStats, isDuplicate);
                if (doETagAnalysis) {
                    // TODO: Do ETag analysis
                }
            }
        } catch (IOException e) {
            logger.log(Level.SEVERE, "Error accessing index.", e);
        }
    }

    protected void doTimestampAnalysis(CrawlURI curi, Document urlHit, Statistics currHostStats, boolean isDuplicate) {

        HttpMethod method = (HttpMethod) curi.getObject(CoreAttributeConstants.A_HTTP_TRANSACTION);

        // Compare datestamps (last-modified versus the indexed date)
        Date lastModified = null;
        if (method.getResponseHeader("last-modified") != null) {
            SimpleDateFormat sdf = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z", Locale.ENGLISH);
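            // The pattern above matches RFC 1123 dates as used in HTTP
            // headers, e.g. "Tue, 15 Nov 1994 12:45:26 GMT".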
            try {
                lastModified = sdf.parse(method.getResponseHeader("last-modified").getValue());
            } catch (ParseException e) {
                logger.log(Level.INFO, "Exception parsing last modified of " + curi.toString(), e);
                return;
            }
        } else {
            stats.timestampMissing++;
            if (statsPerHost) {
                currHostStats.timestampMissing++;
            }
            logger.finest("Missing timestamp on " + curi.toString());
            return;
        }

        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS");
        Date lastFetch = null;
        try {
            lastFetch = sdf.parse(urlHit.get(DigestIndexer.FIELD_TIMESTAMP));
        } catch (ParseException e) {
            logger.log(Level.WARNING, "Exception parsing indexed date for " + urlHit.get(DigestIndexer.FIELD_URL), e);
            return;
        }

        if (lastModified.after(lastFetch)) {
            // Header predicts change
            if (isDuplicate) {
                // But the DeDuplicator did not notice a change.
                stats.timestampChangeFalse++;
                if (statsPerHost) {
                    currHostStats.timestampChangeFalse++;
                }
                logger.finest("Last-modified falsely predicts change on " + curi.toString());
            } else {
                stats.timestampChangeCorrect++;
                if (statsPerHost) {
                    currHostStats.timestampChangeCorrect++;
                }
                logger.finest("Last-modified correctly predicts change on " + curi.toString());
            }
        } else {
            // Header does not predict change.
            if (isDuplicate) {
                // And the DeDuplicator verifies that no change had occurred
                stats.timestampNoChangeCorrect++;
                if (statsPerHost) {
                    currHostStats.timestampNoChangeCorrect++;
                }
                logger.finest("Last-modified correctly predicts no-change on " + curi.toString());
            } else {
                // As this is particularly bad we'll log the URL at INFO level
                logger.log(Level.INFO, "Last-modified incorrectly indicated no-change on " + curi.toString() + " "
                        + curi.getContentType() + ". last-modified: " + lastModified + ". Last fetched: " + lastFetch);
                stats.timestampNoChangeFalse++;
                if (statsPerHost) {
                    currHostStats.timestampNoChangeFalse++;
                }
            }
        }
    }

    /**
     * Run a simple Lucene query for a single term in a single field.
     *
     * @param fieldName name of the field to look in.
     * @param value The value to query for
     * @return A Query for the given value in the given field.
     */
    protected Query queryField(String fieldName, String value) {
        Query query = null;

        // Alternate solution: a constant-score range filter whose lower and
        // upper bounds are both the sought term (inclusive on both ends).
        BytesRef valueRef = new BytesRef(value);
        query = new ConstantScoreQuery(new TermRangeFilter(fieldName, valueRef, valueRef, true, true));

        // The cleanest solution, but it also appears memory-demanding:
        // query = new ConstantScoreQuery(new FieldCacheTermsFilter(fieldName,
        // value));
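        // Sketch of the simplest alternative (not used here): a plain
        // TermQuery, which also scores hits that this lookup does not need:
        // query = new TermQuery(new Term(fieldName, value));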
        return query;
    }

    @Override
    protected void finalTasks() {
    }
}

class Statistics {
    // General statistics

    /**
     * Number of URIs that make it through the processor's exclusion rules and are processed by it.
     */
    long handledNumber = 0;

    /**
     * Number of URIs that are deemed duplicates, whose further processing is aborted.
     */
    long duplicateNumber = 0;

    /**
     * The number of URIs that turned out to have exact URL and content digest matches.
     */
    long exactURLDuplicates = 0;

    /**
     * The number of URIs that turned out to have equivalent URL and content digest matches.
     */
    long equivalentURLDuplicates = 0;

    /**
     * The number of URIs that, while having no exact or equivalent matches, do have exact content digest matches
     * against non-equivalent URIs.
     */
    long mirrorNumber = 0;

    /**
     * The total amount of data represented by the documents that were deemed duplicates and excluded from further
     * processing.
     */
    long duplicateAmount = 0;

    /** The total amount of data represented by all the documents processed */
    long totalAmount = 0;

    // Timestamp analysis

    long timestampChangeCorrect = 0;
    long timestampChangeFalse = 0;
    long timestampNoChangeCorrect = 0;
    long timestampNoChangeFalse = 0;
    long timestampMissing = 0;

    // ETag analysis

    long ETagChangeCorrect = 0;
    long ETagChangeFalse = 0;
    long ETagNoChangeCorrect = 0;
    long ETagNoChangeFalse = 0;
    long ETagMissingIndex = 0;
    long ETagMissingCURI = 0;
}