001/*
002 * #%L
003 * Netarchivesuite - common
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023
024package dk.netarkivet.common.lifecycle;
025
026import java.util.Arrays;
027import java.util.concurrent.ExecutionException;
028import java.util.concurrent.ScheduledFuture;
029import java.util.concurrent.ScheduledThreadPoolExecutor;
030import java.util.concurrent.TimeUnit;
031
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import dk.netarkivet.common.exceptions.ArgumentNotValid;
036import dk.netarkivet.common.utils.TimeUtils;
037
038/**
039 * This class wraps a {@link ScheduledThreadPoolExecutor}, allowing to periodically run one or several {@link Runnable}
040 * tasks (fixed rate execution). It actively monitors task execution in a separate "checker" thread, allowing to catch
041 * and process any {@link RuntimeException} that would be thrown during task execution, which cannot be done by simply
042 * overriding {@link ScheduledThreadPoolExecutor#afterExecute}.
043 * <p>
044 * TODO Currently {@link RuntimeException} are only caught and logged, but the executor stops scheduling future
045 * executions. We should implement a configurable restart mechanism, possibly with exception filtering.
046 */
047public final class PeriodicTaskExecutor {
048
049    /** The class logger. */
050    private static final Logger log = LoggerFactory.getLogger(PeriodicTaskExecutor.class);
051
052    /**
053     * Represents a periodic task.
054     */
055    public static class PeriodicTask {
056
057        /** A string identifying the task. It should be unique for this executor, though there is no such check made. */
058        private final String taskId;
059
060        /** The actual task implementation. */
061        private final Runnable task;
062
063        /** Delay in seconds between starting the executor and the initial task execution. */
064        private final long secondsBeforeFirstExec;
065
066        /** Delay in seconds between two successive task executions. */
067        private final long secondsBetweenExec;
068
069        /** The wrapper object for future task executions. */
070        private ScheduledFuture<?> future = null;
071
072        /**
073         * Builds a new task.
074         *
075         * @param taskId the task id string (should be unique)
076         * @param task the actual {@link Runnable} object.
077         * @param secondsBeforeFirstExec the delay in seconds between starting the executor and the initial task
078         * execution.
079         * @param secondsBetweenExec the delay in seconds between two successive task executions.
080         */
081        public PeriodicTask(String taskId, Runnable task, long secondsBeforeFirstExec, long secondsBetweenExec) {
082            super();
083            this.taskId = taskId;
084            this.task = task;
085            this.secondsBeforeFirstExec = secondsBeforeFirstExec;
086            this.secondsBetweenExec = secondsBetweenExec;
087        }
088
089        /**
090         * Set the designated ScheduledFuture object to the one given as argument.
091         *
092         * @param future a given ScheduledFuture
093         */
094        void setFuture(ScheduledFuture<?> future) {
095            this.future = future;
096        }
097
098    }
099
100    /** The actual executor. One thread dedicated to each task. */
101    private final ScheduledThreadPoolExecutor exec;
102
103    /** Execution status flag, used to control the termination of the checker thread. */
104    private boolean alive = false;
105
106    /**
107     * Separate thread that actively monitors the task executions and catches any {@link ExecutionException} that may
108     * occur during an execution.
109     */
110    private Thread checkerThread = null;
111
112    /** The tasks to run. */
113    private final PeriodicTask[] tasks;
114
115    /**
116     * Builds an executor for a single task.
117     *
118     * @param taskId the task id string (should be unique)
119     * @param task the actual {@link Runnable} object.
120     * @param secondsBeforeFirstExec the delay in seconds between starting the executor and the initial task execution.
121     * @param secondsBetweenExec the delay in seconds between two successive task executions.
122     */
123    public PeriodicTaskExecutor(String taskId, Runnable task, long secondsBeforeFirstExec, long secondsBetweenExec) {
124        this(new PeriodicTask(taskId, task, secondsBeforeFirstExec, secondsBetweenExec));
125    }
126
127    /**
128     * Builds an executor for a set of tasks.
129     *
130     * @param tasks the task definitions.
131     */
132    public PeriodicTaskExecutor(PeriodicTask... tasks) {
133        ArgumentNotValid.checkNotNull(tasks, "tasks");
134        ArgumentNotValid.checkNotNullOrEmpty(Arrays.asList(tasks), "tasks");
135
136        this.tasks = tasks;
137        this.exec = new ScheduledThreadPoolExecutor(tasks.length);
138
139        alive = true;
140
141        String id = "";
142        for (PeriodicTask t : tasks) {
143            ScheduledFuture<?> future = exec.scheduleAtFixedRate(t.task, t.secondsBeforeFirstExec,
144                    t.secondsBetweenExec, TimeUnit.SECONDS);
145            t.setFuture(future);
146            id += "_" + t.taskId;
147        }
148
149        checkerThread = new Thread(id.hashCode() + "-checker") {
150            public void run() {
151                while (alive) {
152                    checkExecution();
153                    try {
154                        Thread.sleep(TimeUtils.SECOND_IN_MILLIS);
155                    } catch (InterruptedException e) {
156                        log.trace("checkerThread interrupted.");
157                    }
158                }
159            }
160        };
161
162        checkerThread.start();
163    }
164
165    /**
166     * Checks tasks execution. Called by the checker thread.
167     */
168    private synchronized void checkExecution() {
169        try {
170            for (PeriodicTask t : tasks) {
171                t.future.get();
172            }
173        } catch (InterruptedException e) {
174            log.trace("checkExecution was interrupted.");
175        } catch (ExecutionException e) {
176            log.error("Task threw exception: {}", e.getCause(), e);
177        }
178    }
179
180    /**
181     * Shuts down the executor, attempting to stop any ongoing task execution.
182     */
183    public void shutdown() {
184        alive = false;
185        checkerThread.interrupt();
186        for (PeriodicTask t : tasks) {
187            t.future.cancel(true);
188        }
189    }
190
191}