001/* 002 * #%L 003 * Netarchivesuite - harvester 004 * %% 005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.harvester.indexserver.distribute; 024 025import java.io.File; 026import java.io.FileInputStream; 027import java.io.FileOutputStream; 028import java.io.FileReader; 029import java.io.IOException; 030import java.io.ObjectInputStream; 031import java.io.ObjectOutputStream; 032import java.util.ArrayList; 033import java.util.EnumMap; 034import java.util.HashMap; 035import java.util.HashSet; 036import java.util.List; 037import java.util.Map; 038import java.util.Set; 039import java.util.Timer; 040import java.util.TimerTask; 041import java.util.concurrent.atomic.AtomicBoolean; 042 043import org.apache.commons.io.IOUtils; 044import org.apache.commons.io.LineIterator; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048import dk.netarkivet.common.distribute.Channels; 049import dk.netarkivet.common.distribute.JMSConnection; 050import dk.netarkivet.common.distribute.JMSConnectionFactory; 051import dk.netarkivet.common.distribute.RemoteFile; 052import dk.netarkivet.common.distribute.RemoteFileFactory; 053import dk.netarkivet.common.distribute.indexserver.RequestType; 054import dk.netarkivet.common.exceptions.ArgumentNotValid; 055import dk.netarkivet.common.exceptions.IOFailure; 056import dk.netarkivet.common.exceptions.IllegalState; 057import dk.netarkivet.common.exceptions.UnknownID; 058import dk.netarkivet.common.utils.CleanupIF; 059import dk.netarkivet.common.utils.FileUtils; 060import dk.netarkivet.common.utils.Settings; 061import dk.netarkivet.common.utils.StringUtils; 062import dk.netarkivet.harvester.HarvesterSettings; 063import dk.netarkivet.harvester.distribute.HarvesterMessageHandler; 064import dk.netarkivet.harvester.distribute.IndexReadyMessage; 065import dk.netarkivet.harvester.indexserver.FileBasedCache; 066import dk.netarkivet.harvester.indexserver.IndexRequestServerInterface; 067 068/** 069 * Index request server singleton. 070 * <p> 071 * This class contains a singleton that handles requesting an index over JMS. 072 * <p> 073 * This has two modes. 1) Given a file with a list of jobIDs, it will always return the same lucene index based on the 074 * list of job identifiers in the file regardless of what kind of index the client is requesting. 2) if setting 075 * "settings.harvester.indexserver.alwaysSetIsIndexReadyToFalse" is true it will always return the IndexRequestMessage 076 * with isindexready set to false. 077 */ 078public final class TestIndexRequestServer extends HarvesterMessageHandler implements CleanupIF, 079 IndexRequestServerInterface { 080 081 /** The class logger. */ 082 private static final Logger log = LoggerFactory.getLogger(TestIndexRequestServer.class); 083 084 /** The default place in classpath where the settings file can be found. */ 085 private static String defaultSettingsClasspath = "dk/netarkivet/harvester/" 086 + "indexserver/distribute/TestIndexRequestServerSettings.xml"; 087 088 /* 089 * The static initialiser is called when the class is loaded. It will add default values for all settings defined in 090 * this class, by loading them from a settings.xml file in classpath. 091 */ 092 static { 093 Settings.addDefaultClasspathSettings(defaultSettingsClasspath); 094 } 095 096 /** 097 * <b>settings.harvester.indexserver.fileContainingJobsForTestindex<b>: The file containing the list of jobids that 098 * the test index uses as data. The default name of the file is "jobids.txt" 099 */ 100 public static String JOBS_FOR_TESTINDEX = "settings.harvester.indexserver.indexrequestserver.fileContainingJobsForTestindex"; 101 102 /** 103 * <b>settings.archive.indexserver.alwaysSetIsIndexReadyToFalse<b>: The default: false. If set to true, the 104 * IndexRequestMessage returned has always isindexready = false. 105 */ 106 public static String ALWAYS_SET_ISINDEX_READY_TO_FALSE = "settings.harvester.indexserver.indexrequestserver.alwaysSetIsIndexReadyToFalse"; 107 108 /** The unique instance. */ 109 private static TestIndexRequestServer instance; 110 /** The handlers for index request types. */ 111 private Map<RequestType, FileBasedCache<Set<Long>>> handlers; 112 113 /** The connection to the JMSBroker. */ 114 private static JMSConnection conn; 115 /** A set with the current indexing jobs in progress. */ 116 private static Map<String, IndexRequestMessage> currentJobs; 117 /** The max number of concurrent jobs. */ 118 private static long maxConcurrentJobs; 119 /** Are we listening, now. */ 120 private static AtomicBoolean isListening = new AtomicBoolean(); 121 122 /** Interval in milliseconds between listening checks. */ 123 private static long listeningInterval; 124 /** The timer that initiates the checkIflisteningTask. */ 125 private Timer checkIflisteningTimer = new Timer(); 126 127 /** The File containing the list of jobids, that the default index consists of. */ 128 private File jobsForDefaultIndex; 129 130 private boolean alwaysReturnFalseMode = false; 131 132 /** The set of Jobs ids used for the default index. */ 133 private Set<Long> defaultIDs; 134 135 /** 136 * The directory to store backup copies of the currentJobs. In case of the indexserver crashing. 137 */ 138 private File requestDir; 139 140 /** 141 * Initialise index request server with no handlers, listening to the index JMS channel. 142 */ 143 private TestIndexRequestServer() { 144 maxConcurrentJobs = Settings.getLong(HarvesterSettings.INDEXSERVER_INDEXING_MAXCLIENTS); 145 requestDir = Settings.getFile(HarvesterSettings.INDEXSERVER_INDEXING_REQUESTDIR); 146 listeningInterval = Settings.getLong(HarvesterSettings.INDEXSERVER_INDEXING_LISTENING_INTERVAL); 147 148 alwaysReturnFalseMode = Settings.getBoolean(ALWAYS_SET_ISINDEX_READY_TO_FALSE); 149 if (alwaysReturnFalseMode) { 150 log.info("alwaysSetIsIndexReadyToFalse is true"); 151 } else { 152 log.info("alwaysSetIsIndexReadyToFalse is false"); 153 } 154 155 jobsForDefaultIndex = Settings.getFile(JOBS_FOR_TESTINDEX); 156 157 if (!jobsForDefaultIndex.exists()) { 158 final String msg = "The file '" + jobsForDefaultIndex.getAbsolutePath() + "' does not exist"; 159 log.error("The file containing job identifiers for default index '{}' does not exist", 160 jobsForDefaultIndex.getAbsolutePath()); 161 System.err.println(msg + ". Exiting program"); 162 System.exit(1); 163 } 164 defaultIDs = readLongsFromFile(jobsForDefaultIndex); 165 currentJobs = new HashMap<String, IndexRequestMessage>(); 166 handlers = new EnumMap<RequestType, FileBasedCache<Set<Long>>>(RequestType.class); 167 conn = JMSConnectionFactory.getInstance(); 168 checkIflisteningTimer = new Timer(); 169 } 170 171 private Set<Long> readLongsFromFile(File fileWithLongs) { 172 Set<Long> resultSet = new HashSet<Long>(); 173 try { 174 LineIterator lineIterator = new LineIterator(new FileReader(fileWithLongs)); 175 while (lineIterator.hasNext()) { 176 String line = lineIterator.next(); 177 resultSet.add(Long.parseLong(line)); 178 } 179 } catch (IOException e) { 180 log.error("Unable to read from file '{}'. Returns set of size {}", fileWithLongs.getAbsolutePath(), 181 resultSet.size()); 182 } 183 184 return resultSet; 185 } 186 187 /** 188 * Restore old requests from requestDir. 189 */ 190 private void restoreRequestsfromRequestDir() { 191 if (!requestDir.exists()) { 192 log.info("requestdir not found: creating request dir"); 193 if (!requestDir.mkdirs()) { 194 throw new IOFailure("Unable to create requestdir '" + requestDir.getAbsolutePath() + "'"); 195 } else { 196 return; // requestdir was just created, so nothing to do 197 } 198 } 199 200 File[] requests = requestDir.listFiles(); 201 // Fill up the currentJobs 202 for (File request : requests) { 203 if (request.isFile()) { 204 final IndexRequestMessage msg = restoreMessage(request); 205 synchronized (currentJobs) { 206 if (!currentJobs.containsKey(msg.getID())) { 207 currentJobs.put(msg.getID(), msg); 208 } else { 209 log.debug("Skipped message w/id='{}'. Already among current jobs", msg.getID()); 210 continue; 211 } 212 213 } 214 // Start a new thread to handle the actual request. 215 new Thread() { 216 public void run() { 217 doGenerateIndex(msg); 218 } 219 }.start(); 220 log.info("Restarting indexjob w/ ID={}", msg.getID()); 221 } else { 222 log.debug("Ignoring directory in requestdir: {}", request.getAbsolutePath()); 223 } 224 } 225 } 226 227 /** 228 * Get the unique index request server instance. 229 * 230 * @return The index request server. 231 */ 232 public static synchronized TestIndexRequestServer getInstance() { 233 if (instance == null) { 234 instance = new TestIndexRequestServer(); 235 } 236 237 return instance; 238 } 239 240 /** 241 * Set handler for certain type of index request. If called more than once, new handler overwrites old one. 242 * 243 * @param t The type of index requested 244 * @param handler The handler that should handle this request. 245 */ 246 public void setHandler(RequestType t, FileBasedCache<Set<Long>> handler) { 247 ArgumentNotValid.checkNotNull(t, "RequestType t"); 248 ArgumentNotValid.checkNotNull(handler, "FileBasedCache<Set<Long>> handler"); 249 log.info("Setting handler for RequestType: " + t); 250 handlers.put(t, handler); 251 } 252 253 /** 254 * Given a request for an index over a set of job ids, use a cache to try to create the index, Then reply result. 255 * <p> 256 * If for any reason not all requested jobs can be indexed, return the subset. The client can then retry with this 257 * subset, in order to get index of that subset. 258 * <p> 259 * Values read from the message in order to handle this: - Type of index requested - will use the index cache of 260 * this type - Set of job IDs - which jobs to generate index for 261 * <p> 262 * Values written to message before replying: - The subset indexed - may be the entire set. ALWAYS set unless reply 263 * !OK - File with index - ONLY if subset is entire set, the index requested. 264 * <p> 265 * This method should ALWAYS reply. May reply with not OK message if: - Message received was not OK - Request type 266 * is null or unknown in message - Set of job ids is null in message - Cache generation throws exception 267 * 268 * @param irMsg A message requesting an index. 269 * @throws ArgumentNotValid on null parameter 270 */ 271 public synchronized void visit(final IndexRequestMessage irMsg) throws ArgumentNotValid { 272 ArgumentNotValid.checkNotNull(irMsg, "IndexRequestMessage irMsg"); 273 // save new msg to requestDir 274 try { 275 saveMsg(irMsg); 276 synchronized (currentJobs) { 277 if (!currentJobs.containsKey(irMsg.getID())) { 278 currentJobs.put(irMsg.getID(), irMsg); 279 } else { 280 final String errMsg = "Should not happen. Skipping msg w/ id= '" + irMsg.getID() + "' " 281 + "because already among current jobs. " 282 + "Unable to initiate indexing. Sending failed message back to sender"; 283 log.warn(errMsg); 284 irMsg.setNotOk(errMsg); 285 JMSConnectionFactory.getInstance().reply(irMsg); 286 return; 287 } 288 } 289 // Limit the number of concurrently indexing job 290 if (currentJobs.size() >= maxConcurrentJobs) { 291 if (isListening.get()) { 292 conn.removeListener(Channels.getTheIndexServer(), this); 293 isListening.set(false); 294 } 295 } 296 297 // Start a new thread to handle the actual request. 298 new Thread() { 299 public void run() { 300 doGenerateIndex(irMsg); 301 } 302 }.start(); 303 log.debug("Now {} indexing jobs in progress", currentJobs.size()); 304 } catch (IOException e) { 305 final String errMsg = "Unable to initiate indexing. Send failed message back to sender: " + e; 306 log.warn(errMsg, e); 307 irMsg.setNotOk(errMsg); 308 JMSConnectionFactory.getInstance().reply(irMsg); 309 } 310 } 311 312 /** 313 * Save a IndexRequestMessage to disk. 314 * 315 * @param irMsg A message to store to disk 316 * @throws IOException Throws IOExecption, if unable to save message 317 */ 318 private void saveMsg(IndexRequestMessage irMsg) throws IOException { 319 File dest = new File(requestDir, irMsg.getID()); 320 log.debug("Storing message to {}", dest.getAbsolutePath()); 321 // Writing message to file 322 ObjectOutputStream oos = null; 323 try { 324 FileOutputStream fos = new FileOutputStream(dest); 325 oos = new ObjectOutputStream(fos); 326 oos.writeObject(irMsg); 327 } finally { 328 IOUtils.closeQuietly(oos); 329 } 330 } 331 332 /** 333 * Restore message from serialized state. 334 * 335 * @param serializedObject the object stored as a file. 336 * @return the restored message. 337 */ 338 private IndexRequestMessage restoreMessage(File serializedObject) { 339 Object obj = null; 340 ObjectInputStream ois = null; 341 try { 342 // Read the message from disk. 343 FileInputStream fis = new FileInputStream(serializedObject); 344 345 ois = new ObjectInputStream(fis); 346 347 obj = ois.readObject(); 348 } catch (ClassNotFoundException e) { 349 throw new IllegalState("Not possible to read the stored message from file '" 350 + serializedObject.getAbsolutePath() + "':", e); 351 } catch (IOException e) { 352 throw new IOFailure("Not possible to read the stored message from file '" 353 + serializedObject.getAbsolutePath() + "':", e); 354 } finally { 355 IOUtils.closeQuietly(ois); 356 } 357 358 if (obj instanceof IndexRequestMessage) { 359 return (IndexRequestMessage) obj; 360 } else { 361 throw new IllegalState("The serialized message is not a " + IndexRequestMessage.class.getName() + " but a " 362 + obj.getClass().getName()); 363 } 364 } 365 366 /** 367 * Method that handles generating an index; supposed to be run in its own thread, because it blocks while the index 368 * is generated. 369 * 370 * @param irMsg A message requesting an index 371 * @see #visit(IndexRequestMessage) 372 */ 373 private void doGenerateIndex(final IndexRequestMessage irMsg) { 374 final boolean mustReturnIndex = irMsg.mustReturnIndex(); 375 try { 376 checkMessage(irMsg); 377 RequestType type = irMsg.getRequestType(); 378 Set<Long> requestedJobIDs = irMsg.getRequestedJobs(); 379 380 log.info("Generating an index of type '{}' for the jobs [{}]", type, StringUtils.conjoin(",", defaultIDs)); 381 382 FileBasedCache<Set<Long>> handler = handlers.get(type); 383 Set<Long> foundIDs = handler.cache(defaultIDs); 384 if (!foundIDs.containsAll(defaultIDs)) { 385 defaultIDs = foundIDs; 386 } 387 irMsg.setFoundJobs(requestedJobIDs); // Say that everything was found 388 389 log.info("Returning default index"); 390 391 File cacheFile = handler.getCacheFile(defaultIDs); 392 393 if (mustReturnIndex) { // return index now! (default behaviour) 394 if (cacheFile.isDirectory()) { 395 // This cache uses multiple files stored in a directory, 396 // so transfer them all. 397 File[] cacheFiles = cacheFile.listFiles(); 398 List<RemoteFile> resultFiles = new ArrayList<RemoteFile>(cacheFiles.length); 399 for (File f : cacheFiles) { 400 resultFiles.add(RemoteFileFactory.getCopyfileInstance(f)); 401 } 402 irMsg.setResultFiles(resultFiles); 403 } else { 404 irMsg.setResultFile(RemoteFileFactory.getCopyfileInstance(cacheFile)); 405 } 406 } 407 408 } catch (Throwable t) { 409 log.warn("Unable to generate index for jobs [{}]", StringUtils.conjoin(",", irMsg.getRequestedJobs()), t); 410 irMsg.setNotOk(t); 411 } finally { 412 // Remove job from currentJobs Set 413 synchronized (currentJobs) { 414 currentJobs.remove(irMsg.getID()); 415 } 416 // delete stored message 417 deleteStoredMessage(irMsg); 418 String state = "failed"; 419 if (irMsg.isOk()) { 420 state = "successful"; 421 } 422 if (mustReturnIndex) { 423 log.info("Sending {} reply for IndexRequestMessage back to sender '{}'.", state, irMsg.getReplyTo()); 424 JMSConnectionFactory.getInstance().reply(irMsg); 425 } else { 426 log.info("Sending{} IndexReadyMessage to Scheduler", state); 427 boolean isindexready = true; 428 if (state.equalsIgnoreCase("failed")) { 429 isindexready = false; 430 } 431 if (alwaysReturnFalseMode) { 432 log.info("Setting isindexready = false in return message"); 433 isindexready = false; 434 } 435 IndexReadyMessage irm = new IndexReadyMessage(irMsg.getHarvestId(), isindexready, irMsg.getReplyTo(), 436 Channels.getTheIndexServer()); 437 JMSConnectionFactory.getInstance().send(irm); 438 } 439 } 440 } 441 442 /** 443 * Deleted stored file for given message. 444 * 445 * @param irMsg a given IndexRequestMessage 446 */ 447 private void deleteStoredMessage(IndexRequestMessage irMsg) { 448 File expectedSerializedFile = new File(requestDir, irMsg.getID()); 449 log.debug("Trying to delete stored serialized message: {}", expectedSerializedFile.getAbsolutePath()); 450 if (!expectedSerializedFile.exists()) { 451 log.warn("The file does not exist any more."); 452 return; 453 } 454 boolean deleted = FileUtils.remove(expectedSerializedFile); 455 if (!deleted) { 456 log.debug("The file '{}' was not deleted", expectedSerializedFile); 457 } 458 } 459 460 /** 461 * Helper method to check message properties. Will throw exceptions on any trouble. 462 * 463 * @param irMsg The message to check. 464 * @throws ArgumentNotValid If message is not OK, or if the list of jobs or the index request type is null. 465 * @throws UnknownID If the index request type is of a form that is unknown to the server. 466 */ 467 private void checkMessage(final IndexRequestMessage irMsg) throws UnknownID, ArgumentNotValid { 468 ArgumentNotValid.checkTrue(irMsg.isOk(), "Message was not OK"); 469 ArgumentNotValid.checkNotNull(irMsg.getRequestType(), "RequestType type"); 470 ArgumentNotValid.checkNotNull(irMsg.getRequestedJobs(), "Set<Long> jobIDs"); 471 if (handlers.get(irMsg.getRequestType()) == null) { 472 throw new UnknownID("No handler known for requesttype " + irMsg.getRequestType()); 473 } 474 } 475 476 /** Releases the JMS-connection and resets the singleton. */ 477 public void close() { 478 cleanup(); 479 } 480 481 /** Releases the JMS-connection and resets the singleton. */ 482 public void cleanup() { 483 // shutdown listening timer. 484 checkIflisteningTimer.cancel(); 485 conn.removeListener(Channels.getTheIndexServer(), this); 486 handlers.clear(); 487 488 if (instance != null) { 489 instance = null; 490 } 491 } 492 493 /** 494 * Look for stored messages to be preprocessed, and start processing those. And start the separate thread that 495 * decides if we should listen for index-requests. 496 */ 497 public void start() { 498 restoreRequestsfromRequestDir(); 499 log.info("{} indexing jobs in progress that was stored in requestdir: {}", currentJobs.size(), 500 requestDir.getAbsolutePath()); 501 502 // Define and start thread to observe current jobs: 503 // Only job is to look at the isListening atomicBoolean. 504 // If not listening, check if we are ready to listen again. 505 TimerTask checkIfListening = new ListeningTask(this); 506 isListening.set(false); 507 checkIflisteningTimer.schedule(checkIfListening, 0L, listeningInterval); 508 } 509 510 /** 511 * Defines the task to repeatedly check the listening status. And begin listening again, if we are ready for more 512 * tasks. 513 */ 514 private static class ListeningTask extends TimerTask { 515 516 /** The indexrequestserver this task is associated with. */ 517 private TestIndexRequestServer thisIrs; 518 519 /** 520 * Constructor for the ListeningTask. 521 * 522 * @param irs The indexrequestserver this task should be associated with 523 */ 524 ListeningTask(TestIndexRequestServer irs) { 525 thisIrs = irs; 526 } 527 528 @Override 529 public void run() { 530 log.trace("Checking if we should be listening again"); 531 if (!isListening.get()) { 532 if (maxConcurrentJobs > currentJobs.size()) { 533 log.info("Enabling listening to the indexserver channel '{}'", Channels.getTheIndexServer()); 534 conn.setListener(Channels.getTheIndexServer(), thisIrs); 535 isListening.set(true); 536 } 537 } 538 } 539 540 } 541 542}