001/* 002 * #%L 003 * Netarchivesuite - archive 004 * %% 005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.archive.arcrepository.distribute; 024 025import java.io.File; 026import java.util.Collections; 027 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030 031import dk.netarkivet.archive.arcrepository.ArcRepository; 032import dk.netarkivet.archive.arcrepository.bitpreservation.AdminDataMessage; 033import dk.netarkivet.archive.bitarchive.distribute.BatchMessage; 034import dk.netarkivet.archive.bitarchive.distribute.BatchReplyMessage; 035import dk.netarkivet.archive.bitarchive.distribute.GetFileMessage; 036import dk.netarkivet.archive.bitarchive.distribute.GetMessage; 037import dk.netarkivet.archive.bitarchive.distribute.RemoveAndGetFileMessage; 038import dk.netarkivet.archive.bitarchive.distribute.UploadMessage; 039import dk.netarkivet.archive.checksum.distribute.CorrectMessage; 040import dk.netarkivet.archive.checksum.distribute.GetAllChecksumsMessage; 041import dk.netarkivet.archive.checksum.distribute.GetAllFilenamesMessage; 042import dk.netarkivet.archive.checksum.distribute.GetChecksumMessage; 043import dk.netarkivet.archive.distribute.ArchiveMessageHandler; 044import dk.netarkivet.archive.distribute.ReplicaClient; 045import dk.netarkivet.common.CommonSettings; 046import dk.netarkivet.common.distribute.ChannelID; 047import dk.netarkivet.common.distribute.Channels; 048import dk.netarkivet.common.distribute.JMSConnectionFactory; 049import dk.netarkivet.common.exceptions.ArgumentNotValid; 050import dk.netarkivet.common.utils.Settings; 051 052/** 053 * Listens on the queue "TheArcrepos" and submits the messages to a corresponding visit method on BitarchiveClient. 054 */ 055public class ArcRepositoryServer extends ArchiveMessageHandler { 056 057 /** The log. */ 058 private static final Logger log = LoggerFactory.getLogger(ArcRepositoryServer.class); 059 /** The ArcRepository connected to this server. */ 060 private final ArcRepository ar; 061 062 /** 063 * Creates and adds a ArcRepositoryMessageHandler as listener on the "TheArcrepos"-queue. 064 * 065 * @param ar the ArcRepository 066 */ 067 public ArcRepositoryServer(ArcRepository ar) { 068 ArgumentNotValid.checkNotNull(ar, "ArcRepository ar"); 069 this.ar = ar; 070 ChannelID channel = Channels.getTheRepos(); 071 log.info("Listening for arc repository messages on channel '{}'", channel); 072 JMSConnectionFactory.getInstance().setListener(channel, this); 073 } 074 075 /** 076 * Forwards the call to the ArcRepository.store() method with the StoreMessage as parameter. In case of exception 077 * when calling store, a reply message is sent containing the message set as NotOK. 078 * 079 * @param msg the message to be processed by the store command. 080 */ 081 public void visit(StoreMessage msg) { 082 ArgumentNotValid.checkNotNull(msg, "msg"); 083 try { 084 ar.store(msg.getRemoteFile(), msg); 085 } catch (Throwable t) { 086 log.warn("Failed to handle store request", t); 087 msg.setNotOk(t); 088 JMSConnectionFactory.getInstance().reply(msg); 089 } 090 } 091 092 /** 093 * Request a file to be deleted from bitarchives. This request will be handled by the bitarchives, and the 094 * bitarchive containing the file will reply with the removed file if succesful, or with a notOk message if 095 * unsuccesful. 096 * <p> 097 * Will send a not-ok reply on exceptions handling this request. 098 * 099 * @param msg the message to be processed 100 */ 101 public void visit(RemoveAndGetFileMessage msg) { 102 ArgumentNotValid.checkNotNull(msg, "msg"); 103 104 try { 105 ar.removeAndGetFile(msg); 106 } catch (Throwable t) { 107 log.warn("Failed to handle request to remove file", t); 108 msg.setNotOk(t); 109 JMSConnectionFactory.getInstance().reply(msg); 110 } 111 } 112 113 /** 114 * Update the admin data in the arcrepository. Reply aftwerwards. Will reply with notOk on exceptions. 115 * 116 * @param msg the message to be processed 117 */ 118 public void visit(AdminDataMessage msg) { 119 ArgumentNotValid.checkNotNull(msg, "msg"); 120 121 try { 122 ar.updateAdminData(msg); 123 JMSConnectionFactory.getInstance().reply(msg); 124 } catch (Throwable t) { 125 log.warn("Failed to handle request to change admin data", t); 126 msg.setNotOk(t); 127 JMSConnectionFactory.getInstance().reply(msg); 128 } 129 } 130 131 /** 132 * Forwards the handling of upload replies to the arc repository. Will log errors, but otherwise ignore. 133 * 134 * @param msg a UploadMessage 135 * @throws ArgumentNotValid If the message is null. 136 */ 137 public void visit(UploadMessage msg) throws ArgumentNotValid { 138 ArgumentNotValid.checkNotNull(msg, "UploadMessage msg"); 139 try { 140 ar.onUpload(msg); 141 } catch (Throwable t) { 142 log.warn("Failed to handle upload reply", t); 143 } 144 } 145 146 /** 147 * Forwards the handling of batch replies to the arc repository. Will log errors, but otherwise ignore. 148 * 149 * @param msg a BatchReplyMessage 150 * @throws ArgumentNotValid If the message is null. 151 */ 152 public void visit(BatchReplyMessage msg) throws ArgumentNotValid { 153 ArgumentNotValid.checkNotNull(msg, "BatchReplyMessage msg"); 154 155 try { 156 ar.onBatchReply(msg); 157 } catch (Throwable t) { 158 log.warn("Failed to handle batch reply", t); 159 } 160 } 161 162 /** 163 * Resends a batch message to the requested bitarchive. 164 * <p> 165 * Note that this circumvents the ArcRepository entirely and that the reply goes directly back to whoever set the 166 * message. 167 * 168 * @param msg the batch message to be resend. 169 * @throws ArgumentNotValid if parameters are null 170 */ 171 public void visit(BatchMessage msg) throws ArgumentNotValid { 172 ArgumentNotValid.checkNotNull(ar, "ar"); 173 ArgumentNotValid.checkNotNull(msg, "BatchMessage msg"); 174 175 try { 176 ReplicaClient rc = ar.getReplicaClientFromReplicaId(msg.getReplicaId()); 177 rc.sendBatchJob(msg); 178 } catch (Throwable t) { 179 log.warn("Failed to handle batch request", t); 180 BatchReplyMessage replyMessage = new BatchReplyMessage(msg.getReplyTo(), Channels.getTheRepos(), 181 msg.getID(), 0, Collections.<File>emptyList(), null); 182 replyMessage.setNotOk(t); 183 JMSConnectionFactory.getInstance().send(replyMessage); 184 } 185 } 186 187 /** 188 * Forwards a get message to the local bitarchive. 189 * <p> 190 * Note that this circumvents the ArcRepository entirely and that the reply goes directly back to whoever sent the 191 * message. 192 * 193 * @param msg the message to be processed by the get command. 194 * @throws ArgumentNotValid If the message is null. 195 */ 196 public void visit(GetMessage msg) throws ArgumentNotValid { 197 ArgumentNotValid.checkNotNull(msg, "GetMessage msg"); 198 199 try { 200 ReplicaClient rc = ar.getReplicaClientFromReplicaId(Settings.get(CommonSettings.USE_REPLICA_ID)); 201 rc.sendGetMessage(msg); 202 } catch (Throwable t) { 203 log.warn("Failed to handle get request", t); 204 msg.setNotOk(t); 205 JMSConnectionFactory.getInstance().reply(msg); 206 } 207 } 208 209 /** 210 * Forwards a getfile message to requested bitarchive replica. 211 * <p> 212 * Note that this circumvents the ArcRepository entirely and that the reply goes directly back to whoever set the 213 * message. 214 * 215 * @param msg the message to be processed by the get command. 216 * @throws ArgumentNotValid If one of the arguments are null. 217 */ 218 public void visit(GetFileMessage msg) throws ArgumentNotValid { 219 ArgumentNotValid.checkNotNull(msg, "GetFileMessage msg"); 220 221 try { 222 ReplicaClient rc = ar.getReplicaClientFromReplicaId(msg.getReplicaId()); 223 rc.sendGetFileMessage(msg); 224 } catch (Throwable t) { 225 log.warn("Failed to handle get file request", t); 226 msg.setNotOk(t); 227 JMSConnectionFactory.getInstance().reply(msg); 228 } 229 } 230 231 /** 232 * For retrieving all the filenames from a replica. 233 * 234 * @param msg The message to be processed. 235 * @throws ArgumentNotValid If the argument is null. 236 */ 237 public void visit(GetAllFilenamesMessage msg) throws ArgumentNotValid { 238 ArgumentNotValid.checkNotNull(msg, "GetAllFilenames msg"); 239 240 try { 241 // retrieve the checksum client 242 ReplicaClient rc = ar.getReplicaClientFromReplicaId(msg.getReplicaId()); 243 rc.sendGetAllFilenamesMessage(msg); 244 } catch (Throwable t) { 245 log.warn("Failed to handle GetAllFilenamesMessage: {}", msg, t); 246 msg.setNotOk(t); 247 JMSConnectionFactory.getInstance().reply(msg); 248 } 249 } 250 251 /** 252 * Method for retrieving all the checksums from a replica. 253 * 254 * @param msg The GetAllChecksumsMessage. 255 * @throws ArgumentNotValid If the GetAllChecksumsMessage is null. 256 */ 257 public void visit(GetAllChecksumsMessage msg) throws ArgumentNotValid { 258 ArgumentNotValid.checkNotNull(msg, "GetAllChecksumsMessage msg"); 259 260 try { 261 // retrieve the checksum client 262 ReplicaClient rc = ar.getReplicaClientFromReplicaId(msg.getReplicaId()); 263 rc.sendGetAllChecksumsMessage(msg); 264 } catch (Throwable t) { 265 log.warn("Failed to handle GetAllChecksumsMessage: {}", msg, t); 266 msg.setNotOk(t); 267 JMSConnectionFactory.getInstance().reply(msg); 268 } 269 } 270 271 /** 272 * Method for handling the results of a GetChecksumMessage. This should be handled similar to a ReplyBatchMessage, 273 * when a batchjob has run on a single file. 274 * 275 * @param msg The GetChecksumMessage message. 276 */ 277 public void visit(GetChecksumMessage msg) { 278 ArgumentNotValid.checkNotNull(msg, "GetChecksum msg"); 279 280 log.info("Received GetChecksumMessage '{}'.", msg); 281 282 // If it is a reply, then handle by arc-repository. 283 // Otherwise send further. 284 if (msg.getIsReply()) { 285 try { 286 ar.onChecksumReply(msg); 287 } catch (Throwable t) { 288 log.warn("Failed to handle GetChecksumMessage", t); 289 } 290 } else { 291 try { 292 ReplicaClient rc = ar.getReplicaClientFromReplicaId(msg.getReplicaId()); 293 rc.sendGetChecksumMessage(msg); 294 } catch (Throwable t) { 295 log.warn("Failed to handle GetChecksumMessage.", t); 296 msg.setNotOk(t); 297 JMSConnectionFactory.getInstance().reply(msg); 298 } 299 } 300 } 301 302 /** 303 * Method for handling CorrectMessages. This message is just sent along to the corresponding replica archive, where 304 * the 'bad' entry will be corrected (made backup of and then replaced). 305 * 306 * @param msg The message for correcting a bad entry in an archive. 307 * @throws ArgumentNotValid If the CorrectMessage is null. 308 */ 309 public void visit(CorrectMessage msg) throws ArgumentNotValid { 310 ArgumentNotValid.checkNotNull(msg, "CorrectMessage msg"); 311 log.debug("Receiving CorrectMessage: {}", msg); 312 313 try { 314 ReplicaClient rc = ar.getReplicaClientFromReplicaId(msg.getReplicaId()); 315 rc.sendCorrectMessage(msg); 316 } catch (Throwable t) { 317 log.warn("Could not handle Correct message properly.", t); 318 msg.setNotOk(t); 319 JMSConnectionFactory.getInstance().reply(msg); 320 } 321 } 322 323 /** 324 * Removes the ArcRepositoryMessageHandler as listener. 325 */ 326 public void close() { 327 JMSConnectionFactory.getInstance().removeListener(Channels.getTheRepos(), this); 328 } 329 330}