/*
 * #%L
 * Netarchivesuite - wayback
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Lesser Public License for more details.
 * 
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
023package dk.netarkivet.wayback.indexer;
024
025import java.io.BufferedReader;
026import java.io.File;
027import java.io.FileNotFoundException;
028import java.io.FileReader;
029import java.io.IOException;
030import java.util.Date;
031import java.util.Timer;
032import java.util.TimerTask;
033
034import org.apache.commons.io.IOUtils;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import dk.netarkivet.common.exceptions.ArgumentNotValid;
039import dk.netarkivet.common.exceptions.IOFailure;
040import dk.netarkivet.common.exceptions.UnknownID;
041import dk.netarkivet.common.utils.CleanupIF;
042import dk.netarkivet.common.utils.FileUtils;
043import dk.netarkivet.common.utils.Settings;
044import dk.netarkivet.wayback.WaybackSettings;
045
046/**
047 * The WaybackIndexer starts threads to find new files to be indexed and indexes them.
048 * <p>
049 * There is 1 producer thread which runs as a timer thread, for example once a day, and runs first a FileNameHarvester
050 * to get a list of all files in the archive after which it fills the indexer queue with any new files found.
051 * <p>
052 * Simultaneously there is a family of consumer threads which wait for the queue to be populated and take elements from
053 * it and index them.
054 */
055public class WaybackIndexer implements CleanupIF {
056
057    /** The logger for this class. */
058    private static final Logger log = LoggerFactory.getLogger(WaybackIndexer.class);
059
060    /** The singleton instance of this class. */
061    private static WaybackIndexer instance;
062
063    /**
064     * Factory method which creates a singleton wayback indexer and sets it running. It has the side effect of creating
065     * the output directories for the indexer if these do not already exist. It also reads files for the initial ingest
066     * if necessary.
067     *
068     * @return the indexer.
069     */
070    public static synchronized WaybackIndexer getInstance() {
071        if (instance == null) {
072            instance = new WaybackIndexer();
073        }
074        return instance;
075    }
076
077    /**
078     * Private constructor.
079     */
080    private WaybackIndexer() {
081        File temporaryBatchDir = Settings.getFile(WaybackSettings.WAYBACK_INDEX_TEMPDIR);
082        File batchOutputDir = Settings.getFile(WaybackSettings.WAYBACK_BATCH_OUTPUTDIR);
083        FileUtils.createDir(temporaryBatchDir);
084        FileUtils.createDir(batchOutputDir);
085        ingestInitialFiles();
086        startProducerThread();
087        startConsumerThreads();
088    }
089
090    /**
091     * The file represented by WAYBACK_INDEXER_INITIAL_FILES is read line by line and each line is ingested as an
092     * already-indexed archive file.
093     */
094    private static void ingestInitialFiles() {
095        String initialFileString = Settings.get(WaybackSettings.WAYBACK_INDEXER_INITIAL_FILES);
096        if ("".equals(initialFileString)) {
097            log.info("No initial list of indexed files is set");
098            return;
099        }
100        File initialFile = null;
101        try {
102            initialFile = Settings.getFile(WaybackSettings.WAYBACK_INDEXER_INITIAL_FILES);
103        } catch (UnknownID e) {
104            log.info("No initial list of indexed files is set");
105            return;
106        }
107        if (!initialFile.isFile()) {
108            throw new ArgumentNotValid("The file '" + initialFile.getAbsolutePath()
109                    + "' does not exist or is not a file");
110        }
111        BufferedReader br = null;
112        try {
113            br = new BufferedReader(new FileReader(initialFile));
114        } catch (FileNotFoundException e) {
115            throw new IOFailure("Could not find file '" + initialFile + "'", e);
116        }
117        String fileName = null;
118        ArchiveFileDAO dao = new ArchiveFileDAO();
119        Date today = new Date();
120        try {
121            while ((fileName = br.readLine()) != null) {
122                ArchiveFile archiveFile = new ArchiveFile();
123                archiveFile.setFilename(fileName);
124                archiveFile.setIndexed(true);
125                archiveFile.setIndexedDate(today);
126                if (!dao.exists(fileName)) {
127                    log.info("Ingesting '{}'", fileName);
128                    dao.create(archiveFile);
129                }
130            }
131        } catch (IOException e) {
132            throw new IOFailure("Error reading file", e);
133        } finally {
134            IOUtils.closeQuietly(br);
135        }
136    }
137
138    /**
139     * Starts the consumer threads which do the indexing by sending concurrent batch jobs to the arcrepository.
140     */
141    private static void startConsumerThreads() {
142        int consumerThreads = Settings.getInt(WaybackSettings.WAYBACK_INDEXER_CONSUMER_THREADS);
143        for (int threadNumber = 0; threadNumber < consumerThreads; threadNumber++) {
144            new Thread("ConsumerThread-" + threadNumber) {
145
146                @Override
147                public void run() {
148                    super.run();
149                    log.info("Started thread '{}'", Thread.currentThread().getName());
150                    IndexerQueue.getInstance().consume();
151                    log.info("Ending thread '{}'", Thread.currentThread().getName());
152
153                }
154            }.start();
155        }
156    }
157
158    /**
159     * Starts the producer thread. This thread runs on a timer. It downloads a list of all files in the archive and adds
160     * any new ones to the database. It then checks the database for unindexed files and adds them to the queue.
161     */
162    private static void startProducerThread() {
163        Long producerDelay = Settings.getLong(WaybackSettings.WAYBACK_INDEXER_PRODUCER_DELAY);
164        Long producerInterval = Settings.getLong(WaybackSettings.WAYBACK_INDEXER_PRODUCER_INTERVAL);
165        Long recentProducerInterval = Settings.getLong(WaybackSettings.WAYBACK_INDEXER_RECENT_PRODUCER_INTERVAL);
166        TimerTask completeProducerTask = new TimerTask() {
167            @Override
168            public void run() {
169                log.info("Starting producer task for all filenames");
170                IndexerQueue iq = IndexerQueue.getInstance();
171                iq.populate();
172                FileNameHarvester.harvestAllFilenames();
173                iq.populate();
174            }
175        };
176        Timer producerThreadTimer = new Timer("ProducerThread");
177        producerThreadTimer.schedule(completeProducerTask, producerDelay, producerInterval);
178        TimerTask recentProducerTask = new TimerTask() {
179            @Override
180            public void run() {
181                log.info("Starting producer task for recent files");
182                IndexerQueue iq = IndexerQueue.getInstance();
183                iq.populate();
184                FileNameHarvester.harvestRecentFilenames();
185                iq.populate();
186            }
187        };
188        producerThreadTimer.schedule(recentProducerTask, producerDelay, recentProducerInterval);
189    }
190
191    /**
192     * Performs any necessary cleanup functions. These include cleaning any partial batch output from the temporary
193     * batch output file and closing the hibernate session factory.
194     */
195    public void cleanup() {
196        log.info("Cleaning up WaybackIndexer");
197        File temporaryBatchDir = Settings.getFile(WaybackSettings.WAYBACK_INDEX_TEMPDIR);
198        FileUtils.removeRecursively(temporaryBatchDir);
199        HibernateUtil.getSession().getSessionFactory().close();
200    }
201}