001/*
002 * #%L
003 * Netarchivesuite - harvester
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.harvester.harvesting.monitor;
024
import java.io.File;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

import javax.jms.MessageListener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.distribute.ChannelID;
import dk.netarkivet.common.distribute.JMSConnectionFactory;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.utils.CleanupIF;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.harvester.HarvesterSettings;
import dk.netarkivet.harvester.datamodel.JobDAO;
import dk.netarkivet.harvester.datamodel.JobStatus;
import dk.netarkivet.harvester.datamodel.RunningJobsInfoDAO;
import dk.netarkivet.harvester.distribute.HarvesterChannels;
import dk.netarkivet.harvester.distribute.HarvesterMessageHandler;
import dk.netarkivet.harvester.harvesting.distribute.CrawlProgressMessage;
import dk.netarkivet.harvester.harvesting.distribute.FrontierReportMessage;
import dk.netarkivet.harvester.harvesting.distribute.JobEndedMessage;
import dk.netarkivet.harvester.harvesting.frontier.ExhaustedQueuesFilter;
import dk.netarkivet.harvester.harvesting.frontier.InMemoryFrontierReport;
import dk.netarkivet.harvester.harvesting.frontier.RetiredQueuesFilter;
import dk.netarkivet.harvester.harvesting.frontier.TopTotalEnqueuesFilter;
055
056/**
057 * Listens for {@link CrawlProgressMessage}s on the proper JMS channel, and stores information to be presented in the
058 * monitoring console.
059 */
060public class HarvestMonitor extends HarvesterMessageHandler implements MessageListener, CleanupIF {
061
062    /** The logger for this class. */
063    private static final Logger LOG = LoggerFactory.getLogger(HarvestMonitor.class);
064
065    /** Singleton instance of the monitor. */
066    private static HarvestMonitor instance;
067
068    /** The JMS channel on which to listen for {@link CrawlProgressMessage}s. */
069    public static final ChannelID HARVEST_MONITOR_CHANNEL_ID = HarvesterChannels.getHarvestMonitorChannel();
070
071    private Map<Long, StartedJobHistoryChartGen> chartGenByJobId = new HashMap<Long, StartedJobHistoryChartGen>();
072
073    private HarvestMonitor() {
074        // Perform initial cleanup (in case apps crashed)
075        cleanOnStartup();
076
077        // Register for listening JMS messages
078        JMSConnectionFactory.getInstance().setListener(HARVEST_MONITOR_CHANNEL_ID, this);
079    }
080
081    /**
082     * Close down the HarvestMonitor singleton. This removes the HarvestMonitor as listener to the JMS scheduler and
083     * frontier channels, closes the persistence container, and resets the singleton.
084     *
085     * @see CleanupIF#cleanup()
086     */
087    public void cleanup() {
088        JMSConnectionFactory.getInstance().removeListener(HARVEST_MONITOR_CHANNEL_ID, this);
089
090        for (StartedJobHistoryChartGen chartGen : chartGenByJobId.values()) {
091            chartGen.cleanup();
092        }
093
094        instance = null;
095    }
096
097    /**
098     * @return the singleton instance for this class.
099     */
100    public static HarvestMonitor getInstance() {
101        if (instance == null) {
102            instance = new HarvestMonitor();
103        }
104        return instance;
105    }
106
107    @Override
108    public void visit(CrawlProgressMessage msg) {
109        ArgumentNotValid.checkNotNull(msg, "msg");
110
111        Long jobId = Long.valueOf(msg.getJobID());
112
113        JobStatus jobStatus = JobDAO.getInstance().read(jobId).getStatus();
114        if (!JobStatus.STARTED.equals(jobStatus)) {
115            return;
116        }
117
118        StartedJobInfo info = StartedJobInfo.build(msg);
119        RunningJobsInfoDAO.getInstance().store(info);
120
121        // Start a chart generator if none has been started yet
122        if (chartGenByJobId.get(jobId) == null) {
123            chartGenByJobId.put(jobId, new StartedJobHistoryChartGen(jobId));
124        }
125    }
126
127    /**
128     * Cleans up the database on transitions to status DONE and FAILED.
129     *
130     * @param msg a {@link JobEndedMessage}
131     */
132    @Override
133    public void visit(JobEndedMessage msg) {
134        ArgumentNotValid.checkNotNull(msg, "msg");
135
136        JobStatus newStatus = msg.getJobStatus();
137        long jobId = msg.getJobId();
138
139        // Delete records in the DB
140        RunningJobsInfoDAO dao = RunningJobsInfoDAO.getInstance();
141        int delCount = dao.removeInfoForJob(jobId);
142        LOG.info("Deleted {} running job info records for job ID {} on transition to status {}", delCount, jobId,
143                newStatus.name());
144
145        // Stop chart generation
146        StartedJobHistoryChartGen gen = chartGenByJobId.get(jobId);
147        if (gen != null) {
148            gen.cleanup();
149        }
150    }
151
152    /**
153     * Returns the delay in seconds after which a harvest monitor webpage should refresh itself. This delay is set by
154     * overriding the value of the {@link HarvesterSettings#HARVEST_MONITOR_REFRESH_INTERVAL} property.
155     *
156     * @return the delay in seconds after which a harvest monitor webpage should refresh itself
157     */
158    public static final int getAutoRefreshDelay() {
159        return Settings.getInt(HarvesterSettings.HARVEST_MONITOR_REFRESH_INTERVAL);
160    }
161
162    /**
163     * Returns a configurable number of the most recent running job info records available for the given job ID.
164     *
165     * @param jobId
166     * @return the most recent running job info records available for the given job ID.
167     * @see HarvesterSettings#HARVEST_MONITOR_DISPLAYED_HISTORY_SIZE
168     */
169    public static StartedJobInfo[] getMostRecentRunningJobInfos(long jobId) {
170        int displayedHistorySize = Settings.getInt(HarvesterSettings.HARVEST_MONITOR_DISPLAYED_HISTORY_SIZE);
171        // for now. TODO pagination
172        return RunningJobsInfoDAO.getInstance().getMostRecentByJobId(jobId, 0, displayedHistorySize);
173    }
174
175    /**
176     * Returns the most recent running job info record available for the given job ID.
177     *
178     * @param jobId
179     * @return the most recent running job info records available for the given job ID.
180     */
181    public static StartedJobInfo getMostRecentRunningJobInfo(long jobId) {
182        return RunningJobsInfoDAO.getInstance().getMostRecentByJobId(jobId);
183    }
184
185    @Override
186    public void visit(FrontierReportMessage msg) {
187        ArgumentNotValid.checkNotNull(msg, "msg");
188
189        int insertCount = RunningJobsInfoDAO.getInstance().storeFrontierReport(msg.getFilterId(), msg.getReport(),
190                msg.getJobID());
191        if (LOG.isInfoEnabled() && insertCount > 0) {
192            LOG.info("Stored frontier report {}-{}' ({} lines): inserted {} lines in the DB", msg.getReport()
193                    .getJobName(), msg.getFilterId(), msg.getReport().getSize(), insertCount);
194        }
195    }
196
197    /**
198     * Retrieves the latest frontier report stored for the given job ID.
199     *
200     * @param jobId the job id
201     * @return a frontier report
202     */
203    public static InMemoryFrontierReport getFrontierReport(long jobId) {
204        // Right now there's only one filter and it's not user controlled.
205        return RunningJobsInfoDAO.getInstance().getFrontierReport(jobId, new TopTotalEnqueuesFilter().getFilterId());
206    }
207
208    /**
209     * Retrieves the latest frontier extract report stored for the given job ID, that contains only retired queues.
210     *
211     * @param jobId the job id
212     * @return a frontier report that contains only retired queues.
213     */
214    public static InMemoryFrontierReport getFrontierRetiredQueues(long jobId) {
215        return RunningJobsInfoDAO.getInstance().getFrontierReport(jobId, new RetiredQueuesFilter().getFilterId());
216    }
217
218    /**
219     * Retrieves the latest frontier extract report stored for the given job ID, that contains only exhausted queues.
220     *
221     * @param jobId the job id
222     * @return a frontier report that contains only exhausted queues.
223     */
224    public static InMemoryFrontierReport getFrontierExhaustedQueues(long jobId) {
225        return RunningJobsInfoDAO.getInstance().getFrontierReport(jobId, new ExhaustedQueuesFilter().getFilterId());
226    }
227
228    /** Default chart image. */
229    private static final String EMPTY_CHART_FILE = "empty-history.png";
230
231    /**
232     * Returns the path of the chart image file, relative to the webapp directory. If no chart is available, returns a
233     * default empty image.
234     *
235     * @param jobId the job id
236     * @return the path of the chart image file, relative to the webapp directory.
237     */
238    public static String getChartFilePath(long jobId) {
239        if (instance == null) {
240            return EMPTY_CHART_FILE;
241        }
242        StartedJobHistoryChartGen gen = instance.chartGenByJobId.get(jobId);
243        if (gen != null) {
244            File chartFile = gen.getChartFile();
245            if (chartFile == null) {
246                return EMPTY_CHART_FILE;
247            }
248            return chartFile.getName();
249        }
250        return EMPTY_CHART_FILE;
251    }
252
253    private void cleanOnStartup() {
254        Set<Long> idsToRemove = new TreeSet<Long>();
255
256        RunningJobsInfoDAO dao = RunningJobsInfoDAO.getInstance();
257        idsToRemove.addAll(dao.getHistoryRecordIds());
258        Iterator<Long> startedJobIds = JobDAO.getInstance().getAllJobIds(JobStatus.STARTED);
259        while (startedJobIds.hasNext()) {
260            // don't remove records for jobs still in status STARTED
261            idsToRemove.remove(startedJobIds.next());
262        }
263
264        int delCount = 0;
265        for (long jobId : idsToRemove) {
266            delCount += dao.removeInfoForJob(jobId);
267            delCount += dao.deleteFrontierReports(jobId);
268        }
269        if (delCount > 0) {
270            LOG.info("Cleaned up {} obsolete history records.", delCount);
271        }
272
273    }
274
275}