ActionQueue.java

package com.renomad.minum.queue;

import com.renomad.minum.state.Context;
import com.renomad.minum.logging.ILogger;
import com.renomad.minum.utils.*;

import java.util.concurrent.*;

/**
 * This class provides the ability to pop items into
 * a queue thread-safely and know they'll happen later.
 * <p>
 * For example, this is helpful for minum.logging, or passing
 * functions to a minum.database.  It lets us run a bit faster,
 * since the I/O actions are happening on a separate
 * thread and the only time required is passing the
 * function of what we want to run later.
 * </p>
 * <h3>Example:</h3>
 * <p>
 *     This example shows where an {@link java.io.InputStream} representing the bytes
 *     of an image file are sent to the {@link ActionQueue} for processing.  The call
 *     to {@link ActionQueue#enqueue(String, ThrowingRunnable)} returns immediately,
 *     and processing continues on another thread.
 * </p>
 * <pre>
 * {@code
 *  ActionQueue photoResizingQueue;
 *  InputStream photoInputStream;
 *  photoResizingQueue = new ActionQueue("photo_resizing", context).initialize();
 *  photoResizingQueue.enqueue("resize an image", () -> resizeImage(photoInputStream));
 * }
 * </pre>
 */
public final class ActionQueue implements AbstractActionQueue {
    private final String name;
    private final ExecutorService queueExecutor;
    private final LinkedBlockingQueue<RunnableWithDescription> queue;
    private final ILogger logger;
    private boolean stop = false;
    private Thread queueThread;
    private boolean isStoppedStatus;

    /**
     * See the {@link ActionQueue} description for more detail. This
     * constructor will build your new action queue and handle registering
     * it with a list of other action queues in the {@link Context} object.
     * @param name give this object a unique, explanatory name.
     */
    public ActionQueue(String name, Context context) {
        this.name = name;
        this.queueExecutor = context.getExecutorService();
        this.queue = new LinkedBlockingQueue<>();
        context.getActionQueueState().offerToQueue(this);
        this.logger = context.getLogger();
    }

    // Regarding the InfiniteLoopStatement - indeed, we expect that the while loop
    // below is an infinite loop unless there's an exception thrown, that's what it is.
    @SuppressWarnings("InfiniteLoopStatement")
    @Override
    public ActionQueue initialize() {
        Runnable centralLoop = () -> {
            Thread.currentThread().setName(name);
            this.queueThread = Thread.currentThread();
            try {
                while (true) {
                    runAction(logger, queue);
                }
            } catch (InterruptedException ex) {
                /*
                this is what we expect to happen.
                once this happens, we just continue on.
                this only gets called when we are trying to shut everything
                down cleanly
                 */
                logger.logDebug(() -> String.format("%s ActionQueue for %s is stopped.%n", TimeUtils.getTimestampIsoInstant(), name));
                Thread.currentThread().interrupt();
            }
        };
        queueExecutor.submit(centralLoop);
        return this;
    }

    static void runAction(ILogger logger, LinkedBlockingQueue<RunnableWithDescription> queue) throws InterruptedException {
        RunnableWithDescription action = queue.take();
        try {
            action.run();
        } catch (Throwable e) {
            /*
             This needs to be a Throwable and not an Exception, for important reasons.

             Sometimes, the command being run will encounter Errors, rather than
             Exceptions. For example, OutOfMemoryError.  We must catch it, log the
             issue, and move on to avoid killing the thread needlessly
             */
            logger.logAsyncError(() -> StacktraceUtils.stackTraceToString(e));
        }
    }

    /**
     * Adds something to the queue to be processed.
     * <p>
     *     Here is an example use of .enqueue:
     * </p>
     * <p>
     * <pre>
     * {@code   actionQueue.enqueue("Write person file to disk at " + filePath, () -> {
     *             Files.writeString(filePath, pf.serialize());
     *         });}
     * </pre>
     * </p>
     */
    @Override
    public void enqueue(String description, ThrowingRunnable action) {
        if (! stop) {
            queue.add(new RunnableWithDescription(action, description));
        } else {
            throw new UtilsException(String.format("failed to enqueue %s - ActionQueue \"%s\" is stopped", description, this.name));
        }
    }

    /**
     * Stops the action queue
     * @param count how many loops to wait before we crash it closed
     * @param sleepTime how long to wait in milliseconds between loops
     */
    @Override
    public void stop(int count, int sleepTime) {
        String timestamp = TimeUtils.getTimestampIsoInstant();
        logger.logDebug(() ->  String.format("%s Stopping queue %s", timestamp, this));
        stop = true;
        for (int i = 0; i < count; i++) {
            if (queue.isEmpty()) return;
            logger.logDebug(() ->  String.format("%s Queue not yet empty, has %d elements. waiting...%n",timestamp, queue.size()));
            MyThread.sleep(sleepTime);
        }
        isStoppedStatus = true;
        logger.logDebug(() -> String.format("%s Queue %s has %d elements left but we're done waiting.  Queue toString: %s", timestamp, this, queue.size(), queue));
    }

    /**
     * This will prevent any new actions being
     * queued (by setting the stop flag to true and thus
     * causing an exception to be thrown
     * when a call is made to [enqueue]) and will
     * block until the queue is empty.
     */
    @Override
    public void stop() {
        stop(5, 20);
    }

    @Override
    public String toString() {
        return this.name;
    }

    Thread getQueueThread() {
        return queueThread;
    }

    @Override
    public LinkedBlockingQueue<RunnableWithDescription> getQueue() {
        return new LinkedBlockingQueue<>(queue);
    }

    @Override
    public boolean isStopped() {
        return isStoppedStatus;
    }
}