package dk.statsbiblioteket.medieplatform.autonomous;

import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.recipes.locks.InterProcessLock;
import com.netflix.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/autonomous-component-1.7.jar:dk/statsbiblioteket/medieplatform/autonomous/AutonomousComponent.class */
public class AutonomousComponent implements Callable<CallResult> {
    private static Logger log = LoggerFactory.getLogger(AutonomousComponent.class);
    private final CuratorFramework lockClient;
    private final long timeoutSBOI;
    private final long timeoutBatch;
    private final RunnableComponent runnable;
    private final long pollTime = 1000;
    private final ConcurrencyConnectionStateListener concurrencyConnectionStateListener;
    private final long workerTimout;
    private int simultaneousProcesses;
    private List<String> pastSuccessfulEvents;
    private List<String> pastFailedEvents;
    private List<String> futureEvents;
    private boolean paused;
    private boolean stopped;
    private Integer maxResults;
    private EventTrigger eventTrigger;
    private EventStorer eventStorer;

    public AutonomousComponent(RunnableComponent runnableComponent, CuratorFramework curatorFramework, int i, List<String> list, List<String> list2, List<String> list3, long j, long j2, long j3, EventTrigger eventTrigger, EventStorer eventStorer) {
        this(runnableComponent, curatorFramework, i, list, list2, list3, j, j2, j3, null, eventTrigger, eventStorer);
    }

    public AutonomousComponent(RunnableComponent runnableComponent, CuratorFramework curatorFramework, int i, List<String> list, List<String> list2, List<String> list3, long j, long j2, long j3, Integer num, EventTrigger eventTrigger, EventStorer eventStorer) {
        this.pollTime = 1000L;
        this.paused = false;
        this.stopped = false;
        this.lockClient = curatorFramework;
        this.timeoutSBOI = j;
        this.timeoutBatch = j2;
        this.runnable = runnableComponent;
        this.workerTimout = j3;
        this.simultaneousProcesses = i;
        this.pastSuccessfulEvents = list;
        this.pastFailedEvents = list2;
        this.futureEvents = list3;
        this.eventTrigger = eventTrigger;
        this.eventStorer = eventStorer;
        this.concurrencyConnectionStateListener = new ConcurrencyConnectionStateListener(this);
        this.lockClient.getConnectionStateListenable().addListener(this.concurrencyConnectionStateListener);
        this.maxResults = num;
    }

    protected static void releaseQuietly(InterProcessLock interProcessLock) {
        boolean z = false;
        while (!z) {
            try {
                interProcessLock.release();
            } catch (IllegalStateException e) {
                z = true;
            } catch (Exception e2) {
                log.warn("Caught exception while trying to release lock", (Throwable) e2);
                return;
            }
        }
    }

    protected static boolean acquireQuietly(InterProcessLock interProcessLock, long j) throws LockingException {
        try {
            return interProcessLock.acquire(j, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new LockingException("Failed to acquire lock", e);
        }
    }

    private static String getSBOILockpath(RunnableComponent runnableComponent) {
        return "/SBOI/" + runnableComponent.getComponentName();
    }

    private static String getBatchLockPath(RunnableComponent runnableComponent, Batch batch) {
        return "/" + runnableComponent.getComponentName() + "/" + batch.getFullID();
    }

    private long parseLong(String str, long j) {
        try {
            return Long.parseLong(str);
        } catch (Exception e) {
            return j;
        }
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.concurrent.Callable
    public CallResult call() throws LockingException, CouldNotGetLockException, CommunicationException {
        InterProcessSemaphoreMutex interProcessSemaphoreMutex = null;
        CallResult callResult = new CallResult();
        HashMap hashMap = new HashMap();
        try {
            interProcessSemaphoreMutex = new InterProcessSemaphoreMutex(this.lockClient, getSBOILockpath(this.runnable));
            try {
                if (!acquireQuietly(interProcessSemaphoreMutex, this.timeoutSBOI)) {
                    throw new CouldNotGetLockException("Could not get lock of SBOI, so returning");
                }
                log.info("SBOI locked, quering for batches");
                Iterator<Batch> triggeredBatches = this.eventTrigger.getTriggeredBatches(this.pastSuccessfulEvents, this.pastFailedEvents, this.futureEvents);
                while (true) {
                    if (!triggeredBatches.hasNext()) {
                        break;
                    }
                    Batch next = triggeredBatches.next();
                    log.info("Found batch {}", next.getFullID());
                    InterProcessSemaphoreMutex interProcessSemaphoreMutex2 = new InterProcessSemaphoreMutex(this.lockClient, getBatchLockPath(this.runnable, next));
                    if (acquireQuietly(interProcessSemaphoreMutex2, this.timeoutBatch)) {
                        log.info("Batch {} locked, creating a worker", next.getFullID());
                        if (this.maxResults != null) {
                            log.debug("Worker will report a maximum of {} results.", this.maxResults);
                        }
                        hashMap.put(new BatchWorker(this.runnable, new ResultCollector(this.runnable.getComponentName(), this.runnable.getComponentVersion(), this.maxResults), next, this.eventStorer), interProcessSemaphoreMutex2);
                        if (hashMap.size() >= this.simultaneousProcesses) {
                            log.info("We now have sufficient workers, look for no more batches");
                            break;
                        }
                    }
                }
                checkLockServerConnectionState();
                ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(this.simultaneousProcesses);
                ArrayList arrayList = new ArrayList();
                for (BatchWorker batchWorker : hashMap.keySet()) {
                    log.info("Submitting worker for batch {}", batchWorker.getBatch().getBatchID());
                    this.concurrencyConnectionStateListener.add(batchWorker);
                    arrayList.add(newFixedThreadPool.submit(batchWorker));
                }
                log.info("Shutting down the pool, and waiting for the workers to terminate");
                newFixedThreadPool.shutdown();
                long currentTimeMillis = System.currentTimeMillis();
                boolean z = false;
                while (!z) {
                    log.trace("Waiting to terminate");
                    z = true;
                    Iterator it = arrayList.iterator();
                    while (it.hasNext()) {
                        z = z && ((Future) it.next()).isDone();
                    }
                    checkLockServerConnectionState(newFixedThreadPool);
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                    }
                    if (System.currentTimeMillis() - currentTimeMillis > this.workerTimout) {
                        log.error("Worker timeout exceeded (" + this.workerTimout + "ms), shutting down all threads. We still need to wait for them to terminate, however.");
                        newFixedThreadPool.shutdownNow();
                        Iterator it2 = arrayList.iterator();
                        while (it2.hasNext()) {
                            ((Future) it2.next()).cancel(true);
                        }
                    }
                }
                log.info("All is now done, all workers have completed");
                for (BatchWorker batchWorker2 : hashMap.keySet()) {
                    callResult.addResult(batchWorker2.getBatch(), batchWorker2.getResultCollector());
                }
                Iterator it3 = hashMap.values().iterator();
                while (it3.hasNext()) {
                    releaseQuietly((InterProcessLock) it3.next());
                }
                releaseQuietly(interProcessSemaphoreMutex);
                return callResult;
            } catch (RuntimeException e2) {
                Iterator it4 = hashMap.values().iterator();
                while (it4.hasNext()) {
                    releaseQuietly((InterProcessLock) it4.next());
                }
                throw e2;
            }
        } catch (Throwable th) {
            Iterator it5 = hashMap.values().iterator();
            while (it5.hasNext()) {
                releaseQuietly((InterProcessLock) it5.next());
            }
            releaseQuietly(interProcessSemaphoreMutex);
            throw th;
        }
    }

    private Set<Batch> asSet(Iterator<Batch> it) {
        HashSet hashSet = new HashSet();
        while (it.hasNext()) {
            hashSet.add(it.next());
        }
        return hashSet;
    }

    private void checkLockServerConnectionState() throws CommunicationException {
        checkLockServerConnectionState(null);
    }

    private void checkLockServerConnectionState(ExecutorService executorService) throws CommunicationException {
        checkStopped(executorService);
        while (this.paused && !this.stopped) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
            }
        }
        checkStopped(executorService);
    }

    private void checkStopped(ExecutorService executorService) throws CommunicationException {
        if (this.stopped) {
            if (executorService != null) {
                executorService.shutdownNow();
            }
            throw new CommunicationException("Lost connection to lock server");
        }
    }

    public void setPaused(boolean z) {
        this.paused = z;
    }

    public void setStopped(boolean z) {
        this.stopped = z;
    }
}
