1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 package org.bitrepository.service.workflow;
24
25 import org.bitrepository.common.utils.SettingsUtils;
26 import org.bitrepository.service.scheduler.JobEventListener;
27 import org.bitrepository.service.scheduler.JobScheduler;
28 import org.bitrepository.settings.referencesettings.Schedule;
29 import org.bitrepository.settings.referencesettings.WorkflowConfiguration;
30 import org.bitrepository.settings.referencesettings.WorkflowSettings;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 import java.util.*;
35
36 public abstract class WorkflowManager {
37 private final Logger log = LoggerFactory.getLogger(this.getClass());
38 private final JobScheduler scheduler;
39 private final WorkflowContext context;
40 private final Map<JobID, Workflow> workflows = new HashMap<JobID, Workflow>();
41 private final Map<String, List<JobID>> collectionWorkflows = new HashMap<String, List<JobID>>();
42 private final Map<JobID, List<WorkflowStatistic>> statistics = new HashMap<JobID, List<WorkflowStatistic>>();
43 public static final int MAX_NUMBER_OF_STATISTISCS_FOR_A_WORKFLOW = 1;
44
45 public WorkflowManager(
46 WorkflowContext context,
47 WorkflowSettings configuration,
48 JobScheduler scheduler) {
49 this.context = context;
50 this.scheduler = scheduler;
51 loadWorkFlows(configuration);
52 scheduler.addJobEventListener(new WorkflowEventListener());
53 }
54
55 public String startWorkflow(JobID jobID) {
56 return scheduler.startJob(getWorkflow(jobID));
57 }
58
59 public List<JobID> getWorkflows(String collectionID) {
60 return collectionWorkflows.get(collectionID);
61 }
62
63 public Workflow getWorkflow(JobID workflowID) {
64 if (workflows.containsKey(workflowID)) {
65 return workflows.get(workflowID);
66 } else {
67 throw new IllegalArgumentException("Unknown workflow " + workflowID);
68 }
69 }
70
71 public WorkflowStatistic getCurrentStatistics(JobID jobID) {
72 return getWorkflow(jobID).getWorkflowStatistics();
73 }
74
75 public WorkflowStatistic getLastCompleteStatistics(JobID workflowID) {
76 getWorkflow(workflowID);
77 if (statistics.containsKey(workflowID)) {
78 List<WorkflowStatistic> stats = statistics.get(workflowID);
79 return stats.get(stats.size()-1);
80 } else return null;
81 }
82
83 public Date getNextScheduledRun(JobID jobID) {
84 Date nextRun = scheduler.getNextRun(jobID);
85 if (nextRun == null) {
86 if (workflows.containsKey(jobID)) {
87 return null;
88 } else {
89 throw new IllegalArgumentException("Unknown workflow " + jobID);
90 }
91 }
92 return nextRun;
93 }
94
95 public long getRunInterval(JobID jobID) {
96 long interval = scheduler.getRunInterval(jobID);
97 if (interval == -1) {
98 if (workflows.containsKey(jobID)) {
99 return -1;
100 } else {
101 throw new IllegalArgumentException("Unknown workflow " + jobID);
102 }
103 }
104 return interval;
105 }
106
107 private void loadWorkFlows(WorkflowSettings configuration) {
108 for (WorkflowConfiguration workflowConf:configuration.getWorkflow()) {
109 log.info("Scheduling from configuration: " + workflowConf);
110 List<String> unscheduledCollections = new LinkedList<String>(SettingsUtils.getAllCollectionsIDs());
111 try {
112 if (workflowConf.getSchedules() != null) {
113 for (Schedule schedule:workflowConf.getSchedules().getSchedule()) {
114 List<String> collectionsToScheduleWorkflowFor;
115 if (schedule.isSetCollections()) {
116 collectionsToScheduleWorkflowFor = schedule.getCollections().getCollectionID();
117 } else {
118 collectionsToScheduleWorkflowFor = SettingsUtils.getAllCollectionsIDs();
119 }
120 for (String collectionID:collectionsToScheduleWorkflowFor) {
121 Workflow workflow =
122 (Workflow)lookupClass(workflowConf.getWorkflowClass()).newInstance();
123 workflow.initialise(context, collectionID);
124 scheduler.schedule(workflow, schedule.getWorkflowInterval());
125 addWorkflow(collectionID, workflow);
126 unscheduledCollections.remove(collectionID);
127 }
128 }
129 }
130
131 for (String collectionID:unscheduledCollections) {
132 Workflow workflow =
133 (Workflow)Class.forName(workflowConf.getWorkflowClass()).newInstance();
134 workflow.initialise(context, collectionID);
135 addWorkflow(collectionID, workflow);
136 }
137 } catch (Exception e) {
138 log.error("Unable to load workflow " + workflowConf.getWorkflowClass(), e);
139 }
140 }
141 }
142
143 private Class lookupClass(String settingsDefinedClass) throws ClassNotFoundException {
144 String fullClassName;
145 if (settingsDefinedClass.indexOf('.') == -1) {
146 fullClassName = getDefaultWorkflowPackage() + "." + settingsDefinedClass;
147 } else {
148 fullClassName = settingsDefinedClass;
149 }
150 return Class.forName(fullClassName);
151 }
152
153 private void addWorkflow(String collectionID, Workflow workflow) {
154 workflows.put(workflow.getJobID(), workflow);
155 if (!collectionWorkflows.containsKey(collectionID)) {
156 collectionWorkflows.put(collectionID, new LinkedList<JobID>());
157 }
158 collectionWorkflows.get(collectionID).add(workflow.getJobID());
159 }
160
161
162
163
164
165 protected abstract String getDefaultWorkflowPackage();
166
167
168
169
170 public class WorkflowEventListener implements JobEventListener {
171 @Override
172 public void jobStarted(SchedulableJob job) {}
173
174 @Override
175 public void jobFailed(SchedulableJob job) {}
176
177
178
179
180
181
182 @Override
183 public void jobFinished(SchedulableJob job) {
184 if (workflows.containsKey(job.getJobID())) {
185 if (!statistics.containsKey(job.getJobID())) {
186 statistics.put(job.getJobID(), new LinkedList<WorkflowStatistic>());
187 }
188
189 List<WorkflowStatistic> workflowStatistics = statistics.get(job.getJobID());
190 workflowStatistics.add((((Workflow)job).getWorkflowStatistics()));
191 if (workflowStatistics.size() > MAX_NUMBER_OF_STATISTISCS_FOR_A_WORKFLOW) {
192 workflowStatistics.remove(0);
193 }
194 }
195 }
196 }
197 }