package dk.netarkivet.harvester.tools.dummy;

import java.io.File;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.CommonSettings;
import dk.netarkivet.common.distribute.ChannelID;
import dk.netarkivet.common.distribute.JMSConnection;
import dk.netarkivet.common.distribute.JMSConnectionFactory;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.exceptions.PermissionDenied;
import dk.netarkivet.common.exceptions.UnknownID;
import dk.netarkivet.common.lifecycle.PeriodicTaskExecutor;
import dk.netarkivet.common.utils.ApplicationUtils;
import dk.netarkivet.common.utils.CleanupIF;
import dk.netarkivet.common.utils.DomainUtils;
import dk.netarkivet.common.utils.FileUtils;
import dk.netarkivet.common.utils.NotificationType;
import dk.netarkivet.common.utils.NotificationsFactory;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.common.utils.SystemUtils;
import dk.netarkivet.common.utils.TimeUtils;
import dk.netarkivet.harvester.HarvesterSettings;
import dk.netarkivet.harvester.datamodel.Job;
import dk.netarkivet.harvester.datamodel.JobStatus;
import dk.netarkivet.harvester.distribute.HarvesterChannels;
import dk.netarkivet.harvester.distribute.HarvesterMessageHandler;
import dk.netarkivet.harvester.harvesting.distribute.CrawlStatusMessage;
import dk.netarkivet.harvester.harvesting.distribute.DoOneCrawlMessage;
import dk.netarkivet.harvester.harvesting.distribute.HarvesterReadyMessage;
import dk.netarkivet.harvester.harvesting.distribute.HarvesterRegistrationRequest;
import dk.netarkivet.harvester.harvesting.distribute.HarvesterRegistrationResponse;

/**
 * A deliberately misbehaving stand-in for a harvest controller, used to test issue NAS-2614.
 * <p>
 * It responds to JMS doOneCrawl messages from the HarvestScheduler by pretending to harvest for 10 minutes and then
 * failing the job.
 * <p>
 * Initially, it registers its channel with the scheduler by sending a {@link HarvesterRegistrationRequest} and waits
 * for a positive {@link HarvesterRegistrationResponse} stating that its channel is recognized. If the channel is
 * reported as invalid, this class sends an error notification and then closes itself down.
 * <p>
 * When a doOneCrawl message is accepted, a {@link CrawlStatusMessage} with status 'STARTED' is sent. When the 10
 * minutes have passed, a second one with status 'FAILED' is sent.
 * <p>
 * Before the 10 minutes start, the JMS job listener is removed to avoid handling more than one DoOneCrawlMessage at a
 * time. NOTE(review): nothing in this class re-registers the job listener after the wait, so an instance appears to
 * process at most one job - confirm this is intended for the test scenario.
 * <p>
 * Each run of the periodic status task sends two (instead of one) {@link HarvesterReadyMessage}s to the scheduler,
 * regardless of whether a job is in progress. The interval is defined by the setting
 * 'settings.harvester.harvesting.sendReadyDelay'.
 */
public class FaultyHarvestControllerServer extends HarvesterMessageHandler implements CleanupIF {

    /** The logger to use. */
    private static final Logger log = LoggerFactory.getLogger(FaultyHarvestControllerServer.class);

    /** How long (in milliseconds) a received job is pretended to be harvested before it is failed: 10 minutes. */
    private static final long FAKE_HARVEST_DURATION_MILLIS = 10L * 60L * 1000L;

    /** The unique instance of this class. */
    private static FaultyHarvestControllerServer instance;

    /** The configured application instance id. @see CommonSettings#APPLICATION_INSTANCE_ID */
    private final String applicationInstanceId = Settings.get(CommonSettings.APPLICATION_INSTANCE_ID);

    /** The (reduced) host name of the machine this server is running on. */
    private final String physicalServerName = DomainUtils.reduceHostname(SystemUtils.getLocalHostName());

    /** Min. space required to start a job. */
    private final long minSpaceRequired;

    /** The JMSConnection to use. */
    private JMSConnection jmsConnection;

    /** The JMS channel on which to listen for {@link HarvesterRegistrationResponse}s. */
    public static final ChannelID HARVEST_CHAN_VALID_RESP_ID = HarvesterChannels
            .getHarvesterRegistrationResponseChannel();

    /** The name of the harvest channel whose jobs are processed by this instance. */
    private static final String CHANNEL = Settings.get(HarvesterSettings.HARVEST_CONTROLLER_CHANNEL);

    /** Jobs are fetched from this queue. Stays null until the channel has been confirmed as valid. */
    private ChannelID jobChannel;

    /** The serverdir, whose free space decides whether jobs are accepted. */
    private final File serverDir;

    /** Tracks whether a doOneCrawl is being processed and whether the channel was validated; sends ready messages. */
    private CrawlStatus status;

    /**
     * Returns or creates the unique instance of this singleton.
     *
     * @return The instance
     * @throws IOFailure declared for compatibility with callers; see the constructor
     */
    public static synchronized FaultyHarvestControllerServer getInstance() throws IOFailure {
        if (instance == null) {
            instance = new FaultyHarvestControllerServer();
        }
        return instance;
    }

    /**
     * Sets up the server: makes sure the serverdir exists, reads and validates the minimum free-space setting,
     * obtains the JMS connection, starts listening for harvester registration responses and finally asks the
     * scheduler whether the configured harvest channel is valid.
     * <p>
     * Listening for jobs does not start here; it starts in {@link #visit(HarvesterRegistrationResponse)} once the
     * channel has been confirmed.
     *
     * @throws ArgumentNotValid if the configured minimum free space is not a positive number
     * @throws IOFailure declared for compatibility with callers
     */
    private FaultyHarvestControllerServer() throws IOFailure {
        log.info("Starting {}.", this.getClass());
        log.info("Bound to harvest channel '{}'", CHANNEL);

        // Make sure serverdir exists.
        serverDir = new File(Settings.get(HarvesterSettings.HARVEST_CONTROLLER_SERVERDIR));
        ApplicationUtils.dirMustExist(serverDir);
        log.info("Serverdir: '{}'", serverDir);

        minSpaceRequired = Settings.getLong(HarvesterSettings.HARVEST_SERVERDIR_MINSPACE);
        if (minSpaceRequired <= 0L) {
            log.warn("Wrong setting of minSpaceLeft read from Settings: {}", minSpaceRequired);
            throw new ArgumentNotValid("Wrong setting of minSpaceLeft read from Settings: " + minSpaceRequired);
        }
        log.info("Harvesting requires at least {} bytes free.", minSpaceRequired);

        jmsConnection = JMSConnectionFactory.getInstance();

        status = new CrawlStatus();
        log.info("SEND_READY_DELAY used by {} is {}", this.getClass().getName(), status.getSendReadyDelay());

        // Register for listening to harvest channel validity responses. The same connection object is used here
        // and in cleanup(), so the listener is removed from the connection it was added to.
        jmsConnection.setListener(HARVEST_CHAN_VALID_RESP_ID, this);

        // Ask if the channel this harvester is assigned to is valid
        jmsConnection.send(new HarvesterRegistrationRequest(FaultyHarvestControllerServer.CHANNEL,
                applicationInstanceId));
        log.info("Requested to check the validity of harvest channel '{}'", FaultyHarvestControllerServer.CHANNEL);
    }

    /**
     * Closes the server by delegating to {@link #cleanup()}, logging before and after.
     */
    public synchronized void close() {
        log.info("Closing {}.", this.getClass().getName());
        cleanup();
        log.info("Closed down {}", this.getClass().getName());
    }

    /**
     * Will be called on shutdown. Removes the JMS listeners registered by this class, stops the periodic sending of
     * ready messages and clears the singleton reference.
     *
     * @see CleanupIF#cleanup()
     */
    @Override
    public void cleanup() {
        if (jmsConnection != null) {
            jmsConnection.removeListener(HARVEST_CHAN_VALID_RESP_ID, this);
            // jobChannel is only set once the channel has been validated.
            if (jobChannel != null) {
                jmsConnection.removeListener(jobChannel, this);
            }
        }
        // Stop the sending of status messages
        status.stopSending();
        instance = null;
    }

    /**
     * Handles the scheduler's answer to the registration request sent from the constructor.
     * <p>
     * Responses for other channels, and responses arriving after this server's channel has already been validated,
     * are resent. An "invalid" response triggers an error notification and closes the server. A "valid" response
     * makes the server listen for jobs (if there is enough disk space) and starts the periodic ready messages.
     *
     * @param msg the registration response
     */
    @Override
    public void visit(HarvesterRegistrationResponse msg) {
        String channelName = msg.getHarvestChannelName();

        // If we have already started or the message notifies for another channel, resend it.
        if (status.isChannelValid() || !CHANNEL.equals(channelName)) {
            jmsConnection.resend(msg, msg.getTo());
            log.trace("Resending harvest channel validity message for channel '{}'", channelName);
            return;
        }

        if (!msg.isValid()) {
            String errMsg = "Received message stating that channel '" + channelName + "' is invalid. Will stop. "
                    + "Probable cause: the channel is not one of the known channels stored in the channels table";
            log.error(errMsg);
            // Send a notification about this, ASAP
            NotificationsFactory.getInstance().notify(errMsg, NotificationType.ERROR);
            close();
            return;
        }

        log.info("Received message stating that channel '{}' is valid.", channelName);
        // Environment and connections are now ready for processing of messages
        jobChannel = HarvesterChannels.getHarvestJobChannelId(channelName, msg.isSnapshot());

        // Only listen for harvester jobs if enough available space
        beginListeningIfSpaceAvailable();

        // Notify the harvest dispatcher that we are ready
        startAcceptingJobs();
        status.startSending();
    }

    /**
     * Starts listening for crawl jobs if the serverdir has more free space than the configured minimum; otherwise
     * logs the problem and sends an error notification without registering the listener.
     */
    private void beginListeningIfSpaceAvailable() {
        long availableSpace = FileUtils.getBytesFree(serverDir);
        if (availableSpace > minSpaceRequired) {
            log.info("Starts to listen to new jobs on queue '{}'", jobChannel);
            jmsConnection.setListener(jobChannel, this);
        } else {
            String pausedMessage = "Not enough available diskspace. Only " + availableSpace + " bytes available."
                    + " Harvester is paused.";
            log.error(pausedMessage);
            NotificationsFactory.getInstance().notify(pausedMessage, NotificationType.ERROR);
        }
    }

    /**
     * Marks this server as idle again. This does not re-add a JMS listener; it only clears the running flag so
     * that crawl messages that do arrive are processed instead of being resent.
     */
    private synchronized void startAcceptingJobs() {
        status.setRunning(false);
    }

    /**
     * Marks this server as busy. After this is called, all crawl messages received will be resent to the queue
     * until {@link #startAcceptingJobs()} is called.
     */
    private synchronized void stopAcceptingJobs() {
        status.setRunning(true);
        log.debug("No longer accepting jobs.");
    }

    /**
     * Stop listening for new crawl requests.
     */
    private void removeListener() {
        log.debug("Removing listener on CHANNEL '{}'", jobChannel);
        jmsConnection.removeListener(jobChannel, this);
    }

    /**
     * Receives a DoOneCrawlMessage, reports the job as STARTED, waits 10 minutes, and then reports the job as FAILED.
     * <p>
     * If a job is already being processed, the message is resent to the job channel instead. The running flag is
     * always cleared again when this method exits, also when it exits with an exception.
     *
     * @param msg The crawl job
     * @throws UnknownID if jobID is null in the message
     * @throws IOFailure declared by the handler contract; not thrown directly by this method
     * @throws ArgumentNotValid declared by the handler contract; not thrown directly by this method
     * @throws PermissionDenied declared by the handler contract; not thrown directly by this method
     */
    @Override
    public void visit(DoOneCrawlMessage msg) throws IOFailure, UnknownID, ArgumentNotValid, PermissionDenied {
        // Only one doOneCrawl at a time. Returning should almost never happen,
        // since we deregister the listener, but we may receive another message
        // before the listener is removed.
        synchronized (this) {
            if (status.isRunning()) {
                log.warn(
                        "Received crawl request, but sent it back to queue, as another crawl is already running: '{}'",
                        msg);
                jmsConnection.resend(msg, jobChannel);
                try {
                    // Wait a second before listening again, so the message has
                    // a chance of getting snatched by another harvester.
                    Thread.sleep(TimeUtils.SECOND_IN_MILLIS);
                } catch (InterruptedException e) {
                    // We are not waiting for anything critical, so just stop waiting,
                    // but keep the interrupt visible to the code that owns this thread.
                    Thread.currentThread().interrupt();
                }
                return;
            }
            stopAcceptingJobs();
        }

        // The interrupt flag is restored only after the final status message has been sent, so that an
        // interrupt during the wait cannot disturb the JMS send.
        boolean interrupted = false;
        try {
            final Job job = msg.getJob();

            // Every job must have an ID or we can do nothing with it, not even
            // send a proper failure message back.
            Long jobID = job.getJobID();
            if (jobID == null) {
                throw new UnknownID("DoOneCrawlMessage arrived without JobID: " + msg);
            }

            log.info("Received crawlrequest for job {}: '{}'", jobID, msg);

            // Send message to scheduler that job is started
            jmsConnection.send(new CrawlStatusMessage(jobID, JobStatus.STARTED));

            removeListener();

            log.info("Waiting 10 minutes to illustrate a real harvest");
            try {
                Thread.sleep(FAKE_HARVEST_DURATION_MILLIS);
            } catch (InterruptedException e) {
                interrupted = true;
                log.warn("Interrupted while simulating the harvest of job {}; failing the job now.", jobID, e);
            }

            // Send message to scheduler that job has failed
            jmsConnection.send(new CrawlStatusMessage(jobID, JobStatus.FAILED));
        } finally {
            // Never leave the server stuck in the "running" state, whatever happened above.
            startAcceptingJobs();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Sends a CrawlStatusMessage for a failed job with the given short message and detailed message.
     *
     * @param jobID ID of the job that failed
     * @param message A short message indicating what went wrong
     * @param detailedMessage A more detailed message detailing why it went wrong.
     */
    public void sendErrorMessage(long jobID, String message, String detailedMessage) {
        CrawlStatusMessage csm = new CrawlStatusMessage(jobID, JobStatus.FAILED, null);
        csm.setHarvestErrors(message);
        csm.setHarvestErrorDetails(detailedMessage);
        jmsConnection.send(csm);
    }

    /**
     * Keeps track of whether a crawl is being processed and whether the harvest channel has been validated, and is
     * the task that sends {@link HarvesterReadyMessage}s when run by the periodic executor.
     */
    private class CrawlStatus implements Runnable {

        /** True while a doOneCrawl message is being processed. */
        private boolean running = false;

        /** True once {@link #startSending()} has been called, i.e. after the channel was reported valid. */
        private boolean channelIsValid = false;

        /** Handles the periodic sending of status messages. Null while not sending. */
        private PeriodicTaskExecutor statusTransmitter;

        /** The value of the setting HarvesterSettings.SEND_READY_DELAY, read once at creation. */
        private final int sendReadyDelay = Settings.getInt(HarvesterSettings.SEND_READY_DELAY);

        /**
         * Marks the channel as valid and starts the sending of status messages. Interval defined by
         * HarvesterSettings.SEND_READY_DELAY .
         */
        public void startSending() {
            this.channelIsValid = true;
            statusTransmitter = new PeriodicTaskExecutor("HarvesterStatus", this, 0, getSendReadyDelay());
        }

        /**
         * Stops the sending of status messages. Does nothing if sending was never started.
         */
        public void stopSending() {
            if (statusTransmitter != null) {
                statusTransmitter.shutdown();
                statusTransmitter = null;
            }
        }

        /**
         * Returns <code>true</code> if a doOneCrawl is running, else <code>false</code>.
         *
         * @return Whether a crawl is running.
         */
        public boolean isRunning() {
            return running;
        }

        /**
         * Used for changing the running state in methods startAcceptingJobs and stopAcceptingJobs.
         *
         * @param running The new status
         */
        public void setRunning(boolean running) {
            this.running = running;
        }

        /**
         * @return the channelIsValid
         */
        protected final boolean isChannelValid() {
            return channelIsValid;
        }

        /**
         * Sleeps for the configured delay and then sends two identical ready messages. Sending two messages, and
         * sending them without checking the running flag, is the faulty behaviour this class exists to produce.
         */
        @Override
        public void run() {
            boolean interrupted = false;
            try {
                Thread.sleep(getSendReadyDelay());
            } catch (InterruptedException e) {
                interrupted = true;
                log.error("Unable to sleep", e);
            } catch (Exception e) {
                log.error("Unable to sleep", e);
            }
            try {
                log.info("Sending ready message #1 from {}", this.getClass());
                jmsConnection.send(newReadyMessage());
                log.info("Sending ready message #2 from {}", this.getClass());
                jmsConnection.send(newReadyMessage());
            } finally {
                // Restore the flag only after sending, so the messages go out exactly as before.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * @return a fresh ready message identifying this application instance, host and harvest channel
         */
        private HarvesterReadyMessage newReadyMessage() {
            return new HarvesterReadyMessage(applicationInstanceId + " on " + physicalServerName,
                    FaultyHarvestControllerServer.CHANNEL);
        }

        /**
         * @return the delay read from the setting HarvesterSettings.SEND_READY_DELAY
         */
        public int getSendReadyDelay() {
            return sendReadyDelay;
        }

    }
}