/*
 * #%L
 * Netarchivesuite - wayback
 * %%
 * Copyright (C) 2005 - 2018 The Royal Danish Library,
 * the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Lesser Public License for more details.
 *
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.wayback.indexer;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.exceptions.UnknownID;
import dk.netarkivet.common.utils.CleanupIF;
import dk.netarkivet.common.utils.FileUtils;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.wayback.WaybackSettings;

/**
 * The WaybackIndexer starts threads to find new files to be indexed and indexes them.
 * <p>
 * There is one producer thread, driven by a {@link Timer}, which periodically runs two tasks: one harvests all
 * filenames and one harvests recent filenames. Each task also asks the {@link IndexerQueue} to populate itself
 * before and after the harvest.
 * <p>
 * Simultaneously there is a family of consumer threads which wait for the queue to be populated and take elements from
 * it and index them.
 */
public class WaybackIndexer implements CleanupIF {

    /** The logger for this class. */
    private static final Logger log = LoggerFactory.getLogger(WaybackIndexer.class);

    /** The singleton instance of this class. */
    private static WaybackIndexer instance;

    /**
     * Factory method which creates a singleton wayback indexer and sets it running. It has the side effect of creating
     * the output directories for the indexer if these do not already exist. It also reads files for the initial ingest
     * if necessary.
     *
     * @return the indexer.
     */
    public static synchronized WaybackIndexer getInstance() {
        if (instance == null) {
            instance = new WaybackIndexer();
        }
        return instance;
    }

    /**
     * Private constructor. Creates the temporary and output batch directories, ingests the initial list of
     * already-indexed files, and then starts the producer timer followed by the consumer threads.
     */
    private WaybackIndexer() {
        File temporaryBatchDir = Settings.getFile(WaybackSettings.WAYBACK_INDEX_TEMPDIR);
        File batchOutputDir = Settings.getFile(WaybackSettings.WAYBACK_BATCH_OUTPUTDIR);
        FileUtils.createDir(temporaryBatchDir);
        FileUtils.createDir(batchOutputDir);
        ingestInitialFiles();
        startProducerThread();
        startConsumerThreads();
    }

    /**
     * The file represented by WAYBACK_INDEXER_INITIAL_FILES is read line by line and each line is ingested as an
     * already-indexed archive file. Lines are trimmed, and blank lines are skipped. Filenames which the DAO reports as
     * already existing are left untouched.
     *
     * @throws ArgumentNotValid if the setting is non-empty but does not point to an existing regular file.
     * @throws IOFailure if the file cannot be opened or read.
     */
    private static void ingestInitialFiles() {
        String initialFileString = Settings.get(WaybackSettings.WAYBACK_INDEXER_INITIAL_FILES);
        if ("".equals(initialFileString)) {
            log.info("No initial list of indexed files is set");
            return;
        }
        File initialFile;
        try {
            initialFile = Settings.getFile(WaybackSettings.WAYBACK_INDEXER_INITIAL_FILES);
        } catch (UnknownID e) {
            log.info("No initial list of indexed files is set");
            return;
        }
        if (!initialFile.isFile()) {
            throw new ArgumentNotValid("The file '" + initialFile.getAbsolutePath()
                    + "' does not exist or is not a file");
        }
        BufferedReader br;
        try {
            br = new BufferedReader(new FileReader(initialFile));
        } catch (FileNotFoundException e) {
            throw new IOFailure("Could not find file '" + initialFile + "'", e);
        }
        ArchiveFileDAO dao = new ArchiveFileDAO();
        // One timestamp is shared by every file ingested in this pass.
        Date today = new Date();
        try {
            String line;
            while ((line = br.readLine()) != null) {
                String fileName = line.trim();
                if (fileName.isEmpty()) {
                    // A blank line (e.g. a trailing newline) is not a filename; do not store it.
                    continue;
                }
                if (dao.exists(fileName)) {
                    continue;
                }
                // Only build the record once we know it actually has to be created.
                ArchiveFile archiveFile = new ArchiveFile();
                archiveFile.setFilename(fileName);
                archiveFile.setIndexed(true);
                archiveFile.setIndexedDate(today);
                log.info("Ingesting '{}'", fileName);
                dao.create(archiveFile);
            }
        } catch (IOException e) {
            throw new IOFailure("Error reading file", e);
        } finally {
            IOUtils.closeQuietly(br);
        }
    }

    /**
     * Starts the consumer threads which do the indexing by sending concurrent batch jobs to the arcrepository. The
     * number of threads is read from WAYBACK_INDEXER_CONSUMER_THREADS; each thread logs its start, calls
     * {@code IndexerQueue.getInstance().consume()} and logs when that call returns.
     */
    private static void startConsumerThreads() {
        int consumerThreads = Settings.getInt(WaybackSettings.WAYBACK_INDEXER_CONSUMER_THREADS);
        for (int threadNumber = 0; threadNumber < consumerThreads; threadNumber++) {
            new Thread("ConsumerThread-" + threadNumber) {

                @Override
                public void run() {
                    log.info("Started thread '{}'", Thread.currentThread().getName());
                    IndexerQueue.getInstance().consume();
                    log.info("Ending thread '{}'", Thread.currentThread().getName());
                }
            }.start();
        }
    }

    /**
     * Starts the producer thread. This thread runs on a timer. It downloads a list of all files in the archive and adds
     * any new ones to the database. It then checks the database for unindexed files and adds them to the queue.
     * <p>
     * Two tasks are scheduled on the same timer with the same initial delay (WAYBACK_INDEXER_PRODUCER_DELAY): a
     * complete harvest repeating every WAYBACK_INDEXER_PRODUCER_INTERVAL and a recent-files harvest repeating every
     * WAYBACK_INDEXER_RECENT_PRODUCER_INTERVAL.
     */
    private static void startProducerThread() {
        long producerDelay = Settings.getLong(WaybackSettings.WAYBACK_INDEXER_PRODUCER_DELAY);
        long producerInterval = Settings.getLong(WaybackSettings.WAYBACK_INDEXER_PRODUCER_INTERVAL);
        long recentProducerInterval = Settings.getLong(WaybackSettings.WAYBACK_INDEXER_RECENT_PRODUCER_INTERVAL);

        Timer producerThreadTimer = new Timer("ProducerThread");
        producerThreadTimer.schedule(newProducerTask("all filenames", true), producerDelay, producerInterval);
        producerThreadTimer.schedule(newProducerTask("recent files", false), producerDelay, recentProducerInterval);
    }

    /**
     * Builds a producer task which populates the indexer queue, harvests filenames, and populates the queue again.
     * <p>
     * Runtime exceptions are caught and logged rather than propagated: an exception escaping from
     * {@link TimerTask#run()} terminates the timer thread, which would silently stop this and every other task
     * scheduled on the same timer.
     *
     * @param description the text appended to the log messages to identify which harvest is running.
     * @param allFiles if true the task harvests all filenames, otherwise only recent filenames.
     * @return a new task ready to be scheduled.
     */
    private static TimerTask newProducerTask(final String description, final boolean allFiles) {
        return new TimerTask() {
            @Override
            public void run() {
                log.info("Starting producer task for {}", description);
                try {
                    IndexerQueue iq = IndexerQueue.getInstance();
                    iq.populate();
                    if (allFiles) {
                        FileNameHarvester.harvestAllFilenames();
                    } else {
                        FileNameHarvester.harvestRecentFilenames();
                    }
                    iq.populate();
                } catch (RuntimeException e) {
                    // Keep the timer alive so the task gets another chance at its next scheduled run.
                    log.error("Producer task for " + description + " failed; it will run again at the next "
                            + "scheduled time", e);
                }
            }
        };
    }

    /**
     * Performs any necessary cleanup functions. These include cleaning any partial batch output from the temporary
     * batch output file and closing the hibernate session factory. The producer timer and the consumer threads are
     * not stopped by this method.
     */
    public void cleanup() {
        log.info("Cleaning up WaybackIndexer");
        File temporaryBatchDir = Settings.getFile(WaybackSettings.WAYBACK_INDEX_TEMPDIR);
        FileUtils.removeRecursively(temporaryBatchDir);
        HibernateUtil.getSession().getSessionFactory().close();
    }
}