/*
 * #%L
 * Netarchivesuite - harvester
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */

package dk.netarkivet.harvester.indexserver;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.SimpleFSDirectory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.distribute.indexserver.JobIndexCache;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.utils.FileUtils;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.common.utils.TimeUtils;
import dk.netarkivet.common.utils.ZipUtils;
import dk.netarkivet.harvester.HarvesterSettings;
import is.hi.bok.deduplicator.CrawlDataIterator;
import is.hi.bok.deduplicator.DigestIndexer;

/**
 * A cache that serves Lucene indices of crawl logs for given job IDs. Uses the DigestIndexer in the deduplicator
 * software: http://deduplicator.sourceforge.net/apidocs/is/hi/bok/deduplicator/DigestIndexer.html Upon combination of
 * underlying files, each file in the Lucene index is gzipped and the compressed versions are stored in the directory
 * given by getCacheFile(). The subclass has to determine in its constructor call which mime types are included.
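 *
 * <p>Illustrative sketch (hypothetical subclass name and mime filter) of how a subclass picks the mime types in its
 * constructor:
 *
 * <pre>{@code
 * public class TextCrawlLogIndexCache extends CrawlLogIndexCache {
 *     public TextCrawlLogIndexCache() {
 *         // blacklist = false, so the filter acts as a whitelist: only text/* documents are indexed.
 *         super("textcrawllogindex", false, "text/.*");
 *     }
 * }
 * }</pre>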
 */
public abstract class CrawlLogIndexCache extends CombiningMultiFileBasedCache<Long> implements JobIndexCache {

    /** The log. */
    private static final Logger log = LoggerFactory.getLogger(CrawlLogIndexCache.class);

    /** Needed to find origin information, which is file+offset from CDX index. */
    private final CDXDataCache cdxcache = new CDXDataCache();

    /** If useBlacklist is true, documents matching the mimeFilter are ignored. */
    private boolean useBlacklist;

    /** A regular expression for the mimetypes to include in or exclude from the index. See useBlacklist. */
    private String mimeFilter;

    /** The time to sleep between each check of completeness. */
    private final long sleepintervalBetweenCompletenessChecks = Settings
            .getLong(HarvesterSettings.INDEXSERVER_INDEXING_CHECKINTERVAL);

    /** Counter used to distinguish the logs of the different combine tasks. */
    private int indexingJobCount = 0;

    /**
     * Constructor for the CrawlLogIndexCache class.
     *
     * @param name The name of the CrawlLogIndexCache
     * @param blacklist Shall the mimefilter be considered a blacklist or a whitelist?
     * @param mimeFilter A regular expression for the mimetypes to exclude/include
     */
    public CrawlLogIndexCache(String name, boolean blacklist, String mimeFilter) {
        super(name, new CrawlLogDataCache());
        useBlacklist = blacklist;
        this.mimeFilter = mimeFilter;
    }

    /**
     * Prepare data for combining. This class overrides prepareCombine to make sure that CDX data is available.
     *
     * @param ids Set of IDs that will be combined.
     * @return Map of ID->File of data to combine for the IDs where we could find data.
     */
    protected Map<Long, File> prepareCombine(Set<Long> ids) {
        log.info("Starting to generate {} for the {} jobs: {}", getCacheDir().getName(), ids.size(), ids);
        Map<Long, File> returnMap = super.prepareCombine(ids);
        Set<Long> missing = new HashSet<Long>();
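        // Jobs for which no CDX data can be cached are dropped from the result, since their crawl logs
        // could not be enriched with origin information during indexing.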
        for (Long id : returnMap.keySet()) {
            Long cached = cdxcache.cache(id);
            if (cached == null) {
                missing.add(id);
            }
        }
        if (!missing.isEmpty()) {
            log.warn("Data not found for {} jobs: {}", missing.size(), missing);
        }
        for (Long id : missing) {
            returnMap.remove(id);
        }
        return returnMap;
    }

    /**
     * Combine a number of crawl.log files into one Lucene index. The resulting index is stored as gzipped files under
     * the directory returned by getCacheFile().
     *
     * @param rawfiles The map from job ID into crawl.log contents. No null values are allowed in this map.
     */
    protected void combine(Map<Long, File> rawfiles) {
        ++indexingJobCount;
        long datasetSize = rawfiles.values().size();
        log.info("Starting combine task #{}. This combines a dataset with {} crawl logs (thread = {})",
                indexingJobCount, datasetSize, Thread.currentThread().getName());

        File resultDir = getCacheFile(rawfiles.keySet());
        Set<File> tmpfiles = new HashSet<File>();
        String indexLocation = resultDir.getAbsolutePath() + ".luceneDir";
        ThreadPoolExecutor executor = null;
        try {
            DigestIndexer indexer = createStandardIndexer(indexLocation);
            final boolean verboseIndexing = false;
            DigestOptions indexingOptions = new DigestOptions(this.useBlacklist, verboseIndexing, this.mimeFilter);
            long count = 0;
            Set<IndexingState> outstandingJobs = new HashSet<IndexingState>();
            final int maxThreads = Settings.getInt(HarvesterSettings.INDEXSERVER_INDEXING_MAXTHREADS);
            executor = new ThreadPoolExecutor(maxThreads, maxThreads, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>());

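            // CallerRunsPolicy: a task the pool cannot accept is run in the submitting thread instead of
            // being rejected with a RejectedExecutionException.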
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

            for (Map.Entry<Long, File> entry : rawfiles.entrySet()) {
                Long jobId = entry.getKey();
                File crawlLog = entry.getValue();
                // Generate UUID to ensure a unique filedir for the index.
                File tmpFile = new File(FileUtils.getTempDir(), UUID.randomUUID().toString());
                tmpfiles.add(tmpFile);
                String localindexLocation = tmpFile.getAbsolutePath();
                Long cached = cdxcache.cache(jobId);
                if (cached == null) {
                    log.warn("Skipping the ingest of logs for job {}. Unable to retrieve cdx-file for job.",
                            entry.getKey());
                    continue;
                }
                File cachedCDXFile = cdxcache.getCacheFile(cached);

                // Dispatch this indexing task to a separate thread that
                // handles the sorting of the logfiles and the generation
                // of a lucene index for this crawllog and cdxfile.
                ++count;
                String taskID = count + " out of " + datasetSize;
                log.debug("Making subthread for indexing job {} - task {}", jobId, taskID);
                Callable<Boolean> task = new DigestIndexerWorker(localindexLocation, jobId, crawlLog, cachedCDXFile,
                        indexingOptions, taskID);
                Future<Boolean> result = executor.submit(task);
                outstandingJobs.add(new IndexingState(jobId, localindexLocation, result));
            }

            // wait for all the outstanding subtasks to complete.
            Set<Directory> subindices = new HashSet<Directory>();

            // Deadline for the combine-task
            long combineTimeout = Settings.getLong(HarvesterSettings.INDEXSERVER_INDEXING_TIMEOUT);
            long timeOutTime = System.currentTimeMillis() + combineTimeout;

            // The indexwriter for the totalindex.
            IndexWriter totalIndex = indexer.getIndex();
            int subindicesInTotalIndex = 0;
            // Max number of segments in totalindex.
            int maxSegments = Settings.getInt(HarvesterSettings.INDEXSERVER_INDEXING_MAX_SEGMENTS);

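            // Merge completed subindices into the total index in batches of this size, so the number of
            // Directories held open at any time (and the size of each addIndexes call) stays bounded.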
            final int ACCUMULATED_SUBINDICES_BEFORE_MERGING = 200;

            while (outstandingJobs.size() > 0) {
                log.info("Outstanding jobs in combine task #{} is now {}", indexingJobCount, outstandingJobs.size());
                Iterator<IndexingState> iterator = outstandingJobs.iterator();
                if (timeOutTime < System.currentTimeMillis()) {
                    log.warn("Max indexing time exceeded for one index ({}). Indexing stops here, "
                            + "although missing subindices for {} jobs",
                            TimeUtils.readableTimeInterval(combineTimeout), outstandingJobs.size());
                    break;
                }
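                // Poll the outstanding tasks without blocking: each finished subindex is opened as a Lucene
                // Directory and queued for merging into the total index.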
                while (iterator.hasNext() && subindices.size() < ACCUMULATED_SUBINDICES_BEFORE_MERGING) {
                    Future<Boolean> nextResult;
                    IndexingState next = iterator.next();
                    if (next.getResultObject().isDone()) {
                        nextResult = next.getResultObject();
                        try {
                            // check, if the indexing failed
                            if (nextResult.get()) {
                                subindices.add(new SimpleFSDirectory(new File(next.getIndex())));
                            } else {
                                log.warn("Indexing of job {} failed.", next.getJobIdentifier());
                            }

                        } catch (InterruptedException e) {
                            log.warn("Unable to get Result back from indexing thread", e);
                        } catch (ExecutionException e) {
                            log.warn("Unable to get Result back from indexing thread", e);
                        }
                        // remove the done object from the set
                        iterator.remove();
                    }
                }

                if (subindices.size() >= ACCUMULATED_SUBINDICES_BEFORE_MERGING) {

                    log.info(
                            "Adding {} subindices to main index. Forcing index to contain max {} files (related to combine task #{})",
                            subindices.size(), maxSegments, indexingJobCount);
                    totalIndex.addIndexes(subindices.toArray(new Directory[0]));
                    totalIndex.forceMerge(maxSegments);
                    totalIndex.commit();
                    for (Directory luceneDir : subindices) {
                        luceneDir.close();
                    }
                    subindicesInTotalIndex += subindices.size();
                    log.info(
                            "Completed adding {} subindices to main index, now containing {} subindices (related to combine task #{})",
                            subindices.size(), subindicesInTotalIndex, indexingJobCount);
                    subindices.clear();
                } else {
                    sleepAwhile();
                }
            }

            log.info("Adding the final {} subindices to main index. "
                    + "Forcing index to contain max {} files (related to combine task #{})", subindices.size(),
                    maxSegments, indexingJobCount);

            totalIndex.addIndexes(subindices.toArray(new Directory[0]));
            totalIndex.forceMerge(maxSegments);
            totalIndex.commit();
            for (Directory luceneDir : subindices) {
                luceneDir.close();
            }
            subindices.clear();

            log.info("Adding operation completed (combine task #{})!", indexingJobCount);
            long docsInIndex = totalIndex.numDocs();

            indexer.close();
            log.info("Closed index (related to combine task #{})", indexingJobCount);

            // Now the index is made, gzip it up.
            File totalIndexDir = new File(indexLocation);
            log.info("Gzip-compressing the individual {} index files of combine task #{}",
                    totalIndexDir.list().length, indexingJobCount);
            ZipUtils.gzipFiles(totalIndexDir, resultDir);
            log.info(
                    "Completed combine task #{} that combined a dataset with {} crawl logs (entries in combined index: {}) - compressed index has size {}",
                    indexingJobCount, datasetSize, docsInIndex, FileUtils.getHumanReadableFileSize(resultDir));
        } catch (IOException e) {
            throw new IOFailure("Error setting up crawl.log index framework for " + resultDir.getAbsolutePath(), e);
        } finally {
            // close down Threadpool-executor
            closeDownThreadpoolQuietly(executor);
            FileUtils.removeRecursively(new File(indexLocation));
            for (File temporaryFile : tmpfiles) {
                FileUtils.removeRecursively(temporaryFile);
            }
        }
    }

    /**
     * Try to release all resources connected to the given ThreadPoolExecutor.
     *
     * @param executor a ThreadPoolExecutor
     */
    private void closeDownThreadpoolQuietly(ThreadPoolExecutor executor) {
        if (executor == null) {
            return;
        }
        if (!executor.isShutdown()) {
            executor.shutdownNow();
        }
    }

    /**
     * Helper method to sleep a little between completeness checks.
     */
    private void sleepAwhile() {
        try {
            Thread.sleep(sleepintervalBetweenCompletenessChecks);
        } catch (InterruptedException e) {
            log.trace("Was awoken early from sleep: ", e);
        }
    }

    /**
     * Ingest a single crawl.log file using the corresponding CDX file to find offsets.
     *
     * @param id ID of a job to ingest.
     * @param crawllogfile The file containing the crawl.log data for the job
     * @param cdxfile The file containing the cdx data for the job
     * @param indexer The indexer to add to.
     * @param options The digesting options used.
     */
    protected static void indexFile(Long id, File crawllogfile, File cdxfile, DigestIndexer indexer,
            DigestOptions options) {
        log.debug("Ingesting the crawl.log file '{}' related to job {}", crawllogfile.getAbsolutePath(), id);
        boolean blacklist = options.getUseBlacklist();
        final String mimefilter = options.getMimeFilter();
        final boolean verbose = options.getVerboseMode();

        CrawlDataIterator crawlLogIterator = null;
        File sortedCdxFile = null;
        File tmpCrawlLog = null;
        BufferedReader cdxBuffer = null;
        try {
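            // Sort the CDX data and the crawl log so that CDXOriginCrawlLogIterator can step through both files
            // in parallel and attach origin (file+offset) information to each crawl-log entry.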
            sortedCdxFile = getSortedCDX(cdxfile);
            cdxBuffer = new BufferedReader(new FileReader(sortedCdxFile));
            tmpCrawlLog = getSortedCrawlLog(crawllogfile);
            crawlLogIterator = new CDXOriginCrawlLogIterator(tmpCrawlLog, cdxBuffer);
            indexer.writeToIndex(crawlLogIterator, mimefilter, blacklist, "ERROR", verbose);
        } catch (IOException e) {
            throw new IOFailure("Fatal error indexing " + id, e);
        } finally {
            try {
                if (crawlLogIterator != null) {
                    crawlLogIterator.close();
                }
                if (tmpCrawlLog != null) {
                    FileUtils.remove(tmpCrawlLog);
                }
                if (cdxBuffer != null) {
                    cdxBuffer.close();
                }
                if (sortedCdxFile != null) {
                    FileUtils.remove(sortedCdxFile);
                }
            } catch (IOException e) {
                log.warn("Error cleaning up after crawl log index cache generation", e);
            }
        }
    }

    /**
     * Get a sorted, temporary CDX file corresponding to the given CDX file.
     *
     * @param cdxFile A cdxfile
     * @return A temporary file with the CDX info sorted according to the standard CDX sorting rules. This file will be
     * removed at exit of the JVM, but the caller should attempt to remove it as soon as it is no longer used.
     */
    protected static File getSortedCDX(File cdxFile) {
        try {
            final File tmpFile = File.createTempFile("sorted", "cdx", FileUtils.getTempDir());
            // This throws IOFailure, if the sorting operation fails
            FileUtils.sortCDX(cdxFile, tmpFile);
            tmpFile.deleteOnExit();
            return tmpFile;
        } catch (IOException e) {
            throw new IOFailure("Error while making tmp file for " + cdxFile, e);
        }
    }

    /**
     * Get a sorted, temporary crawl.log file from an unsorted one.
     *
     * @param file The file containing an unsorted crawl.log file.
     * @return A temporary file containing the entries sorted according to URL. The file will be removed upon exit of
     * the JVM, but the caller should attempt to remove it as soon as it is no longer used.
     */
    protected static File getSortedCrawlLog(File file) {
        try {
            File tmpCrawlLog = File.createTempFile("sorted", "crawllog", FileUtils.getTempDir());
            // This throws IOFailure, if the sorting operation fails
            FileUtils.sortCrawlLog(file, tmpCrawlLog);
            tmpCrawlLog.deleteOnExit();
            return tmpCrawlLog;
        } catch (IOException e) {
            throw new IOFailure("Error creating sorted crawl log file for '" + file + "'", e);
        }
    }

    /**
     * Create standard deduplication indexer.
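     *
     * <p>Illustrative use together with {@link #indexFile} (a sketch only; workingDir, crawlLogFile and cdxFile are
     * placeholders):
     *
     * <pre>{@code
     * DigestIndexer indexer = createStandardIndexer(workingDir + "/luceneDir");
     * DigestOptions options = new DigestOptions(true, false, ".*");
     * indexFile(42L, crawlLogFile, cdxFile, indexer, options);
     * indexer.close();
     * }</pre>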
     *
     * @param indexLocation The full path to the indexing directory
     * @return the created deduplication indexer.
     * @throws IOException If unable to open the index.
     */
    protected static DigestIndexer createStandardIndexer(String indexLocation) throws IOException {
        // Setup Lucene for indexing our crawllogs
        // MODE_BOTH: Both URLs and hashes are indexed. Alternatives:
        // DigestIndexer.MODE_HASH or DigestIndexer.MODE_URL
        String indexingMode = DigestIndexer.MODE_BOTH;
        // used to be 'equivalent' setting
        boolean includeNormalizedURL = false;
        // used to be 'timestamp' setting
        boolean includeTimestamp = true;
        // used to be 'etag' setting
        boolean includeEtag = true;
        boolean addToExistingIndex = false;
        DigestIndexer indexer = new DigestIndexer(indexLocation, indexingMode, includeNormalizedURL, includeTimestamp,
                includeEtag, addToExistingIndex);
        return indexer;
    }

}