1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 package org.bitrepository.pillar.referencepillar;
26
27 import org.bitrepository.common.ArgumentValidator;
28 import org.bitrepository.common.filestore.FileStore;
29 import org.bitrepository.common.settings.Settings;
30 import org.bitrepository.common.utils.SettingsUtils;
31 import org.bitrepository.pillar.Pillar;
32 import org.bitrepository.pillar.cache.ChecksumDAO;
33 import org.bitrepository.pillar.cache.ChecksumDatabaseManager;
34 import org.bitrepository.pillar.cache.ChecksumStore;
35 import org.bitrepository.pillar.common.MessageHandlerContext;
36 import org.bitrepository.pillar.common.PillarAlarmDispatcher;
37 import org.bitrepository.pillar.common.SettingsHelper;
38 import org.bitrepository.pillar.referencepillar.archive.CollectionArchiveManager;
39 import org.bitrepository.pillar.referencepillar.archive.ReferenceChecksumManager;
40 import org.bitrepository.pillar.referencepillar.messagehandler.ReferencePillarMediator;
41 import org.bitrepository.pillar.referencepillar.scheduler.RecalculateChecksumJob;
42 import org.bitrepository.protocol.CoordinationLayerException;
43 import org.bitrepository.protocol.messagebus.MessageBus;
44 import org.bitrepository.service.audit.AuditDatabaseManager;
45 import org.bitrepository.service.audit.AuditTrailContributerDAO;
46 import org.bitrepository.service.audit.AuditTrailManager;
47 import org.bitrepository.service.contributor.ResponseDispatcher;
48 import org.bitrepository.service.database.DBConnector;
49 import org.bitrepository.service.database.DatabaseManager;
50 import org.bitrepository.service.scheduler.JobScheduler;
51 import org.bitrepository.service.scheduler.TimerbasedScheduler;
52 import org.bitrepository.service.workflow.SchedulableJob;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 import javax.jms.JMSException;
57
58 import java.lang.reflect.Constructor;
59 import java.util.Arrays;
60
61
62
63
64 public class ReferencePillar implements Pillar {
65
66 private Logger log = LoggerFactory.getLogger(getClass());
67
68 private final MessageBus messageBus;
69
70 private final Settings settings;
71
72 private final ReferencePillarMediator mediator;
73
74 private final FileStore archiveManager;
75
76 private final ChecksumStore csStore;
77
78 private final JobScheduler scheduler;
79
80 private final ReferenceChecksumManager manager;
81
82
83 private static final Long DEFAULT_RECALCULATION_WORKFLOW_TIME = 3600000L;
84
85
86
87
88
89
90 public ReferencePillar(MessageBus messageBus, Settings settings) {
91 ArgumentValidator.checkNotNull(messageBus, "messageBus");
92 ArgumentValidator.checkNotNull(settings, "settings");
93 this.messageBus = messageBus;
94 SettingsUtils.initialize(settings);
95 this.settings = settings;
96
97 log.info("Starting the ReferencePillar");
98 archiveManager = getFileStore(settings);
99 DatabaseManager checksumDatabaseManager = new ChecksumDatabaseManager(settings);
100 csStore = new ChecksumDAO(checksumDatabaseManager);
101 PillarAlarmDispatcher alarmDispatcher = new PillarAlarmDispatcher(settings, messageBus);
102 manager = new ReferenceChecksumManager(archiveManager, csStore, alarmDispatcher, settings);
103 DatabaseManager auditDatabaseManager = new AuditDatabaseManager(
104 settings.getReferenceSettings().getPillarSettings().getAuditTrailContributerDatabase());
105 AuditTrailManager audits = new AuditTrailContributerDAO(settings, auditDatabaseManager);
106 MessageHandlerContext context = new MessageHandlerContext(
107 settings,
108 SettingsHelper.getPillarCollections(settings.getComponentID(), settings.getCollections()),
109 new ResponseDispatcher(settings, messageBus),
110 alarmDispatcher,
111 audits);
112 messageBus.setCollectionFilter(Arrays.asList(context.getPillarCollections()));
113 mediator = new ReferencePillarMediator(messageBus, context, archiveManager, manager);
114 mediator.start();
115
116 this.scheduler = new TimerbasedScheduler();
117 initializeWorkflows();
118 }
119
120
121
122
123 private void initializeWorkflows() {
124 Long interval = DEFAULT_RECALCULATION_WORKFLOW_TIME;
125 if(settings.getReferenceSettings().getPillarSettings().getRecalculateOldChecksumsInterval() != null) {
126 interval = settings.getReferenceSettings().getPillarSettings()
127 .getRecalculateOldChecksumsInterval().longValue();
128 }
129 for(String collectionId : SettingsUtils.getCollectionIDsForPillar(
130 settings.getReferenceSettings().getPillarSettings().getPillarID())) {
131 SchedulableJob workflow = new RecalculateChecksumJob(collectionId, manager);
132 scheduler.schedule(workflow, interval);
133 }
134 }
135
136
137
138
139
140
141 private FileStore getFileStore(Settings settings) {
142 if(settings.getReferenceSettings().getPillarSettings().getFileStoreClass() == null) {
143 return new CollectionArchiveManager(settings);
144 }
145
146 try {
147 Class<FileStore> fsClass = (Class<FileStore>) Class.forName(
148 settings.getReferenceSettings().getPillarSettings().getFileStoreClass());
149 Constructor<FileStore> fsConstructor = fsClass.getConstructor(Settings.class);
150 return fsConstructor.newInstance(settings);
151 } catch (Exception e) {
152 throw new CoordinationLayerException("Could not instantiate the FileStore", e);
153 }
154 }
155
156
157
158
159 public void close() {
160 try {
161 mediator.close();
162 messageBus.close();
163 archiveManager.close();
164 log.info("ReferencePillar stopped!");
165 } catch (JMSException e) {
166 log.warn("Could not close the messagebus.", e);
167 }
168 }
169 }