001/*
002 * #%L
003 * Netarchivesuite - archive
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.archive.arcrepository.distribute;
024
025import java.io.File;
026import java.util.Collections;
027
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031import dk.netarkivet.archive.arcrepository.ArcRepository;
032import dk.netarkivet.archive.arcrepository.bitpreservation.AdminDataMessage;
033import dk.netarkivet.archive.bitarchive.distribute.BatchMessage;
034import dk.netarkivet.archive.bitarchive.distribute.BatchReplyMessage;
035import dk.netarkivet.archive.bitarchive.distribute.GetFileMessage;
036import dk.netarkivet.archive.bitarchive.distribute.GetMessage;
037import dk.netarkivet.archive.bitarchive.distribute.RemoveAndGetFileMessage;
038import dk.netarkivet.archive.bitarchive.distribute.UploadMessage;
039import dk.netarkivet.archive.checksum.distribute.CorrectMessage;
040import dk.netarkivet.archive.checksum.distribute.GetAllChecksumsMessage;
041import dk.netarkivet.archive.checksum.distribute.GetAllFilenamesMessage;
042import dk.netarkivet.archive.checksum.distribute.GetChecksumMessage;
043import dk.netarkivet.archive.distribute.ArchiveMessageHandler;
044import dk.netarkivet.archive.distribute.ReplicaClient;
045import dk.netarkivet.common.CommonSettings;
046import dk.netarkivet.common.distribute.ChannelID;
047import dk.netarkivet.common.distribute.Channels;
048import dk.netarkivet.common.distribute.JMSConnectionFactory;
049import dk.netarkivet.common.exceptions.ArgumentNotValid;
050import dk.netarkivet.common.utils.Settings;
051
052/**
053 * Listens on the queue "TheArcrepos" and submits the messages to a corresponding visit method on BitarchiveClient.
054 */
055public class ArcRepositoryServer extends ArchiveMessageHandler {
056
057    /** The log. */
058    private static final Logger log = LoggerFactory.getLogger(ArcRepositoryServer.class);
059    /** The ArcRepository connected to this server. */
060    private final ArcRepository ar;
061
062    /**
063     * Creates and adds a ArcRepositoryMessageHandler as listener on the "TheArcrepos"-queue.
064     *
065     * @param ar the ArcRepository
066     */
067    public ArcRepositoryServer(ArcRepository ar) {
068        ArgumentNotValid.checkNotNull(ar, "ArcRepository ar");
069        this.ar = ar;
070        ChannelID channel = Channels.getTheRepos();
071        log.info("Listening for arc repository messages on channel '{}'", channel);
072        JMSConnectionFactory.getInstance().setListener(channel, this);
073    }
074
075    /**
076     * Forwards the call to the ArcRepository.store() method with the StoreMessage as parameter. In case of exception
077     * when calling store, a reply message is sent containing the message set as NotOK.
078     *
079     * @param msg the message to be processed by the store command.
080     */
081    public void visit(StoreMessage msg) {
082        ArgumentNotValid.checkNotNull(msg, "msg");
083        try {
084            ar.store(msg.getRemoteFile(), msg);
085        } catch (Throwable t) {
086            log.warn("Failed to handle store request", t);
087            msg.setNotOk(t);
088            JMSConnectionFactory.getInstance().reply(msg);
089        }
090    }
091
092    /**
093     * Request a file to be deleted from bitarchives. This request will be handled by the bitarchives, and the
094     * bitarchive containing the file will reply with the removed file if succesful, or with a notOk message if
095     * unsuccesful.
096     * <p>
097     * Will send a not-ok reply on exceptions handling this request.
098     *
099     * @param msg the message to be processed
100     */
101    public void visit(RemoveAndGetFileMessage msg) {
102        ArgumentNotValid.checkNotNull(msg, "msg");
103
104        try {
105            ar.removeAndGetFile(msg);
106        } catch (Throwable t) {
107            log.warn("Failed to handle request to remove file", t);
108            msg.setNotOk(t);
109            JMSConnectionFactory.getInstance().reply(msg);
110        }
111    }
112
113    /**
114     * Update the admin data in the arcrepository. Reply aftwerwards. Will reply with notOk on exceptions.
115     *
116     * @param msg the message to be processed
117     */
118    public void visit(AdminDataMessage msg) {
119        ArgumentNotValid.checkNotNull(msg, "msg");
120
121        try {
122            ar.updateAdminData(msg);
123            JMSConnectionFactory.getInstance().reply(msg);
124        } catch (Throwable t) {
125            log.warn("Failed to handle request to change admin data", t);
126            msg.setNotOk(t);
127            JMSConnectionFactory.getInstance().reply(msg);
128        }
129    }
130
131    /**
132     * Forwards the handling of upload replies to the arc repository. Will log errors, but otherwise ignore.
133     *
134     * @param msg a UploadMessage
135     * @throws ArgumentNotValid If the message is null.
136     */
137    public void visit(UploadMessage msg) throws ArgumentNotValid {
138        ArgumentNotValid.checkNotNull(msg, "UploadMessage msg");
139        try {
140            ar.onUpload(msg);
141        } catch (Throwable t) {
142            log.warn("Failed to handle upload reply", t);
143        }
144    }
145
146    /**
147     * Forwards the handling of batch replies to the arc repository. Will log errors, but otherwise ignore.
148     *
149     * @param msg a BatchReplyMessage
150     * @throws ArgumentNotValid If the message is null.
151     */
152    public void visit(BatchReplyMessage msg) throws ArgumentNotValid {
153        ArgumentNotValid.checkNotNull(msg, "BatchReplyMessage msg");
154
155        try {
156            ar.onBatchReply(msg);
157        } catch (Throwable t) {
158            log.warn("Failed to handle batch reply", t);
159        }
160    }
161
162    /**
163     * Resends a batch message to the requested bitarchive.
164     * <p>
165     * Note that this circumvents the ArcRepository entirely and that the reply goes directly back to whoever set the
166     * message.
167     *
168     * @param msg the batch message to be resend.
169     * @throws ArgumentNotValid if parameters are null
170     */
171    public void visit(BatchMessage msg) throws ArgumentNotValid {
172        ArgumentNotValid.checkNotNull(ar, "ar");
173        ArgumentNotValid.checkNotNull(msg, "BatchMessage msg");
174
175        try {
176            ReplicaClient rc = ar.getReplicaClientFromReplicaId(msg.getReplicaId());
177            rc.sendBatchJob(msg);
178        } catch (Throwable t) {
179            log.warn("Failed to handle batch request", t);
180            BatchReplyMessage replyMessage = new BatchReplyMessage(msg.getReplyTo(), Channels.getTheRepos(),
181                    msg.getID(), 0, Collections.<File>emptyList(), null);
182            replyMessage.setNotOk(t);
183            JMSConnectionFactory.getInstance().send(replyMessage);
184        }
185    }
186
187    /**
188     * Forwards a get message to the local bitarchive.
189     * <p>
190     * Note that this circumvents the ArcRepository entirely and that the reply goes directly back to whoever sent the
191     * message.
192     *
193     * @param msg the message to be processed by the get command.
194     * @throws ArgumentNotValid If the message is null.
195     */
196    public void visit(GetMessage msg) throws ArgumentNotValid {
197        ArgumentNotValid.checkNotNull(msg, "GetMessage msg");
198
199        try {
200            ReplicaClient rc = ar.getReplicaClientFromReplicaId(Settings.get(CommonSettings.USE_REPLICA_ID));
201            rc.sendGetMessage(msg);
202        } catch (Throwable t) {
203            log.warn("Failed to handle get request", t);
204            msg.setNotOk(t);
205            JMSConnectionFactory.getInstance().reply(msg);
206        }
207    }
208
209    /**
210     * Forwards a getfile message to requested bitarchive replica.
211     * <p>
212     * Note that this circumvents the ArcRepository entirely and that the reply goes directly back to whoever set the
213     * message.
214     *
215     * @param msg the message to be processed by the get command.
216     * @throws ArgumentNotValid If one of the arguments are null.
217     */
218    public void visit(GetFileMessage msg) throws ArgumentNotValid {
219        ArgumentNotValid.checkNotNull(msg, "GetFileMessage msg");
220
221        try {
222            ReplicaClient rc = ar.getReplicaClientFromReplicaId(msg.getReplicaId());
223            rc.sendGetFileMessage(msg);
224        } catch (Throwable t) {
225            log.warn("Failed to handle get file request", t);
226            msg.setNotOk(t);
227            JMSConnectionFactory.getInstance().reply(msg);
228        }
229    }
230
231    /**
232     * For retrieving all the filenames from a replica.
233     *
234     * @param msg The message to be processed.
235     * @throws ArgumentNotValid If the argument is null.
236     */
237    public void visit(GetAllFilenamesMessage msg) throws ArgumentNotValid {
238        ArgumentNotValid.checkNotNull(msg, "GetAllFilenames msg");
239
240        try {
241            // retrieve the checksum client
242            ReplicaClient rc = ar.getReplicaClientFromReplicaId(msg.getReplicaId());
243            rc.sendGetAllFilenamesMessage(msg);
244        } catch (Throwable t) {
245            log.warn("Failed to handle GetAllFilenamesMessage: {}", msg, t);
246            msg.setNotOk(t);
247            JMSConnectionFactory.getInstance().reply(msg);
248        }
249    }
250
251    /**
252     * Method for retrieving all the checksums from a replica.
253     *
254     * @param msg The GetAllChecksumsMessage.
255     * @throws ArgumentNotValid If the GetAllChecksumsMessage is null.
256     */
257    public void visit(GetAllChecksumsMessage msg) throws ArgumentNotValid {
258        ArgumentNotValid.checkNotNull(msg, "GetAllChecksumsMessage msg");
259
260        try {
261            // retrieve the checksum client
262            ReplicaClient rc = ar.getReplicaClientFromReplicaId(msg.getReplicaId());
263            rc.sendGetAllChecksumsMessage(msg);
264        } catch (Throwable t) {
265            log.warn("Failed to handle GetAllChecksumsMessage: {}", msg, t);
266            msg.setNotOk(t);
267            JMSConnectionFactory.getInstance().reply(msg);
268        }
269    }
270
271    /**
272     * Method for handling the results of a GetChecksumMessage. This should be handled similar to a ReplyBatchMessage,
273     * when a batchjob has run on a single file.
274     *
275     * @param msg The GetChecksumMessage message.
276     */
277    public void visit(GetChecksumMessage msg) {
278        ArgumentNotValid.checkNotNull(msg, "GetChecksum msg");
279
280        log.info("Received GetChecksumMessage '{}'.", msg);
281
282        // If it is a reply, then handle by arc-repository.
283        // Otherwise send further.
284        if (msg.getIsReply()) {
285            try {
286                ar.onChecksumReply(msg);
287            } catch (Throwable t) {
288                log.warn("Failed to handle GetChecksumMessage", t);
289            }
290        } else {
291            try {
292                ReplicaClient rc = ar.getReplicaClientFromReplicaId(msg.getReplicaId());
293                rc.sendGetChecksumMessage(msg);
294            } catch (Throwable t) {
295                log.warn("Failed to handle GetChecksumMessage.", t);
296                msg.setNotOk(t);
297                JMSConnectionFactory.getInstance().reply(msg);
298            }
299        }
300    }
301
302    /**
303     * Method for handling CorrectMessages. This message is just sent along to the corresponding replica archive, where
304     * the 'bad' entry will be corrected (made backup of and then replaced).
305     *
306     * @param msg The message for correcting a bad entry in an archive.
307     * @throws ArgumentNotValid If the CorrectMessage is null.
308     */
309    public void visit(CorrectMessage msg) throws ArgumentNotValid {
310        ArgumentNotValid.checkNotNull(msg, "CorrectMessage msg");
311        log.debug("Receiving CorrectMessage: {}", msg);
312
313        try {
314            ReplicaClient rc = ar.getReplicaClientFromReplicaId(msg.getReplicaId());
315            rc.sendCorrectMessage(msg);
316        } catch (Throwable t) {
317            log.warn("Could not handle Correct message properly.", t);
318            msg.setNotOk(t);
319            JMSConnectionFactory.getInstance().reply(msg);
320        }
321    }
322
323    /**
324     * Removes the ArcRepositoryMessageHandler as listener.
325     */
326    public void close() {
327        JMSConnectionFactory.getInstance().removeListener(Channels.getTheRepos(), this);
328    }
329
330}