001/* 002 * #%L 003 * Netarchivesuite - common 004 * %% 005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023 024package dk.netarkivet.common.distribute; 025 026import java.util.Collection; 027 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030 031import dk.netarkivet.common.CommonSettings; 032import dk.netarkivet.common.distribute.arcrepository.Replica; 033import dk.netarkivet.common.distribute.arcrepository.ReplicaType; 034import dk.netarkivet.common.exceptions.ArgumentNotValid; 035import dk.netarkivet.common.exceptions.IllegalState; 036import dk.netarkivet.common.exceptions.UnknownID; 037import dk.netarkivet.common.utils.Settings; 038 039/** 040 * This singleton class is in charge of giving out the correct channels. 041 */ 042public class Channels { 043 044 //private static final Log log = LogFactory.getLog(Channels.class); 045 private static final Logger log = LoggerFactory.getLogger(Channels.class); 046 047 /** 048 * Channel type prefixes for the current set of channels. 049 */ 050 private static final String ALLBA_CHANNEL_PREFIX = "ALL_BA"; 051 private static final String ANYBA_CHANNEL_PREFIX = "ANY_BA"; 052 private static final String THEBAMON_CHANNEL_PREFIX = "THE_BAMON"; 053 private static final String THEREPOS_CHANNEL_PREFIX = "THE_REPOS"; 054 private static final String THISREPOSCLIENT_CHANNEL_PREFIX = "THIS_REPOS_CLIENT"; 055 private static final String ERROR_CHANNEL_PREFIX = "ERROR"; 056 private static final String INDEXSERVER_CHANNEL_PREFIX = "INDEX_SERVER"; 057 private static final String THISINDEXCLIENT_CHANNEL_PREFIX = "THIS_INDEX_CLIENT"; 058 private static final String MONITOR_CHANNEL_PREFIX = "MONITOR"; 059 060 private static final String THECR_CHANNEL_PREFIX = "THE_CR"; 061 062 /** Channel part separator. */ 063 public static final String CHANNEL_PART_SEPARATOR = "_"; 064 065 /** The one existing instance of the Channels object. Not accessible from the outside at all. */ 066 private static Channels instance; 067 068 /** 069 * Accessor for singleton internally. 070 * 071 * @return the <code>Channels</code> object for this singleton. 072 */ 073 private static Channels getInstance() { 074 if (instance == null) { 075 instance = new Channels(); 076 } 077 return instance; 078 } 079 080 /** 081 * Contains the collection of replicas. 082 */ 083 private final Collection<Replica> replicas = Replica.getKnown(); 084 085 /** 086 * This is the container for the replica which is used by applications that only communicate with local processes. 087 */ 088 private final Replica useReplica = Replica.getReplicaFromId(Settings.get(CommonSettings.USE_REPLICA_ID)); 089 090 /** 091 * The constructor of Channels class. Validates that the current value of the setting USE_REPLICA_ID corresponds to 092 * one of the replicas listed in the settings. Furthermore we here fill content in the ALL_BA_ARRAY, ANY_BA_ARRAY, 093 * THE_BAMON_ARRAY, and initialize ALL_BA, ANY_BA, and THE_BAMON. 094 * 095 * @throws UnknownID If one of the replicas has an unhandled replica type. 096 */ 097 private Channels() { 098 // index count 099 int i = 0; 100 int useReplicaIndex = -1; 101 // go through all replicas and initialize their channels. 102 for (Replica rep : replicas) { 103 if (rep.getType() == ReplicaType.BITARCHIVE) { 104 // Bitarchive has 'ALL_BA', 'ANY_BA' and 'THE_BAMON'. 105 ALL_BA_ARRAY[i] = new ChannelID(ALLBA_CHANNEL_PREFIX, rep.getId(), ChannelID.NO_IP, 106 ChannelID.NO_APPLINST_ID, ChannelID.TOPIC); 107 ANY_BA_ARRAY[i] = new ChannelID(ANYBA_CHANNEL_PREFIX, rep.getId(), ChannelID.NO_IP, 108 ChannelID.NO_APPLINST_ID, ChannelID.QUEUE); 109 THE_BAMON_ARRAY[i] = new ChannelID(THEBAMON_CHANNEL_PREFIX, rep.getId(), ChannelID.NO_IP, 110 ChannelID.NO_APPLINST_ID, ChannelID.QUEUE); 111 THE_CR_ARRAY[i] = null; 112 } else if (rep.getType() == ReplicaType.CHECKSUM) { 113 // Checksum has only 'THE_CR'. 114 ALL_BA_ARRAY[i] = null; 115 ANY_BA_ARRAY[i] = null; 116 THE_BAMON_ARRAY[i] = null; 117 THE_CR_ARRAY[i] = new ChannelID(THECR_CHANNEL_PREFIX, rep.getId(), ChannelID.NO_IP, 118 ChannelID.NO_APPLINST_ID, ChannelID.QUEUE); 119 } else { 120 // Throw an exception when unknown replica type. 121 throw new UnknownID("The replica '" + rep + "' does not have " + "a valid replica type."); 122 } 123 124 // find the 'useReplica' 125 if (rep == useReplica) { 126 useReplicaIndex = i; 127 } 128 129 ++i; 130 } 131 132 // validate the index of the useReplica 133 if (useReplicaIndex < 0 || useReplicaIndex >= replicas.size()) { 134 // issue an error, if the use replica could not be found. 135 throw new ArgumentNotValid("The useReplica '" + useReplica + "' was not found in the list of replicas: '" 136 + replicas + "'."); 137 } 138 139 // set the channels for the useReplica 140 ALL_BA = ALL_BA_ARRAY[useReplicaIndex]; 141 ANY_BA = ANY_BA_ARRAY[useReplicaIndex]; 142 THE_BAMON = THE_BAMON_ARRAY[useReplicaIndex]; 143 THE_CR = THE_CR_ARRAY[useReplicaIndex]; 144 } 145 146 /** 147 * Method for retrieving the list of replicas used for the channels. The replica ids are in the same order as their 148 * channels. 149 * 150 * @return The replica ids in the same order as their channels. 151 */ 152 public static Collection<Replica> getReplicas() { 153 return getInstance().replicas; 154 } 155 156 /** 157 * Returns the one-per-client queue on which client receives replies from the arcrepository. 158 * 159 * @return the <code>ChannelID</code> object for this queue. 160 */ 161 public static ChannelID getThisReposClient() { 162 return getInstance().THIS_REPOS_CLIENT; 163 } 164 165 private final ChannelID THIS_REPOS_CLIENT = new ChannelID(THISREPOSCLIENT_CHANNEL_PREFIX, ChannelID.COMMON, 166 ChannelID.INCLUDE_IP, ChannelID.INCLUDE_APPLINST_ID, ChannelID.QUEUE); 167 168 /** 169 * Returns the queue on which all messages to the Repository are sent. 170 * 171 * @return the <code>ChannelID</code> object for this queue. 172 */ 173 public static ChannelID getTheRepos() { 174 return getInstance().THE_REPOS; 175 } 176 177 private final ChannelID THE_REPOS = new ChannelID(THEREPOS_CHANNEL_PREFIX, ChannelID.COMMON, ChannelID.NO_IP, 178 ChannelID.NO_APPLINST_ID, ChannelID.QUEUE); 179 180 /** 181 * Returns BAMON channels for every known bitarchive (replica). 182 * 183 * @return An array of BAMON channels - one per bitarchive (replica) 184 */ 185 public static final ChannelID[] getAllArchives_BAMONs() { 186 return getInstance().THE_BAMON_ARRAY; 187 } 188 189 private final ChannelID[] THE_BAMON_ARRAY = new ChannelID[replicas.size()]; 190 191 /** 192 * Returns the queue for sending messages to bitarchive monitors. 193 * 194 * @return the <code>ChannelID</code> object for this queue. 195 * @throws IllegalState If the current replica is not a checksum replica. 196 */ 197 public static ChannelID getTheBamon() throws IllegalState { 198 ChannelID res = getInstance().THE_BAMON; 199 200 if (res == null) { 201 throw new IllegalState("The channel for the bitarchive monitor cannot to be retrieved for replica '" 202 + getInstance().useReplica + "'."); 203 } 204 205 return res; 206 } 207 208 /** 209 * Implementation notice: This cannot be initialized directly in the field, as it uses THE_BAMON_ARRAY, which is 210 * initialized in the constructor. 211 */ 212 private final ChannelID THE_BAMON; 213 214 /** 215 * Returns the channels for the all Checksum replicas. 216 * 217 * @return An array of THE_CR channels - one for each replica, though only the checksum replicas have values (the 218 * others are null). 219 */ 220 public static final ChannelID[] getAllArchives_CRs() { 221 return getInstance().THE_CR_ARRAY; 222 } 223 224 /** The array containing the 'THE_CR' channels. */ 225 private final ChannelID[] THE_CR_ARRAY = new ChannelID[replicas.size()]; 226 227 /** 228 * Method for retrieving the 'THE_CR' channel for this replica. If the replica is not a checksum replica, then an 229 * error is thrown. 230 * 231 * @return the 'THE_CR' channel for this replica. 232 * @throws IllegalState If the current replica is not a checksum replica. 233 */ 234 public static ChannelID getTheCR() throws IllegalState { 235 ChannelID res = getInstance().THE_CR; 236 237 if (res == null) { 238 throw new IllegalState("A bitarchive replica does not have the channel for communicating with a checksum " 239 + "replica."); 240 } 241 242 return res; 243 } 244 245 /** 246 * The 'THE_CR' channel for this replica. This has the value 'null' if the replica is not a checksum replica. 247 */ 248 private final ChannelID THE_CR; 249 250 /** 251 * Returns ALL_BA channels for every known bitarchive. 252 * 253 * @return An array of ALL_BA channels - one per bitarchive 254 */ 255 public static final ChannelID[] getAllArchives_ALL_BAs() { 256 return getInstance().ALL_BA_ARRAY; 257 } 258 259 /** 260 * ALL_BA is the topic on which a Bitarchive client publishes get, correct and batch messages to all connected 261 * Bitarchive machines. The following is the list of ALL_BA for all archives (i.e. archive replicas). 262 */ 263 private final ChannelID[] ALL_BA_ARRAY = new ChannelID[replicas.size()]; 264 265 /** 266 * Returns the topic that all bitarchive machines on this replica are listening on. 267 * 268 * @return A topic channel that reaches all local bitarchive machines 269 * @throws IllegalState If the current replica is not a bitarchive replica. 270 */ 271 public static ChannelID getAllBa() throws IllegalState { 272 ChannelID res = getInstance().ALL_BA; 273 274 if (res == null) { 275 throw new IllegalState("A checksum replica does not have the channels for communicating with a bitarchive " 276 + "replica."); 277 } 278 279 return res; 280 } 281 282 /** 283 * Implementation notice: This cannot be initialized directly in the field, as it uses ALL_BA_ARRAY, which is 284 * initialized in the constructor. 285 */ 286 private final ChannelID ALL_BA; 287 288 /** 289 * Returns ANY_BA channels for every known bitarchive. 290 * 291 * @return An array of ANY_BA channels - one per bitarchive 292 */ 293 public static final ChannelID[] getAllArchives_ANY_BAs() { 294 return getInstance().ANY_BA_ARRAY; 295 } 296 297 /** 298 * Queue on which upload requests are sent out to bitarchive servers. The following is the list of ANY_BA for all 299 * archives. 300 */ 301 private final ChannelID[] ANY_BA_ARRAY = new ChannelID[replicas.size()]; 302 303 /** 304 * Returns the channel where exactly one of all the bitarchive machines at this replica will get the message. 305 * 306 * @return A queue channel that reaches one of the local bitarchive machines. 307 * @throws IllegalState If the current replica is not a bitarchive replica. 308 */ 309 public static ChannelID getAnyBa() throws IllegalState { 310 ChannelID res = getInstance().ANY_BA; 311 312 if (res == null) { 313 throw new IllegalState("A checksum replica does not have the channels for communicating with a bitarchive " 314 + "replica."); 315 } 316 317 return res; 318 } 319 320 /** 321 * Implementation notice: This cannot be initialized directly in the field, as it uses ANY_BA_ARRAY, which is 322 * initialized in the constructor. 323 */ 324 private final ChannelID ANY_BA; 325 326 /** 327 * Returns the queue on which to put errors which are not handled elsewhere. 328 * 329 * @return the <code>ChannelID</code> object for this queue. 330 */ 331 public static ChannelID getError() { 332 return getInstance().ERROR; 333 } 334 335 private final ChannelID ERROR = new ChannelID(ERROR_CHANNEL_PREFIX, ChannelID.COMMON, ChannelID.NO_IP, 336 ChannelID.NO_APPLINST_ID, ChannelID.QUEUE); 337 338 /** 339 * Given an replica, returns the BAMON queue to which batch jobs must be sent in order to run them on that 340 * bitarchive. 341 * 342 * @param replicaId The id of the replica 343 * @return the channel 344 * @throws ArgumentNotValid if the replicaId is null, unknown, or empty string 345 */ 346 public static ChannelID getBaMonForReplica(String replicaId) throws ArgumentNotValid { 347 ArgumentNotValid.checkNotNullOrEmpty(replicaId, "replicaId"); 348 ChannelID[] bamons = getAllArchives_BAMONs(); 349 for (ChannelID bamon : bamons) { 350 if (bamon != null 351 && bamon.getName().equals( 352 Settings.get(CommonSettings.ENVIRONMENT_NAME) + CHANNEL_PART_SEPARATOR + replicaId 353 + CHANNEL_PART_SEPARATOR + THEBAMON_CHANNEL_PREFIX)) { 354 return bamon; 355 } 356 } 357 throw new ArgumentNotValid("Did not find a BAMON queue for '" + replicaId + "'"); 358 } 359 360 public static ChannelID getTheCrForReplica(String replicaId) { 361 ArgumentNotValid.checkNotNullOrEmpty(replicaId, "String replicaId"); 362 ChannelID[] crs = getAllArchives_CRs(); 363 for (ChannelID cr : crs) { 364 if (cr != null 365 && cr.getName().equals( 366 Settings.get(CommonSettings.ENVIRONMENT_NAME) + CHANNEL_PART_SEPARATOR + replicaId 367 + CHANNEL_PART_SEPARATOR + THECR_CHANNEL_PREFIX)) { 368 return cr; 369 } 370 } 371 throw new ArgumentNotValid("Did not find a checksum queue for '" + replicaId + "'"); 372 } 373 374 /** 375 * Method for extracting the replica from the name of the identifier channel. 376 * 377 * @param channelName The name of the identification channel for the replica. 378 * @return Replica who the identification channel belong to. 379 * @throws UnknownID If the replicaId does not point to a know replica. 380 * @throws ArgumentNotValid If the channelName is either null or empty. 381 */ 382 public static Replica retrieveReplicaFromIdentifierChannel(String channelName) throws UnknownID, ArgumentNotValid { 383 ArgumentNotValid.checkNotNullOrEmpty(channelName, "String channelName"); 384 if (channelName.contains(THECR_CHANNEL_PREFIX)) { 385 // environmentName ## replicaId ## THE_CR 386 String[] parts = channelName.split(CHANNEL_PART_SEPARATOR); 387 return Replica.getReplicaFromId(parts[1]); 388 } else if (channelName.contains(THEBAMON_CHANNEL_PREFIX)) { 389 // environmentName ## replicaId ## THE_BAMON 390 String[] parts = channelName.split(CHANNEL_PART_SEPARATOR); 391 return Replica.getReplicaFromId(parts[1]); 392 } 393 394 String errMsg = "The current channel name, '" + channelName + "' does not refer to an identification channel"; 395 log.warn(errMsg); 396 throw new UnknownID(errMsg); 397 } 398 399 /** 400 * The method for retrieving the name of the identification channel for a replica based on the Id of this replica. 401 * 402 * @param replicaId The id for the replica whose identification channel name should be retrieved. 403 * @return The name of the identification channel for the replica. 404 * @throws UnknownID If no replica with the given replica id is known. 405 * @throws ArgumentNotValid If the replicaId is null or empty. 406 */ 407 public static String retrieveReplicaChannelNameFromReplicaId(String replicaId) throws UnknownID, ArgumentNotValid { 408 ArgumentNotValid.checkNotNullOrEmpty(replicaId, "String replicaId"); 409 return Replica.getReplicaFromId(replicaId).getIdentificationChannel().getName(); 410 } 411 412 /** 413 * The method for retrieving the identification channel for a replica based on the Id of this replica. 414 * 415 * @param replicaId The id for the replica whose identification channel name should be retrieved. 416 * @return The identification channel for the replica. 417 * @throws UnknownID If no replica with the given replica id is known. 418 * @throws ArgumentNotValid If the replicaId is null or empty. 419 */ 420 public static ChannelID retrieveReplicaChannelFromReplicaId(String replicaId) throws UnknownID, ArgumentNotValid { 421 ArgumentNotValid.checkNotNullOrEmpty(replicaId, "String replicaId"); 422 return Replica.getReplicaFromId(replicaId).getIdentificationChannel(); 423 } 424 425 /** 426 * Returns the queue for sending messages to the IndexServer application. 427 * 428 * @return the <code>ChannelID</code> object for this queue. 429 */ 430 public static ChannelID getTheIndexServer() { 431 return getInstance().THE_INDEX_SERVER; 432 } 433 434 private final ChannelID THE_INDEX_SERVER = new ChannelID(INDEXSERVER_CHANNEL_PREFIX, ChannelID.COMMON, 435 ChannelID.NO_IP, ChannelID.NO_APPLINST_ID, ChannelID.QUEUE); 436 437 /** 438 * Returns the queue for getting responses from the IndexServer application. 439 * 440 * @return the <code>ChannelID</code> object for this queue. 441 */ 442 public static ChannelID getThisIndexClient() { 443 return getInstance().THIS_INDEX_CLIENT; 444 } 445 446 // TODO Should we use client channels for all our servers? 447 private final ChannelID THIS_INDEX_CLIENT = new ChannelID(THISINDEXCLIENT_CHANNEL_PREFIX, ChannelID.COMMON, 448 ChannelID.INCLUDE_IP, ChannelID.INCLUDE_APPLINST_ID, ChannelID.QUEUE); 449 450 /** 451 * Return the topic for the monitor registry. 452 * 453 * @return the <code>ChannelID</code> object for the queue. 454 */ 455 public static ChannelID getTheMonitorServer() { 456 return getInstance().THE_MONITOR_SERVER; 457 } 458 459 private final ChannelID THE_MONITOR_SERVER = new ChannelID(MONITOR_CHANNEL_PREFIX, ChannelID.COMMON, 460 ChannelID.NO_IP, ChannelID.NO_APPLINST_ID, ChannelID.TOPIC); 461 462 /** 463 * Reset the instance to re-read the settings. Only for use in tests. 464 */ 465 static void reset() { 466 instance = null; 467 } 468 469 /** 470 * Is a given name a ChannelName for a Topic or a Queue. 471 * 472 * @param name a given name 473 * @return true, if arg name contains the string "_ALL_" 474 */ 475 public static boolean isTopic(String name) { 476 ArgumentNotValid.checkNotNullOrEmpty(name, "String name"); 477 return name.contains("_TOPIC"); 478 } 479 480}