001/*
002 * #%L
003 * Netarchivesuite - common
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023
024package dk.netarkivet.common.distribute;
025
026import java.util.Collection;
027
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031import dk.netarkivet.common.CommonSettings;
032import dk.netarkivet.common.distribute.arcrepository.Replica;
033import dk.netarkivet.common.distribute.arcrepository.ReplicaType;
034import dk.netarkivet.common.exceptions.ArgumentNotValid;
035import dk.netarkivet.common.exceptions.IllegalState;
036import dk.netarkivet.common.exceptions.UnknownID;
037import dk.netarkivet.common.utils.Settings;
038
039/**
040 * This singleton class is in charge of giving out the correct channels.
041 */
042public class Channels {
043
044    //private static final Log log = LogFactory.getLog(Channels.class);
045    private static final Logger log = LoggerFactory.getLogger(Channels.class);
046
047    /**
048     * Channel type prefixes for the current set of channels.
049     */
050    private static final String ALLBA_CHANNEL_PREFIX = "ALL_BA";
051    private static final String ANYBA_CHANNEL_PREFIX = "ANY_BA";
052    private static final String THEBAMON_CHANNEL_PREFIX = "THE_BAMON";
053    private static final String THEREPOS_CHANNEL_PREFIX = "THE_REPOS";
054    private static final String THISREPOSCLIENT_CHANNEL_PREFIX = "THIS_REPOS_CLIENT";
055    private static final String ERROR_CHANNEL_PREFIX = "ERROR";
056    private static final String INDEXSERVER_CHANNEL_PREFIX = "INDEX_SERVER";
057    private static final String THISINDEXCLIENT_CHANNEL_PREFIX = "THIS_INDEX_CLIENT";
058    private static final String MONITOR_CHANNEL_PREFIX = "MONITOR";
059
060    private static final String THECR_CHANNEL_PREFIX = "THE_CR";
061
062    /** Channel part separator. */
063    public static final String CHANNEL_PART_SEPARATOR = "_";
064
065    /** The one existing instance of the Channels object. Not accessible from the outside at all. */
066    private static Channels instance;
067
068    /**
069     * Accessor for singleton internally.
070     *
071     * @return the <code>Channels</code> object for this singleton.
072     */
073    private static Channels getInstance() {
074        if (instance == null) {
075            instance = new Channels();
076        }
077        return instance;
078    }
079
080    /**
081     * Contains the collection of replicas.
082     */
083    private final Collection<Replica> replicas = Replica.getKnown();
084
085    /**
086     * This is the container for the replica which is used by applications that only communicate with local processes.
087     */
088    private final Replica useReplica = Replica.getReplicaFromId(Settings.get(CommonSettings.USE_REPLICA_ID));
089
090    /**
091     * The constructor of Channels class. Validates that the current value of the setting USE_REPLICA_ID corresponds to
092     * one of the replicas listed in the settings. Furthermore we here fill content in the ALL_BA_ARRAY, ANY_BA_ARRAY,
093     * THE_BAMON_ARRAY, and initialize ALL_BA, ANY_BA, and THE_BAMON.
094     *
095     * @throws UnknownID If one of the replicas has an unhandled replica type.
096     */
097    private Channels() {
098        // index count
099        int i = 0;
100        int useReplicaIndex = -1;
101        // go through all replicas and initialize their channels.
102        for (Replica rep : replicas) {
103            if (rep.getType() == ReplicaType.BITARCHIVE) {
104                // Bitarchive has 'ALL_BA', 'ANY_BA' and 'THE_BAMON'.
105                ALL_BA_ARRAY[i] = new ChannelID(ALLBA_CHANNEL_PREFIX, rep.getId(), ChannelID.NO_IP,
106                        ChannelID.NO_APPLINST_ID, ChannelID.TOPIC);
107                ANY_BA_ARRAY[i] = new ChannelID(ANYBA_CHANNEL_PREFIX, rep.getId(), ChannelID.NO_IP,
108                        ChannelID.NO_APPLINST_ID, ChannelID.QUEUE);
109                THE_BAMON_ARRAY[i] = new ChannelID(THEBAMON_CHANNEL_PREFIX, rep.getId(), ChannelID.NO_IP,
110                        ChannelID.NO_APPLINST_ID, ChannelID.QUEUE);
111                THE_CR_ARRAY[i] = null;
112            } else if (rep.getType() == ReplicaType.CHECKSUM) {
113                // Checksum has only 'THE_CR'.
114                ALL_BA_ARRAY[i] = null;
115                ANY_BA_ARRAY[i] = null;
116                THE_BAMON_ARRAY[i] = null;
117                THE_CR_ARRAY[i] = new ChannelID(THECR_CHANNEL_PREFIX, rep.getId(), ChannelID.NO_IP,
118                        ChannelID.NO_APPLINST_ID, ChannelID.QUEUE);
119            } else {
120                // Throw an exception when unknown replica type.
121                throw new UnknownID("The replica '" + rep + "' does not have " + "a valid replica type.");
122            }
123
124            // find the 'useReplica'
125            if (rep == useReplica) {
126                useReplicaIndex = i;
127            }
128
129            ++i;
130        }
131
132        // validate the index of the useReplica
133        if (useReplicaIndex < 0 || useReplicaIndex >= replicas.size()) {
134            // issue an error, if the use replica could not be found.
135            throw new ArgumentNotValid("The useReplica '" + useReplica + "' was not found in the list of replicas: '"
136                    + replicas + "'.");
137        }
138
139        // set the channels for the useReplica
140        ALL_BA = ALL_BA_ARRAY[useReplicaIndex];
141        ANY_BA = ANY_BA_ARRAY[useReplicaIndex];
142        THE_BAMON = THE_BAMON_ARRAY[useReplicaIndex];
143        THE_CR = THE_CR_ARRAY[useReplicaIndex];
144    }
145
146    /**
147     * Method for retrieving the list of replicas used for the channels. The replica ids are in the same order as their
148     * channels.
149     *
150     * @return The replica ids in the same order as their channels.
151     */
152    public static Collection<Replica> getReplicas() {
153        return getInstance().replicas;
154    }
155
156    /**
157     * Returns the one-per-client queue on which client receives replies from the arcrepository.
158     *
159     * @return the <code>ChannelID</code> object for this queue.
160     */
161    public static ChannelID getThisReposClient() {
162        return getInstance().THIS_REPOS_CLIENT;
163    }
164
165    private final ChannelID THIS_REPOS_CLIENT = new ChannelID(THISREPOSCLIENT_CHANNEL_PREFIX, ChannelID.COMMON,
166            ChannelID.INCLUDE_IP, ChannelID.INCLUDE_APPLINST_ID, ChannelID.QUEUE);
167
168    /**
169     * Returns the queue on which all messages to the Repository are sent.
170     *
171     * @return the <code>ChannelID</code> object for this queue.
172     */
173    public static ChannelID getTheRepos() {
174        return getInstance().THE_REPOS;
175    }
176
177    private final ChannelID THE_REPOS = new ChannelID(THEREPOS_CHANNEL_PREFIX, ChannelID.COMMON, ChannelID.NO_IP,
178            ChannelID.NO_APPLINST_ID, ChannelID.QUEUE);
179
180    /**
181     * Returns BAMON channels for every known bitarchive (replica).
182     *
183     * @return An array of BAMON channels - one per bitarchive (replica)
184     */
185    public static final ChannelID[] getAllArchives_BAMONs() {
186        return getInstance().THE_BAMON_ARRAY;
187    }
188
189    private final ChannelID[] THE_BAMON_ARRAY = new ChannelID[replicas.size()];
190
191    /**
192     * Returns the queue for sending messages to bitarchive monitors.
193     *
194     * @return the <code>ChannelID</code> object for this queue.
195     * @throws IllegalState If the current replica is not a checksum replica.
196     */
197    public static ChannelID getTheBamon() throws IllegalState {
198        ChannelID res = getInstance().THE_BAMON;
199
200        if (res == null) {
201            throw new IllegalState("The channel for the bitarchive monitor  cannot to be retrieved for replica '"
202                    + getInstance().useReplica + "'.");
203        }
204
205        return res;
206    }
207
208    /**
209     * Implementation notice: This cannot be initialized directly in the field, as it uses THE_BAMON_ARRAY, which is
210     * initialized in the constructor.
211     */
212    private final ChannelID THE_BAMON;
213
214    /**
215     * Returns the channels for the all Checksum replicas.
216     *
217     * @return An array of THE_CR channels - one for each replica, though only the checksum replicas have values (the
218     * others are null).
219     */
220    public static final ChannelID[] getAllArchives_CRs() {
221        return getInstance().THE_CR_ARRAY;
222    }
223
224    /** The array containing the 'THE_CR' channels. */
225    private final ChannelID[] THE_CR_ARRAY = new ChannelID[replicas.size()];
226
227    /**
228     * Method for retrieving the 'THE_CR' channel for this replica. If the replica is not a checksum replica, then an
229     * error is thrown.
230     *
231     * @return the 'THE_CR' channel for this replica.
232     * @throws IllegalState If the current replica is not a checksum replica.
233     */
234    public static ChannelID getTheCR() throws IllegalState {
235        ChannelID res = getInstance().THE_CR;
236
237        if (res == null) {
238            throw new IllegalState("A bitarchive replica does not have the channel for communicating with a checksum "
239                    + "replica.");
240        }
241
242        return res;
243    }
244
245    /**
246     * The 'THE_CR' channel for this replica. This has the value 'null' if the replica is not a checksum replica.
247     */
248    private final ChannelID THE_CR;
249
250    /**
251     * Returns ALL_BA channels for every known bitarchive.
252     *
253     * @return An array of ALL_BA channels - one per bitarchive
254     */
255    public static final ChannelID[] getAllArchives_ALL_BAs() {
256        return getInstance().ALL_BA_ARRAY;
257    }
258
259    /**
260     * ALL_BA is the topic on which a Bitarchive client publishes get, correct and batch messages to all connected
261     * Bitarchive machines. The following is the list of ALL_BA for all archives (i.e. archive replicas).
262     */
263    private final ChannelID[] ALL_BA_ARRAY = new ChannelID[replicas.size()];
264
265    /**
266     * Returns the topic that all bitarchive machines on this replica are listening on.
267     *
268     * @return A topic channel that reaches all local bitarchive machines
269     * @throws IllegalState If the current replica is not a bitarchive replica.
270     */
271    public static ChannelID getAllBa() throws IllegalState {
272        ChannelID res = getInstance().ALL_BA;
273
274        if (res == null) {
275            throw new IllegalState("A checksum replica does not have the channels for communicating with a bitarchive "
276                    + "replica.");
277        }
278
279        return res;
280    }
281
282    /**
283     * Implementation notice: This cannot be initialized directly in the field, as it uses ALL_BA_ARRAY, which is
284     * initialized in the constructor.
285     */
286    private final ChannelID ALL_BA;
287
288    /**
289     * Returns ANY_BA channels for every known bitarchive.
290     *
291     * @return An array of ANY_BA channels - one per bitarchive
292     */
293    public static final ChannelID[] getAllArchives_ANY_BAs() {
294        return getInstance().ANY_BA_ARRAY;
295    }
296
297    /**
298     * Queue on which upload requests are sent out to bitarchive servers. The following is the list of ANY_BA for all
299     * archives.
300     */
301    private final ChannelID[] ANY_BA_ARRAY = new ChannelID[replicas.size()];
302
303    /**
304     * Returns the channel where exactly one of all the bitarchive machines at this replica will get the message.
305     *
306     * @return A queue channel that reaches one of the local bitarchive machines.
307     * @throws IllegalState If the current replica is not a bitarchive replica.
308     */
309    public static ChannelID getAnyBa() throws IllegalState {
310        ChannelID res = getInstance().ANY_BA;
311
312        if (res == null) {
313            throw new IllegalState("A checksum replica does not have the channels for communicating with a bitarchive "
314                    + "replica.");
315        }
316
317        return res;
318    }
319
320    /**
321     * Implementation notice: This cannot be initialized directly in the field, as it uses ANY_BA_ARRAY, which is
322     * initialized in the constructor.
323     */
324    private final ChannelID ANY_BA;
325
326    /**
327     * Returns the queue on which to put errors which are not handled elsewhere.
328     *
329     * @return the <code>ChannelID</code> object for this queue.
330     */
331    public static ChannelID getError() {
332        return getInstance().ERROR;
333    }
334
335    private final ChannelID ERROR = new ChannelID(ERROR_CHANNEL_PREFIX, ChannelID.COMMON, ChannelID.NO_IP,
336            ChannelID.NO_APPLINST_ID, ChannelID.QUEUE);
337
338    /**
339     * Given an replica, returns the BAMON queue to which batch jobs must be sent in order to run them on that
340     * bitarchive.
341     *
342     * @param replicaId The id of the replica
343     * @return the channel
344     * @throws ArgumentNotValid if the replicaId is null, unknown, or empty string
345     */
346    public static ChannelID getBaMonForReplica(String replicaId) throws ArgumentNotValid {
347        ArgumentNotValid.checkNotNullOrEmpty(replicaId, "replicaId");
348        ChannelID[] bamons = getAllArchives_BAMONs();
349        for (ChannelID bamon : bamons) {
350            if (bamon != null
351                    && bamon.getName().equals(
352                            Settings.get(CommonSettings.ENVIRONMENT_NAME) + CHANNEL_PART_SEPARATOR + replicaId
353                                    + CHANNEL_PART_SEPARATOR + THEBAMON_CHANNEL_PREFIX)) {
354                return bamon;
355            }
356        }
357        throw new ArgumentNotValid("Did not find a BAMON queue for '" + replicaId + "'");
358    }
359
360    public static ChannelID getTheCrForReplica(String replicaId) {
361        ArgumentNotValid.checkNotNullOrEmpty(replicaId, "String replicaId");
362        ChannelID[] crs = getAllArchives_CRs();
363        for (ChannelID cr : crs) {
364            if (cr != null
365                    && cr.getName().equals(
366                            Settings.get(CommonSettings.ENVIRONMENT_NAME) + CHANNEL_PART_SEPARATOR + replicaId
367                                    + CHANNEL_PART_SEPARATOR + THECR_CHANNEL_PREFIX)) {
368                return cr;
369            }
370        }
371        throw new ArgumentNotValid("Did not find a checksum queue for '" + replicaId + "'");
372    }
373
374    /**
375     * Method for extracting the replica from the name of the identifier channel.
376     *
377     * @param channelName The name of the identification channel for the replica.
378     * @return Replica who the identification channel belong to.
379     * @throws UnknownID If the replicaId does not point to a know replica.
380     * @throws ArgumentNotValid If the channelName is either null or empty.
381     */
382    public static Replica retrieveReplicaFromIdentifierChannel(String channelName) throws UnknownID, ArgumentNotValid {
383        ArgumentNotValid.checkNotNullOrEmpty(channelName, "String channelName");
384        if (channelName.contains(THECR_CHANNEL_PREFIX)) {
385            // environmentName ## replicaId ## THE_CR
386            String[] parts = channelName.split(CHANNEL_PART_SEPARATOR);
387            return Replica.getReplicaFromId(parts[1]);
388        } else if (channelName.contains(THEBAMON_CHANNEL_PREFIX)) {
389            // environmentName ## replicaId ## THE_BAMON
390            String[] parts = channelName.split(CHANNEL_PART_SEPARATOR);
391            return Replica.getReplicaFromId(parts[1]);
392        }
393
394        String errMsg = "The current channel name, '" + channelName + "' does not refer to an identification channel";
395        log.warn(errMsg);
396        throw new UnknownID(errMsg);
397    }
398
399    /**
400     * The method for retrieving the name of the identification channel for a replica based on the Id of this replica.
401     *
402     * @param replicaId The id for the replica whose identification channel name should be retrieved.
403     * @return The name of the identification channel for the replica.
404     * @throws UnknownID If no replica with the given replica id is known.
405     * @throws ArgumentNotValid If the replicaId is null or empty.
406     */
407    public static String retrieveReplicaChannelNameFromReplicaId(String replicaId) throws UnknownID, ArgumentNotValid {
408        ArgumentNotValid.checkNotNullOrEmpty(replicaId, "String replicaId");
409        return Replica.getReplicaFromId(replicaId).getIdentificationChannel().getName();
410    }
411
412    /**
413     * The method for retrieving the identification channel for a replica based on the Id of this replica.
414     *
415     * @param replicaId The id for the replica whose identification channel name should be retrieved.
416     * @return The identification channel for the replica.
417     * @throws UnknownID If no replica with the given replica id is known.
418     * @throws ArgumentNotValid If the replicaId is null or empty.
419     */
420    public static ChannelID retrieveReplicaChannelFromReplicaId(String replicaId) throws UnknownID, ArgumentNotValid {
421        ArgumentNotValid.checkNotNullOrEmpty(replicaId, "String replicaId");
422        return Replica.getReplicaFromId(replicaId).getIdentificationChannel();
423    }
424
425    /**
426     * Returns the queue for sending messages to the IndexServer application.
427     *
428     * @return the <code>ChannelID</code> object for this queue.
429     */
430    public static ChannelID getTheIndexServer() {
431        return getInstance().THE_INDEX_SERVER;
432    }
433
434    private final ChannelID THE_INDEX_SERVER = new ChannelID(INDEXSERVER_CHANNEL_PREFIX, ChannelID.COMMON,
435            ChannelID.NO_IP, ChannelID.NO_APPLINST_ID, ChannelID.QUEUE);
436
437    /**
438     * Returns the queue for getting responses from the IndexServer application.
439     *
440     * @return the <code>ChannelID</code> object for this queue.
441     */
442    public static ChannelID getThisIndexClient() {
443        return getInstance().THIS_INDEX_CLIENT;
444    }
445
446    // TODO Should we use client channels for all our servers?
447    private final ChannelID THIS_INDEX_CLIENT = new ChannelID(THISINDEXCLIENT_CHANNEL_PREFIX, ChannelID.COMMON,
448            ChannelID.INCLUDE_IP, ChannelID.INCLUDE_APPLINST_ID, ChannelID.QUEUE);
449
450    /**
451     * Return the topic for the monitor registry.
452     *
453     * @return the <code>ChannelID</code> object for the queue.
454     */
455    public static ChannelID getTheMonitorServer() {
456        return getInstance().THE_MONITOR_SERVER;
457    }
458
459    private final ChannelID THE_MONITOR_SERVER = new ChannelID(MONITOR_CHANNEL_PREFIX, ChannelID.COMMON,
460            ChannelID.NO_IP, ChannelID.NO_APPLINST_ID, ChannelID.TOPIC);
461
462    /**
463     * Reset the instance to re-read the settings. Only for use in tests.
464     */
465    static void reset() {
466        instance = null;
467    }
468
469    /**
470     * Is a given name a ChannelName for a Topic or a Queue.
471     *
472     * @param name a given name
473     * @return true, if arg name contains the string "_ALL_"
474     */
475    public static boolean isTopic(String name) {
476        ArgumentNotValid.checkNotNullOrEmpty(name, "String name");
477        return name.contains("_TOPIC");
478    }
479
480}