View Javadoc

1   /*
2    * #%L
3    * Bitrepository Audit Trail Service
4    * %%
5    * Copyright (C) 2010 - 2012 The State and University Library, The Royal Library and The State Archives, Denmark
6    * %%
7    * This program is free software: you can redistribute it and/or modify
8    * it under the terms of the GNU Lesser General Public License as 
9    * published by the Free Software Foundation, either version 2.1 of the 
10   * License, or (at your option) any later version.
11   * 
12   * This program is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15   * GNU General Lesser Public License for more details.
16   * 
17   * You should have received a copy of the GNU General Lesser Public 
18   * License along with this program.  If not, see
19   * <http://www.gnu.org/licenses/lgpl-2.1.html>.
20   * #L%
21   */
22  package org.bitrepository.audittrails.collector;
23  
24  import java.math.BigInteger;
25  import java.util.ArrayList;
26  import java.util.Collection;
27  import java.util.LinkedList;
28  import java.util.List;
29  import access.getaudittrails.AuditTrailClient;
30  import access.getaudittrails.AuditTrailQuery;
31  import access.getaudittrails.BlockingAuditTrailClient;
32  import access.getaudittrails.client.AuditTrailResult;
33  import org.bitrepository.audittrails.store.AuditTrailStore;
34  import org.bitrepository.bitrepositoryelements.AuditTrailEvents;
35  import org.bitrepository.client.eventhandler.EventHandler;
36  import org.bitrepository.client.eventhandler.OperationEvent;
37  import org.bitrepository.client.exceptions.NegativeResponseException;
38  import org.bitrepository.common.utils.TimeUtils;
39  import org.slf4j.Logger;
40  import org.slf4j.LoggerFactory;
41  
42  /**
43   * Will perform a single collection of audit trails, potential through multiple sequential getAuditTrail calls if the
44   * set of new audit trails is large.
45   */
46  public class IncrementalCollector {
47      private Logger log = LoggerFactory.getLogger(getClass());
48      private final String clientID;
49      private final BlockingAuditTrailClient client;
50      private final AuditTrailStore store;
51      private final int maxNumberOfResults;
52  
53      /** When no file id is wanted for the collecting of audit trails.*/
54      private static final String NO_FILE_ID = null;
55      /** When no delivery address is wanted for the collecting of audit trails.*/
56      private static final String NO_DELIVERY_URL = null;
57      /** Will be used in case of no MaxNumberOfResult are provided */
58      public static final int DEFAULT_MAX_NUMBER_OF_RESULTS = 10000;
59      private final String collectionID;
60  
61      /**
62       * @param clientID The clientID to use for the requests.
63       * @param client The client to use for the operations.
64       * @param store Where to persist the received results.
65       * @param maxNumberOfResults A optional limit on the number of audit trail events to request. If not set,
66       * {}
67       */
68      public IncrementalCollector(String collectionID, String clientID, AuditTrailClient client, AuditTrailStore store,
69                                  BigInteger maxNumberOfResults) {
70          this.collectionID = collectionID;
71          this.clientID = clientID;
72          this.client = new BlockingAuditTrailClient(client);
73          this.store = store;
74          this.maxNumberOfResults = (maxNumberOfResults != null)?
75              maxNumberOfResults.intValue() : DEFAULT_MAX_NUMBER_OF_RESULTS;
76      }
77  
78      /**
79       * Method to get the ID of the collection to get audit trails from
80       * @return String The ID of the collection 
81       */
82      public String getCollectionID() {
83          return collectionID;
84      }
85      
86      /**
87       * Setup and initiates the collection of audit trails through the client.
88       * Adds one to the sequence number to request only newer audit trails.
89       */
90      public void performCollection(Collection<String> contributors) {
91          List<AuditTrailQuery> queries = new ArrayList<AuditTrailQuery>();
92          
93          for(String contributorId : contributors) {
94              int seq = store.largestSequenceNumber(contributorId, collectionID);
95              queries.add(new AuditTrailQuery(contributorId, seq + 1, null, maxNumberOfResults));
96          }
97  
98          AuditCollectorEventHandler handler = new AuditCollectorEventHandler();
99          try {
100             client.getAuditTrails(collectionID, queries.toArray(new AuditTrailQuery[queries.size()]), NO_FILE_ID,
101                     NO_DELIVERY_URL, handler, clientID);
102 
103         } catch (NegativeResponseException e) {
104             log.error("Problem in collecting audit trails, collection will not be complete", e);
105         }
106         if (!handler.contributorsWithPartialResults.isEmpty()) {
107             performCollection(handler.contributorsWithPartialResults);
108         }
109     }
110 
111     /**
112      * Event handler for the audit trail collector. The results of an audit trail operation will be ingested into the
113      * audit trail store.
114      */
115     private class AuditCollectorEventHandler implements EventHandler {
116         List<String> contributorsWithPartialResults = new LinkedList<String>();
117         private final long startTime = System.currentTimeMillis();
118 
119         @Override
120         public void handleEvent(OperationEvent event) {
121             if(event instanceof AuditTrailResult) {
122                 AuditTrailResult auditResult = (AuditTrailResult) event;
123                 if (!auditResult.getCollectionID().equals(collectionID)) {
124                     log.warn("Received bad collection id! Expected '" + collectionID + "', but got '"
125                             + auditResult.getCollectionID() + "'.");
126                     return;
127                 }
128                 if (auditResult.isPartialResult()) {
129                     contributorsWithPartialResults.add(auditResult.getContributorID());
130                 }
131                 AuditTrailEvents events = auditResult.getAuditTrailEvents().getAuditTrailEvents();
132                 store.addAuditTrails(events, collectionID);
133                 if (events != null && events.getAuditTrailEvent() != null) {
134                     log.debug("Collected and stored " + events.getAuditTrailEvent().size() +
135                             " audit trail event from " + auditResult.getContributorID() + " in " +
136                             TimeUtils.millisecondsToHuman(System.currentTimeMillis() - startTime) 
137                             + " (PartialResult=" + auditResult.isPartialResult() + ".");
138                 }
139             } else if (event.getEventType() == OperationEvent.OperationEventType.COMPONENT_FAILED ||
140                 event.getEventType() == OperationEvent.OperationEventType.FAILED ||
141                 event.getEventType() == OperationEvent.OperationEventType.IDENTIFY_TIMEOUT) {
142                 log.warn("Event: " + event.toString());
143             } else {
144                 log.debug("Event:" + event.toString());
145             }
146         }
147     }
148 }