package org.semanticweb.elk.util.concurrent.computation;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.semanticweb.elk.util.concurrent.computation.InputProcessorFactory;

/* loaded from: input_file:org/semanticweb/elk/util/concurrent/computation/ConcurrentComputationWithInputs.class */
/**
 * A {@link ConcurrentComputation} whose workers consume inputs from a bounded
 * blocking buffer.
 * <p>
 * Inputs handed to {@link #submit(Object)} are queued in an
 * {@link ArrayBlockingQueue}. Every worker created by {@link #getWorker()}
 * obtains its own {@link InputProcessor} from the processor factory, takes
 * inputs from the queue and passes them to that processor. A private sentinel
 * object (the "poison pill") is offered to the queue whenever a worker that may
 * be blocked in {@link BlockingQueue#take()} has to be woken up; the sentinel
 * is recognised by identity and is never passed to a processor.
 *
 * @param <I>
 *            the type of the inputs to be processed
 * @param <F>
 *            the type of the factory producing the input processors
 */
public class ConcurrentComputationWithInputs<I, F extends InputProcessorFactory<I, ?>> extends ConcurrentComputation<F> {
    /** Constant part of the default buffer capacity. */
    private static final int DEFAULT_BUFFER_BASE_ = 512;
    /** Part of the default buffer capacity multiplied by the third constructor argument. */
    private static final int DEFAULT_BUFFER_FACTOR_ = 32;

    /** The bounded queue through which inputs travel from submitters to workers. */
    private final BlockingQueue<I> buffer_;
    /** The capacity with which {@link #buffer_} was created. */
    private final int bufferCapacity_;
    /** Sentinel used to unblock workers waiting on the queue; compared by identity only. */
    private final I poison_pill_;

    /**
     * The task executed by each worker: processes inputs taken from the buffer
     * until the computation is terminated or interrupted.
     */
    private class Worker implements Runnable {
        /** The failure that stopped this worker, or {@code null} if there was none. */
        private RuntimeException workerException_;

        private Worker() {
            this.workerException_ = null;
        }

        /**
         * Processes inputs from the buffer using a processor private to this
         * worker.
         * <p>
         * An {@link InterruptedException} raised anywhere in the loop restores
         * the interrupt flag of the current thread and ends the worker
         * normally. Any other throwable is wrapped in a
         * {@link RuntimeException} and rethrown after another worker has been
         * woken up; in that case the processor is not finished.
         */
        @Override // java.lang.Runnable
        public final void run() {
            // the cast mirrors the original code; the concrete return type of
            // getEngine() is declared outside of this file
            @SuppressWarnings("unchecked")
            final InputProcessor<I> inputProcessor = (InputProcessor<I>) processorFactory.getEngine();
            try {
                // process whatever the processor already holds before blocking
                // on the buffer for the first time
                inputProcessor.process();
                for (;;) {
                    final I nextInput = buffer_.take();
                    if (nextInput != poison_pill_) {
                        inputProcessor.submit(nextInput);
                        inputProcessor.process();
                    }
                    if (termination || isInterrupted()) {
                        if (buffer_.isEmpty()) {
                            break;
                        }
                        // when interrupted, leave the remaining inputs unless
                        // the buffer is completely full
                        if (isInterrupted() && buffer_.size() != bufferCapacity_) {
                            break;
                        }
                    }
                }
            } catch (InterruptedException e) {
                // restore the interrupt status for the code running this worker
                Thread.currentThread().interrupt();
            } catch (Throwable th) {
                this.workerException_ = new RuntimeException("Exception in worker thread: ", th);
            }
            // Both catch clauses complete normally, so this epilogue runs on
            // every exit path exactly once.
            wakeUpWorker();
            if (this.workerException_ != null) {
                throw this.workerException_;
            }
            inputProcessor.finish();
        }
    }

    /**
     * Creates a computation with the given buffer capacity.
     *
     * @param factory
     *            the factory producing the input processors; passed to the
     *            superclass
     * @param executor
     *            the executor; passed to the superclass
     * @param maxWorkers
     *            passed to the superclass (presumably the maximal number of
     *            concurrent workers; confirm against
     *            {@code ConcurrentComputation})
     * @param bufferCapacity
     *            the requested capacity of the input buffer; if it is not
     *            greater than {@code maxWorkers}, {@code maxWorkers + 1} is
     *            used instead
     */
    public ConcurrentComputationWithInputs(F factory, ConcurrentExecutor executor, int maxWorkers, int bufferCapacity) {
        super(factory, executor, maxWorkers);
        // Safe: the pill is only ever compared by identity and is never
        // submitted to an input processor.
        @SuppressWarnings("unchecked")
        final I pill = (I) new Object();
        this.poison_pill_ = pill;
        final int capacity = bufferCapacity <= maxWorkers ? maxWorkers + 1 : bufferCapacity;
        this.bufferCapacity_ = capacity;
        this.buffer_ = new ArrayBlockingQueue<I>(capacity);
    }

    /**
     * Creates a computation with the default buffer capacity of
     * {@code 512 + 32 * maxWorkers}.
     *
     * @param factory
     *            the factory producing the input processors
     * @param executor
     *            the executor; passed to the superclass
     * @param maxWorkers
     *            passed to the superclass and used to size the buffer
     */
    public ConcurrentComputationWithInputs(F factory, ConcurrentExecutor executor, int maxWorkers) {
        this(factory, executor, maxWorkers, DEFAULT_BUFFER_BASE_ + (DEFAULT_BUFFER_FACTOR_ * maxWorkers));
    }

    /**
     * Queues the given input for processing, blocking while the buffer is
     * full.
     *
     * @param input
     *            the input to be processed
     * @return {@code false} if the input was rejected because the computation
     *         is terminated or interrupted, {@code true} if it was queued
     * @throws InterruptedException
     *             if interrupted while waiting for free space in the buffer
     */
    public synchronized boolean submit(I input) throws InterruptedException {
        if (this.termination || isInterrupted()) {
            return false;
        }
        this.buffer_.put(input);
        return true;
    }

    /**
     * Offers the poison pill to the buffer if the buffer is currently empty,
     * so that a worker blocked in {@link BlockingQueue#take()} can continue.
     * Does nothing if the buffer already contains elements.
     */
    public void wakeUpWorker() {
        if (this.buffer_.isEmpty()) {
            // offer() never blocks; a concurrent insertion may make it fail,
            // in which case there is an element to wake the worker anyway
            this.buffer_.offer(this.poison_pill_);
        }
    }

    /**
     * Wakes up a possibly blocked worker, delegates to the superclass to wait
     * for the workers, and afterwards removes the poison pills left at the
     * head of the buffer.
     *
     * @throws InterruptedException
     *             if thrown by the superclass implementation
     */
    @Override // org.semanticweb.elk.util.concurrent.computation.ConcurrentComputation
    public synchronized void waitWorkers() throws InterruptedException {
        wakeUpWorker();
        super.waitWorkers();
        while (this.buffer_.peek() == this.poison_pill_) {
            this.buffer_.remove();
        }
    }

    /**
     * @return a new worker that processes inputs from the buffer
     */
    @Override // org.semanticweb.elk.util.concurrent.computation.ConcurrentComputation
    Runnable getWorker() {
        return new Worker();
    }
}
