1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.bitrepository.audittrails.collector;
23
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import access.getaudittrails.AuditTrailClient;
import access.getaudittrails.AuditTrailQuery;
import access.getaudittrails.BlockingAuditTrailClient;
import access.getaudittrails.client.AuditTrailResult;
import org.bitrepository.audittrails.store.AuditTrailStore;
import org.bitrepository.bitrepositoryelements.AuditTrailEvents;
import org.bitrepository.client.eventhandler.EventHandler;
import org.bitrepository.client.eventhandler.OperationEvent;
import org.bitrepository.client.exceptions.NegativeResponseException;
import org.bitrepository.common.utils.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
41
42
43
44
45
46 public class IncrementalCollector {
47 private Logger log = LoggerFactory.getLogger(getClass());
48 private final String clientID;
49 private final BlockingAuditTrailClient client;
50 private final AuditTrailStore store;
51 private final int maxNumberOfResults;
52
53
54 private static final String NO_FILE_ID = null;
55
56 private static final String NO_DELIVERY_URL = null;
57
58 public static final int DEFAULT_MAX_NUMBER_OF_RESULTS = 10000;
59 private final String collectionID;
60
61
62
63
64
65
66
67
68 public IncrementalCollector(String collectionID, String clientID, AuditTrailClient client, AuditTrailStore store,
69 BigInteger maxNumberOfResults) {
70 this.collectionID = collectionID;
71 this.clientID = clientID;
72 this.client = new BlockingAuditTrailClient(client);
73 this.store = store;
74 this.maxNumberOfResults = (maxNumberOfResults != null)?
75 maxNumberOfResults.intValue() : DEFAULT_MAX_NUMBER_OF_RESULTS;
76 }
77
78
79
80
81
82 public String getCollectionID() {
83 return collectionID;
84 }
85
86
87
88
89
90 public void performCollection(Collection<String> contributors) {
91 List<AuditTrailQuery> queries = new ArrayList<AuditTrailQuery>();
92
93 for(String contributorId : contributors) {
94 int seq = store.largestSequenceNumber(contributorId, collectionID);
95 queries.add(new AuditTrailQuery(contributorId, seq + 1, null, maxNumberOfResults));
96 }
97
98 AuditCollectorEventHandler handler = new AuditCollectorEventHandler();
99 try {
100 client.getAuditTrails(collectionID, queries.toArray(new AuditTrailQuery[queries.size()]), NO_FILE_ID,
101 NO_DELIVERY_URL, handler, clientID);
102
103 } catch (NegativeResponseException e) {
104 log.error("Problem in collecting audit trails, collection will not be complete", e);
105 }
106 if (!handler.contributorsWithPartialResults.isEmpty()) {
107 performCollection(handler.contributorsWithPartialResults);
108 }
109 }
110
111
112
113
114
115 private class AuditCollectorEventHandler implements EventHandler {
116 List<String> contributorsWithPartialResults = new LinkedList<String>();
117 private final long startTime = System.currentTimeMillis();
118
119 @Override
120 public void handleEvent(OperationEvent event) {
121 if(event instanceof AuditTrailResult) {
122 AuditTrailResult auditResult = (AuditTrailResult) event;
123 if (!auditResult.getCollectionID().equals(collectionID)) {
124 log.warn("Received bad collection id! Expected '" + collectionID + "', but got '"
125 + auditResult.getCollectionID() + "'.");
126 return;
127 }
128 if (auditResult.isPartialResult()) {
129 contributorsWithPartialResults.add(auditResult.getContributorID());
130 }
131 AuditTrailEvents events = auditResult.getAuditTrailEvents().getAuditTrailEvents();
132 store.addAuditTrails(events, collectionID);
133 if (events != null && events.getAuditTrailEvent() != null) {
134 log.debug("Collected and stored " + events.getAuditTrailEvent().size() +
135 " audit trail event from " + auditResult.getContributorID() + " in " +
136 TimeUtils.millisecondsToHuman(System.currentTimeMillis() - startTime)
137 + " (PartialResult=" + auditResult.isPartialResult() + ".");
138 }
139 } else if (event.getEventType() == OperationEvent.OperationEventType.COMPONENT_FAILED ||
140 event.getEventType() == OperationEvent.OperationEventType.FAILED ||
141 event.getEventType() == OperationEvent.OperationEventType.IDENTIFY_TIMEOUT) {
142 log.warn("Event: " + event.toString());
143 } else {
144 log.debug("Event:" + event.toString());
145 }
146 }
147 }
148 }