Off-heap Memory in Apache Flink and the curious JIT compiler

16 Sep 2015 by Stephan Ewen (@stephanewen)

Running data-intensive code in the JVM and making it well-behaved is tricky. Systems that put billions of data objects naively onto the JVM heap face unpredictable OutOfMemoryErrors and Garbage Collection stalls. Of course, you still want to to keep your data in memory as much as possible, for speed and responsiveness of the processing applications. In that context, “off-heap” has become almost something like a magic word to solve these problems.

In this blog post, we will look at how Flink exploits off-heap memory. The feature is part of the upcoming release, but you can try it out with the latest nightly builds. We will also give a few interesting insights into the behavior for Java’s JIT compiler for highly optimized methods and loops.

To understand Flink’s approach to off-heap memory, we need to recap Flink’s approach to custom managed memory. We have written an earlier blog post about how Flink manages JVM memory itself

As a summary, the core part is that Flink implements its algorithms not against Java objects, arrays, or lists, but actually against a data structure similar to java.nio.ByteBuffer. Flink uses its own specialized version, called MemorySegment on which algorithms put and get at specific positions ints, longs, byte arrays, etc, and compare and copy memory. The memory segments are held and distributed by a central component (called MemoryManager) from which algorithms request segments according to their calculated memory budgets.

Don’t believe that this can be fast? Have a look at the benchmarks in the earlier blogpost, which show that it is actually often much faster than working on objects, due to better control over data layout (cache efficiency, data size), and reducing the pressure on Java’s Garbage Collector.

This form of memory management has been in Flink for a long time. Anecdotally, the first public demo of Flink’s predecessor project Stratosphere, at the VLDB conference in 2010, was running its programs with custom managed memory (although I believe few attendees were aware of that).

Why actually bother with off-heap memory?

Given that Flink has a sophisticated level of managing on-heap memory, why do we even bother with off-heap memory? It is true that “out of memory” has been much less of a problem for Flink because of its heap memory management techniques. Nonetheless, there are a few good reasons to offer the possibility to move Flink’s managed memory out of the JVM heap:

  • Very large JVMs (100s of GBytes heap memory) tend to be tricky. It takes long to start them (allocate and initialize heap) and garbage collection stalls can be huge (minutes). While newer incremental garbage collectors (like G1) mitigate this problem to some extend, an even better solution is to just make the heap much smaller and allocate Flink’s managed memory chunks outside the heap.

  • I/O and network efficiency: In many cases, we write MemorySegments to disk (spilling) or to the network (data transfer). Off-heap memory can be written/transferred with zero copies, while heap memory always incurs an additional memory copy.

  • Off-heap memory can actually be owned by other processes. That way, cached data survives process crashes (due to user code exceptions) and can be used for recovery. Flink does not exploit that, yet, but it is interesting future work.

The opposite question is also valid. Why should Flink ever not use off-heap memory?

  • On-heap is easier and interplays better with tools. Some container environments and monitoring tools get confused when the monitored heap size does not remotely reflect the amount of memory used by the process.

  • Short lived memory segments are cheaper on the heap. Flink sometimes needs to allocate some short lived buffers, which works cheaper on the heap than off-heap.

  • Some operations are actually a bit faster on heap memory (or the JIT compiler understands them better).

The off-heap Memory Implementation

Given that all memory intensive internal algorithms are already implemented against the MemorySegment, our implementation to switch to off-heap memory is actually trivial. You can compare it to replacing all ByteBuffer.allocate(numBytes) calls with ByteBuffer.allocateDirect(numBytes). In Flink’s case it meant that we made the MemorySegment abstract and added the HeapMemorySegment and OffHeapMemorySegment subclasses. The OffHeapMemorySegment takes the off-heap memory pointer from a java.nio.DirectByteBuffer and implements its specialized access methods using sun.misc.Unsafe. We also made a few adjustments to the startup scripts and the deployment code to make sure that the JVM is permitted enough off-heap memory (direct memory, -XX:MaxDirectMemorySize).

In practice we had to go one step further, to make the implementation perform well. While the ByteBuffer is used in I/O code paths to compose headers and move bulk memory into place, the MemorySegment is part of the innermost loops of many algorithms (sorting, hash tables, …). That means that the access methods have to be as fast as possible.

Understanding the JIT and tuning the implementation

The MemorySegment was (before our change) a standalone class, it was final (had no subclasses). Via Class Hierarchy Analysis (CHA), the JIT compiler was able to determine that all of the accessor method calls go to one specific implementation. That way, all method calls can be perfectly de-virtualized and inlined, which is essential to performance, and the basis for all further optimizations (like vectorization of the calling loop).

With two different memory segments loaded at the same time, the JIT compiler cannot perform the same level of optimization any more, which results in a noticeable difference in performance: A slowdown of about 2.7 x in the following example:

Writing 100000 x 32768 bytes to 32768 bytes segment:

HeapMemorySegment    (standalone) : 1,441 msecs
OffHeapMemorySegment (standalone) : 1,628 msecs
HeapMemorySegment    (subclass)   : 3,841 msecs
OffHeapMemorySegment (subclass)   : 3,847 msecs

To get back to the original performance, we explored two approaches:

Approach 1: Make sure that only one memory segment implementation is ever loaded.

We re-structured the code a bit to make sure that all places that produce long-lived and short-lived memory segments instantiate the same MemorySegment subclass (Heap- or Off-Heap segment). Using factories rather than directly instantiating the memory segment classes, this was straightforward.

Experiments (see appendix) showed that the JIT compiler properly detects this (via hierarchy analysis) and that it can perform the same level of aggressive optimization as before, when there was only one MemorySegment class.

Approach 2: Write one segment that handles both heap and off-heap memory

We created a class HybridMemorySegment which handles transparently both heap- and off-heap memory. It can be initialized either with a byte array (heap memory), or with a pointer to a memory region outside the heap (off-heap memory).

Fortunately, there is a nice trick to do this without introducing code branches and specialized handling of the two different memory types. The trick is based on the way that the sun.misc.Unsafe methods interpret object references. To illustrate this, we take the method that gets a long integer from a memory position:

sun.misc.Unsafe.getLong(Object reference, long offset)

The method accepts an object reference, takes its memory address, and add the offset to obtain a pointer. It then fetches the eight bytes at the address pointed to and interprets them as a long integer. Since the method accepts null as the reference (and interprets it a zero) one can write a method that fetches a long integer seamlessly from heap and off-heap memory as follows:

public class HybridMemorySegment {

  private final byte[] heapMemory;  // non-null in heap case, null in off-heap case
  private final long address;       // may be absolute, or relative to byte[]


  // method of interest
  public long getLong(int pos) {
    return UNSAFE.getLong(heapMemory, address + pos);
  }


  // initialize for heap memory
  public HybridMemorySegment(byte[] heapMemory) {
    this.heapMemory = heapMemory;
    this.address = UNSAFE.arrayBaseOffset(byte[].class)
  }
  
  // initialize for off-heap memory
  public HybridMemorySegment(long offheapPointer) {
    this.heapMemory = null;
    this.address = offheapPointer
  }
}

To check whether both cases (heap and off-heap) really result in the same code paths (no hidden branches inside the Unsafe.getLong(Object, long) method) one can check out the C++ source code of sun.misc.Unsafe, available here: http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/tip/src/share/vm/prims/unsafe.cpp

Of particular interest is the macro in line 155, which is the base of all GET methods. Tracing the function calls (many are no-ops), one can see that both variants of Unsafe’s getLong() result in the same code: Either 0 + absolutePointer or objectRefAddress + offset.

Summary

We ended up choosing a combination of both techniques:

  • For off-heap memory, we use the HybridMemorySegment from approach (2) which can represent both heap and off-heap memory. That way, the same class represents the long-lived off-heap memory as the short-lived temporary buffers allocated (or wrapped) on the heap.

  • We follow approach (1) to use factories to make sure that one segment is ever only loaded, which gives peak performance. We can exploit the performance benefits of the HeapMemorySegment on individual byte operations, and we have a mechanism in place to add further implementations of MemorySegments for the case that Oracle really removes sun.misc.Unsafe in future Java versions.

The final code can be found in the Flink repository, under https://github.com/apache/flink/tree/master/flink-core/src/main/java/org/apache/flink/core/memory

Detailed micro benchmarks are in the appendix. A summary of the findings is as follows:

  • The HybridMemorySegment performs equally well in heap and off-heap memory, as is to be expected (the code paths are the same)

  • The HeapMemorySegment is quite a bit faster in reading individual bytes, not so much at writing them. Access to a byte[] is after all a bit cheaper than an invocation of a sun.misc.Unsafe method, even when JIT-ed.

  • The abstract class MemorySegment (with its subclasses HeapMemorySegment and HybridMemorySegment) performs as well as any specialized non-abstract class, as long as only one subclass is loaded. When both are loaded, performance may suffer by a factor of 2.7 x on certain operations.

  • How badly the performance degrades in cases where both MemorySegment subclasses are loaded seems to depend a lot on which subclass is loaded and operated on before and after which. Sometimes, performance is affected more than other times. It seems to be an artifact of the JIT’s code profiling and how heavily it performs optimistic specialization towards certain subclasses.

There is still a bit of mystery left, specifically why sometimes code is faster when it performs more checks (has more instructions and an additional branch). Even though the branch is perfectly predictable, this seems counter-intuitive. The only explanation that we could come up with is that the branch optimizations (such as optimistic elimination etc) result in code that does better register allocation (for whatever reason, maybe the intermediate instructions just fit the allocation algorithm better).

tl;dr

  • Off-heap memory in Flink complements the already very fast on-heap memory management. It improves the scalability to very large heap sizes and reduces memory copies for network and disk I/O.

  • Flink’s already present memory management infrastructure made the addition of off-heap memory simple. Off-heap memory is not only used for caching data, Flink can actually sort data off-heap and build hash tables off-heap.

  • We play a few nice tricks in the implementation to make sure the code is as friendly as possible to the JIT compiler and processor, to make the managed memory accesses are as fast as possible.

  • Understanding the JVM’s JIT compiler is tough - one needs a lot of (randomized) micro benchmarking to examine its behavior.


Appendix: Detailed Micro Benchmarks

These microbenchmarks test the performance of the different memory segment implementations on various operation.

Each experiments tests the different implementations multiple times in different orders, to balance the advantage/disadvantage of the JIT compiler specializing towards certain code paths. All experiments were run 5x, discarding the fastest and slowest run, and then averaged. This compensated for delay before the JIT kicks in.

My setup:

  • Oracle Java 8 (1.8.0_25)
  • 4 GBytes JVM heap (the experiments need 1.4 GBytes Heap + 1 GBytes direct memory)
  • Intel Core i7-4700MQ CPU, 2.40GHz (4 cores, 8 hardware contexts)

The tested implementations are

Type Description
HeapMemorySegment (exclusive) The case where it is the only loaded MemorySegment subclass.
HeapMemorySegment (mixed) The case where both the HeapMemorySegment and the HybridMemorySegment are loaded.
HybridMemorySegment (heap-exclusive) Backed by heap memory, and the case where it is the only loaded MemorySegment class.
HybridMemorySegment (heap-mixed) Backed by heap memory, and the case where both the HeapMemorySegment and the HybridMemorySegment are loaded.
HybridMemorySegment (off-heap-exclusive) Backed by off-heap memory, and the case where it is the only loaded MemorySegment class.
HybridMemorySegment (off-heap-mixed) Backed by heap off-memory, and the case where both the HeapMemorySegment and the HybridMemorySegment are loaded.
PureHeapSegment Has no class hierarchy and virtual methods at all.
PureHybridSegment (heap) Has no class hierarchy and virtual methods at all, backed by heap memory.
PureHybridSegment (off-heap) Has no class hierarchy and virtual methods at all, backed by off-heap memory.

Byte accesses

Writing 100000 x 32768 bytes to 32768 bytes segment

Segment Time
HeapMemorySegment, exclusive 1,441 msecs
HeapMemorySegment, mixed 3,841 msecs
HybridMemorySegment, heap, exclusive 1,626 msecs
HybridMemorySegment, off-heap, exclusive 1,628 msecs
HybridMemorySegment, heap, mixed 3,848 msecs
HybridMemorySegment, off-heap, mixed 3,847 msecs
PureHeapSegment 1,442 msecs
PureHybridSegment, heap 1,623 msecs
PureHybridSegment, off-heap 1,620 msecs

Reading 100000 x 32768 bytes from 32768 bytes segment

Segment Time
HeapMemorySegment, exclusive 1,326 msecs
HeapMemorySegment, mixed 1,378 msecs
HybridMemorySegment, heap, exclusive 2,029 msecs
HybridMemorySegment, off-heap, exclusive 2,030 msecs
HybridMemorySegment, heap, mixed 2,047 msecs
HybridMemorySegment, off-heap, mixed 2,049 msecs
PureHeapSegment 1,331 msecs
PureHybridSegment, heap 2,030 msecs
PureHybridSegment, off-heap 2,030 msecs

Writing 10 x 1073741824 bytes to 1073741824 bytes segment

Segment Time
HeapMemorySegment, exclusive 5,602 msecs
HeapMemorySegment, mixed 12,570 msecs
HybridMemorySegment, heap, exclusive 5,691 msecs
HybridMemorySegment, off-heap, exclusive 5,691 msecs
HybridMemorySegment, heap, mixed 12,566 msecs
HybridMemorySegment, off-heap, mixed 12,556 msecs
PureHeapSegment 5,599 msecs
PureHybridSegment, heap 5,687 msecs
PureHybridSegment, off-heap 5,681 msecs

Reading 10 x 1073741824 bytes from 1073741824 bytes segment

Segment Time
HeapMemorySegment, exclusive 4,243 msecs
HeapMemorySegment, mixed 4,265 msecs
HybridMemorySegment, heap, exclusive 6,730 msecs
HybridMemorySegment, off-heap, exclusive 6,725 msecs
HybridMemorySegment, heap, mixed 6,933 msecs
HybridMemorySegment, off-heap, mixed 6,926 msecs
PureHeapSegment 4,247 msecs
PureHybridSegment, heap 6,919 msecs
PureHybridSegment, off-heap 6,916 msecs

Byte Array accesses

Writing 100000 x 32 byte[1024] to 32768 bytes segment

Segment Time
HeapMemorySegment, mixed 164 msecs
HybridMemorySegment, heap, mixed 163 msecs
HybridMemorySegment, off-heap, mixed 163 msecs
PureHeapSegment 165 msecs
PureHybridSegment, heap 182 msecs
PureHybridSegment, off-heap 176 msecs

Reading 100000 x 32 byte[1024] from 32768 bytes segment

Segment Time
HeapMemorySegment, mixed 157 msecs
HybridMemorySegment, heap, mixed 155 msecs
HybridMemorySegment, off-heap, mixed 162 msecs
PureHeapSegment 161 msecs
PureHybridSegment, heap 175 msecs
PureHybridSegment, off-heap 179 msecs

Writing 10 x 1048576 byte[1024] to 1073741824 bytes segment

Segment Time
HeapMemorySegment, mixed 1,164 msecs
HybridMemorySegment, heap, mixed 1,173 msecs
HybridMemorySegment, off-heap, mixed 1,157 msecs
PureHeapSegment 1,169 msecs
PureHybridSegment, heap 1,174 msecs
PureHybridSegment, off-heap 1,166 msecs

Reading 10 x 1048576 byte[1024] from 1073741824 bytes segment

Segment Time
HeapMemorySegment, mixed 854 msecs
HybridMemorySegment, heap, mixed 853 msecs
HybridMemorySegment, off-heap, mixed 854 msecs
PureHeapSegment 857 msecs
PureHybridSegment, heap 896 msecs
PureHybridSegment, off-heap 887 msecs

Long integer accesses

(note that the heap and off-heap segments use the same or comparable code for this)

Writing 100000 x 4096 longs to 32768 bytes segment

Segment Time
HeapMemorySegment, mixed 221 msecs
HybridMemorySegment, heap, mixed 222 msecs
HybridMemorySegment, off-heap, mixed 221 msecs
PureHeapSegment 194 msecs
PureHybridSegment, heap 220 msecs
PureHybridSegment, off-heap 221 msecs

Reading 100000 x 4096 longs from 32768 bytes segment

Segment Time
HeapMemorySegment, mixed 233 msecs
HybridMemorySegment, heap, mixed 232 msecs
HybridMemorySegment, off-heap, mixed 231 msecs
PureHeapSegment 232 msecs
PureHybridSegment, heap 232 msecs
PureHybridSegment, off-heap 233 msecs

Writing 10 x 134217728 longs to 1073741824 bytes segment

Segment Time
HeapMemorySegment, mixed 1,120 msecs
HybridMemorySegment, heap, mixed 1,120 msecs
HybridMemorySegment, off-heap, mixed 1,115 msecs
PureHeapSegment 1,148 msecs
PureHybridSegment, heap 1,116 msecs
PureHybridSegment, off-heap 1,113 msecs

Reading 10 x 134217728 longs from 1073741824 bytes segment

Segment Time
HeapMemorySegment, mixed 1,097 msecs
HybridMemorySegment, heap, mixed 1,099 msecs
HybridMemorySegment, off-heap, mixed 1,093 msecs
PureHeapSegment 917 msecs
PureHybridSegment, heap 1,105 msecs
PureHybridSegment, off-heap 1,097 msecs

Integer accesses

(note that the heap and off-heap segments use the same or comparable code for this)

Writing 100000 x 8192 ints to 32768 bytes segment

Segment Time
HeapMemorySegment, mixed 578 msecs
HybridMemorySegment, heap, mixed 580 msecs
HybridMemorySegment, off-heap, mixed 576 msecs
PureHeapSegment 624 msecs
PureHybridSegment, heap 576 msecs
PureHybridSegment, off-heap 578 msecs

Reading 100000 x 8192 ints from 32768 bytes segment

Segment Time
HeapMemorySegment, mixed 464 msecs
HybridMemorySegment, heap, mixed 464 msecs
HybridMemorySegment, off-heap, mixed 465 msecs
PureHeapSegment 463 msecs
PureHybridSegment, heap 464 msecs
PureHybridSegment, off-heap 463 msecs

Writing 10 x 268435456 ints to 1073741824 bytes segment

Segment Time
HeapMemorySegment, mixed 2,187 msecs
HybridMemorySegment, heap, mixed 2,161 msecs
HybridMemorySegment, off-heap, mixed 2,152 msecs
PureHeapSegment 2,770 msecs
PureHybridSegment, heap 2,161 msecs
PureHybridSegment, off-heap 2,157 msecs

Reading 10 x 268435456 ints from 1073741824 bytes segment

Segment Time
HeapMemorySegment, mixed 1,782 msecs
HybridMemorySegment, heap, mixed 1,783 msecs
HybridMemorySegment, off-heap, mixed 1,774 msecs
PureHeapSegment 1,501 msecs
PureHybridSegment, heap 1,774 msecs
PureHybridSegment, off-heap 1,771 msecs