/*
 * #%L
 * Netarchivesuite - harvester
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 * 
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.harvester.harvesting.monitor;

import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

import javax.jms.MessageListener;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.distribute.ChannelID;
import dk.netarkivet.common.distribute.JMSConnectionFactory;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.utils.CleanupIF;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.harvester.HarvesterSettings;
import dk.netarkivet.harvester.datamodel.JobDAO;
import dk.netarkivet.harvester.datamodel.JobStatus;
import dk.netarkivet.harvester.datamodel.RunningJobsInfoDAO;
import dk.netarkivet.harvester.distribute.HarvesterChannels;
import dk.netarkivet.harvester.distribute.HarvesterMessageHandler;
import dk.netarkivet.harvester.harvesting.distribute.CrawlProgressMessage;
import dk.netarkivet.harvester.harvesting.distribute.FrontierReportMessage;
import dk.netarkivet.harvester.harvesting.distribute.JobEndedMessage;
import dk.netarkivet.harvester.harvesting.frontier.ExhaustedQueuesFilter;
import dk.netarkivet.harvester.harvesting.frontier.InMemoryFrontierReport;
import dk.netarkivet.harvester.harvesting.frontier.RetiredQueuesFilter;
import dk.netarkivet.harvester.harvesting.frontier.TopTotalEnqueuesFilter;

/**
 * Listens for {@link CrawlProgressMessage}s, {@link JobEndedMessage}s and {@link FrontierReportMessage}s on the
 * harvest monitor JMS channel, and stores the extracted information to be presented in the monitoring console.
 */
public class HarvestMonitor extends HarvesterMessageHandler implements MessageListener, CleanupIF {

    /** The logger for this class. */
    private static final Logger LOG = LoggerFactory.getLogger(HarvestMonitor.class);

    /** Singleton instance of the monitor. */
    private static HarvestMonitor instance;
    /** Harvest monitor refresh interval, in seconds. */
    private int refreshInterval;

    /** The JMS channel on which to listen for {@link CrawlProgressMessage}s. */
    public static final ChannelID HARVEST_MONITOR_CHANNEL_ID = HarvesterChannels.getHarvestMonitorChannel();

    /** Per-job generators for the started-job history charts, mapped by job ID. */
    private Map<Long, StartedJobHistoryChartGen> chartGenByJobId = new HashMap<Long, StartedJobHistoryChartGen>();

    /** IDs of the jobs currently known to be running. */
    private Set<Long> runningJobs = new TreeSet<Long>();

    private HarvestMonitor() {
        refreshInterval = Settings.getInt(HarvesterSettings.HARVEST_MONITOR_REFRESH_INTERVAL);
        LOG.info("Initializing HarvestMonitor with refreshInterval={} seconds", refreshInterval);

        // Perform initial cleanup (in case apps crashed)
        cleanOnStartup();

        // Register as listener for JMS messages on the monitor channel
        JMSConnectionFactory.getInstance().setListener(HARVEST_MONITOR_CHANNEL_ID, this);
        LOG.info("Started listening to queue {}", HARVEST_MONITOR_CHANNEL_ID);
    }

    /**
     * Close down the HarvestMonitor singleton. This removes the HarvestMonitor as listener on the harvest monitor
     * JMS channel, cleans up the chart generators, and resets the singleton.
     *
     * @see CleanupIF#cleanup()
     */
    public void cleanup() {
        JMSConnectionFactory.getInstance().removeListener(HARVEST_MONITOR_CHANNEL_ID, this);

        for (StartedJobHistoryChartGen chartGen : chartGenByJobId.values()) {
            chartGen.cleanup();
        }

        instance = null;
    }

    /**
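     * Returns the singleton, creating and starting it on first call.
     * <p>
     * A minimal lifecycle sketch (it assumes a reachable JMS broker and a fully configured harvester application;
     * the surrounding wiring is not shown):
     * <pre>{@code
     * HarvestMonitor monitor = HarvestMonitor.getInstance(); // starts listening on the monitor channel
     * // ... crawl progress is stored in the database as messages arrive ...
     * monitor.cleanup(); // on shutdown: deregister the JMS listener and reset the singleton
     * }</pre>
     *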
     * @return the singleton instance for this class.
     */
    public static synchronized HarvestMonitor getInstance() {
        if (instance == null) {
            instance = new HarvestMonitor();
        }
        return instance;
    }

    @Override
    public void visit(CrawlProgressMessage msg) {
        ArgumentNotValid.checkNotNull(msg, "msg");
        Long jobId = Long.valueOf(msg.getJobID());

        JobStatus jobStatus = JobDAO.getInstance().read(jobId).getStatus();
        if (!JobStatus.STARTED.equals(jobStatus)) {
            LOG.warn("Received CrawlProgressMessage for job {} in state {} instead of STARTED. Ignoring message.",
                    jobId, jobStatus);
            return;
        }

        StartedJobInfo info = StartedJobInfo.build(msg);
        LOG.trace("Received CrawlProgressMessage for jobId {}: {}", jobId, info);
        RunningJobsInfoDAO.getInstance().store(info);

        runningJobs.add(jobId);

        // Start a chart generator if none has been started yet
        if (chartGenByJobId.get(jobId) == null) {
            chartGenByJobId.put(jobId, new StartedJobHistoryChartGen(jobId));
        }
    }

    /**
     * Cleans up the database on transitions to status DONE and FAILED.
     *
     * @param msg a {@link JobEndedMessage}
     */
    @Override
    public void visit(JobEndedMessage msg) {
        ArgumentNotValid.checkNotNull(msg, "msg");

        JobStatus newStatus = msg.getJobStatus();
        long jobId = msg.getJobId();

        // Delete records in the DB
        RunningJobsInfoDAO dao = RunningJobsInfoDAO.getInstance();
        int delCount = dao.removeInfoForJob(jobId);
        LOG.info("Processing JobEndedMessage. Deleted {} running job info records for job ID {} on transition to status {}",
                delCount, jobId, newStatus.name());

        runningJobs.remove(jobId);

        // Stop chart generation and discard the generator for this job,
        // so it no longer lingers in the map after the job has ended
        StartedJobHistoryChartGen gen = chartGenByJobId.get(jobId);
        if (gen != null) {
            gen.cleanup();
            chartGenByJobId.remove(jobId);
        }
    }

    /**
     * Returns the delay in seconds after which a harvest monitor webpage should refresh itself. The delay is
     * configured through the {@link HarvesterSettings#HARVEST_MONITOR_REFRESH_INTERVAL} setting.
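     * <p>
     * A hypothetical servlet snippet consuming the value; the {@code response} object and the use of the widely
     * supported (but non-standard) {@code Refresh} HTTP header are assumptions for illustration:
     * <pre>{@code
     * // Ask the browser to reload the status page after the configured delay.
     * response.setHeader("Refresh", String.valueOf(HarvestMonitor.getAutoRefreshDelay()));
     * }</pre>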
     *
     * @return the delay in seconds after which a harvest monitor webpage should refresh itself
     */
    public static final int getAutoRefreshDelay() {
        return Settings.getInt(HarvesterSettings.HARVEST_MONITOR_REFRESH_INTERVAL);
    }

    /**
     * Returns a configurable number of the most recent running job info records available for the given job ID.
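     * <p>
     * A minimal sketch of listing the history (the job ID is arbitrary, and the output relies on the string
     * representation of {@link StartedJobInfo}):
     * <pre>{@code
     * for (StartedJobInfo info : HarvestMonitor.getMostRecentRunningJobInfos(42L)) {
     *     System.out.println(info); // one history record per line
     * }
     * }</pre>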
     *
     * @param jobId the job id
     * @return the most recent running job info records available for the given job ID.
     * @see HarvesterSettings#HARVEST_MONITOR_DISPLAYED_HISTORY_SIZE
     */
    public static StartedJobInfo[] getMostRecentRunningJobInfos(long jobId) {
        int displayedHistorySize = Settings.getInt(HarvesterSettings.HARVEST_MONITOR_DISPLAYED_HISTORY_SIZE);
        // For now, return only the first page. TODO: pagination.
        return RunningJobsInfoDAO.getInstance().getMostRecentByJobId(jobId, 0, displayedHistorySize);
    }

    /**
     * Returns the most recent running job info record available for the given job ID.
     *
     * @param jobId the job id
     * @return the most recent running job info record available for the given job ID.
     */
    public static StartedJobInfo getMostRecentRunningJobInfo(long jobId) {
        return RunningJobsInfoDAO.getInstance().getMostRecentByJobId(jobId);
    }

    @Override
    public void visit(FrontierReportMessage msg) {
        ArgumentNotValid.checkNotNull(msg, "msg");

        int insertCount = RunningJobsInfoDAO.getInstance().storeFrontierReport(msg.getFilterId(), msg.getReport(),
                msg.getJobID());
        if (LOG.isInfoEnabled() && insertCount > 0) {
207            LOG.info("Stored frontier report {}-{}' ({} lines): inserted {} lines in the DB", msg.getReport()
208                    .getJobName(), msg.getFilterId(), msg.getReport().getSize(), insertCount);
209        }
    }

    /**
     * Retrieves the latest frontier report stored for the given job ID.
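     * <p>
     * Illustrative use (the job ID is arbitrary; only accessors already used elsewhere in this class are shown):
     * <pre>{@code
     * InMemoryFrontierReport report = HarvestMonitor.getFrontierReport(42L);
     * System.out.println(report.getJobName() + ": " + report.getSize() + " lines");
     * }</pre>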
     *
     * @param jobId the job id
     * @return a frontier report
     */
    public static InMemoryFrontierReport getFrontierReport(long jobId) {
        // Right now there's only one filter and it's not user controlled.
        return RunningJobsInfoDAO.getInstance().getFrontierReport(jobId, new TopTotalEnqueuesFilter().getFilterId());
    }

    /**
     * Retrieves the latest frontier extract report stored for the given job ID, containing only retired queues.
     *
     * @param jobId the job id
     * @return a frontier report that contains only retired queues.
     */
    public static InMemoryFrontierReport getFrontierRetiredQueues(long jobId) {
        return RunningJobsInfoDAO.getInstance().getFrontierReport(jobId, new RetiredQueuesFilter().getFilterId());
    }

    /**
     * Retrieves the latest frontier extract report stored for the given job ID, containing only exhausted queues.
     *
     * @param jobId the job id
     * @return a frontier report that contains only exhausted queues.
     */
    public static InMemoryFrontierReport getFrontierExhaustedQueues(long jobId) {
        return RunningJobsInfoDAO.getInstance().getFrontierReport(jobId, new ExhaustedQueuesFilter().getFilterId());
    }

    /** Default chart image. */
    private static final String EMPTY_CHART_FILE = "empty-history.png";

    /**
     * Returns the path of the chart image file, relative to the webapp directory. If no chart is available, returns a
     * default empty image.
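     * <p>
     * A hypothetical JSP/servlet fragment consuming the path; the {@code out} writer and the {@code <img>} markup
     * are assumptions for illustration:
     * <pre>{@code
     * String chartPath = HarvestMonitor.getChartFilePath(jobId);
     * out.println("<img src=\"" + chartPath + "\" alt=\"job history chart\"/>");
     * }</pre>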
     *
     * @param jobId the job id
     * @return the path of the chart image file, relative to the webapp directory.
     */
    public static String getChartFilePath(long jobId) {
        if (instance == null) {
            return EMPTY_CHART_FILE;
        }
        StartedJobHistoryChartGen gen = instance.chartGenByJobId.get(jobId);
        if (gen != null) {
            File chartFile = gen.getChartFile();
            if (chartFile == null) {
                return EMPTY_CHART_FILE;
            }
            return chartFile.getName();
        }
        return EMPTY_CHART_FILE;
    }

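    /**
     * Removes stale monitoring data left behind if applications crashed: deletes the stored running job info records
     * and frontier reports for every job that is no longer in status STARTED.
     */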
    private void cleanOnStartup() {
        Set<Long> idsToRemove = new TreeSet<Long>();

        RunningJobsInfoDAO dao = RunningJobsInfoDAO.getInstance();
        idsToRemove.addAll(dao.getHistoryRecordIds());
        Iterator<Long> startedJobIds = JobDAO.getInstance().getAllJobIds(JobStatus.STARTED);
        while (startedJobIds.hasNext()) {
            // don't remove records for jobs still in status STARTED
            idsToRemove.remove(startedJobIds.next());
        }

        int delCount = 0;
        for (long jobId : idsToRemove) {
            delCount += dao.removeInfoForJob(jobId);
            delCount += dao.deleteFrontierReports(jobId);
        }
        if (delCount > 0) {
            LOG.info("Cleaned up {} obsolete history records for finished jobs {}", delCount,
                    StringUtils.join(idsToRemove, ","));
        }
    }

    /**
     * @return an unmodifiable view of the IDs of the currently running jobs.
     */
    public Set<Long> getRunningJobs() {
        return Collections.unmodifiableSet(runningJobs);
    }

}