001/*
002 * #%L
003 * Netarchivesuite - wayback
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.wayback.indexer;
024
025import java.util.List;
026import java.util.concurrent.LinkedBlockingQueue;
027
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031/**
032 * Singleton class which maintains the basic data structure and methods for the indexer.
033 */
034public class IndexerQueue {
035
036    /** The logger for this class. */
037    private static final Logger log = LoggerFactory.getLogger(IndexerQueue.class);
038
039    /** The unique instance of this class. */
040    private static IndexerQueue instance;
041
042    /** This is the basic underlying datastructure of the indexer - a queue of files waiting to be indexed. */
043    private static LinkedBlockingQueue<ArchiveFile> queue;
044
045    /**
046     * Factory method for obtaining the unique instance of this class.
047     *
048     * @return the instance.
049     */
050    public static synchronized IndexerQueue getInstance() {
051        if (instance == null) {
052            instance = new IndexerQueue();
053        }
054        return instance;
055    }
056
057    /**
058     * Private constructor for this method. Initialises an empty queue.
059     */
060    private IndexerQueue() {
061        queue = new LinkedBlockingQueue<ArchiveFile>();
062    }
063
064    /**
065     * Check the database for any new ArchiveFile objects and add them to the queue.
066     */
067    public synchronized void populate() {
068        List<ArchiveFile> files = (new ArchiveFileDAO()).getFilesAwaitingIndexing();
069        if (!files.isEmpty()) {
070            log.info("Will now add '{}' unindexed files from object store to queue (if they are not already queued).",
071                    files.size());
072        }
073        for (ArchiveFile file : files) {
074            if (!queue.contains(file)) {
075                log.info("Adding file '{}' to indexing queue.", file.getFilename());
076                queue.add(file);
077                log.info("Files in queue: '{}'", queue.size());
078            }
079        }
080    }
081
082    /**
083     * Sequentially take objects from the queue and index them, blocking indefinitely while waiting for new objects to
084     * be added to the queue. It is intended that multiple threads should run this method simultaneously.
085     */
086    public void consume() {
087        while (true) {
088            try {
089                ArchiveFile file = null;
090                try {
091                    file = queue.take();
092                    log.info("Taken file '{}' from indexing queue.", file.getFilename());
093                    log.info("Files in queue: '{}'", queue.size());
094                } catch (InterruptedException e) {
095                    log.error("Unexpected interrupt in indexer while waiting for new elements", e);
096                }
097                file.index();
098            } catch (Exception e) { // Fault Barrier
099                log.warn("Caught exception at fault barrier for {}", Thread.currentThread().getName(), e);
100            }
101        }
102    }
103
104    /**
105     * Convenience method for use in unit tests.
106     */
107    protected static void resestSingleton() {
108        instance = null;
109        if (queue != null) {
110            queue.clear();
111        }
112    }
113}