/*
 * #%L
 * Netarchivesuite - wayback
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Lesser Public License for more details.
 *
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.wayback.aggregator;

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.utils.CleanupIF;
import dk.netarkivet.common.utils.FileUtils;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.wayback.WaybackSettings;

/**
 * The <code>AggregationWorker</code> singleton contains the schedule and file bookkeeping functionality needed in the
 * aggregation of indexes.
 * <p>
 * The <code>AggregationWorker</code> has the responsibility of ensuring each index in the raw index files ends up
 * appearing exactly once in the index files used by Wayback. If this isn't possible the fallback is to allow duplicate
 * occurrences of index lines, ensuring that index lines appear at least once. For that reason raw input files are only
 * deleted after their content has been successfully moved into the intermediate index file.
 */
public class AggregationWorker implements CleanupIF {
    /** The AggregationWorker logger. */
    private static final Logger log = LoggerFactory.getLogger(AggregationWorker.class);
    /** The singleton instance. */
    private static AggregationWorker instance = null;
    /** The IndexAggregator instance to use for the actual aggregation work. */
    private IndexAggregator aggregator = new IndexAggregator();
    /** See WaybackSettings.WAYBACK_AGGREGATOR_TEMP_DIR. */
    private static File temporaryDir = Settings.getFile(WaybackSettings.WAYBACK_AGGREGATOR_TEMP_DIR);
    /** The directory the raw index files are read from. See WaybackSettings.WAYBACK_BATCH_OUTPUTDIR. */
    private static File indexInputDir = Settings.getFile(WaybackSettings.WAYBACK_BATCH_OUTPUTDIR);
    /** See WaybackSettings.WAYBACK_AGGREGATOR_OUTPUT_DIR. */
    static File indexOutputDir = Settings.getFile(WaybackSettings.WAYBACK_AGGREGATOR_OUTPUT_DIR);
    /**
     * The file used for building the merged intermediate index, which is subsequently renamed to the intermediate
     * index file.
     */
    static File tempIntermediateIndexFile = new File(temporaryDir, "temp_intermediate.index");
    /**
     * The file used for building the merged final index, which is subsequently renamed to the working final index
     * file.
     */
    static File tempFinalIndexFile = new File(temporaryDir, "temp_final.index");
    /** The task which is used to schedule the aggregations. */
    private TimerTask aggregatorTask = null;

    /**
     * The file used to store the sorted raw indexes until they have been merged into the intermediate index file.
     */
    public static final File TEMP_FILE_INDEX = new File(temporaryDir, "temp.index");
    /**
     * The intermediate Wayback index file currently used to merge new indexes into. When the size of this file
     * exceeds the limit configured by WaybackSettings#WAYBACK_AGGREGATOR_MAX_INTERMEDIATE_INDEX_FILE_SIZE, its
     * content is moved into the final index file.
     */
    public static final File INTERMEDIATE_INDEX_FILE = new File(indexOutputDir, "wayback_intermediate.index");
    /**
     * The final Wayback index file currently used to merge intermediate indexes into. The current file is renamed
     * and a new working file is started when the current file size + new indexes would exceed the limit configured
     * by WaybackSettings#WAYBACK_AGGREGATOR_MAX_MAIN_INDEX_FILE_SIZE.
     */
    public static final File FINAL_INDEX_FILE = new File(indexOutputDir, "wayback.index");

    /**
     * Factory method which creates a singleton aggregator and sets it running. It has the side effect of creating the
     * output directories for the indexer if these do not already exist.
     * <p>
     * The temp directory is (re)created. If a temp directory is already present a warning is logged, as this
     * indicates that a previous aggregator was not shut down cleanly, and the directory is wiped.
     *
     * @return the singleton aggregation worker.
     */
    public static synchronized AggregationWorker getInstance() {
        if (instance == null) {
            instance = new AggregationWorker();
        }
        return instance;
    }

    /**
     * Creates an Aggregator and starts the aggregation thread. Only one aggregator will be allowed to run at a time,
     * see {@link #getInstance()}.
     */
    private AggregationWorker() {
        initialize();
        startAggregationThread();
    }

    /**
     * Starts the aggregation task. Only allowed to be called once to avoid aggregation race conditions.
     *
     * @throws IllegalStateException if the aggregation task has already been started.
     */
    private void startAggregationThread() {
        if (aggregatorTask != null) {
            throw new IllegalStateException("An attempt has been made to start a second aggregation job");
        }
        aggregatorTask = new TimerTask() {
            @Override
            public void run() {
                try {
                    runAggregation();
                } catch (RuntimeException e) {
                    // An exception escaping a TimerTask terminates the timer thread and silently cancels
                    // every future run, so log the failure and let the next scheduled run retry.
                    log.error("Aggregation run failed; will retry at the next scheduled run", e);
                }
            }
        };
        Timer aggregatorThreadTimer = new Timer("AggregatorThread");
        aggregatorThreadTimer.schedule(aggregatorTask, 0,
                Settings.getLong(WaybackSettings.WAYBACK_AGGREGATOR_AGGREGATION_INTERVAL));
    }

    /**
     * Runs the actual aggregation. See package description for details.
     * <p>
     * The raw index files in the input directory are sorted into {@link #TEMP_FILE_INDEX}, which is then promoted to
     * or merged into {@link #INTERMEDIATE_INDEX_FILE}. Only when that step has succeeded are the raw input files
     * deleted; on failure they are left in place so the next run processes them again.
     * <p>
     * Is synchronized so several subsequent scheduled runs of the method will have to run one at a time.
     *
     * @throws ArgumentNotValid if the input directory contains an entry which is not a regular file.
     */
    protected synchronized void runAggregation() {
        String[] fileNamesToProcess = indexInputDir.list();
        if (fileNamesToProcess == null) {
            log.warn("Input directory '{}' was not found: skipping this aggregation",
                    indexInputDir.getAbsolutePath());
            return;
        }

        if (fileNamesToProcess.length == 0) {
            log.debug("No new raw index files found in '{}' skipping aggregation", indexInputDir.getAbsolutePath());
            return;
        }

        // Convert filename array to file handle array
        File[] filesToProcess = new File[fileNamesToProcess.length];
        for (int i = 0; i < fileNamesToProcess.length; i++) {
            File file = new File(indexInputDir, fileNamesToProcess[i]);
            if (!file.isFile()) {
                throw new ArgumentNotValid("Encountered non-regular file '" + file.getName()
                        + "' in the index input directory '" + indexInputDir.getAbsolutePath() + "'");
            }
            filesToProcess[i] = file;
        }

        aggregator.sortAndMergeFiles(filesToProcess, TEMP_FILE_INDEX);
        log.debug("Sorted raw indexes into temporary index file ");

        // If no Intermediate Index file exist we just promote the temp index
        // file to working file.
        // Normally the Intermediate Index file exists and we
        // need to merge the new indexes into this.
        if (!INTERMEDIATE_INDEX_FILE.exists()) {
            if (!renameOrLog(TEMP_FILE_INDEX, INTERMEDIATE_INDEX_FILE)) {
                // Keep the raw input files so they are aggregated again by the next run.
                TEMP_FILE_INDEX.delete();
                return;
            }
        } else {
            aggregator.mergeFiles(new File[] {TEMP_FILE_INDEX, INTERMEDIATE_INDEX_FILE}, tempIntermediateIndexFile);
            if (!renameOrLog(tempIntermediateIndexFile, INTERMEDIATE_INDEX_FILE)) {
                // The new indexes never reached the intermediate index file: drop the leftovers and
                // keep the raw input files so they are aggregated again by the next run.
                tempIntermediateIndexFile.delete();
                TEMP_FILE_INDEX.delete();
                return;
            }
            log.debug("Merged temporary index file into intermediate index file '{}'.",
                    INTERMEDIATE_INDEX_FILE.getAbsolutePath());
        }

        handlePossibleIntemediateIndexFileLimit();

        // Delete the files which have been processed to avoid processing them
        // again
        for (File inputFile : filesToProcess) {
            if (!inputFile.delete()) {
                log.warn("Failed to delete processed raw index file '{}'; it will be aggregated again",
                        inputFile.getAbsolutePath());
            }
        }
        TEMP_FILE_INDEX.delete();
    }

    /**
     * Renames a file and logs an error if the rename did not succeed.
     *
     * @param source the file to rename.
     * @param target the new name of the file.
     * @return true if the rename succeeded, false otherwise.
     */
    private boolean renameOrLog(File source, File target) {
        if (source.renameTo(target)) {
            return true;
        }
        log.error("Failed to rename '{}' to '{}'", source.getAbsolutePath(), target.getAbsolutePath());
        return false;
    }

    /**
     * Calls handleFinalIndexFileMerge if the intermediate index file has grown beyond 1024 times the value of
     * WaybackSettings.WAYBACK_AGGREGATOR_MAX_INTERMEDIATE_INDEX_FILE_SIZE (presumably the setting is given in
     * kilobytes).
     */
    private void handlePossibleIntemediateIndexFileLimit() {
        if (INTERMEDIATE_INDEX_FILE.length() > 1024 * Settings
                .getLong(WaybackSettings.WAYBACK_AGGREGATOR_MAX_INTERMEDIATE_INDEX_FILE_SIZE)) {
            handleFinalIndexFileMerge();
        }
    }

    /**
     * Moves the content of the intermediate index file into the final index file and starts a new, empty
     * intermediate index file. See package description for the concrete handling of larger index files.
     * <p>
     * If the combined size would exceed 1024 times WaybackSettings.WAYBACK_AGGREGATOR_MAX_MAIN_INDEX_FILE_SIZE the
     * current final index file is first renamed to a unique name. The intermediate index file is only removed once
     * its content is safely in place in the final index file.
     */
    private void handleFinalIndexFileMerge() {
        if (INTERMEDIATE_INDEX_FILE.length() + FINAL_INDEX_FILE.length() > 1024 * Settings
                .getLong(WaybackSettings.WAYBACK_AGGREGATOR_MAX_MAIN_INDEX_FILE_SIZE)) {
            renameFinalIndexFile();
        }

        if (!FINAL_INDEX_FILE.exists()) {
            if (!renameOrLog(INTERMEDIATE_INDEX_FILE, FINAL_INDEX_FILE)) {
                // The intermediate index file is untouched; a later run will try the promotion again.
                return;
            }
            log.debug("Promoting Intermediate Index file to final index file '{}'.",
                    FINAL_INDEX_FILE.getAbsolutePath());
        } else {
            aggregator.mergeFiles(new File[] {FINAL_INDEX_FILE, INTERMEDIATE_INDEX_FILE}, tempFinalIndexFile);
            log.debug("Merged intermediate file into final index file");

            if (!renameOrLog(tempFinalIndexFile, FINAL_INDEX_FILE)) {
                // Keep the intermediate index file, otherwise its index lines would be lost.
                tempFinalIndexFile.delete();
                return;
            }

            INTERMEDIATE_INDEX_FILE.delete();
        }

        try {
            INTERMEDIATE_INDEX_FILE.createNewFile();
        } catch (IOException e) {
            log.error("Failed to create new Intermediate Index file", e);
        }
    }

    /**
     * Give the FINAL_INDEX_FILE (wayback.index) a unique new name of the form wayback.yyyyMMdd-HHmm.cdx. Does nothing
     * if no final index file exists yet.
     */
    private void renameFinalIndexFile() {
        File fileToRename = new File(indexOutputDir, FINAL_INDEX_FILE.getName());
        if (!fileToRename.exists()) {
            // Nothing to rotate.
            return;
        }
        String timestampString = (new SimpleDateFormat("yyyyMMdd-HHmm")).format(new Date());
        String newFileName = "wayback." + timestampString + ".cdx";
        File newFile = new File(indexOutputDir, newFileName);
        if (newFile.exists()) {
            // This should be rare outside tests
            newFileName = UUID.randomUUID().toString() + "." + newFileName;
            newFile = new File(indexOutputDir, newFileName);
        }
        if (!fileToRename.renameTo(newFile)) {
            log.warn("Failed to rename final index file '{}' to '{}'; new indexes will be merged into the "
                    + "existing final index file", fileToRename.getAbsolutePath(), newFile.getAbsolutePath());
        }
    }

    /**
     * Removes the temporary aggregator directory and everything in it.
     */
    @Override
    public void cleanup() {
        FileUtils.removeRecursively(temporaryDir);
    }

    /**
     * Creates the needed working directories. Also checks whether a temp directory exists, which might be an
     * indication of an unclean shutdown; in that case a warning is logged. The temp directory is always removed and
     * recreated.
     */
    protected void initialize() {
        FileUtils.createDir(indexOutputDir);
        if (temporaryDir.exists()) {
            log.warn("An temporary Aggregator dir ("
                    + Settings.getFile(WaybackSettings.WAYBACK_AGGREGATOR_TEMP_DIR).getAbsolutePath()
                    + ") already exists. This indicates that the previous "
                    + "running aggregator wasn't shutdown cleanly. "
                    + "The temp dirs will be removed and the aggregation " + "on the indexes will be restarted");
        }
        FileUtils.removeRecursively(temporaryDir);
        FileUtils.createDir(temporaryDir);
    }
}