001/*
002 * #%L
003 * Netarchivesuite - harvester
004 * %%
005 * Copyright (C) 2005 - 2018 The Royal Danish Library, 
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.harvester.harvesting.monitor;
024
025import java.io.File;
026import java.util.Collections;
027import java.util.HashMap;
028import java.util.Iterator;
029import java.util.Map;
030import java.util.Set;
031import java.util.TreeSet;
032
033import javax.jms.MessageListener;
034
035import org.apache.commons.lang.StringUtils;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039import dk.netarkivet.common.distribute.ChannelID;
040import dk.netarkivet.common.distribute.JMSConnectionFactory;
041import dk.netarkivet.common.exceptions.ArgumentNotValid;
042import dk.netarkivet.common.utils.CleanupIF;
043import dk.netarkivet.common.utils.Settings;
044import dk.netarkivet.harvester.HarvesterSettings;
045import dk.netarkivet.harvester.datamodel.JobDAO;
046import dk.netarkivet.harvester.datamodel.JobStatus;
047import dk.netarkivet.harvester.datamodel.RunningJobsInfoDAO;
048import dk.netarkivet.harvester.distribute.HarvesterChannels;
049import dk.netarkivet.harvester.distribute.HarvesterMessageHandler;
050import dk.netarkivet.harvester.harvesting.distribute.CrawlProgressMessage;
051import dk.netarkivet.harvester.harvesting.distribute.FrontierReportMessage;
052import dk.netarkivet.harvester.harvesting.distribute.JobEndedMessage;
053import dk.netarkivet.harvester.harvesting.frontier.ExhaustedQueuesFilter;
054import dk.netarkivet.harvester.harvesting.frontier.InMemoryFrontierReport;
055import dk.netarkivet.harvester.harvesting.frontier.RetiredQueuesFilter;
056import dk.netarkivet.harvester.harvesting.frontier.TopTotalEnqueuesFilter;
057
058/**
059 * Listens for {@link CrawlProgressMessage}s on the proper JMS channel, and stores information to be presented in the
060 * monitoring console.
061 */
062public class HarvestMonitor extends HarvesterMessageHandler implements MessageListener, CleanupIF {
063
064    /** The logger for this class. */
065    private static final Logger LOG = LoggerFactory.getLogger(HarvestMonitor.class);
066
067    /** Singleton instance of the monitor. */
068    private static HarvestMonitor instance;
069    /** Harvest Monitor refresh Interval. */
070    private int refreshInterval;
071
072    /** The JMS channel on which to listen for {@link CrawlProgressMessage}s. */
073    public static final ChannelID HARVEST_MONITOR_CHANNEL_ID = HarvesterChannels.getHarvestMonitorChannel();
074
075    private Map<Long, StartedJobHistoryChartGen> chartGenByJobId = new HashMap<Long, StartedJobHistoryChartGen>();
076
077    private Set<Long> runningJobs = new TreeSet<Long>();
078
079    private HarvestMonitor() {
080        refreshInterval = Settings.getInt(HarvesterSettings.HARVEST_MONITOR_REFRESH_INTERVAL);
081        LOG.info("Initializing HarvestMonitor with refreshInterval={} seconds", refreshInterval);
082        
083        // Perform initial cleanup (in case apps crashed)
084        cleanOnStartup();
085
086        // Register for listening JMS messages
087        JMSConnectionFactory.getInstance().setListener(HARVEST_MONITOR_CHANNEL_ID, this);
088        LOG.info("Started listening to queue {}", HARVEST_MONITOR_CHANNEL_ID);
089    }
090
091    /**
092     * Close down the HarvestMonitor singleton. This removes the HarvestMonitor as listener to the JMS scheduler and
093     * frontier channels, closes the persistence container, and resets the singleton.
094     *
095     * @see CleanupIF#cleanup()
096     */
097    public void cleanup() {
098        JMSConnectionFactory.getInstance().removeListener(HARVEST_MONITOR_CHANNEL_ID, this);
099
100        for (StartedJobHistoryChartGen chartGen : chartGenByJobId.values()) {
101            chartGen.cleanup();
102        }
103
104        instance = null;
105    }
106
107    /**
108     * @return the singleton instance for this class.
109     */
110    public static synchronized HarvestMonitor getInstance() {
111        if (instance == null) {
112            instance = new HarvestMonitor();
113        }
114        return instance;
115    }
116
117    @Override
118    public void visit(CrawlProgressMessage msg) {
119        ArgumentNotValid.checkNotNull(msg, "msg");
120        Long jobId = Long.valueOf(msg.getJobID());
121        
122        JobStatus jobStatus = JobDAO.getInstance().read(jobId).getStatus();
123        if (!JobStatus.STARTED.equals(jobStatus)) {
124                LOG.warn("Receiving CrawlProgressMessage for job {} registered as state {} instead of STARTED. Ignoring message", jobId, jobStatus);
125            return;
126        }
127        
128        StartedJobInfo info = StartedJobInfo.build(msg);
129        LOG.trace("Received CrawlProgressMessage for jobId {}: {}", jobId, info);
130        RunningJobsInfoDAO.getInstance().store(info);
131
132        runningJobs.add(jobId);
133
134        // Start a chart generator if none has been started yet
135        if (chartGenByJobId.get(jobId) == null) {
136            chartGenByJobId.put(jobId, new StartedJobHistoryChartGen(jobId));
137        }
138    }
139
140    /**
141     * Cleans up the database on transitions to status DONE and FAILED.
142     *
143     * @param msg a {@link JobEndedMessage}
144     */
145    @Override
146    public void visit(JobEndedMessage msg) {
147        ArgumentNotValid.checkNotNull(msg, "msg");
148
149        JobStatus newStatus = msg.getJobStatus();
150        long jobId = msg.getJobId();
151
152        // Delete records in the DB
153        RunningJobsInfoDAO dao = RunningJobsInfoDAO.getInstance();
154        int delCount = dao.removeInfoForJob(jobId);
155        LOG.info("Processing JobEndedMessage. Deleted {} running job info records for job ID {} on transition to status {}", delCount, jobId,
156                newStatus.name());
157
158        runningJobs.remove(jobId);
159
160        // Stop chart generation
161        StartedJobHistoryChartGen gen = chartGenByJobId.get(jobId);
162        if (gen != null) {
163            gen.cleanup();
164        }
165    }
166
167    /**
168     * Returns the delay in seconds after which a harvest monitor webpage should refresh itself. This delay is set by
169     * overriding the value of the {@link HarvesterSettings#HARVEST_MONITOR_REFRESH_INTERVAL} property.
170     *
171     * @return the delay in seconds after which a harvest monitor webpage should refresh itself
172     */
173    public static final int getAutoRefreshDelay() {
174        return Settings.getInt(HarvesterSettings.HARVEST_MONITOR_REFRESH_INTERVAL);
175    }
176
177    /**
178     * Returns a configurable number of the most recent running job info records available for the given job ID.
179     *
180     * @param jobId
181     * @return the most recent running job info records available for the given job ID.
182     * @see HarvesterSettings#HARVEST_MONITOR_DISPLAYED_HISTORY_SIZE
183     */
184    public static StartedJobInfo[] getMostRecentRunningJobInfos(long jobId) {
185        int displayedHistorySize = Settings.getInt(HarvesterSettings.HARVEST_MONITOR_DISPLAYED_HISTORY_SIZE);
186        // for now. TODO pagination
187        return RunningJobsInfoDAO.getInstance().getMostRecentByJobId(jobId, 0, displayedHistorySize);
188    }
189
190    /**
191     * Returns the most recent running job info record available for the given job ID.
192     *
193     * @param jobId
194     * @return the most recent running job info records available for the given job ID.
195     */
196    public static StartedJobInfo getMostRecentRunningJobInfo(long jobId) {
197        return RunningJobsInfoDAO.getInstance().getMostRecentByJobId(jobId);
198    }
199
200    @Override
201    public void visit(FrontierReportMessage msg) {
202        ArgumentNotValid.checkNotNull(msg, "msg");
203
204        int insertCount = RunningJobsInfoDAO.getInstance().storeFrontierReport(msg.getFilterId(), msg.getReport(),
205                msg.getJobID());
206        if (LOG.isInfoEnabled() && insertCount > 0) {
207            LOG.info("Stored frontier report {}-{}' ({} lines): inserted {} lines in the DB", msg.getReport()
208                    .getJobName(), msg.getFilterId(), msg.getReport().getSize(), insertCount);
209        }
210    }
211
212    /**
213     * Retrieves the latest frontier report stored for the given job ID.
214     *
215     * @param jobId the job id
216     * @return a frontier report
217     */
218    public static InMemoryFrontierReport getFrontierReport(long jobId) {
219        // Right now there's only one filter and it's not user controlled.
220        return RunningJobsInfoDAO.getInstance().getFrontierReport(jobId, new TopTotalEnqueuesFilter().getFilterId());
221    }
222    
223    /**
224     * Retrieve a frontier report from a job id, with limited results and possibility to sort by totalenqueues DESC
225     *
226     * @param jobId the job id
227     * @param limit the limit of result to query
228     * @param sort if true, sort the results by totalenqueues DESC
229     * @return a frontier report
230     */
231    public static InMemoryFrontierReport getFrontierReport(long jobId, boolean sort) {
232        int displayedHistorySize = 100; //default value
233        try {
234                displayedHistorySize = Settings.getInt(HarvesterSettings.HARVEST_MONITOR_DISPLAYED_FRONTIER_QUEUE_SIZE);
235        } catch (Exception e) {
236                //nothing
237        }
238        // Right now there's only one filter and it's not user controlled.
239        return RunningJobsInfoDAO.getInstance().getFrontierReport(jobId, displayedHistorySize, sort);
240    }
241    
242    /**
243     * Retrieve a frontier report from a job id, with limited results and possibility to sort by totalenqueues DESC
244     *
245     * @param jobId the job id
246     * @param limit the limit of result to query
247     * @param sort if true, sort the results by totalenqueues DESC
248     * @return a frontier report
249     */
250    public static InMemoryFrontierReport getFrontierActiveAndInactiveQueuesReport(long jobId, boolean sort) {
251        int displayedHistorySize = 100; //default value
252        try {
253                displayedHistorySize = Settings.getInt(HarvesterSettings.HARVEST_MONITOR_DISPLAYED_FRONTIER_QUEUE_SIZE);
254        } catch (Exception e) {
255                //nothing
256        }
257        // Right now there's only one filter and it's not user controlled.
258        return RunningJobsInfoDAO.getInstance().getFrontierReport(jobId, new TopTotalEnqueuesFilter().getFilterId(),
259                        displayedHistorySize, sort);
260    }
261
262    /**
263     * Retrieves the latest frontier extract report stored for the given job ID, that contains only retired queues.
264     *
265     * @param jobId the job id
266     * @return a frontier report that contains only retired queues.
267     */
268    public static InMemoryFrontierReport getFrontierRetiredQueues(long jobId) {
269        return RunningJobsInfoDAO.getInstance().getFrontierReport(jobId, new RetiredQueuesFilter().getFilterId());
270    }
271
272    /**
273     * Retrieves the latest frontier extract report stored for the given job ID, that contains only exhausted queues.
274     *
275     * @param jobId the job id
276     * @return a frontier report that contains only exhausted queues.
277     */
278    public static InMemoryFrontierReport getFrontierExhaustedQueues(long jobId) {
279        return RunningJobsInfoDAO.getInstance().getFrontierReport(jobId, new ExhaustedQueuesFilter().getFilterId());
280    }
281
282    /** Default chart image. */
283    private static final String EMPTY_CHART_FILE = "empty-history.png";
284
285    /**
286     * Returns the path of the chart image file, relative to the webapp directory. If no chart is available, returns a
287     * default empty image.
288     *
289     * @param jobId the job id
290     * @return the path of the chart image file, relative to the webapp directory.
291     */
292    public static String getChartFilePath(long jobId) {
293        if (instance == null) {
294            return EMPTY_CHART_FILE;
295        }
296        StartedJobHistoryChartGen gen = instance.chartGenByJobId.get(jobId);
297        if (gen != null) {
298            File chartFile = gen.getChartFile();
299            if (chartFile == null) {
300                return EMPTY_CHART_FILE;
301            }
302            return chartFile.getName();
303        }
304        return EMPTY_CHART_FILE;
305    }
306
307    private void cleanOnStartup() {
308        Set<Long> idsToRemove = new TreeSet<Long>();
309
310        RunningJobsInfoDAO dao = RunningJobsInfoDAO.getInstance();
311        idsToRemove.addAll(dao.getHistoryRecordIds());
312        Iterator<Long> startedJobIds = JobDAO.getInstance().getAllJobIds(JobStatus.STARTED);
313        while (startedJobIds.hasNext()) {
314            // don't remove records for jobs still in status STARTED
315            idsToRemove.remove(startedJobIds.next());
316        }
317
318        int delCount = 0;
319        for (long jobId : idsToRemove) {
320            delCount += dao.removeInfoForJob(jobId);
321            delCount += dao.deleteFrontierReports(jobId);
322        }
323        if (delCount > 0) {
324            LOG.info("Cleaned up {} obsolete history records for finished jobs {}", delCount, StringUtils.join(idsToRemove, ","));
325        }
326    }
327
328    public Set<Long> getRunningJobs() {
329        return Collections.unmodifiableSet(runningJobs);
330    }
331
332}