/*
 * #%L
 * Netarchivesuite - harvester
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 * 
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.harvester.datamodel;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.exceptions.UnknownID;
import dk.netarkivet.common.utils.DBUtils;
import dk.netarkivet.common.utils.ExceptionUtils;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.harvester.HarvesterSettings;
import dk.netarkivet.harvester.harvesting.distribute.CrawlProgressMessage.CrawlStatus;
import dk.netarkivet.harvester.harvesting.frontier.FrontierReportFilter;
import dk.netarkivet.harvester.harvesting.frontier.FrontierReportLine;
import dk.netarkivet.harvester.harvesting.frontier.InMemoryFrontierReport;
import dk.netarkivet.harvester.harvesting.monitor.StartedJobInfo;

/**
 * Class implementing the persistence of running job information.
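 *
 * <p>A minimal usage sketch; the harvest name and job id below are illustrative only:
 * <pre>{@code
 * RunningJobsInfoDAO dao = new RunningJobsInfoDBDAO();
 * StartedJobInfo info = new StartedJobInfo("myHarvest", 42L);
 * dao.store(info);
 * StartedJobInfo[] history = dao.getFullJobHistory(42L);
 * }</pre>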
 */
public class RunningJobsInfoDBDAO extends RunningJobsInfoDAO {

    /** The logger. */
    private static final Logger log = LoggerFactory.getLogger(RunningJobsInfoDBDAO.class);

    /** Max length of URLs stored in tables. */
    private static final int MAX_URL_LENGTH = 1000;

    /**
     * Defines the order of columns in the runningJobsMonitor table. Used in SQL queries.
     */
    private static enum HM_COLUMN {
        jobId, harvestName, elapsedSeconds, hostUrl, progress, queuedFilesCount, totalQueuesCount,
        activeQueuesCount, retiredQueuesCount, exhaustedQueuesCount, alertsCount, downloadedFilesCount,
        currentProcessedKBPerSec, processedKBPerSec, currentProcessedDocsPerSec, processedDocsPerSec,
        activeToeCount, status, tstamp;

        /**
         * Returns the rank in an SQL query (ordinal + 1).
         *
         * @return ordinal() + 1
         */
        int rank() {
            return ordinal() + 1;
        }

        /**
         * Returns the SQL substring that lists columns according to their ordinal.
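         * <p>For this enum the result is {@code "jobId, harvestName, elapsedSeconds, ..., status, tstamp"},
         * i.e. comma-separated with no trailing comma.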
         *
         * @return the SQL substring that lists columns in proper order.
         */
        static String getColumnsInOrder() {
            StringBuilder columns = new StringBuilder();
            for (HM_COLUMN c : values()) {
                columns.append(c.name()).append(", ");
            }
            return columns.substring(0, columns.lastIndexOf(","));
        }
    }

    /**
     * Timestamp (in milliseconds) of the last history record stored, per job ID.
     */
    private static Map<Long, Long> lastSampleDateByJobId = new HashMap<Long, Long>();

    /**
     * Minimum interval in milliseconds between two history records for a running job. The underlying setting is
     * given in seconds, hence the factor of 1000.
     */
    private static final long HISTORY_SAMPLE_RATE = 1000 * Settings
            .getLong(HarvesterSettings.HARVEST_MONITOR_HISTORY_SAMPLE_RATE);

    /**
     * The constructor of RunningJobsInfoDBDAO. Installs or updates the necessary database tables when needed.
     */
    public RunningJobsInfoDBDAO() {
        Connection connection = HarvestDBConnection.get();
        try {
            // Update, if necessary, the current version of the tables 'runningJobsHistory',
            // 'runningJobsMonitor' and 'frontierReportMonitor'.
            HarvesterDatabaseTables.checkVersion(connection, HarvesterDatabaseTables.RUNNINGJOBSHISTORY);
            HarvesterDatabaseTables.checkVersion(connection, HarvesterDatabaseTables.RUNNINGJOBSMONITOR);
            HarvesterDatabaseTables.checkVersion(connection, HarvesterDatabaseTables.FRONTIERREPORTMONITOR);
        } finally {
            HarvestDBConnection.release(connection);
        }
    }

    /**
     * Stores a {@link StartedJobInfo} record in the persistent storage. The record is stored in the monitor table
     * and, if the time elapsed since the last history sample is greater than or equal to the history sample rate,
     * also in the history table.
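     * <p>Concretely, a history row is written when {@code now >= lastSample + HISTORY_SAMPLE_RATE}.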
     *
     * @param startedJobInfo the record to store.
     */
    @Override
    public synchronized void store(StartedJobInfo startedJobInfo) {
        ArgumentNotValid.checkNotNull(startedJobInfo, "StartedJobInfo startedJobInfo");

        Connection c = HarvestDBConnection.get();

        try {
            PreparedStatement stm = null;

            // First, check whether a record for this job already exists in the monitor table.
            boolean update = false;
            try {
                stm = c.prepareStatement("SELECT jobId FROM runningJobsMonitor WHERE jobId=? AND harvestName=?");
                stm.setLong(1, startedJobInfo.getJobId());
                stm.setString(2, startedJobInfo.getHarvestName());

                // At most one row expected, as per the PK definition
                update = stm.executeQuery().next();

            } catch (SQLException e) {
                String message = "SQL error checking running jobs monitor table\n"
                        + ExceptionUtils.getSQLExceptionCause(e);
                log.warn(message, e);
                throw new IOFailure(message, e);
            }

            try {
                // Update or insert the latest progress information for this job
                c.setAutoCommit(false);

                StringBuilder sql = new StringBuilder();

                if (update) {
                    sql.append("UPDATE runningJobsMonitor SET ");

                    StringBuilder columns = new StringBuilder();
                    // FIXME The identical SQL string is rebuilt on every call; it could be cached in a constant.
                    for (HM_COLUMN setCol : HM_COLUMN.values()) {
                        columns.append(setCol.name()).append("=?, ");
                    }
                    sql.append(columns.substring(0, columns.lastIndexOf(",")));
                    sql.append(" WHERE jobId=? AND harvestName=?");
                } else {
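                    // Insert: one '?' placeholder per HM_COLUMN value (19 in total).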
                    sql.append("INSERT INTO runningJobsMonitor (");
                    sql.append(HM_COLUMN.getColumnsInOrder());
                    sql.append(") VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)");
                }

                stm = c.prepareStatement(sql.toString());
                stm.setLong(HM_COLUMN.jobId.rank(), startedJobInfo.getJobId());
                stm.setString(HM_COLUMN.harvestName.rank(), startedJobInfo.getHarvestName());
                stm.setLong(HM_COLUMN.elapsedSeconds.rank(), startedJobInfo.getElapsedSeconds());
                stm.setString(HM_COLUMN.hostUrl.rank(), startedJobInfo.getHostUrl());
                stm.setDouble(HM_COLUMN.progress.rank(), startedJobInfo.getProgress());
                stm.setLong(HM_COLUMN.queuedFilesCount.rank(), startedJobInfo.getQueuedFilesCount());
                stm.setLong(HM_COLUMN.totalQueuesCount.rank(), startedJobInfo.getTotalQueuesCount());
                stm.setLong(HM_COLUMN.activeQueuesCount.rank(), startedJobInfo.getActiveQueuesCount());
                stm.setLong(HM_COLUMN.retiredQueuesCount.rank(), startedJobInfo.getRetiredQueuesCount());
                stm.setLong(HM_COLUMN.exhaustedQueuesCount.rank(), startedJobInfo.getExhaustedQueuesCount());
                stm.setLong(HM_COLUMN.alertsCount.rank(), startedJobInfo.getAlertsCount());
                stm.setLong(HM_COLUMN.downloadedFilesCount.rank(), startedJobInfo.getDownloadedFilesCount());
                stm.setLong(HM_COLUMN.currentProcessedKBPerSec.rank(), startedJobInfo.getCurrentProcessedKBPerSec());
                stm.setLong(HM_COLUMN.processedKBPerSec.rank(), startedJobInfo.getProcessedKBPerSec());
                stm.setDouble(HM_COLUMN.currentProcessedDocsPerSec.rank(),
                        startedJobInfo.getCurrentProcessedDocsPerSec());
                stm.setDouble(HM_COLUMN.processedDocsPerSec.rank(), startedJobInfo.getProcessedDocsPerSec());
                stm.setInt(HM_COLUMN.activeToeCount.rank(), startedJobInfo.getActiveToeCount());
                stm.setInt(HM_COLUMN.status.rank(), startedJobInfo.getStatus().ordinal());
                stm.setTimestamp(HM_COLUMN.tstamp.rank(), new Timestamp(startedJobInfo.getTimestamp().getTime()));

                if (update) {
                    // The two extra parameters fill the UPDATE's WHERE clause.
                    stm.setLong(HM_COLUMN.values().length + 1, startedJobInfo.getJobId());
                    stm.setString(HM_COLUMN.values().length + 2, startedJobInfo.getHarvestName());
                }

                stm.executeUpdate();

                c.commit();
            } catch (SQLException e) {
                String message = "SQL error storing started job info " + startedJobInfo + " in monitor table\n"
                        + ExceptionUtils.getSQLExceptionCause(e);
                log.warn(message, e);
                throw new IOFailure(message, e);
            } finally {
                DBUtils.closeStatementIfOpen(stm);
                DBUtils.rollbackIfNeeded(c, "store started job info", startedJobInfo);
            }

            // Should we store a history record?
            Long lastHistoryStore = lastSampleDateByJobId.get(startedJobInfo.getJobId());

            long time = System.currentTimeMillis();
            boolean shouldSample = lastHistoryStore == null || time >= lastHistoryStore + HISTORY_SAMPLE_RATE;

            if (!shouldSample) {
                return; // we're done
            }

            try {
                c.setAutoCommit(false);

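                // Insert the same 19 columns as for the monitor table, this time into runningJobsHistory.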
                stm = c.prepareStatement("INSERT INTO runningJobsHistory (" + HM_COLUMN.getColumnsInOrder()
                        + ") VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)");
                stm.setLong(HM_COLUMN.jobId.rank(), startedJobInfo.getJobId());
                stm.setString(HM_COLUMN.harvestName.rank(), startedJobInfo.getHarvestName());
                stm.setLong(HM_COLUMN.elapsedSeconds.rank(), startedJobInfo.getElapsedSeconds());
                stm.setString(HM_COLUMN.hostUrl.rank(), startedJobInfo.getHostUrl());
                stm.setDouble(HM_COLUMN.progress.rank(), startedJobInfo.getProgress());
                stm.setLong(HM_COLUMN.queuedFilesCount.rank(), startedJobInfo.getQueuedFilesCount());
                stm.setLong(HM_COLUMN.totalQueuesCount.rank(), startedJobInfo.getTotalQueuesCount());
                stm.setLong(HM_COLUMN.activeQueuesCount.rank(), startedJobInfo.getActiveQueuesCount());
                stm.setLong(HM_COLUMN.retiredQueuesCount.rank(), startedJobInfo.getRetiredQueuesCount());
                stm.setLong(HM_COLUMN.exhaustedQueuesCount.rank(), startedJobInfo.getExhaustedQueuesCount());
                stm.setLong(HM_COLUMN.alertsCount.rank(), startedJobInfo.getAlertsCount());
                stm.setLong(HM_COLUMN.downloadedFilesCount.rank(), startedJobInfo.getDownloadedFilesCount());
                stm.setLong(HM_COLUMN.currentProcessedKBPerSec.rank(), startedJobInfo.getCurrentProcessedKBPerSec());
                stm.setLong(HM_COLUMN.processedKBPerSec.rank(), startedJobInfo.getProcessedKBPerSec());
                stm.setDouble(HM_COLUMN.currentProcessedDocsPerSec.rank(),
                        startedJobInfo.getCurrentProcessedDocsPerSec());
                stm.setDouble(HM_COLUMN.processedDocsPerSec.rank(), startedJobInfo.getProcessedDocsPerSec());
                stm.setInt(HM_COLUMN.activeToeCount.rank(), startedJobInfo.getActiveToeCount());
                stm.setInt(HM_COLUMN.status.rank(), startedJobInfo.getStatus().ordinal());
                stm.setTimestamp(HM_COLUMN.tstamp.rank(), new Timestamp(startedJobInfo.getTimestamp().getTime()));

                stm.executeUpdate();

                c.commit();
            } catch (SQLException e) {
                String message = "SQL error storing started job info " + startedJobInfo + " in history table\n"
                        + ExceptionUtils.getSQLExceptionCause(e);
                log.warn(message, e);
                throw new IOFailure(message, e);
            } finally {
                DBUtils.closeStatementIfOpen(stm);
                DBUtils.rollbackIfNeeded(c, "store started job info", startedJobInfo);
            }

            // Remember the last sampling date
            lastSampleDateByJobId.put(startedJobInfo.getJobId(), time);
        } finally {
            HarvestDBConnection.release(c);
        }
    }

    /**
     * Returns an array of all progress records, chronologically sorted, for the given job ID.
     *
     * @param jobId the job id.
     * @return an array of all progress records, chronologically sorted, for the given job ID.
     */
    @Override
    public StartedJobInfo[] getFullJobHistory(long jobId) {
        Connection c = HarvestDBConnection.get();
        PreparedStatement stm = null;
        try {
            stm = c.prepareStatement("SELECT " + HM_COLUMN.getColumnsInOrder() + " FROM runningJobsHistory"
                    + " WHERE jobId=? ORDER BY elapsedSeconds ASC");
            stm.setLong(1, jobId);

            ResultSet rs = stm.executeQuery();
            List<StartedJobInfo> infosForJob = listFromResultSet(rs);

            return infosForJob.toArray(new StartedJobInfo[infosForJob.size()]);

        } catch (SQLException e) {
            String message = "SQL error querying runningJobsHistory for job ID " + jobId + " from database\n"
                    + ExceptionUtils.getSQLExceptionCause(e);
            log.warn(message, e);
            throw new IOFailure(message, e);
        } finally {
            DBUtils.closeStatementIfOpen(stm);
            HarvestDBConnection.release(c);
        }
    }

    /**
     * Returns the most recent record for every job, partitioned by harvest definition name.
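     *
     * <p>An iteration sketch over the returned map ({@code dao} being an instance of this class):
     * <pre>{@code
     * Map<String, List<StartedJobInfo>> byHarvest = dao.getMostRecentByHarvestName();
     * for (Map.Entry<String, List<StartedJobInfo>> e : byHarvest.entrySet()) {
     *     System.out.println(e.getKey() + ": " + e.getValue().size() + " running job(s)");
     * }
     * }</pre>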
     *
     * @return the full listing of started job information, partitioned by harvest definition name.
     */
    @Override
    public Map<String, List<StartedJobInfo>> getMostRecentByHarvestName() {
        Connection c = HarvestDBConnection.get();

        Map<String, List<StartedJobInfo>> infoMap = new TreeMap<String, List<StartedJobInfo>>();
        Statement stm = null;
        try {
            stm = c.createStatement();
            ResultSet rs = stm.executeQuery("SELECT " + HM_COLUMN.getColumnsInOrder() + " FROM runningJobsMonitor");

            while (rs.next()) {
                long jobId = rs.getLong(HM_COLUMN.jobId.rank());
                String harvestName = rs.getString(HM_COLUMN.harvestName.rank());

                List<StartedJobInfo> infosForHarvest = infoMap.get(harvestName);
                if (infosForHarvest == null) {
                    infosForHarvest = new LinkedList<StartedJobInfo>();
                    infoMap.put(harvestName, infosForHarvest);
                }

                StartedJobInfo sji = new StartedJobInfo(harvestName, jobId);

                sji.setElapsedSeconds(rs.getLong(HM_COLUMN.elapsedSeconds.rank()));
                sji.setHostUrl(rs.getString(HM_COLUMN.hostUrl.rank()));
                sji.setProgress(rs.getDouble(HM_COLUMN.progress.rank()));
                sji.setQueuedFilesCount(rs.getLong(HM_COLUMN.queuedFilesCount.rank()));
                sji.setTotalQueuesCount(rs.getLong(HM_COLUMN.totalQueuesCount.rank()));
                sji.setActiveQueuesCount(rs.getLong(HM_COLUMN.activeQueuesCount.rank()));
                sji.setRetiredQueuesCount(rs.getLong(HM_COLUMN.retiredQueuesCount.rank()));
                sji.setExhaustedQueuesCount(rs.getLong(HM_COLUMN.exhaustedQueuesCount.rank()));
                sji.setAlertsCount(rs.getLong(HM_COLUMN.alertsCount.rank()));
                sji.setDownloadedFilesCount(rs.getLong(HM_COLUMN.downloadedFilesCount.rank()));
                sji.setCurrentProcessedKBPerSec(rs.getLong(HM_COLUMN.currentProcessedKBPerSec.rank()));
                sji.setProcessedKBPerSec(rs.getLong(HM_COLUMN.processedKBPerSec.rank()));
                sji.setCurrentProcessedDocsPerSec(rs.getDouble(HM_COLUMN.currentProcessedDocsPerSec.rank()));
                sji.setProcessedDocsPerSec(rs.getDouble(HM_COLUMN.processedDocsPerSec.rank()));
                sji.setActiveToeCount(rs.getInt(HM_COLUMN.activeToeCount.rank()));
                sji.setStatus(CrawlStatus.values()[rs.getInt(HM_COLUMN.status.rank())]);
                sji.setTimestamp(new Date(rs.getTimestamp(HM_COLUMN.tstamp.rank()).getTime()));

                infosForHarvest.add(sji);
            }

            return infoMap;

        } catch (SQLException e) {
            String message = "SQL error querying runningJobsMonitor\n" + ExceptionUtils.getSQLExceptionCause(e);
            log.warn(message, e);
            throw new IOFailure(message, e);
        } finally {
            DBUtils.closeStatementIfOpen(stm);
            HarvestDBConnection.release(c);
        }
    }

    /**
     * Returns, as an immutable set, the IDs of jobs for which records exist in the runningJobsMonitor,
     * runningJobsHistory and frontierReportMonitor tables.
     *
     * @return the ids of jobs for which records exist.
     */
    @Override
    public Set<Long> getHistoryRecordIds() {
        Connection c = HarvestDBConnection.get();
        Set<Long> jobIds = new TreeSet<Long>();
        Statement stm = null;
        try {
            stm = c.createStatement();
            ResultSet rs = stm.executeQuery("SELECT DISTINCT " + HM_COLUMN.jobId + " FROM runningJobsMonitor");

            while (rs.next()) {
                jobIds.add(rs.getLong(HM_COLUMN.jobId.name()));
            }
            stm.close();

            stm = c.createStatement();
            rs = stm.executeQuery("SELECT DISTINCT " + HM_COLUMN.jobId + " FROM runningJobsHistory");

            while (rs.next()) {
                jobIds.add(rs.getLong(HM_COLUMN.jobId.name()));
            }
            stm.close();

            stm = c.createStatement();
            rs = stm.executeQuery("SELECT DISTINCT " + HM_COLUMN.jobId + " FROM frontierReportMonitor");

            while (rs.next()) {
                jobIds.add(rs.getLong(HM_COLUMN.jobId.name()));
            }

            return Collections.unmodifiableSet(jobIds);
        } catch (SQLException e) {
            String message = "SQL error querying running jobs history\n" + ExceptionUtils.getSQLExceptionCause(e);
            log.warn(message, e);
            throw new IOFailure(message, e);
        } finally {
            DBUtils.closeStatementIfOpen(stm);
            HarvestDBConnection.release(c);
        }
    }

    /**
     * Returns an array of progress records for the given job ID, sorted by decreasing elapsed time (most recent
     * first), starting at a given crawl time and limited to a given number of records.
     *
     * @param jobId the job id.
     * @param startTime the crawl time (in seconds) from which to include records.
     * @param limit the maximum number of records to fetch.
     * @return an array of progress records for the given job ID, most recent first, starting at the given crawl
     * time and limited to the given number of records.
     */
    @Override
    public StartedJobInfo[] getMostRecentByJobId(long jobId, long startTime, int limit) {
        // Primitive arguments cannot be null; validate their ranges instead.
        ArgumentNotValid.checkPositive(jobId, "jobId");
        ArgumentNotValid.checkNotNegative(startTime, "startTime");
        ArgumentNotValid.checkPositive(limit, "limit");

        Connection c = HarvestDBConnection.get();
        PreparedStatement stm = null;
        try {
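            // LIMIT/OFFSET syntax differs across database products; DBSpecifics supplies the proper sub-clause.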
            stm = c.prepareStatement("SELECT " + HM_COLUMN.getColumnsInOrder() + " FROM runningJobsHistory"
                    + " WHERE jobId=? AND elapsedSeconds >= ? ORDER BY elapsedSeconds DESC "
                    + DBSpecifics.getInstance().getOrderByLimitAndOffsetSubClause(limit, 0));
            stm.setLong(1, jobId);
            stm.setLong(2, startTime);

            ResultSet rs = stm.executeQuery();
            List<StartedJobInfo> infosForJob = listFromResultSet(rs);

            return infosForJob.toArray(new StartedJobInfo[infosForJob.size()]);

        } catch (SQLException e) {
            String message = "SQL error querying runningJobsHistory for job ID " + jobId + " from database\n"
                    + ExceptionUtils.getSQLExceptionCause(e);
            log.warn(message, e);
            throw new IOFailure(message, e);
        } finally {
            DBUtils.closeStatementIfOpen(stm);
            HarvestDBConnection.release(c);
        }
    }

    /**
     * Returns the most recent progress record for the given job ID.
     *
     * @param jobId the job id.
     * @return the most recent progress record for the given job ID.
     */
    @Override
    public StartedJobInfo getMostRecentByJobId(long jobId) {
        Connection c = HarvestDBConnection.get();
        PreparedStatement stm = null;
        try {
            stm = c.prepareStatement("SELECT " + HM_COLUMN.getColumnsInOrder() + " FROM runningJobsMonitor"
                    + " WHERE jobId=?");
            stm.setLong(1, jobId);
            ResultSet rs = stm.executeQuery();

            if (rs.next()) {
                String harvestName = rs.getString(HM_COLUMN.harvestName.rank());
                StartedJobInfo sji = new StartedJobInfo(harvestName, jobId);

                sji.setElapsedSeconds(rs.getLong(HM_COLUMN.elapsedSeconds.rank()));
                sji.setHostUrl(rs.getString(HM_COLUMN.hostUrl.rank()));
                sji.setProgress(rs.getDouble(HM_COLUMN.progress.rank()));
                sji.setQueuedFilesCount(rs.getLong(HM_COLUMN.queuedFilesCount.rank()));
                sji.setTotalQueuesCount(rs.getLong(HM_COLUMN.totalQueuesCount.rank()));
                sji.setActiveQueuesCount(rs.getLong(HM_COLUMN.activeQueuesCount.rank()));
                sji.setRetiredQueuesCount(rs.getLong(HM_COLUMN.retiredQueuesCount.rank()));
                sji.setExhaustedQueuesCount(rs.getLong(HM_COLUMN.exhaustedQueuesCount.rank()));
                sji.setAlertsCount(rs.getLong(HM_COLUMN.alertsCount.rank()));
                sji.setDownloadedFilesCount(rs.getLong(HM_COLUMN.downloadedFilesCount.rank()));
                sji.setCurrentProcessedKBPerSec(rs.getLong(HM_COLUMN.currentProcessedKBPerSec.rank()));
                sji.setProcessedKBPerSec(rs.getLong(HM_COLUMN.processedKBPerSec.rank()));
                sji.setCurrentProcessedDocsPerSec(rs.getDouble(HM_COLUMN.currentProcessedDocsPerSec.rank()));
                sji.setProcessedDocsPerSec(rs.getDouble(HM_COLUMN.processedDocsPerSec.rank()));
                sji.setActiveToeCount(rs.getInt(HM_COLUMN.activeToeCount.rank()));
                sji.setStatus(CrawlStatus.values()[rs.getInt(HM_COLUMN.status.rank())]);
                sji.setTimestamp(new Date(rs.getTimestamp(HM_COLUMN.tstamp.rank()).getTime()));

                return sji;
            }

        } catch (SQLException e) {
            String message = "SQL error querying runningJobsMonitor\n" + ExceptionUtils.getSQLExceptionCause(e);
            log.warn(message, e);
            throw new IOFailure(message, e);
        } finally {
            DBUtils.closeStatementIfOpen(stm);
            HarvestDBConnection.release(c);
        }

        throw new UnknownID("No running job with ID " + jobId);
    }

    /**
     * Removes all records pertaining to the given job ID from the persistent storage.
     *
     * @param jobId the job id.
     * @return the number of deleted records.
     */
    @Override
    public int removeInfoForJob(long jobId) {
        ArgumentNotValid.checkPositive(jobId, "jobId");

        Connection c = HarvestDBConnection.get();
        PreparedStatement stm = null;

        int deleteCount = 0;
        try {
            // Delete from monitor table
            c.setAutoCommit(false);
            stm = c.prepareStatement("DELETE FROM runningJobsMonitor WHERE jobId=?");
            stm.setLong(1, jobId);
            deleteCount = stm.executeUpdate();
            c.commit();
            stm.close();
            // Delete from history table
            c.setAutoCommit(false);
            stm = c.prepareStatement("DELETE FROM runningJobsHistory WHERE jobId=?");
            stm.setLong(1, jobId);
            deleteCount += stm.executeUpdate();
            c.commit();
        } catch (SQLException e) {
            String message = "SQL error deleting history records for job ID " + jobId + "\n"
                    + ExceptionUtils.getSQLExceptionCause(e);
            log.warn(message, e);
            throw new IOFailure(message, e);
        } finally {
            DBUtils.closeStatementIfOpen(stm);
            DBUtils.rollbackIfNeeded(c, "removeInfoForJob", jobId);
            HarvestDBConnection.release(c);
        }

        return deleteCount;
    }

    /**
     * Enum class containing all fields in the frontierReportMonitor table.
     */
    private static enum FR_COLUMN {
        jobId, filterId, tstamp, domainName, currentSize, totalEnqueues, sessionBalance, lastCost,
        // See NAS-2168: averageCost often contains the illegal value 4.9E-324.
        averageCost,
        lastDequeueTime, wakeTime, totalSpend, totalBudget, errorCount, lastPeekUri, lastQueuedUri;

        /**
         * @return the rank of a member of the enum class.
         */
        int rank() {
            return ordinal() + 1;
        }

        /**
         * Returns the SQL substring that lists columns according to their ordinal.
         *
         * @return the SQL substring that lists columns in proper order.
         */
        static String getColumnsInOrder() {
            StringBuilder columns = new StringBuilder();
            for (FR_COLUMN c : values()) {
                columns.append(c.name()).append(", ");
            }
            return columns.substring(0, columns.lastIndexOf(","));
        }
    }

    /**
     * Stores frontier report data in the persistent storage.
     *
     * @param filterId the id of the filter that produced the report
     * @param report the report to store
     * @param jobId the ID of the job responsible for this report
     * @return the update count
     */
    public int storeFrontierReport(String filterId, InMemoryFrontierReport report, Long jobId) {
        ArgumentNotValid.checkNotNull(filterId, "filterId");
        ArgumentNotValid.checkNotNull(report, "report");
        ArgumentNotValid.checkNotNull(jobId, "jobId");

        Connection c = HarvestDBConnection.get();
        PreparedStatement stm = null;
        try {

            // First drop existing rows
            try {
                c.setAutoCommit(false);

                stm = c.prepareStatement("DELETE FROM frontierReportMonitor WHERE jobId=? AND filterId=?");
                stm.setLong(1, jobId);
                stm.setString(2, filterId);

                stm.executeUpdate();

                c.commit();
            } catch (SQLException e) {
                String message = "SQL error dropping records for job ID " + jobId + " and filterId " + filterId + "\n"
                        + ExceptionUtils.getSQLExceptionCause(e);
                log.warn(message, e);
                return 0;
            } finally {
                DBUtils.closeStatementIfOpen(stm);
                DBUtils.rollbackIfNeeded(c, "storeFrontierReport delete", jobId);
            }

            // Now batch insert report lines
            try {
                c.setAutoCommit(false);

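                // One '?' placeholder per FR_COLUMN value (16 in total).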
                stm = c.prepareStatement("INSERT INTO frontierReportMonitor(" + FR_COLUMN.getColumnsInOrder()
                        + ") VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)");

                for (FrontierReportLine frl : report.getLines()) {
                    stm.setLong(FR_COLUMN.jobId.rank(), jobId);
                    stm.setString(FR_COLUMN.filterId.rank(), filterId);
                    stm.setTimestamp(FR_COLUMN.tstamp.rank(), new Timestamp(report.getTimestamp()));
                    stm.setString(FR_COLUMN.domainName.rank(), frl.getDomainName());
                    stm.setLong(FR_COLUMN.currentSize.rank(), frl.getCurrentSize());
                    stm.setLong(FR_COLUMN.totalEnqueues.rank(), frl.getTotalEnqueues());
                    stm.setLong(FR_COLUMN.sessionBalance.rank(), frl.getSessionBalance());
                    stm.setDouble(FR_COLUMN.lastCost.rank(), frl.getLastCost());
                    stm.setDouble(FR_COLUMN.averageCost.rank(),
                            correctNumericIfIllegalAverageCost(frl.getAverageCost()));
                    stm.setString(FR_COLUMN.lastDequeueTime.rank(), frl.getLastDequeueTime());
                    stm.setString(FR_COLUMN.wakeTime.rank(), frl.getWakeTime());
                    stm.setLong(FR_COLUMN.totalSpend.rank(), frl.getTotalSpend());
                    stm.setLong(FR_COLUMN.totalBudget.rank(), frl.getTotalBudget());
                    stm.setLong(FR_COLUMN.errorCount.rank(), frl.getErrorCount());

                    // URIs are truncated to 1000 characters (see SQL scripts)
                    DBUtils.setStringMaxLength(stm, FR_COLUMN.lastPeekUri.rank(), frl.getLastPeekUri(), MAX_URL_LENGTH,
                            frl, "lastPeekUri");
                    DBUtils.setStringMaxLength(stm, FR_COLUMN.lastQueuedUri.rank(), frl.getLastQueuedUri(),
                            MAX_URL_LENGTH, frl, "lastQueuedUri");

                    stm.addBatch();
                }

                int[] updCounts = stm.executeBatch();
                int updCountTotal = 0;
                for (int count : updCounts) {
                    updCountTotal += count;
                }

                c.commit();

                return updCountTotal;
            } catch (SQLException e) {
                String message = "SQL error writing records for job ID " + jobId + " and filterId " + filterId + "\n"
                        + ExceptionUtils.getSQLExceptionCause(e);
                log.warn(message, e);
                return 0;
            } finally {
                DBUtils.closeStatementIfOpen(stm);
                DBUtils.rollbackIfNeeded(c, "storeFrontierReport insert", jobId);
            }

        } finally {
            HarvestDBConnection.release(c);
        }
    }

    /**
     * Corrects the given double if it is equal to 4.9E-324. Part of the fix for NAS-2168.
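     * <p>Note that 4.9E-324 is the decimal rendering of {@code Double.MIN_VALUE}, the smallest positive
     * {@code double}.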
     *
     * @param value A given double
     * @return 0.0 if value is 4.9E-324, otherwise the value unchanged
     */
    private double correctNumericIfIllegalAverageCost(double value) {
        if (value == 4.9E-324) {
            log.warn("Found illegal double value '" + value + "'. Changed it to 0.0");
            return 0.0;
        } else {
            return value;
        }
    }

    /**
     * Returns the list of the available frontier report types.
     *
     * @return the list of the available frontier report types.
     * @see FrontierReportFilter#getFilterId()
     */
    public String[] getFrontierReportFilterTypes() {
        List<String> filterIds = new ArrayList<String>();

        Connection c = HarvestDBConnection.get();
        PreparedStatement stm = null;
        try {
            stm = c.prepareStatement("SELECT DISTINCT filterId FROM frontierReportMonitor");

            ResultSet rs = stm.executeQuery();
            while (rs.next()) {
                filterIds.add(rs.getString(1));
            }

        } catch (SQLException e) {
            String message = "SQL error fetching filter IDs\n" + ExceptionUtils.getSQLExceptionCause(e);
            log.warn(message, e);
        } finally {
            DBUtils.closeStatementIfOpen(stm);
            HarvestDBConnection.release(c);
        }

        return filterIds.toArray(new String[filterIds.size()]);
    }

    /**
     * Retrieves a frontier report for a given job ID and filter ID.
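     *
     * <p>A hedged usage sketch; the filter id below is illustrative only:
     * <pre>{@code
     * InMemoryFrontierReport report = dao.getFrontierReport(42L, "myFilter");
     * for (FrontierReportLine line : report.getLines()) {
     *     System.out.println(line.getDomainName() + ": " + line.getTotalEnqueues());
     * }
     * }</pre>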
     *
     * @param jobId the job id
     * @param filterId the id of the filter that produced the report
     * @return a frontier report
     */
    public InMemoryFrontierReport getFrontierReport(long jobId, String filterId) {
        ArgumentNotValid.checkPositive(jobId, "jobId");
        ArgumentNotValid.checkNotNull(filterId, "filterId");

        InMemoryFrontierReport report = new InMemoryFrontierReport(Long.toString(jobId));

        Connection c = HarvestDBConnection.get();
        PreparedStatement stm = null;
        try {
            stm = c.prepareStatement("SELECT " + FR_COLUMN.getColumnsInOrder() + " FROM frontierReportMonitor"
                    + " WHERE jobId=? AND filterId=?");
            stm.setLong(1, jobId);
            stm.setString(2, filterId);

            ResultSet rs = stm.executeQuery();

            // Process the first line to get the report timestamp
            if (rs.next()) {
                report.setTimestamp(rs.getTimestamp(FR_COLUMN.tstamp.rank()).getTime());
                report.addLine(getLine(rs));

                while (rs.next()) {
                    report.addLine(getLine(rs));
                }
            }

        } catch (SQLException e) {
            String message = "SQL error fetching report for job ID " + jobId + " and filterId " + filterId + "\n"
                    + ExceptionUtils.getSQLExceptionCause(e);
            log.warn(message, e);
        } finally {
            DBUtils.closeStatementIfOpen(stm);
            HarvestDBConnection.release(c);
        }

        return report;
    }

    /**
     * Deletes all frontier report data pertaining to the given job id from the persistent storage.
     *
     * @param jobId the job id
     * @return the update count
     */
    public int deleteFrontierReports(long jobId) {
        ArgumentNotValid.checkPositive(jobId, "jobId");

        Connection c = HarvestDBConnection.get();
        PreparedStatement stm = null;
        try {
            c.setAutoCommit(false);

            stm = c.prepareStatement("DELETE FROM frontierReportMonitor WHERE jobId=?");
            stm.setLong(1, jobId);

            int delCount = stm.executeUpdate();

            c.commit();

            return delCount;
        } catch (SQLException e) {
            String message = "SQL error deleting report lines for job ID " + jobId + "\n"
                    + ExceptionUtils.getSQLExceptionCause(e);
            log.warn(message, e);
            return 0;
        } finally {
            DBUtils.closeStatementIfOpen(stm);
            DBUtils.rollbackIfNeeded(c, "deleteFrontierReports", jobId);
            HarvestDBConnection.release(c);
        }
    }

    /**
     * Gets a FrontierReportLine from the result set.
     *
     * @param rs the result set with data from the frontierReportMonitor table
     * @return a FrontierReportLine built from the result set.
     * @throws SQLException if data cannot be read from the result set
     */
    private FrontierReportLine getLine(ResultSet rs) throws SQLException {
        FrontierReportLine line = new FrontierReportLine();

        line.setAverageCost(rs.getDouble(FR_COLUMN.averageCost.rank()));
        line.setCurrentSize(rs.getLong(FR_COLUMN.currentSize.rank()));
        line.setDomainName(rs.getString(FR_COLUMN.domainName.rank()));
        line.setErrorCount(rs.getLong(FR_COLUMN.errorCount.rank()));
        line.setLastCost(rs.getDouble(FR_COLUMN.lastCost.rank()));
        line.setLastDequeueTime(rs.getString(FR_COLUMN.lastDequeueTime.rank()));
        line.setLastPeekUri(rs.getString(FR_COLUMN.lastPeekUri.rank()));
        line.setLastQueuedUri(rs.getString(FR_COLUMN.lastQueuedUri.rank()));
        line.setSessionBalance(rs.getLong(FR_COLUMN.sessionBalance.rank()));
        line.setTotalBudget(rs.getLong(FR_COLUMN.totalBudget.rank()));
        line.setTotalEnqueues(rs.getLong(FR_COLUMN.totalEnqueues.rank()));
        line.setTotalSpend(rs.getLong(FR_COLUMN.totalSpend.rank()));
        line.setWakeTime(rs.getString(FR_COLUMN.wakeTime.rank()));

        return line;
    }

    /**
     * Gets a list of StartedJobInfo objects from a result set of entries from the runningJobsHistory table.
     *
     * @param rs a result set with entries from table runningJobsHistory.
     * @return a list of StartedJobInfo objects from the result set
     * @throws SQLException if data cannot be read from the result set
     */
    private List<StartedJobInfo> listFromResultSet(ResultSet rs) throws SQLException {
        List<StartedJobInfo> list = new LinkedList<StartedJobInfo>();
        while (rs.next()) {
            StartedJobInfo sji = new StartedJobInfo(rs.getString(HM_COLUMN.harvestName.rank()),
                    rs.getLong(HM_COLUMN.jobId.rank()));
            sji.setElapsedSeconds(rs.getLong(HM_COLUMN.elapsedSeconds.rank()));
            sji.setHostUrl(rs.getString(HM_COLUMN.hostUrl.rank()));
            sji.setProgress(rs.getDouble(HM_COLUMN.progress.rank()));
            sji.setQueuedFilesCount(rs.getLong(HM_COLUMN.queuedFilesCount.rank()));
            sji.setTotalQueuesCount(rs.getLong(HM_COLUMN.totalQueuesCount.rank()));
            sji.setActiveQueuesCount(rs.getLong(HM_COLUMN.activeQueuesCount.rank()));
            sji.setRetiredQueuesCount(rs.getLong(HM_COLUMN.retiredQueuesCount.rank()));
            sji.setExhaustedQueuesCount(rs.getLong(HM_COLUMN.exhaustedQueuesCount.rank()));
            sji.setAlertsCount(rs.getLong(HM_COLUMN.alertsCount.rank()));
            sji.setDownloadedFilesCount(rs.getLong(HM_COLUMN.downloadedFilesCount.rank()));
            sji.setCurrentProcessedKBPerSec(rs.getLong(HM_COLUMN.currentProcessedKBPerSec.rank()));
            sji.setProcessedKBPerSec(rs.getLong(HM_COLUMN.processedKBPerSec.rank()));
            sji.setCurrentProcessedDocsPerSec(rs.getDouble(HM_COLUMN.currentProcessedDocsPerSec.rank()));
            sji.setProcessedDocsPerSec(rs.getDouble(HM_COLUMN.processedDocsPerSec.rank()));
            sji.setActiveToeCount(rs.getInt(HM_COLUMN.activeToeCount.rank()));
            sji.setStatus(CrawlStatus.values()[rs.getInt(HM_COLUMN.status.rank())]);
            sji.setTimestamp(new Date(rs.getTimestamp(HM_COLUMN.tstamp.rank()).getTime()));

            list.add(sji);
        }
        return list;
    }

}