DatabaseAppender.java

package com.renomad.minum.database;


import com.renomad.minum.logging.ILogger;
import com.renomad.minum.state.Constants;
import com.renomad.minum.state.Context;
import com.renomad.minum.utils.FileUtils;
import com.renomad.minum.utils.MyThread;
import com.renomad.minum.utils.StacktraceUtils;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReentrantLock;

/**
 * This class provides the capability of appending database changes
 * to the disk, quickly and efficiently.
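 * <p>
 * A rough usage sketch (the {@code context} value, the {@code UPDATE} action constant,
 * and the serialized payload shown here are illustrative, not taken from real callers):
 * </p>
 * <pre>{@code
 * DatabaseAppender appender = new DatabaseAppender(Path.of("out/simple_db/foo"), context);
 * // returns "" unless this append caused the current log to be saved off
 * String savedOffName = appender.appendToDatabase(DatabaseChangeAction.UPDATE, "1|alice");
 * }</pre>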
 */
final class DatabaseAppender {

    /**
     * Results in output like "2025_08_30_13_01_49_123", which is year_month_day_hour_minute_second_millisecond.
     * This can be used to parse the file names to {@link java.util.Date} so we can process the oldest
     * file first.
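     * <p>
     * A small sketch of that parse direction (note that {@code parse} throws a checked
     * {@link java.text.ParseException}):
     * </p>
     * <pre>{@code
     * java.util.Date fileTime = simpleDateFormat.parse("2025_08_30_13_01_49_123");
     * }</pre>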
     */
    static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss_SSS");

    private final Path persistenceDirectory;

    Writer bufferedWriter;

    /**
     * if true, there is data in the buffered writer that needs to be
     * written to disk using {@link BufferedWriter#flush()}
     */
    private boolean bufferedWriterHasUnwrittenData;

    /**
     * Tracks whether the loop which periodically flushes the
     * buffered writer to disk is currently running
     */
    private boolean flushLoopRunning;

    /**
     * The directory for this database
     */
    private final Path appendLogDirectory;

    /**
     * Used to create a thread that contains an inner loop
     * to flush the data to disk on a periodic basis
     */
    private final ExecutorService executorService;

    private final ILogger logger;

    private final ReentrantLock moveFileLock;

    /**
     * The maximum number of data entries we will add to the append-only
     * file before we move on to a new file.
     */
    int maxAppendCount;

    /**
     * The current count of how many appends have been made
     * to the current database file.  Once it reaches
     * {@link #maxAppendCount}, we'll switch to a
     * different file.
     */
    int appendCount;

    /**
     * The approximate count of bytes that have been appended to the current file
     */
    private long appendBytes;

    DatabaseAppender(Path persistenceDirectory, Context context) throws IOException {
        this.persistenceDirectory = persistenceDirectory;
        this.appendLogDirectory = persistenceDirectory.resolve("append_logs");
        this.executorService = context.getExecutorService();
        this.logger = context.getLogger();
        Constants constants = context.getConstants();
        FileUtils fileUtils = new FileUtils(logger, constants);
        this.maxAppendCount = constants.maxAppendCount;
        fileUtils.makeDirectory(this.appendLogDirectory);
        moveFileLock = new ReentrantLock();
        createNewAppendFile();
    }

    /**
     * Creates or reopens the append-file (a file used for appending data).  If a
     * currentAppendLog already exists on disk, the append count is initialized
     * from its line count; otherwise the count is reset to zero.
     */
    private void createNewAppendFile() throws IOException {
        Path currentAppendFile = this.persistenceDirectory.resolve("currentAppendLog");

        // if we are starting up with an existing currentAppendLog, set the appendCount
        // appropriately.  Otherwise, initialize to 0.  The currentAppendLog file is
        // never very large - it's mostly a temporary place to store incoming data
        // until we can store it off elsewhere. For that reason, it's not a performance
        // concern to read all the existing lines, just to get the count of current lines.
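        // For example, a currentAppendLog with three existing lines would start this
        // instance at appendCount == 3.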
        if (Files.exists(currentAppendFile)) {
            List<String> lines = Files.readAllLines(currentAppendFile);
            appendCount = lines.size();
        } else {
            // reset the count to zero, we're starting a new file.
            logger.logDebug(() -> "Creating a new database append file. Previous file: %,d lines, %.2f megabytes".formatted(appendCount, (appendBytes / 1_048_576.0)));
            appendCount = 0;
            appendBytes = 0;
        }

        bufferedWriter = Files.newBufferedWriter(currentAppendFile, StandardCharsets.US_ASCII, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    /**
     * Appends new data to the end of a file.
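     * <p>
     * Each entry becomes one line in the append log: the action name, a space, and the
     * serialized data, for example {@code UPDATE 1|alice} (the payload is just an
     * illustrative string).
     * </p>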
     * @return if this append caused the current log to be saved off to the ready folder,
     * the name it was saved under; otherwise, an empty string.
     */
    String appendToDatabase(DatabaseChangeAction action, String serializedData) throws IOException {
        String newlyCreatedFileName = "";
        if (appendCount >= maxAppendCount) {
            moveFileLock.lock(); // block threads here if multiple are trying to get in - only one gets in at a time
            try {
                newlyCreatedFileName = saveOffWrapped(appendCount, maxAppendCount);
            } finally {
                moveFileLock.unlock();
            }
        }

        bufferedWriter.append(action.toString()).append(' ').append(serializedData).append('\n');
        setBufferedWriterHasUnwrittenData();
        appendCount += 1;
        appendBytes += serializedData.length() + 8; // the 8 accounts for the action name (e.g. UPDATE, 6 characters), the space separator, and the newline
        return newlyCreatedFileName;
    }

    private void setBufferedWriterHasUnwrittenData() {
        bufferedWriterHasUnwrittenData = true;
        if (!flushLoopRunning) {
            initializeTimedFlusher();
        }
    }

    /**
     * This method is kicked off when there is new data added to
     * the {@link BufferedWriter}.  While there is data to write, it
     * will wake up every 50 milliseconds to flush the data.  Once there is
     * no more data, it will end.
     */
    private void initializeTimedFlusher() {
        Runnable timedFlusherLoop = () -> {
            flushLoopRunning = true;
            Thread.currentThread().setName("database_timed_flusher");
            while (bufferedWriterHasUnwrittenData) {
                flush();

                // this code only runs when there is data to add, so no need to take a
                // lot of waiting time.  But, if the data is coming fast and furious,
                // at least a small wait will allow greater efficiency.
                MyThread.sleep(50);
            }
            flushLoopRunning = false;
        };
        executorService.submit(timedFlusherLoop);
    }

    /**
     * This helper just wraps a method to enable easier testing.
     * @return the name of the saved-off file if appendCount is greater than or
     * equal to maxAppendCount, meaning we called {@link #saveOffCurrentDataToReadyFolder()};
     * an empty string otherwise.
     */
    String saveOffWrapped(int appendCount, int maxAppendCount) throws IOException {
        if (appendCount >= maxAppendCount) {
            return saveOffCurrentDataToReadyFolder();
        }
        return "";
    }

    /**
     * Move the append-only file to a new place to prepare for
     * consolidation, and reset the append count.
     * @return the name the file was saved under in the ready folder
     */
    String saveOffCurrentDataToReadyFolder() throws IOException {
        flush();
        String newFileName = moveToReadyFolder();
        createNewAppendFile();
        return newFileName;
    }

    /**
     * When we are done filling a file, move it into the ready
     * folder ({@code append_logs}), renaming it to the date + time + millis.
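     * For example, the moved file might end up at {@code append_logs/2025_08_30_13_01_49_123}.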
     * @return the name of the new file
     */
    private String moveToReadyFolder() throws IOException {
        String appendFile = simpleDateFormat.format(new java.util.Date());
        Files.move(persistenceDirectory.resolve("currentAppendLog"), this.appendLogDirectory.resolve(appendFile));
        return appendFile;
    }

    void flush() {
        flush(this.bufferedWriter, this.logger);
        this.bufferedWriterHasUnwrittenData = false;
    }

    static void flush(Writer writer, ILogger logger) {
        try {
            writer.flush();
        } catch (IOException e) {
            logger.logAsyncError(() -> "Error while flushing in TimedFlusher: " + StacktraceUtils.stackTraceToString(e));
            throw new DbException(e);
        }
    }
}