001/* DeDuplicator 002 * 003 * Created on 10.04.2006 004 * 005 * Copyright (C) 2006 National and University Library of Iceland 006 * 007 * This file is part of the DeDuplicator (Heritrix add-on module). 008 * 009 * DeDuplicator is free software; you can redistribute it and/or modify 010 * it under the terms of the GNU Lesser Public License as published by 011 * the Free Software Foundation; either version 2.1 of the License, or 012 * any later version. 013 * 014 * DeDuplicator is distributed in the hope that it will be useful, 015 * but WITHOUT ANY WARRANTY; without even the implied warranty of 016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 017 * GNU Lesser Public License for more details. 018 * 019 * You should have received a copy of the GNU Lesser Public License 020 * along with DeDuplicator; if not, write to the Free Software 021 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 022 */ 023package is.hi.bok.deduplicator; 024 025import java.io.File; 026import java.io.IOException; 027import java.text.ParseException; 028import java.text.SimpleDateFormat; 029import java.util.Date; 030import java.util.HashMap; 031import java.util.Iterator; 032import java.util.List; 033import java.util.Locale; 034import java.util.logging.Level; 035import java.util.logging.Logger; 036 037import org.apache.commons.httpclient.HttpMethod; 038import org.apache.lucene.document.Document; 039import org.apache.lucene.index.DirectoryReader; 040import org.apache.lucene.index.IndexReader; 041import org.apache.lucene.search.ConstantScoreQuery; 042import org.apache.lucene.search.IndexSearcher; 043import org.apache.lucene.search.Query; 044import org.apache.lucene.search.ScoreDoc; 045import org.apache.lucene.search.TermRangeFilter; 046import org.apache.lucene.store.FSDirectory; 047import org.apache.lucene.util.BytesRef; 048import org.archive.crawler.datamodel.CoreAttributeConstants; 049import org.archive.crawler.datamodel.CrawlOrder; 050import 
org.archive.crawler.datamodel.CrawlURI;
import org.archive.crawler.framework.Processor;
import org.archive.crawler.frontier.AdaptiveRevisitAttributeConstants;
import org.archive.crawler.settings.SimpleType;
import org.archive.crawler.settings.Type;
import org.archive.util.ArchiveUtils;
import org.archive.util.Base32;

import dk.netarkivet.common.utils.AllDocsCollector;

/**
 * Heritrix compatible processor.
 * <p>
 * Will abort the processing (skip to post processor chain) of CrawlURIs that are deemed <i>duplicates</i>.
 * <p>
 * Duplicate detection can only be performed <i>after</i> the fetch processors have run.
 *
 * @author Kristinn Sigurðsson
 * @author Søren Vejrup Carlsen
 */
public class DeDuplicator extends Processor implements AdaptiveRevisitAttributeConstants {

    private static final long serialVersionUID = ArchiveUtils.classnameBasedUID(DeDuplicator.class, 1);

    private static final Logger logger = Logger.getLogger(DeDuplicator.class.getName());

    /** Searcher over the Lucene index of a previous crawl; stays null if the index fails to open. */
    protected IndexSearcher index = null;
    /** NOTE(review): never assigned in the visible code (initialTasks() uses a local reader) — possibly dead. */
    protected IndexReader indexReader = null;
    /** True: match by URL (default); false: match by content digest. Set from ATTR_MATCHING_METHOD. */
    protected boolean lookupByURL = true;
    /** Whether to also try "equivalent" (normalized-URL) matches when an exact match fails. */
    protected boolean equivalent = DEFAULT_EQUIVALENT.booleanValue();
    /** Regex compared against each document's mimetype; see 'blacklist' for its polarity. */
    protected String mimefilter = DEFAULT_MIME_FILTER;
    /** True: mimefilter is a blacklist (matches are skipped); false: whitelist (only matches are processed). */
    protected boolean blacklist = true;
    /** Statistics-only analysis of the Last-Modified header versus the index lookup result. */
    protected boolean doTimestampAnalysis = false;
    /** ETag analysis flag; the visible code only carries a TODO for it. */
    protected boolean doETagAnalysis = false;
    /** Track counters per host in 'perHostStats' (guarded by synchronization on that map). */
    protected boolean statsPerHost = DEFAULT_STATS_PER_HOST.booleanValue();
    /** Zero out the CrawlURI's content size when a duplicate is found. */
    protected boolean changeContentSize = DEFAULT_CHANGE_CONTENT_SIZE.booleanValue();
    /** Whether to annotate duplicates with origin information at all. */
    protected boolean useOrigin = false;
    /** Whether the origin is read from the index (falling back to the processor setting). */
    protected boolean useOriginFromIndex = false;
    /** Use the memory-saving sparse range filter for Lucene queries. */
    protected boolean useSparseRangeFilter = DEFAULT_USE_SPARSE_RANGE_FILTER;

    /** Crawl-wide counters; always non-null after initialTasks(). */
    protected Statistics stats = null;
    /** Per-host counters; only allocated when statsPerHost is true. */
    protected HashMap<String, Statistics> perHostStats = null;
    /** NOTE(review): innerProcess() re-reads ATTR_SKIP_WRITE directly; this cached copy looks unused. */
    protected boolean skipWriting = DEFAULT_SKIP_WRITE.booleanValue();
location - Matching mode (By URL (default) or By Content Digest) - Try equivalent 096 * matches - Mime filter - Filter mode (blacklist (default) or whitelist) - Analysis (None (default), Timestamp only 097 * or Timestamp and ETag) - Log level - Track per host stats - Origin - Skip writing 098 */ 099 /** Location of Lucene Index to use for lookups */ 100 public final static String ATTR_INDEX_LOCATION = "index-location"; 101 public final static String DEFAULT_INDEX_LOCATION = ""; 102 103 /** The matching method in use (by url or content digest) */ 104 public final static String ATTR_MATCHING_METHOD = "matching-method"; 105 public final static String[] AVAILABLE_MATCHING_METHODS = {"By URL", "By content digest"}; 106 public final static String DEFAULT_MATCHING_METHOD = AVAILABLE_MATCHING_METHODS[0]; 107 108 /** 109 * If an exact match is not made, should the processor try to find an equivalent match? 110 */ 111 public final static String ATTR_EQUIVALENT = "try-equivalent"; 112 public final static Boolean DEFAULT_EQUIVALENT = new Boolean(false); 113 114 /** 115 * The filter on mime types. This is either a blacklist or whitelist depending on ATTR_FILTER_MODE. 116 */ 117 public final static String ATTR_MIME_FILTER = "mime-filter"; 118 public final static String DEFAULT_MIME_FILTER = "^text/.*"; 119 120 /** 121 * Is the mime filter a blacklist (do not apply processor to what matches) or whitelist (apply processor only to 122 * what matches). 123 */ 124 public final static String ATTR_FILTER_MODE = "filter-mode"; 125 public final static String[] AVAILABLE_FILTER_MODES = {"Blacklist", "Whitelist"}; 126 public final static String DEFAULT_FILTER_MODE = AVAILABLE_FILTER_MODES[0]; 127 128 /** Set analysis mode. 
*/ 129 public final static String ATTR_ANALYSIS_MODE = "analysis-mode"; 130 public final static String[] AVAILABLE_ANALYSIS_MODES = {"None", "Timestamp", "Timestamp and ETag"}; 131 public final static String DEFAULT_ANALYSIS_MODE = AVAILABLE_ANALYSIS_MODES[0]; 132 133 /** 134 * Should the content size information be set to zero when a duplicate is found? 135 */ 136 public final static String ATTR_CHANGE_CONTENT_SIZE = "change-content-size"; 137 public final static Boolean DEFAULT_CHANGE_CONTENT_SIZE = new Boolean(true); 138 139 /** What to write to a log file */ 140 public final static String ATTR_LOG_LEVEL = "log-level"; 141 public final static String[] AVAILABLE_LOG_LEVELS = {Level.SEVERE.toString(), Level.INFO.toString(), 142 Level.FINEST.toString()}; 143 public final static String DEFAULT_LOG_LEVEL = AVAILABLE_LOG_LEVELS[0]; 144 145 /** Should statistics be tracked per host? * */ 146 public final static String ATTR_STATS_PER_HOST = "stats-per-host"; 147 public final static Boolean DEFAULT_STATS_PER_HOST = new Boolean(false); 148 149 /** How should 'origin' be handled * */ 150 public final static String ATTR_ORIGIN_HANDLING = "origin-handling"; 151 public final static String ORIGIN_HANDLING_NONE = "No origin information"; 152 public final static String ORIGIN_HANDLING_PROCESSOR = "Use processor setting"; 153 public final static String ORIGIN_HANDLING_INDEX = "Use index information"; 154 public final static String[] AVAILABLE_ORIGIN_HANDLING = {ORIGIN_HANDLING_NONE, ORIGIN_HANDLING_PROCESSOR, 155 ORIGIN_HANDLING_INDEX}; 156 public final static String DEFAULT_ORIGIN_HANDLING = ORIGIN_HANDLING_NONE; 157 158 /** Origin of duplicate URLs * */ 159 public final static String ATTR_ORIGIN = "origin"; 160 public final static String DEFAULT_ORIGIN = ""; 161 162 /** Should the writer processor chain be skipped? 
* */ 163 public final static String ATTR_SKIP_WRITE = "skip-writing"; 164 public final static Boolean DEFAULT_SKIP_WRITE = new Boolean(true); 165 166 /** Should we use sparse queries (uses less memory at a cost to performance? * */ 167 public final static String ATTR_USE_SPARSE_RANGE_FILTER = "use-sparse-range-filter"; 168 public final static Boolean DEFAULT_USE_SPARSE_RANGE_FILTER = new Boolean(false); 169 170 public DeDuplicator(String name) { 171 super(name, "Aborts the processing of URIs (skips to post processing " 172 + "chain) if a duplicate is found in the specified index. " 173 + "Note that any changes made to this processors configuration " 174 + "at run time will be ignored unless otherwise stated."); 175 Type t = new SimpleType(ATTR_INDEX_LOCATION, "Location of index (full path). Can not be changed at run " 176 + "time.", DEFAULT_INDEX_LOCATION); 177 t.setOverrideable(false); 178 addElementToDefinition(t); 179 t = new SimpleType(ATTR_MATCHING_METHOD, "Select if we should lookup by URL " 180 + "or by content digest (counts mirror matches).", DEFAULT_MATCHING_METHOD, AVAILABLE_MATCHING_METHODS); 181 t.setOverrideable(false); 182 addElementToDefinition(t); 183 t = new SimpleType(ATTR_EQUIVALENT, "If an exact match of URI and content digest is not found " 184 + "then an equivalent URI (i.e. one with any www[0-9]*, " 185 + "trailing slashes and parameters removed) can be checked. " 186 + "If an equivalent URI has an identical content digest then " 187 + "enabling this feature will cause the processor to consider " 188 + "this a duplicate. Equivalent matches are noted in the " 189 + "crawl log and their number is tracked seperately.", DEFAULT_EQUIVALENT); 190 t.setOverrideable(false); 191 addElementToDefinition(t); 192 t = new SimpleType(ATTR_MIME_FILTER, "A regular expression that the mimetype of all documents " 193 + "will be compared against. 
\nIf the attribute filter-mode is " 194 + "set to 'Blacklist' then all the documents whose mimetype " 195 + "matches will be ignored by this processor. If the filter-" 196 + "mode is set to 'Whitelist' only those documents whose " + "mimetype matches will be processed.", 197 DEFAULT_MIME_FILTER); 198 t.setOverrideable(false); 199 t.setExpertSetting(true); 200 addElementToDefinition(t); 201 t = new SimpleType(ATTR_FILTER_MODE, "Determines if the mime-filter acts as a blacklist (declares " 202 + "what should be ignored) or whitelist (declares what should " + "be processed).", 203 DEFAULT_FILTER_MODE, AVAILABLE_FILTER_MODES); 204 t.setOverrideable(false); 205 t.setExpertSetting(true); 206 addElementToDefinition(t); 207 t = new SimpleType(ATTR_ANALYSIS_MODE, "If enabled, the processor can analyse the timestamp (last-" 208 + "modified) and ETag info of the HTTP headers and compare " 209 + "their predictions as to whether or not the document had " 210 + "changed against the result of the index lookup. This is " 211 + "ONLY for the purpose of gathering statistics about the " 212 + "usefulness and accuracy of the HTTP header information in " 213 + "question and has no effect on the processing of documents. " + "Analysis is only possible if " 214 + "the relevant data was included in the index.", DEFAULT_ANALYSIS_MODE, AVAILABLE_ANALYSIS_MODES); 215 t.setOverrideable(false); 216 t.setExpertSetting(true); 217 addElementToDefinition(t); 218 219 t = new SimpleType(ATTR_LOG_LEVEL, "Adjust the verbosity of the processor. By default, it only " 220 + "reports serious (Java runtime) errors. " + "By setting the log level " 221 + "higher, various additional data can be logged. " 222 + "* Serious - Default logging level, only serious errors. " 223 + "Note that it is possible that a more permissive default " 224 + "logging level has been set via the heritrix.properties " 225 + "file. This setting (severe) will not affect that.\n" 226 + "* Info - Records some anomalies. 
Such as the information " 227 + "on URIs that the HTTP header info falsely predicts " + "no-change on.\n" 228 + "* Finest - Full logging of all URIs processed. For " + "debugging purposes only!", 229 DEFAULT_LOG_LEVEL, AVAILABLE_LOG_LEVELS); 230 t.setOverrideable(false); 231 t.setExpertSetting(true); 232 addElementToDefinition(t); 233 t = new SimpleType(ATTR_STATS_PER_HOST, "If enabled the processor will keep track of the number of " 234 + "processed uris, duplicates found etc. per host. The listing " 235 + "will be added to the processor report (not the host-report).", DEFAULT_STATS_PER_HOST); 236 t.setOverrideable(false); 237 t.setExpertSetting(true); 238 addElementToDefinition(t); 239 t = new SimpleType(ATTR_CHANGE_CONTENT_SIZE, "If set to true then the processor will set the content size " 240 + "of the CrawlURI to zero when a duplicate is discovered. ", DEFAULT_CHANGE_CONTENT_SIZE); 241 t.setOverrideable(false); 242 addElementToDefinition(t); 243 244 t = new SimpleType(ATTR_ORIGIN_HANDLING, "The origin of duplicate URLs can be handled a few different " 245 + "ways. It is important to note that the 'origin' information " 246 + "is malleable and may be anything from a ARC name and offset " 247 + "to a simple ID of a particular crawl. It is entirely at the " + "operators discretion.\n " 248 + ORIGIN_HANDLING_NONE + " - No origin information is " + "associated with the URLs.\n " 249 + ORIGIN_HANDLING_PROCESSOR + " - Duplicate URLs are all given " 250 + "the same origin, specified by the 'origin' setting of this " + "processor.\n " 251 + ORIGIN_HANDLING_INDEX + " - The origin of each duplicate URL " 252 + "is read from the index. 
If the index does not contain any " 253 + "origin information for an URL, the processor setting is " + "used as a fallback!", 254 DEFAULT_ORIGIN_HANDLING, AVAILABLE_ORIGIN_HANDLING); 255 t.setOverrideable(false); 256 addElementToDefinition(t); 257 258 t = new SimpleType(ATTR_ORIGIN, "The origin of duplicate URLs.", DEFAULT_ORIGIN); 259 addElementToDefinition(t); 260 261 t = new SimpleType(ATTR_SKIP_WRITE, "If set to true, then processing of duplicate URIs will be " 262 + "skipped directly to the post processing chain. If false, " 263 + "processing of duplicates will skip directly to the writer " 264 + "chain that precedes the post processing chain.", DEFAULT_SKIP_WRITE); 265 t.setOverrideable(true); 266 addElementToDefinition(t); 267 268 t = new SimpleType(ATTR_USE_SPARSE_RANGE_FILTER, "If set to true, then Lucene queries use a custom 'sparse' " 269 + "range filter. This uses less memory at the cost of some " 270 + "lost performance. Suitable for very large indexes.", DEFAULT_USE_SPARSE_RANGE_FILTER); 271 t.setOverrideable(false); 272 t.setExpertSetting(true); 273 addElementToDefinition(t); 274 } 275 276 /* 277 * (non-Javadoc) 278 * 279 * @see org.archive.crawler.framework.Processor#initialTasks() 280 */ 281 @Override 282 protected void initialTasks() { 283 // Read settings and set appropriate class variables. 
284 285 // Index location 286 String indexLocation = (String) readAttribute(ATTR_INDEX_LOCATION, ""); 287 try { 288 FSDirectory indexDir = FSDirectory.open(new File(indexLocation)); 289 // https://issues.apache.org/jira/browse/LUCENE-1566 290 // Reduce chunksize to avoid OOM to half the size of the default (=100 MB) 291 int chunksize = indexDir.getReadChunkSize(); 292 indexDir.setReadChunkSize(chunksize / 2); 293 IndexReader reader = DirectoryReader.open(indexDir); 294 index = new IndexSearcher(reader); 295 } catch (Exception e) { 296 logger.log(Level.SEVERE, "Unable to find/open index.", e); 297 } 298 299 // Matching method 300 String matchingMethod = (String) readAttribute(ATTR_MATCHING_METHOD, DEFAULT_MATCHING_METHOD); 301 lookupByURL = matchingMethod.equals(DEFAULT_MATCHING_METHOD); 302 303 // Try equivalent matches 304 equivalent = ((Boolean) readAttribute(ATTR_EQUIVALENT, DEFAULT_EQUIVALENT)).booleanValue(); 305 306 // Mime filter 307 mimefilter = (String) readAttribute(ATTR_MIME_FILTER, DEFAULT_MIME_FILTER); 308 309 // Filter mode (blacklist (default) or whitelist) 310 blacklist = ((String) readAttribute(ATTR_FILTER_MODE, DEFAULT_FILTER_MODE)).equals(DEFAULT_FILTER_MODE); 311 312 // Analysis (None (default), Timestamp only or Timestamp and ETag) 313 String analysisMode = (String) readAttribute(ATTR_ANALYSIS_MODE, DEFAULT_ANALYSIS_MODE); 314 if (analysisMode.equals(AVAILABLE_ANALYSIS_MODES[1])) { 315 // Timestamp only 316 doTimestampAnalysis = true; 317 } else if (analysisMode.equals(AVAILABLE_ANALYSIS_MODES[2])) { 318 // Both timestamp and ETag 319 doTimestampAnalysis = true; 320 doETagAnalysis = true; 321 } 322 323 // Log file/level 324 String lev = (String) readAttribute(ATTR_LOG_LEVEL, DEFAULT_LOG_LEVEL); 325 if (lev.equals(Level.FINEST.toString())) { 326 logger.setLevel(Level.FINEST); 327 } else if (lev.equals(Level.INFO.toString())) { 328 logger.setLevel(Level.INFO); 329 } // Severe effectively means default level. 
330 331 // Track per host stats 332 statsPerHost = ((Boolean) readAttribute(ATTR_STATS_PER_HOST, DEFAULT_STATS_PER_HOST)).booleanValue(); 333 334 // Change content size 335 changeContentSize = ((Boolean) readAttribute(ATTR_CHANGE_CONTENT_SIZE, DEFAULT_CHANGE_CONTENT_SIZE)) 336 .booleanValue(); 337 338 // Origin handling. 339 String originHandling = (String) readAttribute(ATTR_ORIGIN_HANDLING, DEFAULT_ORIGIN_HANDLING); 340 if (originHandling.equals(ORIGIN_HANDLING_NONE) == false) { 341 useOrigin = true; 342 if (originHandling.equals(ORIGIN_HANDLING_INDEX)) { 343 useOriginFromIndex = true; 344 } 345 } 346 347 // Range Filter type 348 useSparseRangeFilter = ((Boolean) readAttribute(ATTR_USE_SPARSE_RANGE_FILTER, DEFAULT_USE_SPARSE_RANGE_FILTER)) 349 .booleanValue(); 350 351 // Initialize some internal variables: 352 stats = new Statistics(); 353 if (statsPerHost) { 354 perHostStats = new HashMap<String, Statistics>(); 355 } 356 } 357 358 /** 359 * A utility method for reading attributes. If not found, an error is logged and the defaultValue is returned. 360 * 361 * @param name The name of the attribute 362 * @param defaultValue A default value to return if an error occurs 363 * @return The value of the attribute or the default value if an error occurs 364 */ 365 protected Object readAttribute(String name, Object defaultValue) { 366 try { 367 return getAttribute(name); 368 } catch (Exception e) { 369 logger.log(Level.SEVERE, "Unable read " + name + " attribute", e); 370 return defaultValue; 371 } 372 } 373 374 protected void innerProcess(CrawlURI curi) throws InterruptedException { 375 if (curi.isSuccess() == false) { 376 // Early return. No point in doing comparison on failed downloads. 377 logger.finest("Not handling " + curi.toString() + ", did not succeed."); 378 return; 379 } 380 if (curi.isPrerequisite()) { 381 // Early return. Prerequisites are exempt from checking. 
382 logger.finest("Not handling " + curi.toString() + ", prerequisite."); 383 return; 384 } 385 if (curi.isSuccess() == false || curi.isPrerequisite() || curi.toString().startsWith("http") == false) { 386 // Early return. Non-http documents are not handled at present 387 logger.finest("Not handling " + curi.toString() + ", non-http."); 388 return; 389 } 390 if (curi.getContentType() == null) { 391 // No content type means we can not handle it. 392 logger.finest("Not handling " + curi.toString() + ", missing content (mime) type"); 393 return; 394 } 395 if (curi.getContentType().matches(mimefilter) == blacklist) { 396 // Early return. Does not pass the mime filter 397 logger.finest("Not handling " + curi.toString() + ", excluded by mimefilter (" + curi.getContentType() 398 + ")."); 399 return; 400 } 401 if (curi.containsKey(A_CONTENT_STATE_KEY) && curi.getInt(A_CONTENT_STATE_KEY) == CONTENT_UNCHANGED) { 402 // Early return. A previous processor or filter has judged this 403 // CrawlURI as having unchanged content. 
404 logger.finest("Not handling " + curi.toString() + ", already flagged as unchanged."); 405 return; 406 } 407 logger.finest("Processing " + curi.toString() + "(" + curi.getContentType() + ")"); 408 409 stats.handledNumber++; 410 stats.totalAmount += curi.getContentSize(); 411 Statistics currHostStats = null; 412 if (statsPerHost) { 413 synchronized (perHostStats) { 414 String host = getController().getServerCache().getHostFor(curi).getHostName(); 415 currHostStats = perHostStats.get(host); 416 if (currHostStats == null) { 417 currHostStats = new Statistics(); 418 perHostStats.put(host, currHostStats); 419 } 420 } 421 currHostStats.handledNumber++; 422 currHostStats.totalAmount += curi.getContentSize(); 423 } 424 425 Document duplicate = null; 426 427 if (lookupByURL) { 428 duplicate = lookupByURL(curi, currHostStats); 429 } else { 430 duplicate = lookupByDigest(curi, currHostStats); 431 } 432 433 if (duplicate != null) { 434 // Perform tasks common to when a duplicate is found. 435 // Increment statistics counters 436 stats.duplicateAmount += curi.getContentSize(); 437 stats.duplicateNumber++; 438 if (statsPerHost) { 439 currHostStats.duplicateAmount += curi.getContentSize(); 440 currHostStats.duplicateNumber++; 441 } 442 // Duplicate. Abort further processing of URI. 443 if (((Boolean) readAttribute(ATTR_SKIP_WRITE, DEFAULT_SKIP_WRITE)).booleanValue()) { 444 // Skip writing, go directly to post processing chain 445 curi.skipToProcessorChain(getController().getPostprocessorChain()); 446 } else { 447 // Do not skip writing, go to writer processors 448 curi.skipToProcessorChain(getController().getProcessorChainList().getProcessorChain( 449 CrawlOrder.ATTR_WRITE_PROCESSORS)); 450 } 451 452 // Record origin? 453 String annotation = "duplicate"; 454 if (useOrigin) { 455 // TODO: Save origin in the CrawlURI so that other processors 456 // can make use of it. 
(Future: WARC) 457 if (useOriginFromIndex && duplicate.get(DigestIndexer.FIELD_ORIGIN) != null) { 458 // Index contains origin, use it. 459 annotation += ":\"" + duplicate.get(DigestIndexer.FIELD_ORIGIN) + "\""; 460 } else { 461 String tmp = (String) getUncheckedAttribute(curi, ATTR_ORIGIN); 462 // Check if an origin value is actually available 463 if (tmp != null && tmp.trim().length() > 0) { 464 // It is available, add it to the log line. 465 annotation += ":\"" + tmp + "\""; 466 } 467 } 468 } 469 // Make note in log 470 curi.addAnnotation(annotation); 471 472 if (changeContentSize) { 473 // Set content size to zero, we are not planning to 474 // 'write it to disk' 475 // TODO: Reconsider this 476 curi.setContentSize(0); 477 } 478 // Mark as duplicate for other processors 479 curi.putInt(A_CONTENT_STATE_KEY, CONTENT_UNCHANGED); 480 } 481 482 if (doTimestampAnalysis) { 483 doAnalysis(curi, currHostStats, duplicate != null); 484 } 485 } 486 487 /** 488 * Process a CrawlURI looking up in the index by URL 489 * 490 * @param curi The CrawlURI to process 491 * @param currHostStats A statistics object for the current host. If per host statistics tracking is enabled this 492 * must be non null and the method will increment appropriate counters on it. 493 * @return The result of the lookup (a Lucene document). If a duplicate is not found null is returned. 494 */ 495 protected Document lookupByURL(CrawlURI curi, Statistics currHostStats) { 496 // Look the CrawlURI's URL up in the index. 497 try { 498 Query query = queryField(DigestIndexer.FIELD_URL, curi.toString()); 499 AllDocsCollector collectAllCollector = new AllDocsCollector(); 500 index.search(query, collectAllCollector); 501 502 List<ScoreDoc> hits = collectAllCollector.getHits(); 503 Document doc = null; 504 String currentDigest = getDigestAsString(curi); 505 if (hits != null && hits.size() > 0) { 506 // Typically there should only be one it, but we'll allow for 507 // multiple hits. 
508 for (ScoreDoc hit : hits) { 509 // for(int i=0 ; i < hits.size() ; i++){ 510 // Multiple hits on same exact URL should be rare 511 // See if any have matching content digests 512 int docId = hit.doc; 513 doc = index.doc(docId); 514 String oldDigest = doc.get(DigestIndexer.FIELD_DIGEST); 515 516 if (oldDigest.equalsIgnoreCase(currentDigest)) { 517 stats.exactURLDuplicates++; 518 if (statsPerHost) { 519 currHostStats.exactURLDuplicates++; 520 } 521 522 logger.finest("Found exact match for " + curi.toString()); 523 524 // If we found a hit, no need to look at other hits. 525 return doc; 526 } 527 } 528 } 529 if (equivalent) { 530 // No exact hits. Let's try lenient matching. 531 String normalizedURL = DigestIndexer.stripURL(curi.toString()); 532 query = queryField(DigestIndexer.FIELD_URL_NORMALIZED, normalizedURL); 533 collectAllCollector.reset(); // reset collector 534 index.search(query, collectAllCollector); 535 hits = collectAllCollector.getHits(); 536 537 for (ScoreDoc hit : hits) { 538 // int i=0 ; i < hits.length ; i++){ 539 540 int docId = hit.doc; 541 Document doc1 = index.doc(docId); 542 String indexDigest = doc1.get(DigestIndexer.FIELD_DIGEST); 543 if (indexDigest.equals(currentDigest)) { 544 // Make note in log 545 String equivURL = doc1.get(DigestIndexer.FIELD_URL); 546 curi.addAnnotation("equivalent to " + equivURL); 547 // Increment statistics counters 548 stats.equivalentURLDuplicates++; 549 if (statsPerHost) { 550 currHostStats.equivalentURLDuplicates++; 551 } 552 logger.finest("Found equivalent match for " + curi.toString() + ". Normalized: " 553 + normalizedURL + ". Equivalent to: " + equivURL); 554 555 // If we found a hit, no need to look at more. 556 return doc1; 557 } 558 } 559 } 560 } catch (IOException e) { 561 logger.log(Level.SEVERE, "Error accessing index.", e); 562 } 563 // If we make it here then this is not a duplicate. 
564 return null; 565 } 566 567 /** 568 * Process a CrawlURI looking up in the index by content digest 569 * 570 * @param curi The CrawlURI to process 571 * @param currHostStats A statistics object for the current host. If per host statistics tracking is enabled this 572 * must be non null and the method will increment appropriate counters on it. 573 * @return The result of the lookup (a Lucene document). If a duplicate is not found null is returned. 574 */ 575 protected Document lookupByDigest(CrawlURI curi, Statistics currHostStats) { 576 Document duplicate = null; 577 String currentDigest = null; 578 Object digest = curi.getContentDigest(); 579 if (digest != null) { 580 currentDigest = Base32.encode((byte[]) digest); 581 } else { 582 logger.warning("Digest received from CrawlURI is null. Null Document returned"); 583 return null; 584 } 585 586 Query query = queryField(DigestIndexer.FIELD_DIGEST, currentDigest); 587 try { 588 AllDocsCollector collectAllCollector = new AllDocsCollector(); 589 index.search(query, collectAllCollector); 590 591 List<ScoreDoc> hits = collectAllCollector.getHits(); 592 593 StringBuffer mirrors = new StringBuffer(); 594 mirrors.append("mirrors: "); 595 if (hits != null && hits.size() > 0) { 596 // Can definitely be more then one 597 // Note: We may find an equivalent match before we find an 598 // (existing) exact match. 599 // TODO: Ensure that an exact match is recorded if it exists. 600 Iterator<ScoreDoc> hitsIterator = hits.iterator(); 601 while (hitsIterator.hasNext() && duplicate == null) { 602 ScoreDoc hit = hitsIterator.next(); 603 int docId = hit.doc; 604 Document doc = index.doc(docId); 605 String indexURL = doc.get(DigestIndexer.FIELD_URL); 606 // See if the current hit is an exact match. 
607 if (curi.toString().equals(indexURL)) { 608 duplicate = doc; 609 stats.exactURLDuplicates++; 610 if (statsPerHost) { 611 currHostStats.exactURLDuplicates++; 612 } 613 logger.finest("Found exact match for " + curi.toString()); 614 } 615 616 // If not, then check if it is an equivalent match (if 617 // equivalent matches are allowed). 618 if (duplicate == null && equivalent) { 619 String normalURL = DigestIndexer.stripURL(curi.toString()); 620 String indexNormalURL = doc.get(DigestIndexer.FIELD_URL_NORMALIZED); 621 if (normalURL.equals(indexNormalURL)) { 622 duplicate = doc; 623 stats.equivalentURLDuplicates++; 624 if (statsPerHost) { 625 currHostStats.equivalentURLDuplicates++; 626 } 627 curi.addAnnotation("equivalent to " + indexURL); 628 logger.finest("Found equivalent match for " + curi.toString() + ". Normalized: " 629 + normalURL + ". Equivalent to: " + indexURL); 630 } 631 } 632 633 if (duplicate == null) { 634 // Will only be used if no exact (or equivalent) match 635 // is found. 636 mirrors.append(indexURL + " "); 637 } 638 } 639 if (duplicate == null) { 640 stats.mirrorNumber++; 641 if (statsPerHost) { 642 currHostStats.mirrorNumber++; 643 } 644 logger.log(Level.FINEST, "Found mirror URLs for " + curi.toString() + ". " + mirrors); 645 } 646 } 647 } catch (IOException e) { 648 logger.log(Level.SEVERE, "Error accessing index.", e); 649 } 650 return duplicate; 651 } 652 653 public String report() { 654 StringBuffer ret = new StringBuffer(); 655 ret.append("Processor: is.hi.bok.digest.DeDuplicator\n"); 656 ret.append(" Function: Abort processing of duplicate records\n"); 657 ret.append(" - Lookup by " + (lookupByURL ? 
"url" : "digest") + " in use\n"); 658 ret.append(" Total handled: " + stats.handledNumber + "\n"); 659 ret.append(" Duplicates found: " + stats.duplicateNumber + " " 660 + getPercentage(stats.duplicateNumber, stats.handledNumber) + "\n"); 661 ret.append(" Bytes total: " + stats.totalAmount + " (" 662 + ArchiveUtils.formatBytesForDisplay(stats.totalAmount) + ")\n"); 663 ret.append(" Bytes discarded: " + stats.duplicateAmount + " (" 664 + ArchiveUtils.formatBytesForDisplay(stats.duplicateAmount) + ") " 665 + getPercentage(stats.duplicateAmount, stats.totalAmount) + "\n"); 666 667 ret.append(" New (no hits): " 668 + (stats.handledNumber - (stats.mirrorNumber + stats.exactURLDuplicates + stats.equivalentURLDuplicates)) 669 + "\n"); 670 ret.append(" Exact hits: " + stats.exactURLDuplicates + "\n"); 671 ret.append(" Equivalent hits: " + stats.equivalentURLDuplicates + "\n"); 672 if (lookupByURL == false) { 673 ret.append(" Mirror hits: " + stats.mirrorNumber + "\n"); 674 } 675 676 if (doTimestampAnalysis) { 677 ret.append(" Timestamp predicts: (Where exact URL existed in the index)\n"); 678 ret.append(" Change correctly: " + stats.timestampChangeCorrect + "\n"); 679 ret.append(" Change falsly: " + stats.timestampChangeFalse + "\n"); 680 ret.append(" Non-change correct:" + stats.timestampNoChangeCorrect + "\n"); 681 ret.append(" Non-change falsly: " + stats.timestampNoChangeFalse + "\n"); 682 ret.append(" Missing timpestamp:" + stats.timestampMissing + "\n"); 683 684 } 685 686 if (statsPerHost) { 687 ret.append(" [Host] [total] [duplicates] [bytes] " + "[bytes discarded] [new] [exact] [equiv]"); 688 if (lookupByURL == false) { 689 ret.append(" [mirror]"); 690 } 691 if (doTimestampAnalysis) { 692 ret.append(" [change correct] [change falsly]"); 693 ret.append(" [non-change correct] [non-change falsly]"); 694 ret.append(" [no timestamp]\n"); 695 } 696 synchronized (perHostStats) { 697 Iterator<String> it = perHostStats.keySet().iterator(); 698 while (it.hasNext()) { 699 
String key = (String) it.next(); 700 Statistics curr = perHostStats.get(key); 701 ret.append(" " + key); 702 ret.append(" "); 703 ret.append(curr.handledNumber); 704 ret.append(" "); 705 ret.append(curr.duplicateNumber); 706 ret.append(" "); 707 ret.append(curr.totalAmount); 708 ret.append(" "); 709 ret.append(curr.duplicateAmount); 710 ret.append(" "); 711 ret.append(curr.handledNumber 712 - (curr.mirrorNumber + curr.exactURLDuplicates + curr.equivalentURLDuplicates)); 713 ret.append(" "); 714 ret.append(curr.exactURLDuplicates); 715 ret.append(" "); 716 ret.append(curr.equivalentURLDuplicates); 717 718 if (lookupByURL == false) { 719 ret.append(" "); 720 ret.append(curr.mirrorNumber); 721 } 722 if (doTimestampAnalysis) { 723 ret.append(" "); 724 ret.append(curr.timestampChangeCorrect); 725 ret.append(" "); 726 ret.append(curr.timestampChangeFalse); 727 ret.append(" "); 728 ret.append(curr.timestampNoChangeCorrect); 729 ret.append(" "); 730 ret.append(curr.timestampNoChangeFalse); 731 ret.append(" "); 732 ret.append(curr.timestampMissing); 733 } 734 ret.append("\n"); 735 } 736 } 737 } 738 739 ret.append("\n"); 740 return ret.toString(); 741 } 742 743 protected static String getPercentage(double portion, double total) { 744 double value = portion / total; 745 value = value * 100; 746 String ret = Double.toString(value); 747 int dot = ret.indexOf('.'); 748 if (dot + 3 < ret.length()) { 749 ret = ret.substring(0, dot + 3); 750 } 751 return ret + "%"; 752 } 753 754 private static String getDigestAsString(CrawlURI curi) { 755 // The CrawlURI now has a method for this. For backwards 756 // compatibility with older Heritrix versions that is not used. 
        // Digest may legitimately be absent (e.g. failed fetches); be null-safe.
        Object digest = curi.getContentDigest();
        if (digest != null) {
            return Base32.encode((byte[]) digest);
        }
        return null;
    }

    /**
     * Runs the optional header analysis for a processed URI: looks the exact
     * URL up in the Lucene index and, if a document is found, feeds the best
     * hit to {@link #doTimestampAnalysis}. ETag analysis is not implemented.
     *
     * @param curi the current crawl URI (fetch already completed)
     * @param currHostStats per-host statistics bucket for this URI's host;
     *        only meaningful when statsPerHost is enabled
     * @param isDuplicate whether deduplication judged this URI a duplicate
     */
    protected void doAnalysis(CrawlURI curi, Statistics currHostStats, boolean isDuplicate) {
        try {
            Query query = queryField(DigestIndexer.FIELD_URL, curi.toString());
            AllDocsCollector collectAllCollector = new AllDocsCollector();
            index.search(query, collectAllCollector);
            List<ScoreDoc> hits = collectAllCollector.getHits();

            Document doc = null;
            if (hits != null && hits.size() > 0) {
                // If there are multiple hits, use the one with the most
                // recent date.
                Document docToEval = null;

                for (ScoreDoc hit : hits) {
                    int docId = hit.doc;
                    doc = index.doc(docId);
                    // The format of the timestamp ("yyyyMMddHHmmssSSS") allows
                    // us to do a greater then (later) or lesser than (earlier)
                    // comparison of the strings.
                    String timestamp = doc.get(DigestIndexer.FIELD_TIMESTAMP);
                    // NOTE(review): compareTo(...) > 0 is true when docToEval's
                    // timestamp is LATER than the current hit's, so this loop
                    // ends up keeping the OLDEST hit — contradicting the
                    // "most recent" comment above. Suspected inverted
                    // comparison (should likely be < 0); confirm against the
                    // intended semantics before changing.
                    if (docToEval == null || docToEval.get(DigestIndexer.FIELD_TIMESTAMP).compareTo(timestamp) > 0) {
                        // Found a more recent hit.
                        docToEval = doc;
                    }
                }
                doTimestampAnalysis(curi, docToEval, currHostStats, isDuplicate);
                if (doETagAnalysis) {
                    // TODO: Do etag analysis
                }
            }
        } catch (IOException e) {
            logger.log(Level.SEVERE, "Error accessing index.", e);
        }
    }

    /**
     * Compares the HTTP "last-modified" response header of the current fetch
     * against the fetch timestamp stored in the index for the same URL, and
     * records (in {@code stats} and optionally {@code currHostStats}) whether
     * the header correctly predicted change/no-change relative to what the
     * digest comparison ({@code isDuplicate}) actually found. Returns early,
     * counting {@code timestampMissing}, when the header is absent or either
     * date fails to parse.
     *
     * @param curi the current crawl URI
     * @param urlHit the index document for the same URL (selected by doAnalysis)
     * @param currHostStats per-host statistics bucket; updated only when
     *        statsPerHost is enabled
     * @param isDuplicate whether deduplication judged this URI a duplicate
     */
    protected void doTimestampAnalysis(CrawlURI curi, Document urlHit, Statistics currHostStats, boolean isDuplicate) {

        HttpMethod method = (HttpMethod) curi.getObject(CoreAttributeConstants.A_HTTP_TRANSACTION);

        // Compare datestamps (last-modified versus the indexed date)
        Date lastModified = null;
        if (method.getResponseHeader("last-modified") != null) {
            // RFC 1123 date format as used in HTTP headers; Locale.ENGLISH so
            // day/month names parse regardless of the JVM's default locale.
            SimpleDateFormat sdf = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z", Locale.ENGLISH);
            try {
                lastModified = sdf.parse(method.getResponseHeader("last-modified").getValue());
            } catch (ParseException e) {
                logger.log(Level.INFO, "Exception parsing last modified of " + curi.toString(), e);
                return;
            }
        } else {
            // No last-modified header: prediction analysis is impossible here.
            stats.timestampMissing++;
            if (statsPerHost) {
                currHostStats.timestampMissing++;
                // NOTE(review): this log line is only emitted when per-host
                // stats are enabled — verify the nesting is intentional.
                logger.finest("Missing timestamp on " + curi.toString());
            }
            return;
        }

        // Parse the indexed fetch time (written as "yyyyMMddHHmmssSSS").
        // NOTE(review): a fresh SimpleDateFormat per call is correct here,
        // since SimpleDateFormat is not thread-safe and this runs per-URI.
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS");
        Date lastFetch = null;
        try {
            lastFetch = sdf.parse(urlHit.get(DigestIndexer.FIELD_TIMESTAMP));
        } catch (ParseException e) {
            logger.log(Level.WARNING, "Exception parsing indexed date for " + urlHit.get(DigestIndexer.FIELD_URL), e);
            return;
        }

        if (lastModified.after(lastFetch)) {
            // Header predicts change
            if (isDuplicate) {
                // But the DeDuplicator did not notice a change.
                // Header predicted change but the content digest was unchanged.
                // ("falsly" is a typo, but it is part of the emitted log text.)
                stats.timestampChangeFalse++;
                if (statsPerHost) {
                    currHostStats.timestampChangeFalse++;
                }
                logger.finest("Last-modified falsly predicts change on " + curi.toString());
            } else {
                // Header predicted change and the content did change.
                stats.timestampChangeCorrect++;
                if (statsPerHost) {
                    currHostStats.timestampChangeCorrect++;
                }
                logger.finest("Last-modified correctly predicts change on " + curi.toString());
            }
        } else {
            // Header does not predict change.
            if (isDuplicate) {
                // And the DeDuplicator verifies that no change had occurred
                stats.timestampNoChangeCorrect++;
                if (statsPerHost) {
                    currHostStats.timestampNoChangeCorrect++;
                }
                logger.finest("Last-modified correctly predicts no-change on " + curi.toString());
            } else {
                // As this is particularly bad we'll log the URL at INFO level
                logger.log(Level.INFO, "Last-modified incorrectly indicated " + "no-change on " + curi.toString() + " "
                        + curi.getContentType() + ". last-modified: " + lastModified + ". Last fetched: " + lastFetch);
                stats.timestampNoChangeFalse++;
                if (statsPerHost) {
                    currHostStats.timestampNoChangeFalse++;
                }
            }
        }

    }

    /**
     * Run a simple Lucene query for a single term in a single field.
     *
     * @param fieldName name of the field to look in.
     * @param value The value to query for
     * @return A Query for the given value in the given field.
     */
    protected Query queryField(String fieldName, String value) {
        Query query = null;

        /*
         * Exact-match implemented as an inclusive single-value range filter
         * wrapped in a constant-score query (relevance scores are irrelevant
         * for deduplication lookups).
         * NOTE(review): value.getBytes() uses the platform default charset;
         * presumably the index terms were written as UTF-8 — confirm, or this
         * lookup can miss non-ASCII URLs on non-UTF-8 platforms.
         */
        BytesRef valueRef = new BytesRef(value.getBytes());
        query = new ConstantScoreQuery(new TermRangeFilter(fieldName, valueRef, valueRef, true, true));

        /* The most clean solution, but it seems also memory demanding */
        // query = new ConstantScoreQuery(new FieldCacheTermsFilter(fieldName,
        // value));
        return query;
    }

    /** No end-of-crawl clean-up is needed by this processor. */
    @Override
    protected void finalTasks() {
    }
}

/**
 * Mutable counter bundle for the DeDuplicator's run-time statistics. One
 * instance tracks crawl-wide totals; when per-host statistics are enabled an
 * additional instance is kept per host. The fields are plain (unsynchronized)
 * longs — callers are responsible for any locking of shared instances.
 */
class Statistics {
    // General statistics

    /**
     * Number of URIs that make it through the processors exclusion rules and are processed by it.
     */
    long handledNumber = 0;

    /**
     * Number of URIs that are deemed duplicates and further processing is aborted
     */
    long duplicateNumber = 0;

    /**
     * The number of URIs that turned out to have exact URL and content digest matches.
     */
    long exactURLDuplicates = 0;

    /**
     * The number of URIs that turned out to have equivalent URL and content digest matches.
     */
    long equivalentURLDuplicates = 0;

    /**
     * The number of URIs that, while having no exact or equivalent matches, do have exact content digest matches
     * against non-equivalent URIs.
     */
    long mirrorNumber = 0;

    /**
     * The total amount of data represented by the documents who were deemed duplicates and excluded from further
     * processing.
     */
    long duplicateAmount = 0;

    /** The total amount of data represented by all the documents processed. */
    long totalAmount = 0;

    // Timestamp analysis: how often the last-modified header correctly or
    // incorrectly predicted change, and how often it was missing entirely.

    long timestampChangeCorrect = 0;
    long timestampChangeFalse = 0;
    long timestampNoChangeCorrect = 0;
    long timestampNoChangeFalse = 0;
    long timestampMissing = 0;

    // ETag analysis counters.
    // NOTE(review): not updated anywhere in the code visible here (ETag
    // analysis is still a TODO in doAnalysis); apparently reserved for
    // future use.

    long ETagChangeCorrect = 0;
    long ETagChangeFalse = 0;
    long ETagNoChangeCorrect = 0;
    long ETagNoChangeFalse = 0;
    long ETagMissingIndex = 0;
    long ETagMissingCURI = 0;
}