
Build a Java off-heap in-memory DB

Tú @ Keva wrote

“Helping you store and retrieve terabytes of data at high speed in memory - assuming your server has enough RAM, of course.”

With a very small Java heap size (around 16MB, for example), we can still build an in-memory database that can hold up to terabytes of data.
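
For context, such a setup would typically be launched with a deliberately small heap while allowing plenty of direct (off-heap) memory, roughly like this (the flags are illustrative and the jar name is a placeholder; note that -XX:MaxDirectMemorySize only caps ByteBuffer.allocateDirect allocations, not memory-mapped files):

java -Xmx16m -XX:MaxDirectMemorySize=24g -jar keva-store.jar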

So, here’s the thing: recently, I tried to build a key-value database (actually just a persistent in-memory hashtable) using Java, simply using ConcurrentHashMap to store data, with a mechanism to sync data to disk… After finishing it, I wondered what would happen if this key-value store contained a large amount of data, like 12-24GB?

With runtimes that use automatic garbage collection, like the JVM (and the Erlang VM, Go, Ruby, Python, and Node.js runtimes), allocating and managing a large amount of data on the heap makes each GC pause much longer (sometimes tens of seconds, even minutes), which can freeze the entire system (virtually all collectors still have stop-the-world phases). For many real-time applications, this is unacceptable.


To avoid long GC pauses, we can allocate memory outside the heap space (off-heap). In Java, there are several ways to do this:

  • Embed C/C++/Rust code (which lets you manage memory manually) and call it via JNI - complex bindings and plenty of headaches when supporting multiple platforms.
  • Use the Java NIO API ByteBuffer.allocateDirect to request memory directly from the OS, bypassing the heap space entirely.

Java NIO ByteBuffer class

The java.nio.ByteBuffer class, built on the abstract class java.nio.Buffer, allocates a region of memory as an array of bytes, stores primitive values in it, and supports random access. It can write and read every primitive type except boolean, converting each value to and from its byte representation. Besides primitives, you can also write and read byte arrays, which can be used for Strings or (serializable) Java objects.

To get the best performance and avoid touching the Java heap, we allocate the ByteBuffer with the allocateDirect method (instead of allocate). With this method, Java requests the memory directly from the OS, and the allocated region is not managed by the Java heap space or the Java GC.

ByteBuffer supports both relative and absolute addressing: you can write values back to back and let the class advance the current position after each call, or you can specify the exact position you want to write/read. Example 1 uses relative addressing:

Example 1

Person person = new Person();
//…
ByteBuffer bb = ByteBuffer.allocateDirect(size);
bb.putInt(person.age);
bb.putFloat(person.weeklySalary);
bb.put(person.lastName.getBytes());
bb.put(person.firstName.getBytes());
bb.put((byte)(person.fullTime ? 1 : 0));

With each write, the position is automatically advanced to the next slot. Note that although you cannot write/read boolean data directly, you can encode it as a byte, as in Example 1. Example 2 shows how to read the data back:

Example 2

Person person = new Person();
person.age = bb.getInt();
person.weeklySalary = bb.getFloat();
//...

To read back a sequence of values, we need to know (and set) the position where it starts; from there we can read sequentially and let ByteBuffer track the position. For example, if you wrote data starting at position 0, you must return to position 0 before reading:

bb.position(0); // Set buffer position back to position 0

Note that to read String values accurately from a ByteBuffer, you also need to know and store the length of each String (more on this later in the record header section).
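
For instance, a minimal length-prefixed approach (a sketch that assumes the default charset and reuses bb and person from Example 1) looks like this:

int start = bb.position();            // remember where the String record starts
byte[] nameBytes = person.lastName.getBytes();
bb.putInt(nameBytes.length);          // 4-byte length prefix
bb.put(nameBytes);                    // the String bytes themselves

bb.position(start);                   // jump back to the String record
int len = bb.getInt();                // read the length prefix
byte[] raw = new byte[len];
bb.get(raw);                          // read exactly that many bytes
String lastName = new String(raw);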

Persisting data with MappedByteBuffer

ByteBuffer lets you read and write raw byte regions in memory, while its subclass MappedByteBuffer lets you “map” a region of memory to a file on disk.

Thanks to this mapping, when you read or write data through a MappedByteBuffer, the data lives simultaneously in (virtual) memory and in the file on disk. Synchronization is handled efficiently by the OS (the mmap and msync syscalls).

MappedByteBuffer exposes the same interface as ByteBuffer, so it is a simple and efficient way to persist data from memory to disk.

To map a file into memory, you first need to call FileChannel.map.

Example 3

FileChannel fc =
    FileChannel.open(FileSystems.getDefault().getPath(path), 
                     StandardOpenOption.WRITE,
                     StandardOpenOption.READ);
MappedByteBuffer mbb = 
    fc.map(FileChannel.MapMode.READ_WRITE, 
           0,          // position
           fc.size()); // size
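
Once mapped, the buffer is used exactly like a regular ByteBuffer; calling force() asks the OS to flush the mapped region to disk. A minimal sketch, continuing from Example 3:

mbb.putInt(42);                  // written to memory, backed by the file
mbb.put("hello".getBytes());
mbb.force();                     // flush the mapped region to disk (msync)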

Implementing a NoHeap key-value store

Since ByteBuffer only gives us a raw array of bytes in memory, to implement a key-value store we have to manage the data layout ourselves (there is no ready-made HashMap we can use on top of a ByteBuffer).

This key-value store has two parts: a Journal Store (Data Store) that holds the data, and an Index Store (FixedHash) that holds the indexes, giving O(1) access similar to a HashMap.
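
Put differently, the store boils down to two off-heap buffers working together; a rough, hypothetical skeleton (not the exact Keva class layout) could look like this:

public class NoHeapStore {
    private MappedByteBuffer buffer;   // Journal Store: records laid out back to back
    private FixedHash index;           // Index Store: key hash -> record offset

    // Each put* method writes a record into the journal,
    // then registers its offset in the index; get* does the reverse.
}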

First, create a persistent store using RandomAccessFile, FileChannel, and MappedByteBuffer:

Example 5

protected final boolean createMessageJournalMBB(String journalPath) {
    try {
        // Create directory and file
        File filePath = new File(journalFolder);
        filePath.mkdir();
        File file = new File(journalPath);
        fileExists = file.exists();

        journal = new RandomAccessFile(journalPath, "rw");
        if (fileExists) {
            // File already exists, continue using the old file
            bufferSize = (int)journal.length();
        } else {
            // New file, set size
            journal.setLength(bufferSize);
        }

        channel = journal.getChannel();
        buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, bufferSize);

        if (!fileExists) {
            // New Journal file, create header file
            writeJournalHeader(journal);
            currentEnd = journal.getFilePointer();
        } else {
            // Iterate through the current positions to find the end point
            currentEnd = scanJournal();
        }

        return true;
    }
    catch (Exception e) {
        logger.log(Level.SEVERE, "createMessageJournalMBB Exception: ", e);
    }

    return false;
}

Next, we need an Index Store for O(1) access. The implementation is similar to Java’s HashMap: from the String key we call .hashCode(), and from the hash code we can compute the bucket that holds the record’s position in the Journal Store.

The Index Store will be implemented as a fixed-size hash table. However, this hashtable can be extended (and re-hashed) when the load factor passes a threshold (> 0.75). Initially, we allocate the Index Store with 1/4 the size of the Journal Store.
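
As a sketch, computing the bucket offset from the key’s hash code could look roughly like this (bucketsInMap and INDEX_ENTRY_SIZE_BYTES are assumed fields of the Index Store; the exact arithmetic in Keva may differ):

private int getHashBucket(int hash) {
    hash = hash & Integer.MAX_VALUE;        // drop the sign bit
    int bucket = hash % bucketsInMap;       // pick a bucket
    return bucket * INDEX_ENTRY_SIZE_BYTES; // convert the bucket number to a byte offset
}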

Record storage structure of the Journal Store. Records in the Journal Store are stored contiguously in the byte buffer. Along with the data, each record stores its size (in bytes), its type, and an isActive flag marking whether the record has been deleted, so records can be traversed sequentially. To delete a record, we simply mark it with isActive = false. A new record can then be appended at the end of the byte buffer, or it can overwrite an inactive record whose size is greater than or equal to the new record’s (see the image below):


(The image shows a new 12-byte record being inserted into the slot of an old, deleted record.)
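
Concretely, the record header used in the snippets below can be pictured as three fields (the 6-byte total is an assumption derived from those snippets):

// Record header layout in the Journal Store:
//   1 byte  : active flag  (ACTIVE_RECORD / INACTIVE_RECORD)
//   1 byte  : record type  (INT_RECORD_TYPE, TEXT_RECORD_TYPE, ...)
//   4 bytes : data length in bytes
// followed by the record data itself
static final int HEADER_SIZE = 1 + 1 + Integer.BYTES; // 6 bytes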

Storing a record. Our NoHeapDB supports 8 put methods, one for each type of data to be inserted:

  • putInteger
  • putShort
  • putLong
  • putFloat
  • putDouble
  • putString
  • putObject
  • putChar

Each method takes a String key and a value of the corresponding type. First, we determine the size of the record based on the type: types like Integer or Short have a fixed size, while for Object and String we determine the size from the length of the value’s bytes. Next, we determine the position in the byte buffer where the new record can be inserted. The getStorageLocation function (Example 6 below) takes on this task: find a free (inactive) slot that fits the new record; if none is found, append it at the end of the byte buffer.
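
As a rough sketch (a hypothetical fragment, not the exact Keva code), the size computation per type might look like the following; Example 6 then shows getStorageLocation itself:

int recordLength;
switch (type) {
    case INT_RECORD_TYPE:    recordLength = Integer.BYTES;   break;
    case SHORT_RECORD_TYPE:  recordLength = Short.BYTES;     break;
    case LONG_RECORD_TYPE:   recordLength = Long.BYTES;      break;
    case FLOAT_RECORD_TYPE:  recordLength = Float.BYTES;     break;
    case DOUBLE_RECORD_TYPE: recordLength = Double.BYTES;    break;
    case CHAR_RECORD_TYPE:   recordLength = Character.BYTES; break;
    case TEXT_RECORD_TYPE:   recordLength = ((String) val).getBytes().length; break;
    default:                 recordLength = ((byte[]) val).length; break; // objects serialized to bytes
}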

Example 6

// When deleting a record, we will store <size of that record> and <list of positions of that record> in emptyIdx
// Find in emptyIdx if there is a position that matches the size of the new record
LinkedList<Long> records = emptyIdx.get(recordLength);
if (records != null && !records.isEmpty()) {
    location.offset = records.remove();

    // Return offset
    location.newEmptyRecordSize = -1;
    return location;
}

// Sizes whose free lists turn out to be empty; collect them to remove from emptyIdx
ArrayList<Integer> toRemove = new ArrayList<>();

// If no exact size match is found in emptyIdx, scan for a slot that is large enough
for (Integer size : this.emptyIdx.keySet()) {
    // Is there enough room for the new record plus a trailing empty record
    // (with a header and at least 1 byte of data)?
    if (size >= recordLength + Header.HEADER_SIZE + 1) {
        records = emptyIdx.get(size);
        if (records == null || records.size() == 0) { 
            toRemove.add(size);
            continue;
        }

        location.offset = records.remove();

        // Need to append an empty record after the newly inserted record
        location.newEmptyRecordSize =
                size - recordLength - Header.HEADER_SIZE;

        int newOffset = (int) 
                location.offset + recordLength + Header.HEADER_SIZE;

        // Save the position of this new empty record
        storeEmptyRecord(newOffset, location.newEmptyRecordSize);
        break;
    }
}

The positions of deleted records are kept in emptyIdx, indexed by record size in ascending order; each entry is a LinkedList holding the offsets of deleted records of that size.
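
One possible shape for this structure (a sketch assuming a TreeMap keyed by size; the actual Keva types may differ):

// size of a freed record -> offsets of all freed records of that size, keys kept sorted
private final TreeMap<Integer, LinkedList<Long>> emptyIdx = new TreeMap<>();

private void storeEmptyRecord(long offset, int size) {
    emptyIdx.computeIfAbsent(size, s -> new LinkedList<>()).add(offset);
}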

This reclamation of deleted records for reuse is similar to what a GC does (compaction), so its processing time depends on the number of inactive records and is not entirely predictable. However, its cost is much lower than running a full GC, and in theory the developer can still keep it under control (for example, during peak hours, limit modifications to the store, or skip getStorageLocation entirely and just append to the end of the byte buffer).

After finding the position, the record is written as described earlier (first the header with the isActive flag, type, and size, then the data):

Example 7

buffer.put(ACTIVE_RECORD); // 1 byte
buffer.put(type);          // 1 byte
buffer.putInt(datalen);    // 4 bytes

switch (type) {
    case LONG_RECORD_TYPE:
        buffer.putLong( (Long)val );
        break;
    case INT_RECORD_TYPE:
        buffer.putInt( (Integer)val );
        break;
    case DOUBLE_RECORD_TYPE:
        buffer.putDouble( (Double)val );
        break;
    case FLOAT_RECORD_TYPE:
        buffer.putFloat( (Float)val );
        break;
    case SHORT_RECORD_TYPE:
        buffer.putShort( (Short)val );
        break;
    case CHAR_RECORD_TYPE:
        buffer.putChar( (char)val );
        break;
    case TEXT_RECORD_TYPE:
        buffer.put( ((String)val).getBytes() );
        break;
    case BYTEARRAY_RECORD_TYPE:
        buffer.put( (byte[])val );
        break;
}

Saving the record to the store is done. The next and final step is to index it for O(1) access: we insert the key’s hashCode and the record’s position in the DataStore into the IndexStore.

Example 8

offset = getHashBucket(key.hashCode() );
indexBuffer.position(offset);
indexBuffer.mark();
byte occupied = indexBuffer.get();
if ( occupied == 0 ) {
    // Found a suitable slot, reset position
    indexBuffer.reset();
}
else {
    // Collision occurred :(
    
    collisions++;

    offset = findBucket(key, offset, false);

    // Move to another position
    indexBuffer.position(offset);
}

// Write data
//
indexBuffer.put((byte)key.length() );
indexBuffer.putInt(key.hashCode()); 
if ( KEY_SIZE > 0 ) {
    byte[] fixedKeyBytes = new byte[KEY_SIZE];
    System.arraycopy(key.getBytes(), 
                    0, fixedKeyBytes, 
                    0, key.length());
    indexBuffer.put( fixedKeyBytes );
}
indexBuffer.putLong( value ); // indexed record location

Handling collisions (the findBucket function):

Example 9

while ( occupied > 0 && ! found) {
    int keyHash = indexBuffer.getInt();
    if ( keyHash == key.hashCode() ) {
        if ( KEY_SIZE > 0 ) {
            indexBuffer.position(
                    offset + 1 + Integer.BYTES + KEY_SIZE );
        }
        found = true;
        break;
    }
    else {
        offset += INDEX_ENTRY_SIZE_BYTES;
        if ( offset >= (sizeInBytes - INDEX_ENTRY_SIZE_BYTES)) {
            offset = INDEX_ENTRY_SIZE_BYTES;
        }

        indexBuffer.position(offset);
        occupied = indexBuffer.get();
    }
}

Unlike the DataStore, the IndexStore only stores positions of records in the DataStore, so we don’t need a header for each entry.
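
Based on the writes in Example 8, each index entry is a fixed-size slot with roughly the following layout (the sizes are inferred from those writes):

// Index entry layout in the Index Store (one fixed-size slot per bucket):
//   1 byte   : key length (0 means the slot is empty)
//   4 bytes  : the key's hashCode
//   KEY_SIZE : the key bytes, padded to a fixed length
//   8 bytes  : offset of the record in the Journal Store
final int INDEX_ENTRY_SIZE_BYTES = 1 + Integer.BYTES + KEY_SIZE + Long.BYTES;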

Getting data from the store. So far we have covered saving and indexing data. Now it’s time to retrieve data from the store; mirroring the 8 put methods above, there are also 8 get methods:

  • getInteger
  • getShort
  • getLong
  • getFloat
  • getDouble
  • getString
  • getObject
  • getChar

Example 10: Retrieve data from DataStore

// Get the position from IndexStore
Long offset = index.get(key);
//...

// Jump to that position
buffer.position(offset.intValue());

// First read the header
byte active = buffer.get();
if (active != 1) {
    return null;
}

byte type = buffer.get();

int dataLength = buffer.getInt();

// Then read the data
byte[] bytes;
switch ( type ) {
    case LONG_RECORD_TYPE:
        val = buffer.getLong();
        break;
    case INT_RECORD_TYPE:
        val = buffer.getInt();
        break;
    case DOUBLE_RECORD_TYPE:
        val = buffer.getDouble();
        break;
    case FLOAT_RECORD_TYPE:
        val = buffer.getFloat();
        break;
    case SHORT_RECORD_TYPE:
        val = buffer.getShort();
        break;
    case CHAR_RECORD_TYPE:
        val = buffer.getChar();
        break;
    case BYTEARRAY_RECORD_TYPE:
        bytes = new byte[dataLength];
        buffer.get(bytes);
        val = bytes;
        break;
    case TEXT_RECORD_TYPE:
        bytes = new byte[dataLength];
        buffer.get(bytes);
        val = new String(bytes);
        break;
}

Example 11: Retrieve data from IndexStore

public Long get(String key) {
    int bucketOffset = getHashBucket( key.hashCode() );
    indexBuffer.position(bucketOffset);
    byte occupied = indexBuffer.get();
    if ( occupied > 0 ) {
        bucketOffset = findBucket(key, bucketOffset, true);
    }

    if ( bucketOffset == -1 ) {
        // Not found
        return -1L;
    }

    // Return the position of the record in DataStore
    return indexBuffer.getLong();
}

Deleting a record from the store. Deleting a record is quite simple: first, as with a read, find the record’s position in the DataStore via its entry in the IndexStore.

Example 12:

// Find record
offset = getRecordOffset(key);
if (offset == -1) {
    return false;
}

// Read the header to get the record size, then mark it as inactive
buffer.position(offset.intValue()); 
buffer.put(INACTIVE_RECORD);
buffer.put(EMPTY_RECORD_TYPE);
datalength = buffer.getInt();

// Save the position of this deleted record for reuse later
storeEmptyRecord(offset, datalength);

// Delete that record from IndexStore
index.remove( key );

Iterating through records in the store. We have covered accessing data by key. The store can also be traversed record by record: since the DataStore lays records out contiguously, they can be walked with a simple loop over the byte buffer:


Example 13: Traversal in NoHeap store

while ( !found && current < (bufferSize - Header.HEADER_SIZE)) {
    boolean active = true;
    if (buffer.get() == INACTIVE_RECORD) {
        active = false;
    }

    type = buffer.get();
    if (type == EMPTY_RECORD_TYPE) {
        buffer.position((int)currentEnd);
        break;
    }

    int datalen = buffer.getInt();
    recordSize = Header.HEADER_SIZE + datalen;

    if ( active) {
        found = true;

        iterateNext = current + recordSize;
    }
    else {
        current += recordSize;
        buffer.position( (int)current );
    }
}

if ( found ) {
    return getValue(current, type);
}

When initializing the store, we must choose its size in advance (for example, 64MB or 2GB), just as the JVM reserves memory up front. When the store fills up, we have to expand it:

Example 14: Expand DataStore when running out of space

ByteBuffer newBuffer = ByteBuffer.allocateDirect((int)newLength);
if ( buffer.hasArray() ) {
    byte[] array = buffer.array();
    newBuffer.put( array );
}
else {
    buffer.position(0);
    newBuffer.put(buffer);
}
buffer = newBuffer;
journalLen = buffer.capacity();

The Index Store will also have to expand and rehash when the load factor reaches 75%.
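
A rehash can be sketched as allocating a larger index buffer and re-inserting every occupied entry; the outline below is simplified and assumes the insert logic from Example 8 is factored into a hypothetical putEntry helper:

ByteBuffer oldIndex = indexBuffer;
indexBuffer = ByteBuffer.allocateDirect(oldIndex.capacity() * 2); // double the index size

oldIndex.position(0);
while (oldIndex.remaining() >= INDEX_ENTRY_SIZE_BYTES) {
    byte keyLen = oldIndex.get();              // 0 means the slot was never occupied
    int hash = oldIndex.getInt();
    byte[] keyBytes = new byte[KEY_SIZE];
    oldIndex.get(keyBytes);                    // read the fixed-size key field
    long location = oldIndex.getLong();        // record offset in the Journal Store
    if (keyLen > 0) {
        // re-insert into the larger buffer using the same bucket/collision logic
        putEntry(new String(keyBytes, 0, keyLen), location);
    }
}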

Achievement: Keva Store


Check out Keva here github.com/tuhuynh27/keva (thanks @axblueblader for helping me build Keva)

Example of how to use it (I use it to make a LINE Bot in this repo):

val storeManager = new NoHeapStoreManager
storeManager.createStore("Dict", NoHeapStore.Storage.PERSISTED, 64)
def dictStore: NoHeapStore = storeManager.getStore("Dict")

// Get
def get(key: String): String = dictStore.getString(key)

// Put
def put(key: String, value: String): Unit = {
  dictStore.putString(key, value)
}

You can also download the server and CLI client binaries to use it right away.

References