001/* 002 * #%L 003 * Netarchivesuite - archive 004 * %% 005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.archive.bitarchive.distribute; 024 025import org.slf4j.Logger; 026import org.slf4j.LoggerFactory; 027 028import dk.netarkivet.archive.checksum.distribute.CorrectMessage; 029import dk.netarkivet.archive.checksum.distribute.GetAllChecksumsMessage; 030import dk.netarkivet.archive.checksum.distribute.GetAllFilenamesMessage; 031import dk.netarkivet.archive.checksum.distribute.GetChecksumMessage; 032import dk.netarkivet.archive.distribute.ReplicaClient; 033import dk.netarkivet.common.distribute.ChannelID; 034import dk.netarkivet.common.distribute.Channels; 035import dk.netarkivet.common.distribute.JMSConnection; 036import dk.netarkivet.common.distribute.JMSConnectionFactory; 037import dk.netarkivet.common.distribute.RemoteFile; 038import dk.netarkivet.common.distribute.arcrepository.ReplicaType; 039import dk.netarkivet.common.exceptions.ArgumentNotValid; 040import dk.netarkivet.common.exceptions.IOFailure; 041import dk.netarkivet.common.utils.batch.FileBatchJob; 042 043/** 044 * Proxy for remote bitarchive. Establishes a JMS connection to the remote bitarchive. 045 */ 046public final class BitarchiveClient implements ReplicaClient { 047 048 // Each message is assigned a message id 049 /** The log. */ 050 private static final Logger log = LoggerFactory.getLogger(BitarchiveClient.class); 051 052 /** Connection to JMS provider. */ 053 private JMSConnection jmsCon; 054 055 // connection information 056 /** The ALL_BA channel for this replica. */ 057 private ChannelID allBa; 058 /** The ANY_BA channel for this replica. */ 059 private ChannelID anyBa; 060 /** The THE_BAMON channel for this replica. */ 061 private ChannelID theBamon; 062 /** The channel to the ArcRepository. */ 063 private ChannelID clientId = Channels.getTheRepos(); 064 /** The name of the replica whose client this is. */ 065 private String replicaId; 066 067 /** 068 * Establish the connection to the server. 069 * 070 * @param allBaIn topic to all bitarchives 071 * @param anyBaIn queue to one of the bitarchives 072 * @param theBamonIn queue to the bitarchive monitor 073 * @throws IOFailure If there is a problem making the connection. 074 */ 075 private BitarchiveClient(ChannelID allBaIn, ChannelID anyBaIn, ChannelID theBamonIn) throws IOFailure { 076 this.allBa = allBaIn; 077 this.anyBa = anyBaIn; 078 this.theBamon = theBamonIn; 079 replicaId = Channels.retrieveReplicaFromIdentifierChannel(theBamon.getName()).getId(); 080 jmsCon = JMSConnectionFactory.getInstance(); 081 } 082 083 /** 084 * Factory that establish the connection to the server. 085 * 086 * @param allBaIn topic to all bitarchives 087 * @param anyBaIn queue to one of the bitarchives 088 * @param theBamonIn queue to the bitarchive monitor 089 * @return A BitarchiveClient 090 * @throws IOFailure If there is a problem making the connection. 091 */ 092 public static BitarchiveClient getInstance(ChannelID allBaIn, ChannelID anyBaIn, ChannelID theBamonIn) 093 throws IOFailure { 094 return new BitarchiveClient(allBaIn, anyBaIn, theBamonIn); 095 } 096 097 /** 098 * Submit a get request to the bitarchive. 099 * 100 * @param arcfile The file containing the requested record 101 * @param index Offset of the ARC record in the file 102 * @return The submitted message or null if an error occured 103 */ 104 public GetMessage get(String arcfile, long index) { 105 ArgumentNotValid.checkNotNullOrEmpty(arcfile, "arcfile"); 106 ArgumentNotValid.checkNotNegative(index, "index"); 107 108 // Create and send get message 109 GetMessage msg = new GetMessage(allBa, clientId, arcfile, index); 110 jmsCon.send(msg); 111 112 return msg; 113 } 114 115 /** 116 * Submit an already constructed batch message to the archive. The reply goes directly back to whoever sent the 117 * message. 118 * 119 * @param msg the message to be processed by the get command. 120 */ 121 public void sendGetMessage(GetMessage msg) { 122 ArgumentNotValid.checkNotNull(msg, "msg"); 123 124 log.debug("Resending get message '{}' to bitarchives", msg); 125 126 try { 127 jmsCon.resend(msg, Channels.getAllBa()); 128 } catch (Throwable t) { 129 log.warn("Failure while resending {}", msg, t); 130 try { 131 msg.setNotOk(t); 132 jmsCon.reply(msg); 133 } catch (Throwable t1) { 134 log.warn("Failed to send error message back", t1); 135 } 136 } 137 } 138 139 /** 140 * Submit an already constructed getfile message to the archive. 141 * 142 * @param msg get file message to retrieve. 143 */ 144 public void sendGetFileMessage(GetFileMessage msg) { 145 ArgumentNotValid.checkNotNull(msg, "msg"); 146 log.debug("Resending get file message '{}' to bitarchives", msg); 147 jmsCon.resend(msg, this.allBa); 148 } 149 150 /** 151 * Forward the message to ALL_BA. 152 * 153 * @param msg the message to forward. 154 */ 155 public void sendRemoveAndGetFileMessage(RemoveAndGetFileMessage msg) { 156 ArgumentNotValid.checkNotNull(msg, "msg"); 157 jmsCon.resend(msg, this.allBa); 158 } 159 160 /** 161 * Sends a message to terminate a running batchjob. 162 * 163 * @param batchID The ID of the batchjob to terminate. 164 * @throws ArgumentNotValid If the batchID is either null or the empty string. 165 */ 166 public void sendBatchTerminationMessage(String batchID) throws ArgumentNotValid { 167 ArgumentNotValid.checkNotNullOrEmpty(batchID, "String batchID"); 168 // create and send the BatchTerminationMessage. 169 BatchTerminationMessage msg = new BatchTerminationMessage(this.allBa, batchID); 170 jmsCon.send(msg); 171 } 172 173 /** 174 * Submit an upload request to the bitarchive. 175 * 176 * @param rf The file to upload. 177 * @param precomputedChecksum A precomputed checksum 178 * 179 * @throws IOFailure If access to file denied. 180 * @throws ArgumentNotValid If arcfile is null. 181 */ 182 public void sendUploadMessage(RemoteFile rf, String precomputedChecksum) throws IOFailure, ArgumentNotValid { 183 ArgumentNotValid.checkNotNull(rf, "rf"); 184 UploadMessage up = new UploadMessage(anyBa, clientId, rf); 185 up.setPrecomputedChecksum(precomputedChecksum); 186 log.debug("Sending upload message\n{}", up.toString()); 187 jmsCon.send(up); 188 } 189 190 /** 191 * Submit an already constructed get message to the archive. This is used by the ArcRepository when forwarding batch 192 * jobs from its clients. 193 * 194 * @param bMsg a BatchMessage. 195 * @return The submitted message. 196 * @throws ArgumentNotValid If message is null. 197 */ 198 public BatchMessage sendBatchJob(BatchMessage bMsg) throws ArgumentNotValid { 199 ArgumentNotValid.checkNotNull(bMsg, "bMsg"); 200 log.debug("Resending batch message '{}' to bitarchive monitor {}", bMsg, this.theBamon); 201 jmsCon.resend(bMsg, this.theBamon); 202 return bMsg; 203 } 204 205 /** 206 * Submit a batch job to the archive. This is used by the ArcRepository when it needs to run batch jobs for its own 207 * reasons, i.e. when checksumming a file as part of the Store operation. 208 * 209 * @param replyChannel The channel that the reply of this job should be sent to. 210 * @param job The job that should be run on the bit archive handled by this client. 211 * @return The submitted message. 212 * @throws ArgumentNotValid If any parameter was null. 213 * @throws IOFailure If sending the batch message did not succeed. 214 */ 215 public BatchMessage sendBatchJob(ChannelID replyChannel, FileBatchJob job) throws ArgumentNotValid, IOFailure { 216 ArgumentNotValid.checkNotNull(replyChannel, "replyChannel"); 217 ArgumentNotValid.checkNotNull(job, "job"); 218 BatchMessage bMsg = new BatchMessage(this.theBamon, replyChannel, job, replicaId); 219 jmsCon.send(bMsg); 220 return bMsg; 221 } 222 223 /** 224 * Release jms connections. 225 */ 226 public void close() { 227 log.debug("Client has been shutdown"); 228 } 229 230 /** 231 * For correcting an erroneous entry in the archive. The message is sent the replica for correcting the 'bad' entry. 232 * 233 * @param msg The correct message to correct the bad entry in the archive. 234 * @throws ArgumentNotValid If the CorrectMessage is null. 235 */ 236 @Override 237 public void sendCorrectMessage(CorrectMessage msg) throws ArgumentNotValid { 238 ArgumentNotValid.checkNotNull(msg, "CorrectMessage msg"); 239 240 jmsCon.resend(msg, theBamon); 241 242 log.debug("Sending CorrectMessage: '{}'", msg); 243 } 244 245 /** 246 * Method for sending a GetAllFilenamesMessage to a checksum archive. 247 * 248 * @param msg The GetAllFilenamesMessage, which will be sent through the jms connection to the checksum archive. 249 * @throws ArgumentNotValid If the GetAllFilenamesMessage is null. 250 */ 251 public void sendGetAllFilenamesMessage(GetAllFilenamesMessage msg) throws ArgumentNotValid { 252 ArgumentNotValid.checkNotNull(msg, "GetAllFilenamesMessage msg"); 253 // send the message to the archive. 254 jmsCon.resend(msg, theBamon); 255 256 // log message. 257 log.debug("Resending GetAllFilenamesMessage: '{}'.", msg.toString()); 258 } 259 260 /** 261 * Method for sending the GetAllChecksumMessage to the ChecksumReplica. 262 * 263 * @param msg The GetAllChecksumMessage, which will be sent through the jms connection to the checksum archive. 264 * @throws ArgumentNotValid If the GetAllChecksumsMessage is null. 265 */ 266 public void sendGetAllChecksumsMessage(GetAllChecksumsMessage msg) throws ArgumentNotValid { 267 ArgumentNotValid.checkNotNull(msg, "GetAllChecksumsMessage msg"); 268 // send the message to the archive. 269 jmsCon.resend(msg, theBamon); 270 271 // log message. 272 log.debug("Sending GetAllChecksumMessage: '{}'.", msg.toString()); 273 } 274 275 /** 276 * Method for retrieving the checksum of a specific arcfile within the archive. 277 * 278 * @param msg The GetChecksumMessage which will be sent to the checksum archive though the jms connection. 279 * @throws ArgumentNotValid If the GetChecksumMessage is null. 280 */ 281 public void sendGetChecksumMessage(GetChecksumMessage msg) throws ArgumentNotValid { 282 // Validate arguments 283 ArgumentNotValid.checkNotNull(msg, "GetChecksumMessage msg"); 284 285 jmsCon.resend(msg, theBamon); 286 287 // log what we are doing. 288 log.debug("Sending GetChecksumMessage: '{}'.", msg.toString()); 289 } 290 291 /** 292 * Method for retrieving the checksum of a specific arcfile within the archive. 293 * 294 * @param replyChannel The channel where the reply should be sent. 295 * @param filename The GetChecksumMessage which has been sent to the checksum archive though the jms connection. 296 * @return The GetChecksumMessage which is sent. 297 * @throws ArgumentNotValid If the reply channel is null or if the filename is either null or the empty string. 298 */ 299 public GetChecksumMessage sendGetChecksumMessage(ChannelID replyChannel, String filename) throws ArgumentNotValid { 300 // Validate arguments 301 ArgumentNotValid.checkNotNull(replyChannel, "ChannelID replyChannel"); 302 ArgumentNotValid.checkNotNullOrEmpty(filename, "String filename"); 303 304 // Send a GetChecksumMessage to the replica. 305 GetChecksumMessage msg = new GetChecksumMessage(theBamon, replyChannel, filename, replicaId); 306 jmsCon.send(msg); 307 308 // log what we are doing. 309 log.debug("Sending GetChecksumMessage: '{}'.", msg.toString()); 310 311 return msg; 312 } 313 314 /** 315 * Retrieves the type of replica. 316 * 317 * @return The type of this replica. In this case Bitarchive. 318 */ 319 public ReplicaType getType() { 320 return ReplicaType.BITARCHIVE; 321 } 322 323}