001/*
002 * #%L
003 * Netarchivesuite - archive
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.archive.bitarchive.distribute;
024
025import java.io.File;
026import java.io.FileInputStream;
027import java.io.FileOutputStream;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.Observable;
032import java.util.Observer;
033
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037import dk.netarkivet.archive.ArchiveSettings;
038import dk.netarkivet.archive.bitarchive.BitarchiveMonitor;
039import dk.netarkivet.archive.checksum.distribute.CorrectMessage;
040import dk.netarkivet.archive.checksum.distribute.GetAllChecksumsMessage;
041import dk.netarkivet.archive.checksum.distribute.GetAllFilenamesMessage;
042import dk.netarkivet.archive.checksum.distribute.GetChecksumMessage;
043import dk.netarkivet.archive.distribute.ArchiveMessageHandler;
044import dk.netarkivet.common.CommonSettings;
045import dk.netarkivet.common.distribute.Channels;
046import dk.netarkivet.common.distribute.JMSConnection;
047import dk.netarkivet.common.distribute.JMSConnectionFactory;
048import dk.netarkivet.common.distribute.NetarkivetMessage;
049import dk.netarkivet.common.distribute.RemoteFile;
050import dk.netarkivet.common.distribute.RemoteFileFactory;
051import dk.netarkivet.common.exceptions.ArgumentNotValid;
052import dk.netarkivet.common.exceptions.IOFailure;
053import dk.netarkivet.common.exceptions.UnknownID;
054import dk.netarkivet.common.utils.CleanupIF;
055import dk.netarkivet.common.utils.FileUtils;
056import dk.netarkivet.common.utils.KeyValuePair;
057import dk.netarkivet.common.utils.NotificationType;
058import dk.netarkivet.common.utils.NotificationsFactory;
059import dk.netarkivet.common.utils.Settings;
060import dk.netarkivet.common.utils.batch.ChecksumJob;
061import dk.netarkivet.common.utils.batch.FileBatchJob;
062import dk.netarkivet.common.utils.batch.FileListJob;
063
064/**
065 * Class representing message handling for the monitor for bitarchives. The monitor is used for sending out and
066 * combining the results of executing batch jobs.
067 * <p>
068 * Batch jobs are received on the BAMON-channel, and resent to all bitarchives, that are considered live by the
069 * bitarchive monitor.
070 * <p>
071 * Lets the bitarchive monitor handle batch replies from the bitarchives, and observes it for when the batch job is
072 * done. Then constructs a reply from the data given, and sends it back to the originator.
073 * <p>
074 * Also registers signs of life from the bitarchives in the bitarchive monitor.
075 */
076public class BitarchiveMonitorServer extends ArchiveMessageHandler implements Observer, CleanupIF {
077
078    /** The unique instance of this class. */
079    private static BitarchiveMonitorServer instance;
080
081    /** Logger. */
082    private static final Logger log = LoggerFactory.getLogger(BitarchiveMonitorServer.class);
083
084    /** The jms connection used. */
085    private final JMSConnection con = JMSConnectionFactory.getInstance();
086
087    /** Object that handles logical operations. */
088    private BitarchiveMonitor bamon;
089
090    /** Map for managing the messages, which are made into batchjobs. The String is the ID of the message. */
091    private Map<String, NetarkivetMessage> batchConversions = new HashMap<String, NetarkivetMessage>();
092
093    /**
094     * Map for containing the batch-message-ids and the batchjobs, for the result files to be post-processed before
095     * returned back.
096     */
097    private Map<String, FileBatchJob> batchjobs = new HashMap<String, FileBatchJob>();
098
099    /**
100     * The map for managing the CorrectMessages. This involves three stages.
101     * <p>
102     * In the first, a RemoveAndGetFileMessage is sent, and then the CorrectMessage is put in the map along the ID of
103     * the RemoveAndGetFileMessage.
104     * <p>
105     * In the second stage, the reply of the RemoveAndGetFileMessage is used to extract the CorrectMessage from the Map.
106     * The CorrectMessage is then updated with the results from the RemoveAndGetFileMessage. Then an UploadMessage is
107     * send with the 'correct' file, where the ID of the UploadMessage is put into the map along the CorrectMessage.
108     * <p>
109     * In the third stage, the reply of the UploadMessage is used to extract the CorrectMessage from the map again, and
110     * the results of the UploadMessage is used to update the UploadMessage, which is then returned.
111     */
112    private Map<String, CorrectMessage> correctMessages = new HashMap<String, CorrectMessage>();
113
114    /**
115     * Creates an instance of a BitarchiveMonitorServer.
116     *
117     * @throws IOFailure - if an error with the JMSConnection occurs
118     */
119    protected BitarchiveMonitorServer() throws IOFailure {
120        bamon = BitarchiveMonitor.getInstance();
121        bamon.addObserver(this);
122        con.setListener(Channels.getTheBamon(), this);
123        log.info("BitarchiveMonitorServer instantiated. Listening to queue: '{}'.", Channels.getTheBamon());
124    }
125
126    /**
127     * Returns the unique instance of a BitarchiveMonitorServer.
128     *
129     * @return the instance
130     * @throws IOFailure - if an error with the JMSConnection occurs
131     */
132    public static synchronized BitarchiveMonitorServer getInstance() throws IOFailure {
133        if (instance == null) {
134            instance = new BitarchiveMonitorServer();
135        }
136        return instance;
137    }
138
139    /**
140     * This is the message handling method for BatchMessages.
141     * <p>
142     * A new BatchMessage is created with the same Job as the incoming BatchMessage and sent off to all live
143     * bitarchives.
144     * <p>
145     * The incoming and outgoing batch messages are then registered at the bitarchive monitor.
146     *
147     * @param inbMsg The message received
148     * @throws ArgumentNotValid If the BatchMessage is null.
149     */
150    public void visit(BatchMessage inbMsg) throws ArgumentNotValid {
151        ArgumentNotValid.checkNotNull(inbMsg, "BatchMessage inbMsg");
152
153        log.info("Received BatchMessage\n{}", inbMsg.toString());
154        try {
155            BatchMessage outbMsg = new BatchMessage(Channels.getAllBa(), inbMsg.getJob(),
156                    Settings.get(CommonSettings.USE_REPLICA_ID));
157            con.send(outbMsg);
158            long batchTimeout = inbMsg.getJob().getBatchJobTimeout();
159            // if batch time out is not a positive number, then use settings.
160            if (batchTimeout <= 0) {
161                batchTimeout = Settings.getLong(ArchiveSettings.BITARCHIVE_BATCH_JOB_TIMEOUT);
162            }
163            bamon.registerBatch(inbMsg.getID(), inbMsg.getReplyTo(), outbMsg.getID(), batchTimeout);
164            batchjobs.put(inbMsg.getID(), inbMsg.getJob());
165        } catch (Exception e) {
166            log.warn("Trouble while handling batch request '{}'", inbMsg, e);
167        }
168    }
169
170    /**
171     * This is the message handling method for BatchEndedMessages.
172     * <p>
173     * This delegates the handling of the reply to the bitarchive monitor, which will notify us if the batch job is now
174     * done.
175     *
176     * @param beMsg The BatchEndedMessage to be handled.
177     * @throws ArgumentNotValid If the BatchEndedMessage is null.
178     */
179    public void visit(final BatchEndedMessage beMsg) throws ArgumentNotValid {
180        ArgumentNotValid.checkNotNull(beMsg, "BatchEndedMessage beMsg");
181
182        log.debug("Received batch ended from bitarchive '{}': {}", beMsg.getBitarchiveID(), beMsg);
183        bamon.signOfLife(beMsg.getBitarchiveID());
184        try {
185            new Thread() {
186                public void run() {
187                    // retrieve the error messages.
188                    String errorMessages = null;
189                    if (!beMsg.isOk()) {
190                        errorMessages = beMsg.getErrMsg();
191                    }
192                    // send reply to the bitarchive.
193                    bamon.bitarchiveReply(beMsg.getOriginatingBatchMsgID(), beMsg.getBitarchiveID(),
194                            beMsg.getNoOfFilesProcessed(), beMsg.getFilesFailed(), beMsg.getRemoteFile(),
195                            errorMessages, beMsg.getExceptions());
196                }
197            }.start();
198        } catch (Exception e) {
199            log.warn("Trouble while handling bitarchive reply '{}'", beMsg, e);
200        }
201    }
202
203    /**
204     * This is the message handling method for HeartBeatMessages.
205     * <p>
206     * Registers a sign of life from a bitarchive.
207     *
208     * @param hbMsg the message that represents the sign of life
209     * @throws ArgumentNotValid If the HeartBeatMessage is null.
210     */
211    public void visit(HeartBeatMessage hbMsg) throws ArgumentNotValid {
212        ArgumentNotValid.checkNotNull(hbMsg, "HeartBeatMessage hbMsg");
213
214        try {
215            bamon.signOfLife(hbMsg.getBitarchiveID());
216        } catch (Exception e) {
217            log.warn("Trouble while handling bitarchive sign of life '{}'", hbMsg, e);
218        }
219    }
220
221    /**
222     * This is the first step in correcting a bad entry.
223     * <p>
224     * In the first stage, a RemoveAndGetFileMessage is sent, and then the CorrectMessage is put in the map along the ID
225     * of the RemoveAndGetFileMessage.
226     * <p>
227     * See the correctMessages Map.
228     *
229     * @param cm The CorrectMessage to handle.
230     * @throws ArgumentNotValid If the CorrectMessage is null.
231     */
232    public void visit(CorrectMessage cm) throws ArgumentNotValid {
233        ArgumentNotValid.checkNotNull(cm, "CorrectMessage cm");
234        log.info("Receiving CorrectMessage: {}", cm);
235
236        try {
237            // Create the RemoveAndGetFileMessage for removing the file.
238            RemoveAndGetFileMessage ragfm = new RemoveAndGetFileMessage(Channels.getAllBa(), Channels.getTheBamon(),
239                    cm.getArcfileName(), cm.getReplicaId(), cm.getIncorrectChecksum(), cm.getCredentials());
240
241            // Send the message.
242            con.send(ragfm);
243
244            log.info("Step 1 of handling CorrectMessage. Sending RemoveAndGetFileMessage: {}", ragfm);
245
246            // Put the CorrectMessage into the map along the id of the
247            // RemoveAndGetFileMessage
248            correctMessages.put(ragfm.getID(), cm);
249        } catch (Exception e) {
250            String errMsg = "An error occurred during step 1 of handling "
251                    + " the CorrectMessage: sending RemoveAndGetFileMessage";
252            log.warn(errMsg, e);
253            cm.setNotOk(e);
254            con.reply(cm);
255        }
256    }
257
258    /**
259     * This is the second step in correcting a bad entry.
260     * <p>
261     * In the second stage, the reply of the RemoveAndGetFileMessage is used to extract the CorrectMessage from the Map.
262     * The CorrectMessage is then updated with the results from the RemoveAndGetFileMessage. Then an UploadMessage is
263     * send with the 'correct' file, where the ID of the UploadMessage is put into the map along the CorrectMessage.
264     * <p>
265     * See the correctMessages Map.
266     *
267     * @param msg The RemoteAndGetFileMessage.
268     * @throws ArgumentNotValid If the RemoveAndGetFileMessage is null.
269     */
270    public void visit(RemoveAndGetFileMessage msg) throws ArgumentNotValid {
271        ArgumentNotValid.checkNotNull(msg, "RemoveAndGetFileMessage msg");
272        log.info("Receiving RemoveAndGetFileMessage (presumably reply): {}", msg);
273
274        // Retrieve the correct message
275        CorrectMessage cm = correctMessages.remove(msg.getID());
276
277        // If the RemoveAndGetFileMessage has failed, then the CorrectMessage
278        // has also failed, and should be returned as a fail.
279        if (!msg.isOk()) {
280            String errMsg = "The RemoveAndGetFileMessage has returned the " + "error: '" + msg.getErrMsg()
281                    + "'. Reply to CorrectMessage " + "with the same error.";
282            log.warn(errMsg);
283            cm.setNotOk(errMsg);
284            con.reply(cm);
285            return;
286        }
287
288        try {
289            // update the correct message.
290            cm.setRemovedFile(msg.getRemoteFile());
291
292            // Create the upload message, send it.
293            UploadMessage um = new UploadMessage(Channels.getAllBa(), Channels.getTheBamon(), cm.getCorrectFile());
294            con.send(um);
295            log.info("Step 2 of handling CorrectMessage. Sending UploadMessage: {}", um);
296
297            // Store the CorrectMessage along with the ID of the UploadMessage.
298            correctMessages.put(um.getID(), cm);
299        } catch (Exception e) {
300            String errMsg = "An error occurred during step 2 of handling "
301                    + " the CorrectMessage: sending UploadMessage";
302            log.warn(errMsg, e);
303            cm.setNotOk(e);
304            con.reply(cm);
305        }
306    }
307
308    /**
309     * This is the third step in correcting a bad entry.
310     * <p>
311     * In the third stage, the reply of the UploadMessage is used to extract the CorrectMessage from the map again, and
312     * the results of the UploadMessage is used to update the UploadMessage, which is then returned.
313     * <p>
314     * See the correctMessages Map.
315     *
316     * @param msg The reply of the UploadMessage.
317     * @throws ArgumentNotValid If the UploadMessage is null.
318     */
319    public void visit(UploadMessage msg) throws ArgumentNotValid {
320        ArgumentNotValid.checkNotNull(msg, "UploadMessage msg");
321        log.info("Receiving a reply to an UploadMessage: {}", msg);
322
323        // retrieve the CorrectMessage.
324        CorrectMessage cm = correctMessages.remove(msg.getID());
325
326        // handle potential errors.
327        if (!msg.isOk()) {
328            cm.setNotOk(msg.getErrMsg());
329        }
330
331        // reply to the correct message.
332        con.reply(cm);
333        log.info("Step 3 of handling CorrectMessage. Sending reply for CorrectMessage: '{}'", cm);
334    }
335
336    /**
337     * Method for handling the GetAllChecksumsMessage. This message will be made into a batchjob, which will executed on
338     * the bitarchives. The reply to the batchjob will be handled and uses as reply to the GetAllChecksumsMessage.
339     *
340     * @param msg The GetAllChecksumsMessage, which will be made into a batchjob and sent to the bitarchives.
341     * @throws ArgumentNotValid If the GetAllChecksumsMessage is null.
342     */
343    public void visit(GetAllChecksumsMessage msg) throws ArgumentNotValid {
344        ArgumentNotValid.checkNotNull(msg, "GetAllChecksumsMessage msg");
345
346        log.info("Receiving GetAllChecksumsMessage '{}'", msg);
347
348        // Create batchjob for the GetAllChecksumsMessage.
349        ChecksumJob cj = new ChecksumJob();
350
351        // Execute the batchjob.
352        executeConvertedBatch(cj, msg);
353    }
354
355    /**
356     * Method for handling the GetAllFilenamesMessage. The GetAllFilenamesMessage will be made into a filelist batchjob,
357     * which will be sent to the bitarchives. The reply to the batchjob will then be used as reply to the
358     * GetAllFilenamesMessage.
359     *
360     * @param msg The GetAllFilenamesMessage, which will be made into a batchjob and sent to the bitarchives.
361     * @throws ArgumentNotValid If the GetAllFilenamesMessage is null.
362     */
363    public void visit(GetAllFilenamesMessage msg) throws ArgumentNotValid {
364        ArgumentNotValid.checkNotNull(msg, "GetAllFilenamesMessage msg");
365
366        log.info("Receiving GetAllFilenamesMessage '{}'", msg);
367
368        // Create batchjob for the GetAllChecksumsMessage.
369        FileListJob flj = new FileListJob();
370
371        // Execute the batchjob.
372        executeConvertedBatch(flj, msg);
373    }
374
375    /**
376     * Method for handling the GetChecksumMessage. This is made into the batchjob ChecksumsJob which will be limitted to
377     * the specific filename. The batchjob will be executed on the bitarchives and the reply to the batchjob will be
378     * used as reply to the GetChecksumMessage.
379     *
380     * @param msg The GetAllChecksumsMessage, which will be made into a batchjob and sent to the bitarchives.
381     * @throws ArgumentNotValid If the GetChecksumMessage is null.
382     */
383    public void visit(GetChecksumMessage msg) throws ArgumentNotValid {
384        ArgumentNotValid.checkNotNull(msg, "GetChecksumMessage msg");
385
386        log.info("Receiving GetChecksumsMessage '{}'", msg);
387
388        // Create batchjob for the GetAllChecksumsMessage.
389        ChecksumJob cj = new ChecksumJob();
390        cj.processOnlyFileNamed(msg.getArcfileName());
391        cj.setBatchJobTimeout(Settings.getLong(ArchiveSettings.SINGLE_CHECKSUM_TIMEOUT));
392
393        // Execute the batchjob.
394        executeConvertedBatch(cj, msg);
395    }
396
397    /**
398     * Method for executing messages converted into batchjobs.
399     *
400     * @param job The job to execute.
401     * @param msg The message which is converted into the batchjob.
402     */
403    private void executeConvertedBatch(FileBatchJob job, NetarkivetMessage msg) {
404        try {
405            BatchMessage outbMsg = new BatchMessage(Channels.getAllBa(), job,
406                    Settings.get(CommonSettings.USE_REPLICA_ID));
407            con.send(outbMsg);
408
409            long batchTimeout = job.getBatchJobTimeout();
410            // if batch time out is not a positive number, then use settings.
411            if (batchTimeout <= 0) {
412                batchTimeout = Settings.getLong(ArchiveSettings.BITARCHIVE_BATCH_JOB_TIMEOUT);
413            }
414            bamon.registerBatch(msg.getID(), msg.getReplyTo(), outbMsg.getID(), batchTimeout);
415            batchjobs.put(msg.getID(), job);
416            // Remember that the message is a batch conversion.
417            log.info("{}", outbMsg);
418
419            batchConversions.put(msg.getID(), msg);
420        } catch (Throwable t) {
421            log.warn("Unable to handle batch '{}'request due to unexpected exception", msg, t);
422            msg.setNotOk(t);
423            con.reply(msg);
424        }
425    }
426
427    /**
428     * Handles notifications from the bitarchive monitor, that a batch job is complete.
429     * <p>
430     * Spawns a new thread in which all the results are wrapped and sent back in a reply to the originator of this batch
431     * request.
432     *
433     * @param o the observable object. Should always be the bitarchive monitor. If it isn't, this notification will be
434     * logged and ignored.
435     * @param arg an argument passed from the bitarchive monitor. This should always be a batch status object indicating
436     * the end of that batchjob. If it isn't, this notification will be logged and ignored.
437     */
438    public void update(Observable o, final Object arg) {
439        if (o != bamon) {
440            log.warn("Received unexpected notification from '" + o + "'");
441            return;
442        }
443        if (arg == null) {
444            log.warn("Received unexpected notification with no argument");
445            return;
446        }
447        if (!(arg instanceof dk.netarkivet.archive.bitarchive.BitarchiveMonitor.BatchJobStatus)) {
448            log.warn("Received notification with incorrect argument type {}:'{}''", arg.getClass(), arg);
449            return;
450        }
451        new Thread() {
452            public void run() {
453                // convert the input argument.
454                BitarchiveMonitor.BatchJobStatus bjs = (BitarchiveMonitor.BatchJobStatus) arg;
455
456                // Check whether converted message or actual batchjob.
457                if (batchConversions.containsKey(bjs.originalRequestID)) {
458                    replyConvertedBatch(bjs);
459                } else {
460                    doBatchReply(bjs);
461                }
462            }
463        }.start();
464    }
465
466    /**
467     * This method sends a reply based on the information from bitarchives received and stored in the given batch job
468     * status.
469     * <p>
470     * It will concatenate the results from all the bitarchives in one file, and construct a reply to the originating
471     * requester with all information.
472     *
473     * @param bjs Status of received messages from bitarchives.
474     */
475    private void doBatchReply(BitarchiveMonitor.BatchJobStatus bjs) {
476        RemoteFile resultsFile = null;
477        try {
478            // Post process the file.
479            File postFile = File.createTempFile("post", "batch", FileUtils.getTempDir());
480            try {
481                // retrieve the batchjob
482                FileBatchJob bj = batchjobs.remove(bjs.originalRequestID);
483                if (bj == null) {
484                    throw new UnknownID("Only knows: " + batchjobs.keySet());
485                }
486                log.info("Post processing batchjob results for '{}' with id '{}'", bj.getClass().getName(),
487                        bjs.originalRequestID);
488                // perform the post process, and handle whether it succeeded.
489                if (bj.postProcess(new FileInputStream(bjs.batchResultFile), new FileOutputStream(postFile))) {
490                    log.debug("Post processing finished.");
491                } else {
492                    log.debug("No post processing. Using concatenated file.");
493                    tryAndDeleteTemporaryFile(postFile);
494                    postFile = bjs.batchResultFile;
495                }
496            } catch (Exception e) {
497                log.warn("Exception caught during post processing batchjob. Concatenated file used instead.", e);
498                tryAndDeleteTemporaryFile(postFile);
499                postFile = bjs.batchResultFile;
500            }
501
502            // Get remote file for batch result
503            resultsFile = RemoteFileFactory.getMovefileInstance(postFile);
504        } catch (Exception e) {
505            log.warn("Make remote file from {}", bjs.batchResultFile, e);
506            bjs.appendError("Could not append batch results: " + e);
507        }
508
509        // Make batch reply message
510        BatchReplyMessage brMsg = new BatchReplyMessage(bjs.originalRequestReplyTo, Channels.getTheBamon(),
511                bjs.originalRequestID, bjs.noOfFilesProcessed, bjs.filesFailed, resultsFile);
512        if (bjs.errorMessages != null) {
513            brMsg.setNotOk(bjs.errorMessages);
514        }
515
516        // Send the batch reply message.
517        con.send(brMsg);
518
519        log.info("BatchReplyMessage: '{}' sent from BA monitor to queue: '{}'", brMsg, brMsg.getTo());
520    }
521
522    /**
523     * Helper method to delete temporary files. Logs at level debug, if it couldn't delete the file.
524     *
525     * @param tmpFile the tmpFile that needs to be deleted.
526     */
527    private void tryAndDeleteTemporaryFile(File tmpFile) {
528        boolean deleted = tmpFile.delete();
529        if (!deleted) {
530            log.debug("Failed to delete temporary file '{}'", tmpFile.getAbsolutePath());
531        } else {
532            log.trace("Deleted temporary file '{}' successfully", tmpFile.getAbsolutePath());
533        }
534    }
535
536    /**
537     * Uses the batchjobstatus on the message converted batchjob to reply on the original message.
538     *
539     * @param bjs The status of the batchjob.
540     */
541    private void replyConvertedBatch(BitarchiveMonitor.BatchJobStatus bjs) {
542        // Retrieve the message corresponding to the converted batchjob.
543        NetarkivetMessage msg = batchConversions.remove(bjs.originalRequestID);
544        log.info("replying to converted batchjob message : {}", msg);
545        if (msg instanceof GetAllChecksumsMessage) {
546            replyToGetAllChecksumsMessage(bjs, (GetAllChecksumsMessage) msg);
547        } else if (msg instanceof GetAllFilenamesMessage) {
548            replyToGetAllFilenamesMessage(bjs, (GetAllFilenamesMessage) msg);
549        } else if (msg instanceof GetChecksumMessage) {
550            replyToGetChecksumMessage(bjs, (GetChecksumMessage) msg);
551        } else /* unhandled message type. */{
552            String errMsg = "The message cannot be handled '" + msg + "'";
553            log.error(errMsg);
554            msg.setNotOk(errMsg);
555            con.reply(msg);
556        }
557    }
558
559    /**
560     * Method for replying to a GetAllChecksumsMessage. It uses the reply from the batchjob to make a proper reply to
561     * the GetAllChecksumsMessage.
562     *
563     * @param bjs The BatchJobStatus used to reply to the GetAllChecksumsMessage.
564     * @param msg The GetAllChecksumsMessage to reply to.
565     */
566    private void replyToGetAllChecksumsMessage(BitarchiveMonitor.BatchJobStatus bjs, GetAllChecksumsMessage msg) {
567        try {
568            // Set the resulting file.
569            msg.setFile(bjs.batchResultFile);
570
571            // record any errors.
572            if (bjs.errorMessages != null) {
573                msg.setNotOk(bjs.errorMessages);
574            }
575        } catch (Throwable t) {
576            msg.setNotOk(t);
577            log.warn("An error occurred during the handling of the GetAllChecksumsMessage", t);
578        } finally {
579            // reply
580            log.info("Replying to GetAllChecksumsMessage '{}'", msg);
581            con.reply(msg);
582        }
583    }
584
585    /**
586     * Method for replying to a GetAllFilenamesMessage. It uses the reply from the batchjob to make a proper reply to
587     * the GetAllFilenamesMessage.
588     *
589     * @param bjs The BatchJobStatus used to reply to the GetAllFilenamesMessage.
590     * @param msg The GetAllFilenamesMessage to reply to.
591     */
592    private void replyToGetAllFilenamesMessage(BitarchiveMonitor.BatchJobStatus bjs, GetAllFilenamesMessage msg) {
593        try {
594            // Set the resulting file.
595            msg.setFile(bjs.batchResultFile);
596
597            // record any errors.
598            if (bjs.errorMessages != null) {
599                msg.setNotOk(bjs.errorMessages);
600            }
601        } catch (Throwable t) {
602            msg.setNotOk(t);
603            log.warn("An error occurred during the handling of the GetAllFilenamesMessage", t);
604        } finally {
605            // reply
606            log.info("Replying to GetAllFilenamesMessage '{}'", msg);
607            con.reply(msg);
608        }
609    }
610
611    /**
612     * Method for replying to a GetChecksumMessage. It uses the reply from the batchjob to make a proper reply to the
613     * GetChecksumMessage.
614     *
615     * @param bjs The BatchJobStatus to be used for the reply.
616     * @param msg The GetChecksumMessage to reply to.
617     */
618    private void replyToGetChecksumMessage(BitarchiveMonitor.BatchJobStatus bjs, GetChecksumMessage msg) {
619        try {
620            // Fetch the content of the batchresultfile.
621            List<String> output = FileUtils.readListFromFile(bjs.batchResultFile);
622
623            if (output.size() < 1) {
624                String errMsg = "The batchjob did not find the file '" + msg.getArcfileName() + "' within the "
625                        + "archive.";
626                log.warn(errMsg);
627
628                throw new IOFailure(errMsg);
629            }
630            if (output.size() > 1) {
631                // Log that duplicates have been found.
632                log.warn("The file '{}' was found {} times in the archive. Using the first found '{}' out of '{}'",
633                        msg.getArcfileName(), output.size(), output.get(0), output);
634
635                // check if any different values.
636                String firstVal = output.get(0);
637                for (int i = 1; i < output.size(); i++) {
638                    if (!output.get(i).equals(firstVal)) {
639                        String errorString = "Replica '" + msg.getReplicaId() + "' has unidentical duplicates: '"
640                                + firstVal + "' and '" + output.get(i) + "'.";
641                        log.warn(errorString);
642                        NotificationsFactory.getInstance().notify(errorString, NotificationType.WARNING);
643                    } else {
644                        log.debug("Replica '{}' has identical duplicates: '{}'.", msg.getReplicaId(), firstVal);
645                    }
646                }
647            }
648
649            // Extract the filename and checksum of the first result.
650            KeyValuePair<String, String> firstResult = ChecksumJob.parseLine(output.get(0));
651
652            // Check that the filename has the expected value (the name of
653            // the requested file).
654            if (!msg.getArcfileName().equals(firstResult.getKey())) {
655                String errMsg = "The first result found the file '" + firstResult.getKey()
656                        + "' but should have found '" + msg.getArcfileName() + "'.";
657                log.error(errMsg);
658                throw new IOFailure(errMsg);
659            }
660
661            // Put the checksum into the reply message, and reply.
662            msg.setChecksum(firstResult.getValue());
663
664            // cleanup batchjob file
665            FileUtils.remove(bjs.batchResultFile);
666        } catch (Throwable t) {
667            msg.setNotOk(t);
668            log.warn("An error occurred during the handling of the GetChecksumMessage", t);
669        } finally {
670            log.info("Replying GetChecksumMessage: '{}'.", msg.toString());
671
672            // Reply to the original message (set 'isReply').
673            msg.setIsReply();
674            con.reply(msg);
675        }
676    }
677
678    /**
679     * Close down this BitarchiveMonitor.
680     */
681    public void close() {
682        log.info("BitarchiveMonitorServer closing down.");
683        cleanup();
684        log.info("BitarchiveMonitorServer closed down");
685    }
686
687    /**
688     * Closes this BitarchiveMonitorServer cleanly.
689     */
690    public synchronized void cleanup() {
691        if (instance != null) {
692            con.removeListener(Channels.getTheBamon(), this);
693            batchConversions.clear();
694            instance = null;
695            if (bamon != null) {
696                bamon.cleanup();
697                bamon = null;
698            }
699        }
700    }
701
702}