001/*
002 * #%L
003 * Netarchivesuite - wayback
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.wayback.aggregator;
024
025import java.io.File;
026import java.io.IOException;
027import java.text.SimpleDateFormat;
028import java.util.Date;
029import java.util.Timer;
030import java.util.TimerTask;
031import java.util.UUID;
032
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import dk.netarkivet.common.exceptions.ArgumentNotValid;
037import dk.netarkivet.common.utils.CleanupIF;
038import dk.netarkivet.common.utils.FileUtils;
039import dk.netarkivet.common.utils.Settings;
040import dk.netarkivet.wayback.WaybackSettings;
041
042/**
043 * The <code>AggregationWorker</code> singleton contains the schedule and file bookkeeping functionality needed in the
044 * aggregation of indexes.
045 * <p>
046 * The <code>AggregationWorker</code> has the responsibility of ensuring each index in the raw index files ends up
047 * appearing exactly once in the index files used by Wayback. If this isn't possible the fallback is to allow duplicate
048 * occurrences of index lines ensuring index lines appears at least once.
049 */
050public class AggregationWorker implements CleanupIF {
051    /** The AggregationWorker logger. */
052    //private Log log = LogFactory.getLog(getClass().getName());
053    private final Logger log = LoggerFactory.getLogger(AggregationWorker.class);
054    /** The singleton instance. */
055    private static AggregationWorker instance = null;
056    /** The IndexAggregator instance to use for the actual aggregation work. */
057    private IndexAggregator aggregator = new IndexAggregator();
058    /** See WaybackSettings.WAYBACK_AGGREGATOR_TEMP_DIR. */
059    private static File temporaryDir = Settings.getFile(WaybackSettings.WAYBACK_AGGREGATOR_TEMP_DIR);
060    /** See WaybackSettings.WAYBACK_AGGREGATOR_INPUT_DIR). */
061    private static File indexInputDir = Settings.getFile(WaybackSettings.WAYBACK_BATCH_OUTPUTDIR);
062    /** See WaybackSettings.WAYBACK_AGGREGATOR_OUTPUT_DIR. */
063    static File indexOutputDir = Settings.getFile(WaybackSettings.WAYBACK_AGGREGATOR_OUTPUT_DIR);
064    /**
065     * The file to use for creating temporary intermediate index file, which subsequent are merge into the final
066     * intermediate index file.
067     */
068    static File tempIntermediateIndexFile = new File(temporaryDir, "temp_intermediate.index");
069    /**
070     * The file to use for creating temporary final index file, which subsequent are merge into the working final index
071     * file.
072     */
073    static File tempFinalIndexFile = new File(temporaryDir, "temp_final.index");
074    /** The task which is used to schedule the aggregations. */
075    private TimerTask aggregatorTask = null;
076
077    /**
078     * The Files to store sorted indexes until they have been merge into a intermediate index files.
079     */
080    public static final File TEMP_FILE_INDEX = new File(temporaryDir, "temp.index");
081    /**
082     * The intermediate Wayback index file currently used to merge new indexes into. If the intermediate files size
083     * exceeds the WaybackSettings#WAYBACK_AGGREGATOR_INTERMEDIATE_INDEX_FILE_SIZE_LIMIT
084     */
085    public static final File INTERMEDIATE_INDEX_FILE = new File(indexOutputDir, "wayback_intermediate.index");
086    /**
087     * The final Wayback index file currently used to intermediate indexes into. A new working file is created and used,
088     * when the current file size + new indexes would exceed
089     * WaybackSettings#WAYBACK_AGGREGATOR_FINAL_INDEX_FILE_SIZE_LIMIT
090     */
091    public static final File FINAL_INDEX_FILE = new File(indexOutputDir, "wayback.index");
092
093    /**
094     * Factory method which creates a singleton aggregator and sets it running. It has the side effect of creating the
095     * output directories for the indexer if these do not already exist.
096     * <p>
097     * A temp directory is create if it doesn't exist. The aggregator won't run if a temp directory is already present,
098     * as this might indicate an instance of the aggregator already running.
099     *
100     * @return the indexer.
101     */
102    public static synchronized AggregationWorker getInstance() {
103        if (instance == null) {
104            instance = new AggregationWorker();
105        }
106        return instance;
107    }
108
109    /**
110     * Creates an Aggregator and starts the aggregation thread. Only one aggregator will be allowed to run at a time,
111     * {@see #getInstance()}.
112     */
113    private AggregationWorker() {
114        initialize();
115        startAggregationThread();
116    }
117
118    /**
119     * Starts the aggregation task. Only allowed to be called once to avoid aggregation race conditions.
120     */
121    private void startAggregationThread() {
122        if (aggregatorTask == null) {
123            aggregatorTask = new TimerTask() {
124                @Override
125                public void run() {
126                    runAggregation();
127                }
128            };
129            Timer aggregatorThreadTimer = new Timer("AggregatorThread");
130            aggregatorThreadTimer.schedule(aggregatorTask, 0,
131                    Settings.getLong(WaybackSettings.WAYBACK_AGGREGATOR_AGGREGATION_INTERVAL));
132        } else {
133            throw new IllegalStateException("An attempt has been made to start a second aggregation job");
134        }
135    }
136
137    /**
138     * Runs the actual aggregation. See package description for details.
139     * <p>
140     * Is synchronized so several subsequent scheduled runs of the method will have to run one at a time.
141     */
142    protected synchronized void runAggregation() {
143        String[] fileNamesToProcess = indexInputDir.list();
144        if (fileNamesToProcess == null) {
145            log.warn("Input directory '" + indexInputDir.getAbsolutePath()
146                    + "' was not found: skipping this aggregation");
147            return;
148        }
149
150        if (fileNamesToProcess.length == 0) {
151            if (log.isDebugEnabled()) {
152                log.debug("No new raw index files found in '" + indexInputDir.getAbsolutePath()
153                        + "' skipping aggregation");
154            }
155            return;
156        }
157
158        File[] filesToProcess = new File[fileNamesToProcess.length];
159
160        // Convert filename array to file handle array
161        for (int i = 0; i < fileNamesToProcess.length; i++) {
162            File file = new File(indexInputDir, fileNamesToProcess[i]);
163            if (file.isFile()) {
164                filesToProcess[i] = new File(indexInputDir, fileNamesToProcess[i]);
165            } else {
166                throw new ArgumentNotValid("Encountered non-regular file '" + file.getName()
167                        + "' in the index input directory '" + indexInputDir.getAbsolutePath() + "'");
168            }
169        }
170
171        aggregator.sortAndMergeFiles(filesToProcess, TEMP_FILE_INDEX);
172        if (log.isDebugEnabled()) {
173            log.debug("Sorted raw indexes into temporary index file ");
174        }
175
176        // If no Intermediate Index file exist we just promote the temp index
177        // file to working file.
178        // Normally the Intermediate Index file exists and we
179        // need to merge the new indexes into this.
180        if (!INTERMEDIATE_INDEX_FILE.exists()) {
181            TEMP_FILE_INDEX.renameTo(INTERMEDIATE_INDEX_FILE);
182        } else {
183            aggregator.mergeFiles(new File[] {TEMP_FILE_INDEX, INTERMEDIATE_INDEX_FILE}, tempIntermediateIndexFile);
184            tempIntermediateIndexFile.renameTo(INTERMEDIATE_INDEX_FILE);
185            if (log.isDebugEnabled()) {
186                log.debug("Merged temporary index file into intermediate index " + "file '"
187                        + INTERMEDIATE_INDEX_FILE.getAbsolutePath() + "'.");
188            }
189        }
190
191        handlePossibleIntemediateIndexFileLimit();
192
193        // Delete the files which have been processed to avoid processing them
194        // again
195        for (File inputFile : filesToProcess) {
196            inputFile.delete();
197        }
198        TEMP_FILE_INDEX.delete();
199
200    }
201
202    /**
203     * Call the handleFinalIndexFileMerge is case of a exceeded
204     * WaybackSettings.WAYBACK_AGGREGATOR_MAX_INTERMEDIATE_INDEX_FILE_SIZE and ?.
205     */
206    private void handlePossibleIntemediateIndexFileLimit() {
207        if (INTERMEDIATE_INDEX_FILE.length() > 1024 * Settings
208                .getLong(WaybackSettings.WAYBACK_AGGREGATOR_MAX_INTERMEDIATE_INDEX_FILE_SIZE)) {
209            handleFinalIndexFileMerge();
210        }
211    }
212
213    /**
214     * See package description for the concrete handling of larger index files.
215     */
216    private void handleFinalIndexFileMerge() {
217        if (INTERMEDIATE_INDEX_FILE.length() + FINAL_INDEX_FILE.length() > 1024 * Settings
218                .getLong(WaybackSettings.WAYBACK_AGGREGATOR_MAX_MAIN_INDEX_FILE_SIZE)) {
219             renameFinalIndexFile();
220        }
221
222        if (!FINAL_INDEX_FILE.exists()) {
223            INTERMEDIATE_INDEX_FILE.renameTo(FINAL_INDEX_FILE);
224            if (log.isDebugEnabled()) {
225                log.debug("Promoting Intermediate Index file to final index " + "file '"
226                        + FINAL_INDEX_FILE.getAbsolutePath() + "'.");
227            }
228        } else {
229            aggregator.mergeFiles(new File[] {FINAL_INDEX_FILE, INTERMEDIATE_INDEX_FILE}, tempFinalIndexFile);
230            if (log.isDebugEnabled()) {
231                log.debug("Merged intermediate file into final index file");
232            }
233
234            tempFinalIndexFile.renameTo(FINAL_INDEX_FILE);
235
236            INTERMEDIATE_INDEX_FILE.delete();
237        }
238
239        try {
240            INTERMEDIATE_INDEX_FILE.createNewFile();
241        } catch (IOException e) {
242            log.error("Failed to create new Intermediate Index file", e);
243        }
244    }
245
246    /**
247     * Give the FINAL_INDEX_FILE (wayback.index) a unique new name.
248     */
249    private void renameFinalIndexFile() {
250        String timestampString = (new SimpleDateFormat("yyyyMMdd-HHmm")).format(new Date());
251        String newFileName = "wayback." + timestampString +".cdx";
252        File fileToRename = new File(indexOutputDir, FINAL_INDEX_FILE.getName());
253        File newFile = new File(indexOutputDir, newFileName);
254        if (newFile.exists()) {
255            //This should be rare outside tests
256            newFileName = UUID.randomUUID().toString() + "." + newFileName;
257            newFile = new File(indexOutputDir, newFileName);
258        }
259        fileToRename.renameTo(newFile);
260    }
261
262    @Override
263    public void cleanup() {
264        FileUtils.removeRecursively(temporaryDir);
265    }
266
267    /**
268     * Creates the needed working directories. Also checks whether a temp directory exists, which might be an indication
269     * of a unclean shutdown.
270     */
271    protected void initialize() {
272        FileUtils.createDir(indexOutputDir);
273        if (temporaryDir.exists()) {
274            log.warn("An temporary Aggregator dir ("
275                    + Settings.getFile(WaybackSettings.WAYBACK_AGGREGATOR_TEMP_DIR).getAbsolutePath()
276                    + ") already exists. This indicates that the previous "
277                    + "running aggregator wasn't shutdown cleanly. "
278                    + "The temp dirs will be removed and the aggregation " + "on the indexes will be restarted");
279        }
280        FileUtils.removeRecursively(temporaryDir);
281        FileUtils.createDir(temporaryDir);
282    }
283}