package dk.statsbiblioteket.medieplatform.autonomous;

import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.recipes.locks.InterProcessLock;
import com.netflix.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import dk.statsbiblioteket.medieplatform.autonomous.EventTrigger;
import dk.statsbiblioteket.medieplatform.autonomous.Item;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/autonomous-component-2.2.jar:dk/statsbiblioteket/medieplatform/autonomous/AutonomousComponent.class */
public class AutonomousComponent<T extends Item> implements Callable<CallResult<T>> {
    private static final Logger log = LoggerFactory.getLogger(AutonomousComponent.class);
    private final CuratorFramework lockClient;
    private final List<String> oldEvents;
    private final List<String> itemTypes;
    private final long timeoutSBOI;
    private final long timeoutBatch;
    private final RunnableComponent<T> runnable;
    private final ConcurrencyConnectionStateListener concurrencyConnectionStateListener;
    private final long workerTimout;
    private final int simultaneousProcesses;
    private final int workQueueMaxLength;
    private final List<String> pastSuccessfulEvents;
    private final List<String> futureEvents;
    private final Integer maxResults;
    private final EventTrigger<T> eventTrigger;
    private final EventStorer<T> eventStorer;
    private final long pollTime = 1000;
    private boolean paused = false;
    private boolean stopped = false;

    public AutonomousComponent(RunnableComponent<T> runnableComponent, CuratorFramework curatorFramework, int i, Integer num, List<String> list, List<String> list2, List<String> list3, List<String> list4, long j, long j2, long j3, Integer num2, EventTrigger<T> eventTrigger, EventStorer<T> eventStorer) {
        this.lockClient = curatorFramework;
        this.oldEvents = list3;
        this.itemTypes = list4;
        this.timeoutSBOI = j;
        this.timeoutBatch = j2;
        this.runnable = runnableComponent;
        this.workerTimout = j3;
        this.simultaneousProcesses = i;
        if (num == null) {
            this.workQueueMaxLength = i;
        } else {
            this.workQueueMaxLength = num.intValue();
        }
        this.pastSuccessfulEvents = list;
        this.futureEvents = list2;
        this.eventTrigger = eventTrigger;
        this.eventStorer = eventStorer;
        this.concurrencyConnectionStateListener = new ConcurrencyConnectionStateListener(this);
        this.lockClient.getConnectionStateListenable().addListener(this.concurrencyConnectionStateListener);
        this.maxResults = num2;
    }

    protected static void releaseQuietly(InterProcessLock interProcessLock) {
        boolean z = false;
        while (!z) {
            try {
                interProcessLock.release();
            } catch (IllegalStateException e) {
                z = true;
            } catch (Exception e2) {
                log.warn("Caught exception while trying to release lock", (Throwable) e2);
                return;
            }
        }
    }

    protected static boolean acquireQuietly(InterProcessLock interProcessLock, long j) throws LockingException {
        try {
            return interProcessLock.acquire(j, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new LockingException("Failed to acquire lock", e);
        }
    }

    private static <T extends Item> String getSBOILockpath(RunnableComponent<T> runnableComponent) {
        return "/SBOI/" + runnableComponent.getComponentName();
    }

    private static <T extends Item> String getBatchLockPath(RunnableComponent<T> runnableComponent, T t) {
        return "/" + runnableComponent.getComponentName() + "/" + t.getFullID();
    }

    private long parseLong(String str, long j) {
        try {
            return Long.parseLong(str);
        } catch (Exception e) {
            return j;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // java.util.concurrent.Callable
    public CallResult<T> call() throws LockingException, CouldNotGetLockException, CommunicationException {
        InterProcessSemaphoreMutex interProcessSemaphoreMutex = null;
        CallResult<T> callResult = (CallResult<T>) new CallResult();
        HashMap hashMap = new HashMap();
        try {
            log.info("Starting {}", this.runnable.getComponentName());
            interProcessSemaphoreMutex = new InterProcessSemaphoreMutex(this.lockClient, getSBOILockpath(this.runnable));
            try {
                if (!acquireQuietly(interProcessSemaphoreMutex, this.timeoutSBOI)) {
                    throw new CouldNotGetLockException("Could not get lock of SBOI, so returning");
                }
                log.debug("SBOI locked, quering for items");
                Iterator<T> triggeredItems = this.eventTrigger.getTriggeredItems(makeQuery());
                while (true) {
                    if (!triggeredItems.hasNext()) {
                        break;
                    }
                    T next = triggeredItems.next();
                    log.info("Found item {}", next.getFullID());
                    InterProcessSemaphoreMutex interProcessSemaphoreMutex2 = new InterProcessSemaphoreMutex(this.lockClient, getBatchLockPath(this.runnable, next));
                    if (acquireQuietly(interProcessSemaphoreMutex2, this.timeoutBatch)) {
                        log.info("Item {} locked, creating a worker", next.getFullID());
                        if (this.maxResults != null) {
                            log.debug("Worker will report a maximum of {} results.", this.maxResults);
                        }
                        hashMap.put(new AutonomousWorker(this.runnable, new ResultCollector(this.runnable.getComponentName(), this.runnable.getComponentVersion(), this.maxResults), next, this.eventStorer), interProcessSemaphoreMutex2);
                        if (hashMap.size() >= this.workQueueMaxLength) {
                            log.debug("We now have sufficient workers, look for no more items");
                            break;
                        }
                    } else {
                        log.info("Item {} already locked, so ignoring.", next.getFullID());
                    }
                }
                if (hashMap.isEmpty()) {
                    log.info("No Items locked, so nothing further to do");
                    Iterator it = hashMap.values().iterator();
                    while (it.hasNext()) {
                        releaseQuietly((InterProcessLock) it.next());
                    }
                    releaseQuietly(interProcessSemaphoreMutex);
                    return callResult;
                }
                checkLockServerConnectionState();
                ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(this.simultaneousProcesses);
                ArrayList arrayList = new ArrayList();
                for (AutonomousWorker autonomousWorker : hashMap.keySet()) {
                    log.info("Submitting worker for Item {}", autonomousWorker.getItem().getFullID());
                    this.concurrencyConnectionStateListener.add(autonomousWorker);
                    arrayList.add(newFixedThreadPool.submit(autonomousWorker));
                }
                log.debug("Shutting down the pool, and waiting for the workers to terminate");
                newFixedThreadPool.shutdown();
                long currentTimeMillis = System.currentTimeMillis();
                boolean z = false;
                while (!z) {
                    log.trace("Waiting to terminate");
                    z = true;
                    Iterator it2 = arrayList.iterator();
                    while (it2.hasNext()) {
                        z = z && ((Future) it2.next()).isDone();
                    }
                    checkLockServerConnectionState(newFixedThreadPool);
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                    }
                    if (System.currentTimeMillis() - currentTimeMillis > this.workerTimout) {
                        log.error("Worker timeout exceeded (" + this.workerTimout + "ms), shutting down all threads. We still need to wait for them to terminate, however.");
                        newFixedThreadPool.shutdownNow();
                        Iterator it3 = arrayList.iterator();
                        while (it3.hasNext()) {
                            ((Future) it3.next()).cancel(true);
                        }
                    }
                }
                log.info("All is now done, all workers have completed");
                for (AutonomousWorker autonomousWorker2 : hashMap.keySet()) {
                    callResult.addResult(autonomousWorker2.getItem(), autonomousWorker2.getResultCollector());
                }
                Iterator it4 = hashMap.values().iterator();
                while (it4.hasNext()) {
                    releaseQuietly((InterProcessLock) it4.next());
                }
                releaseQuietly(interProcessSemaphoreMutex);
                return callResult;
            } catch (RuntimeException e2) {
                Iterator it5 = hashMap.values().iterator();
                while (it5.hasNext()) {
                    releaseQuietly((InterProcessLock) it5.next());
                }
                throw e2;
            }
        } catch (Throwable th) {
            Iterator it6 = hashMap.values().iterator();
            while (it6.hasNext()) {
                releaseQuietly((InterProcessLock) it6.next());
            }
            releaseQuietly(interProcessSemaphoreMutex);
            throw th;
        }
    }

    private EventTrigger.Query<T> makeQuery() {
        EventTrigger.Query<T> query = new EventTrigger.Query<>();
        if (this.pastSuccessfulEvents != null) {
            query.getPastSuccessfulEvents().addAll(this.pastSuccessfulEvents);
        }
        if (this.futureEvents != null) {
            query.getFutureEvents().addAll(this.futureEvents);
        }
        if (this.oldEvents != null) {
            query.getOldEvents().addAll(this.oldEvents);
        }
        if (this.itemTypes != null) {
            query.getTypes().addAll(this.itemTypes);
        }
        return query;
    }

    private void checkLockServerConnectionState() throws CommunicationException {
        checkLockServerConnectionState(null);
    }

    private void checkLockServerConnectionState(ExecutorService executorService) throws CommunicationException {
        checkStopped(executorService);
        while (this.paused && !this.stopped) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
            }
        }
        checkStopped(executorService);
    }

    private void checkStopped(ExecutorService executorService) throws CommunicationException {
        if (this.stopped) {
            if (executorService != null) {
                executorService.shutdownNow();
            }
            throw new CommunicationException("Lost connection to lock server");
        }
    }

    public void setPaused(boolean z) {
        this.paused = z;
    }

    public void setStopped(boolean z) {
        this.stopped = z;
    }
}
