001/* DeDuplicator 002 * 003 * Created on 10.04.2006 004 * 005 * Copyright (C) 2006-2010 National and University Library of Iceland 006 * 007 * This file is part of the DeDuplicator (Heritrix add-on module). 008 * 009 * DeDuplicator is free software; you can redistribute it and/or modify 010 * it under the terms of the GNU Lesser Public License as published by 011 * the Free Software Foundation; either version 2.1 of the License, or 012 * any later version. 013 * 014 * DeDuplicator is distributed in the hope that it will be useful, 015 * but WITHOUT ANY WARRANTY; without even the implied warranty of 016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 017 * GNU Lesser Public License for more details. 018 * 019 * You should have received a copy of the GNU Lesser Public License 020 * along with DeDuplicator; if not, write to the Free Software 021 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 022 */ 023package is.hi.bok.deduplicator; 024 025import static is.hi.bok.deduplicator.DedupAttributeConstants.A_CONTENT_STATE_KEY; 026import static is.hi.bok.deduplicator.DedupAttributeConstants.CONTENT_UNCHANGED; 027import static org.archive.modules.recrawl.RecrawlAttributeConstants.A_CONTENT_DIGEST; 028import static org.archive.modules.recrawl.RecrawlAttributeConstants.A_FETCH_HISTORY; 029 030import java.io.File; 031import java.io.IOException; 032import java.text.ParseException; 033import java.text.SimpleDateFormat; 034import java.util.Date; 035import java.util.HashMap; 036import java.util.Iterator; 037import java.util.List; 038import java.util.Locale; 039import java.util.logging.Level; 040import java.util.logging.Logger; 041 042import org.apache.lucene.document.Document; 043import org.apache.lucene.index.DirectoryReader; 044import org.apache.lucene.index.IndexReader; 045import org.apache.lucene.search.ConstantScoreQuery; 046import org.apache.lucene.search.IndexSearcher; 047import org.apache.lucene.search.Query; 048import org.apache.lucene.search.ScoreDoc; 049import org.apache.lucene.search.TermRangeFilter; 050import org.apache.lucene.store.FSDirectory; 051import org.apache.lucene.util.BytesRef; 052import org.archive.modules.CrawlURI; 053import org.archive.modules.ProcessResult; 054import org.archive.modules.Processor; 055import org.archive.modules.net.ServerCache; 056import org.archive.modules.revisit.IdenticalPayloadDigestRevisit; 057import org.archive.util.ArchiveUtils; 058import org.archive.util.Base32; 059import org.springframework.beans.factory.InitializingBean; 060import org.springframework.beans.factory.annotation.Autowired; 061 062import dk.netarkivet.common.utils.AllDocsCollector; 063 064/** 065 * Heritrix compatible processor. 066 * <p> 067 * Will determine if CrawlURIs are <i>duplicates</i>. 068 * <p> 069 * Duplicate detection can only be performed <i>after</i> the fetch processors 070 * have run. 071 * Modified by SVC to use Lucene 4.X 072 * 073 * @author Kristinn Sigurðsson 074 * @author Søren Vejrup Carlsen 075 * 076 * <bean id="DeDuplicator" class="is.hi.bok.deduplicator.DeDuplicator"> 077 * <!-- DEDUPLICATION_INDEX_LOCATION is replaced by path on harvest-server --> 078 * <property name="indexLocation" value="/home/svc/dedupcrawllogindex/empty-cache"/> 079 <property name="matchingMethod" value="URL"/> other option: DIGEST 080 <property name="tryEquivalent" value="true"/> 081 <property name="changeContentSize" value="false"/> 082 <property name="mimeFilter" value="^text/.*"/> 083 084 <property name="filterMode" value="BLACKLIST"/> Other option: WHITELIST 085 <property name="analysisMode" value="TIMESTAMP"/> Other options: NONE, TIMESTAMP_AND_ETAG 086 087 <property name="origin" value=""/> 088 <property name="originHandling" value="INDEX"/> Other options: NONE,PROCESSOR 089 <property name="statsPerHost" value="true"/> 090 <property name="revisitInWarcs" value="true"/> 091 092// /** 093// (FROM deduplicator-commons/src/main/java/is/landsbokasafn/deduplicator/IndexFields.java) 094// * These enums correspond to the names of fields in the Lucene index 095// */ 096// public enum IndexFields { 097// /** The URL 098// * This value is suitable for use in warc/revisit records as the WARC-Refers-To-Target-URI 099// **/ 100// URL, 101// /** The content digest as String **/ 102// DIGEST, 103// /** The URLs timestamp (time of fetch). Suitable for use in WARC-Refers-To-Date. Encoded according to 104// * w3c-iso8601 105// */ 106// DATE, 107// /** The document's etag **/ 108// ETAG, 109// /** A canonicalized version of the URL **/ 110// URL_CANONICALIZED, 111// /** WARC Record ID of original payload capture. Suitable for WARC-Refers-To field. **/ 112// ORIGINAL_RECORD_ID; 113// 114// } 115 116@SuppressWarnings({"unchecked"}) 117public class DeDuplicator extends Processor implements InitializingBean { 118 119 @Override public boolean getEnabled() { 120 return super.getEnabled(); 121 } 122 123 @Override public void setEnabled(boolean enabled) { 124 super.setEnabled(enabled); 125 } 126 127 private static Logger logger = 128 Logger.getLogger(DeDuplicator.class.getName()); 129 130 // Spring configurable parameters 131 132 /* Location of Lucene Index to use for lookups */ 133 private final static String ATTR_INDEX_LOCATION = "index-location"; 134 135 public String getIndexLocation() { 136 return (String) kp.get(ATTR_INDEX_LOCATION); 137 } 138 /** SETTER used by Spring */ 139 public void setIndexLocation(String indexLocation) { 140 kp.put(ATTR_INDEX_LOCATION,indexLocation); 141 } 142 143 /* The matching method in use (by url or content digest) */ 144 private final static String ATTR_MATCHING_METHOD = "matching-method"; 145 146 public enum MatchingMethod { 147 URL, 148 DIGEST 149 } 150 151 private final static MatchingMethod DEFAULT_MATCHING_METHOD = MatchingMethod.URL; 152 { 153 setMatchingMethod(DEFAULT_MATCHING_METHOD); 154 } 155 public MatchingMethod getMatchingMethod() { 156 return (MatchingMethod) kp.get(ATTR_MATCHING_METHOD); 157 } 158 159 /** SETTER used by Spring */ 160 public void setMatchingMethod(MatchingMethod method) { 161 kp.put(ATTR_MATCHING_METHOD, method); 162 } 163 164 /* On duplicate, should jump to which part of processing chain? 165 * If not set, nothing is skipped. Otherwise this should be the identity of the processor to jump to. 166 */ 167 public final static String ATTR_JUMP_TO = "jump-to"; 168 public String getJumpTo(){ 169 return (String)kp.get(ATTR_JUMP_TO); 170 } 171 /** SPRING SETTER. 172 * TODO Are we using this property?? The netarkivet are not 173 */ 174 public void setJumpTo(String jumpTo){ 175 kp.put(ATTR_JUMP_TO, jumpTo); 176 } 177 178 /* Origin of duplicate URLs. May be overridden by info from index */ 179 public final static String ATTR_ORIGIN = "origin"; 180 { 181 setOrigin(""); 182 } 183 public String getOrigin() { 184 return (String) kp.get(ATTR_ORIGIN); 185 } 186 187 /** SPRING SETTER */ 188 public void setOrigin(String origin) { 189 kp.put(ATTR_ORIGIN,origin); 190 } 191 192 /* If an exact match is not made, should the processor try 193 * to find an equivalent match? 194 */ 195 public final static String ATTR_EQUIVALENT = "try-equivalent"; 196 { 197 setTryEquivalent(false); 198 } 199 public Boolean getTryEquivalent(){ 200 return (Boolean)kp.get(ATTR_EQUIVALENT); 201 } 202 /** SPRING SETTER */ 203 public void setTryEquivalent(Boolean tryEquivalent){ 204 kp.put(ATTR_EQUIVALENT, tryEquivalent); 205 } 206 207 /* The filter on mime types. This is either a blacklist or whitelist 208 * depending on ATTR_FILTER_MODE. 209 */ 210 public final static String ATTR_MIME_FILTER = "mime-filter"; 211 public final static String DEFAULT_MIME_FILTER = "^text/.*"; 212 { 213 setMimeFilter(DEFAULT_MIME_FILTER); 214 } 215 public String getMimeFilter(){ 216 return (String)kp.get(ATTR_MIME_FILTER); 217 } 218 // USED by SPRING 219 public void setMimeFilter(String mimeFilter){ 220 kp.put(ATTR_MIME_FILTER, mimeFilter); 221 } 222 223 /* Is the mime filter a blacklist (do not apply processor to what matches) 224 * or whitelist (apply processor only to what matches). 225 */ 226 public final static String ATTR_FILTER_MODE = "filter-mode"; 227 { 228 setfilterMode(FilterMode.BLACKLIST); 229 } 230 231 public FilterMode getFilterMode() { 232 return (FilterMode) kp.get(ATTR_FILTER_MODE); 233 } 234 235 236 public enum FilterMode { 237 BLACKLIST, WHITELIST 238 }; 239 240 241 public Boolean getBlacklist(){ 242 FilterMode fMode = (FilterMode) kp.get(ATTR_FILTER_MODE); 243 return fMode.equals(FilterMode.BLACKLIST); 244 } 245 /** SPRING SETTER method */ 246 public void setfilterMode(FilterMode filterMode){ 247 kp.put(ATTR_FILTER_MODE, filterMode); 248 } 249 250 251 public enum AnalysisMode { 252 NONE, TIMESTAMP, TIMESTAMP_AND_ETAG 253 }; 254 255 256 /* Analysis mode. */ 257 public final static String ATTR_ANALYZE_MODE = "analyze-modes"; 258 { 259 setAnalysisMode(AnalysisMode.TIMESTAMP); 260 } 261 262 public boolean getAnalyzeTimestamp() { 263 AnalysisMode analysisMode = (AnalysisMode) kp.get(ATTR_ANALYZE_MODE); 264 return analysisMode.equals(AnalysisMode.TIMESTAMP); 265 } 266 267 public void setAnalysisMode(AnalysisMode analyzeMode) { 268 kp.put(ATTR_ANALYZE_MODE, analyzeMode); 269 } 270 271 public AnalysisMode getAnalysisMode() { 272 return (AnalysisMode) kp.get(ATTR_ANALYZE_MODE); 273 } 274 275 276 /* Should the content size information be set to zero when a duplicate is found? */ 277 public final static String ATTR_CHANGE_CONTENT_SIZE = "change-content-size"; 278 { 279 setChangeContentSize(false); 280 } 281 public Boolean getChangeContentSize(){ 282 return (Boolean)kp.get(ATTR_CHANGE_CONTENT_SIZE); 283 } 284 /** SPRING SETTER */ 285 public void setChangeContentSize(Boolean changeContentSize){ 286 kp.put(ATTR_CHANGE_CONTENT_SIZE, changeContentSize); 287 } 288 289 /* Should statistics be tracked per host? **/ 290 public final static String ATTR_STATS_PER_HOST = "stats-per-host"; 291 { 292 setStatsPerHost(false); 293 } 294 public Boolean getStatsPerHost(){ 295 return (Boolean)kp.get(ATTR_STATS_PER_HOST); 296 } 297 public void setStatsPerHost(Boolean statsPerHost){ 298 kp.put(ATTR_STATS_PER_HOST, statsPerHost); 299 } 300 301 /* How should 'origin' be handled */ 302 public final static String ATTR_ORIGIN_HANDLING = "origin-handling"; 303 public enum OriginHandling { 304 NONE, // No origin information 305 PROCESSOR, // Use processor setting -- ATTR_ORIGIN 306 INDEX // Use index information, each hit on index should contain origin 307 } 308 public final static OriginHandling DEFAULT_ORIGIN_HANDLING = OriginHandling.NONE; 309 { 310 setOriginHandling(DEFAULT_ORIGIN_HANDLING); 311 } 312 public OriginHandling getOriginHandling() { 313 return (OriginHandling) kp.get(ATTR_ORIGIN_HANDLING); 314 } 315 public void setOriginHandling(OriginHandling originHandling) { 316 kp.put(ATTR_ORIGIN_HANDLING, originHandling); 317 } 318 319 public final static String ATTR_REVISIT_IN_WARCS = "revisit-in-warcs"; 320 { 321 setRevisitInWarcs(Boolean.TRUE); // the default is true 322 } 323 324 public void setRevisitInWarcs(Boolean revisitOn) { 325 kp.put(ATTR_REVISIT_IN_WARCS, revisitOn); 326 } 327 public Boolean getRevisitInWarcs() { 328 return (Boolean) kp.get(ATTR_REVISIT_IN_WARCS); 329 } 330 331 // Spring configured access to Heritrix resources 332 333 // Gain access to the ServerCache for host based statistics. 334 protected ServerCache serverCache; 335 public ServerCache getServerCache() { 336 return this.serverCache; 337 } 338 339 @Autowired 340 public void setServerCache(ServerCache serverCache) { 341 this.serverCache = serverCache; 342 } 343 344 345 // Member variables. 346 protected IndexSearcher indexSearcher = null; 347 protected IndexReader indexReader = null; 348 349 350 protected boolean lookupByURL = true; 351 protected boolean statsPerHost = false; 352 353 354 protected boolean useOrigin = false; 355 protected boolean useOriginFromIndex = false; 356 357 protected Statistics stats = null; 358 protected HashMap<String, Statistics> perHostStats = null; 359 360 361 public void afterPropertiesSet() throws Exception { 362 if (!getEnabled()) { 363 logger.info(this.getClass().getName() + " disabled."); 364 return; 365 } 366 // Index location 367 String indexLocation = getIndexLocation(); 368 try { 369 FSDirectory indexDir = FSDirectory.open(new File(indexLocation)); 370 // https://issues.apache.org/jira/browse/LUCENE-1566 371 // Reduce chunksize to avoid OOM to half the size of the default (=100 MB) 372 int chunksize = indexDir.getReadChunkSize(); 373 indexDir.setReadChunkSize(chunksize / 2); 374 indexReader = DirectoryReader.open(indexDir); 375 indexSearcher = new IndexSearcher(indexReader); 376 } catch (Exception e) { 377 throw new IllegalArgumentException("Unable to find/open index at " + indexLocation,e); 378 } 379 380 // Matching method 381 MatchingMethod matchingMethod = getMatchingMethod(); 382 lookupByURL = matchingMethod == MatchingMethod.URL; 383 384 // Track per host stats 385 statsPerHost = getStatsPerHost(); 386 387 // Origin handling. 388 OriginHandling originHandling = getOriginHandling(); 389 if (originHandling != OriginHandling.NONE) { 390 useOrigin = true; 391 logger.fine("Use origin"); 392 if (originHandling == OriginHandling.INDEX) { 393 useOriginFromIndex = true; 394 logger.fine("Use origin from index"); 395 } 396 } 397 398 // Initialize some internal variables: 399 stats = new Statistics(); 400 if (statsPerHost) { 401 perHostStats = new HashMap<String, Statistics>(); 402 } 403 } 404 405 406 @Override 407 protected boolean shouldProcess(CrawlURI curi) { 408 if (!getEnabled()) { 409 logger.finest("Not handling " + curi.toString() + ", deduplication disabled."); 410 return false; 411 } 412 if (curi.isSuccess() == false) { 413 // Early return. No point in doing comparison on failed downloads. 414 logger.finest("Not handling " + curi.toString() 415 + ", did not succeed."); 416 return false; 417 } 418 if (curi.isPrerequisite()) { 419 // Early return. Prerequisites are exempt from checking. 420 logger.finest("Not handling " + curi.toString() 421 + ", prerequisite."); 422 return false; 423 } 424 if (curi.toString().startsWith("http")==false) { 425 // Early return. Non-http documents are not handled at present 426 logger.finest("Not handling " + curi.toString() 427 + ", non-http."); 428 return false; 429 } 430 if(curi.getContentType() == null){ 431 // No content type means we can not handle it. 432 logger.finest("Not handling " + curi.toString() 433 + ", missing content (mime) type"); 434 return false; 435 } 436 if(curi.getContentType().matches(getMimeFilter()) == getBlacklist()){ 437 // Early return. Does not pass the mime filter 438 logger.finest("Not handling " + curi.toString() 439 + ", excluded by mimefilter (" + 440 curi.getContentType() + ")."); 441 return false; 442 } 443 444 if(curi.isRevisit()){ 445 // A previous processor or filter has judged this CrawlURI to be a revisit 446 logger.finest("Not handling " + curi.toString() 447 + ", already flagged as revisit."); 448 return false; 449 } 450 return true; 451 } 452 453 @Override 454 protected void innerProcess(CrawlURI puri) { 455 throw new AssertionError(); 456 } 457 458 /** 459 * Return date from 'date' field if date absent in 'origin' field or origin-field-absent 460 * @param duplicate 461 * @return 462 */ 463 private String getRefersToDate(Document duplicate) { 464 String indexedDate = duplicate.get("date"); // DATE.name() 465 // look for the indexeddate of the revisit date in the origin "arcfile,offset,timestamp" 466 String duplicateOrigin = duplicate.get(DigestIndexer.FIELD_ORIGIN); 467 if (duplicateOrigin != null && !duplicateOrigin.isEmpty()) { 468 String[] parts = duplicateOrigin.split(","); 469 if (parts.length == 3) { // Detect new field-origin format 470 indexedDate = parts[2]; 471 } 472 } 473 Date readDate = null; 474 try { 475 readDate = ArchiveDateConverter.getHeritrixDateFormat().parse(indexedDate); 476 } catch (ParseException e) { 477 logger.warning("Unable to parse the indexed date '" + indexedDate 478 + "' as a 17-digit date: " + e); 479 } 480 String refersToDateString = indexedDate; 481 if (readDate != null) { 482 refersToDateString = ArchiveDateConverter.getWarcDateFormat().format(readDate); 483 } 484 return refersToDateString; 485 } 486 487 488 489 @Override 490 protected ProcessResult innerProcessResult(CrawlURI curi) throws InterruptedException { 491 492 ProcessResult processResult = ProcessResult.PROCEED; // Default. Continue as normal 493 494 logger.finest("Processing " + curi.toString() + "(" + 495 curi.getContentType() + ")"); 496 497 stats.handledNumber++; 498 stats.totalAmount += curi.getContentSize(); 499 Statistics currHostStats = null; 500 if(statsPerHost){ 501 synchronized (perHostStats) { 502 String host = getServerCache().getHostFor(curi.getUURI()).getHostName(); 503 currHostStats = perHostStats.get(host); 504 if(currHostStats==null){ 505 currHostStats = new Statistics(); 506 perHostStats.put(host,currHostStats); 507 } 508 } 509 currHostStats.handledNumber++; 510 currHostStats.totalAmount += curi.getContentSize(); 511 } 512 513 Document duplicate = null; 514 515 if(lookupByURL){ 516 duplicate = lookupByURL(curi,currHostStats); 517 } else { 518 duplicate = lookupByDigest(curi,currHostStats); 519 } 520 521 if (duplicate != null){ 522 // Perform tasks common to when a duplicate is found. 523 524 //// Code taken from LuceneIndexSearcher.wrap() method ////////////////////////// 525 IdenticalPayloadDigestRevisit duplicateRevisit = new IdenticalPayloadDigestRevisit( 526 duplicate.get("digest")); //DIGEST.name())); 527 528 duplicateRevisit.setRefersToTargetURI( 529 duplicate.get("url")); // URL.name() 530 531 532 duplicateRevisit.setRefersToDate(getRefersToDate(duplicate)); 533 534 535 //Check if the record ID information is available in the index. 536 // This requires that record information is available during indexing 537 String refersToRecordID = duplicate.get("orig_record_id"); // ORIGINAL_RECORD_ID.name()); 538 539 if (refersToRecordID!=null && !refersToRecordID.isEmpty()) { 540 duplicateRevisit.setRefersToRecordID(refersToRecordID); 541 } 542 543 544 // Increment statistics counters 545 stats.duplicateAmount += curi.getContentSize(); 546 stats.duplicateNumber++; 547 if(statsPerHost){ 548 currHostStats.duplicateAmount+=curi.getContentSize(); 549 currHostStats.duplicateNumber++; 550 } 551 552 String jumpTo = getJumpTo(); 553 // Duplicate. Skip part of processing chain? 554 if(jumpTo!=null){ 555 processResult = ProcessResult.jump(jumpTo); 556 } 557 558 // Record origin? 559 String annotation = "duplicate"; 560 if(useOrigin){ 561 // TODO: Save origin in the CrawlURI so that other processors 562 // can make use of it. (Future: WARC) 563 if(useOriginFromIndex && 564 duplicate.get(DigestIndexer.FIELD_ORIGIN)!=null){ 565 // Index contains origin, use it. 566 annotation += ":\"" + duplicate.get(DigestIndexer.FIELD_ORIGIN) + "\""; // If 567 } else { 568 String tmp = getOrigin(); 569 // Check if an origin value is actually available 570 if(tmp != null && tmp.trim().length() > 0){ 571 // It is available, add it to the log line. 572 annotation += ":\"" + tmp + "\""; 573 } 574 } 575 } 576 // Make duplicate-note in crawl-log 577 curi.getAnnotations().add(annotation); 578 // Notify Heritrix that this is a revisit if we want revisit records to be written 579 if (getRevisitInWarcs()) { 580 curi.setRevisitProfile(duplicateRevisit); 581 } 582 583 /* TODO enable this when moving to indexing based on this data 584 // Add annotation to crawl.log 585 curi.getAnnotations().add(REVISIT_ANNOTATION_MARKER); 586 587 // Write extra logging information (needs to be enabled in CrawlerLoggerModule) 588 curi.addExtraInfo(EXTRA_REVISIT_PROFILE, duplicateRevisit.getProfileName()); 589 curi.addExtraInfo(EXTRA_REVISIT_URI, duplicateRevisit.getRefersToTargetURI()); 590 curi.addExtraInfo(EXTRA_REVISIT_DATE, duplicateRevisit.getRefersToDate()); 591 */ 592 593 } 594 if(getAnalyzeTimestamp()){ 595 doAnalysis(curi,currHostStats, duplicate!=null); 596 } 597 return processResult; 598 } 599 600 /** 601 * Process a CrawlURI looking up in the index by URL 602 * 603 * @param curi The CrawlURI to process 604 * @param currHostStats A statistics object for the current host. If per host statistics tracking is enabled this 605 * must be non null and the method will increment appropriate counters on it. 606 * @return The result of the lookup (a Lucene document). If a duplicate is not found null is returned. 607 */ 608 protected Document lookupByURL(CrawlURI curi, Statistics currHostStats) { 609 // Look the CrawlURI's URL up in the index. 610 try { 611 Query query = queryField(DigestIndexer.FIELD_URL, curi.toString()); 612 AllDocsCollector collectAllCollector = new AllDocsCollector(); 613 indexSearcher.search(query, collectAllCollector); 614 615 List<ScoreDoc> hits = collectAllCollector.getHits(); 616 Document doc = null; 617 String currentDigest = getDigestAsString(curi); 618 if (hits != null && hits.size() > 0) { 619 // Typically there should only be one it, but we'll allow for 620 // multiple hits. 621 for (ScoreDoc hit : hits) { 622 // for(int i=0 ; i < hits.size() ; i++){ 623 // Multiple hits on same exact URL should be rare 624 // See if any have matching content digests 625 int docId = hit.doc; 626 doc = indexSearcher.doc(docId); 627 String oldDigest = doc.get(DigestIndexer.FIELD_DIGEST); 628 629 if (oldDigest.equalsIgnoreCase(currentDigest)) { 630 stats.exactURLDuplicates++; 631 if (statsPerHost) { 632 currHostStats.exactURLDuplicates++; 633 } 634 635 logger.finest("Found exact match for " + curi.toString()); 636 637 // If we found a hit, no need to look at other hits. 638 return doc; 639 } 640 } 641 } 642 if (getTryEquivalent()) { 643 // No exact hits. Let's try lenient matching. 644 String normalizedURL = DigestIndexer.stripURL(curi.toString()); 645 query = queryField(DigestIndexer.FIELD_URL_NORMALIZED, normalizedURL); 646 collectAllCollector.reset(); // reset collector 647 indexSearcher.search(query, collectAllCollector); 648 hits = collectAllCollector.getHits(); 649 650 for (ScoreDoc hit : hits) { 651 // int i=0 ; i < hits.length ; i++){ 652 653 int docId = hit.doc; 654 Document doc1 = indexSearcher.doc(docId); 655 String indexDigest = doc1.get(DigestIndexer.FIELD_DIGEST); 656 if (indexDigest.equals(currentDigest)) { 657 // Make note in log 658 String equivURL = doc1.get(DigestIndexer.FIELD_URL); 659 curi.getAnnotations().add("equivalentURL:\"" + equivURL + "\""); 660 // Increment statistics counters 661 stats.equivalentURLDuplicates++; 662 if (statsPerHost) { 663 currHostStats.equivalentURLDuplicates++; 664 } 665 logger.finest("Found equivalent match for " + curi.toString() + ". Normalized: " 666 + normalizedURL + ". Equivalent to: " + equivURL); 667 668 // If we found a hit, no need to look at more. 669 return doc1; 670 } 671 } 672 } 673 } catch (IOException e) { 674 logger.log(Level.SEVERE, "Error accessing index.", e); 675 } 676 // If we make it here then this is not a duplicate. 677 return null; 678 } 679 680 /** 681 * Process a CrawlURI looking up in the index by content digest 682 * 683 * @param curi The CrawlURI to process 684 * @param currHostStats A statistics object for the current host. If per host statistics tracking is enabled this 685 * must be non null and the method will increment appropriate counters on it. 686 * @return The result of the lookup (a Lucene document). If a duplicate is not found null is returned. 687 */ 688 protected Document lookupByDigest(CrawlURI curi, Statistics currHostStats) { 689 Document duplicate = null; 690 String currentDigest = null; 691 Object digest = curi.getContentDigest(); 692 if (digest != null) { 693 currentDigest = Base32.encode((byte[]) digest); 694 } else { 695 logger.warning("Digest received from CrawlURI is null. Null Document returned"); 696 return null; 697 } 698 699 Query query = queryField(DigestIndexer.FIELD_DIGEST, currentDigest); 700 try { 701 AllDocsCollector collectAllCollector = new AllDocsCollector(); 702 indexSearcher.search(query, collectAllCollector); 703 704 List<ScoreDoc> hits = collectAllCollector.getHits(); 705 706 StringBuffer mirrors = new StringBuffer(); 707 mirrors.append("mirrors: "); 708 if (hits != null && hits.size() > 0) { 709 // Can definitely be more then one 710 // Note: We may find an equivalent match before we find an 711 // (existing) exact match. 712 // TODO: Ensure that an exact match is recorded if it exists. 713 Iterator<ScoreDoc> hitsIterator = hits.iterator(); 714 while (hitsIterator.hasNext() && duplicate == null) { 715 ScoreDoc hit = hitsIterator.next(); 716 int docId = hit.doc; 717 Document doc = indexSearcher.doc(docId); 718 String indexURL = doc.get(DigestIndexer.FIELD_URL); 719 // See if the current hit is an exact match. 720 if (curi.toString().equals(indexURL)) { 721 duplicate = doc; 722 stats.exactURLDuplicates++; 723 if (statsPerHost) { 724 currHostStats.exactURLDuplicates++; 725 } 726 logger.finest("Found exact match for " + curi.toString()); 727 } 728 729 // If not, then check if it is an equivalent match (if 730 // equivalent matches are allowed). 731 if (duplicate == null && getTryEquivalent()) { 732 String normalURL = DigestIndexer.stripURL(curi.toString()); 733 String indexNormalURL = doc.get(DigestIndexer.FIELD_URL_NORMALIZED); 734 if (normalURL.equals(indexNormalURL)) { 735 duplicate = doc; 736 stats.equivalentURLDuplicates++; 737 if (statsPerHost) { 738 currHostStats.equivalentURLDuplicates++; 739 } 740 curi.getAnnotations().add("equivalentURL:\"" + indexURL + "\""); 741 logger.finest("Found equivalent match for " + curi.toString() + ". Normalized: " 742 + normalURL + ". Equivalent to: " + indexURL); 743 } 744 } 745 746 if (duplicate == null) { 747 // Will only be used if no exact (or equivalent) match 748 // is found. 749 mirrors.append(indexURL + " "); 750 } 751 } 752 if (duplicate == null) { 753 stats.mirrorNumber++; 754 if (statsPerHost) { 755 currHostStats.mirrorNumber++; 756 } 757 logger.log(Level.FINEST, "Found mirror URLs for " + curi.toString() + ". " + mirrors); 758 } 759 } 760 } catch (IOException e) { 761 logger.log(Level.SEVERE, "Error accessing index.", e); 762 } 763 return duplicate; 764 } 765 766 public String report() { 767 StringBuffer ret = new StringBuffer(); 768 ret.append("Processor: is.hi.bok.digest.DeDuplicator\n"); 769 ret.append(" Function: Abort processing of duplicate records\n"); 770 if (!getEnabled()) { 771 ret.append("Processor is disabled by configuration"); 772 ret.append("\n"); 773 return ret.toString(); 774 } 775 ret.append(" - Lookup by " + 776 (lookupByURL?"url":"digest") + " in use\n"); 777 ret.append(" Total handled: " + stats.handledNumber + "\n"); 778 ret.append(" Duplicates found: " + stats.duplicateNumber + " " + 779 getPercentage(stats.duplicateNumber,stats.handledNumber) + "\n"); 780 ret.append(" Bytes total: " + stats.totalAmount + " (" + 781 ArchiveUtils.formatBytesForDisplay(stats.totalAmount) + ")\n"); 782 ret.append(" Bytes discarded: " + stats.duplicateAmount + " (" + 783 ArchiveUtils.formatBytesForDisplay(stats.duplicateAmount) + ") " + 784 getPercentage(stats.duplicateAmount, stats.totalAmount) + "\n"); 785 786 ret.append(" New (no hits): " + (stats.handledNumber- 787 (stats.mirrorNumber+stats.exactURLDuplicates+stats.equivalentURLDuplicates)) + "\n"); 788 ret.append(" Exact hits: " + stats.exactURLDuplicates + "\n"); 789 ret.append(" Equivalent hits: " + stats.equivalentURLDuplicates + "\n"); 790 if(lookupByURL==false){ 791 ret.append(" Mirror hits: " + stats.mirrorNumber + "\n"); 792 } 793 794 if(getAnalyzeTimestamp()){ 795 ret.append(" Timestamp predicts: (Where exact URL existed in the index)\n"); 796 ret.append(" Change correctly: " + stats.timestampChangeCorrect + "\n"); 797 ret.append(" Change falsely: " + stats.timestampChangeFalse + "\n"); 798 ret.append(" Non-change correct:" + stats.timestampNoChangeCorrect + "\n"); 799 ret.append(" Non-change falsely: " + stats.timestampNoChangeFalse + "\n"); 800 ret.append(" Missing timpestamp:" + stats.timestampMissing + "\n"); 801 802 } 803 804 if(statsPerHost){ 805 ret.append(" [Host] [total] [duplicates] [bytes] " + 806 "[bytes discarded] [new] [exact] [equiv]"); 807 if(lookupByURL==false){ 808 ret.append(" [mirror]"); 809 } 810 if(getAnalyzeTimestamp()){ 811 ret.append(" [change correct] [change falsely]"); 812 ret.append(" [non-change correct] [non-change falsely]"); 813 ret.append(" [no timestamp]"); 814 } 815 ret.append("\n"); 816 synchronized (perHostStats) { 817 Iterator<String> it = perHostStats.keySet().iterator(); 818 while(it.hasNext()){ 819 String key = it.next(); 820 Statistics curr = perHostStats.get(key); 821 ret.append(" " +key); 822 ret.append(" "); 823 ret.append(curr.handledNumber); 824 ret.append(" "); 825 ret.append(curr.duplicateNumber); 826 ret.append(" "); 827 ret.append(curr.totalAmount); 828 ret.append(" "); 829 ret.append(curr.duplicateAmount); 830 ret.append(" "); 831 ret.append(curr.handledNumber- 832 (curr.mirrorNumber+ 833 curr.exactURLDuplicates+ 834 curr.equivalentURLDuplicates)); 835 ret.append(" "); 836 ret.append(curr.exactURLDuplicates); 837 ret.append(" "); 838 ret.append(curr.equivalentURLDuplicates); 839 840 if(lookupByURL==false){ 841 ret.append(" "); 842 ret.append(curr.mirrorNumber); 843 } 844 if(getAnalyzeTimestamp()){ 845 ret.append(" "); 846 ret.append(curr.timestampChangeCorrect); 847 ret.append(" "); 848 ret.append(curr.timestampChangeFalse); 849 ret.append(" "); 850 ret.append(curr.timestampNoChangeCorrect); 851 ret.append(" "); 852 ret.append(curr.timestampNoChangeFalse); 853 ret.append(" "); 854 ret.append(curr.timestampMissing); 855 } 856 ret.append("\n"); 857 } 858 } 859 } 860 861 ret.append("\n"); 862 return ret.toString(); 863 } 864 865 protected static String getPercentage(double portion, double total){ 866 double value = portion / total; 867 value = value*100; 868 String ret = Double.toString(value); 869 int dot = ret.indexOf('.'); 870 if(dot+3<ret.length()){ 871 ret = ret.substring(0,dot+3); 872 } 873 return ret + "%"; 874 } 875 876 private static String getDigestAsString(CrawlURI curi){ 877 // The CrawlURI now has a method for this. For backwards 878 // compatibility with older Heritrix versions that is not used. 879 Object digest = curi.getContentDigest(); 880 if (digest != null) { 881 return Base32.encode((byte[])digest); 882 } 883 return null; 884 } 885 886 887 protected void doAnalysis(CrawlURI curi, Statistics currHostStats, 888 boolean isDuplicate) { 889 try{ 890 Query query = queryField(DigestIndexer.FIELD_URL, curi.toString()); 891 AllDocsCollector collectAllCollector = new AllDocsCollector(); 892 indexSearcher.search(query, collectAllCollector); 893 List<ScoreDoc> hits = collectAllCollector.getHits(); 894 895 Document doc = null; 896 897 if(hits != null && hits.size() > 0){ 898 // If there are multiple hits, use the one with the most 899 // recent date. 900 Document docToEval = null; 901 for (ScoreDoc hit : hits) { 902 int docId = hit.doc; 903 doc = indexSearcher.doc(docId); 904 // The format of the timestamp ("yyyyMMddHHmmssSSS") allows 905 // us to do a greater then (later) or lesser than (earlier) 906 // comparison of the strings. 907 String timestamp = doc.get(DigestIndexer.FIELD_TIMESTAMP); 908 if (docToEval == null || docToEval.get(DigestIndexer.FIELD_TIMESTAMP).compareTo(timestamp) > 0) { 909 // Found a more recent hit. 910 docToEval = doc; 911 } 912 } 913 doTimestampAnalysis(curi,docToEval, currHostStats, isDuplicate); 914 } 915 } catch(IOException e){ 916 logger.log(Level.SEVERE,"Error accessing index.",e); 917 } 918 } 919 920 921 protected void doTimestampAnalysis(CrawlURI curi, Document urlHit, 922 Statistics currHostStats, boolean isDuplicate){ 923 924 //HttpMethod method = curi.getHttpMethod(); 925 926 // Compare datestamps (last-modified versus the indexed date) 927 Date lastModified = null; 928 if (curi.getHttpResponseHeader("last-modified") != null) { 929 SimpleDateFormat sdf = 930 new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z", 931 Locale.ENGLISH); 932 try { 933 lastModified = sdf.parse( 934 curi.getHttpResponseHeader("last-modified")); // .getValue() 935 } catch (ParseException e) { 936 logger.log(Level.INFO,"Exception parsing last modified of " + 937 curi.toString(),e); 938 return; 939 } 940 } else { 941 stats.timestampMissing++; 942 if (statsPerHost) { 943 currHostStats.timestampMissing++; 944 logger.finest("Missing timestamp on " + curi.toString()); 945 } 946 return; 947 } 948 949 SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS"); 950 Date lastFetch = null; 951 try { 952 lastFetch = sdf.parse( 953 urlHit.get(DigestIndexer.FIELD_TIMESTAMP)); 954 } catch (ParseException e) { 955 logger.log(Level.WARNING,"Exception parsing indexed date for " + 956 urlHit.get(DigestIndexer.FIELD_URL),e); 957 return; 958 } 959 960 if(lastModified.after(lastFetch)){ 961 // Header predicts change 962 if(isDuplicate){ 963 // But the DeDuplicator did not notice a change. 964 stats.timestampChangeFalse++; 965 if (statsPerHost){ 966 currHostStats.timestampChangeFalse++; 967 } 968 logger.finest("Last-modified falsly predicts change on " + 969 curi.toString()); 970 } else { 971 stats.timestampChangeCorrect++; 972 if (statsPerHost){ 973 currHostStats.timestampChangeCorrect++; 974 } 975 logger.finest("Last-modified correctly predicts change on " + 976 curi.toString()); 977 } 978 } else { 979 // Header does not predict change. 980 if(isDuplicate){ 981 // And the DeDuplicator verifies that no change had occurred 982 stats.timestampNoChangeCorrect++; 983 if (statsPerHost){ 984 currHostStats.timestampNoChangeCorrect++; 985 } 986 logger.finest("Last-modified correctly predicts no-change on " + 987 curi.toString()); 988 } else { 989 // As this is particularly bad we'll log the URL at INFO level 990 logger.log(Level.INFO,"Last-modified incorrectly indicated " + 991 "no-change on " + curi.toString() + " " + 992 curi.getContentType() + ". last-modified: " + 993 lastModified + ". Last fetched: " + lastFetch); 994 stats.timestampNoChangeFalse++; 995 if (statsPerHost){ 996 currHostStats.timestampNoChangeFalse++; 997 } 998 } 999 } 1000 1001 } 1002 1003 /** Run a simple Lucene query for a single term in a single field. 1004 * 1005 * @param fieldName name of the field to look in. 1006 * @param value The value to query for 1007 * @return A Query for the given value in the given field. 1008 */ 1009 protected Query queryField(String fieldName, String value) { 1010 Query query = null; 1011 1012 /** alternate solution. */ 1013 BytesRef valueRef = new BytesRef(value.getBytes()); 1014 query = new ConstantScoreQuery(new TermRangeFilter(fieldName, valueRef, valueRef, true, true)); 1015 1016 /** The most clean solution, but it seems also memory demanding */ 1017 // query = new ConstantScoreQuery(new FieldCacheTermsFilter(fieldName, 1018 // value)); 1019 return query; 1020 } 1021 1022} 1023 1024class Statistics{ 1025 // General statistics 1026 1027 /** Number of URIs that make it through the processors exclusion rules 1028 * and are processed by it. 1029 */ 1030 long handledNumber = 0; 1031 1032 /** Number of URIs that are deemed duplicates and further processing is 1033 * aborted 1034 */ 1035 long duplicateNumber = 0; 1036 1037 /** Then number of URIs that turned out to have exact URL and content 1038 * digest matches. 1039 */ 1040 long exactURLDuplicates = 0; 1041 1042 /** The number of URIs that turned out to have equivalent URL and content 1043 * digest matches. 1044 */ 1045 long equivalentURLDuplicates = 0; 1046 1047 /** The number of URIs that, while having no exact or equivalent matches, 1048 * do have exact content digest matches against non-equivalent URIs. 1049 */ 1050 long mirrorNumber = 0; 1051 1052 /** The total amount of data represented by the documents who were deemed 1053 * duplicates and excluded from further processing. 1054 */ 1055 long duplicateAmount = 0; 1056 1057 /** The total amount of data represented by all the documents processed **/ 1058 long totalAmount = 0; 1059 1060 // Timestamp analysis 1061 1062 long timestampChangeCorrect = 0; 1063 long timestampChangeFalse = 0; 1064 long timestampNoChangeCorrect = 0; 1065 long timestampNoChangeFalse = 0; 1066 long timestampMissing = 0; 1067 1068 // ETag analysis; 1069 1070 long ETagChangeCorrect = 0; 1071 long ETagChangeFalse = 0; 1072 long ETagNoChangeCorrect = 0; 1073 long ETagNoChangeFalse = 0; 1074 long ETagMissingIndex = 0; 1075 long ETagMissingCURI = 0; 1076} 1077