View Javadoc

1   /*
2    * #%L
3    * Bitrepository Integrity Service
4    * %%
5    * Copyright (C) 2010 - 2012 The State and University Library, The Royal Library and The State Archives, Denmark
6    * %%
7    * This program is free software: you can redistribute it and/or modify
8    * it under the terms of the GNU Lesser General Public License as 
9    * published by the Free Software Foundation, either version 2.1 of the 
10   * License, or (at your option) any later version.
11   * 
12   * This program is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15   * GNU General Lesser Public License for more details.
16   * 
17   * You should have received a copy of the GNU General Lesser Public 
18   * License along with this program.  If not, see
19   * <http://www.gnu.org/licenses/lgpl-2.1.html>.
20   * #L%
21   */
22  
23  package org.bitrepository.service.workflow;
24  
25  import org.bitrepository.common.utils.SettingsUtils;
26  import org.bitrepository.service.scheduler.JobEventListener;
27  import org.bitrepository.service.scheduler.JobScheduler;
28  import org.bitrepository.settings.referencesettings.Schedule;
29  import org.bitrepository.settings.referencesettings.WorkflowConfiguration;
30  import org.bitrepository.settings.referencesettings.WorkflowSettings;
31  import org.slf4j.Logger;
32  import org.slf4j.LoggerFactory;
33  
34  import java.util.*;
35  
36  public abstract class WorkflowManager {
37      private final Logger log = LoggerFactory.getLogger(this.getClass());
38      private final JobScheduler scheduler;
39      private final WorkflowContext context;
40      private final Map<JobID, Workflow> workflows = new HashMap<JobID, Workflow>();
41      private final Map<String, List<JobID>> collectionWorkflows = new HashMap<String, List<JobID>>();
42      private final Map<JobID, List<WorkflowStatistic>> statistics = new HashMap<JobID, List<WorkflowStatistic>>();
43      public static final int MAX_NUMBER_OF_STATISTISCS_FOR_A_WORKFLOW = 1;
44  
45      public WorkflowManager(
46              WorkflowContext context,
47              WorkflowSettings configuration,
48              JobScheduler scheduler) {
49          this.context = context;
50          this.scheduler = scheduler;
51          loadWorkFlows(configuration);
52          scheduler.addJobEventListener(new WorkflowEventListener());
53      }
54  
55      public String startWorkflow(JobID jobID) {
56          return scheduler.startJob(getWorkflow(jobID));
57      }
58  
59      public List<JobID> getWorkflows(String collectionID) {
60          return collectionWorkflows.get(collectionID);
61      }
62  
63      public Workflow getWorkflow(JobID workflowID) {
64          if (workflows.containsKey(workflowID)) {
65              return workflows.get(workflowID);
66          } else {
67              throw new IllegalArgumentException("Unknown workflow " + workflowID);
68          }
69      }
70  
71      public WorkflowStatistic getCurrentStatistics(JobID jobID) {
72          return getWorkflow(jobID).getWorkflowStatistics();
73      }
74  
75      public WorkflowStatistic getLastCompleteStatistics(JobID workflowID) {
76          getWorkflow(workflowID);
77          if (statistics.containsKey(workflowID)) {
78              List<WorkflowStatistic> stats = statistics.get(workflowID);
79              return stats.get(stats.size()-1);
80          } else return null;
81      }
82  
83      public Date getNextScheduledRun(JobID jobID) {
84          Date nextRun = scheduler.getNextRun(jobID);
85          if (nextRun == null) {
86              if (workflows.containsKey(jobID)) { // Unscheduled job
87                  return null;
88              } else {
89                  throw new IllegalArgumentException("Unknown workflow " + jobID);
90              }
91          }
92          return nextRun;
93      }
94  
95      public long getRunInterval(JobID jobID) {
96          long interval = scheduler.getRunInterval(jobID);
97          if (interval == -1) {
98              if (workflows.containsKey(jobID)) { // Unscheduled job
99                  return -1;
100             } else {
101                 throw new IllegalArgumentException("Unknown workflow " + jobID);
102             }
103         }
104         return interval;
105     }
106 
107     private void loadWorkFlows(WorkflowSettings configuration) {
108         for (WorkflowConfiguration workflowConf:configuration.getWorkflow()) {
109             log.info("Scheduling from configuration: " + workflowConf);
110             List<String> unscheduledCollections = new LinkedList<String>(SettingsUtils.getAllCollectionsIDs());
111             try {
112                 if (workflowConf.getSchedules() != null) {
113                     for (Schedule schedule:workflowConf.getSchedules().getSchedule()) {
114                         List<String> collectionsToScheduleWorkflowFor;
115                         if (schedule.isSetCollections()) {
116                             collectionsToScheduleWorkflowFor = schedule.getCollections().getCollectionID();
117                         } else {
118                             collectionsToScheduleWorkflowFor = SettingsUtils.getAllCollectionsIDs();
119                         }
120                         for (String collectionID:collectionsToScheduleWorkflowFor) {
121                             Workflow workflow =
122                                     (Workflow)lookupClass(workflowConf.getWorkflowClass()).newInstance();
123                             workflow.initialise(context, collectionID);
124                             scheduler.schedule(workflow, schedule.getWorkflowInterval());
125                             addWorkflow(collectionID, workflow);
126                             unscheduledCollections.remove(collectionID);
127                         }
128                     }
129                 }
130                 // Create a instance of all workflows not explicitly scheduled.
131                 for (String collectionID:unscheduledCollections) {
132                     Workflow workflow =
133                             (Workflow)Class.forName(workflowConf.getWorkflowClass()).newInstance();
134                     workflow.initialise(context, collectionID);
135                     addWorkflow(collectionID, workflow);
136                 }
137             } catch (Exception e) {
138                 log.error("Unable to load workflow " + workflowConf.getWorkflowClass(), e);
139             }
140         }
141     }
142 
143     private Class lookupClass(String settingsDefinedClass) throws ClassNotFoundException {
144         String fullClassName;
145         if (settingsDefinedClass.indexOf('.') == -1) {
146             fullClassName = getDefaultWorkflowPackage() + "." + settingsDefinedClass;
147         } else {
148             fullClassName = settingsDefinedClass;
149         }
150         return Class.forName(fullClassName);
151     }
152 
153     private void addWorkflow(String collectionID, Workflow workflow) {
154         workflows.put(workflow.getJobID(), workflow);
155         if (!collectionWorkflows.containsKey(collectionID)) {
156             collectionWorkflows.put(collectionID, new LinkedList<JobID>());
157         }
158         collectionWorkflows.get(collectionID).add(workflow.getJobID());
159     }
160 
161     /**
162      * Allows subclasses to define a workflow package where workflow classes defined with a simplename in the settings
163      * will be prefixed with the namespace defined here.
164      */
165     protected abstract String getDefaultWorkflowPackage();
166 
167     /**
168      * Stores workflow statistics when a workflow has finished.
169      */
170     public class WorkflowEventListener implements JobEventListener {
171         @Override
172         public void jobStarted(SchedulableJob job) {}
173 
174         @Override
175         public void jobFailed(SchedulableJob job) {}
176 
177         /**
178          * Adds the workflow statistics to the statistics list for this workflow. Will also remove older statistisics
179          * if the number of statistics exceeds <code>MAX_NUMBER_OF_STATISTISCS_FOR_A_WORKFLOW</code>.
180          * @param job
181          */
182         @Override
183         public void jobFinished(SchedulableJob job) {
184             if (workflows.containsKey(job.getJobID())) { // One of mine
185                 if (!statistics.containsKey(job.getJobID())) {
186                     statistics.put(job.getJobID(), new LinkedList<WorkflowStatistic>());
187                 }
188 
189                 List<WorkflowStatistic> workflowStatistics = statistics.get(job.getJobID());
190                 workflowStatistics.add((((Workflow)job).getWorkflowStatistics()));
191                 if (workflowStatistics.size() > MAX_NUMBER_OF_STATISTISCS_FOR_A_WORKFLOW) {
192                     workflowStatistics.remove(0);
193                 }
194             }
195         }
196     }
197 }