001/* 002 * #%L 003 * Netarchivesuite - harvester 004 * %% 005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.harvester.indexserver.distribute; 024 025import java.io.File; 026import java.io.FileInputStream; 027import java.io.FileOutputStream; 028import java.io.IOException; 029import java.io.ObjectInputStream; 030import java.io.ObjectOutputStream; 031import java.util.ArrayList; 032import java.util.EnumMap; 033import java.util.HashMap; 034import java.util.HashSet; 035import java.util.List; 036import java.util.Map; 037import java.util.Set; 038import java.util.Timer; 039import java.util.TimerTask; 040import java.util.concurrent.atomic.AtomicBoolean; 041 042import org.apache.commons.io.IOUtils; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import dk.netarkivet.common.distribute.Channels; 047import dk.netarkivet.common.distribute.JMSConnection; 048import dk.netarkivet.common.distribute.JMSConnectionFactory; 049import dk.netarkivet.common.distribute.RemoteFile; 050import dk.netarkivet.common.distribute.RemoteFileFactory; 051import dk.netarkivet.common.distribute.RemoteFileSettings; 052import dk.netarkivet.common.distribute.indexserver.RequestType; 053import dk.netarkivet.common.exceptions.ArgumentNotValid; 054import dk.netarkivet.common.exceptions.IOFailure; 055import dk.netarkivet.common.exceptions.IllegalState; 056import dk.netarkivet.common.exceptions.UnknownID; 057import dk.netarkivet.common.utils.ChecksumCalculator; 058import dk.netarkivet.common.utils.CleanupIF; 059import dk.netarkivet.common.utils.FileUtils; 060import dk.netarkivet.common.utils.Settings; 061import dk.netarkivet.common.utils.StringUtils; 062import dk.netarkivet.harvester.HarvesterSettings; 063import dk.netarkivet.harvester.distribute.HarvesterMessageHandler; 064import dk.netarkivet.harvester.distribute.IndexReadyMessage; 065import dk.netarkivet.harvester.indexserver.FileBasedCache; 066import dk.netarkivet.harvester.indexserver.IndexRequestServerInterface; 067 068/** 069 * Index request server singleton. 070 * <p> 071 * This class contains a singleton that handles requesting an index over JMS. 072 * <p> 073 * It will ALWAYS reply to such messages, either with the index, a message telling that only a subset is available, and 074 * which, or an error message, 075 */ 076public final class IndexRequestServer extends HarvesterMessageHandler implements CleanupIF, IndexRequestServerInterface { 077 078 /** The class logger. */ 079 private static final Logger log = LoggerFactory.getLogger(IndexRequestServer.class); 080 081 /** The unique instance. */ 082 private static IndexRequestServer instance; 083 /** The handlers for index request types. */ 084 private Map<RequestType, FileBasedCache<Set<Long>>> handlers; 085 086 /** The connection to the JMSBroker. */ 087 private static JMSConnection conn; 088 /** A set with the current indexing jobs in progress. */ 089 private static Map<String, IndexRequestMessage> currentJobs; 090 091 /** The max number of concurrent jobs. */ 092 private static long maxConcurrentJobs; 093 /** Are we listening, now. */ 094 private static AtomicBoolean isListening = new AtomicBoolean(); 095 096 /** Interval in milliseconds between listening checks. */ 097 private static long listeningInterval; 098 /** The timer that initiates the checkIflisteningTask. */ 099 private Timer checkIflisteningTimer = new Timer(); 100 101 /** satisfactoryThreshold percentage as an integer. */ 102 private int satisfactoryThresholdPercentage; 103 104 /** 105 * The directory to store backup copies of the currentJobs. In case of the indexserver crashing. 106 */ 107 private File requestDir; 108 109 /** 110 * Initialise index request server with no handlers, listening to the index JMS channel. 111 */ 112 private IndexRequestServer() { 113 maxConcurrentJobs = Settings.getLong(HarvesterSettings.INDEXSERVER_INDEXING_MAXCLIENTS); 114 requestDir = Settings.getFile(HarvesterSettings.INDEXSERVER_INDEXING_REQUESTDIR); 115 listeningInterval = Settings.getLong(HarvesterSettings.INDEXSERVER_INDEXING_LISTENING_INTERVAL); 116 satisfactoryThresholdPercentage = Settings 117 .getInt(HarvesterSettings.INDEXSERVER_INDEXING_SATISFACTORYTHRESHOLD_PERCENTAGE); 118 119 currentJobs = new HashMap<String, IndexRequestMessage>(); 120 handlers = new EnumMap<RequestType, FileBasedCache<Set<Long>>>(RequestType.class); 121 conn = JMSConnectionFactory.getInstance(); 122 checkIflisteningTimer = new Timer(); 123 } 124 125 /** 126 * Restore old requests from requestDir. 127 */ 128 private void restoreRequestsfromRequestDir() { 129 if (!requestDir.exists()) { 130 log.info("requestdir not found: creating request dir"); 131 if (!requestDir.mkdirs()) { 132 throw new IOFailure("Unable to create requestdir '" + requestDir.getAbsolutePath() + "'"); 133 } else { 134 return; // requestdir was just created, so nothing to do 135 } 136 } 137 138 File[] requests = requestDir.listFiles(); 139 // Fill up the currentJobs 140 for (File request : requests) { 141 if (request.isFile()) { 142 final IndexRequestMessage msg = restoreMessage(request); 143 synchronized (currentJobs) { 144 if (!currentJobs.containsKey(msg.getID())) { 145 currentJobs.put(msg.getID(), msg); 146 } else { 147 log.debug("Skipped message w/id='{}'. Already among current jobs", msg.getID()); 148 continue; 149 } 150 151 } 152 // Start a new thread to handle the actual request. 153 new Thread() { 154 public void run() { 155 doProcessIndexRequestMessage(msg); 156 } 157 }.start(); 158 log.info("Restarting indexjob w/ ID={}", msg.getID()); 159 } else { 160 log.debug("Ignoring directory in requestdir: " + request.getAbsolutePath()); 161 } 162 } 163 } 164 165 /** 166 * Get the unique index request server instance. 167 * 168 * @return The index request server. 169 */ 170 public static synchronized IndexRequestServer getInstance() { 171 if (instance == null) { 172 instance = new IndexRequestServer(); 173 } 174 175 return instance; 176 } 177 178 /** 179 * Set handler for certain type of index request. If called more than once, new handler overwrites old one. 180 * 181 * @param t The type of index requested 182 * @param handler The handler that should handle this request. 183 */ 184 public void setHandler(RequestType t, FileBasedCache<Set<Long>> handler) { 185 ArgumentNotValid.checkNotNull(t, "RequestType t"); 186 ArgumentNotValid.checkNotNull(handler, "FileBasedCache<Set<Long>> handler"); 187 log.info("Setting handler for RequestType: {}", t); 188 handlers.put(t, handler); 189 } 190 191 /** 192 * Given a request for an index over a set of job ids, use a cache to try to create the index, Then reply result. 193 * <p> 194 * If for any reason not all requested jobs can be indexed, return the subset. The client can then retry with this 195 * subset, in order to get index of that subset. 196 * <p> 197 * Values read from the message in order to handle this: - Type of index requested - will use the index cache of 198 * this type - Set of job IDs - which jobs to generate index for 199 * <p> 200 * Values written to message before replying: - The subset indexed - may be the entire set. ALWAYS set unless reply 201 * !OK - File with index - ONLY if subset is entire set, the index requested. 202 * <p> 203 * This method should ALWAYS reply. May reply with not OK message if: - Message received was not OK - Request type 204 * is null or unknown in message - Set of job ids is null in message - Cache generation throws exception 205 * 206 * @param irMsg A message requesting an index. 207 * @throws ArgumentNotValid on null parameter 208 */ 209 public synchronized void visit(final IndexRequestMessage irMsg) throws ArgumentNotValid { 210 ArgumentNotValid.checkNotNull(irMsg, "IndexRequestMessage irMsg"); 211 // save new msg to requestDir 212 try { 213 saveMsg(irMsg); 214 synchronized (currentJobs) { 215 if (!currentJobs.containsKey(irMsg.getID())) { 216 currentJobs.put(irMsg.getID(), irMsg); 217 } else { 218 final String errMsg = "Should not happen. Skipping msg w/ id= '" + irMsg.getID() 219 + "' because already among current jobs. " 220 + "Unable to initiate indexing. Sending failed message back to sender"; 221 log.warn(errMsg); 222 irMsg.setNotOk(errMsg); 223 JMSConnectionFactory.getInstance().reply(irMsg); 224 return; 225 } 226 } 227 // Limit the number of concurrently indexing job 228 if (currentJobs.size() >= maxConcurrentJobs) { 229 if (isListening.get()) { 230 conn.removeListener(Channels.getTheIndexServer(), this); 231 isListening.set(false); 232 } 233 } 234 235 // Start a new thread to handle the actual request. 236 new Thread() { 237 public void run() { 238 doProcessIndexRequestMessage(irMsg); 239 } 240 }.start(); 241 log.debug("Now {} indexing jobs in progress", currentJobs.size()); 242 } catch (IOException e) { 243 final String errMsg = "Unable to initiate indexing. Send failed message back to sender: " + e; 244 log.warn(errMsg, e); 245 irMsg.setNotOk(errMsg); 246 JMSConnectionFactory.getInstance().reply(irMsg); 247 } 248 } 249 250 /** 251 * Save a IndexRequestMessage to disk. 252 * 253 * @param irMsg A message to store to disk 254 * @throws IOException Throws IOExecption, if unable to save message 255 */ 256 private void saveMsg(IndexRequestMessage irMsg) throws IOException { 257 File dest = new File(requestDir, irMsg.getID()); 258 log.debug("Storing message to {}", dest.getAbsolutePath()); 259 // Writing message to file 260 ObjectOutputStream oos = null; 261 try { 262 FileOutputStream fos = new FileOutputStream(dest); 263 oos = new ObjectOutputStream(fos); 264 oos.writeObject(irMsg); 265 } finally { 266 IOUtils.closeQuietly(oos); 267 } 268 269 } 270 271 /** 272 * Restore message from serialized state. 273 * 274 * @param serializedObject the object stored as a file. 275 * @return the restored message. 276 */ 277 private IndexRequestMessage restoreMessage(File serializedObject) { 278 Object obj = null; 279 ObjectInputStream ois = null; 280 try { 281 // Read the message from disk. 282 FileInputStream fis = new FileInputStream(serializedObject); 283 ois = new ObjectInputStream(fis); 284 285 obj = ois.readObject(); 286 } catch (ClassNotFoundException e) { 287 throw new IllegalState("Not possible to read the stored message from file '" 288 + serializedObject.getAbsolutePath() + "':", e); 289 } catch (IOException e) { 290 throw new IOFailure("Not possible to read the stored message from file '" 291 + serializedObject.getAbsolutePath() + "':", e); 292 } finally { 293 IOUtils.closeQuietly(ois); 294 } 295 296 if (obj instanceof IndexRequestMessage) { 297 return (IndexRequestMessage) obj; 298 } else { 299 throw new IllegalState("The serialized message is not a " + IndexRequestMessage.class.getName() + " but a " 300 + obj.getClass().getName()); 301 } 302 } 303 304 /** 305 * Method that handles the processing of an indexRequestMessage. Returns the requested index immediately, if already 306 * available, otherwise proceeds with the index generation of the requested index. Must be run in its own thread, 307 * because it blocks while the index is generated. 308 * 309 * @param irMsg A message requesting an index 310 * @see #visit(IndexRequestMessage) 311 */ 312 private void doProcessIndexRequestMessage(final IndexRequestMessage irMsg) { 313 final boolean mustReturnIndex = irMsg.mustReturnIndex(); 314 try { 315 checkMessage(irMsg); 316 RequestType type = irMsg.getRequestType(); 317 Set<Long> jobIDs = irMsg.getRequestedJobs(); 318 319 if (log.isInfoEnabled()) { 320 log.info("Request received for an index of type '{}' for the {} jobs [{}]", type, jobIDs.size(), 321 StringUtils.conjoin(",", jobIDs)); 322 } 323 FileBasedCache<Set<Long>> handler = handlers.get(type); 324 325 // Here we need to make sure that we don't accidentally process more than 326 // one message at the time before the whole process is over 327 List<Long> sortedList = new ArrayList<Long>(jobIDs); 328 String allIDsString = StringUtils.conjoin("-", sortedList); 329 String checksum = ChecksumCalculator.calculateMd5(allIDsString.getBytes()); 330 log.debug("Waiting to enter the synchronization zone for the indexing job of size {} with checksum '{}'", 331 jobIDs.size(), checksum); 332 // Begin synchronization 333 synchronized (checksum.intern()) { 334 log.debug("The indexing job of size {} with checksum '{}' is now in the synchronization zone", 335 jobIDs.size(), checksum); 336 Set<Long> foundIDs = handler.cache(jobIDs); 337 irMsg.setFoundJobs(foundIDs); 338 if (foundIDs.equals(jobIDs)) { 339 if (log.isInfoEnabled()) { 340 log.info("Retrieved successfully index of type '{}' for the {} jobs [{}]", type, jobIDs.size(), 341 StringUtils.conjoin(",", jobIDs)); 342 } 343 File cacheFile = handler.getCacheFile(jobIDs); 344 if (mustReturnIndex) { // return index now! 345 packageResultFiles(irMsg, cacheFile); 346 } 347 } else if (satisfactoryTresholdReached(foundIDs, jobIDs)) { 348 log.info("Data for full index w/ {} jobs not available. Only found data for {} jobs - " 349 + "but satisfactoryTreshold reached, so assuming presence of all data", jobIDs.size(), 350 foundIDs.size()); 351 // Make sure that the index of the data available is generated 352 Set<Long> theFoundIDs = handler.cache(foundIDs); 353 // TheFoundIDS should be identical to foundIDs 354 // Lets make sure of that 355 Set<Long> diffSet = new HashSet<Long>(foundIDs); 356 diffSet.removeAll(theFoundIDs); 357 358 // Make a copy of the index available, and give it the name of 359 // the index cache file wanted. 360 File cacheFileWanted = handler.getCacheFile(jobIDs); 361 File cacheFileCreated = handler.getCacheFile(foundIDs); 362 363 log.info("Satisfactory threshold reached - copying index {} '{}' to full index: {}", 364 (cacheFileCreated.isDirectory() ? "dir" : "file"), cacheFileCreated.getAbsolutePath(), 365 cacheFileWanted.getAbsolutePath()); 366 if (cacheFileCreated.isDirectory()) { 367 // create destination cacheFileWanted, and 368 // copy all files in cacheFileCreated to cacheFileWanted. 369 cacheFileWanted.mkdirs(); 370 FileUtils.copyDirectory(cacheFileCreated, cacheFileWanted); 371 } else { 372 FileUtils.copyFile(cacheFileCreated, cacheFileWanted); 373 } 374 375 // TODO This delete-operation commented out, because it is deemed too dangerous, 376 // as the cachedir represented by cacheFileCreated may still be used 377 378 // log.info("Deleting the temporary index " 379 // + cacheFileCreated.getAbsolutePath()); 380 // FileUtils.removeRecursively(cacheFileCreated); 381 log.info("We keep the index '{}', as we don't know if anybody is using it", 382 cacheFileCreated.getAbsolutePath()); 383 384 // Information needed by recipient to store index in local cache 385 irMsg.setFoundJobs(jobIDs); 386 if (mustReturnIndex) { // return index now. 387 packageResultFiles(irMsg, cacheFileWanted); 388 } 389 } else { 390 Set<Long> missingJobIds = new HashSet<Long>(jobIDs); 391 missingJobIds.removeAll(foundIDs); 392 log.warn("Failed generating index of type '{}' for the jobs [{}]. Missing data for jobs [{}].", 393 type, StringUtils.conjoin(",", jobIDs), StringUtils.conjoin(",", missingJobIds)); 394 } 395 396 } // End of synchronization block 397 } catch (Throwable t) { 398 log.warn("Unable to generate index for jobs [" + StringUtils.conjoin(",", irMsg.getRequestedJobs()) + "]", 399 t); 400 irMsg.setNotOk(t); 401 } finally { 402 // Remove job from currentJobs Set 403 synchronized (currentJobs) { 404 currentJobs.remove(irMsg.getID()); 405 } 406 // delete stored message 407 deleteStoredMessage(irMsg); 408 String state = "failed"; 409 if (irMsg.isOk()) { 410 state = "successful"; 411 } 412 if (mustReturnIndex) { 413 log.info("Sending {} reply for IndexRequestMessage back to sender '{}'.", state, irMsg.getReplyTo()); 414 JMSConnectionFactory.getInstance().reply(irMsg); 415 } else { 416 log.info("Sending {} IndexReadyMessage to Scheduler for harvest {}", state, irMsg.getHarvestId()); 417 boolean isindexready = true; 418 if (state.equalsIgnoreCase("failed")) { 419 isindexready = false; 420 } 421 IndexReadyMessage irm = new IndexReadyMessage(irMsg.getHarvestId(), isindexready, irMsg.getReplyTo(), 422 Channels.getTheIndexServer()); 423 JMSConnectionFactory.getInstance().send(irm); 424 } 425 } 426 } 427 428 /** 429 * Package the result files with the message reply. 430 * 431 * @param irMsg the message being answered 432 * @param cacheFile The location of the result on disk. 433 */ 434 private void packageResultFiles(IndexRequestMessage irMsg, File cacheFile) { 435 RemoteFileSettings connectionParams = irMsg.getRemoteFileSettings(); 436 437 if (connectionParams != null) { 438 log.debug("Trying to use client supplied RemoteFileServer: {}", connectionParams.getServerName()); 439 } 440 if (cacheFile.isDirectory()) { 441 // This cache uses multiple files stored in a directory, 442 // so transfer them all. 443 File[] cacheFiles = cacheFile.listFiles(); 444 List<RemoteFile> resultFiles = new ArrayList<RemoteFile>(cacheFiles.length); 445 for (File f : cacheFiles) { 446 resultFiles.add(RemoteFileFactory.getCopyfileInstance(f, irMsg.getRemoteFileSettings())); 447 } 448 irMsg.setResultFiles(resultFiles); 449 } else { 450 irMsg.setResultFile(RemoteFileFactory.getCopyfileInstance(cacheFile, irMsg.getRemoteFileSettings())); 451 } 452 } 453 454 /** 455 * Threshold for when the created index contains enough data to be considered a satisfactory index. Uses the 456 * {@link IndexRequestServer#satisfactoryThresholdPercentage}. 457 * 458 * @param foundIDs The list of IDs contained in the index 459 * @param requestedIDs The list of IDs requested in the index. 460 * @return true, if the ratio foundIDs/requestedIDs is above the 461 * {@link IndexRequestServer#satisfactoryThresholdPercentage}. 462 */ 463 private boolean satisfactoryTresholdReached(Set<Long> foundIDs, Set<Long> requestedIDs) { 464 int jobsRequested = requestedIDs.size(); 465 int jobsFound = foundIDs.size(); 466 int percentage = (jobsFound * 100) / jobsRequested; 467 if (percentage > satisfactoryThresholdPercentage) { 468 return true; 469 } else { 470 return false; 471 } 472 } 473 474 /** 475 * Deleted stored file for given message. 476 * 477 * @param irMsg a given IndexRequestMessage 478 */ 479 private void deleteStoredMessage(IndexRequestMessage irMsg) { 480 File expectedSerializedFile = new File(requestDir, irMsg.getID()); 481 log.debug("Trying to delete stored serialized message: {}", expectedSerializedFile.getAbsolutePath()); 482 if (!expectedSerializedFile.exists()) { 483 log.warn("The file does not exist any more."); 484 return; 485 } 486 boolean deleted = FileUtils.remove(expectedSerializedFile); 487 if (!deleted) { 488 log.debug("The file '{}' was not deleted", expectedSerializedFile); 489 } 490 } 491 492 /** 493 * Helper method to check message properties. Will throw exceptions on any trouble. 494 * 495 * @param irMsg The message to check. 496 * @throws ArgumentNotValid If message is not OK, or if the list of jobs or the index request type is null. 497 * @throws UnknownID If the index request type is of a form that is unknown to the server. 498 */ 499 private void checkMessage(final IndexRequestMessage irMsg) throws UnknownID, ArgumentNotValid { 500 ArgumentNotValid.checkTrue(irMsg.isOk(), "Message was not OK"); 501 ArgumentNotValid.checkNotNull(irMsg.getRequestType(), "RequestType type"); 502 ArgumentNotValid.checkNotNull(irMsg.getRequestedJobs(), "Set<Long> jobIDs"); 503 if (handlers.get(irMsg.getRequestType()) == null) { 504 throw new UnknownID("No handler known for requesttype " + irMsg.getRequestType()); 505 } 506 } 507 508 /** Releases the JMS-connection and resets the singleton. */ 509 public void close() { 510 cleanup(); 511 } 512 513 /** Releases the JMS-connection and resets the singleton. */ 514 public void cleanup() { 515 // shutdown listening timer. 516 checkIflisteningTimer.cancel(); 517 conn.removeListener(Channels.getTheIndexServer(), this); 518 handlers.clear(); 519 520 if (instance != null) { 521 instance = null; 522 } 523 } 524 525 /** 526 * Look for stored messages to be preprocessed, and start processing those. And start the separate thread that 527 * decides if we should listen for index-requests. 528 */ 529 public void start() { 530 restoreRequestsfromRequestDir(); 531 log.info("{} indexing jobs in progress that was stored in requestdir: {}", currentJobs.size(), 532 requestDir.getAbsolutePath()); 533 534 // Define and start thread to observe current jobs: 535 // Only job is to look at the isListening atomicBoolean. 536 // If not listening, check if we are ready to listen again. 537 TimerTask checkIfListening = new ListeningTask(this); 538 isListening.set(false); 539 checkIflisteningTimer.schedule(checkIfListening, 0L, listeningInterval); 540 } 541 542 /** 543 * Defines the task to repeatedly check the listening status. And begin listening again, if we are ready for more 544 * tasks. 545 */ 546 private static class ListeningTask extends TimerTask { 547 /** The indexrequestserver this task is associated with. */ 548 private IndexRequestServer thisIrs; 549 550 /** 551 * Constructor for the ListeningTask. 552 * 553 * @param irs The indexrequestserver this task should be associated with 554 */ 555 ListeningTask(IndexRequestServer irs) { 556 thisIrs = irs; 557 } 558 559 @Override 560 public void run() { 561 log.trace("Checking if we should be listening again"); 562 if (!isListening.get()) { 563 if (maxConcurrentJobs > currentJobs.size()) { 564 log.info("Enabling listening to the indexserver channel '{}'", Channels.getTheIndexServer()); 565 conn.setListener(Channels.getTheIndexServer(), thisIrs); 566 isListening.set(true); 567 } 568 } 569 } 570 571 } 572 573}