001/* 002 * #%L 003 * Netarchivesuite - archive 004 * %% 005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.archive.bitarchive.distribute; 024 025import java.io.File; 026import java.io.PrintStream; 027import java.util.Collections; 028import java.util.HashMap; 029import java.util.Map; 030import java.util.Timer; 031 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035import dk.netarkivet.archive.ArchiveSettings; 036import dk.netarkivet.archive.bitarchive.Bitarchive; 037import dk.netarkivet.archive.bitarchive.BitarchiveAdmin; 038import dk.netarkivet.archive.distribute.ArchiveMessageHandler; 039import dk.netarkivet.common.CommonSettings; 040import dk.netarkivet.common.distribute.ChannelID; 041import dk.netarkivet.common.distribute.Channels; 042import dk.netarkivet.common.distribute.JMSConnection; 043import dk.netarkivet.common.distribute.JMSConnectionFactory; 044import dk.netarkivet.common.distribute.NullRemoteFile; 045import dk.netarkivet.common.distribute.arcrepository.BatchStatus; 046import dk.netarkivet.common.distribute.arcrepository.BitarchiveRecord; 047import dk.netarkivet.common.exceptions.ArgumentNotValid; 048import dk.netarkivet.common.exceptions.PermissionDenied; 049import dk.netarkivet.common.exceptions.UnknownID; 050import dk.netarkivet.common.utils.ChecksumCalculator; 051import dk.netarkivet.common.utils.CleanupIF; 052import dk.netarkivet.common.utils.FileUtils; 053import dk.netarkivet.common.utils.LoggingOutputStream; 054import dk.netarkivet.common.utils.NotificationType; 055import dk.netarkivet.common.utils.NotificationsFactory; 056import dk.netarkivet.common.utils.Settings; 057import dk.netarkivet.common.utils.SystemUtils; 058 059/** 060 * Bitarchive container responsible for processing the different classes of message which can be received by a 061 * bitarchive and returning appropriate data. 062 */ 063public class BitarchiveServer extends ArchiveMessageHandler implements CleanupIF { 064 065 /** The bitarchive serviced by this server. */ 066 private Bitarchive ba; 067 068 /** The admin data for the bit archive. */ 069 private BitarchiveAdmin baa; 070 071 /** The unique instance of this class. */ 072 private static BitarchiveServer instance; 073 074 /** the jms connection. */ 075 private JMSConnection con; 076 077 /** The logger used by this class. */ 078 private static final Logger log = LoggerFactory.getLogger(BitarchiveServer.class); 079 080 /** the thread which sends heartbeat messages from this bitarchive to its BitarchiveMonitorServer. */ 081 private HeartBeatSender heartBeatSender; 082 083 /** the unique id of this application. */ 084 private String bitarchiveAppId; 085 086 /** Channel to listen on for get/batch/correct. */ 087 private ChannelID allBa; 088 /** Topic to listen on for store. */ 089 private ChannelID anyBa; 090 /** Channel to send BatchEnded messages to when replying. */ 091 private ChannelID baMon; 092 093 /** Map between running batchjob processes and their message id. */ 094 public Map<String, Thread> batchProcesses; 095 096 /** 097 * Returns the unique instance of this class The server creates an instance of the bitarchive it provides access to 098 * and starts to listen to JMS messages on the incomming jms queue 099 * <p> 100 * Also, heartbeats are sent out at regular intervals to the Bitarchive Monitor, to tell that this bitarchive is 101 * alive. 102 * 103 * @return the instance 104 * @throws UnknownID - if there was no heartbeat frequency defined in settings 105 * @throws ArgumentNotValid - if the heartbeat frequency in settings is invalid or either argument is null 106 */ 107 public static synchronized BitarchiveServer getInstance() throws ArgumentNotValid, UnknownID { 108 if (instance == null) { 109 instance = new BitarchiveServer(); 110 } 111 return instance; 112 } 113 114 /** 115 * The server creates an instance of the bitarchive it provides access to and starts to listen to JMS messages on 116 * the incomming jms queue 117 * <p> 118 * Also, heartbeats are sent out at regular intervals to the Bitarchive Monitor, to tell that this bitarchive is 119 * alive. 120 * 121 * @throws UnknownID - if there was no heartbeat frequency or temp dir defined in settings or if the bitarchiveid 122 * cannot be created. 123 * @throws PermissionDenied - if the temporary directory or the file directory cannot be written 124 */ 125 private BitarchiveServer() throws UnknownID, PermissionDenied { 126 System.setOut(new PrintStream(new LoggingOutputStream(LoggingOutputStream.LoggingLevel.INFO, log, "StdOut: "))); 127 System.setErr(new PrintStream(new LoggingOutputStream(LoggingOutputStream.LoggingLevel.WARN, log, "StdErr: "))); 128 boolean listening = false; // are we listening to queue ANY_BA 129 File serverdir = FileUtils.getTempDir(); 130 if (!serverdir.exists()) { 131 serverdir.mkdirs(); 132 } 133 if (!serverdir.canWrite()) { 134 throw new PermissionDenied("Not allowed to write to temp directory '" + serverdir + "'"); 135 } 136 log.info("Storing temporary files at '{}'", serverdir.getPath()); 137 138 bitarchiveAppId = createBitarchiveAppId(); 139 140 allBa = Channels.getAllBa(); 141 anyBa = Channels.getAnyBa(); 142 baMon = Channels.getTheBamon(); 143 ba = Bitarchive.getInstance(); 144 con = JMSConnectionFactory.getInstance(); 145 con.setListener(allBa, this); 146 baa = BitarchiveAdmin.getInstance(); 147 if (baa.hasEnoughSpace()) { 148 con.setListener(anyBa, this); 149 listening = true; 150 } else { 151 log.warn("Not enough space to guarantee store -- not listening to {}", anyBa.getName()); 152 } 153 154 // create map for batchjobs 155 batchProcesses = Collections.synchronizedMap(new HashMap<String, Thread>()); 156 157 // Create and start the heartbeat sender 158 Timer timer = new Timer(true); 159 heartBeatSender = new HeartBeatSender(baMon, this); 160 long frequency = Settings.getLong(ArchiveSettings.BITARCHIVE_HEARTBEAT_FREQUENCY); 161 timer.scheduleAtFixedRate(heartBeatSender, 0, frequency); 162 log.info("Heartbeat frequency: '{}'", frequency); 163 // Next logentry depends on whether we are listening to ANY_BA or not 164 String logmsg = "Created bitarchive server listening on: " + allBa.getName(); 165 if (listening) { 166 logmsg += " and " + anyBa.getName(); 167 } 168 169 log.info(logmsg); 170 171 log.info("Broadcasting heartbeats on: {}", baMon.getName()); 172 } 173 174 /** 175 * Ends the heartbeat sender before next loop and removes the server as listener on allBa and anyBa. Closes the 176 * bitarchive. Calls cleanup. 177 */ 178 public synchronized void close() { 179 log.info("BitarchiveServer {} closing down", getBitarchiveAppId()); 180 cleanup(); 181 if (con != null) { 182 con.removeListener(allBa, this); 183 con.removeListener(anyBa, this); 184 con = null; 185 } 186 log.info("BitarchiveServer {} closed down", getBitarchiveAppId()); 187 } 188 189 /** 190 * Ends the heartbeat sender before next loop. 191 */ 192 public void cleanup() { 193 if (ba != null) { 194 ba.close(); 195 ba = null; 196 } 197 if (baa != null) { 198 baa.close(); 199 baa = null; 200 } 201 if (heartBeatSender != null) { 202 heartBeatSender.cancel(); 203 heartBeatSender = null; 204 } 205 instance = null; 206 } 207 208 /** 209 * Process a get request and send the result back to the client. If the arcfile is not found on this bitarchive 210 * machine, nothing happens. 211 * 212 * @param msg a container for upload request 213 * @throws ArgumentNotValid If the message is null. 214 */ 215 @Override 216 public void visit(GetMessage msg) throws ArgumentNotValid { 217 ArgumentNotValid.checkNotNull(msg, "GetMessage msg"); 218 BitarchiveRecord bar; 219 log.trace("Processing getMessage({}:{}).", msg.getArcFile(), msg.getIndex()); 220 try { 221 bar = ba.get(msg.getArcFile(), msg.getIndex()); 222 } catch (Throwable t) { 223 log.warn("Error while processing get message '{}'", msg, t); 224 msg.setNotOk(t); 225 con.reply(msg); 226 return; 227 } 228 if (bar != null) { 229 msg.setRecord(bar); 230 log.debug("Sending reply: {}", msg.toString()); 231 con.reply(msg); 232 } else { 233 log.trace("Record({}:{}). not found on this BitarchiveServer", msg.getArcFile(), msg.getIndex()); 234 } 235 } 236 237 /** 238 * Process a upload request and send the result back to the client. This may be a very time consuming process and is 239 * a blocking call. 240 * 241 * @param msg a container for upload request 242 * @throws ArgumentNotValid If the message is null. 243 */ 244 @Override 245 public void visit(UploadMessage msg) throws ArgumentNotValid { 246 ArgumentNotValid.checkNotNull(msg, "UploadMessage msg"); 247 // TODO Implement a thread-safe solution on resource level rather than 248 // message processor level. 249 try { 250 try { 251 synchronized (this) { 252 // Important when two identical files are uploaded 253 // simultanously. 254 ba.upload(msg.getRemoteFile(), msg.getArcfileName()); 255 } 256 } catch (Throwable t) { 257 log.warn("Error while processing upload message '{}'", msg, t); 258 msg.setNotOk(t); 259 } finally { // Stop listening if disk is now full 260 if (!baa.hasEnoughSpace()) { 261 log.warn("Cannot guarantee enough space, no longer listening to {} for uploads", anyBa.getName()); 262 con.removeListener(anyBa, this); 263 } 264 } 265 } catch (Throwable t) { 266 // This block will be executed if the above finally block throws an 267 // exception. Therefore the message is not set to notOk here 268 log.warn("Error while removing listener after upload message '{}'", msg, t); 269 } finally { 270 log.info("Sending reply: {}", msg.toString()); 271 con.reply(msg); 272 } 273 } 274 275 /** 276 * Removes an arcfile from the bitarchive and returns the removed file as an remotefile. 277 * <p> 278 * Answers OK if the file is actually removed. Answers notOk if the file exists with wrong checksum or wrong 279 * credentials Doesn't answer if the file doesn't exist. 280 * <p> 281 * This method always generates a warning when deleting a file. 282 * <p> 283 * Before the file is removed it is verified that - the file exists in the bitarchive - the file has the correct 284 * checksum - the supplied credentials are correct 285 * 286 * @param msg a container for remove request 287 * @throws ArgumentNotValid If the RemoveAndGetFileMessage is null. 288 */ 289 @Override 290 public void visit(RemoveAndGetFileMessage msg) throws ArgumentNotValid { 291 ArgumentNotValid.checkNotNull(msg, "RemoveAndGetFileMessage msg"); 292 String mesg = "Request to move file '" + msg.getFileName() + "' with checksum '" + msg.getCheckSum() 293 + "' to attic"; 294 log.info(mesg); 295 NotificationsFactory.getInstance().notify(mesg, NotificationType.INFO); 296 297 File foundFile = ba.getFile(msg.getFileName()); 298 // Only send an reply if the file was found 299 if (foundFile == null) { 300 log.warn("Remove: '{}' not found", msg.getFileName()); 301 return; 302 } 303 304 try { 305 306 log.debug("File located - now checking the credentials"); 307 // Check credentials 308 String credentialsReceived = msg.getCredentials(); 309 ArgumentNotValid.checkNotNullOrEmpty(credentialsReceived, "credentialsReceived"); 310 if (!credentialsReceived.equals(Settings.get(ArchiveSettings.ENVIRONMENT_THIS_CREDENTIALS))) { 311 String message = "Attempt to remove '" + foundFile + "' with wrong credentials!"; 312 log.warn(message); 313 msg.setNotOk(message); 314 return; 315 } 316 317 log.debug("Credentials accepted, now checking the checksum"); 318 319 String checksum = ChecksumCalculator.calculateMd5(foundFile); 320 321 if (!checksum.equals(msg.getCheckSum())) { 322 final String message = "Attempt to remove '" + foundFile + " failed due to checksum mismatch: " 323 + msg.getCheckSum() + " != " + checksum; 324 log.warn(message); 325 msg.setNotOk(message); 326 return; 327 } 328 329 log.debug("Checksums matched - preparing to move and return file"); 330 File moveTo = baa.getAtticPath(foundFile); 331 if (!foundFile.renameTo(moveTo)) { 332 final String message = "Failed to move the file:" + foundFile + "to attic"; 333 log.warn(message); 334 msg.setNotOk(message); 335 return; 336 } 337 msg.setFile(moveTo); 338 339 log.warn("Removed file '{}' with checksum '{}'", msg.getFileName(), msg.getCheckSum()); 340 } catch (Exception e) { 341 final String message = "Error while processing message '" + msg + "'"; 342 log.warn(message, e); 343 msg.setNotOk(e); 344 } finally { 345 con.reply(msg); 346 } 347 } 348 349 /** 350 * Process a batch job and send the result back to the client. 351 * 352 * @param msg a container for batch jobs 353 * @throws ArgumentNotValid If the BatchMessage is null. 354 */ 355 @Override 356 public void visit(final BatchMessage msg) throws ArgumentNotValid { 357 ArgumentNotValid.checkNotNull(msg, "BatchMessage msg"); 358 Thread batchThread = new Thread("Batch-" + msg.getID()) { 359 @Override 360 public void run() { 361 try { 362 // TODO Possibly tell batch something that will let 363 // it create more comprehensible file names. 364 // Run the batch job on all files on this machine 365 BatchStatus batchStatus = ba.batch(bitarchiveAppId, msg.getJob()); 366 367 // Create the message which will contain the reply 368 BatchEndedMessage resultMessage = new BatchEndedMessage(baMon, msg.getID(), batchStatus); 369 370 // Update informational fields in reply message 371 if (batchStatus.getFilesFailed().size() > 0) { 372 resultMessage 373 .setNotOk("Batch job failed on " + batchStatus.getFilesFailed().size() + " files."); 374 } 375 376 // Send the reply 377 con.send(resultMessage); 378 log.debug("Submitted result message for batch job: {}", msg.getID()); 379 } catch (Throwable t) { 380 log.warn("Batch processing failed for message '{}'", msg, t); 381 BatchEndedMessage failMessage = new BatchEndedMessage(baMon, bitarchiveAppId, msg.getID(), 382 new NullRemoteFile()); 383 failMessage.setNotOk(t); 384 385 con.send(failMessage); 386 log.debug("Submitted failure message for batch job: {}", msg.getID()); 387 } finally { 388 // remove from map 389 batchProcesses.remove(msg.getBatchID()); 390 } 391 } 392 }; 393 batchProcesses.put(msg.getBatchID(), batchThread); 394 batchThread.start(); 395 } 396 397 public void visit(BatchTerminationMessage msg) throws ArgumentNotValid { 398 ArgumentNotValid.checkNotNull(msg, "BatchTerminationMessage msg"); 399 log.info("Received BatchTerminationMessage: {}", msg); 400 401 try { 402 Thread t = batchProcesses.get(msg.getTerminateID()); 403 404 // check whether the batchjob is still running. 405 if (t == null) { 406 log.info("The batchjob with ID '{}' cannot be found, and must have terminated by it self.", 407 msg.getTerminateID()); 408 return; 409 } 410 411 // try to interrupt. 412 if (t.isAlive()) { 413 t.interrupt(); 414 } 415 416 // wait one second, before verifying whether it is dead. 417 synchronized (this) { 418 try { 419 this.wait(1000); 420 } catch (InterruptedException e) { 421 log.trace("Unimportant InterruptedException caught.", e); 422 } 423 } 424 425 // Verify that is dead, or log that it might have a problem. 426 if (t.isAlive()) { 427 log.error("The thread '{}' should have been terminated, but it is apparently still alive.", t); 428 } else { 429 log.info("The batchjob with ID '{}' has successfully been terminated!", msg.getTerminateID()); 430 } 431 } catch (Throwable t) { 432 // log problem and set to NotOK! 433 log.error("An error occured while trying to terminate {}", msg.getTerminateID(), t); 434 } 435 } 436 437 /** 438 * Process a getFile request and send the result back to the client. 439 * 440 * @param msg a container for a getfile request 441 * @throws ArgumentNotValid If the GetFileMessage is null. 442 */ 443 @Override 444 public void visit(GetFileMessage msg) throws ArgumentNotValid { 445 ArgumentNotValid.checkNotNull(msg, "GetFileMessage msg"); 446 447 try { 448 File foundFile = ba.getFile(msg.getArcfileName()); 449 // Only send an reply if the file was found 450 if (foundFile != null) { 451 // Be Warned!! The following call does not do what you think it 452 // does. This actually creates the RemoteFile object, uploading 453 // the file to the ftp server as it does so. 454 msg.setFile(foundFile); 455 log.info("Sending reply: {}", msg.toString()); 456 con.reply(msg); 457 } 458 } catch (Throwable t) { 459 log.warn("Error while processing get file message '{}'", msg, t); 460 } 461 } 462 463 /** 464 * Returns a String that identifies this bit archive application (within the bit archive, i.e. either with id ONE or 465 * TWO) 466 * 467 * @return String with IP address of this host and, if specified, the APPLICATION_INSTANCE_ID from settings 468 */ 469 public String getBitarchiveAppId() { 470 return bitarchiveAppId; 471 } 472 473 /** 474 * Returns a String that identifies this bit archive application (within the bit archive, i.e. either with id ONE or 475 * TWO). The string has the following form: hostaddress[_applicationinstanceid] fx. "10.0.0.1_appOne" or just 476 * "10.0.0.1", if no applicationinstanceid has been chosen. 477 * 478 * @return String with IP address of this host and, if specified, the APPLICATION_INSTANCE_ID from settings 479 * @throws UnknownID - if InetAddress.getLocalHost() failed 480 */ 481 private String createBitarchiveAppId() throws UnknownID { 482 String id; 483 484 // Create an id with the IP address of this current host 485 id = SystemUtils.getLocalIP(); 486 487 // Append an underscore and APPLICATION_INSTANCE_ID from settings 488 // to the id, if specified in settings. 489 // If no APPLICATION_INSTANCE_ID is found do nothing. 490 try { 491 String applicationInstanceId = Settings.get(CommonSettings.APPLICATION_INSTANCE_ID); 492 if (!applicationInstanceId.isEmpty()) { 493 id += "_" + applicationInstanceId; 494 } 495 } catch (UnknownID e) { 496 // Ignore the fact, that there is no APPLICATION_INSTANCE_ID in 497 // settings 498 log.warn("No setting APPLICATION_INSTANCE_ID found in settings"); 499 } 500 501 return id; 502 } 503 504}