1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.bitrepository.integrityservice.collector;
23
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.TimeUnit;
29
30 import org.bitrepository.access.getchecksums.conversation.ChecksumsCompletePillarEvent;
31 import org.bitrepository.access.getfileids.conversation.FileIDsCompletePillarEvent;
32 import org.bitrepository.client.eventhandler.ContributorFailedEvent;
33 import org.bitrepository.client.eventhandler.EventHandler;
34 import org.bitrepository.client.eventhandler.OperationEvent;
35 import org.bitrepository.client.eventhandler.OperationEvent.OperationEventType;
36 import org.bitrepository.common.utils.SettingsUtils;
37 import org.bitrepository.integrityservice.alerter.IntegrityAlerter;
38 import org.bitrepository.integrityservice.cache.IntegrityModel;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42
43
44
45
46
47 public class IntegrityCollectorEventHandler implements EventHandler {
48
49 private Logger log = LoggerFactory.getLogger(getClass());
50
51 private final IntegrityModel store;
52
53 private final IntegrityAlerter alerter;
54
55 private final long timeout;
56
57
58 private final BlockingQueue<OperationEvent> finalEventQueue = new LinkedBlockingQueue<OperationEvent>();
59
60 private final List<String> contributorsWithPartialResults = new ArrayList<String>();
61
62
63
64
65
66
67
68 public IntegrityCollectorEventHandler(IntegrityModel model, IntegrityAlerter alerter, long timeout) {
69 this.store = model;
70 this.alerter = alerter;
71 this.timeout = timeout;
72 }
73
74 @Override
75 public void handleEvent(OperationEvent event) {
76 if(event.getEventType() == OperationEventType.COMPONENT_COMPLETE) {
77 log.debug("Component complete: " + event.toString());
78 handleResult(event);
79 } else if(event.getEventType() == OperationEventType.COMPLETE) {
80 log.debug("Complete: " + event.toString());
81 finalEventQueue.add(event);
82 } else if(event.getEventType() == OperationEventType.FAILED) {
83 log.warn("Failure: " + event.toString());
84 alerter.operationFailed("Failed integrity operation: " + event.toString(), event.getCollectionID());
85
86 for(String pillarId : SettingsUtils.getPillarIDsForCollection(event.getCollectionID())) {
87 store.setPreviouslySeenToExisting(event.getCollectionID(), pillarId);
88 }
89
90 finalEventQueue.add(event);
91 } else if(event.getEventType() == OperationEventType.COMPONENT_FAILED) {
92 ContributorFailedEvent cfe = (ContributorFailedEvent) event;
93 log.warn("Component failure for '" + cfe.getContributorID()
94 + "'. Settings previously seen files to existing.");
95 store.setPreviouslySeenToExisting(cfe.getCollectionID(), cfe.getContributorID());
96 } else {
97 log.debug("Received event: " + event.toString());
98 }
99 }
100
101
102
103
104
105
106
107 public OperationEvent getFinish() throws InterruptedException {
108 return finalEventQueue.poll(timeout, TimeUnit.MILLISECONDS);
109 }
110
111
112
113
114 public List<String> getPillarsWithPartialResult() {
115 return contributorsWithPartialResults;
116 }
117
118
119
120
121
122 private void handleResult(OperationEvent event) {
123 if(event instanceof ChecksumsCompletePillarEvent) {
124 ChecksumsCompletePillarEvent checksumEvent = (ChecksumsCompletePillarEvent) event;
125 log.trace("Receiving GetChecksums result: {}",
126 checksumEvent.getChecksums().getChecksumDataItems().toString());
127 store.addChecksums(checksumEvent.getChecksums().getChecksumDataItems(), checksumEvent.getContributorID(),
128 checksumEvent.getCollectionID());
129 if(checksumEvent.isPartialResult()) {
130 contributorsWithPartialResults.add(checksumEvent.getContributorID());
131 }
132 } else if(event instanceof FileIDsCompletePillarEvent) {
133 FileIDsCompletePillarEvent fileidEvent = (FileIDsCompletePillarEvent) event;
134 log.trace("Receiving GetFileIDs result: {}", fileidEvent.getFileIDs().getFileIDsData().toString());
135 store.addFileIDs(fileidEvent.getFileIDs().getFileIDsData(), fileidEvent.getContributorID(),
136 fileidEvent.getCollectionID());
137 if(fileidEvent.isPartialResult()) {
138 contributorsWithPartialResults.add(fileidEvent.getContributorID());
139 }
140 } else {
141 log.warn("Unexpected component complete event: " + event.toString());
142 }
143 }
144 }