ActionQueue.java

1
package com.renomad.minum.queue;
2
3
import com.renomad.minum.state.Context;
4
import com.renomad.minum.logging.ILogger;
5
import com.renomad.minum.utils.*;
6
7
import java.util.concurrent.*;
8
9
/**
10
 * This class provides the ability to pop items into
11
 * a queue thread-safely and know they'll happen later.
12
 * <p>
13
 * For example, this is helpful for minum.logging, or passing
14
 * functions to a minum.database.  It lets us run a bit faster,
15
 * since the I/O actions are happening on a separate
16
 * thread and the only time required is passing the
17
 * function of what we want to run later.
18
 * </p>
19
 * <h3>Example:</h3>
20
 * <p>
21
 *     This example shows where an {@link java.io.InputStream} representing the bytes
22
 *     of an image file are sent to the {@link ActionQueue} for processing.  The call
23
 *     to {@link ActionQueue#enqueue(String, ThrowingRunnable)} returns immediately,
24
 *     and processing continues on another thread.
25
 * </p>
26
 * <pre>
27
 * {@code
28
 *  ActionQueue photoResizingQueue;
29
 *  InputStream photoInputStream;
30
 *  photoResizingQueue = new ActionQueue("photo_resizing", context).initialize();
31
 *  photoResizingQueue.enqueue("resize an image", () -> resizeImage(photoInputStream));
32
 * }
33
 * </pre>
34
 */
35
public final class ActionQueue implements AbstractActionQueue {
36
    private final String name;
37
    private final ExecutorService queueExecutor;
38
    private final LinkedBlockingQueue<RunnableWithDescription> queue;
39
    private final ILogger logger;
40
    /**
41
     * Set as volatile so that multiple threads may see the update
42
     * as soon as it occurs.
43
     */
44
    private volatile boolean stop = false;
45
    private Thread queueThread;
46
    private volatile boolean isStoppedStatus;
47
48
    /**
49
     * See the {@link ActionQueue} description for more detail. This
50
     * constructor will build your new action queue and handle registering
51
     * it with a list of other action queues in the {@link Context} object.
52
     * @param name give this object a unique, explanatory name.
53
     */
54
    public ActionQueue(String name, Context context) {
55
        this.name = name;
56
        this.queueExecutor = context.getExecutorService();
57
        this.queue = new LinkedBlockingQueue<>();
58 1 1. <init> : removed call to com/renomad/minum/queue/ActionQueueState::offerToQueue → KILLED
        context.getActionQueueState().offerToQueue(this);
59
        this.logger = context.getLogger();
60
    }
61
62
    // Regarding the InfiniteLoopStatement - indeed, we expect that the while loop
63
    // below is an infinite loop unless there's an exception thrown, that's what it is.
64
    @SuppressWarnings("InfiniteLoopStatement")
65
    @Override
66
    public ActionQueue initialize() {
67
        Runnable centralLoop = () -> {
68
            Thread.currentThread().setName(name);
69
            this.queueThread = Thread.currentThread();
70
            try {
71
                while (true) {
72 1 1. lambda$initialize$1 : removed call to com/renomad/minum/queue/ActionQueue::runAction → TIMED_OUT
                    runAction(logger, queue);
73
                }
74
            } catch (InterruptedException ex) {
75
                /*
76
                this is what we expect to happen.
77
                once this happens, we just continue on.
78
                this only gets called when we are trying to shut everything
79
                down cleanly
80
                 */
81
                logger.logDebug(() -> String.format("%s ActionQueue for %s is stopped.%n", TimeUtils.getTimestampIsoInstant(), name));
82
                Thread.currentThread().interrupt();
83
            }
84
        };
85
        queueExecutor.submit(centralLoop);
86 1 1. initialize : replaced return value with null for com/renomad/minum/queue/ActionQueue::initialize → KILLED
        return this;
87
    }
88
89
    static void runAction(ILogger logger, LinkedBlockingQueue<RunnableWithDescription> queue) throws InterruptedException {
90
        RunnableWithDescription action = queue.take();
91
        logger.logTrace(() -> "in ActionQueue.runAction, running this: " + action);
92 1 1. runAction : removed call to java/lang/Runnable::run → KILLED
        ThrowingRunnable.throwingRunnableWrapper(action, logger).run();
93
    }
94
95
    /**
96
     * Adds something to the queue to be processed.
97
     * <p>
98
     *     Here is an example use of .enqueue:
99
     * </p>
100
     * <pre>
101
     * {@code   actionQueue.enqueue("Write person file to disk at " + filePath, () -> {
102
     *             Files.writeString(filePath, pf.serialize());
103
     *         });}
104
     * </pre>
105
     */
106
    @Override
107
    public void enqueue(String description, ThrowingRunnable action) {
108 1 1. enqueue : negated conditional → KILLED
        if (! stop) {
109
            queue.add(new RunnableWithDescription(action, description));
110
        } else {
111
            throw new QueueException(String.format("failed to enqueue %s - ActionQueue \"%s\" is stopped", description, this.name));
112
        }
113
    }
114
115
    /**
116
     * Stops the action queue
117
     * @param count how many loops to wait before we crash it closed
118
     * @param sleepTime how long to wait in milliseconds between loops
119
     */
120
    @Override
121
    public void stop(int count, int sleepTime) {
122
        String timestamp = TimeUtils.getTimestampIsoInstant();
123
        logger.logDebug(() ->  String.format("%s Stopping queue %s", timestamp, this));
124
        stop = true;
125 2 1. stop : negated conditional → KILLED
2. stop : changed conditional boundary → KILLED
        for (int i = 0; i < count; i++) {
126 1 1. stop : negated conditional → TIMED_OUT
            if (queue.isEmpty()) return;
127
            logger.logDebug(() ->  String.format("%s Queue not yet empty, has %d elements. waiting...%n",timestamp, queue.size()));
128
            MyThread.sleep(sleepTime);
129
        }
130
        isStoppedStatus = true;
131
        logger.logDebug(() -> String.format("%s Queue %s has %d elements left but we're done waiting.  Queue toString: %s", timestamp, this, queue.size(), queue));
132
    }
133
134
    /**
135
     * This will prevent any new actions being
136
     * queued (by setting the stop flag to true and thus
137
     * causing an exception to be thrown
138
     * when a call is made to [enqueue]) and will
139
     * block until the queue is empty.
140
     */
141
    @Override
142
    public void stop() {
143 1 1. stop : removed call to com/renomad/minum/queue/ActionQueue::stop → KILLED
        stop(5, 20);
144
    }
145
146
    @Override
147
    public String toString() {
148 1 1. toString : replaced return value with "" for com/renomad/minum/queue/ActionQueue::toString → KILLED
        return this.name;
149
    }
150
151
    Thread getQueueThread() {
152 1 1. getQueueThread : replaced return value with null for com/renomad/minum/queue/ActionQueue::getQueueThread → KILLED
        return queueThread;
153
    }
154
155
    @Override
156
    public LinkedBlockingQueue<RunnableWithDescription> getQueue() {
157 1 1. getQueue : replaced return value with null for com/renomad/minum/queue/ActionQueue::getQueue → KILLED
        return new LinkedBlockingQueue<>(queue);
158
    }
159
160
    @Override
161
    public boolean isStopped() {
162 2 1. isStopped : replaced boolean return with true for com/renomad/minum/queue/ActionQueue::isStopped → KILLED
2. isStopped : replaced boolean return with false for com/renomad/minum/queue/ActionQueue::isStopped → KILLED
        return isStoppedStatus;
163
    }
164
}

Mutations

58

1.1
Location : <init>
Killed by : com.renomad.minum.utils.ActionQueueKillerTests
removed call to com/renomad/minum/queue/ActionQueueState::offerToQueue → KILLED

72

1.1
Location : lambda$initialize$1
Killed by : none
removed call to com/renomad/minum/queue/ActionQueue::runAction → TIMED_OUT

86

1.1
Location : initialize
Killed by : com.renomad.minum.utils.ActionQueueKillerTests
replaced return value with null for com/renomad/minum/queue/ActionQueue::initialize → KILLED

92

1.1
Location : runAction
Killed by : com.renomad.minum.utils.ActionQueueTests
removed call to java/lang/Runnable::run → KILLED

108

1.1
Location : enqueue
Killed by : com.renomad.minum.utils.ActionQueueKillerTests
negated conditional → KILLED

125

1.1
Location : stop
Killed by : com.renomad.minum.utils.ActionQueueTests
negated conditional → KILLED

2.2
Location : stop
Killed by : com.renomad.minum.utils.ActionQueueTests
changed conditional boundary → KILLED

126

1.1
Location : stop
Killed by : none
negated conditional → TIMED_OUT

143

1.1
Location : stop
Killed by : com.renomad.minum.utils.ActionQueueKillerTests
removed call to com/renomad/minum/queue/ActionQueue::stop → KILLED

148

1.1
Location : toString
Killed by : com.renomad.minum.utils.ActionQueueKillerTests
replaced return value with "" for com/renomad/minum/queue/ActionQueue::toString → KILLED

152

1.1
Location : getQueueThread
Killed by : com.renomad.minum.utils.ActionQueueKillerTests
replaced return value with null for com/renomad/minum/queue/ActionQueue::getQueueThread → KILLED

157

1.1
Location : getQueue
Killed by : com.renomad.minum.utils.ActionQueueTests
replaced return value with null for com/renomad/minum/queue/ActionQueue::getQueue → KILLED

162

1.1
Location : isStopped
Killed by : com.renomad.minum.utils.ActionQueueTests
replaced boolean return with true for com/renomad/minum/queue/ActionQueue::isStopped → KILLED

2.2
Location : isStopped
Killed by : com.renomad.minum.utils.ActionQueueTests
replaced boolean return with false for com/renomad/minum/queue/ActionQueue::isStopped → KILLED

Active mutators

Tests examined


Report generated by PIT 1.17.0