001/* 002 * #%L 003 * Netarchivesuite - harvester 004 * %% 005 * Copyright (C) 2005 - 2018 The Royal Danish Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.harvester.datamodel; 024 025import java.sql.Connection; 026import java.sql.PreparedStatement; 027import java.sql.ResultSet; 028import java.sql.SQLException; 029import java.sql.Statement; 030import java.sql.Timestamp; 031import java.util.ArrayList; 032import java.util.Collections; 033import java.util.Date; 034import java.util.HashMap; 035import java.util.LinkedList; 036import java.util.List; 037import java.util.Map; 038import java.util.Set; 039import java.util.TreeMap; 040import java.util.TreeSet; 041 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045import dk.netarkivet.common.exceptions.ArgumentNotValid; 046import dk.netarkivet.common.exceptions.IOFailure; 047import dk.netarkivet.common.exceptions.UnknownID; 048import dk.netarkivet.common.utils.DBUtils; 049import dk.netarkivet.common.utils.ExceptionUtils; 050import dk.netarkivet.common.utils.Settings; 051import dk.netarkivet.harvester.HarvesterSettings; 052import dk.netarkivet.harvester.harvesting.distribute.CrawlProgressMessage.CrawlStatus; 053import 
dk.netarkivet.harvester.harvesting.frontier.FrontierReportFilter; 054import dk.netarkivet.harvester.harvesting.frontier.FrontierReportLine; 055import dk.netarkivet.harvester.harvesting.frontier.InMemoryFrontierReport; 056import dk.netarkivet.harvester.harvesting.monitor.StartedJobInfo; 057 058/** 059 * Class implementing the persistence of running job infos. 060 */ 061public class RunningJobsInfoDBDAO extends RunningJobsInfoDAO { 062 063 /** The logger. */ 064 private static final Logger log = LoggerFactory.getLogger(RunningJobsInfoDBDAO.class); 065 066 /** Max length of urls stored in tables. */ 067 private static final int MAX_URL_LENGTH = 1000; 068 069 /** 070 * Defines the order of columns in the runningJobsMonitor table. Used in SQL queries. 071 */ 072 private static enum HM_COLUMN { 073 jobId, harvestName, elapsedSeconds, hostUrl, progress, queuedFilesCount, totalQueuesCount, activeQueuesCount, retiredQueuesCount, exhaustedQueuesCount, alertsCount, downloadedFilesCount, currentProcessedKBPerSec, processedKBPerSec, currentProcessedDocsPerSec, processedDocsPerSec, activeToeCount, status, tstamp; 074 075 /** 076 * Returns the rank in an SQL query (ordinal + 1). 077 * 078 * @return ordinal() + 1 079 */ 080 int rank() { 081 return ordinal() + 1; 082 } 083 084 /** 085 * Returns the SQL substring that lists columns according to their ordinal. 086 * 087 * @return the SQL substring that lists columns in proper order. 088 */ 089 static String getColumnsInOrder() { 090 StringBuffer columns = new StringBuffer(); 091 for (HM_COLUMN c : values()) { 092 columns.append(c.name() + ", "); 093 } 094 return columns.substring(0, columns.lastIndexOf(",")); 095 } 096 } 097 098 /** 099 * Date of last history record per job. 100 */ 101 private static Map<Long, Long> lastSampleDateByJobId = new HashMap<Long, Long>(); 102 103 /** 104 * Rate in milliseconds at which history records should be sampled for a running job. 
 */
    // Presumably the setting value is in seconds — it is multiplied by 1000 to get ms. TODO confirm.
    private static final long HISTORY_SAMPLE_RATE = 1000 * Settings
            .getLong(HarvesterSettings.HARVEST_MONITOR_HISTORY_SAMPLE_RATE);

    /**
     * The constructor of RunningJobsInfoDBDAO. Attempts to update/install the necessary database tables, if they need
     * to be updated.
     */
    public RunningJobsInfoDBDAO() {
        Connection connection = HarvestDBConnection.get();
        try {
            /**
             * Update if necessary the current version of the tables 'runningJobsHistory', 'runningJobsMonitor' and
             * 'frontierReportMonitor'.
             */
            HarvesterDatabaseTables.checkVersion(connection, HarvesterDatabaseTables.RUNNINGJOBSHISTORY);
            HarvesterDatabaseTables.checkVersion(connection, HarvesterDatabaseTables.RUNNINGJOBSMONITOR);
            HarvesterDatabaseTables.checkVersion(connection, HarvesterDatabaseTables.FRONTIERREPORTMONITOR);
        } finally {
            // Always return the pooled connection, even if checkVersion throws.
            HarvestDBConnection.release(connection);
        }
    }

    /**
     * Stores a {@link StartedJobInfo} record to the persistent storage. The record is stored in the monitor table, and
     * if the elapsed time since the last history sample is equal or superior to the history sample rate, also to the
     * history table.
     *
     * <p>NOTE(review): the method is synchronized on the DAO instance, but {@code lastSampleDateByJobId} is static;
     * with more than one DAO instance the sampling bookkeeping would be unprotected — presumably the DAO is used as a
     * singleton. TODO confirm.
     *
     * @param startedJobInfo the record to store.
     */
    @Override
    public synchronized void store(StartedJobInfo startedJobInfo) {
        ArgumentNotValid.checkNotNull(startedJobInfo, "StartedJobInfo startedJobInfo");

        Connection c = HarvestDBConnection.get();

        try {
            PreparedStatement stm = null;

            // First is there a record in the monitor table?
            boolean update = false;
            try {
                stm = c.prepareStatement("SELECT jobId FROM runningJobsMonitor WHERE jobId=? AND harvestName=?");
                stm.setLong(1, startedJobInfo.getJobId());
                stm.setString(2, startedJobInfo.getHarvestName());

                // One row expected, as per PK definition
                update = stm.executeQuery().next();

            } catch (SQLException e) {
                String message = "SQL error checking running jobs monitor table" + "\n"
                        + ExceptionUtils.getSQLExceptionCause(e);
                log.warn(message, e);
                throw new IOFailure(message, e);
            }

            try {
                // Update or insert latest progress information for this job
                c.setAutoCommit(false);

                StringBuffer sql = new StringBuffer();

                if (update) {
                    sql.append("UPDATE runningJobsMonitor SET ");

                    StringBuffer columns = new StringBuffer();
                    // FIXME Seriously, construct an identical SQL string every time and use an enum...?!
                    for (HM_COLUMN setCol : HM_COLUMN.values()) {
                        columns.append(setCol.name() + "=?, ");
                    }
                    // Trims the trailing ", " left by the loop above.
                    sql.append(columns.substring(0, columns.lastIndexOf(",")));
                    sql.append(" WHERE jobId=? AND harvestName=?");
                } else {
                    sql.append("INSERT INTO runningJobsMonitor (");
                    sql.append(HM_COLUMN.getColumnsInOrder());
                    sql.append(") VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)");
                }

                // Parameter indexes come from the enum ordinals: rank() == ordinal + 1.
                stm = c.prepareStatement(sql.toString());
                stm.setLong(HM_COLUMN.jobId.rank(), startedJobInfo.getJobId());
                stm.setString(HM_COLUMN.harvestName.rank(), startedJobInfo.getHarvestName());
                stm.setLong(HM_COLUMN.elapsedSeconds.rank(), startedJobInfo.getElapsedSeconds());
                stm.setString(HM_COLUMN.hostUrl.rank(), startedJobInfo.getHostUrl());
                stm.setDouble(HM_COLUMN.progress.rank(), startedJobInfo.getProgress());
                stm.setLong(HM_COLUMN.queuedFilesCount.rank(), startedJobInfo.getQueuedFilesCount());
                stm.setLong(HM_COLUMN.totalQueuesCount.rank(), startedJobInfo.getTotalQueuesCount());
                stm.setLong(HM_COLUMN.activeQueuesCount.rank(), startedJobInfo.getActiveQueuesCount());
                stm.setLong(HM_COLUMN.retiredQueuesCount.rank(), startedJobInfo.getRetiredQueuesCount());
                stm.setLong(HM_COLUMN.exhaustedQueuesCount.rank(), startedJobInfo.getExhaustedQueuesCount());
                stm.setLong(HM_COLUMN.alertsCount.rank(), startedJobInfo.getAlertsCount());
                stm.setLong(HM_COLUMN.downloadedFilesCount.rank(), startedJobInfo.getDownloadedFilesCount());
                stm.setLong(HM_COLUMN.currentProcessedKBPerSec.rank(), startedJobInfo.getCurrentProcessedKBPerSec());
                stm.setLong(HM_COLUMN.processedKBPerSec.rank(), startedJobInfo.getProcessedKBPerSec());
                stm.setDouble(HM_COLUMN.currentProcessedDocsPerSec.rank(),
                        startedJobInfo.getCurrentProcessedDocsPerSec());
                stm.setDouble(HM_COLUMN.processedDocsPerSec.rank(), startedJobInfo.getProcessedDocsPerSec());
                stm.setInt(HM_COLUMN.activeToeCount.rank(), startedJobInfo.getActiveToeCount());
                stm.setInt(HM_COLUMN.status.rank(), startedJobInfo.getStatus().ordinal());
                stm.setTimestamp(HM_COLUMN.tstamp.rank(), new Timestamp(startedJobInfo.getTimestamp().getTime()));

                if (update) {
                    // The UPDATE form has two extra WHERE parameters after the SET list.
                    stm.setLong(HM_COLUMN.values().length + 1, startedJobInfo.getJobId());
                    stm.setString(HM_COLUMN.values().length + 2, startedJobInfo.getHarvestName());
                }

                stm.executeUpdate();

                c.commit();
            } catch (SQLException e) {
                String message = "SQL error storing started job info " + startedJobInfo + " in monitor table" + "\n"
                        + ExceptionUtils.getSQLExceptionCause(e);
                log.warn(message, e);
                throw new IOFailure(message, e);
            } finally {
                DBUtils.closeStatementIfOpen(stm);
                // No-op after a successful commit; rolls the transaction back on the failure paths.
                DBUtils.rollbackIfNeeded(c, "store started job info", startedJobInfo);
            }

            // Should we store an history record?
            Long lastHistoryStore = lastSampleDateByJobId.get(startedJobInfo.getJobId());

            long time = System.currentTimeMillis();
            // Sample when the job has never been sampled, or the sample interval has elapsed.
            boolean shouldSample = lastHistoryStore == null || time >= lastHistoryStore + HISTORY_SAMPLE_RATE;

            if (!shouldSample) {
                return; // we're done
            }
            log.debug("Adding history Record for job {} to runningJobsHistory table", startedJobInfo.getJobId());
            try {
                c.setAutoCommit(false);

                stm = c.prepareStatement("INSERT INTO runningJobsHistory (" + HM_COLUMN.getColumnsInOrder()
                        + ") VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)");
                stm.setLong(HM_COLUMN.jobId.rank(), startedJobInfo.getJobId());
                stm.setString(HM_COLUMN.harvestName.rank(), startedJobInfo.getHarvestName());
                stm.setLong(HM_COLUMN.elapsedSeconds.rank(), startedJobInfo.getElapsedSeconds());
                stm.setString(HM_COLUMN.hostUrl.rank(), startedJobInfo.getHostUrl());
                stm.setDouble(HM_COLUMN.progress.rank(), startedJobInfo.getProgress());
                stm.setLong(HM_COLUMN.queuedFilesCount.rank(), startedJobInfo.getQueuedFilesCount());
                stm.setLong(HM_COLUMN.totalQueuesCount.rank(), startedJobInfo.getTotalQueuesCount());
                stm.setLong(HM_COLUMN.activeQueuesCount.rank(), startedJobInfo.getActiveQueuesCount());
                stm.setLong(HM_COLUMN.retiredQueuesCount.rank(), startedJobInfo.getRetiredQueuesCount());
                stm.setLong(HM_COLUMN.exhaustedQueuesCount.rank(), startedJobInfo.getExhaustedQueuesCount());
                stm.setLong(HM_COLUMN.alertsCount.rank(), startedJobInfo.getAlertsCount());
                stm.setLong(HM_COLUMN.downloadedFilesCount.rank(), startedJobInfo.getDownloadedFilesCount());
                stm.setLong(HM_COLUMN.currentProcessedKBPerSec.rank(), startedJobInfo.getCurrentProcessedKBPerSec());
                stm.setLong(HM_COLUMN.processedKBPerSec.rank(), startedJobInfo.getProcessedKBPerSec());
                stm.setDouble(HM_COLUMN.currentProcessedDocsPerSec.rank(),
                        startedJobInfo.getCurrentProcessedDocsPerSec());
                stm.setDouble(HM_COLUMN.processedDocsPerSec.rank(), startedJobInfo.getProcessedDocsPerSec());
                stm.setInt(HM_COLUMN.activeToeCount.rank(), startedJobInfo.getActiveToeCount());
                stm.setInt(HM_COLUMN.status.rank(), startedJobInfo.getStatus().ordinal());
                stm.setTimestamp(HM_COLUMN.tstamp.rank(), new Timestamp(startedJobInfo.getTimestamp().getTime()));

                stm.executeUpdate();

                c.commit();
            } catch (SQLException e) {
                String message = "SQL error storing started job info " + startedJobInfo + " in history table" + "\n"
                        + ExceptionUtils.getSQLExceptionCause(e);
                log.warn(message, e);
                throw new IOFailure(message, e);
            } finally {
                DBUtils.closeStatementIfOpen(stm);
                DBUtils.rollbackIfNeeded(c, "store started job info", startedJobInfo);
            }

            // Remember last sampling date
            lastSampleDateByJobId.put(startedJobInfo.getJobId(), time);
        } finally {
            HarvestDBConnection.release(c);
        }
    }

    /**
     * Returns an array of all progress records chronologically sorted for the given job ID.
     *
     * @param jobId the job id.
     * @return an array of all progress records chronologically sorted for the given job ID.
     */
    @Override
    public StartedJobInfo[] getFullJobHistory(long jobId) {
        Connection c = HarvestDBConnection.get();
        PreparedStatement stm = null;
        try {
            stm = c.prepareStatement("SELECT " + HM_COLUMN.getColumnsInOrder() + " FROM runningJobsHistory"
                    + " WHERE jobId=?"
+ " ORDER BY elapsedSeconds ASC"); 292 stm.setLong(1, jobId); 293 294 ResultSet rs = stm.executeQuery(); 295 List<StartedJobInfo> infosForJob = listFromResultSet(rs); 296 297 return (StartedJobInfo[]) infosForJob.toArray(new StartedJobInfo[infosForJob.size()]); 298 299 } catch (SQLException e) { 300 String message = "SQL error querying runningJobsHistory for job ID " + jobId + " from database" + "\n" 301 + ExceptionUtils.getSQLExceptionCause(e); 302 log.warn(message, e); 303 throw new IOFailure(message, e); 304 } finally { 305 DBUtils.closeStatementIfOpen(stm); 306 HarvestDBConnection.release(c); 307 } 308 } 309 310 /** 311 * Returns the most recent record for every job, partitioned by harvest definition name. 312 * 313 * @return the full listing of started job information, partitioned by harvest definition name. 314 */ 315 @Override 316 public Map<String, List<StartedJobInfo>> getMostRecentByHarvestName() { 317 Connection c = HarvestDBConnection.get(); 318 319 Map<String, List<StartedJobInfo>> infoMap = new TreeMap<String, List<StartedJobInfo>>(); 320 Statement stm = null; 321 try { 322 stm = c.createStatement(); 323 ResultSet rs = stm.executeQuery("SELECT " + HM_COLUMN.getColumnsInOrder() + " FROM runningJobsMonitor"); 324 325 while (rs.next()) { 326 long jobId = rs.getLong(HM_COLUMN.jobId.rank()); 327 String harvestName = rs.getString(HM_COLUMN.harvestName.rank()); 328 329 List<StartedJobInfo> infosForHarvest = infoMap.get(harvestName); 330 if (infosForHarvest == null) { 331 infosForHarvest = new LinkedList<StartedJobInfo>(); 332 infoMap.put(harvestName, infosForHarvest); 333 } 334 335 StartedJobInfo sji = new StartedJobInfo(harvestName, jobId); 336 337 sji.setElapsedSeconds(rs.getLong(HM_COLUMN.elapsedSeconds.rank())); 338 sji.setHostUrl(rs.getString(HM_COLUMN.hostUrl.rank())); 339 sji.setProgress(rs.getDouble(HM_COLUMN.progress.rank())); 340 sji.setQueuedFilesCount(rs.getLong(HM_COLUMN.queuedFilesCount.rank())); 341 
sji.setTotalQueuesCount(rs.getLong(HM_COLUMN.totalQueuesCount.rank())); 342 sji.setActiveQueuesCount(rs.getLong(HM_COLUMN.activeQueuesCount.rank())); 343 sji.setRetiredQueuesCount(rs.getLong(HM_COLUMN.retiredQueuesCount.rank())); 344 sji.setExhaustedQueuesCount(rs.getLong(HM_COLUMN.exhaustedQueuesCount.rank())); 345 sji.setAlertsCount(rs.getLong(HM_COLUMN.alertsCount.rank())); 346 sji.setDownloadedFilesCount(rs.getLong(HM_COLUMN.downloadedFilesCount.rank())); 347 sji.setCurrentProcessedKBPerSec(rs.getLong(HM_COLUMN.currentProcessedKBPerSec.rank())); 348 sji.setProcessedKBPerSec(rs.getLong(HM_COLUMN.processedKBPerSec.rank())); 349 sji.setCurrentProcessedDocsPerSec(rs.getDouble(HM_COLUMN.currentProcessedDocsPerSec.rank())); 350 sji.setProcessedDocsPerSec(rs.getDouble(HM_COLUMN.processedDocsPerSec.rank())); 351 sji.setActiveToeCount(rs.getInt(HM_COLUMN.activeToeCount.rank())); 352 sji.setStatus(CrawlStatus.values()[rs.getInt(HM_COLUMN.status.rank())]); 353 sji.setTimestamp(new Date(rs.getTimestamp(HM_COLUMN.tstamp.rank()).getTime())); 354 355 infosForHarvest.add(sji); 356 } 357 358 return infoMap; 359 360 } catch (SQLException e) { 361 String message = "SQL error querying runningJobsMonitor" + "\n" + ExceptionUtils.getSQLExceptionCause(e); 362 log.warn(message, e); 363 throw new IOFailure(message, e); 364 } finally { 365 DBUtils.closeStatementIfOpen(stm); 366 HarvestDBConnection.release(c); 367 } 368 369 } 370 371 /** 372 * Returns the ids of jobs for which history records exist as an immutable set. 373 * 374 * @return the ids of jobs for which history records exist. 
375 */ 376 @Override 377 public Set<Long> getHistoryRecordIds() { 378 Connection c = HarvestDBConnection.get(); 379 Set<Long> jobIds = new TreeSet<Long>(); 380 Statement stm = null; 381 try { 382 stm = c.createStatement(); 383 ResultSet rs = stm.executeQuery("SELECT DISTINCT " + HM_COLUMN.jobId + " FROM runningJobsMonitor"); 384 385 while (rs.next()) { 386 jobIds.add(rs.getLong(HM_COLUMN.jobId.name())); 387 } 388 stm.close(); 389 390 stm = c.createStatement(); 391 rs = stm.executeQuery("SELECT DISTINCT " + HM_COLUMN.jobId + " FROM runningJobsHistory"); 392 393 while (rs.next()) { 394 jobIds.add(rs.getLong(HM_COLUMN.jobId.name())); 395 } 396 stm.close(); 397 398 stm = c.createStatement(); 399 rs = stm.executeQuery("SELECT DISTINCT " + HM_COLUMN.jobId + " FROM frontierReportMonitor"); 400 401 while (rs.next()) { 402 jobIds.add(rs.getLong(HM_COLUMN.jobId.name())); 403 } 404 405 return Collections.unmodifiableSet(jobIds); 406 } catch (SQLException e) { 407 String message = "SQL error querying running jobs history" + "\n" + ExceptionUtils.getSQLExceptionCause(e); 408 log.warn(message, e); 409 throw new IOFailure(message, e); 410 } finally { 411 DBUtils.closeStatementIfOpen(stm); 412 HarvestDBConnection.release(c); 413 } 414 } 415 416 /** 417 * Returns an array of chronologically sorted progress records for the given job ID, starting at a given crawl time, 418 * and limited to a given number of record. 419 * 420 * @param jobId the job id. 421 * @param startTime the crawl time (in seconds) to begin. 422 * @param limit the maximum number of records to fetch. 423 * @return an array of chronologically sorted progress records for the given job ID, starting at a given crawl time, 424 * and limited to a given number of record. 
425 */ 426 @Override 427 public StartedJobInfo[] getMostRecentByJobId(long jobId, long startTime, int limit) { 428 429 ArgumentNotValid.checkNotNull(jobId, "jobId"); 430 ArgumentNotValid.checkNotNull(startTime, "startTime"); 431 ArgumentNotValid.checkNotNull(limit, "limit"); 432 433 Connection c = HarvestDBConnection.get(); 434 PreparedStatement stm = null; 435 try { 436 stm = c.prepareStatement("SELECT " + HM_COLUMN.getColumnsInOrder() + " FROM runningJobsHistory" 437 + " WHERE jobId=? AND elapsedSeconds >= ?" + " ORDER BY elapsedSeconds DESC" + " " 438 + DBSpecifics.getInstance().getOrderByLimitAndOffsetSubClause(limit, 0)); 439 stm.setLong(1, jobId); 440 stm.setLong(2, startTime); 441 442 ResultSet rs = stm.executeQuery(); 443 List<StartedJobInfo> infosForJob = listFromResultSet(rs); 444 445 return (StartedJobInfo[]) infosForJob.toArray(new StartedJobInfo[infosForJob.size()]); 446 447 } catch (SQLException e) { 448 String message = "SQL error querying runningJobsHistory for job ID " + jobId + " from database" + "\n" 449 + ExceptionUtils.getSQLExceptionCause(e); 450 log.warn(message, e); 451 throw new IOFailure(message, e); 452 } finally { 453 DBUtils.closeStatementIfOpen(stm); 454 HarvestDBConnection.release(c); 455 } 456 } 457 458 /** 459 * Returns the most recent progress record for the given job ID. 460 * 461 * @param jobId the job id. 462 * @return the most recent progress record for the given job ID. 
463 */ 464 @Override 465 public StartedJobInfo getMostRecentByJobId(long jobId) { 466 Connection c = HarvestDBConnection.get(); 467 Statement stm = null; 468 try { 469 stm = c.createStatement(); 470 ResultSet rs = stm.executeQuery("SELECT " + HM_COLUMN.getColumnsInOrder() + " FROM runningJobsMonitor" 471 + " WHERE jobId=" + jobId); 472 473 if (rs.next()) { 474 String harvestName = rs.getString(HM_COLUMN.harvestName.rank()); 475 StartedJobInfo sji = new StartedJobInfo(harvestName, jobId); 476 477 sji.setElapsedSeconds(rs.getLong(HM_COLUMN.elapsedSeconds.rank())); 478 sji.setHostUrl(rs.getString(HM_COLUMN.hostUrl.rank())); 479 sji.setProgress(rs.getDouble(HM_COLUMN.progress.rank())); 480 sji.setQueuedFilesCount(rs.getLong(HM_COLUMN.queuedFilesCount.rank())); 481 sji.setTotalQueuesCount(rs.getLong(HM_COLUMN.totalQueuesCount.rank())); 482 sji.setActiveQueuesCount(rs.getLong(HM_COLUMN.activeQueuesCount.rank())); 483 sji.setRetiredQueuesCount(rs.getLong(HM_COLUMN.retiredQueuesCount.rank())); 484 sji.setExhaustedQueuesCount(rs.getLong(HM_COLUMN.exhaustedQueuesCount.rank())); 485 sji.setAlertsCount(rs.getLong(HM_COLUMN.alertsCount.rank())); 486 sji.setDownloadedFilesCount(rs.getLong(HM_COLUMN.downloadedFilesCount.rank())); 487 sji.setCurrentProcessedKBPerSec(rs.getLong(HM_COLUMN.currentProcessedKBPerSec.rank())); 488 sji.setProcessedKBPerSec(rs.getLong(HM_COLUMN.processedKBPerSec.rank())); 489 sji.setCurrentProcessedDocsPerSec(rs.getDouble(HM_COLUMN.currentProcessedDocsPerSec.rank())); 490 sji.setProcessedDocsPerSec(rs.getDouble(HM_COLUMN.processedDocsPerSec.rank())); 491 sji.setActiveToeCount(rs.getInt(HM_COLUMN.activeToeCount.rank())); 492 sji.setStatus(CrawlStatus.values()[rs.getInt(HM_COLUMN.status.rank())]); 493 sji.setTimestamp(new Date(rs.getTimestamp(HM_COLUMN.tstamp.rank()).getTime())); 494 log.debug("getMostRecentByJobId for {}:{}", jobId, sji); 495 return sji; 496 } 497 498 } catch (SQLException e) { 499 String message = "SQL error querying runningJobsMonitor" + 
                    "\n" + ExceptionUtils.getSQLExceptionCause(e);
            log.warn(message, e);
            throw new IOFailure(message, e);
        } finally {
            DBUtils.closeStatementIfOpen(stm);
            HarvestDBConnection.release(c);
        }

        // No monitor row matched the job id: signal the absence to the caller.
        throw new UnknownID("No running job with ID " + jobId);
    }

    /**
     * Removes all records pertaining to the given job ID from the persistent storage.
     *
     * @param jobId the job id.
     * @return the number of deleted records.
     */
    @Override
    public int removeInfoForJob(long jobId) {
        // NOTE(review): checkNotNull on a primitive long is a no-op (autoboxed, never null).
        ArgumentNotValid.checkNotNull(jobId, "jobId");

        Connection c = HarvestDBConnection.get();
        PreparedStatement stm = null;

        int deleteCount = 0;
        try {
            // Delete from monitor table
            c.setAutoCommit(false);
            stm = c.prepareStatement("DELETE FROM runningJobsMonitor WHERE jobId=?");
            stm.setLong(1, jobId);
            deleteCount = stm.executeUpdate();
            c.commit();
            stm.close();
            // Delete from history table
            c.setAutoCommit(false);
            stm = c.prepareStatement("DELETE FROM runningJobsHistory WHERE jobId=?");
            stm.setLong(1, jobId);
            deleteCount += stm.executeUpdate();
            c.commit();
        } catch (SQLException e) {
            String message = "SQL error deleting from history records for job ID " + jobId + "\n"
                    + ExceptionUtils.getSQLExceptionCause(e);
            log.warn(message, e);
            throw new IOFailure(message, e);
        } finally {
            DBUtils.closeStatementIfOpen(stm);
            // Rolls back only if a transaction is still open (i.e. an exception occurred mid-way).
            DBUtils.rollbackIfNeeded(c, "removeInfoForJob", jobId);
            HarvestDBConnection.release(c);
        }

        // Combined count of rows removed from both tables.
        return deleteCount;

    }

    /**
     * Enum class containing all fields in the frontierReportMonitor table.
555 */ 556 private static enum FR_COLUMN { 557 jobId, filterId, tstamp, domainName, currentSize, totalEnqueues, sessionBalance, lastCost, averageCost, // See 558 // NAS-2168 559 // Often 560 // contains 561 // the 562 // illegal 563 // value 564 // 4.9E-324 565 lastDequeueTime, wakeTime, totalSpend, totalBudget, errorCount, lastPeekUri, lastQueuedUri; 566 567 /** 568 * @return the rank of a member of the enum class. 569 */ 570 int rank() { 571 return ordinal() + 1; 572 } 573 574 /** 575 * Returns the SQL substring that lists columns according to their ordinal. 576 * 577 * @return the SQL substring that lists columns in proper order. 578 */ 579 static String getColumnsInOrder() { 580 String columns = ""; 581 for (FR_COLUMN c : values()) { 582 columns += c.name() + ", "; 583 } 584 return columns.substring(0, columns.lastIndexOf(",")); 585 } 586 } 587 588 ; 589 590 /** 591 * Store frontier report data to the persistent storage. 592 * 593 * @param report the report to store 594 * @param filterId the id of the filter that produced the report 595 * @param jobId The ID of the job responsible for this report 596 * @return the update count 597 */ 598 public int storeFrontierReport(String filterId, InMemoryFrontierReport report, Long jobId) { 599 ArgumentNotValid.checkNotNull(report, "report"); 600 ArgumentNotValid.checkNotNull(jobId, "jobId"); 601 602 Connection c = HarvestDBConnection.get(); 603 PreparedStatement stm = null; 604 try { 605 606 // First drop existing rows 607 try { 608 c.setAutoCommit(false); 609 610 stm = c.prepareStatement("DELETE FROM frontierReportMonitor WHERE jobId=? 
AND filterId=?"); 611 stm.setLong(1, jobId); 612 stm.setString(2, filterId); 613 614 stm.executeUpdate(); 615 616 c.commit(); 617 } catch (SQLException e) { 618 String message = "SQL error dropping records for job ID " + jobId + " and filterId " + filterId + "\n" 619 + ExceptionUtils.getSQLExceptionCause(e); 620 log.warn(message, e); 621 return 0; 622 } finally { 623 DBUtils.closeStatementIfOpen(stm); 624 DBUtils.rollbackIfNeeded(c, "storeFrontierReport delete", jobId); 625 } 626 627 // Now batch insert report lines 628 try { 629 c.setAutoCommit(false); 630 631 stm = c.prepareStatement("INSERT INTO frontierReportMonitor(" + FR_COLUMN.getColumnsInOrder() 632 + ") VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"); 633 634 for (FrontierReportLine frl : report.getLines()) { 635 stm.setLong(FR_COLUMN.jobId.rank(), jobId); 636 stm.setString(FR_COLUMN.filterId.rank(), filterId); 637 stm.setTimestamp(FR_COLUMN.tstamp.rank(), new Timestamp(report.getTimestamp())); 638 stm.setString(FR_COLUMN.domainName.rank(), frl.getDomainName()); 639 stm.setLong(FR_COLUMN.currentSize.rank(), frl.getCurrentSize()); 640 stm.setLong(FR_COLUMN.totalEnqueues.rank(), frl.getTotalEnqueues()); 641 stm.setLong(FR_COLUMN.sessionBalance.rank(), frl.getSessionBalance()); 642 stm.setDouble(FR_COLUMN.lastCost.rank(), frl.getLastCost()); 643 stm.setDouble(FR_COLUMN.averageCost.rank(), 644 correctNumericIfIllegalAverageCost(frl.getAverageCost())); 645 stm.setString(FR_COLUMN.lastDequeueTime.rank(), frl.getLastDequeueTime()); 646 stm.setString(FR_COLUMN.wakeTime.rank(), frl.getWakeTime()); 647 stm.setLong(FR_COLUMN.totalSpend.rank(), frl.getTotalSpend()); 648 stm.setLong(FR_COLUMN.totalBudget.rank(), frl.getTotalBudget()); 649 stm.setLong(FR_COLUMN.errorCount.rank(), frl.getErrorCount()); 650 651 // URIs are to be truncated to 1000 characters 652 // (see SQL scripts) 653 DBUtils.setStringMaxLength(stm, FR_COLUMN.lastPeekUri.rank(), frl.getLastPeekUri(), MAX_URL_LENGTH, 654 frl, "lastPeekUri"); 655 
DBUtils.setStringMaxLength(stm, FR_COLUMN.lastQueuedUri.rank(), frl.getLastQueuedUri(), 656 MAX_URL_LENGTH, frl, "lastQueuedUri"); 657 658 stm.addBatch(); 659 } 660 661 int[] updCounts = stm.executeBatch(); 662 int updCountTotal = 0; 663 for (int count : updCounts) { 664 updCountTotal += count; 665 } 666 667 c.commit(); 668 669 return updCountTotal; 670 } catch (SQLException e) { 671 String message = "SQL error writing records for job ID " + jobId + " and filterId " + filterId + "\n" 672 + ExceptionUtils.getSQLExceptionCause(e); 673 log.warn(message, e); 674 return 0; 675 } finally { 676 DBUtils.closeStatementIfOpen(stm); 677 DBUtils.rollbackIfNeeded(c, "storeFrontierReport insert", jobId); 678 } 679 680 } finally { 681 HarvestDBConnection.release(c); 682 } 683 } 684 685 /** 686 * Correct the given double if it is equal to 4.9E-324. Part of fix for NAS-2168 687 * 688 * @param value A given double 689 * @return 0.0 if value is 4.9E-324, otherwise the value as is 690 */ 691 private double correctNumericIfIllegalAverageCost(double value) { 692 if (value == 4.9E-324) { 693 log.warn("Found illegal double value '" + 4.9E-324 + "'. Changed it to 0.0"); 694 return 0.0; 695 } else { 696 return value; 697 } 698 } 699 700 /** 701 * Returns the list of the available frontier report types. 702 * 703 * @return the list of the available frontier report types. 
704 * @see FrontierReportFilter#getFilterId() 705 */ 706 public String[] getFrontierReportFilterTypes() { 707 List<String> filterIds = new ArrayList<String>(); 708 709 Connection c = HarvestDBConnection.get(); 710 PreparedStatement stm = null; 711 try { 712 stm = c.prepareStatement("SELECT DISTINCT filterId FROM frontierReportMonitor"); 713 714 ResultSet rs = stm.executeQuery(); 715 while (rs.next()) { 716 filterIds.add(rs.getString(1)); 717 } 718 719 } catch (SQLException e) { 720 String message = "SQL error fetching filter IDs" + "\n" + ExceptionUtils.getSQLExceptionCause(e); 721 log.warn(message, e); 722 } finally { 723 DBUtils.closeStatementIfOpen(stm); 724 HarvestDBConnection.release(c); 725 } 726 727 return filterIds.toArray(new String[filterIds.size()]); 728 } 729 730 /** 731 * Retrieve a frontier report from a job id and a given filter class. 732 * 733 * @param jobId the job id 734 * @param filterId the id of the filter that produced the report 735 * @return a frontier report 736 */ 737 public InMemoryFrontierReport getFrontierReport(long jobId, String filterId) { 738 739 ArgumentNotValid.checkNotNull(jobId, "jobId"); 740 ArgumentNotValid.checkNotNull(filterId, "filterId"); 741 742 InMemoryFrontierReport report = new InMemoryFrontierReport(Long.toString(jobId)); 743 744 Connection c = HarvestDBConnection.get(); 745 PreparedStatement stm = null; 746 try { 747 stm = c.prepareStatement("SELECT " + FR_COLUMN.getColumnsInOrder() + " FROM frontierReportMonitor" 748 + " WHERE jobId=? 
AND filterId=?"); 749 stm.setLong(1, jobId); 750 stm.setString(2, filterId); 751 752 ResultSet rs = stm.executeQuery(); 753 754 // Process first line to get report timestamp 755 if (rs.next()) { 756 report.setTimestamp(rs.getTimestamp(FR_COLUMN.tstamp.rank()).getTime()); 757 report.addLine(getLine(rs)); 758 759 while (rs.next()) { 760 report.addLine(getLine(rs)); 761 } 762 } 763 764 } catch (SQLException e) { 765 String message = "SQL error fetching report for job ID " + jobId + " and filterId " + filterId + "\n" 766 + ExceptionUtils.getSQLExceptionCause(e); 767 log.warn(message, e); 768 } finally { 769 DBUtils.closeStatementIfOpen(stm); 770 HarvestDBConnection.release(c); 771 } 772 773 return report; 774 } 775 776 /** 777 * Retrieve a frontier report from a job id, with limited results and possibility to sort by totalenqueues DESC 778 * 779 * @param jobId the job id 780 * @param limit the limit of result to query 781 * @param filterId the id of the filter that produced the report 782 * @param sort if true, sort the results by totalenqueues DESC 783 * @return a frontier report 784 */ 785 public InMemoryFrontierReport getFrontierReport(long jobId, String filterId, int limit, boolean sort) { 786 787 ArgumentNotValid.checkNotNull(jobId, "jobId"); 788 ArgumentNotValid.checkNotNull(limit, "limit"); 789 ArgumentNotValid.checkNotNull(sort, "sort"); 790 ArgumentNotValid.checkNotNull(filterId, "filterId"); 791 792 InMemoryFrontierReport report = new InMemoryFrontierReport(Long.toString(jobId)); 793 794 Connection c = HarvestDBConnection.get(); 795 PreparedStatement stm = null; 796 try { 797 String sqlString = "SELECT " + FR_COLUMN.getColumnsInOrder() + " FROM frontierReportMonitor" 798 + " WHERE jobId=? AND filterId=? "; 799 if(sort) { 800 sqlString = sqlString + " ORDER BY totalenqueues DESC "; 801 } 802 if(limit > 0) { 803 sqlString = sqlString + " LIMIT ? 
"; 804 } 805 stm = c.prepareStatement(sqlString); 806 stm.setLong(1, jobId); 807 stm.setString(2, filterId); 808 if(limit > 0) { 809 stm.setInt(3, limit); 810 } 811 812 ResultSet rs = stm.executeQuery(); 813 814 // Process first line to get report timestamp 815 if (rs.next()) { 816 report.setTimestamp(rs.getTimestamp(FR_COLUMN.tstamp.rank()).getTime()); 817 report.addLine(getLine(rs)); 818 819 while (rs.next()) { 820 report.addLine(getLine(rs)); 821 } 822 } 823 824 } catch (SQLException e) { 825 String message = "SQL error fetching report for job ID " + jobId + " and limit " + limit + "\n" 826 + ExceptionUtils.getSQLExceptionCause(e); 827 log.warn(message, e); 828 } finally { 829 DBUtils.closeStatementIfOpen(stm); 830 HarvestDBConnection.release(c); 831 } 832 833 return report; 834 } 835 836 /** 837 * Retrieve a frontier report from a job id, with limited results and possibility to sort by totalenqueues DESC 838 * 839 * @param jobId the job id 840 * @param limit the limit of result to query 841 * @param sort if true, sort the results by totalenqueues DESC 842 * @return a frontier report 843 */ 844 public InMemoryFrontierReport getFrontierReport(long jobId, int limit, boolean sort) { 845 ArgumentNotValid.checkNotNull(jobId, "jobId"); 846 ArgumentNotValid.checkNotNull(limit, "limit"); 847 ArgumentNotValid.checkNotNull(sort, "sort"); 848 849 InMemoryFrontierReport report = new InMemoryFrontierReport(Long.toString(jobId)); 850 851 Connection c = HarvestDBConnection.get(); 852 PreparedStatement stm = null; 853 try { 854 String sqlString = "SELECT " + FR_COLUMN.getColumnsInOrder() + " FROM frontierReportMonitor" 855 + " WHERE jobId=? "; 856 if(sort) { 857 sqlString = sqlString + " ORDER BY totalenqueues DESC "; 858 } 859 if(limit > 0) { 860 sqlString = sqlString + " LIMIT ? 
"; 861 } 862 stm = c.prepareStatement(sqlString); 863 stm.setLong(1, jobId); 864 stm.setInt(2, limit); 865 866 ResultSet rs = stm.executeQuery(); 867 868 // Process first line to get report timestamp 869 if (rs.next()) { 870 report.setTimestamp(rs.getTimestamp(FR_COLUMN.tstamp.rank()).getTime()); 871 report.addLine(getLine(rs)); 872 873 while (rs.next()) { 874 report.addLine(getLine(rs)); 875 } 876 } 877 878 } catch (SQLException e) { 879 String message = "SQL error fetching report for job ID " + jobId + " and limit " + limit + "\n" 880 + ExceptionUtils.getSQLExceptionCause(e); 881 log.warn(message, e); 882 } finally { 883 DBUtils.closeStatementIfOpen(stm); 884 HarvestDBConnection.release(c); 885 } 886 887 return report; 888 } 889 890 /** 891 * Deletes all frontier report data pertaining to the given job id from the persistent storage. 892 * 893 * @param jobId the job id 894 * @return the update count 895 */ 896 public int deleteFrontierReports(long jobId) { 897 ArgumentNotValid.checkNotNull(jobId, "jobId"); 898 899 Connection c = HarvestDBConnection.get(); 900 PreparedStatement stm = null; 901 try { 902 c.setAutoCommit(false); 903 904 stm = c.prepareStatement("DELETE FROM frontierReportMonitor WHERE jobId=?"); 905 stm.setLong(1, jobId); 906 907 int delCount = stm.executeUpdate(); 908 909 c.commit(); 910 911 return delCount; 912 } catch (SQLException e) { 913 String message = "SQL error deleting report lines for job ID " + jobId + "\n" 914 + ExceptionUtils.getSQLExceptionCause(e); 915 log.warn(message, e); 916 return 0; 917 } finally { 918 DBUtils.closeStatementIfOpen(stm); 919 DBUtils.rollbackIfNeeded(c, "deleteFrontierReports", jobId); 920 HarvestDBConnection.release(c); 921 } 922 } 923 924 /** 925 * Get a frontierReportLine from the resultSet. 926 * 927 * @param rs the resultset with data from table frontierReportMonitor 928 * @return a frontierReportLine from the resultSet. 
929 * @throws SQLException If unable to get data from resultSet 930 */ 931 private FrontierReportLine getLine(ResultSet rs) throws SQLException { 932 FrontierReportLine line = new FrontierReportLine(); 933 934 line.setAverageCost(rs.getDouble(FR_COLUMN.averageCost.rank())); 935 line.setCurrentSize(rs.getLong(FR_COLUMN.currentSize.rank())); 936 line.setDomainName(rs.getString(FR_COLUMN.domainName.rank())); 937 line.setErrorCount(rs.getLong(FR_COLUMN.errorCount.rank())); 938 line.setLastCost(rs.getDouble(FR_COLUMN.lastCost.rank())); 939 line.setLastDequeueTime(rs.getString(FR_COLUMN.lastDequeueTime.rank())); 940 line.setLastPeekUri(rs.getString(FR_COLUMN.lastPeekUri.rank())); 941 line.setLastQueuedUri(rs.getString(FR_COLUMN.lastQueuedUri.rank())); 942 line.setSessionBalance(rs.getLong(FR_COLUMN.sessionBalance.rank())); 943 line.setTotalBudget(rs.getLong(FR_COLUMN.totalBudget.rank())); 944 line.setTotalEnqueues(rs.getLong(FR_COLUMN.totalEnqueues.rank())); 945 line.setTotalSpend(rs.getLong(FR_COLUMN.totalSpend.rank())); 946 line.setWakeTime(rs.getString(FR_COLUMN.wakeTime.rank())); 947 948 return line; 949 } 950 951 /** 952 * Get a list of StartedJobInfo objects from a resultset of entries from runningJobsHistory table. 953 * 954 * @param rs a resultset with entries from table runningJobsHistory. 
955 * @return a list of StartedJobInfo objects from the resultset 956 * @throws SQLException If any problems reading data from the resultset 957 */ 958 private List<StartedJobInfo> listFromResultSet(ResultSet rs) throws SQLException { 959 List<StartedJobInfo> list = new LinkedList<StartedJobInfo>(); 960 while (rs.next()) { 961 StartedJobInfo sji = new StartedJobInfo(rs.getString(HM_COLUMN.harvestName.rank()), 962 rs.getLong(HM_COLUMN.jobId.rank())); 963 sji.setElapsedSeconds(rs.getLong(HM_COLUMN.elapsedSeconds.rank())); 964 sji.setHostUrl(rs.getString(HM_COLUMN.hostUrl.rank())); 965 sji.setProgress(rs.getDouble(HM_COLUMN.progress.rank())); 966 sji.setQueuedFilesCount(rs.getLong(HM_COLUMN.queuedFilesCount.rank())); 967 sji.setTotalQueuesCount(rs.getLong(HM_COLUMN.totalQueuesCount.rank())); 968 sji.setActiveQueuesCount(rs.getLong(HM_COLUMN.activeQueuesCount.rank())); 969 sji.setExhaustedQueuesCount(rs.getLong(HM_COLUMN.exhaustedQueuesCount.rank())); 970 sji.setAlertsCount(rs.getLong(HM_COLUMN.alertsCount.rank())); 971 sji.setDownloadedFilesCount(rs.getLong(HM_COLUMN.downloadedFilesCount.rank())); 972 sji.setCurrentProcessedKBPerSec(rs.getLong(HM_COLUMN.currentProcessedKBPerSec.rank())); 973 sji.setProcessedKBPerSec(rs.getLong(HM_COLUMN.processedKBPerSec.rank())); 974 sji.setCurrentProcessedDocsPerSec(rs.getDouble(HM_COLUMN.currentProcessedDocsPerSec.rank())); 975 sji.setProcessedDocsPerSec(rs.getDouble(HM_COLUMN.processedDocsPerSec.rank())); 976 sji.setActiveToeCount(rs.getInt(HM_COLUMN.activeToeCount.rank())); 977 sji.setStatus(CrawlStatus.values()[rs.getInt(HM_COLUMN.status.rank())]); 978 sji.setTimestamp(new Date(rs.getTimestamp(HM_COLUMN.tstamp.rank()).getTime())); 979 980 list.add(sji); 981 } 982 return list; 983 } 984 985}