package org.archive.trough;

import com.rethinkdb.RethinkDB;
import com.rethinkdb.gen.ast.ReqlExpr;
import com.rethinkdb.net.Connection;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.archive.util.ArchiveUtils;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

/* loaded from: input_file:org/archive/trough/TroughClient.class */
public class TroughClient {
    /** Logger used by this client and its nested classes. */
    private static final Logger logger = Logger.getLogger(TroughClient.class.getName());
    /** Entry point of the RethinkDB query DSL, used to build every query in this class. */
    protected static final RethinkDB r = RethinkDB.r;
    /** Six hours in milliseconds; passed as the HTTP timeout for promotion requests. */
    protected static final int SIX_HOURS_MS = 21600000;
    /** Ten minutes in milliseconds; passed as the HTTP timeout for read, write, provision and schema requests. */
    protected static final int TEN_MINUTES_MS = 600000;
    /** Content type sent with JSON request bodies. */
    protected static final String JSON_MIMETYPE = "application/json";
    /** Content type sent with SQL request bodies. */
    protected static final String SQL_MIMETYPE = "application/sql";
    /** Picks which configured RethinkDB server each query attempt connects to. */
    protected Random rand;
    /** Cache of segment id to write url; mutated while synchronized on the map itself. */
    protected Map<String, String> writeUrlCache;
    /** Cache of segment id to read url; mutated while synchronized on the map itself. */
    protected Map<String, String> readUrlCache;
    /** Segments written successfully since the last promotion pass; mutated while synchronized on the set itself. */
    protected Set<String> dirtySegments;
    /** RethinkDB servers parsed from the connection url, each "host" or "host:port". */
    protected String[] rethinkServers;
    /** RethinkDB database name parsed from the connection url. */
    protected String rethinkDb;
    /** Seconds between background promotion passes; when null, start() creates no promoter thread. */
    protected Integer promotionInterval;
    /** Background promoter thread created by start() and cleared by stop(); null when not running. */
    protected Thread promotrix;

    /* loaded from: input_file:org/archive/trough/TroughClient$Promotrix.class */
    /**
     * Background task that repeatedly sleeps for {@code promotionInterval}
     * seconds and then promotes every dirty segment of the enclosing client.
     * The loop ends when the thread is interrupted while sleeping.
     */
    protected class Promotrix implements Runnable {
        protected Promotrix() {
        }

        /**
         * Runs the sleep/promote loop until the thread is interrupted.
         * Failures during a promotion pass are logged and the loop continues.
         */
        @Override
        public void run() {
            while (true) {
                try {
                    // Multiply as long: int arithmetic overflows for intervals above
                    // ~24.8 days, yielding a negative sleep time that kills the thread.
                    Thread.sleep(TroughClient.this.promotionInterval.intValue() * 1000L);
                } catch (InterruptedException e) {
                    TroughClient.logger.info("promoter thread shutting down");
                    return;
                }
                try {
                    TroughClient.this.promoteDirtySegments();
                } catch (Exception e) {
                    TroughClient.logger.log(Level.WARNING, "continuing after unexpected exception promoting dirty segments", (Throwable) e);
                }
            }
        }
    }

    /* loaded from: input_file:org/archive/trough/TroughClient$TroughException.class */
    /**
     * I/O failure raised by {@code TroughClient} operations, for example an
     * unexpected HTTP status, an unparseable response, or a RethinkDB query
     * that failed on every attempt.
     */
    public class TroughException extends IOException {
        private static final long serialVersionUID = 1L;

        /**
         * @param message description of the failure
         */
        public TroughException(String message) {
            super(message);
        }

        /**
         * @param cause underlying exception; its string form becomes the message
         */
        public TroughException(Exception cause) {
            super(cause);
        }

        /**
         * @param message description of the failure
         * @param cause underlying throwable
         */
        public TroughException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /* loaded from: input_file:org/archive/trough/TroughClient$TroughNoReadUrlException.class */
    /**
     * Raised when no read url can be found for a segment.
     */
    public class TroughNoReadUrlException extends TroughException {
        private static final long serialVersionUID = 1L;

        /**
         * @param message description of the failed lookup
         */
        public TroughNoReadUrlException(String message) {
            super(message);
        }
    }

    /**
     * Renders a Java value as a SQL literal.
     *
     * <ul>
     * <li>{@code null} becomes {@code null}</li>
     * <li>{@link Date} becomes {@code datetime('...')} around the value of
     *     {@code ArchiveUtils.getLog14Date}</li>
     * <li>{@link Boolean} becomes {@code 1} or {@code 0}</li>
     * <li>{@link Number} becomes its {@code toString()}</li>
     * <li>anything else becomes its {@code toString()} in single quotes, with
     *     embedded single quotes doubled</li>
     * </ul>
     *
     * @param value the value to render, may be null
     * @return the SQL literal text
     */
    public static String sqlValue(Object value) {
        if (value == null) {
            return "null";
        }
        if (value instanceof Date) {
            return "datetime('" + ArchiveUtils.getLog14Date((Date) value) + "')";
        }
        if (value instanceof Boolean) {
            return ((Boolean) value).booleanValue() ? "1" : "0";
        }
        if (value instanceof Number) {
            return value.toString();
        }
        // literal replacement; the pattern and replacement contain no regex metacharacters
        return "'" + value.toString().replace("'", "''") + "'";
    }

    /**
     * Creates a client with no promotion interval, so {@link #start()} will
     * not launch a background promoter thread.
     *
     * @param rethinkdbUrl url of the form {@code rethinkdb://host[:port][,host[:port]...]/db}
     * @throws MalformedURLException if the url cannot be parsed
     */
    public TroughClient(String rethinkdbUrl) throws MalformedURLException {
        this(rethinkdbUrl, null);
    }

    /**
     * Opens an HTTP connection, applies method, timeouts and content type, and
     * sends the optional request body. The response is not read here; the
     * caller inspects the returned connection.
     *
     * @param method HTTP method, e.g. {@code "POST"}
     * @param url target url
     * @param contentType value for the Content-Type header, or null to omit it
     * @param body request body to send UTF-8 encoded, or null for no body
     * @param timeoutMs connect and read timeout in milliseconds
     * @return the connection, with the request body already written if there is one
     * @throws IOException if the connection cannot be opened or the body cannot be sent
     */
    protected static HttpURLConnection httpRequest(String method, String url, String contentType, String body, int timeoutMs) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
        try {
            connection.setConnectTimeout(timeoutMs);
            connection.setReadTimeout(timeoutMs);
            connection.setRequestMethod(method);
            if (contentType != null) {
                connection.setRequestProperty("Content-Type", contentType);
            }
            if (body == null) {
                logger.finer(method + " " + url);
                return connection;
            }
            connection.setDoOutput(true);
            byte[] bytes = body.getBytes("UTF-8");
            logger.finer(method + " " + url + " " + bytes.length + " bytes " + contentType);
            connection.setRequestProperty("Content-Length", Integer.toString(bytes.length));
            // try-with-resources closes the stream even when write() fails
            try (OutputStream out = connection.getOutputStream()) {
                out.write(bytes);
                out.flush();
            }
            return connection;
        } catch (IOException | RuntimeException e) {
            // the caller never receives the connection on failure, so release it here
            connection.disconnect();
            throw e;
        }
    }

    /**
     * Asks the segment manager to promote a segment by POSTing
     * {@code {"segment": ...}} to its {@code promote} endpoint, with a
     * six-hour timeout.
     *
     * @param segmentId id of the segment to promote
     * @throws TroughException if the segment manager cannot be located or
     *         answers with a status other than 200
     * @throws IOException if the HTTP exchange fails
     */
    public void promote(String segmentId) throws IOException, TroughException {
        String url = segmentManagerUrl("promote");
        JSONObject payload = new JSONObject();
        payload.put("segment", segmentId);
        logger.info("promoting segment " + segmentId + " to permanent storage in hdfs: posting " + payload + " to " + url);
        HttpURLConnection connection = httpRequest("POST", url, JSON_MIMETYPE, payload.toString(), SIX_HOURS_MS);
        try {
            int status = connection.getResponseCode();
            if (status != 200) {
                throw new TroughException("received " + status + ": " + responsePayload(connection) + " in response to POST " + url + " with data " + payload.toString());
            }
        } finally {
            // the response body is not needed; release the connection instead of leaking it
            connection.disconnect();
        }
    }

    /**
     * Reads the response body of a connection as UTF-8 text, falling back to
     * the error stream when the regular input stream cannot be opened.
     *
     * @param httpURLConnection connection whose response should be read
     * @return the response body, or an empty string when the server sent none
     * @throws IOException if reading the body fails
     */
    protected static String responsePayload(HttpURLConnection httpURLConnection) throws IOException {
        InputStream body;
        try {
            body = httpURLConnection.getInputStream();
        } catch (IOException e) {
            body = httpURLConnection.getErrorStream();
        }
        if (body == null) {
            // getErrorStream() returns null when there is no error body; passing
            // that on would raise a NullPointerException that hides the real failure
            return "";
        }
        try (InputStream in = body) {
            return IOUtils.toString(in, "UTF-8");
        }
    }

    /**
     * Runs a query against a randomly chosen RethinkDB server, retrying on a
     * freshly chosen server after any failure.
     *
     * @param query the query to run
     * @param retries number of retries after the first attempt; null or a
     *        negative value means 9
     * @return whatever the query's {@code run} returns
     * @throws TroughException wrapping the last failure once every attempt has failed
     */
    protected Object rethinkQuery(ReqlExpr query, Integer retries) throws TroughException {
        logger.fine("querying rethinkdb: " + query);
        int maxRetries = (retries == null || retries.intValue() < 0) ? 9 : retries.intValue();
        Exception lastFailure = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            String server = this.rethinkServers[this.rand.nextInt(this.rethinkServers.length)];
            Connection connection = null;
            try {
                String[] hostAndPort = server.split(":", 2);
                int port = hostAndPort.length == 2 ? Integer.parseInt(hostAndPort[1]) : 28015;
                connection = r.connection().hostname(hostAndPort[0]).port(port).db(this.rethinkDb).connect();
                return query.run(connection);
            } catch (Exception e) {
                // report how many attempts remain, not how many have been made
                logger.warning("rethinkdb query failed (server=" + server + "; " + (maxRetries - attempt) + " retries left)");
                lastFailure = e;
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (Exception closeFailure) {
                        // a failed close must not discard a result or mask the query failure
                        logger.log(Level.FINE, "problem closing rethinkdb connection to " + server, (Throwable) closeFailure);
                    }
                }
            }
        }
        throw new TroughException(lastFailure);
    }

    /**
     * Builds the url of a segment manager endpoint by joining the segment
     * manager base url and {@code path} with exactly one slash between them.
     *
     * @param path endpoint path, with or without a leading slash
     * @return base url + "/" + path
     * @throws TroughException if the segment manager url cannot be determined
     */
    protected String segmentManagerUrl(String path) throws TroughException {
        String base = segmentManagerUrl();
        String trimmedBase = base.endsWith("/") ? base.substring(0, base.length() - 1) : base;
        String trimmedPath = path.startsWith("/") ? path.substring(1) : path;
        return trimmedBase + "/" + trimmedPath;
    }

    /**
     * Looks up the segment manager url in the RethinkDB {@code services}
     * table: the {@code url} of the first {@code trough-sync-master} entry
     * whose {@code last_heartbeat} is more recent than its {@code ttl}.
     *
     * @return the {@code url} field of the first matching row
     * @throws TroughException if the query fails or no row matches
     */
    protected String segmentManagerUrl() throws TroughException {
        ReqlExpr healthyMasters = r.table("services")
                .optArg("read_mode", "majority")
                .getAll(new Object[]{"trough-sync-master"})
                .filter(service -> r.now().sub(new Object[]{service.g("last_heartbeat")}).lt(service.g("ttl"), new Object[0]));
        Iterator rows = ((Iterable) rethinkQuery(healthyMasters, null)).iterator();
        if (!rows.hasNext()) {
            throw new TroughException("no healthy trough-sync-master in rethinkdb?");
        }
        return (String) ((Map) rows.next()).get("url");
    }

    /**
     * Creates a client for the given RethinkDB cluster and database.
     *
     * @param rethinkdbUrl url of the form {@code rethinkdb://host[:port][,host[:port]...]/db}
     * @param promotionInterval seconds between background promotion passes, or
     *        null to have {@link #start()} launch no promoter thread
     * @throws MalformedURLException if the url cannot be parsed
     */
    public TroughClient(String rethinkdbUrl, Integer promotionInterval) throws MalformedURLException {
        this.rand = new Random();
        parseRethinkdbUrl(rethinkdbUrl);
        this.writeUrlCache = new HashMap<>();
        this.readUrlCache = new HashMap<>();
        this.dirtySegments = new HashSet<>();
        this.promotionInterval = promotionInterval;
    }

    /**
     * Starts the background promoter as a daemon thread named
     * {@code TroughClient-promotrix}. Does nothing when no promotion interval
     * was configured.
     */
    public void start() {
        if (this.promotionInterval == null) {
            return;
        }
        Thread promoter = new Thread(new Promotrix(), "TroughClient-promotrix");
        this.promotrix = promoter;
        promoter.setDaemon(true);
        promoter.start();
    }

    /**
     * Interrupts the background promoter thread, if any, and waits up to one
     * minute for it to finish. A warning is logged when the thread is still
     * alive afterwards. If the calling thread is interrupted while waiting,
     * its interrupt status is restored before this method returns.
     */
    public void stop() {
        Thread promoter = this.promotrix;
        if (promoter == null) {
            return;
        }
        promoter.interrupt();
        boolean interrupted = false;
        try {
            promoter.join(60000L);
        } catch (InterruptedException e) {
            // remember the interrupt instead of swallowing it; restored below
            interrupted = true;
        }
        if (promoter.isAlive()) {
            logger.warning(promoter + " is still running after interrupting and waiting one minute; it should die once the active promotion finishes");
        }
        this.promotrix = null;
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Parses a url of the form {@code rethinkdb://server[,server...]/db} and
     * stores the server list and database name on this client. Server entries
     * are trimmed and blank entries are dropped.
     *
     * @param rethinkdbUrl the url to parse
     * @throws MalformedURLException if the url is null, does not match the
     *         expected form, or names no server
     */
    protected void parseRethinkdbUrl(String rethinkdbUrl) throws MalformedURLException {
        if (rethinkdbUrl == null) {
            throw new MalformedURLException("failed to parse as rethinkdb url: " + rethinkdbUrl);
        }
        Matcher matcher = Pattern.compile("^rethinkdb://([^/]+)/([^/]+)$").matcher(rethinkdbUrl);
        if (!matcher.matches()) {
            throw new MalformedURLException("failed to parse as rethinkdb url: " + rethinkdbUrl);
        }
        String[] candidates = matcher.group(1).split(",");
        // Compact the usable entries to the front; a host part such as "," or
        // "a,,b" would otherwise leave no servers or an empty host name.
        int kept = 0;
        for (int i = 0; i < candidates.length; i++) {
            String server = candidates[i].trim();
            if (!server.isEmpty()) {
                candidates[kept++] = server;
            }
        }
        if (kept == 0) {
            throw new MalformedURLException("no rethinkdb servers in url: " + rethinkdbUrl);
        }
        String[] servers = new String[kept];
        System.arraycopy(candidates, 0, servers, 0, kept);
        this.rethinkServers = servers;
        this.rethinkDb = matcher.group(2);
    }

    /**
     * Runs a SQL query against a segment's read url and returns the parsed
     * JSON response.
     *
     * <p>Each value is rendered with {@link #sqlValue(Object)} and substituted
     * into {@code sqlTemplate} with {@link String#format(String, Object...)}.
     * Any failure of the HTTP exchange or of response parsing removes the
     * segment's cached read url before the exception propagates.
     *
     * @param segmentId id of the segment to query
     * @param sqlTemplate format string for the SQL statement
     * @param values values for the template's placeholders; null is treated as empty
     * @return the parsed JSON array sent by the server
     * @throws IOException if the read url cannot be resolved, the request
     *         fails, the status is not 200, or the response is not a JSON array
     */
    @SuppressWarnings("unchecked")
    public List<Map<String, Object>> read(String segmentId, String sqlTemplate, Object[] values) throws IOException {
        String readUrl = readUrl(segmentId);
        int valueCount = values == null ? 0 : values.length;
        String[] sqlValues = new String[valueCount];
        for (int i = 0; i < valueCount; i++) {
            sqlValues[i] = sqlValue(values[i]);
        }
        String sql = String.format(sqlTemplate, (Object[]) sqlValues);
        try {
            HttpURLConnection connection = httpRequest("POST", readUrl, SQL_MIMETYPE, sql, TEN_MINUTES_MS);
            if (connection.getResponseCode() != 200) {
                throw new TroughException("unexpected response " + connection.getResponseCode() + " " + connection.getResponseMessage() + ": " + responsePayload(connection) + " from " + readUrl + " to query: " + sql);
            }
            Object parsed;
            // close the reader (and the response stream under it) once parsing is done
            try (InputStreamReader reader = new InputStreamReader(connection.getInputStream(), "UTF-8")) {
                parsed = new JSONParser().parse(reader);
            }
            if (!(parsed instanceof List)) {
                // fail here with context rather than with a ClassCastException in the caller
                throw new TroughException("unexpected json response from " + readUrl + " to query: " + sql + " - " + parsed);
            }
            return (List<Map<String, Object>>) parsed;
        } catch (ParseException e) {
            synchronized (this.readUrlCache) {
                this.readUrlCache.remove(segmentId);
            }
            throw new TroughException("problem parsing json response from " + readUrl, e);
        } catch (IOException e) {
            // the cached url may be stale; force a fresh lookup next time
            synchronized (this.readUrlCache) {
                this.readUrlCache.remove(segmentId);
            }
            throw e;
        }
    }

    /**
     * Returns the read url for a segment, looking it up with
     * {@link #readUrlNoCache(String)} and caching it on first use.
     *
     * @param segmentId id of the segment
     * @return the cached or freshly resolved read url
     * @throws TroughException if the lookup fails
     */
    protected String readUrl(String segmentId) throws TroughException {
        // Every access to the HashMap happens under its lock: reading it
        // unsynchronized while another thread mutates it is a data race.
        synchronized (this.readUrlCache) {
            String url = this.readUrlCache.get(segmentId);
            if (url == null) {
                url = readUrlNoCache(segmentId);
                this.readUrlCache.put(segmentId, url);
                logger.info("segment " + segmentId + " read url is " + url);
            }
            return url;
        }
    }

    /**
     * Queries the RethinkDB {@code services} table for the read url of a
     * segment: rows found through the {@code segment} index with role
     * {@code trough-read} whose {@code last_heartbeat} is more recent than
     * their {@code ttl}, ordered by {@code load}.
     *
     * @param segmentId id of the segment
     * @return the {@code url} field of the first row of the result
     * @throws TroughNoReadUrlException if the result is null or empty
     * @throws TroughException if the query fails
     */
    protected String readUrlNoCache(String segmentId) throws TroughException {
        ReqlExpr healthyReaders = r.table("services")
                .getAll(new Object[]{segmentId})
                .optArg("index", "segment")
                .filter(r.hashMap("role", "trough-read"))
                .filter(service -> r.now().sub(new Object[]{service.g("last_heartbeat")}).lt(service.g("ttl"), new Object[0]))
                .orderBy("load");
        List candidates = (List) rethinkQuery(healthyReaders, null);
        if (candidates != null && !candidates.isEmpty()) {
            return (String) ((Map) candidates.get(0)).get("url");
        }
        throw new TroughNoReadUrlException("failed to obtain read url for trough segment " + segmentId + " (maybe the segment hasn't been created yet)");
    }

    /**
     * Registers a schema by PUTting its SQL to the segment manager's
     * {@code schema/<id>/sql} endpoint.
     *
     * @param schemaId id under which the schema is registered
     * @param schemaSql SQL text of the schema
     * @throws IOException if the segment manager cannot be located, the
     *         request fails, or the status is neither 201 nor 204
     */
    public void registerSchema(String schemaId, String schemaSql) throws IOException {
        String url = segmentManagerUrl("schema/" + schemaId + "/sql");
        logger.info("registering schema " + schemaId + " at " + url);
        HttpURLConnection connection = httpRequest("PUT", url, SQL_MIMETYPE, schemaSql, TEN_MINUTES_MS);
        try {
            int status = connection.getResponseCode();
            if (status != 201 && status != 204) {
                throw new TroughException("received " + status + ": " + responsePayload(connection) + " in response to PUT " + url + " with data " + schemaSql);
            }
        } finally {
            // the response body is not needed; release the connection instead of leaking it
            connection.disconnect();
        }
    }

    /**
     * Writes to a segment using the schema id {@code "default"}.
     *
     * @param segmentId id of the segment to write to
     * @param sqlTemplate format string for the SQL statement
     * @param values values for the template's placeholders
     * @throws IOException if the write fails
     */
    public void write(String segmentId, String sqlTemplate, Object[] values) throws IOException {
        final String defaultSchemaId = "default";
        write(segmentId, sqlTemplate, values, defaultSchemaId);
    }

    /**
     * Executes a SQL statement against a segment's write url and, on success,
     * marks the segment dirty so the next promotion pass picks it up.
     *
     * <p>Each value is rendered with {@link #sqlValue(Object)} and substituted
     * into {@code sqlTemplate} with {@link String#format(String, Object...)}.
     * A failed request removes the segment's cached write url before the
     * exception propagates.
     *
     * @param segmentId id of the segment to write to
     * @param sqlTemplate format string for the SQL statement
     * @param values values for the template's placeholders; null is treated as empty
     * @param schemaId schema id passed on when the write url is resolved
     * @throws IOException if the write url cannot be resolved, the request
     *         fails, or the status is not 200
     */
    public void write(String segmentId, String sqlTemplate, Object[] values, String schemaId) throws IOException {
        String writeUrl = writeUrl(segmentId, schemaId);
        int valueCount = values == null ? 0 : values.length;
        String[] sqlValues = new String[valueCount];
        for (int i = 0; i < valueCount; i++) {
            sqlValues[i] = sqlValue(values[i]);
        }
        String sql = String.format(sqlTemplate, (Object[]) sqlValues);
        try {
            HttpURLConnection connection = httpRequest("POST", writeUrl, SQL_MIMETYPE, sql, TEN_MINUTES_MS);
            if (connection.getResponseCode() != 200) {
                throw new TroughException("unexpected response " + connection.getResponseCode() + " " + connection.getResponseMessage() + ": " + responsePayload(connection) + " from " + writeUrl + " to query: " + sql);
            }
            // Set.add is already a no-op for present elements, so no unsynchronized
            // contains() pre-check is needed (it raced with the promoter thread).
            synchronized (this.dirtySegments) {
                this.dirtySegments.add(segmentId);
            }
            // Drain and close the response so the connection is released. The write
            // has already succeeded, so a failure here is logged, not propagated.
            try (InputStream body = connection.getInputStream()) {
                byte[] scratch = new byte[4096];
                while (body.read(scratch) != -1) {
                    // discard
                }
            } catch (IOException drainFailure) {
                logger.log(Level.FINE, "problem draining response from " + writeUrl, (Throwable) drainFailure);
            }
        } catch (IOException e) {
            // the cached url may be stale; force a fresh lookup next time
            synchronized (this.writeUrlCache) {
                this.writeUrlCache.remove(segmentId);
            }
            throw e;
        }
    }

    /**
     * Returns the write url for a segment, resolving it with
     * {@link #writeUrlNoCache(String, String)} and caching it on first use.
     * The cache is keyed by segment id only.
     *
     * @param segmentId id of the segment
     * @param schemaId schema id passed on when the url has to be resolved
     * @return the cached or freshly resolved write url
     * @throws IOException if resolving the url fails
     */
    protected String writeUrl(String segmentId, String schemaId) throws IOException {
        // Every access to the HashMap happens under its lock: reading it
        // unsynchronized while another thread mutates it is a data race.
        synchronized (this.writeUrlCache) {
            String url = this.writeUrlCache.get(segmentId);
            if (url == null) {
                url = writeUrlNoCache(segmentId, schemaId);
                this.writeUrlCache.put(segmentId, url);
                logger.info("segment " + segmentId + " write url is " + url);
            }
            return url;
        }
    }

    /**
     * Obtains a segment's write url by POSTing
     * {@code {"segment": ..., "schema": ...}} to the segment manager's
     * {@code provision} endpoint.
     *
     * @param segmentId id of the segment
     * @param schemaId schema id sent in the request
     * @return the {@code write_url} field of the JSON response
     * @throws IOException if the segment manager cannot be located, the
     *         request fails, the status is not 200, or the response is not a
     *         JSON object with a string {@code write_url}
     */
    protected String writeUrlNoCache(String segmentId, String schemaId) throws IOException {
        String url = segmentManagerUrl("provision");
        JSONObject payload = new JSONObject();
        payload.put("segment", segmentId);
        payload.put("schema", schemaId);
        HttpURLConnection connection = httpRequest("POST", url, JSON_MIMETYPE, payload.toJSONString(), TEN_MINUTES_MS);
        int status = connection.getResponseCode();
        if (status != 200) {
            throw new TroughException("received " + status + ": " + responsePayload(connection) + " in response to POST " + url + " with data " + payload);
        }
        Object parsed;
        // close the reader (and the response stream under it) once parsing is done
        try (InputStreamReader reader = new InputStreamReader(connection.getInputStream(), "UTF-8")) {
            parsed = new JSONParser().parse(reader);
        } catch (ParseException e) {
            throw new TroughException("unable to parse response from POST " + url + " as json", e);
        }
        if (!(parsed instanceof JSONObject)) {
            // e.g. a JSON array or scalar; report it instead of failing on a cast
            throw new TroughException("write_url missing from response to " + url + " - " + parsed);
        }
        Object writeUrl = ((JSONObject) parsed).get("write_url");
        if (writeUrl == null) {
            throw new TroughException("write_url missing from response to " + url + " - " + parsed);
        }
        if (!(writeUrl instanceof String)) {
            throw new TroughException("write_url is not a string in response to " + url + " - " + parsed);
        }
        return (String) writeUrl;
    }

    /**
     * Promotes every segment currently marked dirty.
     *
     * <p>The dirty set is copied and cleared under its lock, then each
     * segment of the copy is promoted in turn. A failed promotion is logged
     * and the loop moves on. The loop stops early when the current thread
     * has been interrupted.
     */
    public void promoteDirtySegments() {
        String[] snapshot;
        synchronized (this.dirtySegments) {
            snapshot = this.dirtySegments.toArray(new String[0]);
            this.dirtySegments.clear();
        }
        for (int i = 0; i < snapshot.length; i++) {
            String segmentId = snapshot[i];
            try {
                promote(segmentId);
            } catch (Exception e) {
                logger.log(Level.WARNING, "problem promoting segment " + segmentId, (Throwable) e);
            }
            if (Thread.currentThread().isInterrupted()) {
                break;
            }
        }
    }
}
