tu-huynh
's Blog
Blog

Build a Java off-heap in-memory DB

Tú @ Keva wrote

“Giúp bạn có thể lưu và truy xuất hàng terabytes dữ liệu tốc độ cao trên bộ nhớ - tất nhiên là nếu server của bạn có đủ chừng đó RAM nhé. emoji-smirk

Với một Java heap size rất nhỏ (khoảng 16MB chẳng hạn), ta vẫn có thể xây dựng một in-memory database có thể chứa tới hàng terabytes dữ liệu.

Chuyện là thế này, gần đây mình có thử build 1 bản key-value database (thực ra chỉ là 1 cái persistence in-memory hashtable) bằng Java, chỉ đơn giản xài ConcurrentHashMap để lưu data, có cơ chế sync data xuống disk… Lúc làm xong, mình thử tự hỏi ví dụ cái key-value store này chứa một lượng lớn data cỡ như 12-24GB thì sẽ như nào?

Với các runtime có bộ dọn rác tự động (Garbage Collector) như JVM (và ErlangVM/Go/Ruby/Python/Node.js runtime), việc cấp phát và quản lí một lượng lớn dữ liệu trên heap space sẽ khiến mỗi lần GC pause trở nên lâu hơn rất nhiều (có khi lên tới vài chục giây, vài phút…), nên có thể làm đơ cả hệ thống (tất cả GC hiện nay đều là stop-the-world), với nhiều ứng dụng chạy real-time, điều này sẽ không chấp nhận được.

meme

Để tránh GC-long-pauses, ta có thể tiếp cận các giải pháp cấp phát bộ nhớ không thông qua heap space (off-heap), trong Java thì có nhiều cách để thực hiện:

  • Nhúng code C/C++/Rust (vì có thể quản lí bộ nhớ bằng tay) bằng Java JNI (binding phức tạp, gặp nhiều vấn đề khi hỗ trợ đa nền tảng)
  • Sử dụng Java NIO API là ByteBuffer.allocateDirect để gọi cấp phát bộ nhớ thằng từ OS syscall, không qua heap space.

Java NIO ByteBuffer class

Class java.nio.ByteBuffer dựa trên abstract class java.nio.Buffer, được cung cấp để cấp phát lưu trữ các dữ liệu có primitive type trên bộ nhớ theo một array bytes và random access. Ta có thể dùng API này để write và read tất cả data kiểu primitive, ngoại trừ boolean, nó sẽ tự động chuyển đổi các giá trị được lưu và truy xuất thành một chuỗi bytes. Ngoài các chuỗi data kiểu primitive, bạn cũng có thể write và read các bytes arrays, sẽ có thể dùng để read/write Strings hoặc Java Objects (mà có hỗ trợ serialization).

Để tận dụng hiệu năng cao nhất và không ảnh hưởng tới Java heap, ta sẽ cấp phát bộ nhớ cho ByteBuffer sử dụng method là allocateDirect (thay vì allocate), khi xài method này thì Java sẽ gọi thẳng tới lệnh cấp phát bộ nhớ từ OS, và bộ nhớ được cấp phát sẽ không nằm trong sự quản lí của Java Heap Space cũng như Java GC.

ByteBuffer cho phép relative và absolute referencing, nghĩa là bạn có thể write các tập data liên tục, và class sẽ tự động cập nhật vị trí con trỏ (active position) sau mỗi lần gọi. Hoặc bạn có thể chỉ định vị trí (thứ tự vùng nhớ) mà bạn muốn write/read. Ví dụ 1, sử dụng relative referencing:

Ví dụ 1

Person person = new Person();
//…
ByteBuffer bb = ByteBuffer.allocateDirect(size);
bb.putInt(person.age);
bb.putFloat(person.weeklySalary);
bb.put(person.lastName.getBytes());
bb.put(person.firstName.getBytes());
bb.put((byte)(person.fullTime == true ? 1 : 0 ));

Với mỗi lần write, con trỏ tự động được cập nhật lại ở vị trí tiếp theo. Lưu ý rằng mặc dù không write/read được các boolean data, bạn có thể encode nó với byte, như trên ví dụ 1. Trong ví dụ 2 là cách lấy dữ liệu ra:

Ví dụ 2

Person person = new Person();
Person.age = bb.getInt();
person.weeklySalary = bb.getFloat();
//...

Để read một danh sách các dữ liệu, ta cần phải biết (và set) vị trí con trỏ nơi bắt đầu. Kể từ đó, bạn có thể đọc tuần tự, dựa trên ByteBuffer để kiểm soát vị trí khi read. Ví dụ, ở trên bạn đã write dữ liệu vào ByteBuffer từ vị trí 0, thì khi muốn đọc, ta phải quay về vị trí 0 bằng cách:

bb.position(0); // Set buffer position lại vị trí 0

Lưu ý rằng, để read chính xác các String value từ ByteBuffer ra, bạn cũng cần phải biết và lưu thêm độ dài của String đó (sẽ đề cập sau trong phần record header).

Persistence data với MappedByteBuffer

ByteBuffer giúp bạn làm việc với các vùng bytes trên bộ nhớ để read/write data. Trong khi subclass của nó MappedByteBuffer, cho phép bạn “mapping” một vùng dữ liệu trên bộ nhớ lên file (disk).

Việc mapping này giúp khi bạn read/write data vào MappedByteBuffer, dữ liệu sẽ đồng thời ở trên bộ nhớ (virtual memory) và trên file (disk), việc sync dữ liệu sẽ được handle bởi OS (mmapmsync syscall) một cách hiệu quả.

MappedByteBuffer có interface giống với ByteBuffer nên việc thao khác cũng khá dễ dàng, khiến cho việc sử dụng nó để persistence dữ liệu từ memory lên disk trở nên rất đơn giản và hiệu quả.

Để mapping một file vào bộ nhớ, trước tiên bạn cần gọi FileChannel.map.

Ví dụ 3

FileChannel fc =
    FileChannel.open(FileSystems.getDefault().getPath(path), 
                     StandardOpenOption.WRITE,
                     StandardOpenOption.READ);
MappedByteBuffer mbb = 
    fc.map(FileChannel.MapMode.READ_WRITE, 
           0,          // vị trí
           fc.size()); // kích cỡ

Implement NoHeap key-value store

ByteBuffer chỉ cung cấp API cho việc cấp phát dữ liệu dưới dạng arrays bytes trên bộ nhớ, để triển khai một cái key-value store thì ta phải tự kiểm soát/quản lí dữ liệu (không có cái class HashMap mì ăn liền nào xài được khi ta allocate ở ByteBuffer đâu).

Key-Value Store này sẽ có 2 phần, 1 phần là Journal Store (Data Store) để lưu dữ liệu, và 1 phần là Index Store (FixedHash) để lưu các index, phục vụ cho việc truy xuất O(1) tương tự như HashMap.

Đầu tiên, tạo 1 cái persistence store bằng RandomAccessFile, FileChannel và MappedByteBuffer

Ví dụ 5

protected final boolean createMessageJournalMBB(String journalPath) {
    try {
        // Tạo thư mục và file
        File filePath = new File(journalFolder);
        filePath.mkdir();
        File file = new File(journalPath);
        fileExists = file.exists();

        journal = new RandomAccessFile(journalPath, "rw");
        if (fileExists0  {
            // File đã tồn tài, tiếp tục xài file cũ
            bufferSize = (int)journal.length();
        } else {
            // File mới, set size
            journal.setLength(bufferSize);
        }

        channel = journal.getChannel();
        buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, bufferSize);

        if (!fileExists) {
            // File Journal mới, tạo header file
            writeJournalHeader(journal);
            currentEnd = journal.getFilePointer();
        } else {
            // Lặp qua các vị trí hiện tại để tìm điểm kết thúc
            currentEnd = scanJournal();
        }
    }
    catch (Exception e) {
        logger.log(Level.SEVERE, "createMessageJournalMBB Exception: ", e);
    }

    return false;
}

Tiếp theo, ta cần 1 cái Index Store để phục vụ truy xuất O(1), cách triển khai tương tự như class HashMap của Java, từ String key ta chạy hàm .hashCode() để lấy hashCode, sau đó có thể tìm ra được vị trí data bucket cần truy xuất ở bên Journal Store.

Index Store sẽ được implement dạng fixed-size hash table, tuy nhiên hashtable này sẽ có thể extend (và re-hash) khi load factor chạm ngưỡng (> 0.75), ban đầu ta sẽ allocate memory cho Index Store bằng 1/4 của Journal Store.

Cấu trúc lưu record của Journal Store. Các record trong Journal Store sẽ được lưu một cách liên tiếp trên array bytes buffer. Cùng với data, mỗi record sẽ lưu thêm phần record size (đơn vị bytes), type của record và một flag isActive để đánh dấu là record này đã bị xóa hay chưa. Như vậy, các record có thể được traversal một cách liên tiếp. Khi muốn xóa một record, đơn giản ta chỉ đánh dấu lên record đó là isActive = false. Các record mới được insert sau đó có thể được insert vào cuối array bytes buffer, hoặc insert đè lên record có isActive = false và có size lớn hơn hoặc bằng record mới (xem hình dưới cho dễ hiểu nha):

record

(Trong hình là insert một record mới có cỡ là 12-byte vào vị trí của một record cũ đã bị xóa)

Lưu trữ record. NoHeapDB của chúng ta sẽ hỗ trợ 8 methods để insert dữ liệu theo các type của dữ liệu cần được insert:

  • putInteger
  • putShort
  • putLong
  • putFloat
  • putDouble
  • putString
  • putObject
  • putChar

Mỗi method sẽ nhận vào argument là String key và Value với type tương ứng như trên. Đầu tiên, ta cần phải xác định size của record dựa trên type: các kiểu dữ liệu như Integer hoặc Short có fixed size, còn đối với Object và String ta phải xác định size bằng cách stringValue.length. Tiếp theo là cần xác định vị trí trong array bytes buffer mà ta có thể insert record mới vào. Hàm getStorageLocation bên dưới sẽ nhận nhiệm vụ này: tìm 1 free (inactive) slot để insert record mới, nếu không tìm thấy thì insert vào vị trí cuối cùng trong array bytes buffer.

Ví dụ 6

// Khi xóa record, ta sẽ lưu <size của record đó> và <danh sách các position của record đó>vào emptyIdx
// Tìm trong emptyIdx xem có sẵn position nào match size với record mới không
LinkedList<Long> records = emptyIdx.get(recordLength);
if (records != null && !records.isEmpty()) {
    location.offset = records.remove();

    // Trả offset
    location.newEmptyRecordSize = -1;
    return location;
}

// Lưu các slot record trống để xóa
ArrayList<Integer> toRemove = new ArrayList<>();

// Không tìm thấy exact match trong emptyIdx thì lướt và tìm 1 slot đủ lớn
for (Integer size : this.emptyIdx.keySet()) {
    // Kiểm tra xem có đủ chỗ cho record mới và thêm một record rỗng
    // với một header và ít nhất 1 byte dữ liệu không?
    if (size >= recordLength + Header.HEADER_SIZE + 1) {
        records = emptyIdx.get(size);
        if (records == null || records.size() == 0) { 
            toRemove.add(size);
            continue;
        }

        location.offset = records.remove();

        // Cần append một record trống sau sau record vừa mới được insert
        location.newEmptyRecordSize =
                size - recordLength - Header.HEADER_SIZE;

        int newOffset = (int) 
                location.offset + recordLength + Header.HEADER_SIZE;

        // Lưu vị trí của record rỗng mới này lại
        storeEmptyRecord(newOffset, location.newEmptyRecordSize);
        break;
    }
}

Các vị trí record đã bị xóa sẽ được lưu vào một cái ArrayList (emptyIdx), được sắp xếp theo thứ tự tăng dần theo size. Chính xác hơn là, mỗi entry trong cái ArrayList này là một LinkedList chứa các position của record đã bị xóa có size là entry đó.

Việc kiểm soát vùng nhớ các record đã bị xóa để tái sử dụng này tương tự với việc GC hoạt động (GC Compaction), do đó thời gian xử lí sẽ phụ thuộc vào số lượng inactive record, và nó unpredictable. Tuy nhiên chi phí để xử lí phần này sẽ thấp hơn nhiều so với việc GC chạy, và theo lí thuyết thì Developer vẫn có thể kiểm soát tốt (ví dụ ở peak time thì hạn chế modify dữ liệu trong store, hoặc lúc đó thì bỏ qua việc getStorageLocation mà chỉ việc insert vào cuối array bytes buffer).

Sau khi tìm được vị trí, việc lưu data của record sẽ được thực hiện như đã đề cập khi nãy (lưu thêm record header chứa size, type và flag isActive):

Ví dụ 7

buffer.put(ACTIVE_RECORD); // 1 byte
buffer.put(type);          // 1 byte
buffer.putInt(datalen);    // 4 bytes

switch (type) {
    case LONG_RECORD_TYPE:
        buffer.putLong( (Long)val );
        break;
    case INT_RECORD_TYPE:
        buffer.putInt( (Integer)val );
        break;
    case DOUBLE_RECORD_TYPE:
        buffer.putDouble( (Double)val );
        break;
    case FLOAT_RECORD_TYPE:
        buffer.putFloat( (Float)val );
        break;
    case SHORT_RECORD_TYPE:
        buffer.putShort( (Short)val );
        break;
    case CHAR_RECORD_TYPE:
        buffer.putChar( (char)val );
        break;
    case TEXT_RECORD_TYPE:
        buffer.put( ((String)val).getBytes() );
        break;
    case BYTEARRAY_RECORD_TYPE:
        buffer.put( (byte[])val );
        break;
}

Việc lưu record vào Store đã xong, tiếp theo bước cuối cùng là index record để có thể truy xuất O(1). Ta sẽ insert hashCode của key trong record mới và vị trí của record đó trong DataStore vào IndexStore.

Ví dụ 8

offset = getHashBucket(key.hashCode() );
indexBuffer.position(offset);
indexBuffer.mark();
byte occupied = indexBuffer.get();
if ( occupied == 0 ) {
    // Đã tìm thấy slot phù hợp, reset position
    indexBuffer.reset();
}
else {
    // Dính collision rồi :(
    
    collisions++;

    offset = findBucket(key, offset, false);

    // Chuyển tới vị trí khác
    indexBuffer.position(offset);
}

// Write data
//
indexBuffer.put((byte)key.length() );
indexBuffer.putInt(key.hashCode()); 
if ( KEY_SIZE > 0 ) {
    byte[] fixedKeyBytes = new byte[KEY_SIZE];
    System.arraycopy(key.getBytes(), 
                    0, fixedKeyBytes, 
                    0, key.length());
    indexBuffer.put( fixedKeyBytes );
}
indexBuffer.putLong( value ); // indexed record location

Xử lí khi bị collision (hàm findBucket):

Ví dụ 9

while ( occupied > 0 && ! found) {
    int keyHash = indexBuffer.getInt();
    if ( keyHash == key.hashCode() ) {
        if ( KEY_SIZE > 0 ) {
            indexBuffer.position(
                    offset + 1 + Integer.BYTES + KEY_SIZE );
        }
        found = true;
        break;
    }
    else {
        offset += INDEX_ENTRY_SIZE_BYTES;
        if ( offset >= (sizeInBytes - INDEX_ENTRY_SIZE_BYTES)) {
            offset = INDEX_ENTRY_SIZE_BYTES;
        }

        indexBuffer.position(offset);
        occupied = indexBuffer.get();
    }
}

Khác so với DataStore, IndexStore chỉ lưu trữ các position của DataStore, nên ta không cần header cho mỗi record.

Get data từ store. Nãy giờ ta đã đi qua các phần lưu dữ liệu và index dữ liệu trên store, giờ thì tới phần lấy dữ liệu ra (truy xuất) từ store, tương tự như 8 hàm put ở trên, để lấy ra cũng có 8 hàm get:

  • getInteger
  • getShort
  • getLong
  • getFloat
  • getDouble
  • getString
  • getObject
  • getChar

Ví dụ 10: lấy data ra từ DataStore

// Lấy position ra từ IndexStore
Long offset = index.get(key);
//...

// Nhảy tới vị trí đó
buffer.position(offset.intValue());

// Đầu tiên đọc header
byte active = buffer.get();
if (active != 1) {
    return null;
}

byte type = buffer.get();

int dataLength = buffer.getInt();

// Sau đó đọc data
byte[] bytes;
switch ( type ) {
    case LONG_RECORD_TYPE:
        val = buffer.getLong();
        break;
    case INT_RECORD_TYPE:
        val = buffer.getInt();
        break;
    case DOUBLE_RECORD_TYPE:
        val = buffer.getDouble();
        break;
    case FLOAT_RECORD_TYPE:
        val = buffer.getFloat();
        break;
    case SHORT_RECORD_TYPE:
        val = buffer.getShort();
        break;
    case CHAR_RECORD_TYPE:
        val = buffer.getChar();
        break;
    case BYTEARRAY_RECORD_TYPE:
        bytes = new byte[dataLength];
        buffer.get(bytes);
        val = bytes;
        break;
    case TEXT_RECORD_TYPE:
        bytes = new byte[dataLength];
        buffer.get(bytes);
        val = new String(bytes);
        break;
}

Ví dụ 11: Lấy data ra từ IndexStore

public Long get(String key) {
    int bucketOffset = getHashBucket( key.hashCode() );
    indexBuffer.position(bucketOffset);
    byte occupied = indexBuffer.get();
    if ( occupied > 0 ) {
        bucketOffset = findBucket(key, offset, true);
    }

    if ( bucketOffset == -1 ) {
        // Tìm không ra
        return -1L;
    }

    // Trả về position của record trong DataStore
    return indexBuffer.getLong();
}

Xóa record từ store. Xóa record thì khá là đơn giản, đầu tiên tương tự như truy xuất là tìm ra position của record đó trong DataStore bằng cách tìm index của nó trong IndexStore.

Ví dụ 12:

// Tìm record
offset = getRecordOffset(key);
if (offset == -1) {
    return false;
}

// Đọc header để lấy ra record size, sau đó đánh dấu nó là inactive
buffer.position(offset.intValue()); 
buffer.put(INACTIVE_RECORD);
buffer.put(EMPTY_RECORD_TYPE);
datalength = buffer.getInt();

// Lưu vị trí record bị xóa này để tái sử dụng sau
storeEmptyRecord(offset, datalength);

// Xóa record đó ra khỏi IndexStore
index.remove( key );

Iterate qua các record trong store. Chúng ta đã đi qua phần truy xuất dữ liệu theo key, bên cạnh đó, store cũng có thể được traversal qua tất cả record. Dựa theo cấu trúc của DataStore, dữ liệu nằm liên tiếp nhau và có thể dễ dàng truy xuất qua 1 vòng lặp ở cái arrays bytes buffer:

struct

Ví dụ 13: Traversal trong NoHeap store

while ( !found && current < (bufferSize - Header.HEADER_SIZE)) {
    boolean active = true;
    if (buffer.get() == INACTIVE_RECORD) {
        active = false;
    }

    type = buffer.get();
    if (type == EMPTY_RECORD_TYPE) {
        buffer.position((int)currentEnd);
        break;
    }

    int datalen = buffer.getInt();
    recordSize = Header.HEADER_SIZE + datalen;

    if ( active) {
        found = true;

        iterateNext = current + recordSize;
    }
    else {
        current += recordSize;
        buffer.position( (int)current );
    }
}

if ( found ) {
    return getValue(current, type);
}

Khi khởi tạo store, ta phải xác định trước kích thước của store (ví dụ 64mb hay 2GB) - giống như việc JVM nó allocate trước bộ nhớ, khi store xài hết chừng đó bộ nhớ thì ta phải expand ra thêm:

Ví dụ 14: Expand DataStore khi sắp xài hết

ByteBuffer newBuffer = ByteBuffer.allocateDirect((int)newLength);
if ( buffer.hasArray() ) {
    byte[] array = buffer.array();
    newBuffer.put( array );
}
else {
    buffer.position(0);
    newBuffer.put(buffer);
}
buffer = newBuffer;
journalLen = buffer.capacity();

Index Store cũng sẽ phải expand và rehashing khi load factor đạt 75%.

Thành quả: Keva Store

jiny

Xem qua Keva ở đây github.com/tuhuynh27/keva (thanks @axblueblader đã giúp mình build Keva)

Example cách dùng (mình xài để làm LINE Bot ở repo này):

val storeManager = new NoHeapStoreManager
storeManager.createStore("Dict", NoHeapStore.Storage.PERSISTED, 64)
def dictStore: NoHeapStore = storeManager.getStore("Dict")

// Get
def get(key: String): String = store.getString(key)

// Put
def put(key: String, value: String): Unit = {
  store.putString(key, value)
}

Ngoài ra còn có thể tải binary files server và CLI-client về xài ngay.

References