/*
 * Decompiled with CFR 0.152.
 */
package de.sillysky.nyssr.impl.job;

import de.sillysky.nyssr.address.CTargetAddress;
import de.sillysky.nyssr.exception.CException;
import de.sillysky.nyssr.exception.CUtilCheck;
import de.sillysky.nyssr.id.IId;
import de.sillysky.nyssr.impl.id.CIdFactory;
import de.sillysky.nyssr.impl.job.CJobThread;
import de.sillysky.nyssr.impl.job.ILocalDependencies;
import de.sillysky.nyssr.job.EJobStatus;
import de.sillysky.nyssr.job.IJob;
import de.sillysky.nyssr.job.IJobEngine;
import de.sillysky.nyssr.job.records.CRecordAddJob;
import de.sillysky.nyssr.job.records.CRecordAddJobEngineThread;
import de.sillysky.nyssr.job.records.CRecordNoMoreJobs;
import de.sillysky.nyssr.job.records.CRecordNotifyJobFinished;
import de.sillysky.nyssr.job.records.CRecordRemoveJob;
import de.sillysky.nyssr.log.CLoggerFactory;
import de.sillysky.nyssr.log.ILogger;
import de.sillysky.nyssr.message.CEnvelope;
import de.sillysky.nyssr.namespace.INamespace;
import de.sillysky.nyssr.namespace.INamespaceFactory;
import de.sillysky.nyssr.record.CRecord;
import de.sillysky.nyssr.target.CTarget;
import de.sillysky.nyssr.target.registry.records.CRecordStartTarget;
import de.sillysky.nyssr.thread.EThreadPriority;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
 * Job engine target: keeps a queue of waiting {@link IJob}s and hands them to
 * its {@link CJobThread}s.
 *
 * <p>Requests to add a thread, add a job, remove a job, or report a finished job
 * arrive as records and are processed by the {@code async*} message handlers
 * registered in the constructor. The engine creates its own namespace
 * (description {@code "JobThread"}) and registers itself as a target in it.
 *
 * <p>Waiting jobs are only pulled from the queue while the engine is started
 * (see {@link #start()} / {@link #stop()}).
 */
class CJobEngine
extends CTarget
implements IJobEngine {
    private static final ILogger LOG = CLoggerFactory.getLogger(CJobEngine.class);

    /** Jobs that have been appended but not yet handed to a thread. */
    private final ConcurrentLinkedQueue<IJob> mJobs = new ConcurrentLinkedQueue<>();
    /** Worker threads created through {@link #asyncAddThread}. */
    private final ConcurrentLinkedQueue<CJobThread> mJobThreads = new ConcurrentLinkedQueue<>();
    private final ILocalDependencies mLocalDependencies;
    private final String mName;
    /** Namespace created for this engine; deleted again in {@link #dismiss()}. */
    private final INamespace mNamespace;
    /** Optional address that receives a {@link CRecordNoMoreJobs} notification. */
    private final AtomicReference<CTargetAddress> mOwner = new AtomicReference<>();
    /** {@code true} between {@link #start()} and {@link #stop()}. */
    private final AtomicBoolean mStarted = new AtomicBoolean(false);

    /**
     * Creates the engine, its namespace and its message handlers, then registers
     * the engine in the new namespace's target registry.
     *
     * @param aLocalDependencies provides the namespace factory
     * @param aName              the engine name returned by {@link #getName()}
     * @throws CException if a called framework operation fails
     */
    CJobEngine(@NotNull ILocalDependencies aLocalDependencies, @NotNull String aName) throws CException {
        this.mLocalDependencies = aLocalDependencies;
        this.mName = aName;

        IId namespaceId = CIdFactory.random("JOB");
        INamespaceFactory factory = aLocalDependencies.getNamespaceFactory();
        this.mNamespace = factory.createAndRegisterNamespace(namespaceId, "JobThread");

        this.addMessageHandler(CRecordStartTarget.ID, this::asyncStartTarget);
        this.addMessageHandler(CRecordNotifyJobFinished.ID, this::asyncJobFinished);
        this.addMessageHandler(CRecordAddJob.ID, this::asyncAddJob);
        this.addMessageHandler(CRecordAddJobEngineThread.ID, this::asyncAddThread);
        this.addMessageHandler(CRecordRemoveJob.ID, this::asyncRemoveJob);

        this.mNamespace.getTargetRegistry().registerTarget(this);
    }

    /**
     * Handles {@link CRecordStartTarget}: acknowledges the request with success.
     *
     * @return {@code false} for answer envelopes, otherwise {@code true}
     *         (presumably "message handled" - confirm against the handler contract)
     */
    private boolean asyncStartTarget(@NotNull CEnvelope aEnvelope, @NotNull CRecord aRecord) {
        if (aEnvelope.isAnswer()) {
            return false;
        }
        aEnvelope.setResultSuccess();
        return true;
    }

    /**
     * Handles {@link CRecordAddJobEngineThread}: creates a new {@link CJobThread}
     * from the record (defaults: no name, {@code NORMAL} priority, one concurrent
     * job), stores it and immediately tries to fill it with waiting jobs.
     *
     * @return {@code false} for answer envelopes, otherwise {@code true}
     * @throws CException if creating or filling the thread fails
     */
    private boolean asyncAddThread(@NotNull CEnvelope aEnvelope, @NotNull CRecord aRecord) throws CException {
        if (aEnvelope.isAnswer()) {
            return false;
        }
        String threadName = CRecordAddJobEngineThread.getName(aRecord, null);
        int rawPriority = CRecordAddJobEngineThread.getPriority(aRecord, EThreadPriority.NORMAL.getValue());
        int maxJobs = CRecordAddJobEngineThread.getMaxJobs(aRecord, 1);

        CJobThread thread = new CJobThread(this, this.mNamespace, threadName, EThreadPriority.create(rawPriority), maxJobs);
        this.mJobThreads.add(thread);
        this.fillThread(thread);

        aEnvelope.setResultSuccess();
        return true;
    }

    /**
     * Handles {@link CRecordAddJob}: starts or queues the job carried by the record.
     *
     * @return {@code false} for answer envelopes, otherwise {@code true}
     * @throws CException if the record carries no job or starting it fails
     */
    private boolean asyncAddJob(@NotNull CEnvelope aEnvelope, @NotNull CRecord aRecord) throws CException {
        if (aEnvelope.isAnswer()) {
            return false;
        }
        IJob job = (IJob)CRecordAddJob.getJob(aRecord, null);
        CUtilCheck.checkNotNull(job, "Job missing", new Object[0]);
        this.privateAppendJob(job);
        LOG.debug(this.toString());
        aEnvelope.setResultSuccess();
        return true;
    }

    /**
     * Handles {@link CRecordNotifyJobFinished}: refills the reporting thread with
     * waiting jobs and notifies the owner if nothing is left to do.
     *
     * @return {@code false} for answer envelopes, otherwise {@code true}
     * @throws CException if starting a follow-up job or sending the notification fails
     */
    private boolean asyncJobFinished(@NotNull CEnvelope aEnvelope, @NotNull CRecord aRecord) throws CException {
        if (aEnvelope.isAnswer()) {
            return false;
        }
        IId queueId = CRecordNotifyJobFinished.getThreadQueueId(aRecord, null);
        String jobName = CRecordNotifyJobFinished.getJobName(aRecord, "?");
        long elapsedTime = CRecordNotifyJobFinished.getElapsedTime(aRecord, 0L);

        // findThread may return null for an unknown queue id; fillThread tolerates that.
        this.fillThread(this.findThread(queueId));
        LOG.debug("Job {} finished, {} ms elapsed", jobName, elapsedTime);
        this.sendNoMoreJobIf();

        aEnvelope.setResultSuccess();
        return true;
    }

    /**
     * Handles {@link CRecordRemoveJob}: removes every waiting job with the given
     * id; if none was waiting, asks each thread in turn to stop the job until one
     * reports that it found it. Answers with result code 5 / "Not found" when the
     * id is unknown.
     *
     * @return {@code false} for answer envelopes, otherwise {@code true}
     * @throws CException if the record has no job id or a called operation fails
     */
    private boolean asyncRemoveJob(@NotNull CEnvelope aEnvelope, @NotNull CRecord aRecord) throws CException {
        if (aEnvelope.isAnswer()) {
            return false;
        }
        UUID jobId = CRecordRemoveJob.getJobId(aRecord, null);
        CUtilCheck.checkNotNull(jobId, "Missing Job ID", new Object[0]);

        boolean done = false;
        Iterator<IJob> waiting = this.mJobs.iterator();
        while (waiting.hasNext()) {
            IJob job = waiting.next();
            if (jobId.equals(job.getId())) {
                LOG.debug("Job {} removed", job.getJobName());
                waiting.remove();
                done = true;
            }
        }
        if (!done) {
            for (CJobThread thread : this.mJobThreads) {
                if (thread.stopJob(jobId)) {
                    done = true;
                    break;
                }
            }
        }

        if (done) {
            LOG.debug(this.toString());
            this.sendNoMoreJobIf();
            aEnvelope.setResultSuccess();
        } else {
            aEnvelope.setResult(5, "Not found");
        }
        return true;
    }

    /**
     * Starts waiting jobs on the given thread until {@link #findAndStartJob}
     * reports that nothing more can be started. Does nothing when the engine is
     * not started or {@code aThread} is {@code null}.
     */
    private void fillThread(@Nullable CJobThread aThread) throws CException {
        if (aThread == null || !this.mStarted.get()) {
            return;
        }
        while (this.findAndStartJob(aThread)) {
            // keep going until the thread is full or no suitable job is left
        }
    }

    /**
     * Tries to start one waiting job on the given thread.
     *
     * <p>A job with the thread's own priority is preferred. If there is none and
     * no job is running on any thread, the waiting job with the highest priority
     * value is started instead, regardless of the thread's priority.
     *
     * @return {@code true} if a job was started
     */
    private boolean findAndStartJob(@NotNull CJobThread aThread) throws CException {
        if (!this.mStarted.get()) {
            return false;
        }
        if (aThread.getJobCount() >= aThread.getMaxConcurrentJobs()) {
            return false;
        }
        IJob job = this.fetchWaitingJobByPriority(aThread.getPriority());
        if (job == null && this.getRunningJobCount() == 0 && this.getWaitingJobCount() > 0) {
            job = this.fetchWaitingJobWithHighestPrio();
        }
        if (job == null) {
            return false;
        }
        aThread.startJob(job);
        return true;
    }

    /**
     * Removes and returns the first waiting job with exactly the given priority.
     *
     * @return the job, or {@code null} if the engine is not started or no such job waits
     */
    @Nullable
    private IJob fetchWaitingJobByPriority(@NotNull EThreadPriority aPriority) {
        if (!this.mStarted.get()) {
            return null;
        }
        for (IJob candidate : this.mJobs) {
            // remove() returns false if removeJob() took the job away concurrently;
            // such a job must not be handed out, so look at the next candidate.
            if (aPriority.equals((Object)candidate.getPriority()) && this.mJobs.remove(candidate)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Removes and returns the waiting job with the highest priority value; among
     * jobs of equal priority the one queued first wins.
     *
     * @return the job, or {@code null} if the engine is not started or no job waits
     */
    @Nullable
    private IJob fetchWaitingJobWithHighestPrio() {
        if (!this.mStarted.get()) {
            return null;
        }
        while (true) {
            IJob best = null;
            int bestPrio = EThreadPriority.LOWEST.getValue();
            for (IJob candidate : this.mJobs) {
                int prio = candidate.getPriority().getValue();
                // Strictly greater once a candidate exists keeps FIFO order among equals.
                if (prio > bestPrio || (best == null && prio == bestPrio)) {
                    best = candidate;
                    bestPrio = prio;
                }
            }
            if (best == null) {
                return null;
            }
            if (this.mJobs.remove(best)) {
                return best;
            }
            // The chosen job was removed concurrently (see removeJob()); rescan.
        }
    }

    /**
     * Looks up the thread whose queue id equals {@code aQueueId}.
     *
     * @return the thread, or {@code null} if the id is {@code null} or unknown
     */
    @Nullable
    private CJobThread findThread(@Nullable IId aQueueId) {
        if (aQueueId == null) {
            return null;
        }
        for (CJobThread thread : this.mJobThreads) {
            if (thread.getQueueId().equals(aQueueId)) {
                return thread;
            }
        }
        return null;
    }

    /** Returns the name given at construction. */
    @Override
    public String getName() {
        return this.mName;
    }

    /**
     * Requests a new worker thread by sending a {@link CRecordAddJobEngineThread}
     * notification to this engine; the thread itself is created in
     * {@link #asyncAddThread}.
     *
     * @param aName              thread name; checked with {@code CUtilCheck.checkEmptyString}
     * @param aPriority          priority of the thread
     * @param aMaxConcurrentJobs requested job limit, clamped to the range 1..1000
     * @throws CException if the name check or sending the notification fails
     */
    @Override
    public void appendThread(@NotNull String aName, @NotNull EThreadPriority aPriority, int aMaxConcurrentJobs) throws CException {
        CUtilCheck.checkEmptyString(aName, "Missing name", new Object[0]);
        int maxJobs = Math.max(Math.min(aMaxConcurrentJobs, 1000), 1);

        CRecord record = CRecordAddJobEngineThread.create();
        CRecordAddJobEngineThread.setName(record, aName);
        CRecordAddJobEngineThread.setPriority(record, aPriority.getValue());
        CRecordAddJobEngineThread.setMaxJobs(record, maxJobs);

        CEnvelope env = CEnvelope.forSingleTarget(this.getAddress());
        this.sendNotification(env, record);
    }

    /**
     * Marks a {@code DETACHED} job as {@code WAITING} and sends it to this engine
     * as a {@link CRecordAddJob} notification. Jobs in any other status are
     * rejected with an error log entry.
     *
     * <p>If sending fails the error is logged and the job is reset to
     * {@code DETACHED}, because it never reached the engine and would otherwise
     * be rejected by any later {@code appendJob} call.
     */
    @Override
    public void appendJob(@NotNull IJob aJob) {
        if (aJob.getStatus() != EJobStatus.DETACHED) {
            LOG.error("Job has wrong status");
            return;
        }
        LOG.debug("Append Job {}", aJob.getJobName());
        aJob.setStatus(EJobStatus.WAITING);
        try {
            CRecord record = CRecordAddJob.create();
            CRecordAddJob.setJob(record, aJob);
            CEnvelope env = CEnvelope.forSingleTarget(this.getAddress());
            this.sendNotification(env, record);
        }
        catch (Exception aE) {
            aJob.setStatus(EJobStatus.DETACHED);
            LOG.error((Throwable)aE, "Couldn't add job");
        }
    }

    /**
     * Starts the job on the least-loaded thread of the same priority that still
     * has capacity, or queues it when there is none.
     *
     * <p>After queueing, every thread gets a chance to pick up work. Without this
     * step a job whose priority matches no free thread would sit in the queue of
     * an otherwise idle engine until some unrelated event triggered
     * {@link #fillThread}.
     *
     * <p>NOTE(review): the direct-start path does not check {@code mStarted},
     * unlike {@link #fillThread}; confirm whether that is intended.
     */
    private void privateAppendJob(IJob aJob) throws CException {
        CJobThread matchingThread = this.findMatchingThread(aJob.getPriority());
        if (matchingThread != null) {
            matchingThread.startJob(aJob);
            return;
        }
        this.mJobs.add(aJob);
        for (CJobThread thread : this.mJobThreads) {
            this.fillThread(thread);
        }
    }

    /**
     * Finds the thread with the given priority that has free capacity and the
     * fewest jobs; the first such thread wins on ties.
     *
     * @return the thread, or {@code null} if no thread qualifies
     */
    @Nullable
    private CJobThread findMatchingThread(@NotNull EThreadPriority aPriority) {
        CJobThread best = null;
        int bestJobCount = Integer.MAX_VALUE;
        for (CJobThread thread : this.mJobThreads) {
            if (!aPriority.equals((Object)thread.getPriority())) {
                continue;
            }
            int jobCount = thread.getJobCount();
            if (jobCount < thread.getMaxConcurrentJobs() && jobCount < bestJobCount) {
                bestJobCount = jobCount;
                best = thread;
            }
        }
        return best;
    }

    /**
     * Removes the given job directly, without going through a message: a
     * {@code WAITING} job is taken out of the queue, a {@code RUNNING} job is
     * stopped on the first thread that reports having it. Jobs in any other
     * status are ignored. Errors while stopping are logged, and the remaining
     * threads are still tried.
     */
    @Override
    public void removeJob(@NotNull IJob aJob) {
        UUID id = aJob.getId();
        String name = aJob.getJobName();
        EJobStatus status = aJob.getStatus();

        if (status == EJobStatus.WAITING) {
            if (this.mJobs.remove(aJob)) {
                LOG.debug("Job removed: id={}, name={}", id, name);
            }
            return;
        }
        if (status != EJobStatus.RUNNING) {
            return;
        }
        for (CJobThread thread : this.mJobThreads) {
            try {
                if (thread.stopJob(id)) {
                    LOG.debug("Job removed: id={}, name={}", id, name);
                    break;
                }
            }
            catch (CException aE) {
                LOG.error(aE, "Error on stopping job {}", name);
            }
        }
    }

    /**
     * Stops the engine and deletes the namespace created in the constructor.
     *
     * @throws CException if deleting the namespace fails
     */
    @Override
    public void dismiss() throws CException {
        this.stop();
        LOG.debug("Dismiss JobEngine {}", this.mName);
        INamespaceFactory factory = this.mLocalDependencies.getNamespaceFactory();
        factory.deleteNamespace(this.mNamespace.getNID());
    }

    /** Returns the sum of the job counts of all threads. */
    @Override
    public int getRunningJobCount() {
        int total = 0;
        for (CJobThread thread : this.mJobThreads) {
            total += thread.getJobCount();
        }
        return total;
    }

    /** Returns the number of jobs currently in the waiting queue. */
    @Override
    public int getWaitingJobCount() {
        return this.mJobs.size();
    }

    @Override
    public String toString() {
        return "JobEngine " + this.mName + ": Running=" + this.getRunningJobCount() + ", Waiting=" + this.getWaitingJobCount();
    }

    /**
     * Sets the address that is notified with {@link CRecordNoMoreJobs} once no
     * job is running or waiting; {@code null} disables the notification.
     */
    @Override
    public void setOwner(@Nullable CTargetAddress aOwner) {
        this.mOwner.set(aOwner);
    }

    /**
     * Marks the engine as started and fills every existing thread with waiting jobs.
     *
     * @throws CException if starting a job fails
     */
    @Override
    public void start() throws CException {
        this.mStarted.set(true);
        for (CJobThread thread : this.mJobThreads) {
            this.fillThread(thread);
        }
    }

    /**
     * Marks the engine as stopped, so no further waiting jobs are pulled from the
     * queue. Nothing in this method stops jobs that are already running.
     */
    public void stop() throws CException {
        this.mStarted.set(false);
    }

    /**
     * Sends a {@link CRecordNoMoreJobs} notification to the owner if an owner is
     * set and there are neither running nor waiting jobs.
     */
    private void sendNoMoreJobIf() throws CException {
        CTargetAddress ownerAddress = this.mOwner.get();
        if (ownerAddress == null) {
            return;
        }
        if (this.getRunningJobCount() + this.getWaitingJobCount() != 0) {
            return;
        }
        CEnvelope env = CEnvelope.forSingleTarget(ownerAddress);
        this.sendNotification(env, CRecordNoMoreJobs.create());
    }
}

