001/* DeDupFetchHTTP 002 * 003 * Created on 10.04.2006 004 * 005 * Copyright (C) 2006 National and University Library of Iceland 006 * 007 * This file is part of the DeDuplicator (Heritrix add-on module). 008 * 009 * DeDuplicator is free software; you can redistribute it and/or modify 010 * it under the terms of the GNU Lesser Public License as published by 011 * the Free Software Foundation; either version 2.1 of the License, or 012 * any later version. 013 * 014 * DeDuplicator is distributed in the hope that it will be useful, 015 * but WITHOUT ANY WARRANTY; without even the implied warranty of 016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 017 * GNU Lesser Public License for more details. 018 * 019 * You should have received a copy of the GNU Lesser Public License 020 * along with DeDuplicator; if not, write to the Free Software 021 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 022 */ 023package is.hi.bok.deduplicator; 024 025import java.io.File; 026import java.io.IOException; 027import java.text.SimpleDateFormat; 028import java.util.List; 029 030import org.apache.commons.httpclient.HttpConnection; 031import org.apache.commons.httpclient.HttpMethod; 032import org.apache.lucene.document.Document; 033import org.apache.lucene.index.DirectoryReader; 034import org.apache.lucene.index.IndexReader; 035import org.apache.lucene.search.ConstantScoreQuery; 036import org.apache.lucene.search.IndexSearcher; 037import org.apache.lucene.search.Query; 038import org.apache.lucene.search.ScoreDoc; 039import org.apache.lucene.search.TermRangeFilter; 040import org.apache.lucene.store.FSDirectory; 041import org.apache.lucene.util.BytesRef; 042import org.archive.crawler.datamodel.CrawlURI; 043import org.archive.crawler.fetcher.FetchHTTP; 044import org.archive.crawler.frontier.AdaptiveRevisitAttributeConstants; 045import org.archive.crawler.settings.SimpleType; 046import org.archive.crawler.settings.Type; 047import org.archive.httpclient.HttpRecorderMethod; 048import org.archive.util.ArchiveUtils; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052import dk.netarkivet.common.utils.AllDocsCollector; 053 054/** 055 * An extension of Heritrix's {@link org.archive.crawler.fetcher.FetchHTTP} processor for downloading HTTP documents. 056 * This extension adds a check after the content header has been downloaded that compares the 'last-modified' and or 057 * 'last-etag' values from the header against information stored in an appropriate index. 058 * 059 * @author Kristinn Sigurðsson 060 * @author Søren Vejrup Carlsen 061 * @see is.hi.bok.deduplicator.DigestIndexer 062 * @see org.archive.crawler.fetcher.FetchHTTP 063 */ 064 065public class DeDupFetchHTTP extends FetchHTTP implements AdaptiveRevisitAttributeConstants { 066 067 private static final long serialVersionUID = ArchiveUtils.classnameBasedUID(DeDupFetchHTTP.class, 1); 068 069 private static final Logger log = LoggerFactory.getLogger(FetchHTTP.class.getName()); 070 071 protected IndexSearcher index; 072 protected IndexReader indexReader; 073 protected String mimefilter = DEFAULT_MIME_FILTER; 074 protected boolean blacklist = true; 075 076 SimpleDateFormat sdfLastModified; 077 SimpleDateFormat sdfIndexDate; 078 079 protected long processedURLs = 0; 080 protected long unchangedURLs = 0; 081 082 protected boolean useSparseRangeFilter = DEFAULT_USE_SPARSE_RANGE_FILTER; 083 084 // Settings. 085 public static final String ATTR_DECISION_SCHEME = "decision-scheme"; 086 public static final String SCHEME_TIMESTAMP = "Timestamp only"; 087 public static final String SCHEME_ETAG = "Etag only"; 088 public static final String SCHEME_TIMESTAMP_AND_ETAG = "Timestamp AND Etag"; 089 public static final String SCHEME_TIMESTAMP_OR_ETAG = "Timestamp OR Etag"; 090 public static final String[] AVAILABLE_DECISION_SCHEMES = {SCHEME_TIMESTAMP, SCHEME_ETAG, 091 SCHEME_TIMESTAMP_AND_ETAG, SCHEME_TIMESTAMP_OR_ETAG}; 092 public static final String DEFAULT_DECISION_SCHEME = SCHEME_TIMESTAMP; 093 094 public static final String ATTR_INDEX_LOCATION = "index-location"; 095 public static final String DEFAULT_INDEX_LOCATION = ""; 096 097 /** 098 * The filter on mime types. This is either a blacklist or whitelist depending on ATTR_FILTER_MODE. 099 */ 100 public final static String ATTR_MIME_FILTER = "mime-filter"; 101 public final static String DEFAULT_MIME_FILTER = "^text/.*"; 102 103 /** 104 * Is the mime filter a blacklist (do not apply processor to what matches) or whitelist (apply processor only to 105 * what matches). 106 */ 107 public final static String ATTR_FILTER_MODE = "filter-mode"; 108 public final static String[] AVAILABLE_FILTER_MODES = {"Blacklist", "Whitelist"}; 109 public final static String DEFAULT_FILTER_MODE = AVAILABLE_FILTER_MODES[0]; 110 111 /** Should we use sparse queries (uses less memory at a cost to performance? * */ 112 public final static String ATTR_USE_SPARSE_RANGE_FILTER = "use-sparse-range-filter"; 113 public final static Boolean DEFAULT_USE_SPARSE_RANGE_FILTER = new Boolean(false); 114 115 public DeDupFetchHTTP(String name) { 116 super(name); 117 setDescription("Fetch HTTP processor that aborts downloading of " 118 + "unchanged documents. This processor extends the standard " 119 + "FetchHTTP processor, adding a check after the header is " 120 + "downloaded where the header information for 'last-modified' " 121 + "and 'etag' is compared against values stored in a Lucene " 122 + "index built using the DigestIndexer.\n Note that the index " 123 + "must have been built indexed by URL and the Timestamp " 124 + "and/or Etag info must have been included in the index!"); 125 Type t; 126 t = new SimpleType(ATTR_DECISION_SCHEME, "The different schmes for deciding when to re-download a " 127 + "page given an old version of the same page (or rather " + "meta-data on it)\n " 128 + "Timestamp only: Download when a datestamp is missing " 129 + "in either the downloaded header or index or if the header " 130 + "datestamp is newer then the one in the index.\n " 131 + "Etag only: Download when the Etag is missing in either the" 132 + "header download or the index or the header Etag and the one " + "in the index differ.\n " 133 + "Timestamp AND Etag: When both datestamp and Etag are " 134 + "available in both the header download and the index, " 135 + "download if EITHER of them indicates change." 136 + "Timestamp OR Etag: When both datestamp and Etag are " 137 + "available in both the header download and the index, " 138 + "download only if BOTH of them indicate change.", DEFAULT_DECISION_SCHEME, AVAILABLE_DECISION_SCHEMES); 139 addElementToDefinition(t); 140 t = new SimpleType(ATTR_INDEX_LOCATION, "Location of index (full path). Can not be changed at run " + "time.", 141 DEFAULT_INDEX_LOCATION); 142 t.setOverrideable(false); 143 addElementToDefinition(t); 144 t = new SimpleType(ATTR_MIME_FILTER, "A regular expression that the mimetype of all documents " 145 + "will be compared against. Only those that pass will be " + "considered. Others are given a pass. " 146 + "\nIf the attribute filter-mode is " + "set to 'Blacklist' then all the documents whose mimetype " 147 + "matches will be ignored by this processor. If the filter-" 148 + "mode is set to 'Whitelist' only those documents whose " + "mimetype matches will be processed.", 149 DEFAULT_MIME_FILTER); 150 t.setOverrideable(false); 151 t.setExpertSetting(true); 152 addElementToDefinition(t); 153 t = new SimpleType(ATTR_FILTER_MODE, "Determines if the mime-filter acts as a blacklist (declares " 154 + "what should be ignored) or whitelist (declares what should " + "be processed).", 155 DEFAULT_FILTER_MODE, AVAILABLE_FILTER_MODES); 156 t.setOverrideable(false); 157 t.setExpertSetting(true); 158 addElementToDefinition(t); 159 160 t = new SimpleType(ATTR_USE_SPARSE_RANGE_FILTER, "If set to true, then Lucene queries use a custom 'sparse' " 161 + "range filter. This uses less memory at the cost of some " 162 + "lost performance. Suitable for very large indexes.", DEFAULT_USE_SPARSE_RANGE_FILTER); 163 t.setOverrideable(false); 164 t.setExpertSetting(true); 165 addElementToDefinition(t); 166 } 167 168 protected boolean checkMidfetchAbort(CrawlURI curi, HttpRecorderMethod method, HttpConnection conn) { 169 // We'll check for prerequisites here since there is no way to know 170 // if the super method returns false because of a prereq or because 171 // all filters accepeted. 172 if (curi.isPrerequisite()) { 173 return false; 174 } 175 176 // Run super to allow filters to also abort. Also this method has 177 // been pressed into service as a general 'stuff to do at this point' 178 boolean ret = super.checkMidfetchAbort(curi, method, conn); 179 180 // Ok, now check for duplicates. 181 if (isDuplicate(curi)) { 182 ret = true; 183 unchangedURLs++; 184 curi.putInt(A_CONTENT_STATE_KEY, CONTENT_UNCHANGED); 185 curi.addAnnotation("header-duplicate"); 186 187 } 188 189 return ret; 190 } 191 192 /** 193 * Compare the header infomation for 'last-modified' and/or 'etag' against data in the index. 194 * 195 * @param curi The Crawl URI being processed. 196 * @return True if header infomation indicates that the document has not changed since the crawl that the index is 197 * based on was performed. 198 */ 199 protected boolean isDuplicate(CrawlURI curi) { 200 boolean ret = false; 201 if (curi.getContentType() != null && curi.getContentType().matches(mimefilter) != blacklist) { 202 processedURLs++; 203 // Ok, passes mime-filter 204 HttpMethod method = (HttpMethod) curi.getObject(A_HTTP_TRANSACTION); 205 // Check the decision scheme. 206 String scheme = (String) getUncheckedAttribute(curi, ATTR_DECISION_SCHEME); 207 208 Document doc = lookup(curi); 209 210 if (doc != null) { 211 // Found a hit. Do the necessary evalution. 212 if (scheme.equals(SCHEME_TIMESTAMP)) { 213 ret = datestampIndicatesNonChange(method, doc); 214 } else if (scheme.equals(SCHEME_ETAG)) { 215 ret = etagIndicatesNonChange(method, doc); 216 } else { 217 218 if (scheme.equals(SCHEME_TIMESTAMP_AND_ETAG)) { 219 ret = datestampIndicatesNonChange(method, doc) && etagIndicatesNonChange(method, doc); 220 } else if (scheme.equals(SCHEME_TIMESTAMP_OR_ETAG)) { 221 ret = datestampIndicatesNonChange(method, doc) || etagIndicatesNonChange(method, doc); 222 } else { 223 log.error("Unknown decision sceme: {}",scheme); 224 } 225 } 226 } 227 } 228 return ret; 229 } 230 231 /** 232 * Checks if the 'last-modified' in the HTTP header and compares it against the timestamp in the supplied Lucene 233 * document. If both dates are found and the header's date is older then the datestamp indicates non-change. 234 * Otherwise a change must be assumed. 235 * 236 * @param method HTTPMethod that allows access to the relevant HTTP header 237 * @param doc The Lucene document to compare against 238 * @return True if a the header and document data indicates a non-change. False otherwise. 239 */ 240 protected boolean datestampIndicatesNonChange(HttpMethod method, Document doc) { 241 String headerDate = null; 242 if (method.getResponseHeader("last-modified") != null) { 243 headerDate = method.getResponseHeader("last-modified").getValue(); 244 } 245 String indexDate = doc.get(DigestIndexer.FIELD_TIMESTAMP); 246 247 if (headerDate != null && indexDate != null) { 248 try { 249 // If both dates exist and last-modified is before the index 250 // date then we assume no change has occured. 251 return (sdfLastModified.parse(headerDate)).before(sdfIndexDate.parse(indexDate)); 252 } catch (Exception e) { 253 // Any exceptions parsing the date should be interpreted as 254 // missing date information. 255 // ParseException and NumberFormatException are the most 256 // likely exceptions to occur. 257 return false; 258 } 259 } 260 return false; 261 } 262 263 /** 264 * Checks if the 'etag' in the HTTP header and compares it against the etag in the supplied Lucene document. If both 265 * dates are found and match then the datestamp indicate non-change. Otherwise a change must be assumed. 266 * 267 * @param method HTTPMethod that allows access to the relevant HTTP header 268 * @param doc The Lucene document to compare against 269 * @return True if a the header and document data indicates a non-change. False otherwise. 270 */ 271 protected boolean etagIndicatesNonChange(HttpMethod method, Document doc) { 272 String headerEtag = null; 273 if (method.getResponseHeader("last-etag") != null) { 274 headerEtag = method.getResponseHeader("last-etag").getValue(); 275 } 276 String indexEtag = doc.get(DigestIndexer.FIELD_ETAG); 277 278 if (headerEtag != null && indexEtag != null) { 279 // If both etags exist and are identical then we assume no 280 // change has occured. 281 return headerEtag.equals(indexEtag); 282 } 283 return false; 284 } 285 286 /** 287 * Searches the index for the URL of the given CrawlURI. If multiple hits are found the most recent one is returned 288 * if the index included the timestamp, otherwise a random one is returned. If no hit is found null is returned. 289 * 290 * @param curi The CrawlURI to search for 291 * @return the index Document matching the URI or null if none was found 292 */ 293 protected Document lookup(CrawlURI curi) { 294 try { 295 Query query = null; 296 297 /** The least memory demanding query. */ 298 BytesRef curiStringRef = new BytesRef(curi.toString().getBytes()); 299 query = new ConstantScoreQuery(new TermRangeFilter(DigestIndexer.FIELD_URL, curiStringRef, curiStringRef, 300 true, true)); 301 302 /** The preferred solution, but it seems also more memory demanding */ 303 // query = new ConstantScoreQuery(new FieldCacheTermsFilter(fieldName, 304 // value)); 305 306 AllDocsCollector collectAllCollector = new AllDocsCollector(); 307 index.search(query, collectAllCollector); 308 309 List<ScoreDoc> hits = collectAllCollector.getHits(); 310 Document doc = null; 311 if (hits != null && hits.size() > 0) { 312 // If there are multiple hits, use the one with the most 313 // recent date. 314 Document docToEval = null; 315 for (ScoreDoc hit : hits) { 316 int docId = hit.doc; 317 doc = index.doc(docId); 318 // The format of the timestamp ("yyyyMMddHHmmssSSS") allows 319 // us to do a greater then (later) or lesser than (earlier) 320 // comparison of the strings. 321 String timestamp = doc.get(DigestIndexer.FIELD_TIMESTAMP); 322 if (docToEval == null || timestamp == null 323 || docToEval.get(DigestIndexer.FIELD_TIMESTAMP).compareTo(timestamp) > 0) { 324 // Found a more recent hit or timestamp is null 325 // NOTE: Either all hits should have a timestamp or 326 // none. This implementation will cause the last 327 // URI in the hit list to be returned if there is no 328 // timestamp. 329 docToEval = doc; 330 } 331 } 332 return docToEval; 333 } 334 } catch (IOException e) { 335 log.error("Error accessing index.", e); 336 } 337 return null; 338 } 339 340 @Override 341 public void finalTasks() { 342 super.finalTasks(); 343 } 344 345 @Override 346 public void initialTasks() { 347 super.initialTasks(); 348 // Index location 349 try { 350 String indexLocation = (String) getAttribute(ATTR_INDEX_LOCATION); 351 FSDirectory indexDir = FSDirectory.open(new File(indexLocation)); 352 // https://issues.apache.org/jira/browse/LUCENE-1566 353 // Reduce chunksize to avoid OOM to half the size of the default (=100 MB) 354 int chunksize = indexDir.getReadChunkSize(); 355 indexDir.setReadChunkSize(chunksize / 2); 356 IndexReader reader = DirectoryReader.open(indexDir); 357 index = new IndexSearcher(reader); 358 } catch (Exception e) { 359 log.error("Unable to find/open index.", e); 360 } 361 362 // Mime filter 363 try { 364 mimefilter = (String) getAttribute(ATTR_MIME_FILTER); 365 } catch (Exception e) { 366 log.error("Unable to get attribute " + ATTR_MIME_FILTER, e); 367 } 368 369 // Filter mode (blacklist (default) or whitelist) 370 try { 371 blacklist = ((String) getAttribute(ATTR_FILTER_MODE)).equals(DEFAULT_FILTER_MODE); 372 } catch (Exception e) { 373 log.error("Unable to get attribute " + ATTR_FILTER_MODE, e); 374 } 375 376 // Date format of last-modified is EEE, dd MMM yyyy HH:mm:ss z 377 sdfLastModified = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z"); 378 // Date format of indexDate is yyyyMMddHHmmssSSS 379 sdfIndexDate = new SimpleDateFormat("yyyyMMddHHmmssSSS"); 380 381 // Range Filter type 382 try { 383 useSparseRangeFilter = ((Boolean) getAttribute(ATTR_USE_SPARSE_RANGE_FILTER)).booleanValue(); 384 } catch (Exception e) { 385 log.error("Unable to get attribute " + ATTR_USE_SPARSE_RANGE_FILTER, e); 386 useSparseRangeFilter = DEFAULT_USE_SPARSE_RANGE_FILTER; 387 } 388 } 389 390 @Override 391 public String report() { 392 StringBuffer ret = new StringBuffer(); 393 ret.append("Processor: is.hi.bok.deduplicator.DeDupFetchHTTP\n"); 394 ret.append(" URLs compared against index: " + processedURLs + "\n"); 395 ret.append(" URLs judged unchanged: " + unchangedURLs + "\n"); 396 ret.append(" processor extends (parent report)\n"); 397 ret.append(super.report()); 398 return ret.toString(); 399 } 400 401}