001/*
002 * #%L
003 * Netarchivesuite - archive
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.archive.bitarchive.distribute;
024
025import java.io.File;
026import java.io.PrintStream;
027import java.util.Collections;
028import java.util.HashMap;
029import java.util.Map;
030import java.util.Timer;
031
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import dk.netarkivet.archive.ArchiveSettings;
036import dk.netarkivet.archive.bitarchive.Bitarchive;
037import dk.netarkivet.archive.bitarchive.BitarchiveAdmin;
038import dk.netarkivet.archive.distribute.ArchiveMessageHandler;
039import dk.netarkivet.common.CommonSettings;
040import dk.netarkivet.common.distribute.ChannelID;
041import dk.netarkivet.common.distribute.Channels;
042import dk.netarkivet.common.distribute.JMSConnection;
043import dk.netarkivet.common.distribute.JMSConnectionFactory;
044import dk.netarkivet.common.distribute.NullRemoteFile;
045import dk.netarkivet.common.distribute.arcrepository.BatchStatus;
046import dk.netarkivet.common.distribute.arcrepository.BitarchiveRecord;
047import dk.netarkivet.common.exceptions.ArgumentNotValid;
048import dk.netarkivet.common.exceptions.PermissionDenied;
049import dk.netarkivet.common.exceptions.UnknownID;
050import dk.netarkivet.common.utils.ChecksumCalculator;
051import dk.netarkivet.common.utils.CleanupIF;
052import dk.netarkivet.common.utils.FileUtils;
053import dk.netarkivet.common.utils.LoggingOutputStream;
054import dk.netarkivet.common.utils.NotificationType;
055import dk.netarkivet.common.utils.NotificationsFactory;
056import dk.netarkivet.common.utils.Settings;
057import dk.netarkivet.common.utils.SystemUtils;
058
059/**
060 * Bitarchive container responsible for processing the different classes of message which can be received by a
061 * bitarchive and returning appropriate data.
062 */
063public class BitarchiveServer extends ArchiveMessageHandler implements CleanupIF {
064
065    /** The bitarchive serviced by this server. */
066    private Bitarchive ba;
067
068    /** The admin data for the bit archive. */
069    private BitarchiveAdmin baa;
070
071    /** The unique instance of this class. */
072    private static BitarchiveServer instance;
073
074    /** the jms connection. */
075    private JMSConnection con;
076
077    /** The logger used by this class. */
078    private static final Logger log = LoggerFactory.getLogger(BitarchiveServer.class);
079
080    /** the thread which sends heartbeat messages from this bitarchive to its BitarchiveMonitorServer. */
081    private HeartBeatSender heartBeatSender;
082
083    /** the unique id of this application. */
084    private String bitarchiveAppId;
085
086    /** Channel to listen on for get/batch/correct. */
087    private ChannelID allBa;
088    /** Topic to listen on for store. */
089    private ChannelID anyBa;
090    /** Channel to send BatchEnded messages to when replying. */
091    private ChannelID baMon;
092
093    /** Map between running batchjob processes and their message id. */
094    public Map<String, Thread> batchProcesses;
095
096    /**
097     * Returns the unique instance of this class The server creates an instance of the bitarchive it provides access to
098     * and starts to listen to JMS messages on the incomming jms queue
099     * <p>
100     * Also, heartbeats are sent out at regular intervals to the Bitarchive Monitor, to tell that this bitarchive is
101     * alive.
102     *
103     * @return the instance
104     * @throws UnknownID - if there was no heartbeat frequency defined in settings
105     * @throws ArgumentNotValid - if the heartbeat frequency in settings is invalid or either argument is null
106     */
107    public static synchronized BitarchiveServer getInstance() throws ArgumentNotValid, UnknownID {
108        if (instance == null) {
109            instance = new BitarchiveServer();
110        }
111        return instance;
112    }
113
114    /**
115     * The server creates an instance of the bitarchive it provides access to and starts to listen to JMS messages on
116     * the incomming jms queue
117     * <p>
118     * Also, heartbeats are sent out at regular intervals to the Bitarchive Monitor, to tell that this bitarchive is
119     * alive.
120     *
121     * @throws UnknownID - if there was no heartbeat frequency or temp dir defined in settings or if the bitarchiveid
122     * cannot be created.
123     * @throws PermissionDenied - if the temporary directory or the file directory cannot be written
124     */
125    private BitarchiveServer() throws UnknownID, PermissionDenied {
126        System.setOut(new PrintStream(new LoggingOutputStream(LoggingOutputStream.LoggingLevel.INFO, log, "StdOut: ")));
127        System.setErr(new PrintStream(new LoggingOutputStream(LoggingOutputStream.LoggingLevel.WARN, log, "StdErr: ")));
128        boolean listening = false; // are we listening to queue ANY_BA
129        File serverdir = FileUtils.getTempDir();
130        if (!serverdir.exists()) {
131            serverdir.mkdirs();
132        }
133        if (!serverdir.canWrite()) {
134            throw new PermissionDenied("Not allowed to write to temp directory '" + serverdir + "'");
135        }
136        log.info("Storing temporary files at '{}'", serverdir.getPath());
137
138        bitarchiveAppId = createBitarchiveAppId();
139
140        allBa = Channels.getAllBa();
141        anyBa = Channels.getAnyBa();
142        baMon = Channels.getTheBamon();
143        ba = Bitarchive.getInstance();
144        con = JMSConnectionFactory.getInstance();
145        con.setListener(allBa, this);
146        baa = BitarchiveAdmin.getInstance();
147        if (baa.hasEnoughSpace()) {
148            con.setListener(anyBa, this);
149            listening = true;
150        } else {
151            log.warn("Not enough space to guarantee store -- not listening to {}", anyBa.getName());
152        }
153
154        // create map for batchjobs
155        batchProcesses = Collections.synchronizedMap(new HashMap<String, Thread>());
156
157        // Create and start the heartbeat sender
158        Timer timer = new Timer(true);
159        heartBeatSender = new HeartBeatSender(baMon, this);
160        long frequency = Settings.getLong(ArchiveSettings.BITARCHIVE_HEARTBEAT_FREQUENCY);
161        timer.scheduleAtFixedRate(heartBeatSender, 0, frequency);
162        log.info("Heartbeat frequency: '{}'", frequency);
163        // Next logentry depends on whether we are listening to ANY_BA or not
164        String logmsg = "Created bitarchive server listening on: " + allBa.getName();
165        if (listening) {
166            logmsg += " and " + anyBa.getName();
167        }
168
169        log.info(logmsg);
170
171        log.info("Broadcasting heartbeats on: {}", baMon.getName());
172    }
173
174    /**
175     * Ends the heartbeat sender before next loop and removes the server as listener on allBa and anyBa. Closes the
176     * bitarchive. Calls cleanup.
177     */
178    public synchronized void close() {
179        log.info("BitarchiveServer {} closing down", getBitarchiveAppId());
180        cleanup();
181        if (con != null) {
182            con.removeListener(allBa, this);
183            con.removeListener(anyBa, this);
184            con = null;
185        }
186        log.info("BitarchiveServer {} closed down", getBitarchiveAppId());
187    }
188
189    /**
190     * Ends the heartbeat sender before next loop.
191     */
192    public void cleanup() {
193        if (ba != null) {
194            ba.close();
195            ba = null;
196        }
197        if (baa != null) {
198            baa.close();
199            baa = null;
200        }
201        if (heartBeatSender != null) {
202            heartBeatSender.cancel();
203            heartBeatSender = null;
204        }
205        instance = null;
206    }
207
208    /**
209     * Process a get request and send the result back to the client. If the arcfile is not found on this bitarchive
210     * machine, nothing happens.
211     *
212     * @param msg a container for upload request
213     * @throws ArgumentNotValid If the message is null.
214     */
215    @Override
216    public void visit(GetMessage msg) throws ArgumentNotValid {
217        ArgumentNotValid.checkNotNull(msg, "GetMessage msg");
218        BitarchiveRecord bar;
219        log.trace("Processing getMessage({}:{}).", msg.getArcFile(), msg.getIndex());
220        try {
221            bar = ba.get(msg.getArcFile(), msg.getIndex());
222        } catch (Throwable t) {
223            log.warn("Error while processing get message '{}'", msg, t);
224            msg.setNotOk(t);
225            con.reply(msg);
226            return;
227        }
228        if (bar != null) {
229            msg.setRecord(bar);
230            log.debug("Sending reply: {}", msg.toString());
231            con.reply(msg);
232        } else {
233            log.trace("Record({}:{}). not found on this BitarchiveServer", msg.getArcFile(), msg.getIndex());
234        }
235    }
236
237    /**
238     * Process a upload request and send the result back to the client. This may be a very time consuming process and is
239     * a blocking call.
240     *
241     * @param msg a container for upload request
242     * @throws ArgumentNotValid If the message is null.
243     */
244    @Override
245    public void visit(UploadMessage msg) throws ArgumentNotValid {
246        ArgumentNotValid.checkNotNull(msg, "UploadMessage msg");
247        // TODO Implement a thread-safe solution on resource level rather than
248        // message processor level.
249        try {
250            try {
251                synchronized (this) {
252                    // Important when two identical files are uploaded
253                    // simultanously.
254                    ba.upload(msg.getRemoteFile(), msg.getArcfileName());
255                }
256            } catch (Throwable t) {
257                log.warn("Error while processing upload message '{}'", msg, t);
258                msg.setNotOk(t);
259            } finally { // Stop listening if disk is now full
260                if (!baa.hasEnoughSpace()) {
261                    log.warn("Cannot guarantee enough space, no longer listening to {} for uploads", anyBa.getName());
262                    con.removeListener(anyBa, this);
263                }
264            }
265        } catch (Throwable t) {
266            // This block will be executed if the above finally block throws an
267            // exception. Therefore the message is not set to notOk here
268            log.warn("Error while removing listener after upload message '{}'", msg, t);
269        } finally {
270            log.info("Sending reply: {}", msg.toString());
271            con.reply(msg);
272        }
273    }
274
275    /**
276     * Removes an arcfile from the bitarchive and returns the removed file as an remotefile.
277     * <p>
278     * Answers OK if the file is actually removed. Answers notOk if the file exists with wrong checksum or wrong
279     * credentials Doesn't answer if the file doesn't exist.
280     * <p>
281     * This method always generates a warning when deleting a file.
282     * <p>
283     * Before the file is removed it is verified that - the file exists in the bitarchive - the file has the correct
284     * checksum - the supplied credentials are correct
285     *
286     * @param msg a container for remove request
287     * @throws ArgumentNotValid If the RemoveAndGetFileMessage is null.
288     */
289    @Override
290    public void visit(RemoveAndGetFileMessage msg) throws ArgumentNotValid {
291        ArgumentNotValid.checkNotNull(msg, "RemoveAndGetFileMessage msg");
292        String mesg = "Request to move file '" + msg.getFileName() + "' with checksum '" + msg.getCheckSum()
293                + "' to attic";
294        log.info(mesg);
295        NotificationsFactory.getInstance().notify(mesg, NotificationType.INFO);
296
297        File foundFile = ba.getFile(msg.getFileName());
298        // Only send an reply if the file was found
299        if (foundFile == null) {
300            log.warn("Remove: '{}' not found", msg.getFileName());
301            return;
302        }
303
304        try {
305
306            log.debug("File located - now checking the credentials");
307            // Check credentials
308            String credentialsReceived = msg.getCredentials();
309            ArgumentNotValid.checkNotNullOrEmpty(credentialsReceived, "credentialsReceived");
310            if (!credentialsReceived.equals(Settings.get(ArchiveSettings.ENVIRONMENT_THIS_CREDENTIALS))) {
311                String message = "Attempt to remove '" + foundFile + "' with wrong credentials!";
312                log.warn(message);
313                msg.setNotOk(message);
314                return;
315            }
316
317            log.debug("Credentials accepted, now checking the checksum");
318
319            String checksum = ChecksumCalculator.calculateMd5(foundFile);
320
321            if (!checksum.equals(msg.getCheckSum())) {
322                final String message = "Attempt to remove '" + foundFile + " failed due to checksum mismatch: "
323                        + msg.getCheckSum() + " != " + checksum;
324                log.warn(message);
325                msg.setNotOk(message);
326                return;
327            }
328
329            log.debug("Checksums matched - preparing to move and return file");
330            File moveTo = baa.getAtticPath(foundFile);
331            if (!foundFile.renameTo(moveTo)) {
332                final String message = "Failed to move the file:" + foundFile + "to attic";
333                log.warn(message);
334                msg.setNotOk(message);
335                return;
336            }
337            msg.setFile(moveTo);
338
339            log.warn("Removed file '{}' with checksum '{}'", msg.getFileName(), msg.getCheckSum());
340        } catch (Exception e) {
341            final String message = "Error while processing message '" + msg + "'";
342            log.warn(message, e);
343            msg.setNotOk(e);
344        } finally {
345            con.reply(msg);
346        }
347    }
348
349    /**
350     * Process a batch job and send the result back to the client.
351     *
352     * @param msg a container for batch jobs
353     * @throws ArgumentNotValid If the BatchMessage is null.
354     */
355    @Override
356    public void visit(final BatchMessage msg) throws ArgumentNotValid {
357        ArgumentNotValid.checkNotNull(msg, "BatchMessage msg");
358        Thread batchThread = new Thread("Batch-" + msg.getID()) {
359            @Override
360            public void run() {
361                try {
362                    // TODO Possibly tell batch something that will let
363                    // it create more comprehensible file names.
364                    // Run the batch job on all files on this machine
365                    BatchStatus batchStatus = ba.batch(bitarchiveAppId, msg.getJob());
366
367                    // Create the message which will contain the reply
368                    BatchEndedMessage resultMessage = new BatchEndedMessage(baMon, msg.getID(), batchStatus);
369
370                    // Update informational fields in reply message
371                    if (batchStatus.getFilesFailed().size() > 0) {
372                        resultMessage
373                                .setNotOk("Batch job failed on " + batchStatus.getFilesFailed().size() + " files.");
374                    }
375
376                    // Send the reply
377                    con.send(resultMessage);
378                    log.debug("Submitted result message for batch job: {}", msg.getID());
379                } catch (Throwable t) {
380                    log.warn("Batch processing failed for message '{}'", msg, t);
381                    BatchEndedMessage failMessage = new BatchEndedMessage(baMon, bitarchiveAppId, msg.getID(),
382                            new NullRemoteFile());
383                    failMessage.setNotOk(t);
384
385                    con.send(failMessage);
386                    log.debug("Submitted failure message for batch job: {}", msg.getID());
387                } finally {
388                    // remove from map
389                    batchProcesses.remove(msg.getBatchID());
390                }
391            }
392        };
393        batchProcesses.put(msg.getBatchID(), batchThread);
394        batchThread.start();
395    }
396
397    public void visit(BatchTerminationMessage msg) throws ArgumentNotValid {
398        ArgumentNotValid.checkNotNull(msg, "BatchTerminationMessage msg");
399        log.info("Received BatchTerminationMessage: {}", msg);
400
401        try {
402            Thread t = batchProcesses.get(msg.getTerminateID());
403
404            // check whether the batchjob is still running.
405            if (t == null) {
406                log.info("The batchjob with ID '{}' cannot be found, and must have terminated by it self.",
407                        msg.getTerminateID());
408                return;
409            }
410
411            // try to interrupt.
412            if (t.isAlive()) {
413                t.interrupt();
414            }
415
416            // wait one second, before verifying whether it is dead.
417            synchronized (this) {
418                try {
419                    this.wait(1000);
420                } catch (InterruptedException e) {
421                    log.trace("Unimportant InterruptedException caught.", e);
422                }
423            }
424
425            // Verify that is dead, or log that it might have a problem.
426            if (t.isAlive()) {
427                log.error("The thread '{}' should have been terminated, but it is apparently still alive.", t);
428            } else {
429                log.info("The batchjob with ID '{}' has successfully been terminated!", msg.getTerminateID());
430            }
431        } catch (Throwable t) {
432            // log problem and set to NotOK!
433            log.error("An error occured while trying to terminate {}", msg.getTerminateID(), t);
434        }
435    }
436
437    /**
438     * Process a getFile request and send the result back to the client.
439     *
440     * @param msg a container for a getfile request
441     * @throws ArgumentNotValid If the GetFileMessage is null.
442     */
443    @Override
444    public void visit(GetFileMessage msg) throws ArgumentNotValid {
445        ArgumentNotValid.checkNotNull(msg, "GetFileMessage msg");
446
447        try {
448            File foundFile = ba.getFile(msg.getArcfileName());
449            // Only send an reply if the file was found
450            if (foundFile != null) {
451                // Be Warned!! The following call does not do what you think it
452                // does. This actually creates the RemoteFile object, uploading
453                // the file to the ftp server as it does so.
454                msg.setFile(foundFile);
455                log.info("Sending reply: {}", msg.toString());
456                con.reply(msg);
457            }
458        } catch (Throwable t) {
459            log.warn("Error while processing get file message '{}'", msg, t);
460        }
461    }
462
463    /**
464     * Returns a String that identifies this bit archive application (within the bit archive, i.e. either with id ONE or
465     * TWO)
466     *
467     * @return String with IP address of this host and, if specified, the APPLICATION_INSTANCE_ID from settings
468     */
469    public String getBitarchiveAppId() {
470        return bitarchiveAppId;
471    }
472
473    /**
474     * Returns a String that identifies this bit archive application (within the bit archive, i.e. either with id ONE or
475     * TWO). The string has the following form: hostaddress[_applicationinstanceid] fx. "10.0.0.1_appOne" or just
476     * "10.0.0.1", if no applicationinstanceid has been chosen.
477     *
478     * @return String with IP address of this host and, if specified, the APPLICATION_INSTANCE_ID from settings
479     * @throws UnknownID - if InetAddress.getLocalHost() failed
480     */
481    private String createBitarchiveAppId() throws UnknownID {
482        String id;
483
484        // Create an id with the IP address of this current host
485        id = SystemUtils.getLocalIP();
486
487        // Append an underscore and APPLICATION_INSTANCE_ID from settings
488        // to the id, if specified in settings.
489        // If no APPLICATION_INSTANCE_ID is found do nothing.
490        try {
491            String applicationInstanceId = Settings.get(CommonSettings.APPLICATION_INSTANCE_ID);
492            if (!applicationInstanceId.isEmpty()) {
493                id += "_" + applicationInstanceId;
494            }
495        } catch (UnknownID e) {
496            // Ignore the fact, that there is no APPLICATION_INSTANCE_ID in
497            // settings
498            log.warn("No setting APPLICATION_INSTANCE_ID found in settings");
499        }
500
501        return id;
502    }
503
504}