001/* 002 * #%L 003 * Netarchivesuite - harvester 004 * %% 005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023 024package dk.netarkivet.harvester.datamodel; 025 026import java.sql.Clob; 027import java.sql.Connection; 028import java.sql.PreparedStatement; 029import java.sql.ResultSet; 030import java.sql.SQLException; 031import java.util.ArrayList; 032import java.util.Calendar; 033import java.util.Date; 034import java.util.HashMap; 035import java.util.Iterator; 036import java.util.LinkedList; 037import java.util.List; 038import java.util.Map; 039 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043import dk.netarkivet.common.CommonSettings; 044import dk.netarkivet.common.exceptions.ArgumentNotValid; 045import dk.netarkivet.common.exceptions.IOFailure; 046import dk.netarkivet.common.exceptions.IllegalState; 047import dk.netarkivet.common.exceptions.PermissionDenied; 048import dk.netarkivet.common.exceptions.UnknownID; 049import dk.netarkivet.common.utils.DBUtils; 050import dk.netarkivet.common.utils.ExceptionUtils; 051import dk.netarkivet.common.utils.Settings; 052import dk.netarkivet.common.utils.StringUtils; 053import dk.netarkivet.harvester.webinterface.HarvestStatus; 054import dk.netarkivet.harvester.webinterface.HarvestStatusQuery; 055import dk.netarkivet.harvester.webinterface.HarvestStatusQuery.SORT_ORDER; 056 057/** 058 * A database-based implementation of the JobDAO class. The statements to create the tables are now in 059 * scripts/sql/createfullhddb.sql 060 */ 061public class JobDBDAO extends JobDAO { 062 063 /** The logger for this class. */ 064 private static final Logger log = LoggerFactory.getLogger(JobDBDAO.class); 065 066 /** 067 * Create a new JobDAO implemented using database. This constructor also tries to upgrade the jobs and jobs_configs 068 * tables in the current database. throws and IllegalState exception, if it is impossible to make the necessary 069 * updates. 070 */ 071 protected JobDBDAO() { 072 Connection connection = HarvestDBConnection.get(); 073 try { 074 HarvesterDatabaseTables.checkVersion(connection, HarvesterDatabaseTables.JOBS); 075 HarvesterDatabaseTables.checkVersion(connection, HarvesterDatabaseTables.JOBCONFIGS); 076 } finally { 077 HarvestDBConnection.release(connection); 078 } 079 } 080 081 /** 082 * Creates an instance in persistent storage of the given job. 083 * If the job doesn't have an ID (which it shouldn't at this point, one is generated for it. 084 * After that the harvestnamePrefix is set. Both existing harvestnameprefix factory-classes depends on 085 * the JobID being set before being called. 086 * 087 * @param job a given job to add to persistent storage 088 * @throws PermissionDenied If a job already exists in persistent storage with the same id as the given job 089 * @throws IOFailure If some IOException occurs while writing the job to persistent storage 090 */ 091 public synchronized void create(Job job) { 092 ArgumentNotValid.checkNotNull(job, "Job job"); 093 // Check that job.getOrigHarvestDefinitionID() refers to existing harvestdefinition. 094 Long harvestId = job.getOrigHarvestDefinitionID(); 095 if (!HarvestDefinitionDAO.getInstance().exists(harvestId)) { 096 throw new UnknownID("No harvestdefinition with ID=" + harvestId); 097 } 098 099 Connection connection = HarvestDBConnection.get(); 100 if (job.getJobID() != null) { 101 log.warn("The jobId for the job is already set. This should probably never happen."); 102 } else { 103 job.setJobID(generateNextID(connection)); 104 } 105 // Set the harvestNamePrefix. Every current implementation depends on the JobID being set before 106 // being initialized. 107 job.setDefaultHarvestNamePrefix(); 108 109 110 if (job.getCreationDate() != null) { 111 log.warn("The creation time for the job is already set. This should probably never happen."); 112 } else { 113 job.setCreationDate(new Date()); 114 } 115 116 log.debug("Creating " + job.toString()); 117 118 PreparedStatement statement = null; 119 try { 120 connection.setAutoCommit(false); 121 statement = connection.prepareStatement("INSERT INTO jobs " 122 + "(job_id, harvest_id, status, channel, forcemaxcount, " 123 + "forcemaxbytes, forcemaxrunningtime, orderxml, " + "orderxmldoc, seedlist, " 124 + "harvest_num, startdate, enddate, submitteddate, creationdate, " 125 + "num_configs, edition, resubmitted_as_job, harvestname_prefix, snapshot) " 126 + "VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?," + "?, ?, ?, ?, ?, ?)"); 127 128 statement.setLong(1, job.getJobID()); 129 statement.setLong(2, job.getOrigHarvestDefinitionID()); 130 statement.setInt(3, job.getStatus().ordinal()); 131 statement.setString(4, job.getChannel()); 132 statement.setLong(5, job.getForceMaxObjectsPerDomain()); 133 statement.setLong(6, job.getMaxBytesPerDomain()); 134 statement.setLong(7, job.getMaxJobRunningTime()); 135 DBUtils.setStringMaxLength(statement, 8, job.getOrderXMLName(), Constants.MAX_NAME_SIZE, job, 136 "order.xml name"); 137 final String orderString = job.getOrderXMLdoc().getXML(); 138 DBUtils.setClobMaxLength(statement, 9, orderString, Constants.MAX_ORDERXML_SIZE, job, "order.xml"); 139 DBUtils.setClobMaxLength(statement, 10, job.getSeedListAsString(), Constants.MAX_COMBINED_SEED_LIST_SIZE, 140 job, "seedlist"); 141 statement.setInt(11, job.getHarvestNum()); 142 DBUtils.setDateMaybeNull(statement, 12, job.getActualStart()); 143 DBUtils.setDateMaybeNull(statement, 13, job.getActualStop()); 144 DBUtils.setDateMaybeNull(statement, 14, job.getSubmittedDate()); 145 DBUtils.setDateMaybeNull(statement, 15, job.getCreationDate()); 146 147 // The size of the configuration map == number of configurations 148 statement.setInt(16, job.getDomainConfigurationMap().size()); 149 long initialEdition = 1; 150 statement.setLong(17, initialEdition); 151 DBUtils.setLongMaybeNull(statement, 18, job.getResubmittedAsJob()); 152 statement.setString(19, job.getHarvestFilenamePrefix()); 153 statement.setBoolean(20, job.isSnapshot()); 154 statement.executeUpdate(); 155 createJobConfigsEntries(connection, job); 156 connection.commit(); 157 job.setEdition(initialEdition); 158 } catch (SQLException e) { 159 String message = "SQL error creating job " + job + " in database" + "\n" 160 + ExceptionUtils.getSQLExceptionCause(e); 161 log.warn(message, e); 162 throw new IOFailure(message, e); 163 } finally { 164 DBUtils.rollbackIfNeeded(connection, "create job", job); 165 HarvestDBConnection.release(connection); 166 } 167 } 168 169 /** 170 * Create the entries in the job_configs table for this job. Since some jobs have up to 10000 configs, this must be 171 * optimized. The entries are only created, if job.configsChanged is true. 172 * 173 * @param dbconnection A connection to work on 174 * @param job The job to store entries for 175 * @throws SQLException If any problems occur during creation of the new entries in the job_configs table. 176 */ 177 private void createJobConfigsEntries(Connection dbconnection, Job job) throws SQLException { 178 if (job.configsChanged) { 179 PreparedStatement statement = null; 180 String tmpTable = null; 181 Long jobID = job.getJobID(); 182 try { 183 statement = dbconnection.prepareStatement("DELETE FROM job_configs WHERE job_id = ?"); 184 statement.setLong(1, jobID); 185 statement.executeUpdate(); 186 statement.close(); 187 tmpTable = DBSpecifics.getInstance().getJobConfigsTmpTable(dbconnection); 188 final Map<String, String> domainConfigurationMap = job.getDomainConfigurationMap(); 189 statement = dbconnection.prepareStatement("INSERT INTO " + tmpTable 190 + " ( domain_name, config_name ) VALUES ( ?, ?)"); 191 for (Map.Entry<String, String> entry : domainConfigurationMap.entrySet()) { 192 statement.setString(1, entry.getKey()); 193 statement.setString(2, entry.getValue()); 194 statement.executeUpdate(); 195 statement.clearParameters(); 196 } 197 statement.close(); 198 // Now we have a temp table with all the domains and configs 199 statement = dbconnection.prepareStatement("INSERT INTO job_configs " + "( job_id, config_id ) " 200 + "SELECT ?, configurations.config_id " + " FROM domains, configurations, " + tmpTable 201 + " WHERE domains.name = " + tmpTable + ".domain_name" 202 + " AND domains.domain_id = configurations.domain_id" 203 + " AND configurations.name = " + tmpTable + ".config_name" 204 ); 205 statement.setLong(1, jobID); 206 int rows = statement.executeUpdate(); 207 if (rows != domainConfigurationMap.size()) { 208 log.debug("Domain or configuration in table for {} missing: Should have {}, got {}", job, 209 domainConfigurationMap.size(), rows); 210 } 211 dbconnection.commit(); 212 } finally { 213 if (tmpTable != null) { 214 DBSpecifics.getInstance().dropJobConfigsTmpTable(dbconnection, tmpTable); 215 } 216 job.configsChanged = false; 217 } 218 } 219 } 220 221 /** 222 * Check whether a particular job exists. 223 * 224 * @param jobID Id of the job. 225 * @return true if the job exists in any state. 226 */ 227 @Override 228 public boolean exists(Long jobID) { 229 ArgumentNotValid.checkNotNull(jobID, "Long jobID"); 230 231 Connection c = HarvestDBConnection.get(); 232 try { 233 return exists(c, jobID); 234 } finally { 235 HarvestDBConnection.release(c); 236 } 237 } 238 239 /** 240 * Check whether a particular job exists. 241 * 242 * @param c an open connection to the harvestDatabase 243 * @param jobID Id of the job. 244 * @return true if the job exists in any state. 245 */ 246 private boolean exists(Connection c, Long jobID) { 247 return 1 == DBUtils.selectLongValue(c, "SELECT COUNT(*) FROM jobs WHERE job_id = ?", jobID); 248 } 249 250 /** 251 * Generates the next id of job. 252 * 253 * @param c an open connection to the harvestDatabase 254 * @return id 255 */ 256 private Long generateNextID(Connection c) { 257 // Set to zero original, can be set after admin machine breakdown, 258 // and the use this as the point of reference. 259 Long restoreId = Settings.getLong(Constants.NEXT_JOB_ID); 260 261 Long maxVal = DBUtils.selectLongValue(c, "SELECT MAX(job_id) FROM jobs"); 262 if (maxVal == null) { 263 maxVal = 0L; 264 } 265 // return the largest number of the two numbers: the NEXT_JOB_ID 266 // declared in settings and max value of job_id used 267 // in the jobs table. 268 return ((restoreId > maxVal) ? restoreId : maxVal + 1L); 269 } 270 271 /** 272 * Update a Job in persistent storage. 273 * 274 * @param job The Job to update 275 * @throws ArgumentNotValid If the Job is null 276 * @throws UnknownID If the Job doesn't exist in the DAO 277 * @throws IOFailure If writing the job to persistent storage fails 278 * @throws PermissionDenied If the job has been updated behind our backs 279 */ 280 @Override 281 public synchronized void update(Job job) { 282 ArgumentNotValid.checkNotNull(job, "job"); 283 284 Connection connection = HarvestDBConnection.get(); 285 // Not done as a transaction as it's awfully big. 286 // TODO Make sure that a failed job update does... what? 287 PreparedStatement statement = null; 288 try { 289 final Long jobID = job.getJobID(); 290 if (!exists(connection, jobID)) { 291 throw new UnknownID("Job id " + jobID + " is not known in persistent storage"); 292 } 293 294 connection.setAutoCommit(false); 295 statement = connection.prepareStatement("UPDATE jobs SET " + "harvest_id = ?, status = ?, channel = ?, " 296 + "forcemaxcount = ?, forcemaxbytes = ?, " + "forcemaxrunningtime = ?," + "orderxml = ?, " 297 + "orderxmldoc = ?, seedlist = ?, " + "harvest_num = ?, harvest_errors = ?, " 298 + "harvest_error_details = ?, upload_errors = ?, " + "upload_error_details = ?, startdate = ?," 299 + "enddate = ?, num_configs = ?, edition = ?, " + "submitteddate = ?, creationdate = ?, " 300 + "resubmitted_as_job = ?, harvestname_prefix = ?," + "snapshot = ?" 301 + " WHERE job_id = ? AND edition = ?"); 302 statement.setLong(1, job.getOrigHarvestDefinitionID()); 303 statement.setInt(2, job.getStatus().ordinal()); 304 statement.setString(3, job.getChannel()); 305 statement.setLong(4, job.getForceMaxObjectsPerDomain()); 306 statement.setLong(5, job.getMaxBytesPerDomain()); 307 statement.setLong(6, job.getMaxJobRunningTime()); 308 DBUtils.setStringMaxLength(statement, 7, job.getOrderXMLName(), Constants.MAX_NAME_SIZE, job, 309 "order.xml name"); 310 final String orderreader = job.getOrderXMLdoc().getXML(); 311 DBUtils.setClobMaxLength(statement, 8, orderreader, Constants.MAX_ORDERXML_SIZE, job, "order.xml"); 312 DBUtils.setClobMaxLength(statement, 9, job.getSeedListAsString(), Constants.MAX_COMBINED_SEED_LIST_SIZE, 313 job, "seedlist"); 314 statement.setInt(10, job.getHarvestNum()); // Not in job yet 315 DBUtils.setStringMaxLength(statement, 11, job.getHarvestErrors(), Constants.MAX_ERROR_SIZE, job, 316 "harvest_error"); 317 DBUtils.setStringMaxLength(statement, 12, job.getHarvestErrorDetails(), Constants.MAX_ERROR_DETAIL_SIZE, 318 job, "harvest_error_details"); 319 DBUtils.setStringMaxLength(statement, 13, job.getUploadErrors(), Constants.MAX_ERROR_SIZE, job, 320 "upload_error"); 321 DBUtils.setStringMaxLength(statement, 14, job.getUploadErrorDetails(), Constants.MAX_ERROR_DETAIL_SIZE, 322 job, "upload_error_details"); 323 long edition = job.getEdition() + 1; 324 DBUtils.setDateMaybeNull(statement, 15, job.getActualStart()); 325 DBUtils.setDateMaybeNull(statement, 16, job.getActualStop()); 326 statement.setInt(17, job.getDomainConfigurationMap().size()); 327 statement.setLong(18, edition); 328 DBUtils.setDateMaybeNull(statement, 19, job.getSubmittedDate()); 329 DBUtils.setDateMaybeNull(statement, 20, job.getCreationDate()); 330 DBUtils.setLongMaybeNull(statement, 21, job.getResubmittedAsJob()); 331 statement.setString(22, job.getHarvestFilenamePrefix()); 332 statement.setBoolean(23, job.isSnapshot()); 333 statement.setLong(24, job.getJobID()); 334 statement.setLong(25, job.getEdition()); 335 final int rows = statement.executeUpdate(); 336 if (rows == 0) { 337 String message = "Edition " + job.getEdition() + " has expired, not updating"; 338 log.debug(message); 339 throw new PermissionDenied(message); 340 } 341 createJobConfigsEntries(connection, job); 342 connection.commit(); 343 job.setEdition(edition); 344 } catch (SQLException e) { 345 String message = "SQL error updating job " + job + " in database" + "\n" 346 + ExceptionUtils.getSQLExceptionCause(e); 347 log.warn(message, e); 348 throw new IOFailure(message, e); 349 } finally { 350 DBUtils.rollbackIfNeeded(connection, "update job", job); 351 HarvestDBConnection.release(connection); 352 } 353 } 354 355 /** 356 * Read a single job from the job database. 357 * 358 * @param jobID ID of the job. 359 * @return A Job object 360 * @throws UnknownID if the job id does not exist. 361 * @throws IOFailure if there was some problem talking to the database. 362 */ 363 @Override 364 public Job read(long jobID) { 365 Connection connection = HarvestDBConnection.get(); 366 try { 367 return read(connection, jobID); 368 } finally { 369 HarvestDBConnection.release(connection); 370 } 371 } 372 373 /** 374 * Read a single job from the job database. 375 * 376 * @param jobID ID of the job. 377 * @param connection an open connection to the harvestDatabase 378 * @return A Job object 379 * @throws UnknownID if the job id does not exist. 380 * @throws IOFailure if there was some problem talking to the database. 381 */ 382 private synchronized Job read(Connection connection, Long jobID) { 383 if (!exists(connection, jobID)) { 384 throw new UnknownID("Job id " + jobID + " is not known in persistent storage"); 385 } 386 PreparedStatement statement = null; 387 try { 388 statement = connection.prepareStatement("SELECT " + "harvest_id, status, channel, " 389 + "forcemaxcount, forcemaxbytes, " + "forcemaxrunningtime, orderxml, " 390 + "orderxmldoc, seedlist, harvest_num," + "harvest_errors, harvest_error_details, " 391 + "upload_errors, upload_error_details, " + "startdate, enddate, submitteddate, creationdate, " 392 + "edition, resubmitted_as_job, continuationof, harvestname_prefix, snapshot " 393 + "FROM jobs WHERE job_id = ?"); 394 statement.setLong(1, jobID); 395 ResultSet result = statement.executeQuery(); 396 result.next(); 397 long harvestID = result.getLong(1); 398 JobStatus status = JobStatus.fromOrdinal(result.getInt(2)); 399 String channel = result.getString(3); 400 long forceMaxCount = result.getLong(4); 401 long forceMaxBytes = result.getLong(5); 402 long forceMaxRunningTime = result.getLong(6); 403 String orderxml = result.getString(7); 404 405 HeritrixTemplate orderXMLdoc = null; 406 407 boolean useClobs = DBSpecifics.getInstance().supportsClob(); 408 String tmpStr; 409 if (useClobs) { 410 Clob clob = result.getClob(8); 411 tmpStr = clob.getSubString(1L, (int)clob.length()); 412 } else { 413 tmpStr = result.getString(8); 414 } 415 orderXMLdoc = HeritrixTemplate.getTemplateFromString(tmpStr); 416 String seedlist = ""; 417 if (useClobs) { 418 Clob clob = result.getClob(9); 419 seedlist = clob.getSubString(1, (int) clob.length()); 420 } else { 421 seedlist = result.getString(9); 422 } 423 424 int harvestNum = result.getInt(10); 425 String harvestErrors = result.getString(11); 426 String harvestErrorDetails = result.getString(12); 427 String uploadErrors = result.getString(13); 428 String uploadErrorDetails = result.getString(14); 429 Date startdate = DBUtils.getDateMaybeNull(result, 15); 430 Date stopdate = DBUtils.getDateMaybeNull(result, 16); 431 Date submittedDate = DBUtils.getDateMaybeNull(result, 17); 432 Date creationDate = DBUtils.getDateMaybeNull(result, 18); 433 Long edition = result.getLong(19); 434 Long resubmittedAsJob = DBUtils.getLongMaybeNull(result, 20); 435 Long continuationOfJob = DBUtils.getLongMaybeNull(result, 21); 436 String harvestnamePrefix = result.getString(22); 437 boolean snapshot = result.getBoolean(23); 438 statement.close(); 439 // IDs should match up in a natural join 440 // The following if-block is an attempt to fix Bug 1856, an 441 // unexplained derby deadlock, by making this statement a dirty 442 // read. 443 String domainStatement = "SELECT domains.name, configurations.name " 444 + "FROM domains, configurations, job_configs " + "WHERE job_configs.job_id = ?" 445 + " AND job_configs.config_id = configurations.config_id" 446 + " AND domains.domain_id = configurations.domain_id"; 447 if (Settings.get(CommonSettings.DB_SPECIFICS_CLASS).contains(CommonSettings.DB_IS_DERBY_IF_CONTAINS)) { 448 statement = connection.prepareStatement(domainStatement + " WITH UR"); 449 } else { 450 statement = connection.prepareStatement(domainStatement); 451 } 452 statement.setLong(1, jobID); 453 result = statement.executeQuery(); 454 Map<String, String> configurationMap = new HashMap<String, String>(); 455 while (result.next()) { 456 String domainName = result.getString(1); 457 String configName = result.getString(2); 458 configurationMap.put(domainName, configName); 459 } 460 final Job job = new Job(harvestID, configurationMap, channel, snapshot, forceMaxCount, forceMaxBytes, 461 forceMaxRunningTime, status, orderxml, orderXMLdoc, seedlist, harvestNum, continuationOfJob); 462 job.appendHarvestErrors(harvestErrors); 463 job.appendHarvestErrorDetails(harvestErrorDetails); 464 job.appendUploadErrors(uploadErrors); 465 job.appendUploadErrorDetails(uploadErrorDetails); 466 if (startdate != null) { 467 job.setActualStart(startdate); 468 } 469 if (stopdate != null) { 470 job.setActualStop(stopdate); 471 } 472 473 if (submittedDate != null) { 474 job.setSubmittedDate(submittedDate); 475 } 476 477 if (creationDate != null) { 478 job.setCreationDate(creationDate); 479 } 480 481 job.configsChanged = false; 482 job.setJobID(jobID); 483 job.setEdition(edition); 484 485 if (resubmittedAsJob != null) { 486 job.setResubmittedAsJob(resubmittedAsJob); 487 } 488 if (harvestnamePrefix == null) { 489 job.setDefaultHarvestNamePrefix(); 490 } else { 491 job.setHarvestFilenamePrefix(harvestnamePrefix); 492 } 493 return job; 494 } catch (SQLException e) { 495 String message = "SQL error reading job " + jobID + " in database" + "\n" 496 + ExceptionUtils.getSQLExceptionCause(e); 497 log.warn(message, e); 498 throw new IOFailure(message, e); 499 } finally { 500 try { 501 statement.close(); 502 } catch (SQLException e) { 503 log.warn("Exception thrown when trying to close statement", e); 504 } 505 } 506 } 507 508 509 510 /** 511 * Return a list of all jobs with the given status, ordered by id. 512 * 513 * @param status A given status. 514 * @return A list of all job with given status 515 */ 516 @Override 517 public synchronized Iterator<Job> getAll(JobStatus status) { 518 ArgumentNotValid.checkNotNull(status, "JobStatus status"); 519 520 Connection c = HarvestDBConnection.get(); 521 try { 522 List<Long> idList = DBUtils.selectLongList(c, "SELECT job_id FROM jobs WHERE status = ? " 523 + "ORDER BY job_id", status.ordinal()); 524 List<Job> orderedJobs = new LinkedList<Job>(); 525 for (Long jobId : idList) { 526 orderedJobs.add(read(c, jobId)); 527 } 528 return orderedJobs.iterator(); 529 } finally { 530 HarvestDBConnection.release(c); 531 } 532 } 533 534 /** 535 * Return a list of all job_id's representing jobs with the given status. 536 * 537 * @param status A given status. 538 * @return A list of all job_id's representing jobs with given status 539 * @throws ArgumentNotValid If the given status is not one of the five valid statuses specified in Job. 540 */ 541 @Override 542 public Iterator<Long> getAllJobIds(JobStatus status) { 543 ArgumentNotValid.checkNotNull(status, "JobStatus status"); 544 545 Connection c = HarvestDBConnection.get(); 546 try { 547 List<Long> idList = DBUtils.selectLongList(c, "SELECT job_id FROM jobs WHERE status = ? ORDER BY job_id", 548 status.ordinal()); 549 return idList.iterator(); 550 } finally { 551 HarvestDBConnection.release(c); 552 } 553 } 554 555 @Override 556 public Iterator<Long> getAllJobIds(JobStatus status, HarvestChannel channel) { 557 ArgumentNotValid.checkNotNull(status, "JobStatus status"); 558 ArgumentNotValid.checkNotNull(channel, "Channel"); 559 560 Connection c = HarvestDBConnection.get(); 561 try { 562 List<Long> idList = DBUtils.selectLongList(c, "SELECT job_id FROM jobs WHERE status = ? AND channel = ? " 563 + "ORDER BY job_id", status.ordinal(), channel.getName()); 564 return idList.iterator(); 565 } finally { 566 HarvestDBConnection.release(c); 567 } 568 } 569 570 /** 571 * Return a list of all jobs. 572 * 573 * @return A list of all jobs 574 */ 575 @Override 576 public synchronized Iterator<Job> getAll() { 577 Connection c = HarvestDBConnection.get(); 578 try { 579 List<Long> idList = DBUtils.selectLongList(c, "SELECT job_id FROM jobs ORDER BY job_id"); 580 List<Job> orderedJobs = new LinkedList<Job>(); 581 for (Long jobId : idList) { 582 orderedJobs.add(read(c, jobId)); 583 } 584 return orderedJobs.iterator(); 585 } finally { 586 HarvestDBConnection.release(c); 587 } 588 } 589 590 /** 591 * Return a list of all job_ids . 592 * 593 * @return A list of all job_ids 594 */ 595 public Iterator<Long> getAllJobIds() { 596 Connection c = HarvestDBConnection.get(); 597 try { 598 List<Long> idList = DBUtils.selectLongList(c, "SELECT job_id FROM jobs ORDER BY job_id"); 599 return idList.iterator(); 600 } finally { 601 HarvestDBConnection.release(c); 602 } 603 } 604 605 /** 606 * Get a list of small and immediately usable status information for given status and in given order. Is used by 607 * getStatusInfo functions in order to share code (and SQL) 608 * TODO should also include given harvest run 609 * 610 * @param connection an open connection to the harvestDatabase 611 * @param jobStatusCode code for jobstatus, -1 if all 612 * @param asc true if it is to be sorted in ascending order, false if it is to be sorted in descending order 613 * @return List of JobStatusInfo objects for all jobs. 614 * @throws ArgumentNotValid for invalid jobStatusCode 615 * @throws IOFailure on trouble getting data from database 616 */ 617 private List<JobStatusInfo> getStatusInfo(Connection connection, int jobStatusCode, boolean asc) { 618 // Validate jobStatusCode 619 // Throws ArgumentNotValid if it is an invalid job status 620 if (jobStatusCode != JobStatus.ALL_STATUS_CODE) { 621 JobStatus.fromOrdinal(jobStatusCode); 622 } 623 624 StringBuffer sqlBuffer = new StringBuffer("SELECT jobs.job_id, status, jobs.harvest_id, " 625 + "harvestdefinitions.name, harvest_num, harvest_errors," 626 + " upload_errors, orderxml, num_configs, submitteddate, creationdate, startdate," 627 + " enddate, resubmitted_as_job" + " FROM jobs, harvestdefinitions " 628 + " WHERE harvestdefinitions.harvest_id = jobs.harvest_id "); 629 630 if (jobStatusCode != JobStatus.ALL_STATUS_CODE) { 631 sqlBuffer.append(" AND status = ").append(jobStatusCode); 632 } 633 sqlBuffer.append(" ORDER BY jobs.job_id"); 634 if (!asc) { // Assume default is ASCENDING 635 sqlBuffer.append(" " + HarvestStatusQuery.SORT_ORDER.DESC.name()); 636 } 637 638 PreparedStatement statement = null; 639 try { 640 statement = connection.prepareStatement(sqlBuffer.toString()); 641 ResultSet res = statement.executeQuery(); 642 return makeJobStatusInfoListFromResultset(res); 643 } catch (SQLException e) { 644 String message = "SQL error asking for job status list in database" + "\n" 645 + ExceptionUtils.getSQLExceptionCause(e); 646 log.warn(message, e); 647 throw new IOFailure(message, e); 648 } 649 } 650 651 /** 652 * Get a list of small and immediately usable status information for given job status. 653 * 654 * @param status The status asked for. 655 * @return List of JobStatusInfo objects for all jobs with given job status 656 * @throws ArgumentNotValid for invalid jobStatus 657 * @throws IOFailure on trouble getting data from database 658 */ 659 @Override 660 public List<JobStatusInfo> getStatusInfo(JobStatus status) { 661 ArgumentNotValid.checkNotNull(status, "status"); 662 Connection c = HarvestDBConnection.get(); 663 try { 664 return getStatusInfo(c, status.ordinal(), true); 665 } finally { 666 HarvestDBConnection.release(c); 667 } 668 } 669 670 /** 671 * Get a list of small and immediately usable status information for given job status and in given job id order. 672 * 673 * @param query the user query 674 * @throws IOFailure on trouble getting data from database 675 */ 676 @Override 677 public HarvestStatus getStatusInfo(HarvestStatusQuery query) { 678 log.debug("Constructing Harveststatus based on given query."); 679 PreparedStatement s = null; 680 Connection c = HarvestDBConnection.get(); 681 682 try { 683 // Obtain total count without limit 684 // NB this will be a performance bottleneck if the table gets big 685 long totalRowsCount = 0; 686 687 s = buildSqlQuery(query, true).getPopulatedStatement(c); 688 ResultSet res = s.executeQuery(); 689 res.next(); 690 totalRowsCount = res.getLong(1); 691 692 s = buildSqlQuery(query, false).getPopulatedStatement(c); 693 res = s.executeQuery(); 694 List<JobStatusInfo> jobs = makeJobStatusInfoListFromResultset(res); 695 696 log.debug("Harveststatus constructed based on given query."); 697 return new HarvestStatus(totalRowsCount, jobs); 698 } catch (SQLException e) { 699 String message = "SQL error asking for job status list in database" + "\n" 700 + ExceptionUtils.getSQLExceptionCause(e); 701 log.warn(message, e); 702 throw new IOFailure(message, e); 703 } finally { 704 HarvestDBConnection.release(c); 705 } 706 } 707 708 /** 709 * Calculate all jobIDs to use for duplication reduction. 710 * <p> 711 * More precisely, this method calculates the following: If the job ID corresponds to a partial harvest, all jobIDs 712 * from the previous scheduled harvest are returned, or the empty list if this harvest hasn't been scheduled before. 713 * <p> 714 * If the job ID corresponds to a full harvest, the entire chain of harvests this is based on is returned, and all 715 * jobIDs from the previous chain of full harvests is returned. 716 * <p> 717 * This method is synchronized to avoid DB locking. 718 * 719 * @param jobID The job ID to find duplicate reduction data for. 720 * @return A list of job IDs (possibly empty) of potential previous harvests of this job, to use for duplicate 721 * reduction. 722 * @throws UnknownID if job ID is unknown 723 * @throws IOFailure on trouble querying database 724 */ 725 public synchronized List<Long> getJobIDsForDuplicateReduction(long jobID) throws UnknownID { 726 727 Connection connection = HarvestDBConnection.get(); 728 List<Long> jobs; 729 // Select the previous harvest from the same harvestdefinition 730 try { 731 if (!exists(connection, jobID)) { 732 throw new UnknownID("Job ID '" + jobID + "' does not exist in database"); 733 } 734 735 jobs = DBUtils.selectLongList(connection, "SELECT jobs.job_id FROM jobs, jobs AS original_jobs" 736 + " WHERE original_jobs.job_id=?" + " AND jobs.harvest_id=original_jobs.harvest_id" 737 + " AND jobs.harvest_num=original_jobs.harvest_num-1", jobID); 738 List<Long> harvestDefinitions = getPreviousFullHarvests(connection, jobID); 739 if (!harvestDefinitions.isEmpty()) { 740 // Select all jobs from a given list of harvest definitions 741 jobs.addAll(DBUtils.selectLongList(connection, "SELECT jobs.job_id FROM jobs" 742 + " WHERE jobs.harvest_id IN (" + StringUtils.conjoin(",", harvestDefinitions) + ")")); 743 } 744 return jobs; 745 } finally { 746 HarvestDBConnection.release(connection); 747 } 748 } 749 750 /** 751 * Find the harvest definition ids from this chain of snapshot harvests and the previous chain of snapshot harvests. 752 * 753 * @param connection an open connection to the harvestDatabase 754 * @param jobID The ID of the job 755 * @return A (possibly empty) list of harvest definition ids 756 */ 757 private List<Long> getPreviousFullHarvests(Connection connection, long jobID) { 758 List<Long> results = new ArrayList<Long>(); 759 // Find the jobs' fullharvest id 760 Long thisHarvest = DBUtils.selectFirstLongValueIfAny(connection, 761 "SELECT jobs.harvest_id FROM jobs, fullharvests WHERE jobs.harvest_id=fullharvests.harvest_id" 762 + " AND jobs.job_id=?", jobID); 763 764 if (thisHarvest == null) { 765 // Not a full harvest 766 return results; 767 } 768 769 // Follow the chain of orginating IDs back 770 for (Long originatingHarvest = thisHarvest; originatingHarvest != null; originatingHarvest = DBUtils 771 .selectFirstLongValueIfAny(connection, "SELECT previoushd FROM fullharvests" 772 + " WHERE fullharvests.harvest_id=?", originatingHarvest)) { 773 if (!originatingHarvest.equals(thisHarvest)) { 774 results.add(originatingHarvest); 775 } 776 } 777 778 // Find the first harvest in the chain 779 Long firstHarvest = thisHarvest; 780 if (!results.isEmpty()) { 781 firstHarvest = results.get(results.size() - 1); 782 } 783 784 // Find the last harvest in the chain before 785 Long olderHarvest = DBUtils.selectFirstLongValueIfAny(connection, "SELECT fullharvests.harvest_id" 786 + " FROM fullharvests, harvestdefinitions," + " harvestdefinitions AS currenthd" 787 + " WHERE currenthd.harvest_id=?" + " AND fullharvests.harvest_id" + "=harvestdefinitions.harvest_id" 788 + " AND harvestdefinitions.submitted<currenthd.submitted" + " ORDER BY harvestdefinitions.submitted " 789 + HarvestStatusQuery.SORT_ORDER.DESC.name(), firstHarvest); 790 // Follow the chain of originating IDs back 791 // FIXME Rewrite this loop! 792 for (Long originatingHarvest = olderHarvest; originatingHarvest != null; originatingHarvest = DBUtils 793 .selectFirstLongValueIfAny(connection, 794 "SELECT previoushd FROM fullharvests WHERE fullharvests.harvest_id=?", originatingHarvest)) { 795 results.add(originatingHarvest); 796 } 797 return results; 798 } 799 800 /** 801 * Returns the number of existing jobs. 802 * 803 * @return Number of jobs in 'jobs' table 804 */ 805 @Override 806 public int getCountJobs() { 807 Connection c = HarvestDBConnection.get(); 808 try { 809 return DBUtils.selectIntValue(c, "SELECT COUNT(*) FROM jobs"); 810 } finally { 811 HarvestDBConnection.release(c); 812 } 813 } 814 815 @Override 816 public synchronized long rescheduleJob(long oldJobID) { 817 Connection connection = HarvestDBConnection.get(); 818 long newJobID = generateNextID(connection); 819 PreparedStatement statement = null; 820 try { 821 statement = connection.prepareStatement("SELECT status FROM jobs WHERE job_id = ?"); 822 statement.setLong(1, oldJobID); 823 ResultSet res = statement.executeQuery(); 824 if (!res.next()) { 825 throw new UnknownID("No job with ID " + oldJobID + " to resubmit"); 826 } 827 final JobStatus currentJobStatus = JobStatus.fromOrdinal(res.getInt(1)); 828 if (currentJobStatus != JobStatus.SUBMITTED && currentJobStatus != JobStatus.FAILED) { 829 throw new IllegalState("Job " + oldJobID + " is not ready to be copied."); 830 } 831 832 // Now do the actual copying. 833 // Note that startdate, and enddate is not copied. 834 // They must be null in JobStatus NEW. 835 statement.close(); 836 connection.setAutoCommit(false); 837 838 statement = connection.prepareStatement("INSERT INTO jobs " 839 + " (job_id, harvest_id, channel, snapshot, status," + " forcemaxcount, forcemaxbytes, orderxml," 840 + " orderxmldoc, seedlist, harvest_num," + " num_configs, edition, continuationof) " 841 + " SELECT ?, harvest_id, channel, snapshot, ?," + " forcemaxcount, forcemaxbytes, orderxml," 842 + " orderxmldoc, seedlist, harvest_num," + " num_configs, ?, ?" + " FROM jobs WHERE job_id = ?"); 843 statement.setLong(1, newJobID); 844 statement.setLong(2, JobStatus.NEW.ordinal()); 845 long initialEdition = 1; 846 statement.setLong(3, initialEdition); 847 Long continuationOf = null; 848 // In case we want to try to continue using the Heritrix recover log 849 if (currentJobStatus == JobStatus.FAILED) { 850 continuationOf = oldJobID; 851 } 852 DBUtils.setLongMaybeNull(statement, 4, continuationOf); 853 854 statement.setLong(5, oldJobID); 855 856 statement.executeUpdate(); 857 statement.close(); 858 statement = connection.prepareStatement("INSERT INTO job_configs " 859 + "( job_id, config_id ) SELECT ?, config_id FROM job_configs WHERE job_id = ?"); 860 statement.setLong(1, newJobID); 861 statement.setLong(2, oldJobID); 862 statement.executeUpdate(); 863 statement.close(); 864 statement = connection.prepareStatement("UPDATE jobs SET status = ?, resubmitted_as_job = ? " 865 + " WHERE job_id = ?"); 866 statement.setInt(1, JobStatus.RESUBMITTED.ordinal()); 867 statement.setLong(2, newJobID); 868 statement.setLong(3, oldJobID); 869 statement.executeUpdate(); 870 connection.commit(); 871 } catch (SQLException e) { 872 String message = "SQL error rescheduling job #" + oldJobID + " in database" + "\n" 873 + ExceptionUtils.getSQLExceptionCause(e); 874 log.warn(message, e); 875 throw new IOFailure(message, e); 876 } finally { 877 DBUtils.closeStatementIfOpen(statement); 878 DBUtils.rollbackIfNeeded(connection, "resubmit job", oldJobID); 879 HarvestDBConnection.release(connection); 880 } 881 log.info("Job #{} successfully as job #{}", oldJobID, newJobID); 882 return newJobID; 883 } 884 885 /** 886 * Helper-method that constructs a list of JobStatusInfo objects from the given resultset. 887 * 888 * @param res a given resultset 889 * @return a list of JobStatusInfo objects 890 * @throws SQLException If any problem with accessing the data in the ResultSet 891 */ 892 private List<JobStatusInfo> makeJobStatusInfoListFromResultset(ResultSet res) throws SQLException { 893 List<JobStatusInfo> joblist = new ArrayList<JobStatusInfo>(); 894 while (res.next()) { 895 final long jobId = res.getLong(1); 896 joblist.add(new JobStatusInfo(jobId, JobStatus.fromOrdinal(res.getInt(2)), res.getLong(3), 897 res.getString(4), res.getInt(5), res.getString(6), res.getString(7), res.getString(8), res 898 .getInt(9), DBUtils.getDateMaybeNull(res, 10), DBUtils.getDateMaybeNull(res, 11), DBUtils 899 .getDateMaybeNull(res, 12), DBUtils.getDateMaybeNull(res, 13), DBUtils.getLongMaybeNull( 900 res, 14))); 901 } 902 return joblist; 903 } 904 905 /** 906 * Internal utility class to build a SQL query using a prepared statement. 907 */ 908 private class HarvestStatusQueryBuilder { 909 /** The sql string. */ 910 private String sqlString; 911 // from java.sql.Types 912 /** list of parameter classes. */ 913 private LinkedList<Class<?>> paramClasses = new LinkedList<Class<?>>(); 914 /** list of parameter values. */ 915 private LinkedList<Object> paramValues = new LinkedList<Object>(); 916 917 /** 918 * Constructor. 919 */ 920 HarvestStatusQueryBuilder() { 921 super(); 922 } 923 924 /** 925 * @param sqlString the sqlString to set 926 */ 927 void setSqlString(String sqlString) { 928 this.sqlString = sqlString; 929 } 930 931 /** 932 * Add the given class and given value to the list of paramClasses and paramValues respectively. 933 * 934 * @param clazz a given class. 935 * @param value a given value 936 */ 937 void addParameter(Class<?> clazz, Object value) { 938 paramClasses.addLast(clazz); 939 paramValues.addLast(value); 940 } 941 942 /** 943 * Prepare a statement for the database that uses the sqlString, and the paramClasses, and paramValues. Only 944 * Integer, Long, String, and Date values accepted. 945 * 946 * @param c an Open connection to the harvestDatabase 947 * @return the prepared statement 948 * @throws SQLException If unable to prepare the statement 949 * @throws UnknownID If one of the parameter classes is unexpected 950 */ 951 PreparedStatement getPopulatedStatement(Connection c) throws SQLException { 952 PreparedStatement stm = c.prepareStatement(sqlString); 953 954 Iterator<Class<?>> pClasses = paramClasses.iterator(); 955 Iterator<Object> pValues = paramValues.iterator(); 956 int pIndex = 0; 957 while (pClasses.hasNext()) { 958 pIndex++; 959 Class<?> pClass = pClasses.next(); 960 Object pVal = pValues.next(); 961 962 if (Integer.class.equals(pClass)) { 963 stm.setInt(pIndex, (Integer) pVal); 964 } else if (Long.class.equals(pClass)) { 965 stm.setLong(pIndex, (Long) pVal); 966 } else if (String.class.equals(pClass)) { 967 stm.setString(pIndex, (String) pVal); 968 } else if (java.sql.Date.class.equals(pClass)) { 969 stm.setDate(pIndex, (java.sql.Date) pVal); 970 } else { 971 throw new UnknownID("Unexpected parameter class " + pClass); 972 } 973 } 974 return stm; 975 } 976 977 } 978 979 /** 980 * Builds a query to fetch jobs according to selection criteria. 981 * 982 * @param query the selection criteria. 983 * @param count build a count query instead of selecting columns. 984 * @return the proper SQL query. 985 */ 986 private HarvestStatusQueryBuilder buildSqlQuery(HarvestStatusQuery query, boolean count) { 987 HarvestStatusQueryBuilder sq = new HarvestStatusQueryBuilder(); 988 StringBuffer sql = new StringBuffer("SELECT"); 989 if (count) { 990 sql.append(" count(*)"); 991 } else { 992 sql.append(" jobs.job_id, status, jobs.harvest_id,"); 993 sql.append(" harvestdefinitions.name, harvest_num,"); 994 sql.append(" harvest_errors, upload_errors, orderxml,"); 995 sql.append(" num_configs, submitteddate, creationdate, startdate, enddate,"); 996 sql.append(" resubmitted_as_job"); 997 } 998 sql.append(" FROM jobs, harvestdefinitions "); 999 sql.append(" WHERE harvestdefinitions.harvest_id = jobs.harvest_id "); 1000 1001 JobStatus[] jobStatuses = query.getSelectedJobStatuses(); 1002 if (jobStatuses.length > 0) { 1003 if (jobStatuses.length == 1) { 1004 int statusOrdinal = jobStatuses[0].ordinal(); 1005 sql.append(" AND status = ?"); 1006 sq.addParameter(Integer.class, statusOrdinal); 1007 } else { 1008 sql.append("AND (status = "); 1009 sql.append(jobStatuses[0].ordinal()); 1010 for (int i = 1; i < jobStatuses.length; i++) { 1011 sql.append(" OR status = ?"); 1012 sq.addParameter(Integer.class, jobStatuses[i].ordinal()); 1013 } 1014 sql.append(")"); 1015 } 1016 } 1017 1018 String harvestName = query.getHarvestName(); 1019 boolean caseSensitiveHarvestName = query.getCaseSensitiveHarvestName(); 1020 if (!harvestName.isEmpty()) { 1021 if (caseSensitiveHarvestName) { 1022 if (harvestName.indexOf(HarvestStatusQuery.HARVEST_NAME_WILDCARD) == -1) { 1023 // No wildcard, exact match 1024 sql.append(" AND harvestdefinitions.name = ?"); 1025 sq.addParameter(String.class, harvestName); 1026 } else { 1027 String harvestNamePattern = harvestName.replaceAll("\\*", "%"); 1028 sql.append(" AND harvestdefinitions.name LIKE ?"); 1029 sq.addParameter(String.class, harvestNamePattern); 1030 } 1031 } else { 1032 harvestName = harvestName.toUpperCase(); 1033 if (harvestName.indexOf(HarvestStatusQuery.HARVEST_NAME_WILDCARD) == -1) { 1034 // No wildcard, exact match 1035 sql.append(" AND UPPER(harvestdefinitions.name) = ?"); 1036 sq.addParameter(String.class, harvestName); 1037 } else { 1038 String harvestNamePattern = harvestName.replaceAll("\\*", "%"); 1039 sql.append(" AND UPPER(harvestdefinitions.name) LIKE ?"); 1040 sq.addParameter(String.class, harvestNamePattern); 1041 } 1042 } 1043 } 1044 1045 Long harvestRun = query.getHarvestRunNumber(); 1046 if (harvestRun != null) { 1047 sql.append(" AND jobs.harvest_num = ?"); 1048 sq.addParameter(Long.class, harvestRun); 1049 } 1050 1051 Long harvestId = query.getHarvestId(); 1052 if (harvestId != null) { 1053 sql.append(" AND harvestdefinitions.harvest_id = ?"); 1054 sq.addParameter(Long.class, harvestId); 1055 } 1056 1057 long startDate = query.getStartDate(); 1058 if (startDate != HarvestStatusQuery.DATE_NONE) { 1059 sql.append(" AND startdate >= ?"); 1060 sq.addParameter(java.sql.Date.class, new java.sql.Date(startDate)); 1061 } 1062 1063 long endDate = query.getEndDate(); 1064 if (endDate != HarvestStatusQuery.DATE_NONE) { 1065 sql.append(" AND enddate < ?"); 1066 // end date must be set +1 day at midnight 1067 Calendar cal = Calendar.getInstance(); 1068 cal.setTimeInMillis(endDate); 1069 cal.roll(Calendar.DAY_OF_YEAR, 1); 1070 sq.addParameter(java.sql.Date.class, new java.sql.Date(cal.getTimeInMillis())); 1071 } 1072 1073 if (!count) { 1074 sql.append(" ORDER BY jobs.job_id"); 1075 if (!query.isSortAscending()) { 1076 sql.append(" " + SORT_ORDER.DESC.name()); 1077 } else { 1078 sql.append(" " + SORT_ORDER.ASC.name()); 1079 } 1080 1081 long pagesize = query.getPageSize(); 1082 if (pagesize != HarvestStatusQuery.PAGE_SIZE_NONE) { 1083 sql.append(" " 1084 + DBSpecifics.getInstance().getOrderByLimitAndOffsetSubClause(pagesize, 1085 (query.getStartPageIndex() - 1) * pagesize)); 1086 } 1087 } 1088 1089 sq.setSqlString(sql.toString()); 1090 return sq; 1091 } 1092 1093 /** 1094 * Get Jobstatus for the job with the given id. 1095 * 1096 * @param jobID A given Jobid 1097 * @return the Jobstatus for the job with the given id. 1098 * @throws UnknownID if no job exists with id jobID 1099 */ 1100 public JobStatus getJobStatus(Long jobID) { 1101 ArgumentNotValid.checkNotNull(jobID, "Long jobID"); 1102 1103 Connection c = HarvestDBConnection.get(); 1104 try { 1105 Integer statusAsInteger = DBUtils.selectIntValue(c, "SELECT status FROM jobs WHERE job_id = ?", jobID); 1106 if (statusAsInteger == null) { 1107 throw new UnknownID("No known job with id=" + jobID); 1108 } 1109 return JobStatus.fromOrdinal(statusAsInteger); 1110 } finally { 1111 HarvestDBConnection.release(c); 1112 } 1113 } 1114 1115 /** 1116 * Get a list of AliasInfo objects for all the domains included in the job. 1117 * 1118 * @return a list of AliasInfo objects for all the domains included in the job. 1119 */ 1120 public List<AliasInfo> getJobAliasInfo(Job job) { 1121 List<AliasInfo> aliases = new ArrayList<AliasInfo>(); 1122 DomainDAO dao = DomainDAO.getInstance(); 1123 for (String domain : job.getDomainConfigurationMap().keySet()) { 1124 aliases.addAll(dao.getAliases(domain)); 1125 } 1126 return aliases; 1127 } 1128}