001/* 002 * #%L 003 * Netarchivesuite - archive 004 * %% 005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.archive.bitarchive.distribute; 024 025import org.slf4j.Logger; 026import org.slf4j.LoggerFactory; 027 028import dk.netarkivet.archive.checksum.distribute.CorrectMessage; 029import dk.netarkivet.archive.checksum.distribute.GetAllChecksumsMessage; 030import dk.netarkivet.archive.checksum.distribute.GetAllFilenamesMessage; 031import dk.netarkivet.archive.checksum.distribute.GetChecksumMessage; 032import dk.netarkivet.archive.distribute.ReplicaClient; 033import dk.netarkivet.common.distribute.ChannelID; 034import dk.netarkivet.common.distribute.Channels; 035import dk.netarkivet.common.distribute.JMSConnection; 036import dk.netarkivet.common.distribute.JMSConnectionFactory; 037import dk.netarkivet.common.distribute.RemoteFile; 038import dk.netarkivet.common.distribute.arcrepository.ReplicaType; 039import dk.netarkivet.common.exceptions.ArgumentNotValid; 040import dk.netarkivet.common.exceptions.IOFailure; 041import dk.netarkivet.common.utils.batch.FileBatchJob; 042 043/** 044 * Proxy for remote bitarchive. Establishes a JMS connection to the remote bitarchive. 045 */ 046public final class BitarchiveClient implements ReplicaClient { 047 048 // Each message is assigned a message id 049 /** The log. */ 050 private static final Logger log = LoggerFactory.getLogger(BitarchiveClient.class); 051 052 /** Connection to JMS provider. */ 053 private JMSConnection jmsCon; 054 055 // connection information 056 /** The ALL_BA channel for this replica. */ 057 private ChannelID allBa; 058 /** The ANY_BA channel for this replica. */ 059 private ChannelID anyBa; 060 /** The THE_BAMON channel for this replica. */ 061 private ChannelID theBamon; 062 /** The channel to the ArcRepository. */ 063 private ChannelID clientId = Channels.getTheRepos(); 064 /** The name of the replica whose client this is. */ 065 private String replicaId; 066 067 /** 068 * Establish the connection to the server. 069 * 070 * @param allBaIn topic to all bitarchives 071 * @param anyBaIn queue to one of the bitarchives 072 * @param theBamonIn queue to the bitarchive monitor 073 * @throws IOFailure If there is a problem making the connection. 074 */ 075 private BitarchiveClient(ChannelID allBaIn, ChannelID anyBaIn, ChannelID theBamonIn) throws IOFailure { 076 this.allBa = allBaIn; 077 this.anyBa = anyBaIn; 078 this.theBamon = theBamonIn; 079 replicaId = Channels.retrieveReplicaFromIdentifierChannel(theBamon.getName()).getId(); 080 jmsCon = JMSConnectionFactory.getInstance(); 081 } 082 083 /** 084 * Factory that establish the connection to the server. 085 * 086 * @param allBaIn topic to all bitarchives 087 * @param anyBaIn queue to one of the bitarchives 088 * @param theBamonIn queue to the bitarchive monitor 089 * @return A BitarchiveClient 090 * @throws IOFailure If there is a problem making the connection. 091 */ 092 public static BitarchiveClient getInstance(ChannelID allBaIn, ChannelID anyBaIn, ChannelID theBamonIn) 093 throws IOFailure { 094 return new BitarchiveClient(allBaIn, anyBaIn, theBamonIn); 095 } 096 097 /** 098 * Submit a get request to the bitarchive. 099 * 100 * @param arcfile The file containing the requested record 101 * @param index Offset of the ARC record in the file 102 * @return The submitted message or null if an error occured 103 */ 104 public GetMessage get(String arcfile, long index) { 105 ArgumentNotValid.checkNotNullOrEmpty(arcfile, "arcfile"); 106 ArgumentNotValid.checkNotNegative(index, "index"); 107 108 // Create and send get message 109 GetMessage msg = new GetMessage(allBa, clientId, arcfile, index); 110 jmsCon.send(msg); 111 112 return msg; 113 } 114 115 /** 116 * Submit an already constructed batch message to the archive. The reply goes directly back to whoever sent the 117 * message. 118 * 119 * @param msg the message to be processed by the get command. 120 */ 121 public void sendGetMessage(GetMessage msg) { 122 ArgumentNotValid.checkNotNull(msg, "msg"); 123 124 log.debug("Resending get message '{}' to bitarchives", msg); 125 126 try { 127 jmsCon.resend(msg, Channels.getAllBa()); 128 } catch (Throwable t) { 129 log.warn("Failure while resending {}", msg, t); 130 try { 131 msg.setNotOk(t); 132 jmsCon.reply(msg); 133 } catch (Throwable t1) { 134 log.warn("Failed to send error message back", t1); 135 } 136 } 137 } 138 139 /** 140 * Submit an already constructed getfile message to the archive. 141 * 142 * @param msg get file message to retrieve. 143 */ 144 public void sendGetFileMessage(GetFileMessage msg) { 145 ArgumentNotValid.checkNotNull(msg, "msg"); 146 log.debug("Resending get file message '{}' to bitarchives", msg); 147 jmsCon.resend(msg, this.allBa); 148 } 149 150 /** 151 * Forward the message to ALL_BA. 152 * 153 * @param msg the message to forward. 154 */ 155 public void sendRemoveAndGetFileMessage(RemoveAndGetFileMessage msg) { 156 ArgumentNotValid.checkNotNull(msg, "msg"); 157 jmsCon.resend(msg, this.allBa); 158 } 159 160 /** 161 * Sends a message to terminate a running batchjob. 162 * 163 * @param batchID The ID of the batchjob to terminate. 164 * @throws ArgumentNotValid If the batchID is either null or the empty string. 165 */ 166 public void sendBatchTerminationMessage(String batchID) throws ArgumentNotValid { 167 ArgumentNotValid.checkNotNullOrEmpty(batchID, "String batchID"); 168 // create and send the BatchTerminationMessage. 169 BatchTerminationMessage msg = new BatchTerminationMessage(this.allBa, batchID); 170 jmsCon.send(msg); 171 } 172 173 /** 174 * Submit an upload request to the bitarchive. 175 * 176 * @param rf The file to upload. 177 * @throws IOFailure If access to file denied. 178 * @throws ArgumentNotValid If arcfile is null. 179 */ 180 public void sendUploadMessage(RemoteFile rf) throws IOFailure, ArgumentNotValid { 181 ArgumentNotValid.checkNotNull(rf, "rf"); 182 UploadMessage up = new UploadMessage(anyBa, clientId, rf); 183 log.debug("Sending upload message\n{}", up.toString()); 184 jmsCon.send(up); 185 } 186 187 /** 188 * Submit an already constructed get message to the archive. This is used by the ArcRepository when forwarding batch 189 * jobs from its clients. 190 * 191 * @param bMsg a BatchMessage. 192 * @return The submitted message. 193 * @throws ArgumentNotValid If message is null. 194 */ 195 public BatchMessage sendBatchJob(BatchMessage bMsg) throws ArgumentNotValid { 196 ArgumentNotValid.checkNotNull(bMsg, "bMsg"); 197 log.debug("Resending batch message '{}' to bitarchive monitor {}", bMsg, this.theBamon); 198 jmsCon.resend(bMsg, this.theBamon); 199 return bMsg; 200 } 201 202 /** 203 * Submit a batch job to the archive. This is used by the ArcRepository when it needs to run batch jobs for its own 204 * reasons, i.e. when checksumming a file as part of the Store operation. 205 * 206 * @param replyChannel The channel that the reply of this job should be sent to. 207 * @param job The job that should be run on the bit archive handled by this client. 208 * @return The submitted message. 209 * @throws ArgumentNotValid If any parameter was null. 210 * @throws IOFailure If sending the batch message did not succeed. 211 */ 212 public BatchMessage sendBatchJob(ChannelID replyChannel, FileBatchJob job) throws ArgumentNotValid, IOFailure { 213 ArgumentNotValid.checkNotNull(replyChannel, "replyChannel"); 214 ArgumentNotValid.checkNotNull(job, "job"); 215 BatchMessage bMsg = new BatchMessage(this.theBamon, replyChannel, job, replicaId); 216 jmsCon.send(bMsg); 217 return bMsg; 218 } 219 220 /** 221 * Release jms connections. 222 */ 223 public void close() { 224 log.debug("Client has been shutdown"); 225 } 226 227 /** 228 * For correcting an erroneous entry in the archive. The message is sent the replica for correcting the 'bad' entry. 229 * 230 * @param msg The correct message to correct the bad entry in the archive. 231 * @throws ArgumentNotValid If the CorrectMessage is null. 232 */ 233 @Override 234 public void sendCorrectMessage(CorrectMessage msg) throws ArgumentNotValid { 235 ArgumentNotValid.checkNotNull(msg, "CorrectMessage msg"); 236 237 jmsCon.resend(msg, theBamon); 238 239 log.debug("Sending CorrectMessage: '{}'", msg); 240 } 241 242 /** 243 * Method for sending a GetAllFilenamesMessage to a checksum archive. 244 * 245 * @param msg The GetAllFilenamesMessage, which will be sent through the jms connection to the checksum archive. 246 * @throws ArgumentNotValid If the GetAllFilenamesMessage is null. 247 */ 248 public void sendGetAllFilenamesMessage(GetAllFilenamesMessage msg) throws ArgumentNotValid { 249 ArgumentNotValid.checkNotNull(msg, "GetAllFilenamesMessage msg"); 250 // send the message to the archive. 251 jmsCon.resend(msg, theBamon); 252 253 // log message. 254 log.debug("Resending GetAllFilenamesMessage: '{}'.", msg.toString()); 255 } 256 257 /** 258 * Method for sending the GetAllChecksumMessage to the ChecksumReplica. 259 * 260 * @param msg The GetAllChecksumMessage, which will be sent through the jms connection to the checksum archive. 261 * @throws ArgumentNotValid If the GetAllChecksumsMessage is null. 262 */ 263 public void sendGetAllChecksumsMessage(GetAllChecksumsMessage msg) throws ArgumentNotValid { 264 ArgumentNotValid.checkNotNull(msg, "GetAllChecksumsMessage msg"); 265 // send the message to the archive. 266 jmsCon.resend(msg, theBamon); 267 268 // log message. 269 log.debug("Sending GetAllChecksumMessage: '{}'.", msg.toString()); 270 } 271 272 /** 273 * Method for retrieving the checksum of a specific arcfile within the archive. 274 * 275 * @param msg The GetChecksumMessage which will be sent to the checksum archive though the jms connection. 276 * @throws ArgumentNotValid If the GetChecksumMessage is null. 277 */ 278 public void sendGetChecksumMessage(GetChecksumMessage msg) throws ArgumentNotValid { 279 // Validate arguments 280 ArgumentNotValid.checkNotNull(msg, "GetChecksumMessage msg"); 281 282 jmsCon.resend(msg, theBamon); 283 284 // log what we are doing. 285 log.debug("Sending GetChecksumMessage: '{}'.", msg.toString()); 286 } 287 288 /** 289 * Method for retrieving the checksum of a specific arcfile within the archive. 290 * 291 * @param replyChannel The channel where the reply should be sent. 292 * @param filename The GetChecksumMessage which has been sent to the checksum archive though the jms connection. 293 * @return The GetChecksumMessage which is sent. 294 * @throws ArgumentNotValid If the reply channel is null or if the filename is either null or the empty string. 295 */ 296 public GetChecksumMessage sendGetChecksumMessage(ChannelID replyChannel, String filename) throws ArgumentNotValid { 297 // Validate arguments 298 ArgumentNotValid.checkNotNull(replyChannel, "ChannelID replyChannel"); 299 ArgumentNotValid.checkNotNullOrEmpty(filename, "String filename"); 300 301 // Send a GetChecksumMessage to the replica. 302 GetChecksumMessage msg = new GetChecksumMessage(theBamon, replyChannel, filename, replicaId); 303 jmsCon.send(msg); 304 305 // log what we are doing. 306 log.debug("Sending GetChecksumMessage: '{}'.", msg.toString()); 307 308 return msg; 309 } 310 311 /** 312 * Retrieves the type of replica. 313 * 314 * @return The type of this replica. In this case Bitarchive. 315 */ 316 public ReplicaType getType() { 317 return ReplicaType.BITARCHIVE; 318 } 319 320}