001/* 002 * #%L 003 * Netarchivesuite - wayback 004 * %% 005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.wayback.indexer; 024 025import java.util.List; 026import java.util.concurrent.LinkedBlockingQueue; 027 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030 031/** 032 * Singleton class which maintains the basic data structure and methods for the indexer. 033 */ 034public class IndexerQueue { 035 036 /** The logger for this class. */ 037 private static final Logger log = LoggerFactory.getLogger(IndexerQueue.class); 038 039 /** The unique instance of this class. */ 040 private static IndexerQueue instance; 041 042 /** This is the basic underlying datastructure of the indexer - a queue of files waiting to be indexed. */ 043 private static LinkedBlockingQueue<ArchiveFile> queue; 044 045 /** 046 * Factory method for obtaining the unique instance of this class. 047 * 048 * @return the instance. 049 */ 050 public static synchronized IndexerQueue getInstance() { 051 if (instance == null) { 052 instance = new IndexerQueue(); 053 } 054 return instance; 055 } 056 057 /** 058 * Private constructor for this method. Initialises an empty queue. 059 */ 060 private IndexerQueue() { 061 queue = new LinkedBlockingQueue<ArchiveFile>(); 062 } 063 064 /** 065 * Check the database for any new ArchiveFile objects and add them to the queue. 066 */ 067 public synchronized void populate() { 068 List<ArchiveFile> files = (new ArchiveFileDAO()).getFilesAwaitingIndexing(); 069 if (!files.isEmpty()) { 070 log.info("Will now add '{}' unindexed files from object store to queue (if they are not already queued).", 071 files.size()); 072 } 073 for (ArchiveFile file : files) { 074 if (!queue.contains(file)) { 075 log.info("Adding file '{}' to indexing queue.", file.getFilename()); 076 queue.add(file); 077 log.info("Files in queue: '{}'", queue.size()); 078 } 079 } 080 } 081 082 /** 083 * Sequentially take objects from the queue and index them, blocking indefinitely while waiting for new objects to 084 * be added to the queue. It is intended that multiple threads should run this method simultaneously. 085 */ 086 public void consume() { 087 while (true) { 088 try { 089 ArchiveFile file = null; 090 try { 091 file = queue.take(); 092 log.info("Taken file '{}' from indexing queue.", file.getFilename()); 093 log.info("Files in queue: '{}'", queue.size()); 094 } catch (InterruptedException e) { 095 log.error("Unexpected interrupt in indexer while waiting for new elements", e); 096 } 097 file.index(); 098 } catch (Exception e) { // Fault Barrier 099 log.warn("Caught exception at fault barrier for {}", Thread.currentThread().getName(), e); 100 } 101 } 102 } 103 104 /** 105 * Convenience method for use in unit tests. 106 */ 107 protected static void resestSingleton() { 108 instance = null; 109 if (queue != null) { 110 queue.clear(); 111 } 112 } 113}