16 Sep 2015 by Stephan Ewen (@stephanewen)
Running data-intensive code in the JVM and making it well-behaved is tricky. Systems that naively put billions of data objects onto the JVM heap face unpredictable OutOfMemoryErrors and garbage collection stalls. Of course, you still want to keep your data in memory as much as possible, for speed and responsiveness of the processing applications. In that context, “off-heap” has become something of a magic word for solving these problems.
In this blog post, we will look at how Flink exploits off-heap memory. The feature is part of the upcoming release, but you can try it out with the latest nightly builds. We will also give a few interesting insights into the behavior of Java’s JIT compiler for highly optimized methods and loops.
Recap: Memory Management in Flink
To understand Flink’s approach to off-heap memory, we need to recap Flink’s approach to custom managed memory. We have written an earlier blog post about how Flink manages JVM memory itself.
As a summary, the core part is that Flink implements its algorithms not against Java objects, arrays, or lists, but against a data structure similar to java.nio.ByteBuffer. Flink uses its own specialized version, called MemorySegment, on which algorithms put and get ints, longs, byte arrays, etc. at specific positions, and compare and copy memory. The memory segments are held and distributed by a central component (called MemoryManager) from which algorithms request segments according to their calculated memory budgets.
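To make the idea of positional access concrete, here is a minimal sketch. It is not Flink's MemorySegment; the class name SegmentSketch and its methods are hypothetical, and it simply delegates to the absolute put/get methods of java.nio.ByteBuffer.

```java
import java.nio.ByteBuffer;

/** Hypothetical stand-in for a memory segment: a fixed-size buffer with positional access. */
final class SegmentSketch {
    private final ByteBuffer buffer;

    SegmentSketch(int size) {
        this.buffer = ByteBuffer.allocate(size);
    }

    // Absolute accessors: the caller decides the position, no cursor is moved.
    void putInt(int pos, int value)   { buffer.putInt(pos, value); }
    int getInt(int pos)               { return buffer.getInt(pos); }
    void putLong(int pos, long value) { buffer.putLong(pos, value); }
    long getLong(int pos)             { return buffer.getLong(pos); }

    /** Compares len bytes of this segment and another, treating bytes as unsigned. */
    int compare(SegmentSketch other, int thisPos, int otherPos, int len) {
        for (int i = 0; i < len; i++) {
            int a = buffer.get(thisPos + i) & 0xff;
            int b = other.buffer.get(otherPos + i) & 0xff;
            if (a != b) {
                return a - b;
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        SegmentSketch segment = new SegmentSketch(32);
        segment.putInt(0, 42);
        segment.putLong(4, 1234567890123L);
        System.out.println(segment.getInt(0) + " " + segment.getLong(4));
    }
}
```

An algorithm written against such an interface never holds one Java object per record; it only tracks positions inside a segment, which is what keeps the object count (and GC pressure) low.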
Don’t believe that this can be fast? Have a look at the benchmarks in the earlier blogpost, which show that it is actually often much faster than working on objects, due to better control over data layout (cache efficiency, data size), and reducing the pressure on Java’s Garbage Collector.
This form of memory management has been in Flink for a long time. Anecdotally, the first public demo of Flink’s predecessor project Stratosphere, at the VLDB conference in 2010, was running its programs with custom managed memory (although I believe few attendees were aware of that).
Why actually bother with off-heap memory?
Given that Flink has a sophisticated level of managing on-heap memory, why do we even bother with off-heap memory? It is true that “out of memory” has been much less of a problem for Flink because of its heap memory management techniques. Nonetheless, there are a few good reasons to offer the possibility to move Flink’s managed memory out of the JVM heap:
- Very large JVMs (100s of GBytes heap memory) tend to be tricky. They take a long time to start (the heap has to be allocated and initialized) and garbage collection stalls can be huge (minutes). While newer incremental garbage collectors (like G1) mitigate this problem to some extent, an even better solution is to just make the heap much smaller and allocate Flink’s managed memory chunks outside the heap.
- I/O and network efficiency: In many cases, we write MemorySegments to disk (spilling) or to the network (data transfer). Off-heap memory can be written/transferred with zero copies, while heap memory always incurs an additional memory copy.
- Off-heap memory can actually be owned by other processes. That way, cached data survives process crashes (due to user code exceptions) and can be used for recovery. Flink does not exploit that yet, but it is interesting future work.
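The I/O point in the list above can be sketched with plain NIO. The class SpillSketch and its methods are hypothetical names, not Flink code. Both buffer kinds go through the same FileChannel.write call; the difference is internal: as far as we know, HotSpot's channel implementation first copies a heap buffer into a temporary direct buffer, while a direct buffer can be handed to the operating system as is.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class SpillSketch {

    /** Writes all remaining bytes of the buffer to the channel. */
    static int writeFully(FileChannel channel, ByteBuffer data) throws IOException {
        int written = 0;
        while (data.hasRemaining()) {
            written += channel.write(data);
        }
        return written;
    }

    /** Fills a heap or direct buffer, spills it to a temp file, and returns the file size. */
    static long spillToTempFile(boolean offHeap, int numBytes) {
        ByteBuffer data = offHeap
                ? ByteBuffer.allocateDirect(numBytes)
                : ByteBuffer.allocate(numBytes);
        while (data.hasRemaining()) {
            data.put((byte) 7);
        }
        data.flip(); // switch from filling to draining
        try {
            Path file = Files.createTempFile("spill-sketch", ".bin");
            try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE)) {
                writeFully(channel, data);
            }
            long size = Files.size(file);
            Files.delete(file);
            return size;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("heap:     " + spillToTempFile(false, 32768) + " bytes");
        System.out.println("off-heap: " + spillToTempFile(true, 32768) + " bytes");
    }
}
```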
The opposite question is also valid. Why should Flink ever not use off-heap memory?
- On-heap is easier and interplays better with tools. Some container environments and monitoring tools get confused when the monitored heap size does not remotely reflect the amount of memory used by the process.
- Short-lived memory segments are cheaper on the heap. Flink sometimes needs to allocate some short-lived buffers, which is cheaper on the heap than off-heap.
- Some operations are actually a bit faster on heap memory (or the JIT compiler understands them better).
The off-heap Memory Implementation
Given that all memory-intensive internal algorithms are already implemented against the MemorySegment, switching the implementation to off-heap memory is actually trivial. You can compare it to replacing all ByteBuffer.allocate(numBytes) calls with ByteBuffer.allocateDirect(numBytes). In Flink’s case it meant that we made the MemorySegment abstract and added the HeapMemorySegment and OffHeapMemorySegment subclasses. The OffHeapMemorySegment takes the off-heap memory pointer from a java.nio.DirectByteBuffer and implements its specialized access methods using sun.misc.Unsafe. We also made a few adjustments to the startup scripts and the deployment code to make sure that the JVM is permitted enough off-heap memory (direct memory, -XX:MaxDirectMemorySize).
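The ByteBuffer analogy can be written down in a few lines. This is only an illustration of the analogy, not Flink's allocation code; AllocationSketch and its allocate method are hypothetical names.

```java
import java.nio.ByteBuffer;

final class AllocationSketch {

    /** Returns a heap-backed or a direct (off-heap) buffer of the requested size. */
    static ByteBuffer allocate(int numBytes, boolean offHeap) {
        return offHeap
                ? ByteBuffer.allocateDirect(numBytes)  // memory outside the JVM heap
                : ByteBuffer.allocate(numBytes);       // backed by a byte[] on the heap
    }

    public static void main(String[] args) {
        ByteBuffer heap = allocate(32768, false);
        ByteBuffer direct = allocate(32768, true);
        System.out.println("heap buffer is direct:   " + heap.isDirect());
        System.out.println("direct buffer is direct: " + direct.isDirect());
    }
}
```

Direct allocations count against the JVM's direct memory limit, so a run that allocates a lot of off-heap memory would be started with something like `java -XX:MaxDirectMemorySize=1g AllocationSketch` (the 1g value is just an example).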
In practice we had to go one step further to make the implementation perform well. While the ByteBuffer is used in I/O code paths to compose headers and move bulk memory into place, the MemorySegment is part of the innermost loops of many algorithms (sorting, hash tables, …). That means that the access methods have to be as fast as possible.
Understanding the JIT and tuning the implementation
The MemorySegment was (before our change) a standalone final class (it had no subclasses). Via Class Hierarchy Analysis (CHA), the JIT compiler was able to determine that all of the accessor method calls go to one specific implementation. That way, all method calls can be perfectly de-virtualized and inlined, which is essential to performance, and the basis for all further optimizations (like vectorization of the calling loop).
With two different memory segments loaded at the same time, the JIT compiler cannot perform the same level of optimization any more, which results in a noticeable difference in performance: a slowdown of about 2.7x in the following example:
Writing 100000 x 32768 bytes to 32768 bytes segment:
HeapMemorySegment (standalone) : 1,441 msecs
OffHeapMemorySegment (standalone) : 1,628 msecs
HeapMemorySegment (subclass) : 3,841 msecs
OffHeapMemorySegment (subclass) : 3,847 msecs
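The shape of such an experiment can be sketched as follows. This is not the benchmark that produced the numbers above; all class names are hypothetical, and the timings it prints will differ by JVM and machine. Note also that whether the second subclass is already loaded during the first phase depends on JVM details (for example bytecode verification), so a rigorous comparison would use separate JVM runs.

```java
import java.nio.ByteBuffer;

/** Hypothetical abstract segment with two implementations, mirroring the subclass setup. */
abstract class SegmentBase {
    abstract int size();
    abstract void put(int pos, byte value);
    abstract byte get(int pos);
}

final class ArraySegment extends SegmentBase {
    private final byte[] memory;
    ArraySegment(int size) { memory = new byte[size]; }
    int size() { return memory.length; }
    void put(int pos, byte value) { memory[pos] = value; }
    byte get(int pos) { return memory[pos]; }
}

final class BufferSegment extends SegmentBase {
    private final ByteBuffer memory;
    BufferSegment(int size) { memory = ByteBuffer.allocateDirect(size); }
    int size() { return memory.capacity(); }
    void put(int pos, byte value) { memory.put(pos, value); }
    byte get(int pos) { return memory.get(pos); }
}

final class HierarchySketch {

    /** The hot loop: every put/get is a virtual call on the abstract type. */
    static long fillAndSum(SegmentBase segment, int rounds) {
        long sum = 0;
        for (int r = 0; r < rounds; r++) {
            for (int i = 0; i < segment.size(); i++) {
                segment.put(i, (byte) i);
            }
            for (int i = 0; i < segment.size(); i++) {
                sum += segment.get(i);
            }
        }
        return sum;
    }

    static long timeMillis(SegmentBase segment, int rounds) {
        long start = System.nanoTime();
        fillAndSum(segment, rounds);
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        int rounds = 20_000;
        // Phase 1: only ArraySegment instances have been used so far.
        System.out.println("array only:  " + timeMillis(new ArraySegment(32768), rounds) + " ms");
        // Phase 2: a second subclass now flows through the same call sites.
        System.out.println("buffer:      " + timeMillis(new BufferSegment(32768), rounds) + " ms");
        System.out.println("array again: " + timeMillis(new ArraySegment(32768), rounds) + " ms");
    }
}
```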
To get back to the original performance, we explored two approaches:
Approach 1: Make sure that only one memory segment implementation is ever loaded.
We re-structured the code a bit to make sure that all places that produce long-lived and short-lived memory segments instantiate the same MemorySegment subclass (Heap- or Off-Heap segment). Using factories rather than directly instantiating the memory segment classes, this was straightforward.
Experiments (see appendix) showed that the JIT compiler properly detects this (via hierarchy analysis) and that it can perform the same level of aggressive optimization as before, when there was only one MemorySegment class.
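A minimal sketch of the factory idea, assuming a configuration switch that is fixed at startup. The names ManagedSegment, OnHeapSegment, OffHeapSegment, and SegmentFactory are hypothetical and do not correspond to Flink's classes.

```java
import java.nio.ByteBuffer;

abstract class ManagedSegment {
    abstract void put(int pos, byte value);
    abstract byte get(int pos);
}

final class OnHeapSegment extends ManagedSegment {
    private final byte[] memory;
    OnHeapSegment(int size) { memory = new byte[size]; }
    void put(int pos, byte value) { memory[pos] = value; }
    byte get(int pos) { return memory[pos]; }
}

final class OffHeapSegment extends ManagedSegment {
    private final ByteBuffer memory;
    OffHeapSegment(int size) { memory = ByteBuffer.allocateDirect(size); }
    void put(int pos, byte value) { memory.put(pos, value); }
    byte get(int pos) { return memory.get(pos); }
}

/** All code that needs a segment goes through one factory, configured once. */
final class SegmentFactory {
    enum MemoryType { HEAP, OFF_HEAP }

    private final MemoryType type;

    SegmentFactory(MemoryType type) {
        this.type = type;
    }

    /** Used for long-lived and short-lived segments alike, so only one subclass is instantiated. */
    ManagedSegment allocate(int size) {
        return type == MemoryType.HEAP ? new OnHeapSegment(size) : new OffHeapSegment(size);
    }

    public static void main(String[] args) {
        SegmentFactory factory = new SegmentFactory(MemoryType.HEAP);
        ManagedSegment longLived = factory.allocate(32768);
        ManagedSegment shortLived = factory.allocate(128);
        System.out.println(longLived.getClass() == shortLived.getClass());
    }
}
```

Because no call site names a concrete subclass, switching the whole process between heap and off-heap segments is a one-line configuration change.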
Approach 2: Write one segment that handles both heap and off-heap memory
We created a class HybridMemorySegment which transparently handles both heap and off-heap memory. It can be initialized either with a byte array (heap memory), or with a pointer to a memory region outside the heap (off-heap memory).
Fortunately, there is a nice trick to do this without introducing code branches and specialized handling of the two different memory types. The trick is based on the way that the sun.misc.Unsafe methods interpret object references. To illustrate this, we take the method that gets a long integer from a memory position:
sun.misc.Unsafe.getLong(Object reference, long offset)
The method accepts an object reference, takes its memory address, and adds the offset to obtain a pointer. It then fetches the eight bytes at the address pointed to and interprets them as a long integer. Since the method accepts null as the reference (and interprets it as zero), one can write a method that fetches a long integer seamlessly from heap and off-heap memory as follows:
public class HybridMemorySegment {

    // UNSAFE is assumed to be a static reference to the sun.misc.Unsafe instance

    private final byte[] heapMemory; // non-null in heap case, null in off-heap case
    private final long address;      // may be absolute, or relative to byte[]

    // method of interest
    public long getLong(int pos) {
        return UNSAFE.getLong(heapMemory, address + pos);
    }

    // initialize for heap memory
    public HybridMemorySegment(byte[] heapMemory) {
        this.heapMemory = heapMemory;
        this.address = UNSAFE.arrayBaseOffset(byte[].class);
    }

    // initialize for off-heap memory
    public HybridMemorySegment(long offheapPointer) {
        this.heapMemory = null;
        this.address = offheapPointer;
    }
}
To check whether both cases (heap and off-heap) really result in the same code paths (no hidden branches inside the Unsafe.getLong(Object, long) method), one can check out the C++ source code of sun.misc.Unsafe, available here: http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/tip/src/share/vm/prims/unsafe.cpp
Of particular interest is the macro in line 155, which is the base of all GET methods. Tracing the function calls (many are no-ops), one can see that both variants of Unsafe’s getLong() result in the same code: either 0 + absolutePointer or objectRefAddress + offset.
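The two addressing modes can be tried out directly. The sketch below is an assumption-laden illustration, not Flink code: it obtains the Unsafe instance through the commonly used reflective lookup of the private theUnsafe field, and the class and method names are hypothetical. Recent JDKs deprecate these memory-access methods and may print warnings.

```java
import java.lang.reflect.Field;
import sun.misc.Unsafe;

final class UnsafeAddressingSketch {
    private static final Unsafe UNSAFE = loadUnsafe();

    private static Unsafe loadUnsafe() {
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            return (Unsafe) field.get(null);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("sun.misc.Unsafe is not accessible", e);
        }
    }

    /** One read path for both cases: base is a byte[] (heap) or null (off-heap). */
    static long read(Object base, long address) {
        return UNSAFE.getLong(base, address);
    }

    /** Heap case: address is relative to the start of the array object. */
    static long roundTripHeap(long value) {
        byte[] heapMemory = new byte[16];
        long address = UNSAFE.arrayBaseOffset(byte[].class);
        UNSAFE.putLong(heapMemory, address + 8, value);
        return read(heapMemory, address + 8);
    }

    /** Off-heap case: base is null, so address is an absolute pointer. */
    static long roundTripOffHeap(long value) {
        long pointer = UNSAFE.allocateMemory(16);
        try {
            UNSAFE.putLong(null, pointer + 8, value);
            return read(null, pointer + 8);
        } finally {
            UNSAFE.freeMemory(pointer);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTripHeap(1234567890123L));
        System.out.println(roundTripOffHeap(1234567890123L));
    }
}
```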
Summary
We ended up choosing a combination of both techniques:
- For off-heap memory, we use the HybridMemorySegment from approach (2), which can represent both heap and off-heap memory. That way, the same class represents the long-lived off-heap memory as well as the short-lived temporary buffers allocated (or wrapped) on the heap.
- We follow approach (1) and use factories to make sure that only one segment class is ever loaded, which gives peak performance. We can exploit the performance benefits of the HeapMemorySegment on individual byte operations, and we have a mechanism in place to add further implementations of MemorySegments for the case that Oracle really removes sun.misc.Unsafe in future Java versions.
The final code can be found in the Flink repository, under https://github.com/apache/flink/tree/master/flink-core/src/main/java/org/apache/flink/core/memory
Detailed micro benchmarks are in the appendix. A summary of the findings is as follows:
- The HybridMemorySegment performs equally well in heap and off-heap memory, as is to be expected (the code paths are the same).
- The HeapMemorySegment is quite a bit faster in reading individual bytes, not so much at writing them. Access to a byte[] is after all a bit cheaper than an invocation of a sun.misc.Unsafe method, even when JIT-ed.
- The abstract class MemorySegment (with its subclasses HeapMemorySegment and HybridMemorySegment) performs as well as any specialized non-abstract class, as long as only one subclass is loaded. When both are loaded, performance may suffer by a factor of 2.7x on certain operations.
- How badly the performance degrades in cases where both MemorySegment subclasses are loaded seems to depend a lot on which subclass is loaded and operated on before and after which. Sometimes, performance is affected more than other times. It seems to be an artifact of the JIT’s code profiling and how heavily it performs optimistic specialization towards certain subclasses.
There is still a bit of mystery left, specifically why sometimes code is faster when it performs more checks (has more instructions and an additional branch). Even though the branch is perfectly predictable, this seems counter-intuitive. The only explanation that we could come up with is that the branch optimizations (such as optimistic elimination etc) result in code that does better register allocation (for whatever reason, maybe the intermediate instructions just fit the allocation algorithm better).
tl;dr
- Off-heap memory in Flink complements the already very fast on-heap memory management. It improves the scalability to very large memory sizes and reduces memory copies for network and disk I/O.
- Flink’s already present memory management infrastructure made the addition of off-heap memory simple. Off-heap memory is not only used for caching data; Flink can actually sort data off-heap and build hash tables off-heap.
- We play a few nice tricks in the implementation to make sure the code is as friendly as possible to the JIT compiler and processor, so that the managed memory accesses are as fast as possible.
- Understanding the JVM’s JIT compiler is tough: one needs a lot of (randomized) micro benchmarking to examine its behavior.
Appendix: Detailed Micro Benchmarks
These microbenchmarks test the performance of the different memory segment implementations on various operations.
Each experiment tests the different implementations multiple times in different orders, to balance the advantage/disadvantage of the JIT compiler specializing towards certain code paths. All experiments were run 5x, discarding the fastest and slowest run, and then averaged. This compensates for the delay before the JIT kicks in.
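The aggregation described above (drop the fastest and the slowest run, average the rest) can be sketched as follows. The helper name trimmedMean and the sample timings in main are hypothetical and are not taken from the measurements below.

```java
import java.util.Arrays;

final class TrimmedMeanSketch {

    /** Averages the run times after discarding the single fastest and single slowest run. */
    static double trimmedMean(long[] runMillis) {
        if (runMillis.length < 3) {
            throw new IllegalArgumentException("need at least three runs");
        }
        long[] sorted = runMillis.clone();
        Arrays.sort(sorted);
        long sum = 0;
        for (int i = 1; i < sorted.length - 1; i++) {
            sum += sorted[i];
        }
        return (double) sum / (sorted.length - 2);
    }

    public static void main(String[] args) {
        // Made-up timings: the slow outlier stands for a run before the JIT has warmed up.
        long[] runs = {1900, 1440, 1442, 1441, 1500};
        System.out.println(trimmedMean(runs) + " msecs");
    }
}
```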
My setup:
- Oracle Java 8 (1.8.0_25)
- 4 GBytes JVM heap (the experiments need 1.4 GBytes Heap + 1 GBytes direct memory)
- Intel Core i7-4700MQ CPU, 2.40GHz (4 cores, 8 hardware contexts)
The tested implementations are
Type | Description |
---|---|
HeapMemorySegment (exclusive) | The case where it is the only loaded MemorySegment subclass. |
HeapMemorySegment (mixed) | The case where both the HeapMemorySegment and the HybridMemorySegment are loaded. |
HybridMemorySegment (heap-exclusive) | Backed by heap memory, and the case where it is the only loaded MemorySegment class. |
HybridMemorySegment (heap-mixed) | Backed by heap memory, and the case where both the HeapMemorySegment and the HybridMemorySegment are loaded. |
HybridMemorySegment (off-heap-exclusive) | Backed by off-heap memory, and the case where it is the only loaded MemorySegment class. |
HybridMemorySegment (off-heap-mixed) | Backed by off-heap memory, and the case where both the HeapMemorySegment and the HybridMemorySegment are loaded. |
PureHeapSegment | Has no class hierarchy and no virtual methods at all. |
PureHybridSegment (heap) | Has no class hierarchy and no virtual methods at all, backed by heap memory. |
PureHybridSegment (off-heap) | Has no class hierarchy and no virtual methods at all, backed by off-heap memory. |
Byte accesses
Writing 100000 x 32768 bytes to 32768 bytes segment
Segment | Time |
---|---|
HeapMemorySegment, exclusive | 1,441 msecs |
HeapMemorySegment, mixed | 3,841 msecs |
HybridMemorySegment, heap, exclusive | 1,626 msecs |
HybridMemorySegment, off-heap, exclusive | 1,628 msecs |
HybridMemorySegment, heap, mixed | 3,848 msecs |
HybridMemorySegment, off-heap, mixed | 3,847 msecs |
PureHeapSegment | 1,442 msecs |
PureHybridSegment, heap | 1,623 msecs |
PureHybridSegment, off-heap | 1,620 msecs |
Reading 100000 x 32768 bytes from 32768 bytes segment
Segment | Time |
---|---|
HeapMemorySegment, exclusive | 1,326 msecs |
HeapMemorySegment, mixed | 1,378 msecs |
HybridMemorySegment, heap, exclusive | 2,029 msecs |
HybridMemorySegment, off-heap, exclusive | 2,030 msecs |
HybridMemorySegment, heap, mixed | 2,047 msecs |
HybridMemorySegment, off-heap, mixed | 2,049 msecs |
PureHeapSegment | 1,331 msecs |
PureHybridSegment, heap | 2,030 msecs |
PureHybridSegment, off-heap | 2,030 msecs |
Writing 10 x 1073741824 bytes to 1073741824 bytes segment
Segment | Time |
---|---|
HeapMemorySegment, exclusive | 5,602 msecs |
HeapMemorySegment, mixed | 12,570 msecs |
HybridMemorySegment, heap, exclusive | 5,691 msecs |
HybridMemorySegment, off-heap, exclusive | 5,691 msecs |
HybridMemorySegment, heap, mixed | 12,566 msecs |
HybridMemorySegment, off-heap, mixed | 12,556 msecs |
PureHeapSegment | 5,599 msecs |
PureHybridSegment, heap | 5,687 msecs |
PureHybridSegment, off-heap | 5,681 msecs |
Reading 10 x 1073741824 bytes from 1073741824 bytes segment
Segment | Time |
---|---|
HeapMemorySegment, exclusive | 4,243 msecs |
HeapMemorySegment, mixed | 4,265 msecs |
HybridMemorySegment, heap, exclusive | 6,730 msecs |
HybridMemorySegment, off-heap, exclusive | 6,725 msecs |
HybridMemorySegment, heap, mixed | 6,933 msecs |
HybridMemorySegment, off-heap, mixed | 6,926 msecs |
PureHeapSegment | 4,247 msecs |
PureHybridSegment, heap | 6,919 msecs |
PureHybridSegment, off-heap | 6,916 msecs |
Byte Array accesses
Writing 100000 x 32 byte[1024] to 32768 bytes segment
Segment | Time |
---|---|
HeapMemorySegment, mixed | 164 msecs |
HybridMemorySegment, heap, mixed | 163 msecs |
HybridMemorySegment, off-heap, mixed | 163 msecs |
PureHeapSegment | 165 msecs |
PureHybridSegment, heap | 182 msecs |
PureHybridSegment, off-heap | 176 msecs |
Reading 100000 x 32 byte[1024] from 32768 bytes segment
Segment | Time |
---|---|
HeapMemorySegment, mixed | 157 msecs |
HybridMemorySegment, heap, mixed | 155 msecs |
HybridMemorySegment, off-heap, mixed | 162 msecs |
PureHeapSegment | 161 msecs |
PureHybridSegment, heap | 175 msecs |
PureHybridSegment, off-heap | 179 msecs |
Writing 10 x 1048576 byte[1024] to 1073741824 bytes segment
Segment | Time |
---|---|
HeapMemorySegment, mixed | 1,164 msecs |
HybridMemorySegment, heap, mixed | 1,173 msecs |
HybridMemorySegment, off-heap, mixed | 1,157 msecs |
PureHeapSegment | 1,169 msecs |
PureHybridSegment, heap | 1,174 msecs |
PureHybridSegment, off-heap | 1,166 msecs |
Reading 10 x 1048576 byte[1024] from 1073741824 bytes segment
Segment | Time |
---|---|
HeapMemorySegment, mixed | 854 msecs |
HybridMemorySegment, heap, mixed | 853 msecs |
HybridMemorySegment, off-heap, mixed | 854 msecs |
PureHeapSegment | 857 msecs |
PureHybridSegment, heap | 896 msecs |
PureHybridSegment, off-heap | 887 msecs |
Long integer accesses
(note that the heap and off-heap segments use the same or comparable code for this)
Writing 100000 x 4096 longs to 32768 bytes segment
Segment | Time |
---|---|
HeapMemorySegment, mixed | 221 msecs |
HybridMemorySegment, heap, mixed | 222 msecs |
HybridMemorySegment, off-heap, mixed | 221 msecs |
PureHeapSegment | 194 msecs |
PureHybridSegment, heap | 220 msecs |
PureHybridSegment, off-heap | 221 msecs |
Reading 100000 x 4096 longs from 32768 bytes segment
Segment | Time |
---|---|
HeapMemorySegment, mixed | 233 msecs |
HybridMemorySegment, heap, mixed | 232 msecs |
HybridMemorySegment, off-heap, mixed | 231 msecs |
PureHeapSegment | 232 msecs |
PureHybridSegment, heap | 232 msecs |
PureHybridSegment, off-heap | 233 msecs |
Writing 10 x 134217728 longs to 1073741824 bytes segment
Segment | Time |
---|---|
HeapMemorySegment, mixed | 1,120 msecs |
HybridMemorySegment, heap, mixed | 1,120 msecs |
HybridMemorySegment, off-heap, mixed | 1,115 msecs |
PureHeapSegment | 1,148 msecs |
PureHybridSegment, heap | 1,116 msecs |
PureHybridSegment, off-heap | 1,113 msecs |
Reading 10 x 134217728 longs from 1073741824 bytes segment
Segment | Time |
---|---|
HeapMemorySegment, mixed | 1,097 msecs |
HybridMemorySegment, heap, mixed | 1,099 msecs |
HybridMemorySegment, off-heap, mixed | 1,093 msecs |
PureHeapSegment | 917 msecs |
PureHybridSegment, heap | 1,105 msecs |
PureHybridSegment, off-heap | 1,097 msecs |
Integer accesses
(note that the heap and off-heap segments use the same or comparable code for this)
Writing 100000 x 8192 ints to 32768 bytes segment
Segment | Time |
---|---|
HeapMemorySegment, mixed | 578 msecs |
HybridMemorySegment, heap, mixed | 580 msecs |
HybridMemorySegment, off-heap, mixed | 576 msecs |
PureHeapSegment | 624 msecs |
PureHybridSegment, heap | 576 msecs |
PureHybridSegment, off-heap | 578 msecs |
Reading 100000 x 8192 ints from 32768 bytes segment
Segment | Time |
---|---|
HeapMemorySegment, mixed | 464 msecs |
HybridMemorySegment, heap, mixed | 464 msecs |
HybridMemorySegment, off-heap, mixed | 465 msecs |
PureHeapSegment | 463 msecs |
PureHybridSegment, heap | 464 msecs |
PureHybridSegment, off-heap | 463 msecs |
Writing 10 x 268435456 ints to 1073741824 bytes segment
Segment | Time |
---|---|
HeapMemorySegment, mixed | 2,187 msecs |
HybridMemorySegment, heap, mixed | 2,161 msecs |
HybridMemorySegment, off-heap, mixed | 2,152 msecs |
PureHeapSegment | 2,770 msecs |
PureHybridSegment, heap | 2,161 msecs |
PureHybridSegment, off-heap | 2,157 msecs |
Reading 10 x 268435456 ints from 1073741824 bytes segment
Segment | Time |
---|---|
HeapMemorySegment, mixed | 1,782 msecs |
HybridMemorySegment, heap, mixed | 1,783 msecs |
HybridMemorySegment, off-heap, mixed | 1,774 msecs |
PureHeapSegment | 1,501 msecs |
PureHybridSegment, heap | 1,774 msecs |
PureHybridSegment, off-heap | 1,771 msecs |