001/*
002 * #%L
003 * Netarchivesuite - archive
004 * %%
005 * Copyright (C) 2005 - 2018 The Royal Danish Library, 
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.archive.arcrepository;
024
025import java.io.File;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import dk.netarkivet.archive.ArchiveSettings;
036import dk.netarkivet.archive.arcrepository.bitpreservation.AdminDataMessage;
037import dk.netarkivet.archive.arcrepository.distribute.ArcRepositoryServer;
038import dk.netarkivet.archive.arcrepository.distribute.StoreMessage;
039import dk.netarkivet.archive.arcrepositoryadmin.Admin;
040import dk.netarkivet.archive.arcrepositoryadmin.AdminFactory;
041import dk.netarkivet.archive.bitarchive.distribute.BatchReplyMessage;
042import dk.netarkivet.archive.bitarchive.distribute.BitarchiveClient;
043import dk.netarkivet.archive.bitarchive.distribute.RemoveAndGetFileMessage;
044import dk.netarkivet.archive.bitarchive.distribute.UploadMessage;
045import dk.netarkivet.archive.checksum.distribute.ChecksumClient;
046import dk.netarkivet.archive.checksum.distribute.GetChecksumMessage;
047import dk.netarkivet.archive.distribute.ReplicaClient;
048import dk.netarkivet.common.distribute.ChannelID;
049import dk.netarkivet.common.distribute.Channels;
050import dk.netarkivet.common.distribute.JMSConnectionFactory;
051import dk.netarkivet.common.distribute.NetarkivetMessage;
052import dk.netarkivet.common.distribute.NullRemoteFile;
053import dk.netarkivet.common.distribute.RemoteFile;
054import dk.netarkivet.common.distribute.arcrepository.Replica;
055import dk.netarkivet.common.distribute.arcrepository.ReplicaStoreState;
056import dk.netarkivet.common.distribute.arcrepository.ReplicaType;
057import dk.netarkivet.common.exceptions.ArgumentNotValid;
058import dk.netarkivet.common.exceptions.IOFailure;
059import dk.netarkivet.common.exceptions.IllegalState;
060import dk.netarkivet.common.exceptions.UnknownID;
061import dk.netarkivet.common.utils.CleanupIF;
062import dk.netarkivet.common.utils.FileUtils;
063import dk.netarkivet.common.utils.NotificationType;
064import dk.netarkivet.common.utils.NotificationsFactory;
065import dk.netarkivet.common.utils.Settings;
066import dk.netarkivet.common.utils.batch.ChecksumJob;
067
068/**
069 * The Arcrepository handles the communication with the different replicas. This class ensures that arc files are stored
070 * in all available replica and verifies that the storage process succeeded. Retrieval of data from a replica goes
071 * through the JMSArcRepositoryClient that contacts the appropriate (typically nearest) replica and retrieves data from
072 * this archive. Batch execution is sent to the bitarchive replica(s), since batch cannot be executed on checksum
073 * replicas. Correction operations are typically only allowed on one replica.
074 */
075@SuppressWarnings({"deprecation"})
076public class ArcRepository implements CleanupIF {
077
    /** The log. */
    private static final Logger log = LoggerFactory.getLogger(ArcRepository.class);

    /** The unique instance (singleton) of this class. It is created on the first call to getInstance(). */
    private static ArcRepository instance;

    /** The administration data associated with the arcrepository. Obtained from the AdminFactory. */
    private Admin ad;

    /** The class which listens to messages sent to this instance of Arcrepository or its subclasses. */
    private ArcRepositoryServer arcReposhandler;

    /** A Map of a Replica and their corresponding ReplicaClient. From this Map the relevant channels can be found. */
    private final Map<Replica, ReplicaClient> connectedReplicas = new HashMap<Replica, ReplicaClient>();

    /**
     * Map from MessageId to arcfiles for which there are outstanding checksum jobs. The key is the ID of the
     * checksum request message that was sent, the value is the name of the file being checked.
     */
    private final Map<String, String> outstandingChecksumFiles = new HashMap<String, String>();

    /**
     * Map from filenames to remote files. Used for retrieving a remote file reference while a store operation is in
     * process.
     */
    private final Map<String, RemoteFile> outstandingRemoteFiles = new HashMap<String, RemoteFile>();

    /**
     * Map from filenames to the precomputed checksum carried by the StoreMessage of a store operation in process.
     * Entries are added in store() and removed again when the store is replied to (OK or not OK).
     * NOTE(review): no reader of this map is visible in this part of the file - confirm where it is consumed.
     */
    private final Map<String, String> outstandingRemoteFilesC = new HashMap<String, String>();


    /**
     * Map from bitarchive names to Map from filenames to the number of times a file has been attempted uploaded to
     * the bitarchive.
     */
    private final Map<String, Map<String, Integer>> uploadRetries = new HashMap<String, Map<String, Integer>>();
110
111    /**
112     * Constructor for the ArcRepository. Connects the ArcRepository to all BitArchives, and initialises admin data.
113     *
114     * @throws IOFailure if admin data cannot be read/initialised or we cannot' connect to some bitarchive.
115     * @throws IllegalState if inconsistent channel info is given in settings.
116     */
117    protected ArcRepository() throws IOFailure, IllegalState {
118        // UpdateableAdminData Throws IOFailure
119        this.ad = AdminFactory.getInstance();
120        this.arcReposhandler = new ArcRepositoryServer(this);
121        initialiseReplicaClients();
122
123        log.info("Starting the ArcRepository");
124    }
125
126    /**
127     * Returns the unique ArcRepository instance.
128     *
129     * @return the instance.
130     * @throws IOFailure if admin data cannot be read/initialised or we cannot' connect to some bitarchive.
131     * @throws IllegalState if inconsistent channel info is given in settings.
132     */
133    public static synchronized ArcRepository getInstance() throws IllegalState, IOFailure {
134        if (instance == null) {
135            instance = new ArcRepository();
136        }
137        return instance;
138    }
139
140    /**
141     * Method for initialising the replica clients.
142     */
143    private void initialiseReplicaClients() {
144        // Get channels
145        ChannelID[] allBas = Channels.getAllArchives_ALL_BAs();
146        ChannelID[] anyBas = Channels.getAllArchives_ANY_BAs();
147        ChannelID[] theBamons = Channels.getAllArchives_BAMONs();
148        ChannelID[] theCrs = Channels.getAllArchives_CRs();
149
150        Replica[] replicas = Replica.getKnown().toArray(new Replica[theBamons.length]);
151        // Checks equal number of channels
152        checkChannels(allBas, anyBas, theBamons);
153
154        for (int i = 0; i < allBas.length; i++) {
155            Replica rep = replicas[i];
156            if (rep.getType() == ReplicaType.BITARCHIVE) {
157                connectedReplicas.put(rep, BitarchiveClient.getInstance(allBas[i], anyBas[i], theBamons[i]));
158            } else { // checksum replica
159                connectedReplicas.put(rep, ChecksumClient.getInstance(theCrs[i]));
160            }
161        }
162    }
163
164    /**
165     * Sanity check for data consistency in the construction of the ArcRepository, specifically that the number of
166     * ALL_BA, ANY_BA, and THE_BAMON queues are all equal to the number of credentials.
167     *
168     * @param allBas The topics for bitarchives
169     * @param anyBas The queues for bitarchives
170     * @param theBamons The queues for bitarchive monitors
171     * @throws IllegalState if inconsistent data is found
172     */
173    private void checkChannels(ChannelID[] allBas, ChannelID[] anyBas, ChannelID[] theBamons) throws IllegalState {
174        if (theBamons.length != allBas.length || theBamons.length != anyBas.length) {
175            throw new IllegalState("Inconsistent data found in construction of " + "ArcRepository: \n\nALL_BAs: "
176                    + Arrays.toString(allBas) + "\nANY_BAs: " + Arrays.toString(anyBas) + "\nTHE_BAMONs: "
177                    + Arrays.toString(theBamons));
178        }
179    }
180
181    /**
182     * Stores a file in all known replicas. It sends out a upload message to all replicas.
183     *
184     * @param rf The remotefile to be stored.
185     * @param replyInfo A StoreMessage used to reply with success or failure.
186     * @throws IOFailure If file couldn't be stored.
187     * @throws ArgumentNotValid If a input parameter is null.
188     */
189    public synchronized void store(RemoteFile rf, StoreMessage replyInfo) throws IOFailure, ArgumentNotValid {
190        ArgumentNotValid.checkNotNull(rf, "rf");
191        ArgumentNotValid.checkNotNull(replyInfo, "replyInfo");
192
193        final String filename = rf.getName();
194        log.info("Store started: '{}'", filename);
195
196        // Record, that store of this filename is in progress
197        // needed for retrying uploads.
198        if (outstandingRemoteFiles.containsKey(filename)) {
199            log.info("File: '{}' was outstanding from the start.", filename);
200        }
201        outstandingRemoteFiles.put(filename, rf);
202        outstandingRemoteFilesC.put(filename, replyInfo.getPrecomputedChecksum()); // Hack
203
204        if (ad.hasEntry(filename)) {
205            // Any valid entry (and all existing entries are now
206            // known to be valid) by definition has a checksum.
207            if (!rf.getChecksum().equals(ad.getCheckSum(filename))) {
208                String msg = "Attempting to store file '" + filename + "' with a different checksum than before: "
209                        + "Old checksum: " + ad.getCheckSum(filename) + ", new checksum: " + rf.getChecksum();
210                log.warn(msg);
211                replyNotOK(filename, replyInfo);
212                return;
213            }
214            log.debug("Retrying store of already known file '{}'," + " Already completed: {}", filename,
215                    isStoreCompleted(filename));
216            ad.setReplyInfo(filename, replyInfo);
217        } else {
218            ad.addEntry(filename, replyInfo, rf.getChecksum());
219        }
220        for (Map.Entry<Replica, ReplicaClient> entry : connectedReplicas.entrySet()) {
221            startUpload(rf, entry.getValue(), entry.getKey(), replyInfo);
222        }
223
224        // Check state and reply if needed
225        considerReplyingOnStore(filename);
226    }
227
228    /**
229     * Initiate uploading of file to a specific replica. The corresponding upload record in admin data is created.
230     *
231     * @param rf Remotefile to upload to replica.
232     * @param replicaClient The replica client to upload to.
233     * @param replica The replica where RemoteFile is to be stored.
234     * @param replyInfo 
235     */
236    private synchronized void startUpload(RemoteFile rf, ReplicaClient replicaClient, Replica replica, StoreMessage replyInfo) {
237        final String filename = rf.getName();
238        log.debug("Upload started of file '{}' to replica '{}'", filename, replica.getId());
239
240        String replicaChannelId = replica.getIdentificationChannel().getName();
241
242        if (!ad.hasState(filename, replicaChannelId)) {
243            // New upload
244            ad.setState(filename, replicaChannelId, ReplicaStoreState.UPLOAD_STARTED);
245            replicaClient.sendUploadMessage(rf, replyInfo.getPrecomputedChecksum()); // Updated to include checksum information
246        } else {
247            // Recovery from old upload
248            ReplicaStoreState storeState = ad.getState(filename, replicaChannelId);
249            switch (storeState) {
250            case UPLOAD_FAILED:
251            case UPLOAD_STARTED:
252            case DATA_UPLOADED:
253                log.debug("Recovery from old upload. StoreState: {}. Sending new Checksum request for verifying "
254                        + "whether the file '{}' has been succesfully uploaded for replica: '{}'", storeState,
255                        filename, replica);
256                // Unknown condition in bitarchive. Test with checksum job.
257                if (storeState == ReplicaStoreState.UPLOAD_FAILED) {
258                    ad.setState(filename, replicaChannelId, ReplicaStoreState.UPLOAD_STARTED);
259                    log.info("ReplicaStoreState for file '{}' on replica '{}' changed from '{}' to '{}'", filename,
260                            replica, ReplicaStoreState.UPLOAD_FAILED, ReplicaStoreState.UPLOAD_STARTED);
261                }
262                sendChecksumRequestForFile(filename, replicaClient);
263                break;
264            case UPLOAD_COMPLETED:
265                log.warn("Trying to upload file '{}' that already has state UPLOAD_COMPLETED for this replica",
266                        filename);
267                break;
268            default:
269                throw new UnknownID("Unknown state: '" + storeState + "'");
270            }
271        }
272    }
273
274    /**
275     * Method for retrieving the checksum of a specific file from the archive of a specific replica. If the replica is a
276     * BitArchive, then a Batch message with the ChecksumJob is sent. If the replica is a ChecksumArchive, then a
277     * GetChecksumMessage is sent.
278     *
279     * @param filename The file to checksum.
280     * @param replicaClient The client to retrieve the checksum of the file from.
281     */
282    private void sendChecksumRequestForFile(String filename, ReplicaClient replicaClient) {
283        NetarkivetMessage msg;
284
285        // Retrieve the checksum of the file.
286        msg = replicaClient.sendGetChecksumMessage(Channels.getTheRepos(), filename);
287
288        outstandingChecksumFiles.put(msg.getID(), filename);
289        log.debug("Checksum job message submitted for file '{}' with message id: '{}'", filename, msg.getID());
290    }
291
292    /**
293     * Test whether the current state is such that we may send a reply for the file we are currently processing, and
294     * send the reply if it is. We reply only when there is an outstanding message to reply to, and a) The file is
295     * reported complete in all replicas or b) No replica has outstanding reply messages AND some replica has reported
296     * failure.
297     *
298     * @param arcFileName The arcfile we consider replying to.
299     */
300    private synchronized void considerReplyingOnStore(String arcFileName) {
301        if (ad.hasReplyInfo(arcFileName)) {
302            if (isStoreCompleted(arcFileName)) {
303                replyOK(arcFileName, ad.removeReplyInfo(arcFileName));
304            } else if (oneReplicaHasFailed(arcFileName) && noReplicaInStateUploadStarted(arcFileName)) {
305                replyNotOK(arcFileName, ad.removeReplyInfo(arcFileName));
306            }
307        }
308    }
309
310    /**
311     * Reply to a store message with status Ok.
312     *
313     * @param arcFileName The file for which we are replying.
314     * @param msg The message to reply to.
315     */
316    private synchronized void replyOK(String arcFileName, StoreMessage msg) {
317        outstandingRemoteFiles.remove(arcFileName);
318        outstandingRemoteFilesC.remove(arcFileName);
319        clearRetries(arcFileName);
320        log.info("Store OK: '{}'", arcFileName);
321        log.debug("Sending store OK reply to message '{}'", msg);
322        JMSConnectionFactory.getInstance().reply(msg);
323    }
324
325    /**
326     * Reply to a store message with status NotOk.
327     *
328     * @param arcFileName The file for which we are replying.
329     * @param msg The message to reply to.
330     */
331    private synchronized void replyNotOK(String arcFileName, StoreMessage msg) {
332        outstandingRemoteFiles.remove(arcFileName);
333        outstandingRemoteFilesC.remove(arcFileName);
334        clearRetries(arcFileName);
335        msg.setNotOk("Failure while trying to store ARC file: " + arcFileName);
336        log.warn("Store NOT OK: '{}'", arcFileName);
337        log.debug("Sending store NOT OK reply to message '{}'", msg);
338        JMSConnectionFactory.getInstance().reply(msg);
339    }
340
341    /**
342     * Check if all replicas have reported that storage has been successfully completed. If this is the case return true
343     * else false.
344     *
345     * @param arcfileName The file being stored.
346     * @return true only if all replicas report UPLOAD_COMPLETED.
347     */
348    private boolean isStoreCompleted(String arcfileName) {
349        // TODO remove quadratic scaling hidden here!!
350        for (Replica rep : connectedReplicas.keySet()) {
351            try {
352                // retrieve the replica channel and check upload status.
353                if (ad.getState(arcfileName, rep.getIdentificationChannel().getName()) != ReplicaStoreState.UPLOAD_COMPLETED) {
354                    return false;
355                }
356            } catch (UnknownID e) {
357                // Since no upload status exists, then it cannot be completed!
358                log.warn("Non-fatal error! A replica does not have a upload status for the file '{}'.", arcfileName, e);
359                return false;
360            }
361        }
362
363        // Since no replica has a storestate differing from 'UPLOAD_COMPLETED'
364        // and no errors, then the store is completed for the entire system.
365        return true;
366    }
367
368    /**
369     * Checks if there are at least one replica that has reported that storage has failed. If this is the case return
370     * true else false.
371     *
372     * @param arcFileName the name of file being stored.
373     * @return true only if at least one replica report UPLOAD_FAILED.
374     */
375    private boolean oneReplicaHasFailed(String arcFileName) {
376        for (Replica rep : connectedReplicas.keySet()) {
377            try {
378                // retrieve the replica channel and check upload status.
379                String repChannel = rep.getIdentificationChannel().getName();
380                if (ad.getState(arcFileName, repChannel) == ReplicaStoreState.UPLOAD_FAILED) {
381                    return true;
382                }
383            } catch (UnknownID e) {
384                log.warn("Non-fatal error. One replica does not have a upload status for the file '{}'.", arcFileName,
385                        e);
386                return true;
387            }
388        }
389        return false;
390    }
391
392    /**
393     * Checks if no replicas which has reported that upload is in started state. If this is the case return true else
394     * false.
395     *
396     * @param arcFileName The name of the file being stored.
397     * @return true only if no replica report UPLOAD_STARTED.
398     */
399    private boolean noReplicaInStateUploadStarted(String arcFileName) {
400        for (Replica rep : connectedReplicas.keySet()) {
401            try {
402                // retrieve the replica channel and check upload status.
403                String repChannelName = rep.getIdentificationChannel().getName();
404                if (ad.getState(arcFileName, repChannelName) == ReplicaStoreState.UPLOAD_STARTED) {
405                    return false;
406                }
407            } catch (UnknownID e) {
408                // When no upload exists, then upload cannot have started.
409                log.warn("Non-fatal error! A replica does not have a upload status for tshe file '{}'.", arcFileName, e);
410            }
411        }
412
413        return true;
414    }
415
416    /**
417     * Returns a replica client based on a replica id.
418     *
419     * @param replicaId the replica id
420     * @return A replica client.
421     * @throws ArgumentNotValid if replicaId parameter is null
422     */
423    public ReplicaClient getReplicaClientFromReplicaId(String replicaId) throws ArgumentNotValid {
424        ArgumentNotValid.checkNotNullOrEmpty(replicaId, "replicaId");
425
426        // retrieve the replica client.
427        ReplicaClient rc = connectedReplicas.get(Replica.getReplicaFromId(replicaId));
428
429        return rc;
430    }
431
432    /**
433     * Finds the identification channel for the replica. If the replica is a BitArchive then the channel to the
434     * BitArchiveMonitor is returned, and if the replica is a ChecksumArchive then the checksum replica channel is
435     * returned. This means that only the channels for the bitarchive should be changed into the bamon channel, e.g.
436     * replacing the ALL_BA and ANY_BA identifiers with THE_BAMON. This change does not affect the checksum channel, and
437     * is therefore also performed on it.
438     *
439     * @param channel A channel to the replica.
440     * @return The name of the channel which identifies the replica.
441     */
442    private String resolveReplicaChannel(String channel) {
443        return channel.replaceAll("ALL_BA", "THE_BAMON").replaceAll("ANY_BA", "THE_BAMON");
444    }
445
446    /**
447     * Event handler for upload messages reporting the upload result. Checks the success status of the upload and
448     * updates admin data accordingly.
449     *
450     * @param msg an UploadMessage.
451     */
452    public synchronized void onUpload(UploadMessage msg) {
453        ArgumentNotValid.checkNotNull(msg, "msg");
454        log.debug("Received upload reply: {}", msg.toString());
455
456        String repChannelName = resolveReplicaChannel(msg.getTo().getName());
457
458        if (msg.isOk()) {
459            processDataUploaded(msg.getArcfileName(), repChannelName);
460        } else {
461            processUploadFailed(msg.getArcfileName(), repChannelName);
462        }
463    }
464
465    /**
466     * Process the report by a bitarchive that a file was correctly uploaded.
467     * <ol>
468     * <il>1. Update the upload, and store states appropriately.</il><br/>
469     * <il>2. Verify that data are correctly stored in the archive by running a batch job on the archived file to
470     * perform a MD5 checksum comparison.</il> <br/>
471     * <il>3. Check if store operation is completed and update admin data if so.</il><br/>
472     * </ol>
473     *
474     * @param arcfileName The arcfile that was uploaded.
475     * @param replicaChannelName The name of the identification channel for the replica that uploaded it (THE_BAMON for
476     * bitarchive and THE_CR for checksum).
477     */
478    private synchronized void processDataUploaded(String arcfileName, String replicaChannelName) {
479        log.debug("Data uploaded '{}' ,{}", arcfileName, replicaChannelName);
480        ad.setState(arcfileName, replicaChannelName, ReplicaStoreState.DATA_UPLOADED);
481
482        // retrieve the replica
483        Replica rep = Channels.retrieveReplicaFromIdentifierChannel(replicaChannelName);
484        // Verify that the file has been correctly uploaded.
485        sendChecksumRequestForFile(arcfileName, connectedReplicas.get(rep));
486    }
487
488    /**
489     * Update admin data with the information that upload to a replica failed. The replica record is set to
490     * UPLOAD_FAILED.
491     *
492     * @param arcfileName The file that resulted in an upload failure.
493     * @param replicaChannelName The name of the idenfiticaiton channel for the replica that could not upload the file.
494     */
495    private void processUploadFailed(String arcfileName, String replicaChannelName) {
496        log.warn("Upload failed for ARC file '{}' to bit archive '{}'", arcfileName, replicaChannelName);
497
498        // Update state to reflect upload failure
499        ad.setState(arcfileName, replicaChannelName, ReplicaStoreState.UPLOAD_FAILED);
500        considerReplyingOnStore(arcfileName);
501    }
502
503    /**
504     * Called when we receive replies on our checksum batch jobs.
505     * <p>
506     * This does not handle checksum replicas.
507     *
508     * @param msg a BatchReplyMessage.
509     */
510    public synchronized void onBatchReply(BatchReplyMessage msg) {
511        ArgumentNotValid.checkNotNull(msg, "msg");
512        log.debug("BatchReplyMessage received: '{}'", msg);
513
514        if (!outstandingChecksumFiles.containsKey(msg.getReplyOfId())) {
515            // Message was NOT expected
516            log.warn("Received batchreply message with unknown originating ID {}\n{}\n. Known IDs are: {}",
517                    msg.getReplyOfId(), msg.toString(), outstandingChecksumFiles.keySet().toString());
518            return;
519        }
520
521        String arcfileName = outstandingChecksumFiles.remove(msg.getReplyOfId());
522
523        // Check incoming message
524        if (!msg.isOk()) {
525            // Checksum job has ended with errors, but can contain checksum
526            // anyway, therefore it is logged - but we try to go on
527            log.warn("Message '" + msg.getID() + "' is reported not okay" + "\nReported error: '" + msg.getErrMsg()
528                    + "'" + "\nTrying to process anyway.");
529        }
530
531        // Parse results
532        // if legal result is found it is placed in reportedChecksum
533        // if illegal or errors occurs reportedChecksum is set to ""
534        RemoteFile checksumResFile = msg.getResultFile();
535        String reportedChecksum = "";
536        boolean checksumReadOk = false;
537        if (checksumResFile == null) {
538            log.debug("The results of message '{}' was null.\nNo checksum to use for file '{}'", msg.getID(),
539                    arcfileName);
540        } else if (checksumResFile instanceof NullRemoteFile) {
541            log.debug(
542                    "The results of the message '{}' was instance of NullRemoteFile\nNo checksum to use for file '{}'",
543                    msg.getID(), arcfileName);
544        } else {
545            // Read checksum
546            // Copy result to a local file
547            File outputFile = new File(FileUtils.getTempDir(), msg.getReplyTo().getName() + "_" + arcfileName
548                    + "_checksumOutput.txt");
549            try {
550                checksumResFile.copyTo(outputFile);
551
552                // Read checksum from local file
553                reportedChecksum = readChecksum(outputFile, arcfileName);
554                checksumReadOk = true;
555            } catch (IOFailure e) {
556                log.warn("Couldn't read checksumjob " + "output for '{}'", arcfileName, e);
557            } catch (IllegalState e) {
558                log.warn("Couldn't read result of checksumjob " + "in '{}'", arcfileName, e);
559            }
560
561            // Clean up output file and remote file
562            // clean up does NOT result in general error, i.e.
563            // reportedChecksum is NOT set to "" in case of errors
564            try {
565                FileUtils.removeRecursively(outputFile);
566            } catch (IOFailure e) {
567                log.warn("Couldn't clean up checksumjob " + "output file '{}'", outputFile, e);
568            }
569            try {
570                checksumResFile.cleanup();
571            } catch (IOFailure e) {
572                log.warn("Couldn't clean up checksumjob " + "remote file '{}'", checksumResFile.getName(), e);
573            }
574        }
575
576        // Process result
577        String orgCheckSum = ad.getCheckSum(arcfileName);
578        String repChannel = resolveReplicaChannel(msg.getReplyTo().getName());
579        processCheckSum(arcfileName, repChannel, orgCheckSum, reportedChecksum, msg.isOk() && checksumReadOk);
580    }
581
582    /**
583     * The message for handling the results of the GetChecksumMessage.
584     *
585     * @param msg The message containing the checksum of a specific file.
586     */
587    public synchronized void onChecksumReply(GetChecksumMessage msg) {
588        ArgumentNotValid.checkNotNull(msg, "msg");
589
590        log.debug("Received the reply to a GetChecksumMessage with ID: '{}'", msg.getID());
591
592        // handle the case when unwanted reply.
593        if (!outstandingChecksumFiles.containsKey(msg.getID())) {
594            log.warn("Received GetChecksumMessage with unknown originating ID {}\n{}\n. Known IDs are: {}",
595                    msg.getReplyOfId(), msg.toString(), outstandingChecksumFiles.keySet().toString());
596            return;
597        }
598
599        String arcfileName = outstandingChecksumFiles.remove(msg.getID());
600
601        // Check incoming message
602        if (!msg.isOk()) {
603            // Checksum job has ended with errors, but can contain checksum
604            // anyway, therefore it is logged - but we try to go on
605            log.warn("Message '{}' is reported not okay\nReported error: '{}'\nTrying to process anyway.", msg.getID(),
606                    msg.getErrMsg());
607        }
608
609        String reportedChecksum = msg.getChecksum();
610
611        // check the checksum
612        if (reportedChecksum == null) {
613            // set the reported checksum to empty, like for BAs.
614            reportedChecksum = "";
615        }
616
617        // The checksum was not read by the batchjob, therefore this variable
618        // cannot be false
619        boolean checksumReadOk = true;
620
621        // process the checksum.
622        String orgChecksum = ad.getCheckSum(arcfileName);
623        if (orgChecksum == null) {
624            throw new IllegalState("The admin checksum for file '" + arcfileName + "' is null. Should never happen.");
625        }
626        String repChannelName = resolveReplicaChannel(msg.getTo().getName());
627        processCheckSum(arcfileName, repChannelName, orgChecksum, reportedChecksum, checksumReadOk);
628    }
629
630    /**
631     * Reads output from a checksum file. Only the first instance of the desired file will be used. If other filenames
632     * are encountered than the wanted one, they will be logged at level warning, as that is indicative of serious
633     * errors. Having more than one instance of the desired file merely means it was found in several bitarchives, which
634     * is not our problem.
635     *
636     * @param outputFile The file to read checksum from.
637     * @param arcfileName The arcfile to find checksum for.
638     * @return The checksum, or the empty string if no checksum found for arcfilename.
639     * @throws IOFailure If any error occurs reading the file.
640     * @throws IllegalState if the read format is wrong
641     */
642    private String readChecksum(File outputFile, String arcfileName) throws IOFailure, IllegalState {
643        // List of lines in batch (checksum job) output file
644        List<String> lines = FileUtils.readListFromFile(outputFile);
645        // List of checksums found in batch (checksum job) output file
646        List<String> checksumList = new ArrayList<String>();
647
648        // Extract checksums for arcfile from lines
649        // If errors occurs then throw exception
650        for (String line : lines) {
651            String readFileName = "";
652            String checksum = "";
653            String[] tokens = line.split(ChecksumJob.STRING_FILENAME_SEPARATOR);
654            boolean ignoreLine = false;
655
656            // Check line format
657            ignoreLine = (tokens.length == 0 || line.isEmpty());
658            if (tokens.length != 2 && !ignoreLine) { // wrong format
659                throw new IllegalState("Read checksum line had unexpected format '" + line + "'");
660            }
661
662            // Check checksum and arc-file name in line
663            if (!ignoreLine) {
664                readFileName = tokens[0];
665                checksum = tokens[1];
666                if (checksum.length() == 0) { // wrong format of checksum
667                    // do not exit - there may be more checksums
668                    ignoreLine = true;
669                    log.warn("There were an empty checksum in result for checksums to arc-file '{}' (line: '{}')",
670                            arcfileName, line);
671                } else {
672                    if (!readFileName.equals(arcfileName)) { // wrong arcfile
673                        // do not exit - there may be more checksums
674                        ignoreLine = true;
675                        log.warn(
676                                "There were an unexpected arc-file name in checksum result for arc-file '{}' (line: '{}')",
677                                arcfileName, line);
678                    }
679                }
680            }
681
682            // If previously checksum found, then check if they are different.
683            if (checksumList.size() > 0 && !ignoreLine && !checksum.equals(checksumList.get(checksumList.size() - 1))) {
684                String errMsg = "The arc-file '" + arcfileName + "' was found with two different checksums: "
685                        + checksumList.get(0) + " and " + checksum + ". Last line: '" + line + "'.";
686                log.warn(errMsg);
687                throw new IllegalState(errMsg);
688            }
689
690            // Add error free non-empty found checksum in list
691            if (!ignoreLine) {
692                checksumList.add(checksum);
693            }
694        }
695
696        // Check that checksum list contain a result,
697        // log if it has more than one result
698        if (checksumList.size() > 1) {
699            // Log and proceed - the checksums are equal
700            log.warn("Arcfile '{}' was found with {} occurrences of the checksum: {}", arcfileName,
701                    checksumList.size(), checksumList.get(0));
702        }
703
704        if (checksumList.size() == 0) {
705            log.debug("Arcfile '{}' not found in lines of checksum output file '{}': {}", arcfileName, outputFile,
706                    FileUtils.readListFromFile(outputFile));
707            return "";
708        } else {
709            return checksumList.get(0);
710        }
711    }
712
713    /**
714     * Process reporting of a checksum from a bitarchive for a specific file as part of a store operation for the file.
715     * Verify that the checksum is correct, update the BitArchiveStoreState state. Invariant: upload-state is changed or
716     * retry count is increased.
717     *
718     * @param arcFileName The file being stored.
719     * @param replicaChannelName The id of the replica reporting a checksum.
720     * @param orgChecksum The original checksum.
721     * @param reportedChecksum The checksum calculated by the replica. This value is "", if an error has occurred
722     * (except reply NOT ok from replica).
723     * @param checksumReadOk Tells whether the checksum was read ok by batch job.
724     */
725    private synchronized void processCheckSum(String arcFileName, String replicaChannelName, String orgChecksum,
726            String reportedChecksum, boolean checksumReadOk) {
727        log.debug("Checksum received for file '{}'... processing", arcFileName);
728        ArgumentNotValid.checkNotNullOrEmpty(arcFileName, "String arcfileName");
729        ArgumentNotValid.checkNotNullOrEmpty(replicaChannelName, "String replicaChannelName");
730        ArgumentNotValid.checkNotNullOrEmpty(orgChecksum, "String orgChecksum");
731        ArgumentNotValid.checkNotNull(reportedChecksum, "String reportedChecksum");
732
733        // Log if we do not find file outstanding
734        // we proceed anyway in order to be sure to update stae of file
735        if (!outstandingRemoteFiles.containsKey(arcFileName)) {
736            log.warn("Could not find arc-file as outstanding remote file: '{}'", arcFileName);
737        }
738
739        // If everything works fine complete process of this checksum
740        if (orgChecksum.equals(reportedChecksum) && !reportedChecksum.isEmpty()) {
741
742            // Checksum is valid and job matches expected results
743            ad.setState(arcFileName, replicaChannelName, ReplicaStoreState.UPLOAD_COMPLETED);
744
745            // Find out if and how to make general reply on store()
746            // remove file from outstandingRemoteFiles if a reply is given
747            considerReplyingOnStore(arcFileName);
748            log.debug("Checksum processing for file '{}'... completed.", arcFileName);
749            return;
750        }
751
752        // Log error or retry upload
753        if (reportedChecksum.isEmpty()) { // no checksum found
754            if (checksumReadOk) { // no errors in finding no checksum
755                if (retryOk(replicaChannelName, arcFileName)) { // we can retry
756                    if (outstandingRemoteFiles.containsKey(arcFileName)) {
757                        RemoteFile rf = outstandingRemoteFiles.get(arcFileName);
758                        String preComputedChecksum = outstandingRemoteFilesC.get(arcFileName);
759                        // Retry upload only if allowed and in case we are sure
760                        // that the empty checksum means that the arcfile is not
761                        // in the archive
762                        log.debug("Retrying upload of '{}'", arcFileName);
763                        ad.setState(rf.getName(), replicaChannelName, ReplicaStoreState.UPLOAD_STARTED);
764                        // retrieve the replica from the name of the channel.
765                        Replica rep = Channels.retrieveReplicaFromIdentifierChannel(replicaChannelName);
766                        connectedReplicas.get(rep).sendUploadMessage(rf, preComputedChecksum);
767                        incRetry(replicaChannelName, arcFileName);
768                        log.debug("Checksum processing for file '{}'... completed.", arcFileName);
769                        return;
770                    } // else logging was already done above
771                } else { // cannot retry
772                    log.warn("Cannot do more retry upload of remote file: '{}' to '{}', reported checksum='{}'",
773                            arcFileName, replicaChannelName, reportedChecksum);
774                }
775            } else { // error in getting checksum
776                log.warn(
777                        "Cannot retry upload of remote file: '{}' to '{}', reported checksum='{}' due to earlier batchjob error.",
778                        arcFileName, replicaChannelName, reportedChecksum);
779            }
780        } else { // non empty checksum
781            if (!orgChecksum.equals(reportedChecksum)) {
782                log.warn("Cannot upload (wrong checksum) '{}' to '{}', reported checksum='{}'", arcFileName,
783                        replicaChannelName, reportedChecksum);
784            } else {
785                log.warn("Cannot upload (unknown reason) '{}' to '{}', reported checksum='{}'", arcFileName,
786                        replicaChannelName, reportedChecksum);
787            }
788        }
789
790        // This point is reached if there is some kind of (logged) error, i.e.
791        // - the file has not been accepted as completed
792        // - the file has not been sent to retry of upload
793        ad.setState(arcFileName, replicaChannelName, ReplicaStoreState.UPLOAD_FAILED);
794        considerReplyingOnStore(arcFileName);
795        log.debug("Checksum processing for file '{}'... completed.", arcFileName);
796    }
797
798    /**
799     * Keep track of upload retries of an arcfile to an archive.
800     *
801     * @param replicaChannelName The name of a given replica.
802     * @param arcfileName The name of a given ARC file
803     * @return true if it is ok to retry an upload of arcfileName to the replica through the replicaChannelName.
804     */
805    private boolean retryOk(String replicaChannelName, String arcfileName) {
806        Map<String, Integer> bitarchiveRetries = uploadRetries.get(replicaChannelName);
807        if (bitarchiveRetries == null) {
808            return true;
809        }
810        Integer retryCount = bitarchiveRetries.get(arcfileName);
811        if (retryCount == null) {
812            return true;
813        }
814
815        if (retryCount >= Settings.getInt(ArchiveSettings.ARCREPOSITORY_UPLOAD_RETRIES)) {
816            return false;
817        }
818
819        return true;
820    }
821
822    /**
823     * Increment the number of upload retries.
824     *
825     * @param replicaChannelName The name of the identification channel for the replica.
826     * @param arcfileName The name of a given ARC file.
827     */
828    private void incRetry(String replicaChannelName, String arcfileName) {
829        Map<String, Integer> replicaRetries = uploadRetries.get(replicaChannelName);
830        if (replicaRetries == null) {
831            replicaRetries = new HashMap<String, Integer>();
832            uploadRetries.put(replicaChannelName, replicaRetries);
833        }
834
835        Integer retryCount = replicaRetries.get(arcfileName);
836        if (retryCount == null) {
837            replicaRetries.put(arcfileName, Integer.valueOf(1));
838            return;
839        }
840
841        replicaRetries.put(arcfileName, Integer.valueOf(retryCount + 1));
842    }
843
844    /**
845     * Remove all retry tracking information for the arcfile.
846     *
847     * @param arcfileName The name of a given ARC file
848     */
849    private void clearRetries(String arcfileName) {
850        for (String replicaChannelName : uploadRetries.keySet()) {
851            Map<String, Integer> baretries = uploadRetries.get(replicaChannelName);
852            baretries.remove(arcfileName);
853        }
854    }
855
856    /**
857     * Change admin data entry for a given file.
858     * <p>
859     * The following information is contained in the given AdminDataMessage: 1) The name of the given file to change the
860     * entry for, 2) the name of the bitarchive to modify the entry for, 3) a boolean that says whether or not to
861     * replace the checksum for the entry for the given file in AdminData, 4) a replacement for the case where the
862     * former value is true.
863     *
864     * @param msg an AdminDataMessage object
865     */
866    public void updateAdminData(AdminDataMessage msg) {
867
868        if (!ad.hasEntry(msg.getFileName())) {
869            throw new ArgumentNotValid("No admin entry exists for the file '" + msg.getFileName() + "'");
870        }
871
872        String message = "Handling request to change admin data for '" + msg.getFileName() + "'. ";
873        // add information if store-state is changed.
874        if (msg.isChangeStoreState()) {
875            message += "Change store state to " + msg.getNewvalue();
876        }
877        // add information if checksum is changed.
878        if (msg.isChangeChecksum()) {
879            message += "Change checksum to " + msg.getChecksum();
880        }
881        // log the message.
882        log.warn(message);
883        NotificationsFactory.getInstance().notify(message, NotificationType.WARNING);
884
885        if (msg.isChangeStoreState()) {
886            String replicaChannelName = Replica.getReplicaFromId(msg.getReplicaId()).getIdentificationChannel()
887                    .getName();
888            ad.setState(msg.getFileName(), replicaChannelName, msg.getNewvalue());
889        }
890
891        if (msg.isChangeChecksum()) {
892            ad.setCheckSum(msg.getFileName(), msg.getChecksum());
893        }
894    }
895
896    /**
897     * Forwards a RemoveAndGetFileMessage to the designated bitarchive. Before forwarding the message it is verified
898     * that the checksum of the file to remove differs from the registered checksum of the file to remove. If no
899     * registration exists for the file to remove the message is always forwarded.
900     *
901     * @param msg the message to forward to a bitarchive
902     */
903    public void removeAndGetFile(RemoveAndGetFileMessage msg) {
904        // Prevent removal of files with correct checksum
905        if (ad.hasEntry(msg.getFileName())) {
906            String refchecksum = ad.getCheckSum(msg.getFileName());
907            if (msg.getCheckSum().equals(refchecksum)) {
908                throw new ArgumentNotValid("Attempting to remove file with correct checksum. File=" + msg.getFileName()
909                        + "; with checksum:" + msg.getCheckSum() + ";");
910            }
911        }
912
913        // checksum ok - try to remove the file
914        String errMsg = "Requesting remove of file '" + msg.getFileName() + "' with checksum '" + msg.getCheckSum()
915                + "' from: '" + msg.getReplicaId() + "'";
916        log.warn(errMsg);
917        NotificationsFactory.getInstance().notify(errMsg, NotificationType.WARNING);
918        ReplicaClient rc = getReplicaClientFromReplicaId(msg.getReplicaId());
919        rc.sendRemoveAndGetFileMessage(msg);
920    }
921
    /**
     * Shuts down the ArcRepository. All actual closing is delegated to {@link #cleanup()}; this method only adds an
     * info-level log entry before and after it.
     */
    public void close() {
        log.info("Closing down ArcRepository");
        cleanup();
        log.info("Closed ArcRepository");
    }
930
931    /**
932     * Closes all connections and nulls the instance. The ArcRepositoryHandler, the AdminData and the ReplicaClients are
933     * closed along with all their connections.
934     */
935    public void cleanup() {
936        if (arcReposhandler != null && arcReposhandler instanceof ArcRepositoryServer) {
937            ((ArcRepositoryServer) arcReposhandler).close();
938            arcReposhandler = null;
939        }
940        if (connectedReplicas != null) {
941            for (ReplicaClient rc : connectedReplicas.values()) {
942                rc.close();
943            }
944            connectedReplicas.clear();
945        }
946        if (ad != null) {
947            ad.close();
948            ad = null;
949        }
950        instance = null;
951    }
952
953}