DbEngine2.java

package com.renomad.minum.database;

import com.renomad.minum.state.Context;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Stream;

import static com.renomad.minum.utils.Invariants.mustBeFalse;
import static com.renomad.minum.utils.Invariants.mustBeTrue;

/**
 * An in-memory, disk-persisted database class.
 *
 * <p>
 *     Engine 2 is a database engine that improves on the performance of the first
 *     database provided by Minum. It does this by using different strategies for disk persistence.
 * </p>
 * <p>
 *     The mental model of the previous Minum database has been an in-memory data
 *     structure in which every change is eventually written to its own file on disk for
 *     persistence.  Data changes affect just their relevant files.  The benefit of this approach is
 *     extreme simplicity. It requires very little code, relying as it does on the operating system's file capabilities.
 * </p>
 * <p>
 *     However, there are two performance problems with this approach.  The first arises
 *     when data changes arrive at a high rate.  In that situation, the in-memory portion stays
 *     up to date, but the disk portion may lag behind by minutes.  The second problem is start-up
 *     time.  When the database starts, it reads its files into memory.  The database can read
 *     about 6,000 files a second in the best case.  If there are a million data items, it would take
 *     about 160 seconds to load them into memory, which is far too long.
 * </p>
 * <p>
 *      The new approach to disk persistence is to append each change to a file.  Append-only file
 *      changes can be very fast.  These append files are eventually consolidated into files
 *      partitioned by their index - data with indexes between 1 and 1000 go into one file, between
 *      1001 and 2000 go into another, and so on.
 *  </p>
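 *  <p>
 *      For example, a database named "photos" would end up with consolidated files laid
 *      out like this (a sketch; the consolidated_data directory and the 1_to_1000 naming
 *      pattern are the ones this class uses, while the surrounding layout is illustrative):
 *  </p>
 *  <pre>
 *  photos/
 *      consolidated_data/
 *          1_to_1000
 *          1001_to_2000
 *  </pre>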
 *  <p>
 *      Startup is orders of magnitude faster with this approach.  What took the previous database
 *      160 seconds to load requires only 2 seconds. Writes to disk are also faster: what
 *      would have taken several minutes to write should take only a few seconds now.
 *  </p>
 *  <p>
 *      This new approach uses a different file structure than the previous engine. To use
 *      the new engine on existing data, the old data format can be converted to the
 *      new one.  Construct an instance of the new engine, pointing
 *      at the same name as the previous, and it will convert the data.  If the previous
 *      call looked like this:
 *  </p>
 *  <pre>{@code
 *  Db<Photograph> photoDb = context.getDb("photos", Photograph.EMPTY);
 *  }</pre>
 *  <p>
 *  then converting to the new database is just a matter of replacing it with the following
 *  line. <b>Please back up your database before making this change.</b>
 *  </p>
 *  <pre>{@code
 *  DbEngine2<Photograph> photoDb = context.getDb2("photos", Photograph.EMPTY);
 *  }</pre>
 *  <p>
 *     Once the new engine starts up, it will notice the old file structure and convert it
 *     over.  The methods and behaviors are mostly the same between the old and new engines, so the
 *     update should be straightforward.
 * </p>
 * <p>
 *     (By the way, it <em>is</em> possible to convert back to the old file structure,
 *     by starting the database the old way again.  Just be aware that each time the
 *     files are converted, starting the database takes longer than normal.)
 * </p>
 * <p>
 *     Note, however, that using the old database is still fine in many cases,
 *     particularly for prototypes or systems which do not contain large amounts of data. If
 *     your system is working fine, there is no need to change things.
 * </p>
 *
 * @param <T> the type of data we'll be persisting (must extend from {@link DbData})
 */
public final class DbEngine2<T extends DbData<?>> extends AbstractDb<T> {

    private final ReentrantLock loadDataLock;
    private final ReentrantLock consolidateLock;
    private final ReentrantLock writeLock;
    int maxLinesPerAppendFile;
    volatile boolean hasLoadedData; // volatile: read outside loadDataLock when checking whether a load is needed
    final DatabaseAppender databaseAppender;
    private final DatabaseConsolidator databaseConsolidator;

    /**
     * Here we track the number of appends we have made.  Once it exceeds
     * {@link #maxLinesPerAppendFile}, we will kick off a consolidation in a thread.
     */
    final AtomicInteger appendCount = new AtomicInteger(0);

    /**
     * Used to determine whether to kick off consolidation.  If it is
     * already running, we don't want to kick it off again. This would
     * only affect us if we are updating the database very fast.
     * Volatile, because it is written from the consolidation thread
     * and read from writer threads.
     */
    volatile boolean consolidationIsRunning;

    /**
     * Constructs an in-memory disk-persisted database.
     * Loading of data from disk happens at the first invocation of any command
     * changing or requesting data, such as {@link #write(DbData)}, {@link #delete(DbData)},
     * or {@link #values()}.  See {@link #loadData()} for details.
     * @param dbDirectory this uniquely names your database, and also sets the directory
     *                    name for this data.  The expected use case is to name this after
     *                    the data in question.  For example, "users", or "accounts".
     * @param context used to provide important state data to several components
     * @param instance an instance of the {@link DbData} object relevant for use in this database. Note
     *                 that each database (that is, each instance of this class) focuses on just one
     *                 data type, which must be an implementation of {@link DbData}.
     */
    public DbEngine2(Path dbDirectory, Context context, T instance) {
        super(dbDirectory, context, instance);

        this.databaseConsolidator = new DatabaseConsolidator(dbDirectory, context);
        try {
            this.databaseAppender = new DatabaseAppender(dbDirectory, context);
        } catch (IOException e) {
            throw new DbException("Error while initializing DatabaseAppender in DbEngine2", e);
        }
        this.loadDataLock = new ReentrantLock();
        this.consolidateLock = new ReentrantLock();
        this.writeLock = new ReentrantLock();
        this.maxLinesPerAppendFile = context.getConstants().maxAppendCount;
    }

    /**
     * Write data to the database.  Use an index of 0 to store new data, and a positive
     * non-zero value to update data.
     * <p><em>
     *     Example of adding new data to the database:
     * </em></p>
     * {@snippet :
     *          final var newSalt = StringUtils.generateSecureRandomString(10);
     *          final var hashedPassword = CryptoUtils.createPasswordHash(newPassword, newSalt);
     *          final var newUser = new User(0L, newUsername, hashedPassword, newSalt);
     *          userDb.write(newUser);
     * }
     * <p><em>
     *     Example of updating data:
     * </em></p>
     * {@snippet :
     *         // write the updated salted password to the database
     *         final var updatedUser = new User(
     *                 user().getIndex(),
     *                 user().getUsername(),
     *                 hashedPassword,
     *                 newSalt);
     *         userDb.write(updatedUser);
     * }
     *
     * @param newData the data we are writing
     * @return the data with its new index assigned.
     * @throws DbException if there is a failure to write
     */
    @Override
    public T write(T newData) {
        if (newData.getIndex() < 0) throw new DbException("Negative indexes are disallowed");
        // load data if needed
        if (!hasLoadedData) loadData();

        writeLock.lock();
        try {
            boolean newElementCreated = processDataIndex(newData);
            writeToDisk(newData);
            writeToMemory(newData, newElementCreated);
        } catch (IOException ex) {
           throw new DbException("failed to write data " + newData, ex);
        } finally {
            writeLock.unlock();
        }

        // returning the data at this point is the most convenient
        // way users will have access to the new index of the data.
        return newData;
    }


    private void writeToDisk(T newData) throws IOException {
        logger.logTrace(() -> String.format("writing data to disk: %s", newData));
        String serializedData = newData.serialize();
        mustBeFalse(serializedData == null || serializedData.isBlank(),
                "the serialized form of data must not be blank. " +
                        "Is the serialization code written properly? Our datatype: " + emptyInstance);
        databaseAppender.appendToDatabase(DatabaseChangeAction.UPDATE, serializedData);
        appendCount.incrementAndGet();
        consolidateIfNecessary();
    }

    /**
     * If the append count is large enough, we will call the
     * consolidation method on the DatabaseConsolidator and
     * reset the append count to 0.
     * @return true if consolidation was kicked off, false otherwise
     */
    boolean consolidateIfNecessary() {
        if (appendCount.get() > maxLinesPerAppendFile && !consolidationIsRunning) {
            consolidateLock.lock(); // block threads here if multiple are trying to get in - only one gets in at a time
            try {
                consolidateInnerCode();
            } finally {
                consolidateLock.unlock();
            }
            return true;
        }
        return false;
    }

    /**
     * This code is only called in production from {@link #consolidateIfNecessary()},
     * and is necessarily protected by mutex locks.  However, it is provided
     * here as its own method for ease of testing.
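     * <p><em>
     *     A test sketch (db is a DbEngine2 instance under test; assumes consolidation
     *     is not already running, and the assertion style is illustrative):
     * </em></p>
     * {@snippet :
     *     db.appendCount.set(db.maxLinesPerAppendFile + 1);
     *     db.consolidateInnerCode(); // submits consolidation to the executor service
     *     assert db.appendCount.get() == 0;
     * }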
     */
    void consolidateInnerCode() {
        if (appendCount.get() > maxLinesPerAppendFile && !consolidationIsRunning) {
            context.getExecutorService().submit(() -> {
                consolidationIsRunning = true;
                try {
                    databaseConsolidator.consolidate();
                } catch (Exception e) {
                    logger.logAsyncError(() -> "Error during consolidation: " + e);
                } finally {
                    // reset in a finally block, so a failed consolidation cannot leave
                    // this flag stuck at true and block all future consolidations
                    consolidationIsRunning = false;
                }
            });
            appendCount.set(0);
        }
    }

    /**
     * Delete data
     * <p><em>Example:</em></p>
     * {@snippet :
     *      userDb.delete(user);
     * }
     * @param dataToDelete the data we are deleting
     * @throws DbException if there is a failure to delete
     */
    @Override
    public void delete(T dataToDelete) {
        // load data if needed
        if (!hasLoadedData) loadData();

        writeLock.lock();
        try {
            deleteFromDisk(dataToDelete);
            deleteFromMemory(dataToDelete);
        } catch (IOException ex) {
            throw new DbException("failed to delete data " + dataToDelete, ex);
        } finally {
            writeLock.unlock();
        }
    }

    private void deleteFromDisk(T dataToDelete) throws IOException {
        logger.logTrace(() -> String.format("deleting data from disk: %s", dataToDelete));
        databaseAppender.appendToDatabase(DatabaseChangeAction.DELETE, dataToDelete.serialize());
        appendCount.incrementAndGet();
        consolidateIfNecessary();
    }


    /**
     * Performs the actual load of data from disk.  If the old file structure
     * is found, it is converted to the new form first.  Any remnant append
     * files are then consolidated, the consolidated files are read into memory,
     * and finally the index counter is initialized from the loaded data.
     */
    private void loadDataFromDisk() throws IOException {
        // if we find the "index.ddps" file, it means we are looking at an old
        // version of the database.  Update it to the new version, and then afterwards
        // remove the old version files.
        if (Files.exists(dbDirectory.resolve("index.ddps"))) {
            new DbFileConverter(context, dbDirectory).convertClassicFolderStructureToDbEngine2Form();
        }

        fileUtils.makeDirectory(dbDirectory);
        // if there are any remnant items in the current append-only file, move them
        // to a new file
        databaseAppender.saveOffCurrentDataToReadyFolder();
        databaseAppender.flush();

        // consolidate whatever files still exist in the append logs
        databaseConsolidator.consolidate();

        // load the data into memory
        walkAndLoad(dbDirectory);

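        // seed the index counter: the next assigned index is one greater
        // than the largest index found in the loaded data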
        if (data.isEmpty()) {
            this.index = new AtomicLong(1);
        } else {
            var initialIndex = Collections.max(data.keySet()) + 1L;
            this.index = new AtomicLong(initialIndex);
        }
    }

    /**
     * Loops through each line of data in the consolidated data files,
     * converting each to its strongly-typed form and adding to the database
     */
    void walkAndLoad(Path dbDirectory) {
        List<String> consolidatedFiles = new ArrayList<>(
                List.of(Objects.requireNonNull(dbDirectory.resolve("consolidated_data").toFile().list())));

        // if there aren't any files, bail out
        if (consolidatedFiles.isEmpty()) return;

        // sort
        consolidatedFiles.sort(Comparator.comparingLong(DbEngine2::parseConsolidatedFileName));

        for (String fileName : consolidatedFiles) {
            logger.logDebug(() -> "Processing database file: " + fileName);
            Path consolidatedDataFile = dbDirectory.resolve("consolidated_data").resolve(fileName);

            // By using a lazy stream, we are able to read each item from the file into
            // memory without needing to read the whole file contents into memory at once,
            // thus avoiding requiring a great amount of memory
            try(Stream<String> fileStream = Files.lines(consolidatedDataFile, StandardCharsets.US_ASCII)) {
                fileStream.forEach(line -> readAndDeserialize(line, fileName));
            } catch (Exception e) {
                throw new DbException(e);
            }
        }
    }

    /**
     * Given a file name like 1_to_1000 or 1001_to_2000, extract the
     * beginning index (i.e. 1 or 1001).
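     * <p><em>Example:</em></p>
     * {@snippet :
     *     long beginningIndex = DbEngine2.parseConsolidatedFileName("1001_to_2000"); // 1001
     * }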
     */
    static long parseConsolidatedFileName(String file) {
        int index = file.indexOf("_to_");
        if (index == -1) {
            throw new DbException("Consolidated filename was invalid: " + file);
        }
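        // parse only the leading digits - the characters before "_to_"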
        return Long.parseLong(file, 0, index, 10);
    }

    /**
     * Converts a serialized string to a strongly-typed data structure
     * and adds it to the database.
     */
    void readAndDeserialize(String lineOfData, String fileName) {
        try {
            @SuppressWarnings("unchecked")
            T deserializedData = (T) emptyInstance.deserialize(lineOfData);
            mustBeTrue(deserializedData != null, "deserialization of " + emptyInstance +
                    " resulted in a null value. Was the serialization method implemented properly?");

            // put the data into the in-memory data structure
            data.put(deserializedData.getIndex(), deserializedData);
            addToIndexes(deserializedData);

        } catch (Exception e) {
            throw new DbException("Failed to deserialize " + lineOfData + " with data (\"" + fileName + "\"). Caused by: " + e);
        }
    }


    /**
     * This is what loads the data from disk the
     * first time someone needs it.  Because it is
     * locked, only one thread can enter at
     * a time.  The first one in will load the data,
     * and any subsequent thread will encounter a branch that skips loading.
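     * <p><em>
     *     Example of loading eagerly at program start (a sketch, reusing the
     *     "photos" database from the class-level documentation):
     * </em></p>
     * {@snippet :
     *     DbEngine2<Photograph> photoDb = context.getDb2("photos", Photograph.EMPTY);
     *     photoDb.loadData(); // load now, rather than at the first read or write
     * }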
     */
    @Override
    public void loadData() {
        loadDataLock.lock(); // block threads here if multiple are trying to get in - only one gets in at a time
        try {
            if (!hasLoadedData) {
                loadDataFromDisk();
            }
            hasLoadedData = true;
        } catch (Exception ex) {
            throw new DbException("Failed to load data from disk.", ex);
        } finally {
            loadDataLock.unlock();
        }
    }

    /**
     * This method provides read capability for the values of a database.
     * <br>
     * The returned collection is a read-only view over the data, through {@link Collections#unmodifiableCollection(Collection)}
     *
     * <p><em>Example:</em></p>
     * {@snippet :
     * boolean doesUserAlreadyExist(String username) {
     *     return userDb.values().stream().anyMatch(x -> x.getUsername().equals(username));
     * }
     * }
     */
    @Override
    public Collection<T> values() {
        // load data if needed
        if (!hasLoadedData) loadData();

        return Collections.unmodifiableCollection(data.values());
    }

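    /**
     * Registers a named index for fast key-based lookups.  This must be run before
     * the database loads its data from disk - typically immediately after the
     * database is created.
     * <p><em>
     *     Example (a sketch, assuming a User type with an EMPTY instance and a
     *     getUsername accessor returning the key):
     * </em></p>
     * {@snippet :
     *     DbEngine2<User> userDb = context.getDb2("users", User.EMPTY);
     *     userDb.registerIndex("username", User::getUsername);
     * }
     */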
    @Override
    public boolean registerIndex(String indexName, Function<T, String> keyObtainingFunction) {
        if (hasLoadedData) {
            throw new DbException("This method must be run before the database loads data from disk.  Typically, " +
                    "it should be run immediately after the database is created.  See this method's documentation");
        }
        return super.registerIndex(indexName, keyObtainingFunction);
    }


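    /**
     * Returns the data items whose value for the named index matches the given key.
     * <p><em>
     *     Example (a sketch, assuming the "username" index registered above):
     * </em></p>
     * {@snippet :
     *     Collection<User> matches = userDb.getIndexedData("username", "alice");
     * }
     */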
    @Override
    public Collection<T> getIndexedData(String indexName, String key) {
        // load data if needed
        if (!hasLoadedData) loadData();
        return super.getIndexedData(indexName, key);
    }

    /**
     * This command calls {@link DatabaseAppender#flush()}, which will
     * force any in-memory-buffered data to be written to disk.  This is
     * not commonly necessary to call for business purposes, but tests
     * may require it if you want to be absolutely sure the data is written
     * to disk at a particular moment.
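     * <p><em>
     *     Example, as it might appear in a test (a sketch):
     * </em></p>
     * {@snippet :
     *     userDb.write(newUser);
     *     userDb.flush(); // guarantee the append has reached disk before asserting
     * }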
     */
    public void flush() {
        this.databaseAppender.flush();
    }

    /**
     * This is here to match the contract of {@link Db},
     * but all it does is tell the interior file writer
     * to write its buffered data to disk.
     */
    @Override
    public void stop() {
        flush();
    }

    /**
     * No different in effect from {@link #stop()}; provided here
     * to match the contract of {@link Db}.  The parameters are ignored.
     */
    @Override
    public void stop(int count, int sleepTime) {
        flush();
    }
}