001/*
002 * #%L
003 * Netarchivesuite - archive
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.archive.bitarchive;
024
025import java.io.File;
026import java.io.FileOutputStream;
027import java.io.IOException;
028import java.io.OutputStream;
029import java.util.ArrayList;
030import java.util.Collection;
031import java.util.Collections;
032import java.util.HashMap;
033import java.util.HashSet;
034import java.util.List;
035import java.util.Map;
036import java.util.Observable;
037import java.util.Set;
038import java.util.Timer;
039import java.util.TimerTask;
040
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import dk.netarkivet.archive.ArchiveSettings;
045import dk.netarkivet.common.distribute.ChannelID;
046import dk.netarkivet.common.distribute.RemoteFile;
047import dk.netarkivet.common.exceptions.ArgumentNotValid;
048import dk.netarkivet.common.exceptions.IOFailure;
049import dk.netarkivet.common.utils.CleanupIF;
050import dk.netarkivet.common.utils.ExceptionUtils;
051import dk.netarkivet.common.utils.FileUtils;
052import dk.netarkivet.common.utils.Settings;
053import dk.netarkivet.common.utils.StringUtils;
054import dk.netarkivet.common.utils.batch.FileBatchJob;
055
056/**
057 * Class representing the monitor for bitarchives. The monitor is used for sending out and combining the results of
058 * executing batch jobs.
059 * <p>
060 * Registers outgoing batchjobs to bitarchives, and handles replies from bitarchives, finally notifying observers when
061 * all bitarchives have replied, or when the batch times out, after a time specified in settings.
062 * <p>
063 * We wait for replies from bitarchives that are considered live when the batch begins. A bitarchive is considered live
064 * if we have heard any activity from it within a time specified in settings.
065 */
066public class BitarchiveMonitor extends Observable implements CleanupIF {
067
068    /** The current instance. */
069    private static BitarchiveMonitor instance;
070
071    /** The time of the latest sign of life received from each bitarchive. */
072    private Map<String, Long> bitarchiveSignsOfLife = Collections.synchronizedMap(new HashMap<String, Long>());
073
074    /** The acceptable delay in milliseconds between signs of life. */
075    private final long acceptableSignOfLifeDelay;
076
077    /**
078     * Map from the ID of batch jobs sent to bitarchives, to tuple class of status for this batch job. The Map contains
079     * all batch jobs currently running.
080     */
081    private Map<String, BatchJobStatus> runningBatchJobs = Collections
082            .synchronizedMap(new HashMap<String, BatchJobStatus>());
083
084    /** Whether the timer will be created as a daemon-thread. */
085    private static final boolean IS_DAEMON = true;
086
087    /** The timer for keeping track of running batchjobs. */
088    protected final Timer batchTimer = new Timer(IS_DAEMON);
089
090    /** Logger for this class. */
091    private static final Logger log = LoggerFactory.getLogger(BitarchiveMonitor.class);
092
093    /**
094     * Initialises the bitarchive monitor. During this, the acceptable delay between signs of life and the timeout
095     * setting for batchjobs are read and logged.
096     */
097    private BitarchiveMonitor() {
098        acceptableSignOfLifeDelay = Settings.getLong(ArchiveSettings.BITARCHIVE_ACCEPTABLE_HEARTBEAT_DELAY);
099        log.info("Bitarchive liveness times out after {} milliseconds.", acceptableSignOfLifeDelay);
100    }
101
102    /**
103     * Method for retrieving the current instance. If no instance has been instantiated, then a new one will be created.
104     *
105     * @return The current instance of the BitarchiveMonitor.
106     */
107    public static synchronized BitarchiveMonitor getInstance() {
108        if (instance == null) {
109            instance = new BitarchiveMonitor();
110        }
111        return instance;
112    }
113
114    /**
115     * Registers a sign of life from a bitarchive. This method logs when new bit archives present themselves.
116     *
117     * @param appID the ID of the bitarchive that generated the life sign
118     */
119    public void signOfLife(String appID) {
120        ArgumentNotValid.checkNotNullOrEmpty(appID, "String appID");
121        long now = System.currentTimeMillis();
122        if ((!bitarchiveSignsOfLife.containsKey(appID))) {
123            log.info("Bitarchive '{}' is now known by the bitarchive monitor", appID);
124        }
125        log.trace("Received sign of life from bitarchive '{}'", appID);
126        bitarchiveSignsOfLife.put(appID, now);
127    }
128
129    /**
130     * Register a new batch sent to the bitarchives.
131     * <p>
132     * This registers a new batchstatus object, with a list of live bitarchives awaiting reply, and a timer task letting
133     * the job time out after the specified time.
134     *
135     * @param requestID The ID of the batch request.
136     * @param requestReplyTo The replyTo channel of the batch request.
137     * @param bitarchiveBatchID The ID of the batch job sent on to the bit archives.
138     * @param timeout Timeout of specific batch job.
139     * @throws ArgumentNotValid If any argument is null, or either string is empty.
140     */
141    public void registerBatch(String requestID, ChannelID requestReplyTo, String bitarchiveBatchID, long timeout)
142            throws ArgumentNotValid {
143        ArgumentNotValid.checkNotNullOrEmpty(requestID, "String requestID");
144        ArgumentNotValid.checkNotNull(requestReplyTo, "ChannelID requestReplyTo");
145        ArgumentNotValid.checkNotNullOrEmpty(bitarchiveBatchID, "String bitarchiveBatchID");
146        BatchJobStatus bjs = new BatchJobStatus(requestID, requestReplyTo, bitarchiveBatchID,
147                getRunningBitarchiveIDs(), timeout);
148        runningBatchJobs.put(bitarchiveBatchID, bjs);
149        log.info("Registered Batch job from {} with timeout {}. Number of outstanding batchjobs are now: {}",
150                requestID, timeout, runningBatchJobs.size());
151    }
152
153    /**
154     * Generate a set of bitarchiveIDs that are considered live.
155     *
156     * @return Set of IDs of active bitarchives
157     */
158    private Set<String> getRunningBitarchiveIDs() {
159        Map<String, Long> signsOfLifeCopy;
160        long now;
161        synchronized (bitarchiveSignsOfLife) {
162            now = System.currentTimeMillis();
163            signsOfLifeCopy = new HashMap<String, Long>(bitarchiveSignsOfLife);
164        }
165        Set<String> runningApps = new HashSet<String>();
166        for (Map.Entry<String, Long> baID : signsOfLifeCopy.entrySet()) {
167            if (baID.getValue() + acceptableSignOfLifeDelay > now) {
168                runningApps.add(baID.getKey());
169            } else {
170                log.warn(
171                        "Not listening for replies from the bitarchive '{}' which hasn't shown signs of life in {} milliseconds",
172                        baID.getKey(), (now - baID.getValue()));
173                // Remove the bitarchive to ensure this warning is not logged
174                // more than once, and a new message is logged when it returns.
175                bitarchiveSignsOfLife.remove(baID.getKey());
176            }
177        }
178        return runningApps;
179    }
180
181    /**
182     * Handle a reply received from a bitarchive.
183     * <p>
184     * This method registers the information from the bitarchive in the batch status for this job, if any (otherwise
185     * logs and quits).
186     * <p>
187     * If this is the last bitarchive we were missing replies from, notify observers with the batch status for this job.
188     * <p>
189     * TODO why are the 'exceptions' argument not used?
190     *
191     * @param bitarchiveBatchID The ID of the batch job sent on to the bit archives.
192     * @param bitarchiveID The ID of the replying bitarchive.
193     * @param noOfFilesProcessed The number of files the bitarchive has processed.
194     * @param filesFailed A collection of filenames of failed files in that bitarchive. Might be null if no files
195     * failed.
196     * @param remoteFile A remote pointer to a file with results from that bitarchive. Might be null if job was not OK.
197     * @param errMsg An error message, if the job was not successful on the bitarchive, or null for none.
198     * @param exceptions A list of exceptions caught during batch processing.
199     * @throws ArgumentNotValid If either ID is null.
200     */
201    public void bitarchiveReply(String bitarchiveBatchID, String bitarchiveID, int noOfFilesProcessed,
202            Collection<File> filesFailed, RemoteFile remoteFile, String errMsg,
203            List<FileBatchJob.ExceptionOccurrence> exceptions) throws ArgumentNotValid {
204        ArgumentNotValid.checkNotNullOrEmpty(bitarchiveBatchID, "String bitarchiveBatchID");
205        ArgumentNotValid.checkNotNullOrEmpty(bitarchiveID, "String bitarchiveID");
206        ArgumentNotValid.checkNotNegative(noOfFilesProcessed, "int noOfFilesProcessed");
207
208        BatchJobStatus bjs = runningBatchJobs.get(bitarchiveBatchID);
209        if (bjs == null) {
210            // If the batch ID does not correspond to any of the pending batch
211            // jobs, just log and ignore the message.
212            log.debug(
213                    "The batch ID '{}' of the received reply from bitarchives does not correspond to any pending batch job. "
214                            + "Ignoring and deleting RemoteFile '{}'." + "Only knows batchjob with IDs: {}",
215                    bitarchiveBatchID, remoteFile, runningBatchJobs.keySet());
216
217            if (remoteFile != null) {
218                remoteFile.cleanup();
219            }
220        } else {
221            bjs.updateWithBitarchiveReply(bitarchiveID, noOfFilesProcessed, filesFailed, remoteFile, errMsg);
222        }
223    }
224
225    /**
226     * Notifies observers that the given batch job has ended.
227     *
228     * @param batchJobStatus The batch job that has ended.
229     */
230    private void notifyBatchEnded(BatchJobStatus batchJobStatus) {
231        runningBatchJobs.remove(batchJobStatus.bitarchiveBatchID);
232        // Notify observers that this batch is done
233        setChanged();
234        notifyObservers(batchJobStatus);
235        log.info("Batchjob '{}' finished. The number of outstanding batchjobs are now: {}",
236                batchJobStatus.bitarchiveBatchID, runningBatchJobs.size());
237    }
238
239    /**
240     * Closes this BitarchiveMonitor cleanly. Currently does nothing.
241     */
242    public void cleanup() {
243        instance = null;
244    }
245
246    /**
247     * Class handling state and updates in batch job status.
248     * <p>
249     * This class remembers information about the batchjob sent, and information from all bitarchive replies received.
250     * It also contains information about the original requester of the batchjob.
251     */
252    public final class BatchJobStatus {
253
254        /** The timer task that handles timeout of this batch job. */
255        private final BatchTimeoutTask batchTimeoutTask;
256        /** The ID of the job sent to the bitarchives. */
257        private final String bitarchiveBatchID;
258        /** Have we begun replying for this batch job? */
259        private boolean notifyInitiated;
260
261        /** the ID of the original batch request. */
262        public final String originalRequestID;
263
264        /** The reply channel for the original request. */
265        public final ChannelID originalRequestReplyTo;
266
267        /** set containing the bitarchives that were alive when we sent the job, but haven't answered yet. */
268        public final Set<String> missingRespondents;
269
270        /** The accumulated number of files processed in replies received so far. */
271        public int noOfFilesProcessed;
272
273        /** The accumulated list of files failed in replies received so far. */
274        public final Collection<File> filesFailed;
275
276        /** A string with a concatenation of errors. This error message is null, if the job is successful. */
277        public String errorMessages;
278
279        /** A File with a concatenation of results from replies received so far. */
280        public final File batchResultFile;
281
282        /** A list of the exceptions that occurred during processing. */
283        public final List<FileBatchJob.ExceptionOccurrence> exceptions;
284
285        /** The timeout for batch jobs in milliseconds. */
286        private long batchTimeout;
287
288        /**
289         * Initialise the status on a fresh batch request. Apart from the given values, a file is created to store batch
290         * results in. <b>Sideeffect</b>: BatchTimeout is started here
291         *
292         * @param originalRequestID The ID of the originating request.
293         * @param originalRequestReplyTo The reply channel for the originating request.
294         * @param bitarchiveBatchID The ID of the job sent to bitarchives.
295         * @param missingRespondents List of all live bitarchives, used to know which bitarchives to await reply from.
296         * @param timeout Timeout for Batch job
297         * @throws IOFailure if a file for batch results cannot be made.
298         */
299        private BatchJobStatus(String originalRequestID, ChannelID originalRequestReplyTo, String bitarchiveBatchID,
300                Set<String> missingRespondents, long timeout) throws IOFailure {
301            this.originalRequestID = originalRequestID;
302            this.originalRequestReplyTo = originalRequestReplyTo;
303            this.bitarchiveBatchID = bitarchiveBatchID;
304            this.missingRespondents = missingRespondents;
305            batchTimeoutTask = new BatchTimeoutTask(bitarchiveBatchID);
306            batchTimeout = timeout;
307            batchTimer.schedule(batchTimeoutTask, batchTimeout);
308            this.noOfFilesProcessed = 0;
309            try {
310                this.batchResultFile = File.createTempFile(bitarchiveBatchID, "batch_aggregation",
311                        FileUtils.getTempDir());
312            } catch (IOException e) {
313                log.warn("Unable to create file for batch output");
314                throw new IOFailure("Unable to create file for batch output", e);
315            }
316            this.filesFailed = new ArrayList<File>();
317            // Null indicates no error
318            this.errorMessages = null;
319            this.notifyInitiated = false;
320
321            exceptions = new ArrayList<FileBatchJob.ExceptionOccurrence>();
322        }
323
324        /**
325         * Appends the given message to the current error message.
326         *
327         * @param errMsg A message describing what went wrong.
328         */
329        public void appendError(String errMsg) {
330            if (this.errorMessages == null) {
331                this.errorMessages = errMsg;
332            } else {
333                this.errorMessages += "\n" + errMsg;
334            }
335        }
336
337        /**
338         * Updates the status with info from a bitarchive reply.
339         * <p>
340         * This will add the results given to the status, and if this was the last remaining bitarchive, also sends a
341         * notification to all observers of the bitarchive monitor.
342         *
343         * @param bitarchiveID The ID of the bitarchive that has replied
344         * @param numberOfFilesProcessed The number of files processed by that bit archive.
345         * @param failedFiles List of files failed in that bit archive.
346         * @param remoteFile A pointer to a remote file with results from the bitarchive.
347         * @param errMsg An error message with errors from that bit archive.
348         */
349        private synchronized void updateWithBitarchiveReply(String bitarchiveID, int numberOfFilesProcessed,
350                Collection<File> failedFiles, RemoteFile remoteFile, String errMsg) {
351            if (notifyInitiated) {
352                log.debug("The reply for batch job: '{}' from bitarchive '{}' arrived after we had started replying. "
353                        + "Ignoring this reply.", bitarchiveBatchID, bitarchiveID);
354                remoteFile.cleanup();
355                return;
356            }
357            // found is set to true, if bitarchiveID was among
358            // the missingRespondents, before it was deleted.
359            boolean found = missingRespondents.remove(bitarchiveID);
360
361            // Handle the reply, even though the bitarchive was not known to be
362            // live, but log a warning.
363            if (!found) {
364                log.warn("Received a batch reply for: {} from an unexpected bit archive: '{}'", bitarchiveBatchID,
365                        bitarchiveID);
366            }
367            this.noOfFilesProcessed += numberOfFilesProcessed;
368            if (failedFiles != null) {
369                this.filesFailed.addAll(failedFiles);
370            }
371
372            appendRemoteFileToAggregateFile(remoteFile);
373            this.exceptions.addAll(this.exceptions);
374
375            // In case the batch reply contains an error, the final
376            // we append this error.
377            if (errMsg != null) {
378                appendError(errMsg);
379                log.warn("Received batch reply with error: {} at BA monitor from bitarchive {}", errMsg, bitarchiveID);
380            }
381
382            // if all archives have answered then notify observers that we are
383            // done.
384            if (missingRespondents.isEmpty()) {
385                notifyBatchEnded();
386            }
387        }
388
389        /**
390         * Append a remotefile to the batch result aggregate file. Adds info on errors while concatenating to the batch
391         * status.
392         *
393         * @param rf A remotefile to read from
394         */
395        private void appendRemoteFileToAggregateFile(RemoteFile rf) {
396            if (rf != null) {
397                OutputStream aggregateStream = null;
398                try {
399                    aggregateStream = new FileOutputStream(batchResultFile, true);
400                    rf.appendTo(aggregateStream);
401
402                    try {
403                        rf.cleanup();
404                    } catch (IOFailure e) {
405                        log.warn("Could not remove remotefile '{}'", rf, e);
406                        // Harmless, though. Continue
407                    }
408                } catch (IOFailure e) {
409                    String errMsg = "Exception while aggregating batch output for " + rf.getName() + ": "
410                            + ExceptionUtils.getStackTrace(e);
411                    appendError(errMsg);
412                } catch (IOException e) {
413                    String errMsg = "Exception while aggregating batch output for " + rf.getName() + ": "
414                            + ExceptionUtils.getStackTrace(e);
415                    appendError(errMsg);
416                } finally {
417                    if (aggregateStream != null) {
418                        try {
419                            aggregateStream.close();
420                        } catch (IOException e) {
421                            String errMsg = "Exception while aggregating batch output for " + rf.getName() + ": "
422                                    + ExceptionUtils.getStackTrace(e);
423                            appendError(errMsg);
424                        }
425                    }
426                }
427            }
428        }
429
430        /**
431         * Checks whether this batch job is already being notified about. If not, it notifies observers with this batch
432         * status.
433         */
434        private synchronized void notifyBatchEnded() {
435            if (!notifyInitiated) {
436                notifyInitiated = true;
437                batchTimeoutTask.cancel();
438                BitarchiveMonitor.this.notifyBatchEnded(this);
439            }
440        }
441
442    }
443
444    /**
445     * A timertask that makes batch ended notifications happen after a time specified in settings has elapsed, even
446     * though not all replies have been received.
447     */
448    private class BatchTimeoutTask extends TimerTask {
449        /** The ID of the batch job this object handles timeout for. */
450        private final String bitarchiveBatchID;
451
452        /**
453         * Initiate a timer task for the given batch job status.
454         *
455         * @param bitarchiveBatchID The ID of the batch job to monitor timeout for.
456         */
457        public BatchTimeoutTask(String bitarchiveBatchID) {
458            ArgumentNotValid.checkNotNullOrEmpty(bitarchiveBatchID, "String bitarchiveBatchID");
459            this.bitarchiveBatchID = bitarchiveBatchID;
460        }
461
462        /**
463         * Send a notifications on timeout if a notification is not already initiated.
464         */
465        public void run() {
466            // synchronize to ensure timeouts and batchreplies do not interfere
467            // with one another
468            BatchJobStatus bjs = runningBatchJobs.get(bitarchiveBatchID);
469            if (bjs != null) {
470                synchronized (bjs) {
471                    if (bjs.notifyInitiated) {
472                        // timeout occurred, but we are already in the process of
473                        // notifying. Just ignore.
474                        return;
475                    }
476                    try {
477                        final String errMsg = "A timeout has occurred for batch job: " + bjs.bitarchiveBatchID
478                                + ". Missing replies from [" + StringUtils.conjoin(", ", bjs.missingRespondents) + "]";
479                        log.warn(errMsg);
480                        bjs.appendError(errMsg);
481                        bjs.notifyBatchEnded();
482                    } catch (Throwable t) {
483                        log.warn("An error occurred during execution of timeout task.", t);
484                    }
485                }
486            }
487        }
488    }
489
490}