001/* 002 * #%L 003 * Netarchivesuite - harvester 004 * %% 005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023 024package dk.netarkivet.harvester.indexserver; 025 026import java.io.BufferedReader; 027import java.io.File; 028import java.io.FileReader; 029import java.io.IOException; 030import java.util.HashSet; 031import java.util.Iterator; 032import java.util.Map; 033import java.util.Set; 034import java.util.UUID; 035import java.util.concurrent.Callable; 036import java.util.concurrent.ExecutionException; 037import java.util.concurrent.Future; 038import java.util.concurrent.LinkedBlockingQueue; 039import java.util.concurrent.ThreadPoolExecutor; 040import java.util.concurrent.TimeUnit; 041 042import org.apache.lucene.index.IndexWriter; 043import org.apache.lucene.store.Directory; 044import org.apache.lucene.store.SimpleFSDirectory; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048import dk.netarkivet.common.distribute.indexserver.JobIndexCache; 049import dk.netarkivet.common.exceptions.IOFailure; 050import dk.netarkivet.common.utils.FileUtils; 051import dk.netarkivet.common.utils.Settings; 052import dk.netarkivet.common.utils.TimeUtils; 053import dk.netarkivet.common.utils.ZipUtils; 054import dk.netarkivet.harvester.HarvesterSettings; 055import is.hi.bok.deduplicator.CrawlDataIterator; 056import is.hi.bok.deduplicator.DigestIndexer; 057 058/** 059 * A cache that serves Lucene indices of crawl logs for given job IDs. Uses the DigestIndexer in the deduplicator 060 * software: http://deduplicator.sourceforge.net/apidocs/is/hi/bok/deduplicator/DigestIndexer.html Upon combination of 061 * underlying files, each file in the Lucene index is gzipped and the compressed versions are stored in the directory 062 * given by getCacheFile(). The subclass has to determine in its constructor call which mime types are included. 063 */ 064public abstract class CrawlLogIndexCache extends CombiningMultiFileBasedCache<Long> implements JobIndexCache { 065 066 /** The log. */ 067 private static final Logger log = LoggerFactory.getLogger(CrawlLogIndexCache.class); 068 069 /** Needed to find origin information, which is file+offset from CDX index. */ 070 private final CDXDataCache cdxcache = new CDXDataCache(); 071 072 /** the useBlacklist set to true results in docs matching the mimefilter being ignored. */ 073 private boolean useBlacklist; 074 075 /** An regular expression for the mimetypes to include or exclude from the index. See useBlackList. */ 076 private String mimeFilter; 077 078 /** The time to sleep between each check of completeness. */ 079 private final long sleepintervalBetweenCompletenessChecks = Settings 080 .getLong(HarvesterSettings.INDEXSERVER_INDEXING_CHECKINTERVAL); 081 082 /** Number to separate logs the different combine tasks. */ 083 private int indexingJobCount = 0; 084 085 /** 086 * Constructor for the CrawlLogIndexCache class. 087 * 088 * @param name The name of the CrawlLogIndexCache 089 * @param blacklist Shall the mimefilter be considered a blacklist or a whitelist? 090 * @param mimeFilter A regular expression for the mimetypes to exclude/include 091 */ 092 public CrawlLogIndexCache(String name, boolean blacklist, String mimeFilter) { 093 super(name, new CrawlLogDataCache()); 094 useBlacklist = blacklist; 095 this.mimeFilter = mimeFilter; 096 } 097 098 /** 099 * Prepare data for combining. This class overrides prepareCombine to make sure that CDX data is available. 100 * 101 * @param ids Set of IDs that will be combined. 102 * @return Map of ID->File of data to combine for the IDs where we could find data. 103 */ 104 protected Map<Long, File> prepareCombine(Set<Long> ids) { 105 log.info("Starting to generate {} for the {} jobs: {}", getCacheDir().getName(), ids.size(), ids); 106 Map<Long, File> returnMap = super.prepareCombine(ids); 107 Set<Long> missing = new HashSet<Long>(); 108 for (Long id : returnMap.keySet()) { 109 Long cached = cdxcache.cache(id); 110 if (cached == null) { 111 missing.add(id); 112 } 113 } 114 if (!missing.isEmpty()) { 115 log.warn("Data not found for {} jobs: {}", missing.size(), missing); 116 } 117 for (Long id : missing) { 118 returnMap.remove(id); 119 } 120 return returnMap; 121 } 122 123 /** 124 * Combine a number of crawl.log files into one Lucene index. This index is placed as gzip files under the directory 125 * returned by getCacheFile(). 126 * 127 * @param rawfiles The map from job ID into crawl.log contents. No null values are allowed in this map. 128 */ 129 protected void combine(Map<Long, File> rawfiles) { 130 ++indexingJobCount; 131 long datasetSize = rawfiles.values().size(); 132 log.info("Starting combine task #{}. This combines a dataset with {} crawl logs (thread = {})", 133 indexingJobCount, datasetSize, Thread.currentThread().getName()); 134 135 File resultDir = getCacheFile(rawfiles.keySet()); 136 Set<File> tmpfiles = new HashSet<File>(); 137 String indexLocation = resultDir.getAbsolutePath() + ".luceneDir"; 138 ThreadPoolExecutor executor = null; 139 try { 140 DigestIndexer indexer = createStandardIndexer(indexLocation); 141 final boolean verboseIndexing = false; 142 DigestOptions indexingOptions = new DigestOptions(this.useBlacklist, verboseIndexing, this.mimeFilter); 143 long count = 0; 144 Set<IndexingState> outstandingJobs = new HashSet<IndexingState>(); 145 final int maxThreads = Settings.getInt(HarvesterSettings.INDEXSERVER_INDEXING_MAXTHREADS); 146 executor = new ThreadPoolExecutor(maxThreads, maxThreads, 0L, TimeUnit.MILLISECONDS, 147 new LinkedBlockingQueue<Runnable>()); 148 149 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); 150 151 for (Map.Entry<Long, File> entry : rawfiles.entrySet()) { 152 Long jobId = entry.getKey(); 153 File crawlLog = entry.getValue(); 154 // Generate UUID to ensure a unique filedir for the index. 155 File tmpFile = new File(FileUtils.getTempDir(), UUID.randomUUID().toString()); 156 tmpfiles.add(tmpFile); 157 String localindexLocation = tmpFile.getAbsolutePath(); 158 Long cached = cdxcache.cache(jobId); 159 if (cached == null) { 160 log.warn("Skipping the ingest of logs for job {}. Unable to retrieve cdx-file for job.", 161 entry.getKey()); 162 continue; 163 } 164 File cachedCDXFile = cdxcache.getCacheFile(cached); 165 166 // Dispatch this indexing task to a separate thread that 167 // handles the sorting of the logfiles and the generation 168 // of a lucene index for this crawllog and cdxfile. 169 ++count; 170 String taskID = count + " out of " + datasetSize; 171 log.debug("Making subthread for indexing job " + jobId + " - task " + taskID); 172 Callable<Boolean> task = new DigestIndexerWorker(localindexLocation, jobId, crawlLog, cachedCDXFile, 173 indexingOptions, taskID); 174 Future<Boolean> result = executor.submit(task); 175 outstandingJobs.add(new IndexingState(jobId, localindexLocation, result)); 176 } 177 178 // wait for all the outstanding subtasks to complete. 179 Set<Directory> subindices = new HashSet<Directory>(); 180 181 // Deadline for the combine-task 182 long combineTimeout = Settings.getLong(HarvesterSettings.INDEXSERVER_INDEXING_TIMEOUT); 183 long timeOutTime = System.currentTimeMillis() + combineTimeout; 184 185 // The indexwriter for the totalindex. 186 IndexWriter totalIndex = indexer.getIndex(); 187 int subindicesInTotalIndex = 0; 188 // Max number of segments in totalindex. 189 int maxSegments = Settings.getInt(HarvesterSettings.INDEXSERVER_INDEXING_MAX_SEGMENTS); 190 191 final int ACCUMULATED_SUBINDICES_BEFORE_MERGING = 200; 192 193 while (outstandingJobs.size() > 0) { 194 log.info("Outstanding jobs in combine task #{} is now {}", indexingJobCount, outstandingJobs.size()); 195 Iterator<IndexingState> iterator = outstandingJobs.iterator(); 196 if (timeOutTime < System.currentTimeMillis()) { 197 log.warn("Max indexing time exceeded for one index ({}). Indexing stops here, " 198 + "although missing subindices for {} jobs", 199 TimeUtils.readableTimeInterval(combineTimeout), outstandingJobs.size()); 200 break; 201 } 202 while (iterator.hasNext() && subindices.size() < ACCUMULATED_SUBINDICES_BEFORE_MERGING) { 203 Future<Boolean> nextResult; 204 IndexingState next = iterator.next(); 205 if (next.getResultObject().isDone()) { 206 nextResult = next.getResultObject(); 207 try { 208 // check, if the indexing failed 209 if (nextResult.get()) { 210 subindices.add(new SimpleFSDirectory(new File(next.getIndex()))); 211 } else { 212 log.warn("Indexing of job {} failed.", next.getJobIdentifier()); 213 } 214 215 } catch (InterruptedException e) { 216 log.warn("Unable to get Result back from indexing thread", e); 217 } catch (ExecutionException e) { 218 log.warn("Unable to get Result back from indexing thread", e); 219 } 220 // remove the done object from the set 221 iterator.remove(); 222 } 223 } 224 225 if (subindices.size() >= ACCUMULATED_SUBINDICES_BEFORE_MERGING) { 226 227 log.info( 228 "Adding {} subindices to main index. Forcing index to contain max {} files (related to combine task #{})", 229 subindices.size(), maxSegments, indexingJobCount); 230 totalIndex.addIndexes(subindices.toArray(new Directory[0])); 231 totalIndex.forceMerge(maxSegments); 232 totalIndex.commit(); 233 for (Directory luceneDir : subindices) { 234 luceneDir.close(); 235 } 236 subindicesInTotalIndex += subindices.size(); 237 log.info( 238 "Completed adding {} subindices to main index, now containing {} subindices(related to combine task #{})", 239 subindices.size(), subindicesInTotalIndex, indexingJobCount); 240 subindices.clear(); 241 } else { 242 sleepAwhile(); 243 } 244 } 245 246 log.info("Adding the final {} subindices to main index. " 247 + "Forcing index to contain max {} files (related to combine task #{})", subindices.size(), 248 maxSegments, indexingJobCount); 249 250 totalIndex.addIndexes(subindices.toArray(new Directory[0])); 251 totalIndex.forceMerge(maxSegments); 252 totalIndex.commit(); 253 for (Directory luceneDir : subindices) { 254 luceneDir.close(); 255 } 256 subindices.clear(); 257 258 log.info("Adding operation completed (combine task #{})!", indexingJobCount); 259 long docsInIndex = totalIndex.numDocs(); 260 261 indexer.close(); 262 log.info("Closed index (related to combine task #{}", indexingJobCount); 263 264 // Now the index is made, gzip it up. 265 File totalIndexDir = new File(indexLocation); 266 log.info("Gzip-compressing the individual {} index files of combine task # {}", 267 totalIndexDir.list().length, indexingJobCount); 268 ZipUtils.gzipFiles(totalIndexDir, resultDir); 269 log.info( 270 "Completed combine task #{} that combined a dataset with {} crawl logs (entries in combined index: {}) - compressed index has size {}", 271 indexingJobCount, datasetSize, docsInIndex, FileUtils.getHumanReadableFileSize(resultDir)); 272 } catch (IOException e) { 273 throw new IOFailure("Error setting up craw.log index framework for " + resultDir.getAbsolutePath(), e); 274 } finally { 275 // close down Threadpool-executor 276 closeDownThreadpoolQuietly(executor); 277 FileUtils.removeRecursively(new File(indexLocation)); 278 for (File temporaryFile : tmpfiles) { 279 FileUtils.removeRecursively(temporaryFile); 280 } 281 } 282 } 283 284 /** 285 * Try to release all resources connected to the given ThreadPoolExecutor. 286 * 287 * @param executor a ThreadPoolExecutor 288 */ 289 private void closeDownThreadpoolQuietly(ThreadPoolExecutor executor) { 290 if (executor == null) { 291 return; 292 } 293 if (!executor.isShutdown()) { 294 executor.shutdownNow(); 295 } 296 } 297 298 /** 299 * Helper class to sleep a little between completeness checks. 300 */ 301 private void sleepAwhile() { 302 try { 303 Thread.sleep(sleepintervalBetweenCompletenessChecks); 304 } catch (InterruptedException e) { 305 log.trace("Was awoken early from sleep: ", e); 306 } 307 } 308 309 /** 310 * Ingest a single crawl.log file using the corresponding CDX file to find offsets. 311 * 312 * @param id ID of a job to ingest. 313 * @param crawllogfile The file containing the crawl.log data for the job 314 * @param cdxfile The file containing the cdx data for the job 315 * @param options The digesting options used. 316 * @param indexer The indexer to add to. 317 */ 318 protected static void indexFile(Long id, File crawllogfile, File cdxfile, DigestIndexer indexer, 319 DigestOptions options) { 320 log.debug("Ingesting the crawl.log file '{}' related to job {}", crawllogfile.getAbsolutePath(), id); 321 boolean blacklist = options.getUseBlacklist(); 322 final String mimefilter = options.getMimeFilter(); 323 final boolean verbose = options.getVerboseMode(); 324 325 CrawlDataIterator crawlLogIterator = null; 326 File sortedCdxFile = null; 327 File tmpCrawlLog = null; 328 BufferedReader cdxBuffer = null; 329 try { 330 sortedCdxFile = getSortedCDX(cdxfile); 331 cdxBuffer = new BufferedReader(new FileReader(sortedCdxFile)); 332 tmpCrawlLog = getSortedCrawlLog(crawllogfile); 333 crawlLogIterator = new CDXOriginCrawlLogIterator(tmpCrawlLog, cdxBuffer); 334 indexer.writeToIndex(crawlLogIterator, mimefilter, blacklist, "ERROR", verbose); 335 } catch (IOException e) { 336 throw new IOFailure("Fatal error indexing " + id, e); 337 } finally { 338 try { 339 if (crawlLogIterator != null) { 340 crawlLogIterator.close(); 341 } 342 if (tmpCrawlLog != null) { 343 FileUtils.remove(tmpCrawlLog); 344 } 345 if (cdxBuffer != null) { 346 cdxBuffer.close(); 347 } 348 if (sortedCdxFile != null) { 349 FileUtils.remove(sortedCdxFile); 350 } 351 } catch (IOException e) { 352 log.warn("Error cleaning up after crawl log index cache generation", e); 353 } 354 } 355 } 356 357 /** 358 * Get a sorted, temporary CDX file corresponding to the given CDXfile. 359 * 360 * @param cdxFile A cdxfile 361 * @return A temporary file with CDX info for that just sorted according to the standard CDX sorting rules. This 362 * file will be removed at the exit of the JVM, but should be attempted removed when it is no longer used. 363 */ 364 protected static File getSortedCDX(File cdxFile) { 365 try { 366 final File tmpFile = File.createTempFile("sorted", "cdx", FileUtils.getTempDir()); 367 // This throws IOFailure, if the sorting operation fails 368 FileUtils.sortCDX(cdxFile, tmpFile); 369 tmpFile.deleteOnExit(); 370 return tmpFile; 371 } catch (IOException e) { 372 throw new IOFailure("Error while making tmp file for " + cdxFile, e); 373 } 374 } 375 376 /** 377 * Get a sorted, temporary crawl.log file from an unsorted one. 378 * 379 * @param file The file containing an unsorted crawl.log file. 380 * @return A temporary file containing the entries sorted according to URL. The file will be removed upon exit of 381 * the JVM, but should be attempted removed when it is no longer used. 382 */ 383 protected static File getSortedCrawlLog(File file) { 384 try { 385 File tmpCrawlLog = File.createTempFile("sorted", "crawllog", FileUtils.getTempDir()); 386 // This throws IOFailure, if the sorting operation fails 387 FileUtils.sortCrawlLog(file, tmpCrawlLog); 388 tmpCrawlLog.deleteOnExit(); 389 return tmpCrawlLog; 390 } catch (IOException e) { 391 throw new IOFailure("Error creating sorted crawl log file for '" + file + "'", e); 392 } 393 } 394 395 /** 396 * Create standard deduplication indexer. 397 * 398 * @param indexLocation The full path to the indexing directory 399 * @return the created deduplication indexer. 400 * @throws IOException If unable to open the index. 401 */ 402 protected static DigestIndexer createStandardIndexer(String indexLocation) throws IOException { 403 // Setup Lucene for indexing our crawllogs 404 // MODE_BOTH: Both URL's and Hash are indexed: Alternatives: 405 // DigestIndexer.MODE_HASH or DigestIndexer.MODE_URL 406 String indexingMode = DigestIndexer.MODE_BOTH; 407 // used to be 'equivalent' setting 408 boolean includeNormalizedURL = false; 409 // used to be 'timestamp' setting 410 boolean includeTimestamp = true; 411 // used to be 'etag' setting 412 boolean includeEtag = true; 413 boolean addToExistingIndex = false; 414 DigestIndexer indexer = new DigestIndexer(indexLocation, indexingMode, includeNormalizedURL, includeTimestamp, 415 includeEtag, addToExistingIndex); 416 return indexer; 417 } 418 419}