001/* 002 * #%L 003 * Netarchivesuite - harvester 004 * %% 005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.harvester.datamodel; 024 025import java.sql.Connection; 026import java.sql.PreparedStatement; 027import java.sql.ResultSet; 028import java.sql.SQLException; 029import java.sql.Statement; 030import java.sql.Timestamp; 031import java.util.ArrayList; 032import java.util.Collections; 033import java.util.Date; 034import java.util.HashMap; 035import java.util.LinkedList; 036import java.util.List; 037import java.util.Map; 038import java.util.Set; 039import java.util.TreeMap; 040import java.util.TreeSet; 041 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045import dk.netarkivet.common.exceptions.ArgumentNotValid; 046import dk.netarkivet.common.exceptions.IOFailure; 047import dk.netarkivet.common.exceptions.UnknownID; 048import dk.netarkivet.common.utils.DBUtils; 049import dk.netarkivet.common.utils.ExceptionUtils; 050import dk.netarkivet.common.utils.Settings; 051import dk.netarkivet.harvester.HarvesterSettings; 052import dk.netarkivet.harvester.harvesting.distribute.CrawlProgressMessage.CrawlStatus; 053import dk.netarkivet.harvester.harvesting.frontier.FrontierReportFilter; 054import dk.netarkivet.harvester.harvesting.frontier.FrontierReportLine; 055import dk.netarkivet.harvester.harvesting.frontier.InMemoryFrontierReport; 056import dk.netarkivet.harvester.harvesting.monitor.StartedJobInfo; 057 058/** 059 * Class implementing the persistence of running job infos. 060 */ 061public class RunningJobsInfoDBDAO extends RunningJobsInfoDAO { 062 063 /** The logger. */ 064 private static final Logger log = LoggerFactory.getLogger(RunningJobsInfoDBDAO.class); 065 066 /** Max length of urls stored in tables. */ 067 private static final int MAX_URL_LENGTH = 1000; 068 069 /** 070 * Defines the order of columns in the runningJobsMonitor table. Used in SQL queries. 071 */ 072 private static enum HM_COLUMN { 073 jobId, harvestName, elapsedSeconds, hostUrl, progress, queuedFilesCount, totalQueuesCount, activeQueuesCount, retiredQueuesCount, exhaustedQueuesCount, alertsCount, downloadedFilesCount, currentProcessedKBPerSec, processedKBPerSec, currentProcessedDocsPerSec, processedDocsPerSec, activeToeCount, status, tstamp; 074 075 /** 076 * Returns the rank in an SQL query (ordinal + 1). 077 * 078 * @return ordinal() + 1 079 */ 080 int rank() { 081 return ordinal() + 1; 082 } 083 084 /** 085 * Returns the SQL substring that lists columns according to their ordinal. 086 * 087 * @return the SQL substring that lists columns in proper order. 088 */ 089 static String getColumnsInOrder() { 090 StringBuffer columns = new StringBuffer(); 091 for (HM_COLUMN c : values()) { 092 columns.append(c.name() + ", "); 093 } 094 return columns.substring(0, columns.lastIndexOf(",")); 095 } 096 } 097 098 /** 099 * Date of last history record per job. 100 */ 101 private static Map<Long, Long> lastSampleDateByJobId = new HashMap<Long, Long>(); 102 103 /** 104 * Rate in milliseconds at which history records should be sampled for a running job. 105 */ 106 private static final long HISTORY_SAMPLE_RATE = 1000 * Settings 107 .getLong(HarvesterSettings.HARVEST_MONITOR_HISTORY_SAMPLE_RATE); 108 109 /** 110 * The constructor of RunningJobsInfoDBDAO. Attempts to update/install the necessary database tables, if they need 111 * to be updated. 112 */ 113 public RunningJobsInfoDBDAO() { 114 Connection connection = HarvestDBConnection.get(); 115 try { 116 /** 117 * Update if necessary the current version of the tables 'runningJobsHistory', 'runningJobsMonitor' and 118 * 'frontierReportMonitor'. 119 */ 120 HarvesterDatabaseTables.checkVersion(connection, HarvesterDatabaseTables.RUNNINGJOBSHISTORY); 121 HarvesterDatabaseTables.checkVersion(connection, HarvesterDatabaseTables.RUNNINGJOBSMONITOR); 122 HarvesterDatabaseTables.checkVersion(connection, HarvesterDatabaseTables.FRONTIERREPORTMONITOR); 123 } finally { 124 HarvestDBConnection.release(connection); 125 } 126 } 127 128 /** 129 * Stores a {@link StartedJobInfo} record to the persistent storage. The record is stored in the monitor table, and 130 * if the elapsed time since the last history sample is equal or superior to the history sample rate, also to the 131 * history table. 132 * 133 * @param startedJobInfo the record to store. 134 */ 135 @Override 136 public synchronized void store(StartedJobInfo startedJobInfo) { 137 ArgumentNotValid.checkNotNull(startedJobInfo, "StartedJobInfo startedJobInfo"); 138 139 Connection c = HarvestDBConnection.get(); 140 141 try { 142 PreparedStatement stm = null; 143 144 // First is there a record in the monitor table? 145 boolean update = false; 146 try { 147 stm = c.prepareStatement("SELECT jobId FROM runningJobsMonitor WHERE jobId=? AND harvestName=?"); 148 stm.setLong(1, startedJobInfo.getJobId()); 149 stm.setString(2, startedJobInfo.getHarvestName()); 150 151 // One row expected, as per PK definition 152 update = stm.executeQuery().next(); 153 154 } catch (SQLException e) { 155 String message = "SQL error checking running jobs monitor table" + "\n" 156 + ExceptionUtils.getSQLExceptionCause(e); 157 log.warn(message, e); 158 throw new IOFailure(message, e); 159 } 160 161 try { 162 // Update or insert latest progress information for this job 163 c.setAutoCommit(false); 164 165 StringBuffer sql = new StringBuffer(); 166 167 if (update) { 168 sql.append("UPDATE runningJobsMonitor SET "); 169 170 StringBuffer columns = new StringBuffer(); 171 // FIXME Seriously, construct an identical SQL string every time and use an enum...?! 172 for (HM_COLUMN setCol : HM_COLUMN.values()) { 173 columns.append(setCol.name() + "=?, "); 174 } 175 sql.append(columns.substring(0, columns.lastIndexOf(","))); 176 sql.append(" WHERE jobId=? AND harvestName=?"); 177 } else { 178 sql.append("INSERT INTO runningJobsMonitor ("); 179 sql.append(HM_COLUMN.getColumnsInOrder()); 180 sql.append(") VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"); 181 } 182 183 stm = c.prepareStatement(sql.toString()); 184 stm.setLong(HM_COLUMN.jobId.rank(), startedJobInfo.getJobId()); 185 stm.setString(HM_COLUMN.harvestName.rank(), startedJobInfo.getHarvestName()); 186 stm.setLong(HM_COLUMN.elapsedSeconds.rank(), startedJobInfo.getElapsedSeconds()); 187 stm.setString(HM_COLUMN.hostUrl.rank(), startedJobInfo.getHostUrl()); 188 stm.setDouble(HM_COLUMN.progress.rank(), startedJobInfo.getProgress()); 189 stm.setLong(HM_COLUMN.queuedFilesCount.rank(), startedJobInfo.getQueuedFilesCount()); 190 stm.setLong(HM_COLUMN.totalQueuesCount.rank(), startedJobInfo.getTotalQueuesCount()); 191 stm.setLong(HM_COLUMN.activeQueuesCount.rank(), startedJobInfo.getActiveQueuesCount()); 192 stm.setLong(HM_COLUMN.retiredQueuesCount.rank(), startedJobInfo.getRetiredQueuesCount()); 193 stm.setLong(HM_COLUMN.exhaustedQueuesCount.rank(), startedJobInfo.getExhaustedQueuesCount()); 194 stm.setLong(HM_COLUMN.alertsCount.rank(), startedJobInfo.getAlertsCount()); 195 stm.setLong(HM_COLUMN.downloadedFilesCount.rank(), startedJobInfo.getDownloadedFilesCount()); 196 stm.setLong(HM_COLUMN.currentProcessedKBPerSec.rank(), startedJobInfo.getCurrentProcessedKBPerSec()); 197 stm.setLong(HM_COLUMN.processedKBPerSec.rank(), startedJobInfo.getProcessedKBPerSec()); 198 stm.setDouble(HM_COLUMN.currentProcessedDocsPerSec.rank(), 199 startedJobInfo.getCurrentProcessedDocsPerSec()); 200 stm.setDouble(HM_COLUMN.processedDocsPerSec.rank(), startedJobInfo.getProcessedDocsPerSec()); 201 stm.setInt(HM_COLUMN.activeToeCount.rank(), startedJobInfo.getActiveToeCount()); 202 stm.setInt(HM_COLUMN.status.rank(), startedJobInfo.getStatus().ordinal()); 203 stm.setTimestamp(HM_COLUMN.tstamp.rank(), new Timestamp(startedJobInfo.getTimestamp().getTime())); 204 205 if (update) { 206 stm.setLong(HM_COLUMN.values().length + 1, startedJobInfo.getJobId()); 207 stm.setString(HM_COLUMN.values().length + 2, startedJobInfo.getHarvestName()); 208 } 209 210 stm.executeUpdate(); 211 212 c.commit(); 213 } catch (SQLException e) { 214 String message = "SQL error storing started job info " + startedJobInfo + " in monitor table" + "\n" 215 + ExceptionUtils.getSQLExceptionCause(e); 216 log.warn(message, e); 217 throw new IOFailure(message, e); 218 } finally { 219 DBUtils.closeStatementIfOpen(stm); 220 DBUtils.rollbackIfNeeded(c, "store started job info", startedJobInfo); 221 } 222 223 // Should we store an history record? 224 Long lastHistoryStore = lastSampleDateByJobId.get(startedJobInfo.getJobId()); 225 226 long time = System.currentTimeMillis(); 227 boolean shouldSample = lastHistoryStore == null || time >= lastHistoryStore + HISTORY_SAMPLE_RATE; 228 229 if (!shouldSample) { 230 return; // we're done 231 } 232 233 try { 234 c.setAutoCommit(false); 235 236 stm = c.prepareStatement("INSERT INTO runningJobsHistory (" + HM_COLUMN.getColumnsInOrder() 237 + ") VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"); 238 stm.setLong(HM_COLUMN.jobId.rank(), startedJobInfo.getJobId()); 239 stm.setString(HM_COLUMN.harvestName.rank(), startedJobInfo.getHarvestName()); 240 stm.setLong(HM_COLUMN.elapsedSeconds.rank(), startedJobInfo.getElapsedSeconds()); 241 stm.setString(HM_COLUMN.hostUrl.rank(), startedJobInfo.getHostUrl()); 242 stm.setDouble(HM_COLUMN.progress.rank(), startedJobInfo.getProgress()); 243 stm.setLong(HM_COLUMN.queuedFilesCount.rank(), startedJobInfo.getQueuedFilesCount()); 244 stm.setLong(HM_COLUMN.totalQueuesCount.rank(), startedJobInfo.getTotalQueuesCount()); 245 stm.setLong(HM_COLUMN.activeQueuesCount.rank(), startedJobInfo.getActiveQueuesCount()); 246 stm.setLong(HM_COLUMN.retiredQueuesCount.rank(), startedJobInfo.getRetiredQueuesCount()); 247 stm.setLong(HM_COLUMN.exhaustedQueuesCount.rank(), startedJobInfo.getExhaustedQueuesCount()); 248 stm.setLong(HM_COLUMN.alertsCount.rank(), startedJobInfo.getAlertsCount()); 249 stm.setLong(HM_COLUMN.downloadedFilesCount.rank(), startedJobInfo.getDownloadedFilesCount()); 250 stm.setLong(HM_COLUMN.currentProcessedKBPerSec.rank(), startedJobInfo.getCurrentProcessedKBPerSec()); 251 stm.setLong(HM_COLUMN.processedKBPerSec.rank(), startedJobInfo.getProcessedKBPerSec()); 252 stm.setDouble(HM_COLUMN.currentProcessedDocsPerSec.rank(), 253 startedJobInfo.getCurrentProcessedDocsPerSec()); 254 stm.setDouble(HM_COLUMN.processedDocsPerSec.rank(), startedJobInfo.getProcessedDocsPerSec()); 255 stm.setInt(HM_COLUMN.activeToeCount.rank(), startedJobInfo.getActiveToeCount()); 256 stm.setInt(HM_COLUMN.status.rank(), startedJobInfo.getStatus().ordinal()); 257 stm.setTimestamp(HM_COLUMN.tstamp.rank(), new Timestamp(startedJobInfo.getTimestamp().getTime())); 258 259 stm.executeUpdate(); 260 261 c.commit(); 262 } catch (SQLException e) { 263 String message = "SQL error storing started job info " + startedJobInfo + " in history table" + "\n" 264 + ExceptionUtils.getSQLExceptionCause(e); 265 log.warn(message, e); 266 throw new IOFailure(message, e); 267 } finally { 268 DBUtils.closeStatementIfOpen(stm); 269 DBUtils.rollbackIfNeeded(c, "store started job info", startedJobInfo); 270 } 271 272 // Remember last sampling date 273 lastSampleDateByJobId.put(startedJobInfo.getJobId(), time); 274 } finally { 275 HarvestDBConnection.release(c); 276 } 277 } 278 279 /** 280 * Returns an array of all progress records chronologically sorted for the given job ID. 281 * 282 * @param jobId the job id. 283 * @return an array of all progress records chronologically sorted for the given job ID. 284 */ 285 @Override 286 public StartedJobInfo[] getFullJobHistory(long jobId) { 287 Connection c = HarvestDBConnection.get(); 288 PreparedStatement stm = null; 289 try { 290 stm = c.prepareStatement("SELECT " + HM_COLUMN.getColumnsInOrder() + " FROM runningJobsHistory" 291 + " WHERE jobId=?" + " ORDER BY elapsedSeconds ASC"); 292 stm.setLong(1, jobId); 293 294 ResultSet rs = stm.executeQuery(); 295 List<StartedJobInfo> infosForJob = listFromResultSet(rs); 296 297 return (StartedJobInfo[]) infosForJob.toArray(new StartedJobInfo[infosForJob.size()]); 298 299 } catch (SQLException e) { 300 String message = "SQL error querying runningJobsHistory for job ID " + jobId + " from database" + "\n" 301 + ExceptionUtils.getSQLExceptionCause(e); 302 log.warn(message, e); 303 throw new IOFailure(message, e); 304 } finally { 305 DBUtils.closeStatementIfOpen(stm); 306 HarvestDBConnection.release(c); 307 } 308 } 309 310 /** 311 * Returns the most recent record for every job, partitioned by harvest definition name. 312 * 313 * @return the full listing of started job information, partitioned by harvest definition name. 314 */ 315 @Override 316 public Map<String, List<StartedJobInfo>> getMostRecentByHarvestName() { 317 Connection c = HarvestDBConnection.get(); 318 319 Map<String, List<StartedJobInfo>> infoMap = new TreeMap<String, List<StartedJobInfo>>(); 320 Statement stm = null; 321 try { 322 stm = c.createStatement(); 323 ResultSet rs = stm.executeQuery("SELECT " + HM_COLUMN.getColumnsInOrder() + " FROM runningJobsMonitor"); 324 325 while (rs.next()) { 326 long jobId = rs.getLong(HM_COLUMN.jobId.rank()); 327 String harvestName = rs.getString(HM_COLUMN.harvestName.rank()); 328 329 List<StartedJobInfo> infosForHarvest = infoMap.get(harvestName); 330 if (infosForHarvest == null) { 331 infosForHarvest = new LinkedList<StartedJobInfo>(); 332 infoMap.put(harvestName, infosForHarvest); 333 } 334 335 StartedJobInfo sji = new StartedJobInfo(harvestName, jobId); 336 337 sji.setElapsedSeconds(rs.getLong(HM_COLUMN.elapsedSeconds.rank())); 338 sji.setHostUrl(rs.getString(HM_COLUMN.hostUrl.rank())); 339 sji.setProgress(rs.getDouble(HM_COLUMN.progress.rank())); 340 sji.setQueuedFilesCount(rs.getLong(HM_COLUMN.queuedFilesCount.rank())); 341 sji.setTotalQueuesCount(rs.getLong(HM_COLUMN.totalQueuesCount.rank())); 342 sji.setActiveQueuesCount(rs.getLong(HM_COLUMN.activeQueuesCount.rank())); 343 sji.setRetiredQueuesCount(rs.getLong(HM_COLUMN.retiredQueuesCount.rank())); 344 sji.setExhaustedQueuesCount(rs.getLong(HM_COLUMN.exhaustedQueuesCount.rank())); 345 sji.setAlertsCount(rs.getLong(HM_COLUMN.alertsCount.rank())); 346 sji.setDownloadedFilesCount(rs.getLong(HM_COLUMN.downloadedFilesCount.rank())); 347 sji.setCurrentProcessedKBPerSec(rs.getLong(HM_COLUMN.currentProcessedKBPerSec.rank())); 348 sji.setProcessedKBPerSec(rs.getLong(HM_COLUMN.processedKBPerSec.rank())); 349 sji.setCurrentProcessedDocsPerSec(rs.getDouble(HM_COLUMN.currentProcessedDocsPerSec.rank())); 350 sji.setProcessedDocsPerSec(rs.getDouble(HM_COLUMN.processedDocsPerSec.rank())); 351 sji.setActiveToeCount(rs.getInt(HM_COLUMN.activeToeCount.rank())); 352 sji.setStatus(CrawlStatus.values()[rs.getInt(HM_COLUMN.status.rank())]); 353 sji.setTimestamp(new Date(rs.getTimestamp(HM_COLUMN.tstamp.rank()).getTime())); 354 355 infosForHarvest.add(sji); 356 } 357 358 return infoMap; 359 360 } catch (SQLException e) { 361 String message = "SQL error querying runningJobsMonitor" + "\n" + ExceptionUtils.getSQLExceptionCause(e); 362 log.warn(message, e); 363 throw new IOFailure(message, e); 364 } finally { 365 DBUtils.closeStatementIfOpen(stm); 366 HarvestDBConnection.release(c); 367 } 368 369 } 370 371 /** 372 * Returns the ids of jobs for which history records exist as an immutable set. 373 * 374 * @return the ids of jobs for which history records exist. 375 */ 376 @Override 377 public Set<Long> getHistoryRecordIds() { 378 Connection c = HarvestDBConnection.get(); 379 Set<Long> jobIds = new TreeSet<Long>(); 380 Statement stm = null; 381 try { 382 stm = c.createStatement(); 383 ResultSet rs = stm.executeQuery("SELECT DISTINCT " + HM_COLUMN.jobId + " FROM runningJobsMonitor"); 384 385 while (rs.next()) { 386 jobIds.add(rs.getLong(HM_COLUMN.jobId.name())); 387 } 388 stm.close(); 389 390 stm = c.createStatement(); 391 rs = stm.executeQuery("SELECT DISTINCT " + HM_COLUMN.jobId + " FROM runningJobsHistory"); 392 393 while (rs.next()) { 394 jobIds.add(rs.getLong(HM_COLUMN.jobId.name())); 395 } 396 stm.close(); 397 398 stm = c.createStatement(); 399 rs = stm.executeQuery("SELECT DISTINCT " + HM_COLUMN.jobId + " FROM frontierReportMonitor"); 400 401 while (rs.next()) { 402 jobIds.add(rs.getLong(HM_COLUMN.jobId.name())); 403 } 404 405 return Collections.unmodifiableSet(jobIds); 406 } catch (SQLException e) { 407 String message = "SQL error querying running jobs history" + "\n" + ExceptionUtils.getSQLExceptionCause(e); 408 log.warn(message, e); 409 throw new IOFailure(message, e); 410 } finally { 411 DBUtils.closeStatementIfOpen(stm); 412 HarvestDBConnection.release(c); 413 } 414 } 415 416 /** 417 * Returns an array of chronologically sorted progress records for the given job ID, starting at a given crawl time, 418 * and limited to a given number of record. 419 * 420 * @param jobId the job id. 421 * @param startTime the crawl time (in seconds) to begin. 422 * @param limit the maximum number of records to fetch. 423 * @return an array of chronologically sorted progress records for the given job ID, starting at a given crawl time, 424 * and limited to a given number of record. 425 */ 426 @Override 427 public StartedJobInfo[] getMostRecentByJobId(long jobId, long startTime, int limit) { 428 429 ArgumentNotValid.checkNotNull(jobId, "jobId"); 430 ArgumentNotValid.checkNotNull(startTime, "startTime"); 431 ArgumentNotValid.checkNotNull(limit, "limit"); 432 433 Connection c = HarvestDBConnection.get(); 434 PreparedStatement stm = null; 435 try { 436 stm = c.prepareStatement("SELECT " + HM_COLUMN.getColumnsInOrder() + " FROM runningJobsHistory" 437 + " WHERE jobId=? AND elapsedSeconds >= ?" + " ORDER BY elapsedSeconds DESC" + " " 438 + DBSpecifics.getInstance().getOrderByLimitAndOffsetSubClause(limit, 0)); 439 stm.setLong(1, jobId); 440 stm.setLong(2, startTime); 441 442 ResultSet rs = stm.executeQuery(); 443 List<StartedJobInfo> infosForJob = listFromResultSet(rs); 444 445 return (StartedJobInfo[]) infosForJob.toArray(new StartedJobInfo[infosForJob.size()]); 446 447 } catch (SQLException e) { 448 String message = "SQL error querying runningJobsHistory for job ID " + jobId + " from database" + "\n" 449 + ExceptionUtils.getSQLExceptionCause(e); 450 log.warn(message, e); 451 throw new IOFailure(message, e); 452 } finally { 453 DBUtils.closeStatementIfOpen(stm); 454 HarvestDBConnection.release(c); 455 } 456 } 457 458 /** 459 * Returns the most recent progress record for the given job ID. 460 * 461 * @param jobId the job id. 462 * @return the most recent progress record for the given job ID. 463 */ 464 @Override 465 public StartedJobInfo getMostRecentByJobId(long jobId) { 466 Connection c = HarvestDBConnection.get(); 467 Statement stm = null; 468 try { 469 stm = c.createStatement(); 470 ResultSet rs = stm.executeQuery("SELECT " + HM_COLUMN.getColumnsInOrder() + " FROM runningJobsMonitor" 471 + " WHERE jobId=" + jobId); 472 473 if (rs.next()) { 474 String harvestName = rs.getString(HM_COLUMN.harvestName.rank()); 475 StartedJobInfo sji = new StartedJobInfo(harvestName, jobId); 476 477 sji.setElapsedSeconds(rs.getLong(HM_COLUMN.elapsedSeconds.rank())); 478 sji.setHostUrl(rs.getString(HM_COLUMN.hostUrl.rank())); 479 sji.setProgress(rs.getDouble(HM_COLUMN.progress.rank())); 480 sji.setQueuedFilesCount(rs.getLong(HM_COLUMN.queuedFilesCount.rank())); 481 sji.setTotalQueuesCount(rs.getLong(HM_COLUMN.totalQueuesCount.rank())); 482 sji.setActiveQueuesCount(rs.getLong(HM_COLUMN.activeQueuesCount.rank())); 483 sji.setRetiredQueuesCount(rs.getLong(HM_COLUMN.retiredQueuesCount.rank())); 484 sji.setExhaustedQueuesCount(rs.getLong(HM_COLUMN.exhaustedQueuesCount.rank())); 485 sji.setAlertsCount(rs.getLong(HM_COLUMN.alertsCount.rank())); 486 sji.setDownloadedFilesCount(rs.getLong(HM_COLUMN.downloadedFilesCount.rank())); 487 sji.setCurrentProcessedKBPerSec(rs.getLong(HM_COLUMN.currentProcessedKBPerSec.rank())); 488 sji.setProcessedKBPerSec(rs.getLong(HM_COLUMN.processedKBPerSec.rank())); 489 sji.setCurrentProcessedDocsPerSec(rs.getDouble(HM_COLUMN.currentProcessedDocsPerSec.rank())); 490 sji.setProcessedDocsPerSec(rs.getDouble(HM_COLUMN.processedDocsPerSec.rank())); 491 sji.setActiveToeCount(rs.getInt(HM_COLUMN.activeToeCount.rank())); 492 sji.setStatus(CrawlStatus.values()[rs.getInt(HM_COLUMN.status.rank())]); 493 sji.setTimestamp(new Date(rs.getTimestamp(HM_COLUMN.tstamp.rank()).getTime())); 494 495 return sji; 496 } 497 498 } catch (SQLException e) { 499 String message = "SQL error querying runningJobsMonitor" + "\n" + ExceptionUtils.getSQLExceptionCause(e); 500 log.warn(message, e); 501 throw new IOFailure(message, e); 502 } finally { 503 DBUtils.closeStatementIfOpen(stm); 504 HarvestDBConnection.release(c); 505 } 506 507 throw new UnknownID("No running job with ID " + jobId); 508 } 509 510 /** 511 * Removes all records pertaining to the given job ID from the persistent storage. 512 * 513 * @param jobId the job id. 514 * @return the number of deleted records. 515 */ 516 @Override 517 public int removeInfoForJob(long jobId) { 518 ArgumentNotValid.checkNotNull(jobId, "jobId"); 519 520 Connection c = HarvestDBConnection.get(); 521 PreparedStatement stm = null; 522 523 int deleteCount = 0; 524 try { 525 // Delete from monitor table 526 c.setAutoCommit(false); 527 stm = c.prepareStatement("DELETE FROM runningJobsMonitor WHERE jobId=?"); 528 stm.setLong(1, jobId); 529 deleteCount = stm.executeUpdate(); 530 c.commit(); 531 stm.close(); 532 // Delete from history table 533 c.setAutoCommit(false); 534 stm = c.prepareStatement("DELETE FROM runningJobsHistory WHERE jobId=?"); 535 stm.setLong(1, jobId); 536 deleteCount += stm.executeUpdate(); 537 c.commit(); 538 } catch (SQLException e) { 539 String message = "SQL error deleting from history records for job ID " + jobId + "\n" 540 + ExceptionUtils.getSQLExceptionCause(e); 541 log.warn(message, e); 542 throw new IOFailure(message, e); 543 } finally { 544 DBUtils.closeStatementIfOpen(stm); 545 DBUtils.rollbackIfNeeded(c, "removeInfoForJob", jobId); 546 HarvestDBConnection.release(c); 547 } 548 549 return deleteCount; 550 551 } 552 553 /** 554 * Enum class containing all fields in the frontierReportMonitor table. 555 */ 556 private static enum FR_COLUMN { 557 jobId, filterId, tstamp, domainName, currentSize, totalEnqueues, sessionBalance, lastCost, averageCost, // See 558 // NAS-2168 559 // Often 560 // contains 561 // the 562 // illegal 563 // value 564 // 4.9E-324 565 lastDequeueTime, wakeTime, totalSpend, totalBudget, errorCount, lastPeekUri, lastQueuedUri; 566 567 /** 568 * @return the rank of a member of the enum class. 569 */ 570 int rank() { 571 return ordinal() + 1; 572 } 573 574 /** 575 * Returns the SQL substring that lists columns according to their ordinal. 576 * 577 * @return the SQL substring that lists columns in proper order. 578 */ 579 static String getColumnsInOrder() { 580 String columns = ""; 581 for (FR_COLUMN c : values()) { 582 columns += c.name() + ", "; 583 } 584 return columns.substring(0, columns.lastIndexOf(",")); 585 } 586 } 587 588 ; 589 590 /** 591 * Store frontier report data to the persistent storage. 592 * 593 * @param report the report to store 594 * @param filterId the id of the filter that produced the report 595 * @param jobId The ID of the job responsible for this report 596 * @return the update count 597 */ 598 public int storeFrontierReport(String filterId, InMemoryFrontierReport report, Long jobId) { 599 ArgumentNotValid.checkNotNull(report, "report"); 600 ArgumentNotValid.checkNotNull(jobId, "jobId"); 601 602 Connection c = HarvestDBConnection.get(); 603 PreparedStatement stm = null; 604 try { 605 606 // First drop existing rows 607 try { 608 c.setAutoCommit(false); 609 610 stm = c.prepareStatement("DELETE FROM frontierReportMonitor WHERE jobId=? AND filterId=?"); 611 stm.setLong(1, jobId); 612 stm.setString(2, filterId); 613 614 stm.executeUpdate(); 615 616 c.commit(); 617 } catch (SQLException e) { 618 String message = "SQL error dropping records for job ID " + jobId + " and filterId " + filterId + "\n" 619 + ExceptionUtils.getSQLExceptionCause(e); 620 log.warn(message, e); 621 return 0; 622 } finally { 623 DBUtils.closeStatementIfOpen(stm); 624 DBUtils.rollbackIfNeeded(c, "storeFrontierReport delete", jobId); 625 } 626 627 // Now batch insert report lines 628 try { 629 c.setAutoCommit(false); 630 631 stm = c.prepareStatement("INSERT INTO frontierReportMonitor(" + FR_COLUMN.getColumnsInOrder() 632 + ") VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"); 633 634 for (FrontierReportLine frl : report.getLines()) { 635 stm.setLong(FR_COLUMN.jobId.rank(), jobId); 636 stm.setString(FR_COLUMN.filterId.rank(), filterId); 637 stm.setTimestamp(FR_COLUMN.tstamp.rank(), new Timestamp(report.getTimestamp())); 638 stm.setString(FR_COLUMN.domainName.rank(), frl.getDomainName()); 639 stm.setLong(FR_COLUMN.currentSize.rank(), frl.getCurrentSize()); 640 stm.setLong(FR_COLUMN.totalEnqueues.rank(), frl.getTotalEnqueues()); 641 stm.setLong(FR_COLUMN.sessionBalance.rank(), frl.getSessionBalance()); 642 stm.setDouble(FR_COLUMN.lastCost.rank(), frl.getLastCost()); 643 stm.setDouble(FR_COLUMN.averageCost.rank(), 644 correctNumericIfIllegalAverageCost(frl.getAverageCost())); 645 stm.setString(FR_COLUMN.lastDequeueTime.rank(), frl.getLastDequeueTime()); 646 stm.setString(FR_COLUMN.wakeTime.rank(), frl.getWakeTime()); 647 stm.setLong(FR_COLUMN.totalSpend.rank(), frl.getTotalSpend()); 648 stm.setLong(FR_COLUMN.totalBudget.rank(), frl.getTotalBudget()); 649 stm.setLong(FR_COLUMN.errorCount.rank(), frl.getErrorCount()); 650 651 // URIs are to be truncated to 1000 characters 652 // (see SQL scripts) 653 DBUtils.setStringMaxLength(stm, FR_COLUMN.lastPeekUri.rank(), frl.getLastPeekUri(), MAX_URL_LENGTH, 654 frl, "lastPeekUri"); 655 DBUtils.setStringMaxLength(stm, FR_COLUMN.lastQueuedUri.rank(), frl.getLastQueuedUri(), 656 MAX_URL_LENGTH, frl, "lastQueuedUri"); 657 658 stm.addBatch(); 659 } 660 661 int[] updCounts = stm.executeBatch(); 662 int updCountTotal = 0; 663 for (int count : updCounts) { 664 updCountTotal += count; 665 } 666 667 c.commit(); 668 669 return updCountTotal; 670 } catch (SQLException e) { 671 String message = "SQL error writing records for job ID " + jobId + " and filterId " + filterId + "\n" 672 + ExceptionUtils.getSQLExceptionCause(e); 673 log.warn(message, e); 674 return 0; 675 } finally { 676 DBUtils.closeStatementIfOpen(stm); 677 DBUtils.rollbackIfNeeded(c, "storeFrontierReport insert", jobId); 678 } 679 680 } finally { 681 HarvestDBConnection.release(c); 682 } 683 } 684 685 /** 686 * Correct the given double if it is equal to 4.9E-324. Part of fix for NAS-2168 687 * 688 * @param value A given double 689 * @return 0.0 if value is 4.9E-324, otherwise the value as is 690 */ 691 private double correctNumericIfIllegalAverageCost(double value) { 692 if (value == 4.9E-324) { 693 log.warn("Found illegal double value '" + 4.9E-324 + "'. Changed it to 0.0"); 694 return 0.0; 695 } else { 696 return value; 697 } 698 } 699 700 /** 701 * Returns the list of the available frontier report types. 702 * 703 * @return the list of the available frontier report types. 704 * @see FrontierReportFilter#getFilterId() 705 */ 706 public String[] getFrontierReportFilterTypes() { 707 List<String> filterIds = new ArrayList<String>(); 708 709 Connection c = HarvestDBConnection.get(); 710 PreparedStatement stm = null; 711 try { 712 stm = c.prepareStatement("SELECT DISTINCT filterId FROM frontierReportMonitor"); 713 714 ResultSet rs = stm.executeQuery(); 715 while (rs.next()) { 716 filterIds.add(rs.getString(1)); 717 } 718 719 } catch (SQLException e) { 720 String message = "SQL error fetching filter IDs" + "\n" + ExceptionUtils.getSQLExceptionCause(e); 721 log.warn(message, e); 722 } finally { 723 DBUtils.closeStatementIfOpen(stm); 724 HarvestDBConnection.release(c); 725 } 726 727 return filterIds.toArray(new String[filterIds.size()]); 728 } 729 730 /** 731 * Retrieve a frontier report from a job id and a given filter class. 732 * 733 * @param jobId the job id 734 * @param filterId the id of the filter that produced the report 735 * @return a frontier report 736 */ 737 public InMemoryFrontierReport getFrontierReport(long jobId, String filterId) { 738 739 ArgumentNotValid.checkNotNull(jobId, "jobId"); 740 ArgumentNotValid.checkNotNull(filterId, "filterId"); 741 742 InMemoryFrontierReport report = new InMemoryFrontierReport(Long.toString(jobId)); 743 744 Connection c = HarvestDBConnection.get(); 745 PreparedStatement stm = null; 746 try { 747 stm = c.prepareStatement("SELECT " + FR_COLUMN.getColumnsInOrder() + " FROM frontierReportMonitor" 748 + " WHERE jobId=? AND filterId=?"); 749 stm.setLong(1, jobId); 750 stm.setString(2, filterId); 751 752 ResultSet rs = stm.executeQuery(); 753 754 // Process first line to get report timestamp 755 if (rs.next()) { 756 report.setTimestamp(rs.getTimestamp(FR_COLUMN.tstamp.rank()).getTime()); 757 report.addLine(getLine(rs)); 758 759 while (rs.next()) { 760 report.addLine(getLine(rs)); 761 } 762 } 763 764 } catch (SQLException e) { 765 String message = "SQL error fetching report for job ID " + jobId + " and filterId " + filterId + "\n" 766 + ExceptionUtils.getSQLExceptionCause(e); 767 log.warn(message, e); 768 } finally { 769 DBUtils.closeStatementIfOpen(stm); 770 HarvestDBConnection.release(c); 771 } 772 773 return report; 774 } 775 776 /** 777 * Deletes all frontier report data pertaining to the given job id from the persistent storage. 778 * 779 * @param jobId the job id 780 * @return the update count 781 */ 782 public int deleteFrontierReports(long jobId) { 783 ArgumentNotValid.checkNotNull(jobId, "jobId"); 784 785 Connection c = HarvestDBConnection.get(); 786 PreparedStatement stm = null; 787 try { 788 c.setAutoCommit(false); 789 790 stm = c.prepareStatement("DELETE FROM frontierReportMonitor WHERE jobId=?"); 791 stm.setLong(1, jobId); 792 793 int delCount = stm.executeUpdate(); 794 795 c.commit(); 796 797 return delCount; 798 } catch (SQLException e) { 799 String message = "SQL error deleting report lines for job ID " + jobId + "\n" 800 + ExceptionUtils.getSQLExceptionCause(e); 801 log.warn(message, e); 802 return 0; 803 } finally { 804 DBUtils.closeStatementIfOpen(stm); 805 DBUtils.rollbackIfNeeded(c, "deleteFrontierReports", jobId); 806 HarvestDBConnection.release(c); 807 } 808 } 809 810 /** 811 * Get a frontierReportLine from the resultSet. 812 * 813 * @param rs the resultset with data from table frontierReportMonitor 814 * @return a frontierReportLine from the resultSet. 815 * @throws SQLException If unable to get data from resultSet 816 */ 817 private FrontierReportLine getLine(ResultSet rs) throws SQLException { 818 FrontierReportLine line = new FrontierReportLine(); 819 820 line.setAverageCost(rs.getDouble(FR_COLUMN.averageCost.rank())); 821 line.setCurrentSize(rs.getLong(FR_COLUMN.currentSize.rank())); 822 line.setDomainName(rs.getString(FR_COLUMN.domainName.rank())); 823 line.setErrorCount(rs.getLong(FR_COLUMN.errorCount.rank())); 824 line.setLastCost(rs.getDouble(FR_COLUMN.lastCost.rank())); 825 line.setLastDequeueTime(rs.getString(FR_COLUMN.lastDequeueTime.rank())); 826 line.setLastPeekUri(rs.getString(FR_COLUMN.lastPeekUri.rank())); 827 line.setLastQueuedUri(rs.getString(FR_COLUMN.lastQueuedUri.rank())); 828 line.setSessionBalance(rs.getLong(FR_COLUMN.sessionBalance.rank())); 829 line.setTotalBudget(rs.getLong(FR_COLUMN.totalBudget.rank())); 830 line.setTotalEnqueues(rs.getLong(FR_COLUMN.totalEnqueues.rank())); 831 line.setTotalSpend(rs.getLong(FR_COLUMN.totalSpend.rank())); 832 line.setWakeTime(rs.getString(FR_COLUMN.wakeTime.rank())); 833 834 return line; 835 } 836 837 /** 838 * Get a list of StartedJobInfo objects from a resultset of entries from runningJobsHistory table. 839 * 840 * @param rs a resultset with entries from table runningJobsHistory. 841 * @return a list of StartedJobInfo objects from the resultset 842 * @throws SQLException If any problems reading data from the resultset 843 */ 844 private List<StartedJobInfo> listFromResultSet(ResultSet rs) throws SQLException { 845 List<StartedJobInfo> list = new LinkedList<StartedJobInfo>(); 846 while (rs.next()) { 847 StartedJobInfo sji = new StartedJobInfo(rs.getString(HM_COLUMN.harvestName.rank()), 848 rs.getLong(HM_COLUMN.jobId.rank())); 849 sji.setElapsedSeconds(rs.getLong(HM_COLUMN.elapsedSeconds.rank())); 850 sji.setHostUrl(rs.getString(HM_COLUMN.hostUrl.rank())); 851 sji.setProgress(rs.getDouble(HM_COLUMN.progress.rank())); 852 sji.setQueuedFilesCount(rs.getLong(HM_COLUMN.queuedFilesCount.rank())); 853 sji.setTotalQueuesCount(rs.getLong(HM_COLUMN.totalQueuesCount.rank())); 854 sji.setActiveQueuesCount(rs.getLong(HM_COLUMN.activeQueuesCount.rank())); 855 sji.setExhaustedQueuesCount(rs.getLong(HM_COLUMN.exhaustedQueuesCount.rank())); 856 sji.setAlertsCount(rs.getLong(HM_COLUMN.alertsCount.rank())); 857 sji.setDownloadedFilesCount(rs.getLong(HM_COLUMN.downloadedFilesCount.rank())); 858 sji.setCurrentProcessedKBPerSec(rs.getLong(HM_COLUMN.currentProcessedKBPerSec.rank())); 859 sji.setProcessedKBPerSec(rs.getLong(HM_COLUMN.processedKBPerSec.rank())); 860 sji.setCurrentProcessedDocsPerSec(rs.getDouble(HM_COLUMN.currentProcessedDocsPerSec.rank())); 861 sji.setProcessedDocsPerSec(rs.getDouble(HM_COLUMN.processedDocsPerSec.rank())); 862 sji.setActiveToeCount(rs.getInt(HM_COLUMN.activeToeCount.rank())); 863 sji.setStatus(CrawlStatus.values()[rs.getInt(HM_COLUMN.status.rank())]); 864 sji.setTimestamp(new Date(rs.getTimestamp(HM_COLUMN.tstamp.rank()).getTime())); 865 866 list.add(sji); 867 } 868 return list; 869 } 870 871}