001/*
002 * #%L
003 * Netarchivesuite - archive
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.archive.arcrepository;
024
025import java.io.File;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import dk.netarkivet.archive.ArchiveSettings;
036import dk.netarkivet.archive.arcrepository.bitpreservation.AdminDataMessage;
037import dk.netarkivet.archive.arcrepository.distribute.ArcRepositoryServer;
038import dk.netarkivet.archive.arcrepository.distribute.StoreMessage;
039import dk.netarkivet.archive.arcrepositoryadmin.Admin;
040import dk.netarkivet.archive.arcrepositoryadmin.AdminFactory;
041import dk.netarkivet.archive.bitarchive.distribute.BatchReplyMessage;
042import dk.netarkivet.archive.bitarchive.distribute.BitarchiveClient;
043import dk.netarkivet.archive.bitarchive.distribute.RemoveAndGetFileMessage;
044import dk.netarkivet.archive.bitarchive.distribute.UploadMessage;
045import dk.netarkivet.archive.checksum.distribute.ChecksumClient;
046import dk.netarkivet.archive.checksum.distribute.GetChecksumMessage;
047import dk.netarkivet.archive.distribute.ReplicaClient;
048import dk.netarkivet.common.distribute.ChannelID;
049import dk.netarkivet.common.distribute.Channels;
050import dk.netarkivet.common.distribute.JMSConnectionFactory;
051import dk.netarkivet.common.distribute.NetarkivetMessage;
052import dk.netarkivet.common.distribute.NullRemoteFile;
053import dk.netarkivet.common.distribute.RemoteFile;
054import dk.netarkivet.common.distribute.arcrepository.Replica;
055import dk.netarkivet.common.distribute.arcrepository.ReplicaStoreState;
056import dk.netarkivet.common.distribute.arcrepository.ReplicaType;
057import dk.netarkivet.common.exceptions.ArgumentNotValid;
058import dk.netarkivet.common.exceptions.IOFailure;
059import dk.netarkivet.common.exceptions.IllegalState;
060import dk.netarkivet.common.exceptions.UnknownID;
061import dk.netarkivet.common.utils.CleanupIF;
062import dk.netarkivet.common.utils.FileUtils;
063import dk.netarkivet.common.utils.NotificationType;
064import dk.netarkivet.common.utils.NotificationsFactory;
065import dk.netarkivet.common.utils.Settings;
066import dk.netarkivet.common.utils.batch.ChecksumJob;
067
068/**
069 * The Arcrepository handles the communication with the different replicas. This class ensures that arc files are stored
 * in all available replicas and verifies that the storage process succeeded. Retrieval of data from a replica goes
071 * through the JMSArcRepositoryClient that contacts the appropriate (typically nearest) replica and retrieves data from
072 * this archive. Batch execution is sent to the bitarchive replica(s), since batch cannot be executed on checksum
073 * replicas. Correction operations are typically only allowed on one replica.
074 */
075@SuppressWarnings({"deprecation"})
076public class ArcRepository implements CleanupIF {
077
    /** The log. */
    private static final Logger log = LoggerFactory.getLogger(ArcRepository.class);

    /** The unique instance (singleton) of this class. */
    private static ArcRepository instance;

    /** The administration data associated with the arcrepository. */
    private Admin ad;

    /** The class which listens to messages sent to this instance of Arcrepository or its subclasses. */
    private ArcRepositoryServer arcReposhandler;

    /** A Map of a Replica and their corresponding ReplicaClient. From this Map the relevant channels can be found. */
    private final Map<Replica, ReplicaClient> connectedReplicas = new HashMap<Replica, ReplicaClient>();

    /** Map from MessageId to arcfiles for which there are outstanding checksum jobs. */
    private final Map<String, String> outstandingChecksumFiles = new HashMap<String, String>();
    
    /**
     * Map from filenames to remote files. Used for retrieving a remote file reference while a store operation is in
     * process.
     */
    private final Map<String, RemoteFile> outstandingRemoteFiles = new HashMap<String, RemoteFile>();

    /**
     * Map from filenames to the value returned by StoreMessage.getPrecomputedChecksum() for the store operation in
     * process. Entries are added by store() alongside outstandingRemoteFiles and removed again when the store is
     * replied to (OK or not OK).
     */
    private final Map<String, String> outstandingRemoteFilesC = new HashMap<String, String>();
    
    
    /**
     * Map from bitarchive names to Map from filenames to the number of times a file has been attempted uploaded to
     * the bitarchive.
     */
    private final Map<String, Map<String, Integer>> uploadRetries = new HashMap<String, Map<String, Integer>>();
110
111    /**
112     * Constructor for the ArcRepository. Connects the ArcRepository to all BitArchives, and initialises admin data.
113     *
114     * @throws IOFailure if admin data cannot be read/initialised or we cannot' connect to some bitarchive.
115     * @throws IllegalState if inconsistent channel info is given in settings.
116     */
117    protected ArcRepository() throws IOFailure, IllegalState {
118        // UpdateableAdminData Throws IOFailure
119        this.ad = AdminFactory.getInstance();
120        this.arcReposhandler = new ArcRepositoryServer(this);
121
122        initialiseReplicaClients();
123
124        log.info("Starting the ArcRepository");
125    }
126
127    /**
128     * Returns the unique ArcRepository instance.
129     *
130     * @return the instance.
131     * @throws IOFailure if admin data cannot be read/initialised or we cannot' connect to some bitarchive.
132     * @throws IllegalState if inconsistent channel info is given in settings.
133     */
134    public static synchronized ArcRepository getInstance() throws IllegalState, IOFailure {
135        if (instance == null) {
136            instance = new ArcRepository();
137        }
138        return instance;
139    }
140
141    /**
142     * Method for initialising the replica clients.
143     */
144    private void initialiseReplicaClients() {
145        // Get channels
146        ChannelID[] allBas = Channels.getAllArchives_ALL_BAs();
147        ChannelID[] anyBas = Channels.getAllArchives_ANY_BAs();
148        ChannelID[] theBamons = Channels.getAllArchives_BAMONs();
149        ChannelID[] theCrs = Channels.getAllArchives_CRs();
150
151        Replica[] replicas = Replica.getKnown().toArray(new Replica[theBamons.length]);
152        // Checks equal number of channels
153        checkChannels(allBas, anyBas, theBamons);
154
155        for (int i = 0; i < allBas.length; i++) {
156            Replica rep = replicas[i];
157            if (rep.getType() == ReplicaType.BITARCHIVE) {
158                connectedReplicas.put(rep, BitarchiveClient.getInstance(allBas[i], anyBas[i], theBamons[i]));
159            } else { // checksum replica
160                connectedReplicas.put(rep, ChecksumClient.getInstance(theCrs[i]));
161            }
162        }
163    }
164
165    /**
166     * Sanity check for data consistency in the construction of the ArcRepository, specifically that the number of
167     * ALL_BA, ANY_BA, and THE_BAMON queues are all equal to the number of credentials.
168     *
169     * @param allBas The topics for bitarchives
170     * @param anyBas The queues for bitarchives
171     * @param theBamons The queues for bitarchive monitors
172     * @throws IllegalState if inconsistent data is found
173     */
174    private void checkChannels(ChannelID[] allBas, ChannelID[] anyBas, ChannelID[] theBamons) throws IllegalState {
175        if (theBamons.length != allBas.length || theBamons.length != anyBas.length) {
176            throw new IllegalState("Inconsistent data found in construction of " + "ArcRepository: \n\nALL_BAs: "
177                    + Arrays.toString(allBas) + "\nANY_BAs: " + Arrays.toString(anyBas) + "\nTHE_BAMONs: "
178                    + Arrays.toString(theBamons));
179        }
180    }
181
182    /**
183     * Stores a file in all known replicas. It sends out a upload message to all replicas.
184     *
185     * @param rf The remotefile to be stored.
186     * @param replyInfo A StoreMessage used to reply with success or failure.
187     * @throws IOFailure If file couldn't be stored.
188     * @throws ArgumentNotValid If a input parameter is null.
189     */
190    public synchronized void store(RemoteFile rf, StoreMessage replyInfo) throws IOFailure, ArgumentNotValid {
191        ArgumentNotValid.checkNotNull(rf, "rf");
192        ArgumentNotValid.checkNotNull(replyInfo, "replyInfo");
193
194        final String filename = rf.getName();
195        log.info("Store started: '{}'", filename);
196
197        // Record, that store of this filename is in progress
198        // needed for retrying uploads.
199        if (outstandingRemoteFiles.containsKey(filename)) {
200            log.info("File: '{}' was outstanding from the start.", filename);
201        }
202        outstandingRemoteFiles.put(filename, rf);
203        outstandingRemoteFilesC.put(filename, replyInfo.getPrecomputedChecksum()); // Hack
204
205        if (ad.hasEntry(filename)) {
206            // Any valid entry (and all existing entries are now
207            // known to be valid) by definition has a checksum.
208            if (!rf.getChecksum().equals(ad.getCheckSum(filename))) {
209                String msg = "Attempting to store file '" + filename + "' with a different checksum than before: "
210                        + "Old checksum: " + ad.getCheckSum(filename) + ", new checksum: " + rf.getChecksum();
211                log.warn(msg);
212                replyNotOK(filename, replyInfo);
213                return;
214            }
215            log.debug("Retrying store of already known file '{}'," + " Already completed: {}", filename,
216                    isStoreCompleted(filename));
217            ad.setReplyInfo(filename, replyInfo);
218        } else {
219            ad.addEntry(filename, replyInfo, rf.getChecksum());
220        }
221        for (Map.Entry<Replica, ReplicaClient> entry : connectedReplicas.entrySet()) {
222            startUpload(rf, entry.getValue(), entry.getKey(), replyInfo);
223        }
224
225        // Check state and reply if needed
226        considerReplyingOnStore(filename);
227    }
228
229    /**
230     * Initiate uploading of file to a specific replica. The corresponding upload record in admin data is created.
231     *
232     * @param rf Remotefile to upload to replica.
233     * @param replicaClient The replica client to upload to.
234     * @param replica The replica where RemoteFile is to be stored.
235     * @param replyInfo 
236     */
237    private synchronized void startUpload(RemoteFile rf, ReplicaClient replicaClient, Replica replica, StoreMessage replyInfo) {
238        final String filename = rf.getName();
239        log.debug("Upload started of file '{}' to replica '{}'", filename, replica.getId());
240
241        String replicaChannelId = replica.getIdentificationChannel().getName();
242
243        if (!ad.hasState(filename, replicaChannelId)) {
244            // New upload
245            ad.setState(filename, replicaChannelId, ReplicaStoreState.UPLOAD_STARTED);
246            replicaClient.sendUploadMessage(rf, replyInfo.getPrecomputedChecksum()); // Updated to include checksum information
247        } else {
248            // Recovery from old upload
249            ReplicaStoreState storeState = ad.getState(filename, replicaChannelId);
250            switch (storeState) {
251            case UPLOAD_FAILED:
252            case UPLOAD_STARTED:
253            case DATA_UPLOADED:
254                log.debug("Recovery from old upload. StoreState: {}. Sending new Checksum request for verifying "
255                        + "whether the file '{}' has been succesfully uploaded for replica: '{}'", storeState,
256                        filename, replica);
257                // Unknown condition in bitarchive. Test with checksum job.
258                if (storeState == ReplicaStoreState.UPLOAD_FAILED) {
259                    ad.setState(filename, replicaChannelId, ReplicaStoreState.UPLOAD_STARTED);
260                    log.info("ReplicaStoreState for file '{}' on replica '{}' changed from '{}' to '{}'", filename,
261                            replica, ReplicaStoreState.UPLOAD_FAILED, ReplicaStoreState.UPLOAD_STARTED);
262                }
263                sendChecksumRequestForFile(filename, replicaClient);
264                break;
265            case UPLOAD_COMPLETED:
266                log.warn("Trying to upload file '{}' that already has state UPLOAD_COMPLETED for this replica",
267                        filename);
268                break;
269            default:
270                throw new UnknownID("Unknown state: '" + storeState + "'");
271            }
272        }
273    }
274
275    /**
276     * Method for retrieving the checksum of a specific file from the archive of a specific replica. If the replica is a
277     * BitArchive, then a Batch message with the ChecksumJob is sent. If the replica is a ChecksumArchive, then a
278     * GetChecksumMessage is sent.
279     *
280     * @param filename The file to checksum.
281     * @param replicaClient The client to retrieve the checksum of the file from.
282     */
283    private void sendChecksumRequestForFile(String filename, ReplicaClient replicaClient) {
284        NetarkivetMessage msg;
285
286        // Retrieve the checksum of the file.
287        msg = replicaClient.sendGetChecksumMessage(Channels.getTheRepos(), filename);
288
289        outstandingChecksumFiles.put(msg.getID(), filename);
290        log.debug("Checksum job message submitted for file '{}' with message id: '{}'", filename, msg.getID());
291    }
292
293    /**
294     * Test whether the current state is such that we may send a reply for the file we are currently processing, and
295     * send the reply if it is. We reply only when there is an outstanding message to reply to, and a) The file is
296     * reported complete in all replicas or b) No replica has outstanding reply messages AND some replica has reported
297     * failure.
298     *
299     * @param arcFileName The arcfile we consider replying to.
300     */
301    private synchronized void considerReplyingOnStore(String arcFileName) {
302        if (ad.hasReplyInfo(arcFileName)) {
303            if (isStoreCompleted(arcFileName)) {
304                replyOK(arcFileName, ad.removeReplyInfo(arcFileName));
305            } else if (oneReplicaHasFailed(arcFileName) && noReplicaInStateUploadStarted(arcFileName)) {
306                replyNotOK(arcFileName, ad.removeReplyInfo(arcFileName));
307            }
308        }
309    }
310
311    /**
312     * Reply to a store message with status Ok.
313     *
314     * @param arcFileName The file for which we are replying.
315     * @param msg The message to reply to.
316     */
317    private synchronized void replyOK(String arcFileName, StoreMessage msg) {
318        outstandingRemoteFiles.remove(arcFileName);
319        outstandingRemoteFilesC.remove(arcFileName);
320        clearRetries(arcFileName);
321        log.info("Store OK: '{}'", arcFileName);
322        log.debug("Sending store OK reply to message '{}'", msg);
323        JMSConnectionFactory.getInstance().reply(msg);
324    }
325
326    /**
327     * Reply to a store message with status NotOk.
328     *
329     * @param arcFileName The file for which we are replying.
330     * @param msg The message to reply to.
331     */
332    private synchronized void replyNotOK(String arcFileName, StoreMessage msg) {
333        outstandingRemoteFiles.remove(arcFileName);
334        outstandingRemoteFilesC.remove(arcFileName);
335        clearRetries(arcFileName);
336        msg.setNotOk("Failure while trying to store ARC file: " + arcFileName);
337        log.warn("Store NOT OK: '{}'", arcFileName);
338        log.debug("Sending store NOT OK reply to message '{}'", msg);
339        JMSConnectionFactory.getInstance().reply(msg);
340    }
341
342    /**
343     * Check if all replicas have reported that storage has been successfully completed. If this is the case return true
344     * else false.
345     *
346     * @param arcfileName The file being stored.
347     * @return true only if all replicas report UPLOAD_COMPLETED.
348     */
349    private boolean isStoreCompleted(String arcfileName) {
350        // TODO remove quadratic scaling hidden here!!
351        for (Replica rep : connectedReplicas.keySet()) {
352            try {
353                // retrieve the replica channel and check upload status.
354                if (ad.getState(arcfileName, rep.getIdentificationChannel().getName()) != ReplicaStoreState.UPLOAD_COMPLETED) {
355                    return false;
356                }
357            } catch (UnknownID e) {
358                // Since no upload status exists, then it cannot be completed!
359                log.warn("Non-fatal error! A replica does not have a upload status for the file '{}'.", arcfileName, e);
360                return false;
361            }
362        }
363
364        // Since no replica has a storestate differing from 'UPLOAD_COMPLETED'
365        // and no errors, then the store is completed for the entire system.
366        return true;
367    }
368
369    /**
370     * Checks if there are at least one replica that has reported that storage has failed. If this is the case return
371     * true else false.
372     *
373     * @param arcFileName the name of file being stored.
374     * @return true only if at least one replica report UPLOAD_FAILED.
375     */
376    private boolean oneReplicaHasFailed(String arcFileName) {
377        for (Replica rep : connectedReplicas.keySet()) {
378            try {
379                // retrieve the replica channel and check upload status.
380                String repChannel = rep.getIdentificationChannel().getName();
381                if (ad.getState(arcFileName, repChannel) == ReplicaStoreState.UPLOAD_FAILED) {
382                    return true;
383                }
384            } catch (UnknownID e) {
385                log.warn("Non-fatal error. One replica does not have a upload status for the file '{}'.", arcFileName,
386                        e);
387                return true;
388            }
389        }
390        return false;
391    }
392
393    /**
394     * Checks if no replicas which has reported that upload is in started state. If this is the case return true else
395     * false.
396     *
397     * @param arcFileName The name of the file being stored.
398     * @return true only if no replica report UPLOAD_STARTED.
399     */
400    private boolean noReplicaInStateUploadStarted(String arcFileName) {
401        for (Replica rep : connectedReplicas.keySet()) {
402            try {
403                // retrieve the replica channel and check upload status.
404                String repChannelName = rep.getIdentificationChannel().getName();
405                if (ad.getState(arcFileName, repChannelName) == ReplicaStoreState.UPLOAD_STARTED) {
406                    return false;
407                }
408            } catch (UnknownID e) {
409                // When no upload exists, then upload cannot have started.
410                log.warn("Non-fatal error! A replica does not have a upload status for tshe file '{}'.", arcFileName, e);
411            }
412        }
413
414        return true;
415    }
416
417    /**
418     * Returns a replica client based on a replica id.
419     *
420     * @param replicaId the replica id
421     * @return A replica client.
422     * @throws ArgumentNotValid if replicaId parameter is null
423     */
424    public ReplicaClient getReplicaClientFromReplicaId(String replicaId) throws ArgumentNotValid {
425        ArgumentNotValid.checkNotNullOrEmpty(replicaId, "replicaId");
426
427        // retrieve the replica client.
428        ReplicaClient rc = connectedReplicas.get(Replica.getReplicaFromId(replicaId));
429
430        return rc;
431    }
432
433    /**
434     * Finds the identification channel for the replica. If the replica is a BitArchive then the channel to the
435     * BitArchiveMonitor is returned, and if the replica is a ChecksumArchive then the checksum replica channel is
436     * returned. This means that only the channels for the bitarchive should be changed into the bamon channel, e.g.
437     * replacing the ALL_BA and ANY_BA identifiers with THE_BAMON. This change does not affect the checksum channel, and
438     * is therefore also performed on it.
439     *
440     * @param channel A channel to the replica.
441     * @return The name of the channel which identifies the replica.
442     */
443    private String resolveReplicaChannel(String channel) {
444        return channel.replaceAll("ALL_BA", "THE_BAMON").replaceAll("ANY_BA", "THE_BAMON");
445    }
446
447    /**
448     * Event handler for upload messages reporting the upload result. Checks the success status of the upload and
449     * updates admin data accordingly.
450     *
451     * @param msg an UploadMessage.
452     */
453    public synchronized void onUpload(UploadMessage msg) {
454        ArgumentNotValid.checkNotNull(msg, "msg");
455        log.debug("Received upload reply: {}", msg.toString());
456
457        String repChannelName = resolveReplicaChannel(msg.getTo().getName());
458
459        if (msg.isOk()) {
460            processDataUploaded(msg.getArcfileName(), repChannelName);
461        } else {
462            processUploadFailed(msg.getArcfileName(), repChannelName);
463        }
464    }
465
466    /**
467     * Process the report by a bitarchive that a file was correctly uploaded.
468     * <ol>
469     * <il>1. Update the upload, and store states appropriately.</il><br/>
470     * <il>2. Verify that data are correctly stored in the archive by running a batch job on the archived file to
471     * perform a MD5 checksum comparison.</il> <br/>
472     * <il>3. Check if store operation is completed and update admin data if so.</il><br/>
473     * </ol>
474     *
475     * @param arcfileName The arcfile that was uploaded.
476     * @param replicaChannelName The name of the identification channel for the replica that uploaded it (THE_BAMON for
477     * bitarchive and THE_CR for checksum).
478     */
479    private synchronized void processDataUploaded(String arcfileName, String replicaChannelName) {
480        log.debug("Data uploaded '{}' ,{}", arcfileName, replicaChannelName);
481        ad.setState(arcfileName, replicaChannelName, ReplicaStoreState.DATA_UPLOADED);
482
483        // retrieve the replica
484        Replica rep = Channels.retrieveReplicaFromIdentifierChannel(replicaChannelName);
485        // Verify that the file has been correctly uploaded.
486        sendChecksumRequestForFile(arcfileName, connectedReplicas.get(rep));
487    }
488
489    /**
490     * Update admin data with the information that upload to a replica failed. The replica record is set to
491     * UPLOAD_FAILED.
492     *
493     * @param arcfileName The file that resulted in an upload failure.
494     * @param replicaChannelName The name of the idenfiticaiton channel for the replica that could not upload the file.
495     */
496    private void processUploadFailed(String arcfileName, String replicaChannelName) {
497        log.warn("Upload failed for ARC file '{}' to bit archive '{}'", arcfileName, replicaChannelName);
498
499        // Update state to reflect upload failure
500        ad.setState(arcfileName, replicaChannelName, ReplicaStoreState.UPLOAD_FAILED);
501        considerReplyingOnStore(arcfileName);
502    }
503
504    /**
505     * Called when we receive replies on our checksum batch jobs.
506     * <p>
507     * This does not handle checksum replicas.
508     *
509     * @param msg a BatchReplyMessage.
510     */
511    public synchronized void onBatchReply(BatchReplyMessage msg) {
512        ArgumentNotValid.checkNotNull(msg, "msg");
513        log.debug("BatchReplyMessage received: '{}'", msg);
514
515        if (!outstandingChecksumFiles.containsKey(msg.getReplyOfId())) {
516            // Message was NOT expected
517            log.warn("Received batchreply message with unknown originating ID {}\n{}\n. Known IDs are: {}",
518                    msg.getReplyOfId(), msg.toString(), outstandingChecksumFiles.keySet().toString());
519            return;
520        }
521
522        String arcfileName = outstandingChecksumFiles.remove(msg.getReplyOfId());
523
524        // Check incoming message
525        if (!msg.isOk()) {
526            // Checksum job has ended with errors, but can contain checksum
527            // anyway, therefore it is logged - but we try to go on
528            log.warn("Message '" + msg.getID() + "' is reported not okay" + "\nReported error: '" + msg.getErrMsg()
529                    + "'" + "\nTrying to process anyway.");
530        }
531
532        // Parse results
533        // if legal result is found it is placed in reportedChecksum
534        // if illegal or errors occurs reportedChecksum is set to ""
535        RemoteFile checksumResFile = msg.getResultFile();
536        String reportedChecksum = "";
537        boolean checksumReadOk = false;
538        if (checksumResFile == null) {
539            log.debug("The results of message '{}' was null.\nNo checksum to use for file '{}'", msg.getID(),
540                    arcfileName);
541        } else if (checksumResFile instanceof NullRemoteFile) {
542            log.debug(
543                    "The results of the message '{}' was instance of NullRemoteFile\nNo checksum to use for file '{}'",
544                    msg.getID(), arcfileName);
545        } else {
546            // Read checksum
547            // Copy result to a local file
548            File outputFile = new File(FileUtils.getTempDir(), msg.getReplyTo().getName() + "_" + arcfileName
549                    + "_checksumOutput.txt");
550            try {
551                checksumResFile.copyTo(outputFile);
552
553                // Read checksum from local file
554                reportedChecksum = readChecksum(outputFile, arcfileName);
555                checksumReadOk = true;
556            } catch (IOFailure e) {
557                log.warn("Couldn't read checksumjob " + "output for '{}'", arcfileName, e);
558            } catch (IllegalState e) {
559                log.warn("Couldn't read result of checksumjob " + "in '{}'", arcfileName, e);
560            }
561
562            // Clean up output file and remote file
563            // clean up does NOT result in general error, i.e.
564            // reportedChecksum is NOT set to "" in case of errors
565            try {
566                FileUtils.removeRecursively(outputFile);
567            } catch (IOFailure e) {
568                log.warn("Couldn't clean up checksumjob " + "output file '{}'", outputFile, e);
569            }
570            try {
571                checksumResFile.cleanup();
572            } catch (IOFailure e) {
573                log.warn("Couldn't clean up checksumjob " + "remote file '{}'", checksumResFile.getName(), e);
574            }
575        }
576
577        // Process result
578        String orgCheckSum = ad.getCheckSum(arcfileName);
579        String repChannel = resolveReplicaChannel(msg.getReplyTo().getName());
580        processCheckSum(arcfileName, repChannel, orgCheckSum, reportedChecksum, msg.isOk() && checksumReadOk);
581    }
582
583    /**
584     * The message for handling the results of the GetChecksumMessage.
585     *
586     * @param msg The message containing the checksum of a specific file.
587     */
588    public synchronized void onChecksumReply(GetChecksumMessage msg) {
589        ArgumentNotValid.checkNotNull(msg, "msg");
590
591        log.debug("Received the reply to a GetChecksumMessage with ID: '{}'", msg.getID());
592
593        // handle the case when unwanted reply.
594        if (!outstandingChecksumFiles.containsKey(msg.getID())) {
595            log.warn("Received GetChecksumMessage with unknown originating ID {}\n{}\n. Known IDs are: {}",
596                    msg.getReplyOfId(), msg.toString(), outstandingChecksumFiles.keySet().toString());
597            return;
598        }
599
600        String arcfileName = outstandingChecksumFiles.remove(msg.getID());
601
602        // Check incoming message
603        if (!msg.isOk()) {
604            // Checksum job has ended with errors, but can contain checksum
605            // anyway, therefore it is logged - but we try to go on
606            log.warn("Message '{}' is reported not okay\nReported error: '{}'\nTrying to process anyway.", msg.getID(),
607                    msg.getErrMsg());
608        }
609
610        String reportedChecksum = msg.getChecksum();
611
612        // check the checksum
613        if (reportedChecksum == null) {
614            // set the reported checksum to empty, like for BAs.
615            reportedChecksum = "";
616        }
617
618        // The checksum was not read by the batchjob, therefore this variable
619        // cannot be false
620        boolean checksumReadOk = true;
621
622        // process the checksum.
623        String orgChecksum = ad.getCheckSum(arcfileName);
624        if (orgChecksum == null) {
625            throw new IllegalState("The admin checksum for file '" + arcfileName + "' is null. Should never happen.");
626        }
627        String repChannelName = resolveReplicaChannel(msg.getTo().getName());
628        processCheckSum(arcfileName, repChannelName, orgChecksum, reportedChecksum, checksumReadOk);
629    }
630
631    /**
632     * Reads output from a checksum file. Only the first instance of the desired file will be used. If other filenames
633     * are encountered than the wanted one, they will be logged at level warning, as that is indicative of serious
634     * errors. Having more than one instance of the desired file merely means it was found in several bitarchives, which
635     * is not our problem.
636     *
637     * @param outputFile The file to read checksum from.
638     * @param arcfileName The arcfile to find checksum for.
639     * @return The checksum, or the empty string if no checksum found for arcfilename.
640     * @throws IOFailure If any error occurs reading the file.
641     * @throws IllegalState if the read format is wrong
642     */
643    private String readChecksum(File outputFile, String arcfileName) throws IOFailure, IllegalState {
644        // List of lines in batch (checksum job) output file
645        List<String> lines = FileUtils.readListFromFile(outputFile);
646        // List of checksums found in batch (checksum job) output file
647        List<String> checksumList = new ArrayList<String>();
648
649        // Extract checksums for arcfile from lines
650        // If errors occurs then throw exception
651        for (String line : lines) {
652            String readFileName = "";
653            String checksum = "";
654            String[] tokens = line.split(ChecksumJob.STRING_FILENAME_SEPARATOR);
655            boolean ignoreLine = false;
656
657            // Check line format
658            ignoreLine = (tokens.length == 0 || line.isEmpty());
659            if (tokens.length != 2 && !ignoreLine) { // wrong format
660                throw new IllegalState("Read checksum line had unexpected format '" + line + "'");
661            }
662
663            // Check checksum and arc-file name in line
664            if (!ignoreLine) {
665                readFileName = tokens[0];
666                checksum = tokens[1];
667                if (checksum.length() == 0) { // wrong format of checksum
668                    // do not exit - there may be more checksums
669                    ignoreLine = true;
670                    log.warn("There were an empty checksum in result for checksums to arc-file '{}' (line: '{}')",
671                            arcfileName, line);
672                } else {
673                    if (!readFileName.equals(arcfileName)) { // wrong arcfile
674                        // do not exit - there may be more checksums
675                        ignoreLine = true;
676                        log.warn(
677                                "There were an unexpected arc-file name in checksum result for arc-file '{}' (line: '{}')",
678                                arcfileName, line);
679                    }
680                }
681            }
682
683            // If previously checksum found, then check if they are different.
684            if (checksumList.size() > 0 && !ignoreLine && !checksum.equals(checksumList.get(checksumList.size() - 1))) {
685                String errMsg = "The arc-file '" + arcfileName + "' was found with two different checksums: "
686                        + checksumList.get(0) + " and " + checksum + ". Last line: '" + line + "'.";
687                log.warn(errMsg);
688                throw new IllegalState(errMsg);
689            }
690
691            // Add error free non-empty found checksum in list
692            if (!ignoreLine) {
693                checksumList.add(checksum);
694            }
695        }
696
697        // Check that checksum list contain a result,
698        // log if it has more than one result
699        if (checksumList.size() > 1) {
700            // Log and proceed - the checksums are equal
701            log.warn("Arcfile '{}' was found with {} occurrences of the checksum: {}", arcfileName,
702                    checksumList.size(), checksumList.get(0));
703        }
704
705        if (checksumList.size() == 0) {
706            log.debug("Arcfile '{}' not found in lines of checksum output file '{}': {}", arcfileName, outputFile,
707                    FileUtils.readListFromFile(outputFile));
708            return "";
709        } else {
710            return checksumList.get(0);
711        }
712    }
713
714    /**
715     * Process reporting of a checksum from a bitarchive for a specific file as part of a store operation for the file.
716     * Verify that the checksum is correct, update the BitArchiveStoreState state. Invariant: upload-state is changed or
717     * retry count is increased.
718     *
719     * @param arcFileName The file being stored.
720     * @param replicaChannelName The id of the replica reporting a checksum.
721     * @param orgChecksum The original checksum.
722     * @param reportedChecksum The checksum calculated by the replica. This value is "", if an error has occurred
723     * (except reply NOT ok from replica).
724     * @param checksumReadOk Tells whether the checksum was read ok by batch job.
725     */
726    private synchronized void processCheckSum(String arcFileName, String replicaChannelName, String orgChecksum,
727            String reportedChecksum, boolean checksumReadOk) {
728        log.debug("Checksum received for file '{}'... processing", arcFileName);
729        ArgumentNotValid.checkNotNullOrEmpty(arcFileName, "String arcfileName");
730        ArgumentNotValid.checkNotNullOrEmpty(replicaChannelName, "String replicaChannelName");
731        ArgumentNotValid.checkNotNullOrEmpty(orgChecksum, "String orgChecksum");
732        ArgumentNotValid.checkNotNull(reportedChecksum, "String reportedChecksum");
733
734        // Log if we do not find file outstanding
735        // we proceed anyway in order to be sure to update stae of file
736        if (!outstandingRemoteFiles.containsKey(arcFileName)) {
737            log.warn("Could not find arc-file as outstanding remote file: '{}'", arcFileName);
738        }
739
740        // If everything works fine complete process of this checksum
741        if (orgChecksum.equals(reportedChecksum) && !reportedChecksum.isEmpty()) {
742
743            // Checksum is valid and job matches expected results
744            ad.setState(arcFileName, replicaChannelName, ReplicaStoreState.UPLOAD_COMPLETED);
745
746            // Find out if and how to make general reply on store()
747            // remove file from outstandingRemoteFiles if a reply is given
748            considerReplyingOnStore(arcFileName);
749            log.debug("Checksum processing for file '{}'... completed.", arcFileName);
750            return;
751        }
752
753        // Log error or retry upload
754        if (reportedChecksum.isEmpty()) { // no checksum found
755            if (checksumReadOk) { // no errors in finding no checksum
756                if (retryOk(replicaChannelName, arcFileName)) { // we can retry
757                    if (outstandingRemoteFiles.containsKey(arcFileName)) {
758                        RemoteFile rf = outstandingRemoteFiles.get(arcFileName);
759                        String preComputedChecksum = outstandingRemoteFilesC.get(arcFileName);
760                        // Retry upload only if allowed and in case we are sure
761                        // that the empty checksum means that the arcfile is not
762                        // in the archive
763                        log.debug("Retrying upload of '{}'", arcFileName);
764                        ad.setState(rf.getName(), replicaChannelName, ReplicaStoreState.UPLOAD_STARTED);
765                        // retrieve the replica from the name of the channel.
766                        Replica rep = Channels.retrieveReplicaFromIdentifierChannel(replicaChannelName);
767                        connectedReplicas.get(rep).sendUploadMessage(rf, preComputedChecksum);
768                        incRetry(replicaChannelName, arcFileName);
769                        log.debug("Checksum processing for file '{}'... completed.", arcFileName);
770                        return;
771                    } // else logging was already done above
772                } else { // cannot retry
773                    log.warn("Cannot do more retry upload of remote file: '{}' to '{}', reported checksum='{}'",
774                            arcFileName, replicaChannelName, reportedChecksum);
775                }
776            } else { // error in getting checksum
777                log.warn(
778                        "Cannot retry upload of remote file: '{}' to '{}', reported checksum='{}' due to earlier batchjob error.",
779                        arcFileName, replicaChannelName, reportedChecksum);
780            }
781        } else { // non empty checksum
782            if (!orgChecksum.equals(reportedChecksum)) {
783                log.warn("Cannot upload (wrong checksum) '{}' to '{}', reported checksum='{}'", arcFileName,
784                        replicaChannelName, reportedChecksum);
785            } else {
786                log.warn("Cannot upload (unknown reason) '{}' to '{}', reported checksum='{}'", arcFileName,
787                        replicaChannelName, reportedChecksum);
788            }
789        }
790
791        // This point is reached if there is some kind of (logged) error, i.e.
792        // - the file has not been accepted as completed
793        // - the file has not been sent to retry of upload
794        ad.setState(arcFileName, replicaChannelName, ReplicaStoreState.UPLOAD_FAILED);
795        considerReplyingOnStore(arcFileName);
796        log.debug("Checksum processing for file '{}'... completed.", arcFileName);
797    }
798
799    /**
800     * Keep track of upload retries of an arcfile to an archive.
801     *
802     * @param replicaChannelName The name of a given replica.
803     * @param arcfileName The name of a given ARC file
804     * @return true if it is ok to retry an upload of arcfileName to the replica through the replicaChannelName.
805     */
806    private boolean retryOk(String replicaChannelName, String arcfileName) {
807        Map<String, Integer> bitarchiveRetries = uploadRetries.get(replicaChannelName);
808        if (bitarchiveRetries == null) {
809            return true;
810        }
811        Integer retryCount = bitarchiveRetries.get(arcfileName);
812        if (retryCount == null) {
813            return true;
814        }
815
816        if (retryCount >= Settings.getInt(ArchiveSettings.ARCREPOSITORY_UPLOAD_RETRIES)) {
817            return false;
818        }
819
820        return true;
821    }
822
823    /**
824     * Increment the number of upload retries.
825     *
826     * @param replicaChannelName The name of the identification channel for the replica.
827     * @param arcfileName The name of a given ARC file.
828     */
829    private void incRetry(String replicaChannelName, String arcfileName) {
830        Map<String, Integer> replicaRetries = uploadRetries.get(replicaChannelName);
831        if (replicaRetries == null) {
832            replicaRetries = new HashMap<String, Integer>();
833            uploadRetries.put(replicaChannelName, replicaRetries);
834        }
835
836        Integer retryCount = replicaRetries.get(arcfileName);
837        if (retryCount == null) {
838            replicaRetries.put(arcfileName, Integer.valueOf(1));
839            return;
840        }
841
842        replicaRetries.put(arcfileName, Integer.valueOf(retryCount + 1));
843    }
844
845    /**
846     * Remove all retry tracking information for the arcfile.
847     *
848     * @param arcfileName The name of a given ARC file
849     */
850    private void clearRetries(String arcfileName) {
851        for (String replicaChannelName : uploadRetries.keySet()) {
852            Map<String, Integer> baretries = uploadRetries.get(replicaChannelName);
853            baretries.remove(arcfileName);
854        }
855    }
856
857    /**
858     * Change admin data entry for a given file.
859     * <p>
860     * The following information is contained in the given AdminDataMessage: 1) The name of the given file to change the
861     * entry for, 2) the name of the bitarchive to modify the entry for, 3) a boolean that says whether or not to
862     * replace the checksum for the entry for the given file in AdminData, 4) a replacement for the case where the
863     * former value is true.
864     *
865     * @param msg an AdminDataMessage object
866     */
867    public void updateAdminData(AdminDataMessage msg) {
868
869        if (!ad.hasEntry(msg.getFileName())) {
870            throw new ArgumentNotValid("No admin entry exists for the file '" + msg.getFileName() + "'");
871        }
872
873        String message = "Handling request to change admin data for '" + msg.getFileName() + "'. ";
874        // add information if store-state is changed.
875        if (msg.isChangeStoreState()) {
876            message += "Change store state to " + msg.getNewvalue();
877        }
878        // add information if checksum is changed.
879        if (msg.isChangeChecksum()) {
880            message += "Change checksum to " + msg.getChecksum();
881        }
882        // log the message.
883        log.warn(message);
884        NotificationsFactory.getInstance().notify(message, NotificationType.WARNING);
885
886        if (msg.isChangeStoreState()) {
887            String replicaChannelName = Replica.getReplicaFromId(msg.getReplicaId()).getIdentificationChannel()
888                    .getName();
889            ad.setState(msg.getFileName(), replicaChannelName, msg.getNewvalue());
890        }
891
892        if (msg.isChangeChecksum()) {
893            ad.setCheckSum(msg.getFileName(), msg.getChecksum());
894        }
895    }
896
897    /**
898     * Forwards a RemoveAndGetFileMessage to the designated bitarchive. Before forwarding the message it is verified
899     * that the checksum of the file to remove differs from the registered checksum of the file to remove. If no
900     * registration exists for the file to remove the message is always forwarded.
901     *
902     * @param msg the message to forward to a bitarchive
903     */
904    public void removeAndGetFile(RemoveAndGetFileMessage msg) {
905        // Prevent removal of files with correct checksum
906        if (ad.hasEntry(msg.getFileName())) {
907            String refchecksum = ad.getCheckSum(msg.getFileName());
908            if (msg.getCheckSum().equals(refchecksum)) {
909                throw new ArgumentNotValid("Attempting to remove file with correct checksum. File=" + msg.getFileName()
910                        + "; with checksum:" + msg.getCheckSum() + ";");
911            }
912        }
913
914        // checksum ok - try to remove the file
915        String errMsg = "Requesting remove of file '" + msg.getFileName() + "' with checksum '" + msg.getCheckSum()
916                + "' from: '" + msg.getReplicaId() + "'";
917        log.warn(errMsg);
918        NotificationsFactory.getInstance().notify(errMsg, NotificationType.WARNING);
919        ReplicaClient rc = getReplicaClientFromReplicaId(msg.getReplicaId());
920        rc.sendRemoveAndGetFileMessage(msg);
921    }
922
923    /**
924     * Close all replicas connections, open loggers, and the ArcRepository handler.
925     */
926    public void close() {
927        log.info("Closing down ArcRepository");
928        cleanup();
929        log.info("Closed ArcRepository");
930    }
931
932    /**
933     * Closes all connections and nulls the instance. The ArcRepositoryHandler, the AdminData and the ReplicaClients are
934     * closed along with all their connections.
935     */
936    public void cleanup() {
937        if (arcReposhandler != null) {
938            arcReposhandler.close();
939            arcReposhandler = null;
940        }
941        if (connectedReplicas != null) {
942            for (ReplicaClient rc : connectedReplicas.values()) {
943                rc.close();
944            }
945            connectedReplicas.clear();
946        }
947        if (ad != null) {
948            ad.close();
949            ad = null;
950        }
951        instance = null;
952    }
953
954}