1 | package com.renomad.minum.queue; | |
2 | ||
3 | import com.renomad.minum.state.Context; | |
4 | import com.renomad.minum.logging.ILogger; | |
5 | import com.renomad.minum.utils.*; | |
6 | ||
7 | import java.util.concurrent.*; | |
8 | ||
9 | /** | |
10 | * This class provides the ability to pop items into | |
11 | * a queue thread-safely and know they'll happen later. | |
12 | * <p> | |
13 | * For example, this is helpful for minum.logging, or passing | |
14 | * functions to a minum.database. It lets us run a bit faster, | |
15 | * since the I/O actions are happening on a separate | |
16 | * thread and the only time required is passing the | |
17 | * function of what we want to run later. | |
18 | */ | |
19 | public final class ActionQueue implements AbstractActionQueue { | |
20 | private final String name; | |
21 | private final ExecutorService queueExecutor; | |
22 | private final LinkedBlockingQueue<RunnableWithDescription> queue; | |
23 | private final ILogger logger; | |
24 | private boolean stop = false; | |
25 | private Thread queueThread; | |
26 | private boolean isStoppedStatus; | |
27 | ||
28 | /** | |
29 | * See the {@link ActionQueue} description for more detail. This | |
30 | * constructor will build your new action queue and handle registering | |
31 | * it with a list of other action queues in the {@link Context} object. | |
32 | * @param name give this object a unique, explanatory name. | |
33 | */ | |
34 | public ActionQueue(String name, Context context) { | |
35 | this.name = name; | |
36 | this.queueExecutor = context.getExecutorService(); | |
37 | this.queue = new LinkedBlockingQueue<>(); | |
38 |
1
1. <init> : removed call to com/renomad/minum/queue/ActionQueueState::offerToQueue → KILLED |
context.getActionQueueState().offerToQueue(this); |
39 | this.logger = context.getLogger(); | |
40 | } | |
41 | ||
42 | // Regarding the InfiniteLoopStatement - indeed, we expect that the while loop | |
43 | // below is an infinite loop unless there's an exception thrown, that's what it is. | |
44 | @SuppressWarnings("InfiniteLoopStatement") | |
45 | @Override | |
46 | public ActionQueue initialize() { | |
47 | Runnable centralLoop = () -> { | |
48 | Thread.currentThread().setName(name); | |
49 | this.queueThread = Thread.currentThread(); | |
50 | try { | |
51 | while (true) { | |
52 |
1
1. lambda$initialize$1 : removed call to com/renomad/minum/queue/ActionQueue::runAction → TIMED_OUT |
runAction(); |
53 | } | |
54 | } catch (InterruptedException ex) { | |
55 | /* | |
56 | this is what we expect to happen. | |
57 | once this happens, we just continue on. | |
58 | this only gets called when we are trying to shut everything | |
59 | down cleanly | |
60 | */ | |
61 | logger.logDebug(() -> String.format("%s ActionQueue for %s is stopped.%n", TimeUtils.getTimestampIsoInstant(), name)); | |
62 | Thread.currentThread().interrupt(); | |
63 | } | |
64 | }; | |
65 | queueExecutor.submit(centralLoop); | |
66 |
1
1. initialize : replaced return value with null for com/renomad/minum/queue/ActionQueue::initialize → KILLED |
return this; |
67 | } | |
68 | ||
69 | private void runAction() throws InterruptedException { | |
70 | RunnableWithDescription action = queue.take(); | |
71 | try { | |
72 |
1
1. runAction : removed call to com/renomad/minum/utils/RunnableWithDescription::run → KILLED |
action.run(); |
73 | } catch (Exception e) { | |
74 | logger.logAsyncError(() -> StacktraceUtils.stackTraceToString(e)); | |
75 | } | |
76 | } | |
77 | ||
78 | /** | |
79 | * Adds something to the queue to be processed. | |
80 | * <p> | |
81 | * Here is an example use of .enqueue: | |
82 | * </p> | |
83 | * <p> | |
84 | * <pre> | |
85 | * {@code actionQueue.enqueue("Write person file to disk at " + filePath, () -> { | |
86 | * Files.writeString(filePath, pf.serialize()); | |
87 | * });} | |
88 | * </pre> | |
89 | * </p> | |
90 | */ | |
91 | @Override | |
92 | public void enqueue(String description, ThrowingRunnable action) { | |
93 |
1
1. enqueue : negated conditional → KILLED |
if (! stop) { |
94 | queue.add(new RunnableWithDescription(action, description)); | |
95 | } else { | |
96 | throw new UtilsException(String.format("failed to enqueue %s - ActionQueue \"%s\" is stopped", description, this.name)); | |
97 | } | |
98 | } | |
99 | ||
100 | /** | |
101 | * Stops the action queue | |
102 | * @param count how many loops to wait before we crash it closed | |
103 | * @param sleepTime how long to wait in milliseconds between loops | |
104 | */ | |
105 | @Override | |
106 | public void stop(int count, int sleepTime) { | |
107 | String timestamp = TimeUtils.getTimestampIsoInstant(); | |
108 | logger.logDebug(() -> String.format("%s Stopping queue %s", timestamp, this)); | |
109 | stop = true; | |
110 |
2
1. stop : negated conditional → KILLED 2. stop : changed conditional boundary → KILLED |
for (int i = 0; i < count; i++) { |
111 |
1
1. stop : negated conditional → KILLED |
if (queue.isEmpty()) return; |
112 | logger.logDebug(() -> String.format("%s Queue not yet empty, has %d elements. waiting...%n",timestamp, queue.size())); | |
113 | MyThread.sleep(sleepTime); | |
114 | } | |
115 | isStoppedStatus = true; | |
116 | logger.logDebug(() -> String.format("%s Queue %s has %d elements left but we're done waiting. Queue toString: %s", timestamp, this, queue.size(), queue)); | |
117 | } | |
118 | ||
119 | /** | |
120 | * This will prevent any new actions being | |
121 | * queued (by setting the stop flag to true and thus | |
122 | * causing an exception to be thrown | |
123 | * when a call is made to [enqueue]) and will | |
124 | * block until the queue is empty. | |
125 | */ | |
126 | @Override | |
127 | public void stop() { | |
128 |
1
1. stop : removed call to com/renomad/minum/queue/ActionQueue::stop → KILLED |
stop(5, 20); |
129 | } | |
130 | ||
131 | @Override | |
132 | public String toString() { | |
133 |
1
1. toString : replaced return value with "" for com/renomad/minum/queue/ActionQueue::toString → KILLED |
return this.name; |
134 | } | |
135 | ||
136 | Thread getQueueThread() { | |
137 |
1
1. getQueueThread : replaced return value with null for com/renomad/minum/queue/ActionQueue::getQueueThread → KILLED |
return queueThread; |
138 | } | |
139 | ||
140 | @Override | |
141 | public LinkedBlockingQueue<RunnableWithDescription> getQueue() { | |
142 |
1
1. getQueue : replaced return value with null for com/renomad/minum/queue/ActionQueue::getQueue → KILLED |
return new LinkedBlockingQueue<>(queue); |
143 | } | |
144 | ||
145 | @Override | |
146 | public boolean isStopped() { | |
147 |
2
1. isStopped : replaced boolean return with true for com/renomad/minum/queue/ActionQueue::isStopped → KILLED 2. isStopped : replaced boolean return with false for com/renomad/minum/queue/ActionQueue::isStopped → KILLED |
return isStoppedStatus; |
148 | } | |
149 | } | |
Mutations | ||
38 |
1.1 |
|
52 |
1.1 |
|
66 |
1.1 |
|
72 |
1.1 |
|
93 |
1.1 |
|
110 |
1.1 2.2 |
|
111 |
1.1 |
|
128 |
1.1 |
|
133 |
1.1 |
|
137 |
1.1 |
|
142 |
1.1 |
|
147 |
1.1 2.2 |