001/* DeDuplicator
002 * 
003 * Created on 10.04.2006
004 *
005 * Copyright (C) 2006-2010 National and University Library of Iceland
006 * 
007 * This file is part of the DeDuplicator (Heritrix add-on module).
008 * 
009 * DeDuplicator is free software; you can redistribute it and/or modify
010 * it under the terms of the GNU Lesser Public License as published by
011 * the Free Software Foundation; either version 2.1 of the License, or
012 * any later version.
013 * 
014 * DeDuplicator is distributed in the hope that it will be useful, 
015 * but WITHOUT ANY WARRANTY; without even the implied warranty of
016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
017 * GNU Lesser Public License for more details.
018 * 
019 * You should have received a copy of the GNU Lesser Public License
020 * along with DeDuplicator; if not, write to the Free Software
021 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
022 */
023package is.hi.bok.deduplicator;
024
025import static is.hi.bok.deduplicator.DedupAttributeConstants.A_CONTENT_STATE_KEY;
026import static is.hi.bok.deduplicator.DedupAttributeConstants.CONTENT_UNCHANGED;
027import static org.archive.modules.recrawl.RecrawlAttributeConstants.A_CONTENT_DIGEST;
028import static org.archive.modules.recrawl.RecrawlAttributeConstants.A_FETCH_HISTORY;
029
030import java.io.File;
031import java.io.IOException;
032import java.text.ParseException;
033import java.text.SimpleDateFormat;
034import java.util.Date;
035import java.util.HashMap;
036import java.util.Iterator;
037import java.util.List;
038import java.util.Locale;
039import java.util.logging.Level;
040import java.util.logging.Logger;
041
042import org.apache.lucene.document.Document;
043import org.apache.lucene.index.DirectoryReader;
044import org.apache.lucene.index.IndexReader;
045import org.apache.lucene.search.ConstantScoreQuery;
046import org.apache.lucene.search.IndexSearcher;
047import org.apache.lucene.search.Query;
048import org.apache.lucene.search.ScoreDoc;
049import org.apache.lucene.search.TermRangeFilter;
050import org.apache.lucene.store.FSDirectory;
051import org.apache.lucene.util.BytesRef;
052import org.archive.modules.CrawlURI;
053import org.archive.modules.ProcessResult;
054import org.archive.modules.Processor;
055import org.archive.modules.net.ServerCache;
056import org.archive.modules.revisit.IdenticalPayloadDigestRevisit;
057import org.archive.util.ArchiveUtils;
058import org.archive.util.Base32;
059import org.springframework.beans.factory.InitializingBean;
060import org.springframework.beans.factory.annotation.Autowired;
061
062import dk.netarkivet.common.utils.AllDocsCollector;
063
064/**
065 * Heritrix compatible processor.
066 * <p>
067 * Will determine if CrawlURIs are <i>duplicates</i>. 
068 * <p>
069 * Duplicate detection can only be performed <i>after</i> the fetch processors
070 * have run.
071 * Modified by SVC to use Lucene 4.X
072 * 
073 * @author Kristinn Sigur&eth;sson
074 * @author Søren Vejrup Carlsen
075 * 
076 * <bean id="DeDuplicator" class="is.hi.bok.deduplicator.DeDuplicator">
077 * <!-- DEDUPLICATION_INDEX_LOCATION is replaced by path on harvest-server -->
078 * <property name="indexLocation" value="/home/svc/dedupcrawllogindex/empty-cache"/> 
079        <property name="matchingMethod" value="URL"/>  other option: DIGEST
080    <property name="tryEquivalent" value="true"/> 
081       <property name="changeContentSize" value="false"/>
082        <property name="mimeFilter" value="^text/.*"/>
083         
084        <property name="filterMode" value="BLACKLIST"/> Other option:    WHITELIST 
085        <property name="analysisMode" value="TIMESTAMP"/> Other options: NONE, TIMESTAMP_AND_ETAG
086        
087        <property name="origin" value=""/>
088        <property name="originHandling" value="INDEX"/> Other options: NONE,PROCESSOR
089        <property name="statsPerHost" value="true"/>
090        <property name="revisitInWarcs" value="true"/>
091
092//              /**
093//                                      (FROM deduplicator-commons/src/main/java/is/landsbokasafn/deduplicator/IndexFields.java)
094//                     * These enums correspond to the names of fields in the Lucene index
095//                   */
096//              public enum IndexFields {
097//                  /** The URL 
098//                   *  This value is suitable for use in warc/revisit records as the WARC-Refers-To-Target-URI
099//                   **/
100//                      URL,
101//                  /** The content digest as String **/
102//                      DIGEST,
103//                  /** The URLs timestamp (time of fetch). Suitable for use in WARC-Refers-To-Date. Encoded according to
104//                   *  w3c-iso8601  
105//                   */
106//                  DATE,
107//                  /** The document's etag **/
108//                  ETAG,
109//                  /** A canonicalized version of the URL **/
110//                      URL_CANONICALIZED,
111//                  /** WARC Record ID of original payload capture. Suitable for WARC-Refers-To field. **/
112//                  ORIGINAL_RECORD_ID;
113//
114//              }
115 
116@SuppressWarnings({"unchecked"})
117public class DeDuplicator extends Processor implements InitializingBean {
118
119    @Override public boolean getEnabled() {
120        return super.getEnabled();
121    }
122
123    @Override public void setEnabled(boolean enabled) {
124        super.setEnabled(enabled);
125    }
126
127    private static Logger logger =
128        Logger.getLogger(DeDuplicator.class.getName());
129
    // Spring configurable parameters

    /* Key into kp (the keyed-properties map inherited from Processor) holding
     * the location of the Lucene index used for lookups. */
    private final static String ATTR_INDEX_LOCATION = "index-location";

    /** @return the configured filesystem path of the Lucene index. */
    public String getIndexLocation() {
        return (String) kp.get(ATTR_INDEX_LOCATION);
    }
    /** SETTER used by Spring */
    public void setIndexLocation(String indexLocation) {
        kp.put(ATTR_INDEX_LOCATION,indexLocation);
    }

    /* The matching method in use (by url or content digest) */
    private final static String ATTR_MATCHING_METHOD = "matching-method";
    
    /** How stored captures are matched against the current capture:
     *  by (exact or equivalent) URL, or by content digest. */
    public enum MatchingMethod {
        URL,
        DIGEST
    }
    
    private final static MatchingMethod DEFAULT_MATCHING_METHOD = MatchingMethod.URL; 
    // Instance initializer: seed kp with the default so getMatchingMethod()
    // never returns null even when Spring supplies no value.
    {
        setMatchingMethod(DEFAULT_MATCHING_METHOD);
    }
    /** @return the configured matching method (URL by default). */
    public MatchingMethod getMatchingMethod() {
        return (MatchingMethod) kp.get(ATTR_MATCHING_METHOD);
    }
    
    /** SETTER used by Spring */
    public void setMatchingMethod(MatchingMethod method) {
        kp.put(ATTR_MATCHING_METHOD, method);
    }
163    
    /* On duplicate, should jump to which part of processing chain? 
     *  If not set, nothing is skipped. Otherwise this should be the identity of the processor to jump to.
     */
    public final static String ATTR_JUMP_TO = "jump-to";
    /** @return identity of the processor to jump to on a duplicate, or null to proceed normally. */
    public String getJumpTo(){
        return (String)kp.get(ATTR_JUMP_TO);
    }
    /** SPRING SETTER. 
     * TODO Are we using this property??  The netarkivet are not
     */
    public void setJumpTo(String jumpTo){
        kp.put(ATTR_JUMP_TO, jumpTo);
    }

    /* Origin of duplicate URLs. May be overridden by info from index */
    public final static String ATTR_ORIGIN = "origin";
    // Instance initializer: default is the empty string (no origin information).
    {
        setOrigin("");
    }
    /** @return the statically configured origin string ("" when none is set). */
    public String getOrigin() {
        return (String) kp.get(ATTR_ORIGIN);
    }
    
    /** SPRING SETTER */
    public void setOrigin(String origin) {
        kp.put(ATTR_ORIGIN,origin);
    }

    /* If an exact match is not made, should the processor try 
     *  to find an equivalent match?  
     */
    public final static String ATTR_EQUIVALENT = "try-equivalent";
    // Instance initializer: equivalent (canonicalized URL) matching is off by default.
    {
        setTryEquivalent(false);
    }
    /** @return true if lenient matching on the canonicalized URL should also be attempted. */
    public Boolean getTryEquivalent(){
        return (Boolean)kp.get(ATTR_EQUIVALENT);
    }
    /** SPRING SETTER */
    public void setTryEquivalent(Boolean tryEquivalent){
        kp.put(ATTR_EQUIVALENT, tryEquivalent);
    }
206
    /* The filter on mime types. This is either a blacklist or whitelist
     *  depending on ATTR_FILTER_MODE.
     */
    public final static String ATTR_MIME_FILTER = "mime-filter";
    public final static String DEFAULT_MIME_FILTER = "^text/.*";
    // Instance initializer: by default the filter regex matches text documents.
    {
        setMimeFilter(DEFAULT_MIME_FILTER);
    }
    /** @return the regular expression matched against each URI's content (mime) type. */
    public String getMimeFilter(){
        return (String)kp.get(ATTR_MIME_FILTER);
    }
    // USED by SPRING
    public void setMimeFilter(String mimeFilter){
        kp.put(ATTR_MIME_FILTER, mimeFilter);
    }

    /* Is the mime filter a blacklist (do not apply processor to what matches) 
     *  or whitelist (apply processor only to what matches).
     */
    public final static String ATTR_FILTER_MODE = "filter-mode";
    // Instance initializer: the mime filter acts as a blacklist by default.
    {
        setfilterMode(FilterMode.BLACKLIST);
    }
    
    /** @return the configured filter mode (BLACKLIST or WHITELIST). */
    public FilterMode getFilterMode() {
        return (FilterMode) kp.get(ATTR_FILTER_MODE);
    }
    
    
    /** Interpretation of the mime-filter regex. */
    public enum FilterMode {
        BLACKLIST, WHITELIST
    };
    
    
    /** @return true if the mime filter is operating as a blacklist. */
    public Boolean getBlacklist(){
        FilterMode fMode = (FilterMode) kp.get(ATTR_FILTER_MODE);
        return fMode.equals(FilterMode.BLACKLIST);
    }
    /** SPRING SETTER method */
    // NOTE(review): the lowercase 'f' breaks Java naming convention but still maps
    // to the Spring property "filterMode"; renaming would change the public API.
    public void setfilterMode(FilterMode filterMode){
        kp.put(ATTR_FILTER_MODE, filterMode);
    }
249   
250    
    /** What header analysis (if any) to perform: none, timestamp only,
     *  or timestamp and etag. */
    public enum AnalysisMode {
        NONE, TIMESTAMP, TIMESTAMP_AND_ETAG
    };
    
    
    /* Analysis mode. */
    public final static String ATTR_ANALYZE_MODE = "analyze-modes";
    // Instance initializer: timestamp analysis is enabled by default.
    {
        setAnalysisMode(AnalysisMode.TIMESTAMP);
    }
    
262    public boolean getAnalyzeTimestamp() {
263        AnalysisMode analysisMode = (AnalysisMode) kp.get(ATTR_ANALYZE_MODE);
264        return analysisMode.equals(AnalysisMode.TIMESTAMP);
265    }
266    
    /** SPRING SETTER */
    public void setAnalysisMode(AnalysisMode analyzeMode) {     
                kp.put(ATTR_ANALYZE_MODE, analyzeMode);
    }
    
    /** @return the configured analysis mode. */
    public AnalysisMode getAnalysisMode()  {    
        return (AnalysisMode) kp.get(ATTR_ANALYZE_MODE);
    }
274    
275
    /* Should the content size information be set to zero when a duplicate is found? */
    public final static String ATTR_CHANGE_CONTENT_SIZE = "change-content-size";
    // Instance initializer: content size is left untouched by default.
    {
        setChangeContentSize(false);
    }
    /** @return true if a duplicate's recorded content size should be zeroed. */
    public Boolean getChangeContentSize(){
        return (Boolean)kp.get(ATTR_CHANGE_CONTENT_SIZE);
    }
    /** SPRING SETTER */
    public void setChangeContentSize(Boolean changeContentSize){
        kp.put(ATTR_CHANGE_CONTENT_SIZE, changeContentSize);
    }

    /* Should statistics be tracked per host? **/
    public final static String ATTR_STATS_PER_HOST = "stats-per-host";
    // Instance initializer: per-host statistics are off by default.
    {
        setStatsPerHost(false);
    }
    /** @return true if statistics should also be kept per host. */
    public Boolean getStatsPerHost(){
        return (Boolean)kp.get(ATTR_STATS_PER_HOST);
    }
    /** SPRING SETTER */
    public void setStatsPerHost(Boolean statsPerHost){
        kp.put(ATTR_STATS_PER_HOST, statsPerHost);
    }
300    
    /* How should 'origin' be handled */
    public final static String ATTR_ORIGIN_HANDLING = "origin-handling";
    /** Source of the 'origin' information recorded for duplicates. */
    public enum OriginHandling {
        NONE,           // No origin information
        PROCESSOR,  // Use processor setting -- ATTR_ORIGIN
        INDEX       // Use index information, each hit on index should contain origin
    }
    public final static OriginHandling DEFAULT_ORIGIN_HANDLING = OriginHandling.NONE;
    // Instance initializer: no origin handling by default.
    {
        setOriginHandling(DEFAULT_ORIGIN_HANDLING);
    }
    /** @return how origin information is obtained (NONE, PROCESSOR or INDEX). */
    public OriginHandling getOriginHandling() {
        return (OriginHandling) kp.get(ATTR_ORIGIN_HANDLING);
    }
    /** SPRING SETTER */
    public void setOriginHandling(OriginHandling originHandling) {
        kp.put(ATTR_ORIGIN_HANDLING, originHandling);
    }

    /* Should duplicates be reported to Heritrix as revisits (so revisit
     * records are written to the WARCs)? */
    public final static String ATTR_REVISIT_IN_WARCS = "revisit-in-warcs";
    {
        setRevisitInWarcs(Boolean.TRUE); // the default is true
    }   
    
    /** SPRING SETTER */
    public void setRevisitInWarcs(Boolean revisitOn) {
        kp.put(ATTR_REVISIT_IN_WARCS, revisitOn);
        }
    /** @return true if duplicates should be flagged as revisits for WARC writing. */
    public Boolean getRevisitInWarcs() {
        return (Boolean) kp.get(ATTR_REVISIT_IN_WARCS);
    }
330    
    // Spring configured access to Heritrix resources
    
    // Gain access to the ServerCache for host based statistics.
    protected ServerCache serverCache;
    /** @return the crawl-wide server cache (used to resolve host names for per-host stats). */
    public ServerCache getServerCache() {
        return this.serverCache;
    }
    
    /** Injected by Spring. */
        @Autowired
    public void setServerCache(ServerCache serverCache) {
        this.serverCache = serverCache;
    }
343
344    
    // Member variables.
    protected IndexSearcher indexSearcher = null; // searches the dedup index; set in afterPropertiesSet()
    protected IndexReader indexReader = null; // backing reader for indexSearcher
    
    
    protected boolean lookupByURL = true; // cached: matching method == URL
    protected boolean statsPerHost = false; // cached copy of the stats-per-host setting
    
    
    protected boolean useOrigin = false; // cached: origin handling != NONE
    protected boolean useOriginFromIndex = false; // cached: origin handling == INDEX

    protected Statistics stats = null; // crawl-wide counters
    protected HashMap<String, Statistics> perHostStats = null; // host name -> counters (only when statsPerHost)
359
360
    /**
     * Spring lifecycle callback. Opens the Lucene index and caches the
     * configured settings in member variables. Does nothing when the
     * processor is disabled.
     *
     * @throws IllegalArgumentException if the index cannot be found/opened
     */
    public void afterPropertiesSet() throws Exception {
        if (!getEnabled()) {
            logger.info(this.getClass().getName() + " disabled.");
            return;
        }
        // Index location
        String indexLocation = getIndexLocation();
        try {
                FSDirectory indexDir = FSDirectory.open(new File(indexLocation));
            // https://issues.apache.org/jira/browse/LUCENE-1566
            // Reduce chunksize to avoid OOM to half the size of the default (=100 MB)
            int chunksize = indexDir.getReadChunkSize();
            indexDir.setReadChunkSize(chunksize / 2);
            indexReader = DirectoryReader.open(indexDir);
            indexSearcher = new IndexSearcher(indexReader);    
        } catch (Exception e) {
                throw new IllegalArgumentException("Unable to find/open index at " + indexLocation,e);
        } 
        
        // Matching method
        MatchingMethod matchingMethod = getMatchingMethod();
        lookupByURL = matchingMethod == MatchingMethod.URL;

        // Track per host stats
        statsPerHost = getStatsPerHost();
        
        // Origin handling.
        OriginHandling originHandling = getOriginHandling();
        if (originHandling != OriginHandling.NONE) {
            useOrigin = true;
            logger.fine("Use origin");        
            if (originHandling == OriginHandling.INDEX) {
                useOriginFromIndex = true;
                logger.fine("Use origin from index");
            }
        }
        
        // Initialize some internal variables:
        stats = new Statistics();
        if (statsPerHost) {
            perHostStats = new HashMap<String, Statistics>();
        }
    }
404    
405
406        @Override
407        protected boolean shouldProcess(CrawlURI curi) {
408        if (!getEnabled()) {
409            logger.finest("Not handling " + curi.toString() + ", deduplication disabled.");
410            return false;
411        }
412        if (curi.isSuccess() == false) {
413            // Early return. No point in doing comparison on failed downloads.
414            logger.finest("Not handling " + curi.toString()
415                    + ", did not succeed.");
416            return false;
417        }
418        if (curi.isPrerequisite()) {
419            // Early return. Prerequisites are exempt from checking.
420            logger.finest("Not handling " + curi.toString()
421                    + ", prerequisite.");
422            return false;
423        }
424        if (curi.toString().startsWith("http")==false) {
425            // Early return. Non-http documents are not handled at present
426            logger.finest("Not handling " + curi.toString()
427                        + ", non-http.");
428            return false;
429        }
430        if(curi.getContentType() == null){
431            // No content type means we can not handle it.
432            logger.finest("Not handling " + curi.toString()
433                    + ", missing content (mime) type");
434            return false;
435        }
436        if(curi.getContentType().matches(getMimeFilter()) == getBlacklist()){
437            // Early return. Does not pass the mime filter
438            logger.finest("Not handling " + curi.toString()
439                    + ", excluded by mimefilter (" + 
440                    curi.getContentType() + ").");
441            return false;
442        }
443        
444        if(curi.isRevisit()){
445            // A previous processor or filter has judged this CrawlURI to be a revisit
446            logger.finest("Not handling " + curi.toString()
447                    + ", already flagged as revisit.");
448            return false;
449        }
450        return true;
451        }
452
    /**
     * Never called: this processor overrides innerProcessResult() instead,
     * so reaching this method indicates a framework/logic error.
     */
    @Override
    protected void innerProcess(CrawlURI puri) {
        throw new AssertionError();
    }
457
458    /**
459     * Return date from 'date' field if date absent in 'origin' field or origin-field-absent
460     * @param duplicate
461     * @return
462     */
463    private String getRefersToDate(Document duplicate) {
464        String indexedDate = duplicate.get("date"); // DATE.name()
465        // look for the indexeddate of the revisit date in the origin "arcfile,offset,timestamp" 
466        String duplicateOrigin = duplicate.get(DigestIndexer.FIELD_ORIGIN);
467        if (duplicateOrigin != null && !duplicateOrigin.isEmpty()) {
468                String[] parts = duplicateOrigin.split(",");
469                if (parts.length == 3)  { // Detect new field-origin format 
470                        indexedDate = parts[2];
471                }
472        }
473        Date readDate = null;
474        try {
475                readDate = ArchiveDateConverter.getHeritrixDateFormat().parse(indexedDate);
476        } catch (ParseException e) {
477                logger.warning("Unable to parse the indexed date '" + indexedDate 
478                                + "' as a 17-digit date: " + e); 
479        }
480        String refersToDateString = indexedDate;
481        if (readDate != null) {
482                refersToDateString = ArchiveDateConverter.getWarcDateFormat().format(readDate); 
483        }
484        return refersToDateString;
485    }
486
487
488
        /**
         * Look the fetched URI up in the index and, on a hit, treat it as a
         * duplicate: build an IdenticalPayloadDigestRevisit profile from the
         * index document, update statistics, annotate the crawl log, and
         * optionally jump ahead in the processing chain. Timestamp analysis
         * runs afterwards when enabled, duplicate or not.
         *
         * @param curi the fetched CrawlURI (filtered by shouldProcess())
         * @return PROCEED normally, or a jump when a duplicate was found and
         *         the jump-to property is configured
         */
        @Override
        protected ProcessResult innerProcessResult(CrawlURI curi) throws InterruptedException {

        ProcessResult processResult = ProcessResult.PROCEED; // Default. Continue as normal

        logger.finest("Processing " + curi.toString() + "(" + 
                curi.getContentType() + ")");

        stats.handledNumber++;
        stats.totalAmount += curi.getContentSize();
        Statistics currHostStats = null;
        if(statsPerHost){
            // perHostStats is shared across toe threads; guard map access.
            synchronized (perHostStats) {
                String host = getServerCache().getHostFor(curi.getUURI()).getHostName();
                currHostStats = perHostStats.get(host);
                if(currHostStats==null){
                    currHostStats = new Statistics();
                    perHostStats.put(host,currHostStats);
                }
            }
            currHostStats.handledNumber++;
            currHostStats.totalAmount += curi.getContentSize();
        }
        
        Document duplicate = null; 
        
        // Dispatch on the configured matching method (cached in afterPropertiesSet()).
        if(lookupByURL){
            duplicate = lookupByURL(curi,currHostStats);
        } else {
            duplicate = lookupByDigest(curi,currHostStats);
        }

        if (duplicate != null){
            // Perform tasks common to when a duplicate is found.

                //// Code taken from LuceneIndexSearcher.wrap() method //////////////////////////
                IdenticalPayloadDigestRevisit duplicateRevisit = new IdenticalPayloadDigestRevisit(
                                duplicate.get("digest")); //DIGEST.name()));

                duplicateRevisit.setRefersToTargetURI(
                                duplicate.get("url"));  // URL.name()

                
                duplicateRevisit.setRefersToDate(getRefersToDate(duplicate));
                                
                
                //Check if the record ID information is available in the index.
                // This requires that record information is available during indexing
                String refersToRecordID = duplicate.get("orig_record_id"); // ORIGINAL_RECORD_ID.name()); 
        
                if (refersToRecordID!=null && !refersToRecordID.isEmpty()) {
                        duplicateRevisit.setRefersToRecordID(refersToRecordID);
                }               


            // Increment statistics counters
            stats.duplicateAmount += curi.getContentSize();
            stats.duplicateNumber++;
            if(statsPerHost){ 
                currHostStats.duplicateAmount+=curi.getContentSize();
                currHostStats.duplicateNumber++;
            }

            String jumpTo = getJumpTo(); 
            // Duplicate. Skip part of processing chain?
            if(jumpTo!=null){
                processResult = ProcessResult.jump(jumpTo);
            } 
            
            // Record origin?
            String annotation = "duplicate";
            if(useOrigin){
                // TODO: Save origin in the CrawlURI so that other processors
                //       can make use of it. (Future: WARC)
                if(useOriginFromIndex && 
                    duplicate.get(DigestIndexer.FIELD_ORIGIN)!=null){
                    // Index contains origin, use it.
                    annotation += ":\"" + duplicate.get(DigestIndexer.FIELD_ORIGIN) + "\""; // If 
                } else {
                    String tmp = getOrigin();
                    // Check if an origin value is actually available
                    if(tmp != null && tmp.trim().length() > 0){
                        // It is available, add it to the log line.
                        annotation += ":\"" + tmp + "\""; 
                    }
                }
            } 
            // Make duplicate-note in crawl-log
            curi.getAnnotations().add(annotation);
            // Notify Heritrix that this is a revisit if we want revisit records to be written
            if (getRevisitInWarcs()) {
                curi.setRevisitProfile(duplicateRevisit);
            }
            
            /* TODO enable this when moving to indexing based on this data
            // Add annotation to crawl.log 
            curi.getAnnotations().add(REVISIT_ANNOTATION_MARKER);
                        
            // Write extra logging information (needs to be enabled in CrawlerLoggerModule)
            curi.addExtraInfo(EXTRA_REVISIT_PROFILE, duplicateRevisit.getProfileName());
            curi.addExtraInfo(EXTRA_REVISIT_URI, duplicateRevisit.getRefersToTargetURI());
            curi.addExtraInfo(EXTRA_REVISIT_DATE, duplicateRevisit.getRefersToDate());
            */
            
        }
        // Analysis runs for duplicates and non-duplicates alike when enabled.
        if(getAnalyzeTimestamp()){
            doAnalysis(curi,currHostStats, duplicate!=null);
        }
        return processResult;
        }
599
        /**
     * Process a CrawlURI looking up in the index by URL
     * First tries an exact-URL query and compares digests of each hit;
     * if none matches and try-equivalent is enabled, retries against the
     * normalized (stripped) URL field.
     *
     * @param curi The CrawlURI to process
     * @param currHostStats A statistics object for the current host. If per host statistics tracking is enabled this
     * must be non null and the method will increment appropriate counters on it.
     * @return The result of the lookup (a Lucene document). If a duplicate is not found null is returned.
     */
    protected Document lookupByURL(CrawlURI curi, Statistics currHostStats) {
        // Look the CrawlURI's URL up in the index.
        try {
            Query query = queryField(DigestIndexer.FIELD_URL, curi.toString());
            AllDocsCollector collectAllCollector = new AllDocsCollector();
            indexSearcher.search(query, collectAllCollector);

            List<ScoreDoc> hits = collectAllCollector.getHits();
            Document doc = null;
            String currentDigest = getDigestAsString(curi);
            if (hits != null && hits.size() > 0) {
                // Typically there should only be one it, but we'll allow for
                // multiple hits.
                for (ScoreDoc hit : hits) {
                    // for(int i=0 ; i < hits.size() ; i++){
                    // Multiple hits on same exact URL should be rare
                    // See if any have matching content digests
                    int docId = hit.doc;
                    doc = indexSearcher.doc(docId);
                    String oldDigest = doc.get(DigestIndexer.FIELD_DIGEST);

                    if (oldDigest.equalsIgnoreCase(currentDigest)) {
                        stats.exactURLDuplicates++;
                        if (statsPerHost) {
                            currHostStats.exactURLDuplicates++;
                        }

                        logger.finest("Found exact match for " + curi.toString());

                        // If we found a hit, no need to look at other hits.
                        return doc;
                    }
                }
            }
            if (getTryEquivalent()) {
                // No exact hits. Let's try lenient matching.
                String normalizedURL = DigestIndexer.stripURL(curi.toString());
                query = queryField(DigestIndexer.FIELD_URL_NORMALIZED, normalizedURL);
                collectAllCollector.reset(); // reset collector
                indexSearcher.search(query, collectAllCollector);
                hits = collectAllCollector.getHits();

                for (ScoreDoc hit : hits) {
                    // int i=0 ; i < hits.length ; i++){

                    int docId = hit.doc;
                    Document doc1 = indexSearcher.doc(docId);
                    String indexDigest = doc1.get(DigestIndexer.FIELD_DIGEST);
                    if (indexDigest.equals(currentDigest)) {
                        // Make note in log
                        String equivURL = doc1.get(DigestIndexer.FIELD_URL);
                        curi.getAnnotations().add("equivalentURL:\"" + equivURL + "\"");
                        // Increment statistics counters
                        stats.equivalentURLDuplicates++;
                        if (statsPerHost) {
                            currHostStats.equivalentURLDuplicates++;
                        }
                        logger.finest("Found equivalent match for " + curi.toString() + ". Normalized: "
                                + normalizedURL + ". Equivalent to: " + equivURL);

                        // If we found a hit, no need to look at more.
                        return doc1;
                    }
                }
            }
        } catch (IOException e) {
            // A failed index read is treated as "no duplicate found".
            logger.log(Level.SEVERE, "Error accessing index.", e);
        }
        // If we make it here then this is not a duplicate.
        return null;
    }
679    
680    /**
681     * Process a CrawlURI looking up in the index by content digest
682     *
683     * @param curi The CrawlURI to process
684     * @param currHostStats A statistics object for the current host. If per host statistics tracking is enabled this
685     * must be non null and the method will increment appropriate counters on it.
686     * @return The result of the lookup (a Lucene document). If a duplicate is not found null is returned.
687     */
688    protected Document lookupByDigest(CrawlURI curi, Statistics currHostStats) {
689        Document duplicate = null;
690        String currentDigest = null;
691        Object digest = curi.getContentDigest();
692        if (digest != null) {
693            currentDigest = Base32.encode((byte[]) digest);
694        } else {
695            logger.warning("Digest received from CrawlURI is null. Null Document returned");
696            return null;
697        }
698
699        Query query = queryField(DigestIndexer.FIELD_DIGEST, currentDigest);
700        try {
701            AllDocsCollector collectAllCollector = new AllDocsCollector();
702            indexSearcher.search(query, collectAllCollector);
703
704            List<ScoreDoc> hits = collectAllCollector.getHits();
705
706            StringBuffer mirrors = new StringBuffer();
707            mirrors.append("mirrors: ");
708            if (hits != null && hits.size() > 0) {
709                // Can definitely be more then one
710                // Note: We may find an equivalent match before we find an
711                // (existing) exact match.
712                // TODO: Ensure that an exact match is recorded if it exists.
713                Iterator<ScoreDoc> hitsIterator = hits.iterator();
714                while (hitsIterator.hasNext() && duplicate == null) {
715                    ScoreDoc hit = hitsIterator.next();
716                    int docId = hit.doc;
717                    Document doc = indexSearcher.doc(docId);
718                    String indexURL = doc.get(DigestIndexer.FIELD_URL);
719                    // See if the current hit is an exact match.
720                    if (curi.toString().equals(indexURL)) {
721                        duplicate = doc;
722                        stats.exactURLDuplicates++;
723                        if (statsPerHost) {
724                            currHostStats.exactURLDuplicates++;
725                        }
726                        logger.finest("Found exact match for " + curi.toString());
727                    }
728
729                    // If not, then check if it is an equivalent match (if
730                    // equivalent matches are allowed).
731                    if (duplicate == null && getTryEquivalent()) {
732                        String normalURL = DigestIndexer.stripURL(curi.toString());
733                        String indexNormalURL = doc.get(DigestIndexer.FIELD_URL_NORMALIZED);
734                        if (normalURL.equals(indexNormalURL)) {
735                            duplicate = doc;
736                            stats.equivalentURLDuplicates++;
737                            if (statsPerHost) {
738                                currHostStats.equivalentURLDuplicates++;
739                            }
740                            curi.getAnnotations().add("equivalentURL:\"" + indexURL + "\"");
741                            logger.finest("Found equivalent match for " + curi.toString() + ". Normalized: "
742                                    + normalURL + ". Equivalent to: " + indexURL);
743                        }
744                    }
745
746                    if (duplicate == null) {
747                        // Will only be used if no exact (or equivalent) match
748                        // is found.
749                        mirrors.append(indexURL + " ");
750                    }
751                }
752                if (duplicate == null) {
753                    stats.mirrorNumber++;
754                    if (statsPerHost) {
755                        currHostStats.mirrorNumber++;
756                    }
757                    logger.log(Level.FINEST, "Found mirror URLs for " + curi.toString() + ". " + mirrors);
758                }
759            }
760        } catch (IOException e) {
761            logger.log(Level.SEVERE, "Error accessing index.", e);
762        }
763        return duplicate;
764    }
765
766    public String report() {
767        StringBuffer ret = new StringBuffer();
768        ret.append("Processor: is.hi.bok.digest.DeDuplicator\n");
769        ret.append("  Function:          Abort processing of duplicate records\n");
770        if (!getEnabled()) {
771            ret.append("Processor is disabled by configuration");
772            ret.append("\n");
773            return ret.toString();
774        }
775        ret.append("                     - Lookup by " + 
776                        (lookupByURL?"url":"digest") + " in use\n");
777        ret.append("  Total handled:     " + stats.handledNumber + "\n");
778        ret.append("  Duplicates found:  " + stats.duplicateNumber + " " + 
779                        getPercentage(stats.duplicateNumber,stats.handledNumber) + "\n");
780        ret.append("  Bytes total:       " + stats.totalAmount + " (" + 
781                        ArchiveUtils.formatBytesForDisplay(stats.totalAmount) + ")\n");
782        ret.append("  Bytes discarded:   " + stats.duplicateAmount + " (" + 
783                        ArchiveUtils.formatBytesForDisplay(stats.duplicateAmount) + ") " + 
784                        getPercentage(stats.duplicateAmount, stats.totalAmount) + "\n");
785        
786        ret.append("  New (no hits):     " + (stats.handledNumber-
787                        (stats.mirrorNumber+stats.exactURLDuplicates+stats.equivalentURLDuplicates)) + "\n");
788        ret.append("  Exact hits:        " + stats.exactURLDuplicates + "\n");
789        ret.append("  Equivalent hits:   " + stats.equivalentURLDuplicates + "\n");
790        if(lookupByURL==false){
791                ret.append("  Mirror hits:       " + stats.mirrorNumber + "\n");
792        }
793        
794        if(getAnalyzeTimestamp()){
795                ret.append("  Timestamp predicts: (Where exact URL existed in the index)\n");
796                ret.append("  Change correctly:  " + stats.timestampChangeCorrect + "\n");
797                ret.append("  Change falsely:     " + stats.timestampChangeFalse + "\n");
798                ret.append("  Non-change correct:" + stats.timestampNoChangeCorrect + "\n");
799                ret.append("  Non-change falsely: " + stats.timestampNoChangeFalse + "\n");
800                ret.append("  Missing timpestamp:" + stats.timestampMissing + "\n");
801                
802        }
803        
804        if(statsPerHost){
805            ret.append("  [Host] [total] [duplicates] [bytes] " +
806                    "[bytes discarded] [new] [exact] [equiv]");
807            if(lookupByURL==false){
808                ret.append(" [mirror]");
809            }
810            if(getAnalyzeTimestamp()){
811                ret.append(" [change correct] [change falsely]");
812                ret.append(" [non-change correct] [non-change falsely]");
813                ret.append(" [no timestamp]");
814            }
815            ret.append("\n");
816            synchronized (perHostStats) {
817                Iterator<String> it = perHostStats.keySet().iterator();
818                while(it.hasNext()){
819                    String key = it.next();
820                    Statistics curr = perHostStats.get(key);
821                    ret.append("  " +key);
822                    ret.append(" ");
823                    ret.append(curr.handledNumber);
824                    ret.append(" ");
825                    ret.append(curr.duplicateNumber);
826                    ret.append(" ");
827                    ret.append(curr.totalAmount);
828                    ret.append(" ");
829                    ret.append(curr.duplicateAmount);
830                    ret.append(" ");
831                    ret.append(curr.handledNumber-
832                            (curr.mirrorNumber+
833                             curr.exactURLDuplicates+
834                             curr.equivalentURLDuplicates));
835                    ret.append(" ");
836                    ret.append(curr.exactURLDuplicates);
837                    ret.append(" ");
838                    ret.append(curr.equivalentURLDuplicates);
839
840                    if(lookupByURL==false){
841                        ret.append(" ");
842                        ret.append(curr.mirrorNumber);
843                    }    
844                    if(getAnalyzeTimestamp()){
845                        ret.append(" ");
846                        ret.append(curr.timestampChangeCorrect);
847                        ret.append(" ");
848                        ret.append(curr.timestampChangeFalse);
849                        ret.append(" ");
850                        ret.append(curr.timestampNoChangeCorrect);
851                        ret.append(" ");
852                        ret.append(curr.timestampNoChangeFalse);
853                        ret.append(" ");
854                        ret.append(curr.timestampMissing);
855                    }
856                    ret.append("\n");
857                }
858            }
859        }
860        
861        ret.append("\n");
862        return ret.toString();
863        }
864        
865        protected static String getPercentage(double portion, double total){
866                double value = portion / total;
867                value = value*100;
868                String ret = Double.toString(value);
869                int dot = ret.indexOf('.');
870                if(dot+3<ret.length()){
871                        ret = ret.substring(0,dot+3);
872                }
873                return ret + "%";
874        }
875        
876        private static String getDigestAsString(CrawlURI curi){
877        // The CrawlURI now has a method for this. For backwards
878                // compatibility with older Heritrix versions that is not used.
879                Object digest = curi.getContentDigest();
880        if (digest != null) {
881            return Base32.encode((byte[])digest);
882        }
883        return null;
884        }
885        
886
887        protected void doAnalysis(CrawlURI curi, Statistics currHostStats,
888            boolean isDuplicate) {
889                try{
890                Query query = queryField(DigestIndexer.FIELD_URL, curi.toString());
891            AllDocsCollector collectAllCollector = new AllDocsCollector();
892                        indexSearcher.search(query, collectAllCollector);
893            List<ScoreDoc> hits = collectAllCollector.getHits();
894
895            Document doc = null;
896        
897            if(hits != null && hits.size() > 0){
898                // If there are multiple hits, use the one with the most
899                // recent date.
900                Document docToEval = null;
901                for (ScoreDoc hit : hits) {
902                    int docId = hit.doc;
903                    doc = indexSearcher.doc(docId);
904                    // The format of the timestamp ("yyyyMMddHHmmssSSS") allows
905                    // us to do a greater then (later) or lesser than (earlier)
906                    // comparison of the strings.
907                    String timestamp = doc.get(DigestIndexer.FIELD_TIMESTAMP);
908                    if (docToEval == null || docToEval.get(DigestIndexer.FIELD_TIMESTAMP).compareTo(timestamp) > 0) {
909                        // Found a more recent hit.
910                        docToEval = doc;
911                    }
912                }
913                doTimestampAnalysis(curi,docToEval, currHostStats, isDuplicate);
914                }
915        } catch(IOException e){
916            logger.log(Level.SEVERE,"Error accessing index.",e);
917        }
918        }
919        
920                
921        protected void doTimestampAnalysis(CrawlURI curi, Document urlHit, 
922            Statistics currHostStats, boolean isDuplicate){
923        
924        //HttpMethod method = curi.getHttpMethod();
925
926        // Compare datestamps (last-modified versus the indexed date)
927        Date lastModified = null;
928        if (curi.getHttpResponseHeader("last-modified") != null) {
929            SimpleDateFormat sdf = 
930                new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z", 
931                        Locale.ENGLISH);
932            try {
933                                lastModified = sdf.parse(
934                                                curi.getHttpResponseHeader("last-modified"));                   // .getValue()
935                        } catch (ParseException e) {
936                                logger.log(Level.INFO,"Exception parsing last modified of " + 
937                                                curi.toString(),e);
938                                return;
939                        }
940        } else {
941            stats.timestampMissing++;
942            if (statsPerHost) {
943                currHostStats.timestampMissing++;
944                logger.finest("Missing timestamp on " + curi.toString());
945            }
946                return;
947        }
948        
949        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS");
950        Date lastFetch = null;
951        try {
952                        lastFetch = sdf.parse(
953                                        urlHit.get(DigestIndexer.FIELD_TIMESTAMP));
954                } catch (ParseException e) {
955                        logger.log(Level.WARNING,"Exception parsing indexed date for " + 
956                                        urlHit.get(DigestIndexer.FIELD_URL),e);
957                        return;
958                }
959
960                if(lastModified.after(lastFetch)){
961                        // Header predicts change
962                        if(isDuplicate){
963                                // But the DeDuplicator did not notice a change.
964                stats.timestampChangeFalse++;
965                if (statsPerHost){
966                    currHostStats.timestampChangeFalse++;
967                }
968                logger.finest("Last-modified falsly predicts change on " + 
969                        curi.toString());
970                        } else {
971                stats.timestampChangeCorrect++;
972                if (statsPerHost){
973                    currHostStats.timestampChangeCorrect++;
974                }
975                logger.finest("Last-modified correctly predicts change on " + 
976                        curi.toString());
977                        }
978                } else {
979                        // Header does not predict change.
980                        if(isDuplicate){
981                                // And the DeDuplicator verifies that no change had occurred
982                stats.timestampNoChangeCorrect++;
983                if (statsPerHost){
984                    currHostStats.timestampNoChangeCorrect++;
985                }
986                logger.finest("Last-modified correctly predicts no-change on " + 
987                        curi.toString());
988                        } else {
989                                // As this is particularly bad we'll log the URL at INFO level
990                                logger.log(Level.INFO,"Last-modified incorrectly indicated " +
991                                                "no-change on " + curi.toString() + " " + 
992                                                curi.getContentType() + ". last-modified: " + 
993                        lastModified + ". Last fetched: " + lastFetch);
994                stats.timestampNoChangeFalse++;
995                if (statsPerHost){
996                    currHostStats.timestampNoChangeFalse++;
997                }
998                        }
999                }
1000        
1001        }
1002
1003    /** Run a simple Lucene query for a single term in a single field.
1004     *
1005     * @param fieldName name of the field to look in.
1006     * @param value The value to query for
1007     * @return A Query for the given value in the given field.
1008     */
1009        protected Query queryField(String fieldName, String value) {
1010                Query query = null;
1011
1012                /** alternate solution. */
1013                BytesRef valueRef = new BytesRef(value.getBytes());
1014                query = new ConstantScoreQuery(new TermRangeFilter(fieldName, valueRef, valueRef, true, true));
1015
1016                /** The most clean solution, but it seems also memory demanding */
1017                // query = new ConstantScoreQuery(new FieldCacheTermsFilter(fieldName,
1018                // value));
1019                return query;   
1020        }
1021        
1022}
1023
/** Holder for the counters gathered by the DeDuplicator, used both for the
 *  crawl as a whole and per host when per-host statistics are enabled.
 *  <p>
 *  NOTE(review): counters appear to be incremented without synchronization
 *  by the processor; values are presumably treated as approximate under
 *  concurrent access — confirm against the crawler's threading model.
 */
class Statistics{
    // General statistics
    
    /** Number of URIs that make it through the processors exclusion rules
     *  and are processed by it.
     */
    long handledNumber = 0;
    
    /** Number of URIs that are deemed duplicates and further processing is
     *  aborted
     */
    long duplicateNumber = 0;
    
    /** The number of URIs that turned out to have exact URL and content 
     *  digest matches.
     */
    long exactURLDuplicates = 0;
    
    /** The number of URIs that turned out to have equivalent URL and content
     *  digest matches.
     */
    long equivalentURLDuplicates = 0;
    
    /** The number of URIs that, while having no exact or equivalent matches,  
     *  do have exact content digest matches against non-equivalent URIs.
     */
    long mirrorNumber = 0;
    
    /** The total amount of data represented by the documents who were deemed
     *  duplicates and excluded from further processing.
     */
    long duplicateAmount = 0;
    
    /** The total amount of data represented by all the documents processed **/
    long totalAmount = 0;
    
    // Timestamp analysis. Tracks how often the Last-Modified header's
    // change prediction agreed with the digest comparison outcome.
    
    long timestampChangeCorrect = 0;   // header predicted change, content changed
    long timestampChangeFalse = 0;     // header predicted change, content unchanged
    long timestampNoChangeCorrect = 0; // header predicted no change, content unchanged
    long timestampNoChangeFalse = 0;   // header predicted no change, content changed
    long timestampMissing = 0;         // no Last-Modified header present

    // ETag analysis;
    // NOTE(review): no code in the visible part of the file updates these
    // fields — presumably maintained elsewhere or vestigial; verify.
    
    long ETagChangeCorrect = 0;
    long ETagChangeFalse = 0;
    long ETagNoChangeCorrect = 0;
    long ETagNoChangeFalse = 0;
    long ETagMissingIndex = 0;
    long ETagMissingCURI = 0;
}
1077