001/*
002 * #%L
003 * Netarchivesuite - harvester
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023
024package dk.netarkivet.harvester.datamodel;
025
026import java.sql.Clob;
027import java.sql.Connection;
028import java.sql.PreparedStatement;
029import java.sql.ResultSet;
030import java.sql.SQLException;
031import java.util.ArrayList;
032import java.util.Calendar;
033import java.util.Date;
034import java.util.HashMap;
035import java.util.Iterator;
036import java.util.LinkedList;
037import java.util.List;
038import java.util.Map;
039
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import dk.netarkivet.common.CommonSettings;
044import dk.netarkivet.common.exceptions.ArgumentNotValid;
045import dk.netarkivet.common.exceptions.IOFailure;
046import dk.netarkivet.common.exceptions.IllegalState;
047import dk.netarkivet.common.exceptions.PermissionDenied;
048import dk.netarkivet.common.exceptions.UnknownID;
049import dk.netarkivet.common.utils.DBUtils;
050import dk.netarkivet.common.utils.ExceptionUtils;
051import dk.netarkivet.common.utils.Settings;
052import dk.netarkivet.common.utils.StringUtils;
053import dk.netarkivet.harvester.webinterface.HarvestStatus;
054import dk.netarkivet.harvester.webinterface.HarvestStatusQuery;
055import dk.netarkivet.harvester.webinterface.HarvestStatusQuery.SORT_ORDER;
056
057/**
058 * A database-based implementation of the JobDAO class. The statements to create the tables are now in
059 * scripts/sql/createfullhddb.sql
060 */
061public class JobDBDAO extends JobDAO {
062
063    /** The logger for this class. */
064    private static final Logger log = LoggerFactory.getLogger(JobDBDAO.class);
065
066    /**
067     * Create a new JobDAO implemented using database. This constructor also tries to upgrade the jobs and jobs_configs
068     * tables in the current database. throws and IllegalState exception, if it is impossible to make the necessary
069     * updates.
070     */
071    protected JobDBDAO() {
072        Connection connection = HarvestDBConnection.get();
073        try {
074            HarvesterDatabaseTables.checkVersion(connection, HarvesterDatabaseTables.JOBS);
075            HarvesterDatabaseTables.checkVersion(connection, HarvesterDatabaseTables.JOBCONFIGS);
076        } finally {
077            HarvestDBConnection.release(connection);
078        }
079    }
080
081    /**
082     * Creates an instance in persistent storage of the given job. 
083     * If the job doesn't have an ID (which it shouldn't at this point, one is generated for it.
084     * After that the harvestnamePrefix is set. Both existing harvestnameprefix factory-classes depends on
085     * the JobID being set before being called. 
086     *
087     * @param job a given job to add to persistent storage
088     * @throws PermissionDenied If a job already exists in persistent storage with the same id as the given job
089     * @throws IOFailure If some IOException occurs while writing the job to persistent storage
090     */
091    public synchronized void create(Job job) {
092        ArgumentNotValid.checkNotNull(job, "Job job");
093        // Check that job.getOrigHarvestDefinitionID() refers to existing harvestdefinition.
094        Long harvestId = job.getOrigHarvestDefinitionID();
095        if (!HarvestDefinitionDAO.getInstance().exists(harvestId)) {
096            throw new UnknownID("No harvestdefinition with ID=" + harvestId);
097        }
098
099        Connection connection = HarvestDBConnection.get();
100        if (job.getJobID() != null) {
101            log.warn("The jobId for the job is already set. This should probably never happen.");
102        } else {
103            job.setJobID(generateNextID(connection));
104        }
105        // Set the harvestNamePrefix. Every current implementation depends on the JobID being set before
106        // being initialized.
107        job.setDefaultHarvestNamePrefix();
108        
109        
110        if (job.getCreationDate() != null) {
111            log.warn("The creation time for the job is already set. This should probably never happen.");
112        } else {
113            job.setCreationDate(new Date());
114        }
115
116        log.debug("Creating " + job.toString());
117
118        PreparedStatement statement = null;
119        try {
120            connection.setAutoCommit(false);
121            statement = connection.prepareStatement("INSERT INTO jobs "
122                    + "(job_id, harvest_id, status, channel, forcemaxcount, "
123                    + "forcemaxbytes, forcemaxrunningtime, orderxml, " + "orderxmldoc, seedlist, "
124                    + "harvest_num, startdate, enddate, submitteddate, creationdate, "
125                    + "num_configs, edition, resubmitted_as_job, harvestname_prefix, snapshot) "
126                    + "VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?," + "?, ?, ?, ?, ?, ?)");
127
128            statement.setLong(1, job.getJobID());
129            statement.setLong(2, job.getOrigHarvestDefinitionID());
130            statement.setInt(3, job.getStatus().ordinal());
131            statement.setString(4, job.getChannel());
132            statement.setLong(5, job.getForceMaxObjectsPerDomain());
133            statement.setLong(6, job.getMaxBytesPerDomain());
134            statement.setLong(7, job.getMaxJobRunningTime());
135            DBUtils.setStringMaxLength(statement, 8, job.getOrderXMLName(), Constants.MAX_NAME_SIZE, job,
136                    "order.xml name");
137            final String orderString = job.getOrderXMLdoc().getXML();
138            DBUtils.setClobMaxLength(statement, 9, orderString, Constants.MAX_ORDERXML_SIZE, job, "order.xml");
139            DBUtils.setClobMaxLength(statement, 10, job.getSeedListAsString(), Constants.MAX_COMBINED_SEED_LIST_SIZE,
140                    job, "seedlist");
141            statement.setInt(11, job.getHarvestNum());
142            DBUtils.setDateMaybeNull(statement, 12, job.getActualStart());
143            DBUtils.setDateMaybeNull(statement, 13, job.getActualStop());
144            DBUtils.setDateMaybeNull(statement, 14, job.getSubmittedDate());
145            DBUtils.setDateMaybeNull(statement, 15, job.getCreationDate());
146
147            // The size of the configuration map == number of configurations
148            statement.setInt(16, job.getDomainConfigurationMap().size());
149            long initialEdition = 1;
150            statement.setLong(17, initialEdition);
151            DBUtils.setLongMaybeNull(statement, 18, job.getResubmittedAsJob());
152            statement.setString(19, job.getHarvestFilenamePrefix());
153            statement.setBoolean(20, job.isSnapshot());
154            statement.executeUpdate();
155            createJobConfigsEntries(connection, job);
156            connection.commit();
157            job.setEdition(initialEdition);
158        } catch (SQLException e) {
159            String message = "SQL error creating job " + job + " in database" + "\n"
160                    + ExceptionUtils.getSQLExceptionCause(e);
161            log.warn(message, e);
162            throw new IOFailure(message, e);
163        } finally {
164            DBUtils.rollbackIfNeeded(connection, "create job", job);
165            HarvestDBConnection.release(connection);
166        }
167    }
168
169    /**
170     * Create the entries in the job_configs table for this job. Since some jobs have up to 10000 configs, this must be
171     * optimized. The entries are only created, if job.configsChanged is true.
172     *
173     * @param dbconnection A connection to work on
174     * @param job The job to store entries for
175     * @throws SQLException If any problems occur during creation of the new entries in the job_configs table.
176     */
177    private void createJobConfigsEntries(Connection dbconnection, Job job) throws SQLException {
178        if (job.configsChanged) {
179            PreparedStatement statement = null;
180            String tmpTable = null;
181            Long jobID = job.getJobID();
182            try {
183                statement = dbconnection.prepareStatement("DELETE FROM job_configs WHERE job_id = ?");
184                statement.setLong(1, jobID);
185                statement.executeUpdate();
186                statement.close();
187                tmpTable = DBSpecifics.getInstance().getJobConfigsTmpTable(dbconnection);
188                final Map<String, String> domainConfigurationMap = job.getDomainConfigurationMap();
189                statement = dbconnection.prepareStatement("INSERT INTO " + tmpTable
190                        + " ( domain_name, config_name ) VALUES ( ?, ?)");
191                for (Map.Entry<String, String> entry : domainConfigurationMap.entrySet()) {
192                    statement.setString(1, entry.getKey());
193                    statement.setString(2, entry.getValue());
194                    statement.executeUpdate();
195                    statement.clearParameters();
196                }
197                statement.close();
198                // Now we have a temp table with all the domains and configs
199                statement = dbconnection.prepareStatement("INSERT INTO job_configs " + "( job_id, config_id ) "
200                        + "SELECT ?, configurations.config_id " + "  FROM domains, configurations, " + tmpTable
201                        + " WHERE domains.name = " + tmpTable + ".domain_name"
202                        + "   AND domains.domain_id = configurations.domain_id"
203                        + "   AND configurations.name = " + tmpTable + ".config_name"
204                );
205                statement.setLong(1, jobID);
206                int rows = statement.executeUpdate();
207                if (rows != domainConfigurationMap.size()) {
208                    log.debug("Domain or configuration in table for {} missing: Should have {}, got {}", job,
209                            domainConfigurationMap.size(), rows);
210                }
211                dbconnection.commit();
212            } finally {
213                if (tmpTable != null) {
214                    DBSpecifics.getInstance().dropJobConfigsTmpTable(dbconnection, tmpTable);
215                }
216                job.configsChanged = false;
217            }
218        }
219    }
220
221    /**
222     * Check whether a particular job exists.
223     *
224     * @param jobID Id of the job.
225     * @return true if the job exists in any state.
226     */
227    @Override
228    public boolean exists(Long jobID) {
229        ArgumentNotValid.checkNotNull(jobID, "Long jobID");
230
231        Connection c = HarvestDBConnection.get();
232        try {
233            return exists(c, jobID);
234        } finally {
235            HarvestDBConnection.release(c);
236        }
237    }
238
239    /**
240     * Check whether a particular job exists.
241     *
242     * @param c an open connection to the harvestDatabase
243     * @param jobID Id of the job.
244     * @return true if the job exists in any state.
245     */
246    private boolean exists(Connection c, Long jobID) {
247        return 1 == DBUtils.selectLongValue(c, "SELECT COUNT(*) FROM jobs WHERE job_id = ?", jobID);
248    }
249
250    /**
251     * Generates the next id of job.
252     *
253     * @param c an open connection to the harvestDatabase
254     * @return id
255     */
256    private Long generateNextID(Connection c) {
257        // Set to zero original, can be set after admin machine breakdown,
258        // and the use this as the point of reference.
259        Long restoreId = Settings.getLong(Constants.NEXT_JOB_ID);
260
261        Long maxVal = DBUtils.selectLongValue(c, "SELECT MAX(job_id) FROM jobs");
262        if (maxVal == null) {
263            maxVal = 0L;
264        }
265        // return the largest number of the two numbers: the NEXT_JOB_ID
266        // declared in settings and max value of job_id used
267        // in the jobs table.
268        return ((restoreId > maxVal) ? restoreId : maxVal + 1L);
269    }
270
271    /**
272     * Update a Job in persistent storage.
273     *
274     * @param job The Job to update
275     * @throws ArgumentNotValid If the Job is null
276     * @throws UnknownID If the Job doesn't exist in the DAO
277     * @throws IOFailure If writing the job to persistent storage fails
278     * @throws PermissionDenied If the job has been updated behind our backs
279     */
280    @Override
281    public synchronized void update(Job job) {
282        ArgumentNotValid.checkNotNull(job, "job");
283
284        Connection connection = HarvestDBConnection.get();
285        // Not done as a transaction as it's awfully big.
286        // TODO Make sure that a failed job update does... what?
287        PreparedStatement statement = null;
288        try {
289            final Long jobID = job.getJobID();
290            if (!exists(connection, jobID)) {
291                throw new UnknownID("Job id " + jobID + " is not known in persistent storage");
292            }
293
294            connection.setAutoCommit(false);
295            statement = connection.prepareStatement("UPDATE jobs SET " + "harvest_id = ?, status = ?, channel = ?, "
296                    + "forcemaxcount = ?, forcemaxbytes = ?, " + "forcemaxrunningtime = ?," + "orderxml = ?, "
297                    + "orderxmldoc = ?, seedlist = ?, " + "harvest_num = ?, harvest_errors = ?, "
298                    + "harvest_error_details = ?, upload_errors = ?, " + "upload_error_details = ?, startdate = ?,"
299                    + "enddate = ?, num_configs = ?, edition = ?, " + "submitteddate = ?, creationdate = ?, "
300                    + "resubmitted_as_job = ?, harvestname_prefix = ?," + "snapshot = ?"
301                    + " WHERE job_id = ? AND edition = ?");
302            statement.setLong(1, job.getOrigHarvestDefinitionID());
303            statement.setInt(2, job.getStatus().ordinal());
304            statement.setString(3, job.getChannel());
305            statement.setLong(4, job.getForceMaxObjectsPerDomain());
306            statement.setLong(5, job.getMaxBytesPerDomain());
307            statement.setLong(6, job.getMaxJobRunningTime());
308            DBUtils.setStringMaxLength(statement, 7, job.getOrderXMLName(), Constants.MAX_NAME_SIZE, job,
309                    "order.xml name");
310            final String orderreader = job.getOrderXMLdoc().getXML();
311            DBUtils.setClobMaxLength(statement, 8, orderreader, Constants.MAX_ORDERXML_SIZE, job, "order.xml");
312            DBUtils.setClobMaxLength(statement, 9, job.getSeedListAsString(), Constants.MAX_COMBINED_SEED_LIST_SIZE,
313                    job, "seedlist");
314            statement.setInt(10, job.getHarvestNum()); // Not in job yet
315            DBUtils.setStringMaxLength(statement, 11, job.getHarvestErrors(), Constants.MAX_ERROR_SIZE, job,
316                    "harvest_error");
317            DBUtils.setStringMaxLength(statement, 12, job.getHarvestErrorDetails(), Constants.MAX_ERROR_DETAIL_SIZE,
318                    job, "harvest_error_details");
319            DBUtils.setStringMaxLength(statement, 13, job.getUploadErrors(), Constants.MAX_ERROR_SIZE, job,
320                    "upload_error");
321            DBUtils.setStringMaxLength(statement, 14, job.getUploadErrorDetails(), Constants.MAX_ERROR_DETAIL_SIZE,
322                    job, "upload_error_details");
323            long edition = job.getEdition() + 1;
324            DBUtils.setDateMaybeNull(statement, 15, job.getActualStart());
325            DBUtils.setDateMaybeNull(statement, 16, job.getActualStop());
326            statement.setInt(17, job.getDomainConfigurationMap().size());
327            statement.setLong(18, edition);
328            DBUtils.setDateMaybeNull(statement, 19, job.getSubmittedDate());
329            DBUtils.setDateMaybeNull(statement, 20, job.getCreationDate());
330            DBUtils.setLongMaybeNull(statement, 21, job.getResubmittedAsJob());
331            statement.setString(22, job.getHarvestFilenamePrefix());
332            statement.setBoolean(23, job.isSnapshot());
333            statement.setLong(24, job.getJobID());
334            statement.setLong(25, job.getEdition());
335            final int rows = statement.executeUpdate();
336            if (rows == 0) {
337                String message = "Edition " + job.getEdition() + " has expired, not updating";
338                log.debug(message);
339                throw new PermissionDenied(message);
340            }
341            createJobConfigsEntries(connection, job);
342            connection.commit();
343            job.setEdition(edition);
344        } catch (SQLException e) {
345            String message = "SQL error updating job " + job + " in database" + "\n"
346                    + ExceptionUtils.getSQLExceptionCause(e);
347            log.warn(message, e);
348            throw new IOFailure(message, e);
349        } finally {
350            DBUtils.rollbackIfNeeded(connection, "update job", job);
351            HarvestDBConnection.release(connection);
352        }
353    }
354
355    /**
356     * Read a single job from the job database.
357     *
358     * @param jobID ID of the job.
359     * @return A Job object
360     * @throws UnknownID if the job id does not exist.
361     * @throws IOFailure if there was some problem talking to the database.
362     */
363    @Override
364    public Job read(long jobID) {
365        Connection connection = HarvestDBConnection.get();
366        try {
367            return read(connection, jobID);
368        } finally {
369            HarvestDBConnection.release(connection);
370        }
371    }
372
373    /**
374     * Read a single job from the job database.
375     *
376     * @param jobID ID of the job.
377     * @param connection an open connection to the harvestDatabase
378     * @return A Job object
379     * @throws UnknownID if the job id does not exist.
380     * @throws IOFailure if there was some problem talking to the database.
381     */
382    private synchronized Job read(Connection connection, Long jobID) {
383        if (!exists(connection, jobID)) {
384            throw new UnknownID("Job id " + jobID + " is not known in persistent storage");
385        }
386        PreparedStatement statement = null;
387        try {
388            statement = connection.prepareStatement("SELECT " + "harvest_id, status, channel, "
389                    + "forcemaxcount, forcemaxbytes, " + "forcemaxrunningtime, orderxml, "
390                    + "orderxmldoc, seedlist, harvest_num," + "harvest_errors, harvest_error_details, "
391                    + "upload_errors, upload_error_details, " + "startdate, enddate, submitteddate, creationdate, "
392                    + "edition, resubmitted_as_job, continuationof, harvestname_prefix, snapshot "
393                    + "FROM jobs WHERE job_id = ?");
394            statement.setLong(1, jobID);
395            ResultSet result = statement.executeQuery();
396            result.next();
397            long harvestID = result.getLong(1);
398            JobStatus status = JobStatus.fromOrdinal(result.getInt(2));
399            String channel = result.getString(3);
400            long forceMaxCount = result.getLong(4);
401            long forceMaxBytes = result.getLong(5);
402            long forceMaxRunningTime = result.getLong(6);
403            String orderxml = result.getString(7);
404
405            HeritrixTemplate orderXMLdoc = null;
406
407            boolean useClobs = DBSpecifics.getInstance().supportsClob();
408            String tmpStr;
409            if (useClobs) {
410                Clob clob = result.getClob(8);
411                tmpStr = clob.getSubString(1L, (int)clob.length());
412            } else {
413                tmpStr = result.getString(8);
414            }
415            orderXMLdoc = HeritrixTemplate.getTemplateFromString(tmpStr);
416            String seedlist = "";
417            if (useClobs) {
418                Clob clob = result.getClob(9);
419                seedlist = clob.getSubString(1, (int) clob.length());
420            } else {
421                seedlist = result.getString(9);
422            }
423
424            int harvestNum = result.getInt(10);
425            String harvestErrors = result.getString(11);
426            String harvestErrorDetails = result.getString(12);
427            String uploadErrors = result.getString(13);
428            String uploadErrorDetails = result.getString(14);
429            Date startdate = DBUtils.getDateMaybeNull(result, 15);
430            Date stopdate = DBUtils.getDateMaybeNull(result, 16);
431            Date submittedDate = DBUtils.getDateMaybeNull(result, 17);
432            Date creationDate = DBUtils.getDateMaybeNull(result, 18);
433            Long edition = result.getLong(19);
434            Long resubmittedAsJob = DBUtils.getLongMaybeNull(result, 20);
435            Long continuationOfJob = DBUtils.getLongMaybeNull(result, 21);
436            String harvestnamePrefix = result.getString(22);
437            boolean snapshot = result.getBoolean(23);
438            statement.close();
439            // IDs should match up in a natural join
440            // The following if-block is an attempt to fix Bug 1856, an
441            // unexplained derby deadlock, by making this statement a dirty
442            // read.
443            String domainStatement = "SELECT domains.name, configurations.name "
444                    + "FROM domains, configurations, job_configs " + "WHERE job_configs.job_id = ?"
445                    + "  AND job_configs.config_id = configurations.config_id"
446                    + "  AND domains.domain_id = configurations.domain_id";
447            if (Settings.get(CommonSettings.DB_SPECIFICS_CLASS).contains(CommonSettings.DB_IS_DERBY_IF_CONTAINS)) {
448                statement = connection.prepareStatement(domainStatement + " WITH UR");
449            } else {
450                statement = connection.prepareStatement(domainStatement);
451            }
452            statement.setLong(1, jobID);
453            result = statement.executeQuery();
454            Map<String, String> configurationMap = new HashMap<String, String>();
455            while (result.next()) {
456                String domainName = result.getString(1);
457                String configName = result.getString(2);
458                configurationMap.put(domainName, configName);
459            }
460            final Job job = new Job(harvestID, configurationMap, channel, snapshot, forceMaxCount, forceMaxBytes,
461                    forceMaxRunningTime, status, orderxml, orderXMLdoc, seedlist, harvestNum, continuationOfJob);
462            job.appendHarvestErrors(harvestErrors);
463            job.appendHarvestErrorDetails(harvestErrorDetails);
464            job.appendUploadErrors(uploadErrors);
465            job.appendUploadErrorDetails(uploadErrorDetails);
466            if (startdate != null) {
467                job.setActualStart(startdate);
468            }
469            if (stopdate != null) {
470                job.setActualStop(stopdate);
471            }
472
473            if (submittedDate != null) {
474                job.setSubmittedDate(submittedDate);
475            }
476
477            if (creationDate != null) {
478                job.setCreationDate(creationDate);
479            }
480
481            job.configsChanged = false;
482            job.setJobID(jobID);
483            job.setEdition(edition);
484
485            if (resubmittedAsJob != null) {
486                job.setResubmittedAsJob(resubmittedAsJob);
487            }
488            if (harvestnamePrefix == null) {
489                job.setDefaultHarvestNamePrefix();
490            } else {
491                job.setHarvestFilenamePrefix(harvestnamePrefix);
492            }
493            return job;
494        } catch (SQLException e) {
495            String message = "SQL error reading job " + jobID + " in database" + "\n"
496                    + ExceptionUtils.getSQLExceptionCause(e);
497            log.warn(message, e);
498            throw new IOFailure(message, e);
499        } finally  {
500                try {
501                                statement.close();
502                } catch (SQLException e) {
503                        log.warn("Exception thrown when trying to close statement", e);
504                        }
505        }
506    }
507
508    
509
510    /**
511     * Return a list of all jobs with the given status, ordered by id.
512     *
513     * @param status A given status.
514     * @return A list of all job with given status
515     */
516    @Override
517    public synchronized Iterator<Job> getAll(JobStatus status) {
518        ArgumentNotValid.checkNotNull(status, "JobStatus status");
519
520        Connection c = HarvestDBConnection.get();
521        try {
522            List<Long> idList = DBUtils.selectLongList(c, "SELECT job_id FROM jobs WHERE status = ? "
523                    + "ORDER BY job_id", status.ordinal());
524            List<Job> orderedJobs = new LinkedList<Job>();
525            for (Long jobId : idList) {
526                orderedJobs.add(read(c, jobId));
527            }
528            return orderedJobs.iterator();
529        } finally {
530            HarvestDBConnection.release(c);
531        }
532    }
533
534    /**
535     * Return a list of all job_id's representing jobs with the given status.
536     *
537     * @param status A given status.
538     * @return A list of all job_id's representing jobs with given status
539     * @throws ArgumentNotValid If the given status is not one of the five valid statuses specified in Job.
540     */
541    @Override
542    public Iterator<Long> getAllJobIds(JobStatus status) {
543        ArgumentNotValid.checkNotNull(status, "JobStatus status");
544
545        Connection c = HarvestDBConnection.get();
546        try {
547            List<Long> idList = DBUtils.selectLongList(c, "SELECT job_id FROM jobs WHERE status = ? ORDER BY job_id",
548                    status.ordinal());
549            return idList.iterator();
550        } finally {
551            HarvestDBConnection.release(c);
552        }
553    }
554
555    @Override
556    public Iterator<Long> getAllJobIds(JobStatus status, HarvestChannel channel) {
557        ArgumentNotValid.checkNotNull(status, "JobStatus status");
558        ArgumentNotValid.checkNotNull(channel, "Channel");
559
560        Connection c = HarvestDBConnection.get();
561        try {
562            List<Long> idList = DBUtils.selectLongList(c, "SELECT job_id FROM jobs WHERE status = ? AND channel = ? "
563                    + "ORDER BY job_id", status.ordinal(), channel.getName());
564            return idList.iterator();
565        } finally {
566            HarvestDBConnection.release(c);
567        }
568    }
569
570    /**
571     * Return a list of all jobs.
572     *
573     * @return A list of all jobs
574     */
575    @Override
576    public synchronized Iterator<Job> getAll() {
577        Connection c = HarvestDBConnection.get();
578        try {
579            List<Long> idList = DBUtils.selectLongList(c, "SELECT job_id FROM jobs ORDER BY job_id");
580            List<Job> orderedJobs = new LinkedList<Job>();
581            for (Long jobId : idList) {
582                orderedJobs.add(read(c, jobId));
583            }
584            return orderedJobs.iterator();
585        } finally {
586            HarvestDBConnection.release(c);
587        }
588    }
589
590    /**
591     * Return a list of all job_ids .
592     *
593     * @return A list of all job_ids
594     */
595    public Iterator<Long> getAllJobIds() {
596        Connection c = HarvestDBConnection.get();
597        try {
598            List<Long> idList = DBUtils.selectLongList(c, "SELECT job_id FROM jobs ORDER BY job_id");
599            return idList.iterator();
600        } finally {
601            HarvestDBConnection.release(c);
602        }
603    }
604
605    /**
606     * Get a list of small and immediately usable status information for given status and in given order. Is used by
607     * getStatusInfo functions in order to share code (and SQL)
608     *  TODO should also include given harvest run
609     *
610     * @param connection an open connection to the harvestDatabase
611     * @param jobStatusCode code for jobstatus, -1 if all
612     * @param asc true if it is to be sorted in ascending order, false if it is to be sorted in descending order
613     * @return List of JobStatusInfo objects for all jobs.
614     * @throws ArgumentNotValid for invalid jobStatusCode
615     * @throws IOFailure on trouble getting data from database
616     */
617    private List<JobStatusInfo> getStatusInfo(Connection connection, int jobStatusCode, boolean asc) {
618        // Validate jobStatusCode
619        // Throws ArgumentNotValid if it is an invalid job status
620        if (jobStatusCode != JobStatus.ALL_STATUS_CODE) {
621            JobStatus.fromOrdinal(jobStatusCode);
622        }
623
624        StringBuffer sqlBuffer = new StringBuffer("SELECT jobs.job_id, status, jobs.harvest_id, "
625                + "harvestdefinitions.name, harvest_num, harvest_errors,"
626                + " upload_errors, orderxml, num_configs, submitteddate, creationdate, startdate,"
627                + " enddate, resubmitted_as_job" + " FROM jobs, harvestdefinitions "
628                + " WHERE harvestdefinitions.harvest_id = jobs.harvest_id ");
629
630        if (jobStatusCode != JobStatus.ALL_STATUS_CODE) {
631            sqlBuffer.append(" AND status = ").append(jobStatusCode);
632        }
633        sqlBuffer.append(" ORDER BY jobs.job_id");
634        if (!asc) { // Assume default is ASCENDING
635            sqlBuffer.append(" " + HarvestStatusQuery.SORT_ORDER.DESC.name());
636        }
637
638        PreparedStatement statement = null;
639        try {
640            statement = connection.prepareStatement(sqlBuffer.toString());
641            ResultSet res = statement.executeQuery();
642            return makeJobStatusInfoListFromResultset(res);
643        } catch (SQLException e) {
644            String message = "SQL error asking for job status list in database" + "\n"
645                    + ExceptionUtils.getSQLExceptionCause(e);
646            log.warn(message, e);
647            throw new IOFailure(message, e);
648        }
649    }
650
651    /**
652     * Get a list of small and immediately usable status information for given job status.
653     *
654     * @param status The status asked for.
655     * @return List of JobStatusInfo objects for all jobs with given job status
656     * @throws ArgumentNotValid for invalid jobStatus
657     * @throws IOFailure on trouble getting data from database
658     */
659    @Override
660    public List<JobStatusInfo> getStatusInfo(JobStatus status) {
661        ArgumentNotValid.checkNotNull(status, "status");
662        Connection c = HarvestDBConnection.get();
663        try {
664            return getStatusInfo(c, status.ordinal(), true);
665        } finally {
666            HarvestDBConnection.release(c);
667        }
668    }
669
670    /**
671     * Get a list of small and immediately usable status information for given job status and in given job id order.
672     *
673     * @param query the user query
674     * @throws IOFailure on trouble getting data from database
675     */
676    @Override
677    public HarvestStatus getStatusInfo(HarvestStatusQuery query) {
678        log.debug("Constructing Harveststatus based on given query.");
679        PreparedStatement s = null;
680        Connection c = HarvestDBConnection.get();
681
682        try {
683            // Obtain total count without limit
684            // NB this will be a performance bottleneck if the table gets big
685            long totalRowsCount = 0;
686
687            s = buildSqlQuery(query, true).getPopulatedStatement(c);
688            ResultSet res = s.executeQuery();
689            res.next();
690            totalRowsCount = res.getLong(1);
691
692            s = buildSqlQuery(query, false).getPopulatedStatement(c);
693            res = s.executeQuery();
694            List<JobStatusInfo> jobs = makeJobStatusInfoListFromResultset(res);
695
696            log.debug("Harveststatus constructed based on given query.");
697            return new HarvestStatus(totalRowsCount, jobs);
698        } catch (SQLException e) {
699            String message = "SQL error asking for job status list in database" + "\n"
700                    + ExceptionUtils.getSQLExceptionCause(e);
701            log.warn(message, e);
702            throw new IOFailure(message, e);
703        } finally {
704            HarvestDBConnection.release(c);
705        }
706    }
707
708    /**
709     * Calculate all jobIDs to use for duplication reduction.
710     * <p>
711     * More precisely, this method calculates the following: If the job ID corresponds to a partial harvest, all jobIDs
712     * from the previous scheduled harvest are returned, or the empty list if this harvest hasn't been scheduled before.
713     * <p>
714     * If the job ID corresponds to a full harvest, the entire chain of harvests this is based on is returned, and all
715     * jobIDs from the previous chain of full harvests is returned.
716     * <p>
717     * This method is synchronized to avoid DB locking.
718     *
719     * @param jobID The job ID to find duplicate reduction data for.
720     * @return A list of job IDs (possibly empty) of potential previous harvests of this job, to use for duplicate
721     * reduction.
722     * @throws UnknownID if job ID is unknown
723     * @throws IOFailure on trouble querying database
724     */
725    public synchronized List<Long> getJobIDsForDuplicateReduction(long jobID) throws UnknownID {
726
727        Connection connection = HarvestDBConnection.get();
728        List<Long> jobs;
729        // Select the previous harvest from the same harvestdefinition
730        try {
731            if (!exists(connection, jobID)) {
732                throw new UnknownID("Job ID '" + jobID + "' does not exist in database");
733            }
734
735            jobs = DBUtils.selectLongList(connection, "SELECT jobs.job_id FROM jobs, jobs AS original_jobs"
736                    + " WHERE original_jobs.job_id=?" + " AND jobs.harvest_id=original_jobs.harvest_id"
737                    + " AND jobs.harvest_num=original_jobs.harvest_num-1", jobID);
738            List<Long> harvestDefinitions = getPreviousFullHarvests(connection, jobID);
739            if (!harvestDefinitions.isEmpty()) {
740                // Select all jobs from a given list of harvest definitions
741                jobs.addAll(DBUtils.selectLongList(connection, "SELECT jobs.job_id FROM jobs"
742                        + " WHERE jobs.harvest_id IN (" + StringUtils.conjoin(",", harvestDefinitions) + ")"));
743            }
744            return jobs;
745        } finally {
746            HarvestDBConnection.release(connection);
747        }
748    }
749
750    /**
751     * Find the harvest definition ids from this chain of snapshot harvests and the previous chain of snapshot harvests.
752     *
753     * @param connection an open connection to the harvestDatabase
754     * @param jobID The ID of the job
755     * @return A (possibly empty) list of harvest definition ids
756     */
757    private List<Long> getPreviousFullHarvests(Connection connection, long jobID) {
758        List<Long> results = new ArrayList<Long>();
759        // Find the jobs' fullharvest id
760        Long thisHarvest = DBUtils.selectFirstLongValueIfAny(connection,
761                "SELECT jobs.harvest_id FROM jobs, fullharvests WHERE jobs.harvest_id=fullharvests.harvest_id"
762                        + " AND jobs.job_id=?", jobID);
763
764        if (thisHarvest == null) {
765            // Not a full harvest
766            return results;
767        }
768
769        // Follow the chain of orginating IDs back
770        for (Long originatingHarvest = thisHarvest; originatingHarvest != null; originatingHarvest = DBUtils
771                .selectFirstLongValueIfAny(connection, "SELECT previoushd FROM fullharvests"
772                        + " WHERE fullharvests.harvest_id=?", originatingHarvest)) {
773            if (!originatingHarvest.equals(thisHarvest)) {
774                results.add(originatingHarvest);
775            }
776        }
777
778        // Find the first harvest in the chain
779        Long firstHarvest = thisHarvest;
780        if (!results.isEmpty()) {
781            firstHarvest = results.get(results.size() - 1);
782        }
783
784        // Find the last harvest in the chain before
785        Long olderHarvest = DBUtils.selectFirstLongValueIfAny(connection, "SELECT fullharvests.harvest_id"
786                + " FROM fullharvests, harvestdefinitions," + "  harvestdefinitions AS currenthd"
787                + " WHERE currenthd.harvest_id=?" + " AND fullharvests.harvest_id" + "=harvestdefinitions.harvest_id"
788                + " AND harvestdefinitions.submitted<currenthd.submitted" + " ORDER BY harvestdefinitions.submitted "
789                + HarvestStatusQuery.SORT_ORDER.DESC.name(), firstHarvest);
790        // Follow the chain of originating IDs back
791        // FIXME Rewrite this loop!
792        for (Long originatingHarvest = olderHarvest; originatingHarvest != null; originatingHarvest = DBUtils
793                .selectFirstLongValueIfAny(connection,
794                        "SELECT previoushd FROM fullharvests WHERE fullharvests.harvest_id=?", originatingHarvest)) {
795            results.add(originatingHarvest);
796        }
797        return results;
798    }
799
800    /**
801     * Returns the number of existing jobs.
802     *
803     * @return Number of jobs in 'jobs' table
804     */
805    @Override
806    public int getCountJobs() {
807        Connection c = HarvestDBConnection.get();
808        try {
809            return DBUtils.selectIntValue(c, "SELECT COUNT(*) FROM jobs");
810        } finally {
811            HarvestDBConnection.release(c);
812        }
813    }
814
815    @Override
816    public synchronized long rescheduleJob(long oldJobID) {
817        Connection connection = HarvestDBConnection.get();
818        long newJobID = generateNextID(connection);
819        PreparedStatement statement = null;
820        try {
821            statement = connection.prepareStatement("SELECT status FROM jobs WHERE job_id = ?");
822            statement.setLong(1, oldJobID);
823            ResultSet res = statement.executeQuery();
824            if (!res.next()) {
825                throw new UnknownID("No job with ID " + oldJobID + " to resubmit");
826            }
827            final JobStatus currentJobStatus = JobStatus.fromOrdinal(res.getInt(1));
828            if (currentJobStatus != JobStatus.SUBMITTED && currentJobStatus != JobStatus.FAILED) {
829                throw new IllegalState("Job " + oldJobID + " is not ready to be copied.");
830            }
831
832            // Now do the actual copying.
833            // Note that startdate, and enddate is not copied.
834            // They must be null in JobStatus NEW.
835            statement.close();
836            connection.setAutoCommit(false);
837
838            statement = connection.prepareStatement("INSERT INTO jobs "
839                    + " (job_id, harvest_id, channel, snapshot, status," + "  forcemaxcount, forcemaxbytes, orderxml,"
840                    + "  orderxmldoc, seedlist, harvest_num," + "  num_configs, edition, continuationof) "
841                    + " SELECT ?, harvest_id, channel, snapshot, ?," + "  forcemaxcount, forcemaxbytes, orderxml,"
842                    + "  orderxmldoc, seedlist, harvest_num," + " num_configs, ?, ?" + " FROM jobs WHERE job_id = ?");
843            statement.setLong(1, newJobID);
844            statement.setLong(2, JobStatus.NEW.ordinal());
845            long initialEdition = 1;
846            statement.setLong(3, initialEdition);
847            Long continuationOf = null;
848            // In case we want to try to continue using the Heritrix recover log
849            if (currentJobStatus == JobStatus.FAILED) {
850                continuationOf = oldJobID;
851            }
852            DBUtils.setLongMaybeNull(statement, 4, continuationOf);
853
854            statement.setLong(5, oldJobID);
855
856            statement.executeUpdate();
857            statement.close();
858            statement = connection.prepareStatement("INSERT INTO job_configs "
859                    + "( job_id, config_id ) SELECT ?, config_id FROM job_configs WHERE job_id = ?");
860            statement.setLong(1, newJobID);
861            statement.setLong(2, oldJobID);
862            statement.executeUpdate();
863            statement.close();
864            statement = connection.prepareStatement("UPDATE jobs SET status = ?, resubmitted_as_job = ? "
865                    + " WHERE job_id = ?");
866            statement.setInt(1, JobStatus.RESUBMITTED.ordinal());
867            statement.setLong(2, newJobID);
868            statement.setLong(3, oldJobID);
869            statement.executeUpdate();
870            connection.commit();
871        } catch (SQLException e) {
872            String message = "SQL error rescheduling job #" + oldJobID + " in database" + "\n"
873                    + ExceptionUtils.getSQLExceptionCause(e);
874            log.warn(message, e);
875            throw new IOFailure(message, e);
876        } finally {
877            DBUtils.closeStatementIfOpen(statement);
878            DBUtils.rollbackIfNeeded(connection, "resubmit job", oldJobID);
879            HarvestDBConnection.release(connection);
880        }
881        log.info("Job #{} successfully as job #{}", oldJobID, newJobID);
882        return newJobID;
883    }
884
885    /**
886     * Helper-method that constructs a list of JobStatusInfo objects from the given resultset.
887     *
888     * @param res a given resultset
889     * @return a list of JobStatusInfo objects
890     * @throws SQLException If any problem with accessing the data in the ResultSet
891     */
892    private List<JobStatusInfo> makeJobStatusInfoListFromResultset(ResultSet res) throws SQLException {
893        List<JobStatusInfo> joblist = new ArrayList<JobStatusInfo>();
894        while (res.next()) {
895            final long jobId = res.getLong(1);
896            joblist.add(new JobStatusInfo(jobId, JobStatus.fromOrdinal(res.getInt(2)), res.getLong(3),
897                    res.getString(4), res.getInt(5), res.getString(6), res.getString(7), res.getString(8), res
898                            .getInt(9), DBUtils.getDateMaybeNull(res, 10), DBUtils.getDateMaybeNull(res, 11), DBUtils
899                            .getDateMaybeNull(res, 12), DBUtils.getDateMaybeNull(res, 13), DBUtils.getLongMaybeNull(
900                            res, 14)));
901        }
902        return joblist;
903    }
904
905    /**
906     * Internal utility class to build a SQL query using a prepared statement.
907     */
908    private class HarvestStatusQueryBuilder {
909        /** The sql string. */
910        private String sqlString;
911        // from java.sql.Types
912        /** list of parameter classes. */
913        private LinkedList<Class<?>> paramClasses = new LinkedList<Class<?>>();
914        /** list of parameter values. */
915        private LinkedList<Object> paramValues = new LinkedList<Object>();
916
917        /**
918         * Constructor.
919         */
920        HarvestStatusQueryBuilder() {
921            super();
922        }
923
924        /**
925         * @param sqlString the sqlString to set
926         */
927        void setSqlString(String sqlString) {
928            this.sqlString = sqlString;
929        }
930
931        /**
932         * Add the given class and given value to the list of paramClasses and paramValues respectively.
933         *
934         * @param clazz a given class.
935         * @param value a given value
936         */
937        void addParameter(Class<?> clazz, Object value) {
938            paramClasses.addLast(clazz);
939            paramValues.addLast(value);
940        }
941
942        /**
943         * Prepare a statement for the database that uses the sqlString, and the paramClasses, and paramValues. Only
944         * Integer, Long, String, and Date values accepted.
945         *
946         * @param c an Open connection to the harvestDatabase
947         * @return the prepared statement
948         * @throws SQLException If unable to prepare the statement
949         * @throws UnknownID If one of the parameter classes is unexpected
950         */
951        PreparedStatement getPopulatedStatement(Connection c) throws SQLException {
952            PreparedStatement stm = c.prepareStatement(sqlString);
953
954            Iterator<Class<?>> pClasses = paramClasses.iterator();
955            Iterator<Object> pValues = paramValues.iterator();
956            int pIndex = 0;
957            while (pClasses.hasNext()) {
958                pIndex++;
959                Class<?> pClass = pClasses.next();
960                Object pVal = pValues.next();
961
962                if (Integer.class.equals(pClass)) {
963                    stm.setInt(pIndex, (Integer) pVal);
964                } else if (Long.class.equals(pClass)) {
965                    stm.setLong(pIndex, (Long) pVal);
966                } else if (String.class.equals(pClass)) {
967                    stm.setString(pIndex, (String) pVal);
968                } else if (java.sql.Date.class.equals(pClass)) {
969                    stm.setDate(pIndex, (java.sql.Date) pVal);
970                } else {
971                    throw new UnknownID("Unexpected parameter class " + pClass);
972                }
973            }
974            return stm;
975        }
976
977    }
978
979    /**
980     * Builds a query to fetch jobs according to selection criteria.
981     *
982     * @param query the selection criteria.
983     * @param count build a count query instead of selecting columns.
984     * @return the proper SQL query.
985     */
986    private HarvestStatusQueryBuilder buildSqlQuery(HarvestStatusQuery query, boolean count) {
987        HarvestStatusQueryBuilder sq = new HarvestStatusQueryBuilder();
988        StringBuffer sql = new StringBuffer("SELECT");
989        if (count) {
990            sql.append(" count(*)");
991        } else {
992            sql.append(" jobs.job_id, status, jobs.harvest_id,");
993            sql.append(" harvestdefinitions.name, harvest_num,");
994            sql.append(" harvest_errors, upload_errors, orderxml,");
995            sql.append(" num_configs, submitteddate, creationdate, startdate, enddate,");
996            sql.append(" resubmitted_as_job");
997        }
998        sql.append(" FROM jobs, harvestdefinitions ");
999        sql.append(" WHERE harvestdefinitions.harvest_id = jobs.harvest_id ");
1000
1001        JobStatus[] jobStatuses = query.getSelectedJobStatuses();
1002        if (jobStatuses.length > 0) {
1003            if (jobStatuses.length == 1) {
1004                int statusOrdinal = jobStatuses[0].ordinal();
1005                sql.append(" AND status = ?");
1006                sq.addParameter(Integer.class, statusOrdinal);
1007            } else {
1008                sql.append("AND (status = ");
1009                sql.append(jobStatuses[0].ordinal());
1010                for (int i = 1; i < jobStatuses.length; i++) {
1011                    sql.append(" OR status = ?");
1012                    sq.addParameter(Integer.class, jobStatuses[i].ordinal());
1013                }
1014                sql.append(")");
1015            }
1016        }
1017
1018        String harvestName = query.getHarvestName();
1019        boolean caseSensitiveHarvestName = query.getCaseSensitiveHarvestName();
1020        if (!harvestName.isEmpty()) {
1021            if (caseSensitiveHarvestName) {
1022                if (harvestName.indexOf(HarvestStatusQuery.HARVEST_NAME_WILDCARD) == -1) {
1023                    // No wildcard, exact match
1024                    sql.append(" AND harvestdefinitions.name = ?");
1025                    sq.addParameter(String.class, harvestName);
1026                } else {
1027                    String harvestNamePattern = harvestName.replaceAll("\\*", "%");
1028                    sql.append(" AND harvestdefinitions.name LIKE ?");
1029                    sq.addParameter(String.class, harvestNamePattern);
1030                }
1031            } else {
1032                harvestName = harvestName.toUpperCase();
1033                if (harvestName.indexOf(HarvestStatusQuery.HARVEST_NAME_WILDCARD) == -1) {
1034                    // No wildcard, exact match
1035                    sql.append(" AND UPPER(harvestdefinitions.name) = ?");
1036                    sq.addParameter(String.class, harvestName);
1037                } else {
1038                    String harvestNamePattern = harvestName.replaceAll("\\*", "%");
1039                    sql.append(" AND UPPER(harvestdefinitions.name)  LIKE ?");
1040                    sq.addParameter(String.class, harvestNamePattern);
1041                }
1042            }
1043        }
1044
1045        Long harvestRun = query.getHarvestRunNumber();
1046        if (harvestRun != null) {
1047            sql.append(" AND jobs.harvest_num = ?");
1048            sq.addParameter(Long.class, harvestRun);
1049        }
1050
1051        Long harvestId = query.getHarvestId();
1052        if (harvestId != null) {
1053            sql.append(" AND harvestdefinitions.harvest_id = ?");
1054            sq.addParameter(Long.class, harvestId);
1055        }
1056
1057        long startDate = query.getStartDate();
1058        if (startDate != HarvestStatusQuery.DATE_NONE) {
1059            sql.append(" AND startdate >= ?");
1060            sq.addParameter(java.sql.Date.class, new java.sql.Date(startDate));
1061        }
1062
1063        long endDate = query.getEndDate();
1064        if (endDate != HarvestStatusQuery.DATE_NONE) {
1065            sql.append(" AND enddate < ?");
1066            // end date must be set +1 day at midnight
1067            Calendar cal = Calendar.getInstance();
1068            cal.setTimeInMillis(endDate);
1069            cal.roll(Calendar.DAY_OF_YEAR, 1);
1070            sq.addParameter(java.sql.Date.class, new java.sql.Date(cal.getTimeInMillis()));
1071        }
1072
1073        if (!count) {
1074            sql.append(" ORDER BY jobs.job_id");
1075            if (!query.isSortAscending()) {
1076                sql.append(" " + SORT_ORDER.DESC.name());
1077            } else {
1078                sql.append(" " + SORT_ORDER.ASC.name());
1079            }
1080
1081            long pagesize = query.getPageSize();
1082            if (pagesize != HarvestStatusQuery.PAGE_SIZE_NONE) {
1083                sql.append(" "
1084                        + DBSpecifics.getInstance().getOrderByLimitAndOffsetSubClause(pagesize,
1085                                (query.getStartPageIndex() - 1) * pagesize));
1086            }
1087        }
1088
1089        sq.setSqlString(sql.toString());
1090        return sq;
1091    }
1092
1093    /**
1094     * Get Jobstatus for the job with the given id.
1095     *
1096     * @param jobID A given Jobid
1097     * @return the Jobstatus for the job with the given id.
1098     * @throws UnknownID if no job exists with id jobID
1099     */
1100    public JobStatus getJobStatus(Long jobID) {
1101        ArgumentNotValid.checkNotNull(jobID, "Long jobID");
1102
1103        Connection c = HarvestDBConnection.get();
1104        try {
1105            Integer statusAsInteger = DBUtils.selectIntValue(c, "SELECT status FROM jobs WHERE job_id = ?", jobID);
1106            if (statusAsInteger == null) {
1107                throw new UnknownID("No known job with id=" + jobID);
1108            }
1109            return JobStatus.fromOrdinal(statusAsInteger);
1110        } finally {
1111            HarvestDBConnection.release(c);
1112        }
1113    }
1114
1115    /**
1116     * Get a list of AliasInfo objects for all the domains included in the job.
1117     *
1118     * @return a list of AliasInfo objects for all the domains included in the job.
1119     */
1120    public List<AliasInfo> getJobAliasInfo(Job job) {
1121        List<AliasInfo> aliases = new ArrayList<AliasInfo>();
1122        DomainDAO dao = DomainDAO.getInstance();
1123        for (String domain : job.getDomainConfigurationMap().keySet()) {
1124            aliases.addAll(dao.getAliases(domain));
1125        }
1126        return aliases;
1127    }
1128}