package org.archive.crawler.frontier;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.httpclient.URIException;
import org.archive.crawler.event.AMQPUrlReceivedEvent;
import org.archive.crawler.event.CrawlStateEvent;
import org.archive.crawler.framework.CrawlController;
import org.archive.crawler.postprocessor.CandidatesProcessor;
import org.archive.modules.CrawlURI;
import org.archive.modules.extractor.Hop;
import org.archive.modules.extractor.LinkContext;
import org.archive.net.UURI;
import org.archive.net.UURIFactory;
import org.archive.spring.KeyedProperties;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.Lifecycle;

/* loaded from: input_file:org/archive/crawler/frontier/AMQPUrlReceiver.class */
public class AMQPUrlReceiver implements Lifecycle, ApplicationContextAware, ApplicationListener<CrawlStateEvent> {
    private static final long serialVersionUID = 2;

    /** Annotation added to every CrawlURI that is built from an AMQP message. */
    public static final String A_RECEIVED_FROM_AMQP = "receivedFromAMQP";

    /** Spring application context; used to publish an event for each received URL. */
    protected ApplicationContext appCtx;

    /** Candidate chain that every received CrawlURI is run through. */
    protected CandidatesProcessor candidates;

    /** Background thread that (re)starts or cancels the consumer; null while stopped. */
    private transient StarterRestarter starterRestarter;

    private static final Logger logger = Logger.getLogger(AMQPUrlReceiver.class.getName());

    /** Lower-case names of request headers that are never copied onto a CrawlURI. */
    protected static final Set<String> REQUEST_HEADER_BLACKLIST = new HashSet<>(Arrays.asList("accept-encoding", "upgrade-insecure-requests", "host", "connection"));

    /** AMQP broker URI handed to the connection factory. */
    protected String amqpUri = "amqp://guest:guest@localhost:5672/%2f";

    /** Name of the direct exchange that is declared and bound to. */
    protected String exchange = "umbra";

    /** Name of the queue to consume from; also used as the binding routing key. */
    protected String queueName = "requests";

    /** Lifecycle flag; written by start() and stop() while holding {@link #lock}. */
    protected boolean isRunning = false;

    /** "durable" argument passed when the queue is declared. */
    private boolean durable = false;

    /** "autoDelete" argument passed when the queue is declared. */
    private boolean autoDelete = true;

    /** When true, every received CrawlURI is marked force-fetch. */
    private boolean forceFetch = false;

    /** Prefetch count applied with basicQos before consuming; skipped when null. */
    private Integer prefetchCount = 1000;

    /** Fair lock guarding connection/channel creation and consumer start/cancel. */
    private transient Lock lock = new ReentrantLock(true);

    /**
     * Set by the crawl-state listener without holding {@link #lock} and polled
     * by the restarter thread, hence volatile so the update is visible.
     */
    private transient volatile boolean pauseConsumer = false;

    /**
     * Tag of the active consumer, or null when none is registered. Cleared by
     * the consumer's shutdown callback without holding {@link #lock}, hence
     * volatile.
     */
    private transient volatile String consumerTag = null;

    /** Lazily created broker connection; see connection(). */
    protected transient Connection connection = null;

    /** Lazily created channel on {@link #connection}; see channel(). */
    protected transient Channel channel = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.archive.crawler.frontier.AMQPUrlReceiver$1, reason: invalid class name */
    /* loaded from: input_file:org/archive/crawler/frontier/AMQPUrlReceiver$1.class */
    // Decompiled form of a compiler-generated switch map: it translates
    // CrawlController.State ordinals into the small integers used as case
    // labels by onApplicationEvent (PAUSING -> 1, PAUSED -> 2, RUNNING -> 3).
    // States not listed here keep the array default of 0.
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$archive$crawler$framework$CrawlController$State = new int[CrawlController.State.values().length];

        static {
            // Each constant is looked up inside its own try so that a constant
            // missing at run time (NoSuchFieldError) only skips that one entry.
            try {
                $SwitchMap$org$archive$crawler$framework$CrawlController$State[CrawlController.State.PAUSING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$archive$crawler$framework$CrawlController$State[CrawlController.State.PAUSED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$archive$crawler$framework$CrawlController$State[CrawlController.State.RUNNING.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* loaded from: input_file:org/archive/crawler/frontier/AMQPUrlReceiver$StarterRestarter.class */
    /**
     * Background thread that, roughly every ten seconds, brings the consumer
     * in line with the requested state: it starts a consumer when none is
     * registered and consuming is not paused, and cancels the registered
     * consumer when a pause has been requested. Exits when interrupted.
     */
    private class StarterRestarter extends Thread {
        public StarterRestarter(String str) {
            super(str);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    AMQPUrlReceiver.this.lock.lockInterruptibly();
                    try {
                        logger.finest("Checking consumerTag=" + AMQPUrlReceiver.this.consumerTag + " and pauseConsumer=" + AMQPUrlReceiver.this.pauseConsumer);
                        if (AMQPUrlReceiver.this.consumerTag == null && !AMQPUrlReceiver.this.pauseConsumer) {
                            try {
                                startConsumer();
                            } catch (IOException | TimeoutException e) {
                                logger.log(Level.SEVERE, "problem starting AMQP consumer (will try again after 10 seconds)", e);
                            }
                        }
                        // Snapshot the tag: the consumer's shutdown callback may
                        // clear the field concurrently.
                        String activeTag = AMQPUrlReceiver.this.consumerTag;
                        if (activeTag != null && AMQPUrlReceiver.this.pauseConsumer) {
                            try {
                                logger.info("Attempting to cancel URLConsumer with consumerTag=" + activeTag);
                                AMQPUrlReceiver.this.channel().basicCancel(activeTag);
                                AMQPUrlReceiver.this.consumerTag = null;
                                logger.info("Cancelled URLConsumer.");
                            } catch (IOException | TimeoutException e) {
                                logger.log(Level.SEVERE, "problem cancelling AMQP consumer (will try again after 10 seconds)", e);
                            }
                        }
                    } finally {
                        // Always release, even when an unchecked exception
                        // escapes; otherwise stop() would block forever.
                        AMQPUrlReceiver.this.lock.unlock();
                    }
                    // Sleep outside the lock so other threads can use it meanwhile.
                    Thread.sleep(10000L);
                } catch (InterruptedException e) {
                    // Interruption is the shutdown signal (see stop()).
                    return;
                }
            }
        }

        /**
         * Declares the exchange and queue, binds them using the queue name as
         * routing key, applies the prefetch count when one is set, and
         * registers a new {@link UrlConsumer} with manual acknowledgement. The
         * resulting consumer tag is stored on the enclosing receiver.
         *
         * @throws IOException if a broker operation fails
         * @throws TimeoutException if obtaining the channel times out
         */
        public void startConsumer() throws IOException, TimeoutException {
            // Use one channel reference throughout so the consumer is
            // registered on the same channel that declared the queue.
            Channel activeChannel = AMQPUrlReceiver.this.channel();
            UrlConsumer urlConsumer = new UrlConsumer(activeChannel);
            activeChannel.exchangeDeclare(getExchange(), "direct", true);
            activeChannel.queueDeclare(getQueueName(), AMQPUrlReceiver.this.durable, false, AMQPUrlReceiver.this.autoDelete, null);
            activeChannel.queueBind(getQueueName(), getExchange(), getQueueName());
            if (AMQPUrlReceiver.this.prefetchCount != null) {
                activeChannel.basicQos(AMQPUrlReceiver.this.prefetchCount.intValue());
            }
            AMQPUrlReceiver.this.consumerTag = activeChannel.basicConsume(getQueueName(), false, urlConsumer);
            logger.info("started AMQP consumer uri=" + getAmqpUri() + " exchange=" + getExchange() + " queueName=" + getQueueName() + " consumerTag=" + AMQPUrlReceiver.this.consumerTag);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/archive/crawler/frontier/AMQPUrlReceiver$UrlConsumer.class */
    public class UrlConsumer extends DefaultConsumer {
        public UrlConsumer(Channel channel) {
            super(channel);
        }

        public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
            try {
                String str2 = new String(bArr, "UTF-8");
                JSONObject jSONObject = new JSONObject(str2);
                if ("GET".equals(jSONObject.getString("method"))) {
                    try {
                        CrawlURI makeCrawlUri = makeCrawlUri(jSONObject);
                        KeyedProperties.clearAllOverrideContexts();
                        AMQPUrlReceiver.this.candidates.runCandidateChain(makeCrawlUri, (CrawlURI) null);
                        AMQPUrlReceiver.this.appCtx.publishEvent(new AMQPUrlReceivedEvent(AMQPUrlReceiver.this, makeCrawlUri));
                    } catch (JSONException e) {
                        AMQPUrlReceiver.logger.log(Level.SEVERE, "problem creating CrawlURI from json received via AMQP " + str2, (Throwable) e);
                    } catch (URIException e2) {
                        AMQPUrlReceiver.logger.log(Level.WARNING, "problem creating CrawlURI from json received via AMQP " + str2, e2);
                    } catch (Exception e3) {
                        AMQPUrlReceiver.logger.log(Level.SEVERE, "Unanticipated problem creating CrawlURI from json received via AMQP " + str2, (Throwable) e3);
                    }
                } else {
                    AMQPUrlReceiver.logger.info("ignoring url with method other than GET - " + str2);
                }
                AMQPUrlReceiver.logger.finest("Now ACKing: " + str2);
                getChannel().basicAck(envelope.getDeliveryTag(), false);
            } catch (UnsupportedEncodingException e4) {
                throw new RuntimeException(e4);
            }
        }

        public void handleShutdownSignal(String str, ShutdownSignalException shutdownSignalException) {
            if (shutdownSignalException.isInitiatedByApplication()) {
                AMQPUrlReceiver.logger.info("amqp channel/connection shut down consumerTag=" + str);
            } else {
                AMQPUrlReceiver.logger.log(Level.SEVERE, "amqp channel/connection unexpectedly shut down consumerTag=" + str, (Throwable) shutdownSignalException);
            }
            AMQPUrlReceiver.this.consumerTag = null;
        }

        protected CrawlURI makeCrawlUri(JSONObject jSONObject) throws URIException, JSONException {
            JSONObject jSONObject2 = jSONObject.getJSONObject("headers");
            UURI uURIFactory = UURIFactory.getInstance(jSONObject.getString("url"));
            UURI uURIFactory2 = UURIFactory.getInstance(jSONObject.getString("parentUrl"));
            JSONObject jSONObject3 = jSONObject.getJSONObject("parentUrlMetadata");
            CrawlURI crawlURI = new CrawlURI(uURIFactory, jSONObject3.getString("pathFromSeed") + jSONObject.optString("hop", Hop.INFERRED.getHopString()), uURIFactory2, LinkContext.INFERRED_MISC);
            populateHeritableMetadata(crawlURI, jSONObject3);
            HashMap hashMap = new HashMap();
            for (Object obj : jSONObject2.keySet()) {
                String obj2 = obj.toString();
                if (!obj2.startsWith(":") && !AMQPUrlReceiver.REQUEST_HEADER_BLACKLIST.contains(obj2)) {
                    hashMap.put(obj2, jSONObject2.getString(obj.toString()));
                }
            }
            crawlURI.getData().put("customHttpRequestHeaders", hashMap);
            if (Hop.INFERRED.getHopString().equals(crawlURI.getLastHop())) {
                crawlURI.setSchedulingDirective(1);
                crawlURI.setPrecedence(1);
            }
            crawlURI.setForceFetch(AMQPUrlReceiver.this.forceFetch || jSONObject.optBoolean("forceFetch"));
            crawlURI.setSeed(jSONObject.optBoolean("isSeed"));
            crawlURI.getAnnotations().add(AMQPUrlReceiver.A_RECEIVED_FROM_AMQP);
            return crawlURI;
        }

        protected void populateHeritableMetadata(CrawlURI crawlURI, JSONObject jSONObject) {
            JSONObject jSONObject2 = jSONObject.getJSONObject("heritableData");
            for (String str : jSONObject2.keySet()) {
                Object obj = jSONObject2.get(str);
                if (obj instanceof JSONArray) {
                    HashSet hashSet = new HashSet();
                    JSONArray jSONArray = (JSONArray) obj;
                    for (int i = 0; i < jSONArray.length(); i++) {
                        hashSet.add(jSONArray.getString(i));
                    }
                    crawlURI.getData().put(str, hashSet);
                } else {
                    crawlURI.getData().put(str, jSONObject2.get(str));
                }
            }
        }
    }

    /** Stores the application context used later to publish received-URL events. */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        appCtx = applicationContext;
    }

    /** @return the candidates processor that received URLs are run through */
    public CandidatesProcessor getCandidates() {
        return candidates;
    }

    /** Sets (autowired) the candidates processor that received URLs are run through. */
    @Autowired
    public void setCandidates(CandidatesProcessor candidatesProcessor) {
        candidates = candidatesProcessor;
    }

    /** @return the AMQP URI used when opening the broker connection */
    public String getAmqpUri() {
        return amqpUri;
    }

    /** Sets the AMQP URI used when opening the broker connection. */
    public void setAmqpUri(String str) {
        amqpUri = str;
    }

    /** @return the name of the exchange that is declared and bound to */
    public String getExchange() {
        return exchange;
    }

    /** Sets the name of the exchange that is declared and bound to. */
    public void setExchange(String str) {
        exchange = str;
    }

    /** @return the queue name, which also serves as the binding routing key */
    public String getQueueName() {
        return queueName;
    }

    /** Sets the queue name, which also serves as the binding routing key. */
    public void setQueueName(String str) {
        queueName = str;
    }

    /** @return true between a call to start() and the next call to stop() */
    @Override
    public boolean isRunning() {
        return isRunning;
    }

    /** @return the "durable" flag passed when the queue is declared */
    public boolean isDurable() {
        return durable;
    }

    /** Sets the "durable" flag passed when the queue is declared. */
    public void setDurable(boolean z) {
        durable = z;
    }

    /** @return the "autoDelete" flag passed when the queue is declared */
    public boolean isAutoDelete() {
        return autoDelete;
    }

    /** Sets the "autoDelete" flag passed when the queue is declared. */
    public void setAutoDelete(boolean z) {
        autoDelete = z;
    }

    /** @return whether every received URL is marked force-fetch regardless of the message */
    public boolean isForceFetch() {
        return forceFetch;
    }

    /** Sets whether every received URL is marked force-fetch regardless of the message. */
    public void setForceFetch(boolean z) {
        forceFetch = z;
    }

    /**
     * Starts the receiver if it is not already running: creates the restarter
     * thread, makes a first attempt at registering the consumer (a failure is
     * only logged, since the thread retries) and then starts the thread.
     */
    @Override
    public void start() {
        lock.lock();
        try {
            if (isRunning) {
                return;
            }
            starterRestarter = new StarterRestarter(AMQPUrlReceiver.class.getSimpleName() + "-starter-restarter");
            try {
                starterRestarter.startConsumer();
            } catch (IOException | TimeoutException e) {
                logger.log(Level.SEVERE, "problem starting AMQP consumer (will try again soon)", e);
            }
            starterRestarter.start();
            isRunning = true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Stops the receiver: interrupts the restarter thread and waits for it to
     * finish, closes the connection if it is open, and resets connection,
     * channel and running state. If the calling thread is interrupted while
     * waiting, cleanup still completes and the interrupt status is restored
     * afterwards instead of being lost.
     */
    @Override
    public void stop() {
        boolean interrupted = false;
        lock.lock();
        try {
            logger.info("shutting down");
            if (starterRestarter != null && starterRestarter.isAlive()) {
                starterRestarter.interrupt();
                try {
                    starterRestarter.join();
                } catch (InterruptedException e) {
                    // Remember it; re-asserted once cleanup is done.
                    interrupted = true;
                }
            }
            starterRestarter = null;
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    logger.log(Level.SEVERE, "problem closing AMQP connection", e);
                }
            }
            connection = null;
            channel = null;
            isRunning = false;
        } finally {
            lock.unlock();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns the broker connection, creating one from the configured AMQP URI
     * when none exists or the existing one is no longer open. Runs under the
     * receiver's lock.
     *
     * @return the open connection
     * @throws IOException if the URI cannot be applied or the connection
     *         cannot be opened (any underlying exception is wrapped)
     * @throws TimeoutException declared for callers; failures raised here are
     *         wrapped in IOException
     */
    protected Connection connection() throws IOException, TimeoutException {
        lock.lock();
        try {
            boolean stale = connection != null && !connection.isOpen();
            if (stale) {
                logger.warning("connection is closed, creating a new one");
                connection = null;
            }
            if (connection != null) {
                return connection;
            }
            ConnectionFactory factory = new ConnectionFactory();
            try {
                factory.setUri(getAmqpUri());
                connection = factory.newConnection();
            } catch (Exception e) {
                throw new IOException("problem with AMQP uri " + getAmqpUri(), e);
            }
            return connection;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the channel, opening a new one on the current connection when
     * none exists or the existing one is no longer open. Runs under the
     * receiver's lock.
     *
     * @return the open channel
     * @throws IOException if the connection or channel cannot be created
     * @throws TimeoutException if obtaining the connection times out
     */
    protected Channel channel() throws IOException, TimeoutException {
        lock.lock();
        try {
            boolean stale = channel != null && !channel.isOpen();
            if (stale) {
                logger.warning("channel is not open, creating a new one");
                channel = null;
            }
            if (channel != null) {
                return channel;
            }
            channel = connection().createChannel();
            return channel;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Reacts to crawl state changes: PAUSING or PAUSED requests that the
     * consumer be paused, RUNNING requests that it resume. Only the flag is
     * changed here; the restarter thread acts on it. Other states are ignored.
     */
    @Override
    public void onApplicationEvent(CrawlStateEvent crawlStateEvent) {
        switch (crawlStateEvent.getState()) {
            case PAUSING:
            case PAUSED:
                if (!pauseConsumer) {
                    logger.info("Requesting a pause of the URLConsumer...");
                    pauseConsumer = true;
                }
                break;
            case RUNNING:
                if (pauseConsumer) {
                    logger.info("Requesting unpause of the URLConsumer...");
                    pauseConsumer = false;
                }
                break;
            default:
                break;
        }
    }
}
