001/*
002 * #%L
003 * Netarchivesuite - archive
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.archive.bitarchive.distribute;
024
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027
028import dk.netarkivet.archive.checksum.distribute.CorrectMessage;
029import dk.netarkivet.archive.checksum.distribute.GetAllChecksumsMessage;
030import dk.netarkivet.archive.checksum.distribute.GetAllFilenamesMessage;
031import dk.netarkivet.archive.checksum.distribute.GetChecksumMessage;
032import dk.netarkivet.archive.distribute.ReplicaClient;
033import dk.netarkivet.common.distribute.ChannelID;
034import dk.netarkivet.common.distribute.Channels;
035import dk.netarkivet.common.distribute.JMSConnection;
036import dk.netarkivet.common.distribute.JMSConnectionFactory;
037import dk.netarkivet.common.distribute.RemoteFile;
038import dk.netarkivet.common.distribute.arcrepository.ReplicaType;
039import dk.netarkivet.common.exceptions.ArgumentNotValid;
040import dk.netarkivet.common.exceptions.IOFailure;
041import dk.netarkivet.common.utils.batch.FileBatchJob;
042
043/**
044 * Proxy for remote bitarchive. Establishes a JMS connection to the remote bitarchive.
045 */
046public final class BitarchiveClient implements ReplicaClient {
047
048    // Each message is assigned a message id
049    /** The log. */
050    private static final Logger log = LoggerFactory.getLogger(BitarchiveClient.class);
051
052    /** Connection to JMS provider. */
053    private JMSConnection jmsCon;
054
055    // connection information
056    /** The ALL_BA channel for this replica. */
057    private ChannelID allBa;
058    /** The ANY_BA channel for this replica. */
059    private ChannelID anyBa;
060    /** The THE_BAMON channel for this replica. */
061    private ChannelID theBamon;
062    /** The channel to the ArcRepository. */
063    private ChannelID clientId = Channels.getTheRepos();
064    /** The name of the replica whose client this is. */
065    private String replicaId;
066
067    /**
068     * Establish the connection to the server.
069     *
070     * @param allBaIn topic to all bitarchives
071     * @param anyBaIn queue to one of the bitarchives
072     * @param theBamonIn queue to the bitarchive monitor
073     * @throws IOFailure If there is a problem making the connection.
074     */
075    private BitarchiveClient(ChannelID allBaIn, ChannelID anyBaIn, ChannelID theBamonIn) throws IOFailure {
076        this.allBa = allBaIn;
077        this.anyBa = anyBaIn;
078        this.theBamon = theBamonIn;
079        replicaId = Channels.retrieveReplicaFromIdentifierChannel(theBamon.getName()).getId();
080        jmsCon = JMSConnectionFactory.getInstance();
081    }
082
083    /**
084     * Factory that establish the connection to the server.
085     *
086     * @param allBaIn topic to all bitarchives
087     * @param anyBaIn queue to one of the bitarchives
088     * @param theBamonIn queue to the bitarchive monitor
089     * @return A BitarchiveClient
090     * @throws IOFailure If there is a problem making the connection.
091     */
092    public static BitarchiveClient getInstance(ChannelID allBaIn, ChannelID anyBaIn, ChannelID theBamonIn)
093            throws IOFailure {
094        return new BitarchiveClient(allBaIn, anyBaIn, theBamonIn);
095    }
096
097    /**
098     * Submit a get request to the bitarchive.
099     *
100     * @param arcfile The file containing the requested record
101     * @param index Offset of the ARC record in the file
102     * @return The submitted message or null if an error occured
103     */
104    public GetMessage get(String arcfile, long index) {
105        ArgumentNotValid.checkNotNullOrEmpty(arcfile, "arcfile");
106        ArgumentNotValid.checkNotNegative(index, "index");
107
108        // Create and send get message
109        GetMessage msg = new GetMessage(allBa, clientId, arcfile, index);
110        jmsCon.send(msg);
111
112        return msg;
113    }
114
115    /**
116     * Submit an already constructed batch message to the archive. The reply goes directly back to whoever sent the
117     * message.
118     *
119     * @param msg the message to be processed by the get command.
120     */
121    public void sendGetMessage(GetMessage msg) {
122        ArgumentNotValid.checkNotNull(msg, "msg");
123
124        log.debug("Resending get message '{}' to bitarchives", msg);
125
126        try {
127            jmsCon.resend(msg, Channels.getAllBa());
128        } catch (Throwable t) {
129            log.warn("Failure while resending {}", msg, t);
130            try {
131                msg.setNotOk(t);
132                jmsCon.reply(msg);
133            } catch (Throwable t1) {
134                log.warn("Failed to send error message back", t1);
135            }
136        }
137    }
138
139    /**
140     * Submit an already constructed getfile message to the archive.
141     *
142     * @param msg get file message to retrieve.
143     */
144    public void sendGetFileMessage(GetFileMessage msg) {
145        ArgumentNotValid.checkNotNull(msg, "msg");
146        log.debug("Resending get file message '{}' to bitarchives", msg);
147        jmsCon.resend(msg, this.allBa);
148    }
149
150    /**
151     * Forward the message to ALL_BA.
152     *
153     * @param msg the message to forward.
154     */
155    public void sendRemoveAndGetFileMessage(RemoveAndGetFileMessage msg) {
156        ArgumentNotValid.checkNotNull(msg, "msg");
157        jmsCon.resend(msg, this.allBa);
158    }
159
160    /**
161     * Sends a message to terminate a running batchjob.
162     *
163     * @param batchID The ID of the batchjob to terminate.
164     * @throws ArgumentNotValid If the batchID is either null or the empty string.
165     */
166    public void sendBatchTerminationMessage(String batchID) throws ArgumentNotValid {
167        ArgumentNotValid.checkNotNullOrEmpty(batchID, "String batchID");
168        // create and send the BatchTerminationMessage.
169        BatchTerminationMessage msg = new BatchTerminationMessage(this.allBa, batchID);
170        jmsCon.send(msg);
171    }
172
173    /**
174     * Submit an upload request to the bitarchive.
175     *
176     * @param rf The file to upload.
177     * @param precomputedChecksum A precomputed checksum
178     *          
179     * @throws IOFailure If access to file denied.
180     * @throws ArgumentNotValid If arcfile is null.
181     */
182    public void sendUploadMessage(RemoteFile rf, String precomputedChecksum) throws IOFailure, ArgumentNotValid {
183        ArgumentNotValid.checkNotNull(rf, "rf");
184        UploadMessage up = new UploadMessage(anyBa, clientId, rf);
185        up.setPrecomputedChecksum(precomputedChecksum);
186        log.debug("Sending upload message\n{}", up.toString());
187        jmsCon.send(up);
188    }
189
190    /**
191     * Submit an already constructed get message to the archive. This is used by the ArcRepository when forwarding batch
192     * jobs from its clients.
193     *
194     * @param bMsg a BatchMessage.
195     * @return The submitted message.
196     * @throws ArgumentNotValid If message is null.
197     */
198    public BatchMessage sendBatchJob(BatchMessage bMsg) throws ArgumentNotValid {
199        ArgumentNotValid.checkNotNull(bMsg, "bMsg");
200        log.debug("Resending batch message '{}' to bitarchive monitor {}", bMsg, this.theBamon);
201        jmsCon.resend(bMsg, this.theBamon);
202        return bMsg;
203    }
204
205    /**
206     * Submit a batch job to the archive. This is used by the ArcRepository when it needs to run batch jobs for its own
207     * reasons, i.e. when checksumming a file as part of the Store operation.
208     *
209     * @param replyChannel The channel that the reply of this job should be sent to.
210     * @param job The job that should be run on the bit archive handled by this client.
211     * @return The submitted message.
212     * @throws ArgumentNotValid If any parameter was null.
213     * @throws IOFailure If sending the batch message did not succeed.
214     */
215    public BatchMessage sendBatchJob(ChannelID replyChannel, FileBatchJob job) throws ArgumentNotValid, IOFailure {
216        ArgumentNotValid.checkNotNull(replyChannel, "replyChannel");
217        ArgumentNotValid.checkNotNull(job, "job");
218        BatchMessage bMsg = new BatchMessage(this.theBamon, replyChannel, job, replicaId);
219        jmsCon.send(bMsg);
220        return bMsg;
221    }
222
223    /**
224     * Release jms connections.
225     */
226    public void close() {
227        log.debug("Client has been shutdown");
228    }
229
230    /**
231     * For correcting an erroneous entry in the archive. The message is sent the replica for correcting the 'bad' entry.
232     *
233     * @param msg The correct message to correct the bad entry in the archive.
234     * @throws ArgumentNotValid If the CorrectMessage is null.
235     */
236    @Override
237    public void sendCorrectMessage(CorrectMessage msg) throws ArgumentNotValid {
238        ArgumentNotValid.checkNotNull(msg, "CorrectMessage msg");
239
240        jmsCon.resend(msg, theBamon);
241
242        log.debug("Sending CorrectMessage: '{}'", msg);
243    }
244
245    /**
246     * Method for sending a GetAllFilenamesMessage to a checksum archive.
247     *
248     * @param msg The GetAllFilenamesMessage, which will be sent through the jms connection to the checksum archive.
249     * @throws ArgumentNotValid If the GetAllFilenamesMessage is null.
250     */
251    public void sendGetAllFilenamesMessage(GetAllFilenamesMessage msg) throws ArgumentNotValid {
252        ArgumentNotValid.checkNotNull(msg, "GetAllFilenamesMessage msg");
253        // send the message to the archive.
254        jmsCon.resend(msg, theBamon);
255
256        // log message.
257        log.debug("Resending GetAllFilenamesMessage: '{}'.", msg.toString());
258    }
259
260    /**
261     * Method for sending the GetAllChecksumMessage to the ChecksumReplica.
262     *
263     * @param msg The GetAllChecksumMessage, which will be sent through the jms connection to the checksum archive.
264     * @throws ArgumentNotValid If the GetAllChecksumsMessage is null.
265     */
266    public void sendGetAllChecksumsMessage(GetAllChecksumsMessage msg) throws ArgumentNotValid {
267        ArgumentNotValid.checkNotNull(msg, "GetAllChecksumsMessage msg");
268        // send the message to the archive.
269        jmsCon.resend(msg, theBamon);
270
271        // log message.
272        log.debug("Sending GetAllChecksumMessage: '{}'.", msg.toString());
273    }
274
275    /**
276     * Method for retrieving the checksum of a specific arcfile within the archive.
277     *
278     * @param msg The GetChecksumMessage which will be sent to the checksum archive though the jms connection.
279     * @throws ArgumentNotValid If the GetChecksumMessage is null.
280     */
281    public void sendGetChecksumMessage(GetChecksumMessage msg) throws ArgumentNotValid {
282        // Validate arguments
283        ArgumentNotValid.checkNotNull(msg, "GetChecksumMessage msg");
284
285        jmsCon.resend(msg, theBamon);
286
287        // log what we are doing.
288        log.debug("Sending GetChecksumMessage: '{}'.", msg.toString());
289    }
290
291    /**
292     * Method for retrieving the checksum of a specific arcfile within the archive.
293     *
294     * @param replyChannel The channel where the reply should be sent.
295     * @param filename The GetChecksumMessage which has been sent to the checksum archive though the jms connection.
296     * @return The GetChecksumMessage which is sent.
297     * @throws ArgumentNotValid If the reply channel is null or if the filename is either null or the empty string.
298     */
299    public GetChecksumMessage sendGetChecksumMessage(ChannelID replyChannel, String filename) throws ArgumentNotValid {
300        // Validate arguments
301        ArgumentNotValid.checkNotNull(replyChannel, "ChannelID replyChannel");
302        ArgumentNotValid.checkNotNullOrEmpty(filename, "String filename");
303
304        // Send a GetChecksumMessage to the replica.
305        GetChecksumMessage msg = new GetChecksumMessage(theBamon, replyChannel, filename, replicaId);
306        jmsCon.send(msg);
307
308        // log what we are doing.
309        log.debug("Sending GetChecksumMessage: '{}'.", msg.toString());
310
311        return msg;
312    }
313
314    /**
315     * Retrieves the type of replica.
316     *
317     * @return The type of this replica. In this case Bitarchive.
318     */
319    public ReplicaType getType() {
320        return ReplicaType.BITARCHIVE;
321    }
322
323}