/*
 * #%L
 * Netarchivesuite - harvester
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 * the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Lesser Public License for more details.
 *
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program. If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.harvester.harvesting.monitor;

import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

import javax.jms.MessageListener;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.distribute.ChannelID;
import dk.netarkivet.common.distribute.JMSConnectionFactory;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.utils.CleanupIF;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.harvester.HarvesterSettings;
import dk.netarkivet.harvester.datamodel.JobDAO;
import dk.netarkivet.harvester.datamodel.JobStatus;
import dk.netarkivet.harvester.datamodel.RunningJobsInfoDAO;
import dk.netarkivet.harvester.distribute.HarvesterChannels;
import dk.netarkivet.harvester.distribute.HarvesterMessageHandler;
import dk.netarkivet.harvester.harvesting.distribute.CrawlProgressMessage;
import dk.netarkivet.harvester.harvesting.distribute.FrontierReportMessage;
import dk.netarkivet.harvester.harvesting.distribute.JobEndedMessage;
import dk.netarkivet.harvester.harvesting.frontier.ExhaustedQueuesFilter;
import dk.netarkivet.harvester.harvesting.frontier.InMemoryFrontierReport;
import dk.netarkivet.harvester.harvesting.frontier.RetiredQueuesFilter;
import dk.netarkivet.harvester.harvesting.frontier.TopTotalEnqueuesFilter;

/**
 * Listens for {@link CrawlProgressMessage}s on the proper JMS channel, and stores information to be presented in the
 * monitoring console.
 */
public class HarvestMonitor extends HarvesterMessageHandler implements MessageListener, CleanupIF {

    /** The logger for this class. */
    private static final Logger LOG = LoggerFactory.getLogger(HarvestMonitor.class);

    /** Singleton instance of the monitor. */
    private static HarvestMonitor instance;

    /** Harvest Monitor refresh interval, in seconds. */
    private int refreshInterval;

    /** The JMS channel on which to listen for {@link CrawlProgressMessage}s. */
    public static final ChannelID HARVEST_MONITOR_CHANNEL_ID = HarvesterChannels.getHarvestMonitorChannel();

    /** Chart generators, one per running job, keyed by job ID. Entries are removed when the job ends. */
    private Map<Long, StartedJobHistoryChartGen> chartGenByJobId = new HashMap<Long, StartedJobHistoryChartGen>();

    /** IDs of the jobs currently known to be running, sorted ascending. */
    private Set<Long> runningJobs = new TreeSet<Long>();

    /**
     * Creates the singleton monitor: reads the refresh interval from settings, purges stale history records
     * left by a previous crash, and registers as a listener on the monitor JMS channel.
     */
    private HarvestMonitor() {
        refreshInterval = Settings.getInt(HarvesterSettings.HARVEST_MONITOR_REFRESH_INTERVAL);
        LOG.info("Initializing HarvestMonitor with refreshInterval={} seconds", refreshInterval);

        // Perform initial cleanup (in case apps crashed)
        cleanOnStartup();

        // Register for listening JMS messages
        JMSConnectionFactory.getInstance().setListener(HARVEST_MONITOR_CHANNEL_ID, this);
        LOG.info("Started listening to queue {}", HARVEST_MONITOR_CHANNEL_ID);
    }

    /**
     * Close down the HarvestMonitor singleton. This removes the HarvestMonitor as listener to the JMS scheduler and
     * frontier channels, closes the persistence container, and resets the singleton.
     *
     * @see CleanupIF#cleanup()
     */
    public void cleanup() {
        JMSConnectionFactory.getInstance().removeListener(HARVEST_MONITOR_CHANNEL_ID, this);

        for (StartedJobHistoryChartGen chartGen : chartGenByJobId.values()) {
            chartGen.cleanup();
        }
        // Drop all generator references so a later getInstance() starts from a clean state.
        chartGenByJobId.clear();

        instance = null;
    }

    /**
     * @return the singleton instance for this class.
     */
    public static synchronized HarvestMonitor getInstance() {
        if (instance == null) {
            instance = new HarvestMonitor();
        }
        return instance;
    }

    /**
     * Stores the progress information carried by the message, registers the job as running, and lazily starts a
     * chart generator for it. Messages for jobs not in status STARTED are ignored.
     *
     * @param msg a progress message from a harvester; must not be null
     */
    @Override
    public void visit(CrawlProgressMessage msg) {
        ArgumentNotValid.checkNotNull(msg, "msg");
        Long jobId = Long.valueOf(msg.getJobID());

        JobStatus jobStatus = JobDAO.getInstance().read(jobId).getStatus();
        if (!JobStatus.STARTED.equals(jobStatus)) {
            LOG.warn("Receiving CrawlProgressMessage for job {} registered as state {} instead of STARTED. "
                    + "Ignoring message", jobId, jobStatus);
            return;
        }

        StartedJobInfo info = StartedJobInfo.build(msg);
        LOG.trace("Received CrawlProgressMessage for jobId {}: {}", jobId, info);
        RunningJobsInfoDAO.getInstance().store(info);

        runningJobs.add(jobId);

        // Start a chart generator if none has been started yet
        if (chartGenByJobId.get(jobId) == null) {
            chartGenByJobId.put(jobId, new StartedJobHistoryChartGen(jobId));
        }
    }

    /**
     * Cleans up the database on transitions to status DONE and FAILED.
     *
     * @param msg a {@link JobEndedMessage}
     */
    @Override
    public void visit(JobEndedMessage msg) {
        ArgumentNotValid.checkNotNull(msg, "msg");

        JobStatus newStatus = msg.getJobStatus();
        long jobId = msg.getJobId();

        // Delete records in the DB
        RunningJobsInfoDAO dao = RunningJobsInfoDAO.getInstance();
        int delCount = dao.removeInfoForJob(jobId);
        LOG.info("Processing JobEndedMessage. Deleted {} running job info records for job ID {} on transition to status {}",
                delCount, jobId, newStatus.name());

        runningJobs.remove(jobId);

        // Stop chart generation and discard the generator; keeping the map entry
        // would leak generators and let getChartFilePath() serve a stale chart.
        StartedJobHistoryChartGen gen = chartGenByJobId.remove(jobId);
        if (gen != null) {
            gen.cleanup();
        }
    }

    /**
     * Returns the delay in seconds after which a harvest monitor webpage should refresh itself. This delay is set by
     * overriding the value of the {@link HarvesterSettings#HARVEST_MONITOR_REFRESH_INTERVAL} property.
     *
     * @return the delay in seconds after which a harvest monitor webpage should refresh itself
     */
    public static final int getAutoRefreshDelay() {
        return Settings.getInt(HarvesterSettings.HARVEST_MONITOR_REFRESH_INTERVAL);
    }

    /**
     * Returns a configurable number of the most recent running job info records available for the given job ID.
     *
     * @param jobId the job id
     * @return the most recent running job info records available for the given job ID.
     * @see HarvesterSettings#HARVEST_MONITOR_DISPLAYED_HISTORY_SIZE
     */
    public static StartedJobInfo[] getMostRecentRunningJobInfos(long jobId) {
        int displayedHistorySize = Settings.getInt(HarvesterSettings.HARVEST_MONITOR_DISPLAYED_HISTORY_SIZE);
        // for now. TODO pagination
        return RunningJobsInfoDAO.getInstance().getMostRecentByJobId(jobId, 0, displayedHistorySize);
    }

    /**
     * Returns the most recent running job info record available for the given job ID.
     *
     * @param jobId the job id
     * @return the most recent running job info record available for the given job ID.
     */
    public static StartedJobInfo getMostRecentRunningJobInfo(long jobId) {
        return RunningJobsInfoDAO.getInstance().getMostRecentByJobId(jobId);
    }

    /**
     * Stores the frontier report carried by the message in the database.
     *
     * @param msg a frontier report message; must not be null
     */
    @Override
    public void visit(FrontierReportMessage msg) {
        ArgumentNotValid.checkNotNull(msg, "msg");

        int insertCount = RunningJobsInfoDAO.getInstance().storeFrontierReport(msg.getFilterId(), msg.getReport(),
                msg.getJobID());
        if (LOG.isInfoEnabled() && insertCount > 0) {
            LOG.info("Stored frontier report {}-{} ({} lines): inserted {} lines in the DB", msg.getReport()
                    .getJobName(), msg.getFilterId(), msg.getReport().getSize(), insertCount);
        }
    }

    /**
     * Retrieves the latest frontier report stored for the given job ID.
     *
     * @param jobId the job id
     * @return a frontier report
     */
    public static InMemoryFrontierReport getFrontierReport(long jobId) {
        // Right now there's only one filter and it's not user controlled.
        return RunningJobsInfoDAO.getInstance().getFrontierReport(jobId, new TopTotalEnqueuesFilter().getFilterId());
    }

    /**
     * Retrieves the latest frontier extract report stored for the given job ID, that contains only retired queues.
     *
     * @param jobId the job id
     * @return a frontier report that contains only retired queues.
     */
    public static InMemoryFrontierReport getFrontierRetiredQueues(long jobId) {
        return RunningJobsInfoDAO.getInstance().getFrontierReport(jobId, new RetiredQueuesFilter().getFilterId());
    }

    /**
     * Retrieves the latest frontier extract report stored for the given job ID, that contains only exhausted queues.
     *
     * @param jobId the job id
     * @return a frontier report that contains only exhausted queues.
     */
    public static InMemoryFrontierReport getFrontierExhaustedQueues(long jobId) {
        return RunningJobsInfoDAO.getInstance().getFrontierReport(jobId, new ExhaustedQueuesFilter().getFilterId());
    }

    /** Default chart image. */
    private static final String EMPTY_CHART_FILE = "empty-history.png";

    /**
     * Returns the path of the chart image file, relative to the webapp directory. If no chart is available, returns a
     * default empty image.
     *
     * @param jobId the job id
     * @return the path of the chart image file, relative to the webapp directory.
     */
    public static String getChartFilePath(long jobId) {
        if (instance == null) {
            return EMPTY_CHART_FILE;
        }
        StartedJobHistoryChartGen gen = instance.chartGenByJobId.get(jobId);
        if (gen != null) {
            File chartFile = gen.getChartFile();
            if (chartFile == null) {
                return EMPTY_CHART_FILE;
            }
            return chartFile.getName();
        }
        return EMPTY_CHART_FILE;
    }

    /**
     * Deletes history records and frontier reports for jobs that are no longer in status STARTED. Invoked once from
     * the constructor to purge leftovers from a previous crash.
     */
    private void cleanOnStartup() {
        Set<Long> idsToRemove = new TreeSet<Long>();

        RunningJobsInfoDAO dao = RunningJobsInfoDAO.getInstance();
        idsToRemove.addAll(dao.getHistoryRecordIds());
        Iterator<Long> startedJobIds = JobDAO.getInstance().getAllJobIds(JobStatus.STARTED);
        while (startedJobIds.hasNext()) {
            // don't remove records for jobs still in status STARTED
            idsToRemove.remove(startedJobIds.next());
        }

        int delCount = 0;
        for (long jobId : idsToRemove) {
            delCount += dao.removeInfoForJob(jobId);
            delCount += dao.deleteFrontierReports(jobId);
        }
        if (delCount > 0) {
            LOG.info("Cleaned up {} obsolete history records for finished jobs {}", delCount,
                    StringUtils.join(idsToRemove, ","));
        }
    }

    /**
     * Returns an unmodifiable view of the IDs of the currently running jobs.
     *
     * @return an unmodifiable set of job IDs
     */
    public Set<Long> getRunningJobs() {
        return Collections.unmodifiableSet(runningJobs);
    }

}