| 1 | package com.renomad.minum.queue; | |
| 2 | ||
| 3 | import com.renomad.minum.state.Context; | |
| 4 | import com.renomad.minum.logging.ILogger; | |
| 5 | import com.renomad.minum.utils.*; | |
| 6 | ||
| 7 | import java.util.concurrent.*; | |
| 8 | ||
| 9 | /** | |
| 10 | * This class provides the ability to pop items into | |
| 11 | * a queue thread-safely and know they'll happen later. | |
| 12 | * <p> | |
| 13 | * For example, this is helpful for minum.logging, or passing | |
| 14 | * functions to a minum.database. It lets us run a bit faster, | |
| 15 | * since the I/O actions are happening on a separate | |
| 16 | * thread and the only time required is passing the | |
| 17 | * function of what we want to run later. | |
| 18 | * </p> | |
| 19 | * <h3>Example:</h3> | |
| 20 | * <p> | |
| 21 | * This example shows where an {@link java.io.InputStream} representing the bytes | |
| 22 | * of an image file are sent to the {@link ActionQueue} for processing. The call | |
| 23 | * to {@link ActionQueue#enqueue(String, ThrowingRunnable)} returns immediately, | |
| 24 | * and processing continues on another thread. | |
| 25 | * </p> | |
| 26 | * <pre> | |
| 27 | * {@code | |
| 28 | * ActionQueue photoResizingQueue; | |
| 29 | * InputStream photoInputStream; | |
| 30 | * photoResizingQueue = new ActionQueue("photo_resizing", context).initialize(); | |
| 31 | * photoResizingQueue.enqueue("resize an image", () -> resizeImage(photoInputStream)); | |
| 32 | * } | |
| 33 | * </pre> | |
| 34 | */ | |
| 35 | public final class ActionQueue implements AbstractActionQueue { | |
| 36 | private final String name; | |
| 37 | private final ExecutorService queueExecutor; | |
| 38 | private final LinkedBlockingQueue<RunnableWithDescription> queue; | |
| 39 | private final ILogger logger; | |
| 40 | private boolean stop = false; | |
| 41 | private Thread queueThread; | |
| 42 | private boolean isStoppedStatus; | |
| 43 | ||
| 44 | /** | |
| 45 | * See the {@link ActionQueue} description for more detail. This | |
| 46 | * constructor will build your new action queue and handle registering | |
| 47 | * it with a list of other action queues in the {@link Context} object. | |
| 48 | * @param name give this object a unique, explanatory name. | |
| 49 | */ | |
| 50 | public ActionQueue(String name, Context context) { | |
| 51 | this.name = name; | |
| 52 | this.queueExecutor = context.getExecutorService(); | |
| 53 | this.queue = new LinkedBlockingQueue<>(); | |
| 54 |
1
1. <init> : removed call to com/renomad/minum/queue/ActionQueueState::offerToQueue → KILLED |
context.getActionQueueState().offerToQueue(this); |
| 55 | this.logger = context.getLogger(); | |
| 56 | } | |
| 57 | ||
| 58 | // Regarding the InfiniteLoopStatement - indeed, we expect that the while loop | |
| 59 | // below is an infinite loop unless there's an exception thrown, that's what it is. | |
| 60 | @SuppressWarnings("InfiniteLoopStatement") | |
| 61 | @Override | |
| 62 | public ActionQueue initialize() { | |
| 63 | Runnable centralLoop = () -> { | |
| 64 | Thread.currentThread().setName(name); | |
| 65 | this.queueThread = Thread.currentThread(); | |
| 66 | try { | |
| 67 | while (true) { | |
| 68 |
1
1. lambda$initialize$1 : removed call to com/renomad/minum/queue/ActionQueue::runAction → TIMED_OUT |
runAction(logger, queue); |
| 69 | } | |
| 70 | } catch (InterruptedException ex) { | |
| 71 | /* | |
| 72 | this is what we expect to happen. | |
| 73 | once this happens, we just continue on. | |
| 74 | this only gets called when we are trying to shut everything | |
| 75 | down cleanly | |
| 76 | */ | |
| 77 | logger.logDebug(() -> String.format("%s ActionQueue for %s is stopped.%n", TimeUtils.getTimestampIsoInstant(), name)); | |
| 78 | Thread.currentThread().interrupt(); | |
| 79 | } | |
| 80 | }; | |
| 81 | queueExecutor.submit(centralLoop); | |
| 82 |
1
1. initialize : replaced return value with null for com/renomad/minum/queue/ActionQueue::initialize → KILLED |
return this; |
| 83 | } | |
| 84 | ||
| 85 | static void runAction(ILogger logger, LinkedBlockingQueue<RunnableWithDescription> queue) throws InterruptedException { | |
| 86 | RunnableWithDescription action = queue.take(); | |
| 87 | try { | |
| 88 |
1
1. runAction : removed call to com/renomad/minum/utils/RunnableWithDescription::run → KILLED |
action.run(); |
| 89 | } catch (Throwable e) { | |
| 90 | /* | |
| 91 | This needs to be a Throwable and not an Exception, for important reasons. | |
| 92 | ||
| 93 | Sometimes, the command being run will encounter Errors, rather than | |
| 94 | Exceptions. For example, OutOfMemoryError. We must catch it, log the | |
| 95 | issue, and move on to avoid killing the thread needlessly | |
| 96 | */ | |
| 97 | logger.logAsyncError(() -> StacktraceUtils.stackTraceToString(e)); | |
| 98 | } | |
| 99 | } | |
| 100 | ||
| 101 | /** | |
| 102 | * Adds something to the queue to be processed. | |
| 103 | * <p> | |
| 104 | * Here is an example use of .enqueue: | |
| 105 | * </p> | |
| 106 | * <p> | |
| 107 | * <pre> | |
| 108 | * {@code actionQueue.enqueue("Write person file to disk at " + filePath, () -> { | |
| 109 | * Files.writeString(filePath, pf.serialize()); | |
| 110 | * });} | |
| 111 | * </pre> | |
| 112 | * </p> | |
| 113 | */ | |
| 114 | @Override | |
| 115 | public void enqueue(String description, ThrowingRunnable action) { | |
| 116 |
1
1. enqueue : negated conditional → KILLED |
if (! stop) { |
| 117 | queue.add(new RunnableWithDescription(action, description)); | |
| 118 | } else { | |
| 119 | throw new UtilsException(String.format("failed to enqueue %s - ActionQueue \"%s\" is stopped", description, this.name)); | |
| 120 | } | |
| 121 | } | |
| 122 | ||
| 123 | /** | |
| 124 | * Stops the action queue | |
| 125 | * @param count how many loops to wait before we crash it closed | |
| 126 | * @param sleepTime how long to wait in milliseconds between loops | |
| 127 | */ | |
| 128 | @Override | |
| 129 | public void stop(int count, int sleepTime) { | |
| 130 | String timestamp = TimeUtils.getTimestampIsoInstant(); | |
| 131 | logger.logDebug(() -> String.format("%s Stopping queue %s", timestamp, this)); | |
| 132 | stop = true; | |
| 133 |
2
1. stop : negated conditional → TIMED_OUT 2. stop : changed conditional boundary → TIMED_OUT |
for (int i = 0; i < count; i++) { |
| 134 |
1
1. stop : negated conditional → TIMED_OUT |
if (queue.isEmpty()) return; |
| 135 | logger.logDebug(() -> String.format("%s Queue not yet empty, has %d elements. waiting...%n",timestamp, queue.size())); | |
| 136 | MyThread.sleep(sleepTime); | |
| 137 | } | |
| 138 | isStoppedStatus = true; | |
| 139 | logger.logDebug(() -> String.format("%s Queue %s has %d elements left but we're done waiting. Queue toString: %s", timestamp, this, queue.size(), queue)); | |
| 140 | } | |
| 141 | ||
| 142 | /** | |
| 143 | * This will prevent any new actions being | |
| 144 | * queued (by setting the stop flag to true and thus | |
| 145 | * causing an exception to be thrown | |
| 146 | * when a call is made to [enqueue]) and will | |
| 147 | * block until the queue is empty. | |
| 148 | */ | |
| 149 | @Override | |
| 150 | public void stop() { | |
| 151 |
1
1. stop : removed call to com/renomad/minum/queue/ActionQueue::stop → KILLED |
stop(5, 20); |
| 152 | } | |
| 153 | ||
| 154 | @Override | |
| 155 | public String toString() { | |
| 156 |
1
1. toString : replaced return value with "" for com/renomad/minum/queue/ActionQueue::toString → KILLED |
return this.name; |
| 157 | } | |
| 158 | ||
| 159 | Thread getQueueThread() { | |
| 160 |
1
1. getQueueThread : replaced return value with null for com/renomad/minum/queue/ActionQueue::getQueueThread → KILLED |
return queueThread; |
| 161 | } | |
| 162 | ||
| 163 | @Override | |
| 164 | public LinkedBlockingQueue<RunnableWithDescription> getQueue() { | |
| 165 |
1
1. getQueue : replaced return value with null for com/renomad/minum/queue/ActionQueue::getQueue → KILLED |
return new LinkedBlockingQueue<>(queue); |
| 166 | } | |
| 167 | ||
| 168 | @Override | |
| 169 | public boolean isStopped() { | |
| 170 |
2
1. isStopped : replaced boolean return with true for com/renomad/minum/queue/ActionQueue::isStopped → KILLED 2. isStopped : replaced boolean return with false for com/renomad/minum/queue/ActionQueue::isStopped → KILLED |
return isStoppedStatus; |
| 171 | } | |
| 172 | } | |
Mutations | ||
| 54 |
1.1 |
|
| 68 |
1.1 |
|
| 82 |
1.1 |
|
| 88 |
1.1 |
|
| 116 |
1.1 |
|
| 133 |
1.1 2.2 |
|
| 134 |
1.1 |
|
| 151 |
1.1 |
|
| 156 |
1.1 |
|
| 160 |
1.1 |
|
| 165 |
1.1 |
|
| 170 |
1.1 2.2 |