/*
 * #%L
 * Netarchivesuite - common
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Lesser Public License for more details.
 *
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */

package dk.netarkivet.common.lifecycle;

import java.util.Arrays;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.utils.TimeUtils;

/**
 * This class wraps a {@link ScheduledThreadPoolExecutor}, allowing to periodically run one or several {@link Runnable}
 * tasks (fixed rate execution). It actively monitors task execution in a separate "checker" thread, allowing to catch
 * and process any {@link RuntimeException} that would be thrown during task execution, which cannot be done by simply
 * overriding {@link ScheduledThreadPoolExecutor#afterExecute}.
 * <p>
 * The tasks are scheduled and the checker thread is started by the constructor; {@link #shutdown()} stops both.
 * <p>
 * TODO Currently {@link RuntimeException} are only caught and logged, but the executor stops scheduling future
 * executions. We should implement a configurable restart mechanism, possibly with exception filtering.
 */
public final class PeriodicTaskExecutor {

    /** The class logger. */
    private static final Logger log = LoggerFactory.getLogger(PeriodicTaskExecutor.class);

    /**
     * Represents a periodic task.
     */
    public static class PeriodicTask {

        /** A string identifying the task. It should be unique for this executor, though there is no such check made. */
        private final String taskId;

        /** The actual task implementation. */
        private final Runnable task;

        /** Delay in seconds between starting the executor and the initial task execution. */
        private final long secondsBeforeFirstExec;

        /** Delay in seconds between two successive task executions. */
        private final long secondsBetweenExec;

        /** The wrapper object for future task executions. */
        private ScheduledFuture<?> future = null;

        /**
         * Set once the checker thread has logged the completion (failure or cancellation) of this task, so that the
         * same outcome is not logged again on every check. Only accessed by the checker thread.
         */
        private boolean completionReported = false;

        /**
         * Builds a new task.
         *
         * @param taskId the task id string (should be unique)
         * @param task the actual {@link Runnable} object.
         * @param secondsBeforeFirstExec the delay in seconds between starting the executor and the initial task
         * execution.
         * @param secondsBetweenExec the delay in seconds between two successive task executions.
         */
        public PeriodicTask(String taskId, Runnable task, long secondsBeforeFirstExec, long secondsBetweenExec) {
            super();
            this.taskId = taskId;
            this.task = task;
            this.secondsBeforeFirstExec = secondsBeforeFirstExec;
            this.secondsBetweenExec = secondsBetweenExec;
        }

        /**
         * Set the designated ScheduledFuture object to the one given as argument.
         *
         * @param future a given ScheduledFuture
         */
        void setFuture(ScheduledFuture<?> future) {
            this.future = future;
        }

    }

    /** The actual executor. One thread dedicated to each task. */
    private final ScheduledThreadPoolExecutor exec;

    /**
     * Execution status flag, used to control the termination of the checker thread. Volatile because it is written by
     * the thread calling {@link #shutdown()} and read by the checker thread.
     */
    private volatile boolean alive = false;

    /**
     * Separate thread that actively monitors the task executions and catches any {@link ExecutionException} that may
     * occur during an execution.
     */
    private final Thread checkerThread;

    /** The tasks to run. */
    private final PeriodicTask[] tasks;

    /**
     * Builds an executor for a single task.
     *
     * @param taskId the task id string (should be unique)
     * @param task the actual {@link Runnable} object.
     * @param secondsBeforeFirstExec the delay in seconds between starting the executor and the initial task execution.
     * @param secondsBetweenExec the delay in seconds between two successive task executions.
     */
    public PeriodicTaskExecutor(String taskId, Runnable task, long secondsBeforeFirstExec, long secondsBetweenExec) {
        this(new PeriodicTask(taskId, task, secondsBeforeFirstExec, secondsBetweenExec));
    }

    /**
     * Builds an executor for a set of tasks. Every task is scheduled at fixed rate right away, and the checker thread
     * is started before the constructor returns.
     *
     * @param tasks the task definitions. Must be neither null nor empty (checked through {@link ArgumentNotValid}).
     */
    public PeriodicTaskExecutor(PeriodicTask... tasks) {
        ArgumentNotValid.checkNotNull(tasks, "tasks");
        ArgumentNotValid.checkNotNullOrEmpty(Arrays.asList(tasks), "tasks");

        this.tasks = tasks;
        this.exec = new ScheduledThreadPoolExecutor(tasks.length);

        alive = true;

        // The concatenated task ids are only used to derive the checker thread name.
        StringBuilder id = new StringBuilder();
        for (PeriodicTask t : tasks) {
            ScheduledFuture<?> future = exec.scheduleAtFixedRate(t.task, t.secondsBeforeFirstExec,
                    t.secondsBetweenExec, TimeUnit.SECONDS);
            t.setFuture(future);
            id.append("_").append(t.taskId);
        }

        checkerThread = new Thread(id.toString().hashCode() + "-checker") {
            @Override
            public void run() {
                while (alive) {
                    checkExecution();
                    try {
                        Thread.sleep(TimeUtils.SECOND_IN_MILLIS);
                    } catch (InterruptedException e) {
                        // shutdown() interrupts this thread to cut the sleep short; the loop condition
                        // decides whether to stop, so the interrupt status is deliberately not restored.
                        log.trace("checkerThread interrupted.");
                    }
                }
            }
        };

        checkerThread.start();
    }

    /**
     * Checks tasks execution. Called by the checker thread.
     * <p>
     * Only futures that are already done are inspected. A future returned by
     * {@link ScheduledThreadPoolExecutor#scheduleAtFixedRate} completes only when the task threw an exception or was
     * cancelled, so calling {@code get()} on a still-running one would block the checker and hide the failures of the
     * other tasks. Each completion is logged once.
     */
    private synchronized void checkExecution() {
        for (PeriodicTask t : tasks) {
            if (t.completionReported || !t.future.isDone()) {
                continue;
            }
            try {
                t.future.get();
            } catch (InterruptedException e) {
                // Nothing was reported for this task; it will be inspected again on the next check.
                log.trace("checkExecution was interrupted.");
            } catch (CancellationException e) {
                // Expected once shutdown() has cancelled the future.
                t.completionReported = true;
                log.trace("Task {} was cancelled.", t.taskId);
            } catch (ExecutionException e) {
                t.completionReported = true;
                log.error("Task threw exception: {}", e.getCause(), e);
            }
        }
    }

    /**
     * Shuts down the executor, attempting to stop any ongoing task execution. Stops the checker thread, cancels every
     * scheduled task and shuts down the underlying thread pool.
     */
    public void shutdown() {
        alive = false;
        checkerThread.interrupt();
        for (PeriodicTask t : tasks) {
            t.future.cancel(true);
        }
        // Cancelling the futures does not terminate the pool: its worker threads would stay parked
        // on the work queue indefinitely. Shut the pool down so that they are released.
        exec.shutdownNow();
    }

}