ActionQueue.java

package com.renomad.minum.queue;

import com.renomad.minum.state.Context;
import com.renomad.minum.logging.ILogger;
import com.renomad.minum.utils.*;

import java.util.concurrent.*;

/**
 * This class provides the ability to pop items into
 * a queue thread-safely and know they'll happen later.
 * <p>
 * For example, this is helpful for minum.logging, or passing
 * functions to a minum.database.  It lets us run a bit faster,
 * since the I/O actions are happening on a separate
 * thread and the only time required is passing the
 * function of what we want to run later.
 */
public final class ActionQueue implements AbstractActionQueue {
    private final String name;
    private final ExecutorService queueExecutor;
    private final LinkedBlockingQueue<RunnableWithDescription> queue;
    private final ILogger logger;
    private boolean stop = false;
    private Thread queueThread;
    private boolean isStoppedStatus;

    /**
     * See the {@link ActionQueue} description for more detail. This
     * constructor will build your new action queue and handle registering
     * it with a list of other action queues in the {@link Context} object.
     * @param name give this object a unique, explanatory name.
     */
    public ActionQueue(String name, Context context) {
        this.name = name;
        this.queueExecutor = context.getExecutorService();
        this.queue = new LinkedBlockingQueue<>();
        context.getActionQueueState().offerToQueue(this);
        this.logger = context.getLogger();
    }

    // Regarding the InfiniteLoopStatement - indeed, we expect that the while loop
    // below is an infinite loop unless there's an exception thrown, that's what it is.
    @SuppressWarnings("InfiniteLoopStatement")
    @Override
    public ActionQueue initialize() {
        Runnable centralLoop = () -> {
            Thread.currentThread().setName(name);
            this.queueThread = Thread.currentThread();
            try {
                while (true) {
                    runAction();
                }
            } catch (InterruptedException ex) {
                /*
                this is what we expect to happen.
                once this happens, we just continue on.
                this only gets called when we are trying to shut everything
                down cleanly
                 */
                logger.logDebug(() -> String.format("%s ActionQueue for %s is stopped.%n", TimeUtils.getTimestampIsoInstant(), name));
                Thread.currentThread().interrupt();
            }
        };
        queueExecutor.submit(centralLoop);
        return this;
    }

    private void runAction() throws InterruptedException {
        RunnableWithDescription action = queue.take();
        try {
            action.run();
        } catch (Exception e) {
            logger.logAsyncError(() -> StacktraceUtils.stackTraceToString(e));
        }
    }

    /**
     * Adds something to the queue to be processed.
     * <p>
     *     Here is an example use of .enqueue:
     * </p>
     * <p>
     * <pre>
     * {@code   actionQueue.enqueue("Write person file to disk at " + filePath, () -> {
     *             Files.writeString(filePath, pf.serialize());
     *         });}
     * </pre>
     * </p>
     */
    @Override
    public void enqueue(String description, ThrowingRunnable action) {
        if (! stop) {
            queue.add(new RunnableWithDescription(action, description));
        } else {
            throw new UtilsException(String.format("failed to enqueue %s - ActionQueue \"%s\" is stopped", description, this.name));
        }
    }

    /**
     * Stops the action queue
     * @param count how many loops to wait before we crash it closed
     * @param sleepTime how long to wait in milliseconds between loops
     */
    @Override
    public void stop(int count, int sleepTime) {
        String timestamp = TimeUtils.getTimestampIsoInstant();
        logger.logDebug(() ->  String.format("%s Stopping queue %s", timestamp, this));
        stop = true;
        for (int i = 0; i < count; i++) {
            if (queue.isEmpty()) return;
            logger.logDebug(() ->  String.format("%s Queue not yet empty, has %d elements. waiting...%n",timestamp, queue.size()));
            MyThread.sleep(sleepTime);
        }
        isStoppedStatus = true;
        logger.logDebug(() -> String.format("%s Queue %s has %d elements left but we're done waiting.  Queue toString: %s", timestamp, this, queue.size(), queue));
    }

    /**
     * This will prevent any new actions being
     * queued (by setting the stop flag to true and thus
     * causing an exception to be thrown
     * when a call is made to [enqueue]) and will
     * block until the queue is empty.
     */
    @Override
    public void stop() {
        stop(5, 20);
    }

    @Override
    public String toString() {
        return this.name;
    }

    Thread getQueueThread() {
        return queueThread;
    }

    @Override
    public LinkedBlockingQueue<RunnableWithDescription> getQueue() {
        return new LinkedBlockingQueue<>(queue);
    }

    @Override
    public boolean isStopped() {
        return isStoppedStatus;
    }
}