/*
 * #%L
 * Netarchivesuite - harvester
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 * the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Lesser Public License for more details.
 *
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program. If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.harvester.harvesting.monitor;

import java.io.File;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

import javax.jms.MessageListener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.distribute.ChannelID;
import dk.netarkivet.common.distribute.JMSConnectionFactory;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.utils.CleanupIF;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.harvester.HarvesterSettings;
import dk.netarkivet.harvester.datamodel.JobDAO;
import dk.netarkivet.harvester.datamodel.JobStatus;
import dk.netarkivet.harvester.datamodel.RunningJobsInfoDAO;
import dk.netarkivet.harvester.distribute.HarvesterChannels;
import dk.netarkivet.harvester.distribute.HarvesterMessageHandler;
import dk.netarkivet.harvester.harvesting.distribute.CrawlProgressMessage;
import dk.netarkivet.harvester.harvesting.distribute.FrontierReportMessage;
import dk.netarkivet.harvester.harvesting.distribute.JobEndedMessage;
import dk.netarkivet.harvester.harvesting.frontier.ExhaustedQueuesFilter;
import dk.netarkivet.harvester.harvesting.frontier.InMemoryFrontierReport;
import dk.netarkivet.harvester.harvesting.frontier.RetiredQueuesFilter;
import dk.netarkivet.harvester.harvesting.frontier.TopTotalEnqueuesFilter;

/**
 * Listens for {@link CrawlProgressMessage}s on the proper JMS channel, and stores information to be presented in the
 * monitoring console.
 */
public class HarvestMonitor extends HarvesterMessageHandler implements MessageListener, CleanupIF {

    /** The logger for this class. */
    private static final Logger LOG = LoggerFactory.getLogger(HarvestMonitor.class);

    /** Singleton instance of the monitor. */
    private static HarvestMonitor instance;

    /** The JMS channel on which to listen for {@link CrawlProgressMessage}s. */
    public static final ChannelID HARVEST_MONITOR_CHANNEL_ID = HarvesterChannels.getHarvestMonitorChannel();

    /** Default chart image, served when no chart has been generated for a job. */
    private static final String EMPTY_CHART_FILE = "empty-history.png";

    /**
     * One chart generator per running job, keyed by job ID. Entries are added on the first
     * {@link CrawlProgressMessage} for a job and removed again when the job ends.
     */
    private Map<Long, StartedJobHistoryChartGen> chartGenByJobId = new HashMap<Long, StartedJobHistoryChartGen>();

    /**
     * Private constructor (singleton). Purges obsolete history records left over from a previous crash,
     * then registers this instance as listener on the harvest monitor JMS channel.
     */
    private HarvestMonitor() {
        // Perform initial cleanup (in case apps crashed)
        cleanOnStartup();

        // Register for listening JMS messages
        JMSConnectionFactory.getInstance().setListener(HARVEST_MONITOR_CHANNEL_ID, this);
    }

    /**
     * Close down the HarvestMonitor singleton. This removes the HarvestMonitor as listener to the JMS scheduler and
     * frontier channels, closes the persistence container, and resets the singleton.
     *
     * @see CleanupIF#cleanup()
     */
    public void cleanup() {
        JMSConnectionFactory.getInstance().removeListener(HARVEST_MONITOR_CHANNEL_ID, this);

        for (StartedJobHistoryChartGen chartGen : chartGenByJobId.values()) {
            chartGen.cleanup();
        }
        // Drop the (now cleaned-up) generators so they can be garbage-collected.
        chartGenByJobId.clear();

        instance = null;
    }

    /**
     * Returns the singleton instance, lazily creating it on first call. Synchronized because the
     * monitor is accessed both from JMS listener threads and from the web GUI; without it two
     * threads could each construct an instance and double-register the JMS listener.
     *
     * @return the singleton instance for this class.
     */
    public static synchronized HarvestMonitor getInstance() {
        if (instance == null) {
            instance = new HarvestMonitor();
        }
        return instance;
    }

    /**
     * Stores the progress information carried by the message, provided the job is still in status
     * STARTED. Also lazily starts a history chart generator for the job.
     *
     * @param msg a {@link CrawlProgressMessage}; must not be null
     */
    @Override
    public void visit(CrawlProgressMessage msg) {
        ArgumentNotValid.checkNotNull(msg, "msg");

        Long jobId = Long.valueOf(msg.getJobID());

        // Ignore late progress messages for jobs that have already left STARTED.
        JobStatus jobStatus = JobDAO.getInstance().read(jobId).getStatus();
        if (!JobStatus.STARTED.equals(jobStatus)) {
            return;
        }

        StartedJobInfo info = StartedJobInfo.build(msg);
        RunningJobsInfoDAO.getInstance().store(info);

        // Start a chart generator if none has been started yet
        if (chartGenByJobId.get(jobId) == null) {
            chartGenByJobId.put(jobId, new StartedJobHistoryChartGen(jobId));
        }
    }

    /**
     * Cleans up the database on transitions to status DONE and FAILED.
     *
     * @param msg a {@link JobEndedMessage}; must not be null
     */
    @Override
    public void visit(JobEndedMessage msg) {
        ArgumentNotValid.checkNotNull(msg, "msg");

        JobStatus newStatus = msg.getJobStatus();
        long jobId = msg.getJobId();

        // Delete records in the DB
        RunningJobsInfoDAO dao = RunningJobsInfoDAO.getInstance();
        int delCount = dao.removeInfoForJob(jobId);
        LOG.info("Deleted {} running job info records for job ID {} on transition to status {}", delCount, jobId,
                newStatus.name());

        // Stop chart generation and remove the generator from the map; keeping the entry
        // would leak one generator per finished job for the lifetime of the singleton.
        StartedJobHistoryChartGen gen = chartGenByJobId.remove(jobId);
        if (gen != null) {
            gen.cleanup();
        }
    }

    /**
     * Returns the delay in seconds after which a harvest monitor webpage should refresh itself. This delay is set by
     * overriding the value of the {@link HarvesterSettings#HARVEST_MONITOR_REFRESH_INTERVAL} property.
     *
     * @return the delay in seconds after which a harvest monitor webpage should refresh itself
     */
    public static final int getAutoRefreshDelay() {
        return Settings.getInt(HarvesterSettings.HARVEST_MONITOR_REFRESH_INTERVAL);
    }

    /**
     * Returns a configurable number of the most recent running job info records available for the given job ID.
     *
     * @param jobId the job ID to fetch history records for
     * @return the most recent running job info records available for the given job ID.
     * @see HarvesterSettings#HARVEST_MONITOR_DISPLAYED_HISTORY_SIZE
     */
    public static StartedJobInfo[] getMostRecentRunningJobInfos(long jobId) {
        int displayedHistorySize = Settings.getInt(HarvesterSettings.HARVEST_MONITOR_DISPLAYED_HISTORY_SIZE);
        // for now. TODO pagination
        return RunningJobsInfoDAO.getInstance().getMostRecentByJobId(jobId, 0, displayedHistorySize);
    }

    /**
     * Returns the most recent running job info record available for the given job ID.
     *
     * @param jobId the job ID to fetch the latest record for
     * @return the most recent running job info records available for the given job ID.
     */
    public static StartedJobInfo getMostRecentRunningJobInfo(long jobId) {
        return RunningJobsInfoDAO.getInstance().getMostRecentByJobId(jobId);
    }

    /**
     * Stores the frontier report carried by the message in the database.
     *
     * @param msg a {@link FrontierReportMessage}; must not be null
     */
    @Override
    public void visit(FrontierReportMessage msg) {
        ArgumentNotValid.checkNotNull(msg, "msg");

        int insertCount = RunningJobsInfoDAO.getInstance().storeFrontierReport(msg.getFilterId(), msg.getReport(),
                msg.getJobID());
        if (LOG.isInfoEnabled() && insertCount > 0) {
            // NB: the original message template contained a stray apostrophe ("{}-{}'"), fixed here.
            LOG.info("Stored frontier report {}-{} ({} lines): inserted {} lines in the DB", msg.getReport()
                    .getJobName(), msg.getFilterId(), msg.getReport().getSize(), insertCount);
        }
    }

    /**
     * Retrieves the latest frontier report stored for the given job ID.
     *
     * @param jobId the job id
     * @return a frontier report
     */
    public static InMemoryFrontierReport getFrontierReport(long jobId) {
        // Right now there's only one filter and it's not user controlled.
        return RunningJobsInfoDAO.getInstance().getFrontierReport(jobId, new TopTotalEnqueuesFilter().getFilterId());
    }

    /**
     * Retrieves the latest frontier extract report stored for the given job ID, that contains only retired queues.
     *
     * @param jobId the job id
     * @return a frontier report that contains only retired queues.
     */
    public static InMemoryFrontierReport getFrontierRetiredQueues(long jobId) {
        return RunningJobsInfoDAO.getInstance().getFrontierReport(jobId, new RetiredQueuesFilter().getFilterId());
    }

    /**
     * Retrieves the latest frontier extract report stored for the given job ID, that contains only exhausted queues.
     *
     * @param jobId the job id
     * @return a frontier report that contains only exhausted queues.
     */
    public static InMemoryFrontierReport getFrontierExhaustedQueues(long jobId) {
        return RunningJobsInfoDAO.getInstance().getFrontierReport(jobId, new ExhaustedQueuesFilter().getFilterId());
    }

    /**
     * Returns the path of the chart image file, relative to the webapp directory. If no chart is available, returns a
     * default empty image.
     *
     * @param jobId the job id
     * @return the path of the chart image file, relative to the webapp directory.
     */
    public static String getChartFilePath(long jobId) {
        if (instance == null) {
            return EMPTY_CHART_FILE;
        }
        StartedJobHistoryChartGen gen = instance.chartGenByJobId.get(jobId);
        if (gen != null) {
            File chartFile = gen.getChartFile();
            if (chartFile == null) {
                return EMPTY_CHART_FILE;
            }
            return chartFile.getName();
        }
        return EMPTY_CHART_FILE;
    }

    /**
     * Removes history records and frontier reports for jobs that are no longer in status STARTED.
     * Run once at construction time to clean up after a possible earlier crash.
     */
    private void cleanOnStartup() {
        Set<Long> idsToRemove = new TreeSet<Long>();

        RunningJobsInfoDAO dao = RunningJobsInfoDAO.getInstance();
        idsToRemove.addAll(dao.getHistoryRecordIds());
        Iterator<Long> startedJobIds = JobDAO.getInstance().getAllJobIds(JobStatus.STARTED);
        while (startedJobIds.hasNext()) {
            // don't remove records for jobs still in status STARTED
            idsToRemove.remove(startedJobIds.next());
        }

        int delCount = 0;
        for (long jobId : idsToRemove) {
            delCount += dao.removeInfoForJob(jobId);
            delCount += dao.deleteFrontierReports(jobId);
        }
        if (delCount > 0) {
            LOG.info("Cleaned up {} obsolete history records.", delCount);
        }

    }

}