Tuesday, 12 May 2015

Caching enumerations: the internals


In the previous post I wrote about the proper way to cache enumerables over large sets that involve a large number of compositions.
In this article I give the implementation details.

Two ways of being lazy

Lazy evaluation is at the heart of the EnumJ library. Caching enumeration is no exception: the internal caching buffer grows lazily, as the consuming enumerators require.
EnumJ has two helper classes for lazy evaluation:

Lazy<T>

Lazy<T> is an implementation of LazyInitializer<T> that takes a supplier and overrides initialize() to call that supplier when initializing the object. The value is initialized only once, even under multi-threading, and access is fast once initialization has been carried out.
In addition, Lazy<T> releases the supplier upon initialization. The code is quite simple:
package enumj;

import java.util.function.Supplier;
import org.apache.commons.lang3.concurrent.LazyInitializer;

class Lazy<T> extends LazyInitializer<T> {

    private Supplier<T> supplier;

    public Lazy(Supplier<T> supplier) {
        this.supplier = supplier;
    }

    @Override
    protected T initialize() {
        final T result = supplier.get();
        supplier = null;
        return result;
    }
}
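To see Lazy<T> in action, here is a minimal usage sketch of my own – the demo class is hypothetical, not part of the library, and it sits in package enumj because Lazy<T> is package-private. Note that LazyInitializer.get() declares ConcurrentException, hence the throws clause:

package enumj;

import org.apache.commons.lang3.concurrent.ConcurrentException;

// Illustrative demo only, not part of EnumJ.
class LazyDemo {
    public static void main(String[] args) throws ConcurrentException {
        // The supplier body runs exactly once, no matter how many times
        // (or from how many threads) get() is called.
        final Lazy<String> greeting = new Lazy<>(() -> {
            System.out.println("computing ...");   // printed a single time
            return "hello";
        });
        System.out.println(greeting.get());        // triggers initialize()
        System.out.println(greeting.get());        // returns the cached value
    }
}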

LazySupplier<T>

LazySupplier<T> is an implementation of Supplier<T> that yields a lazily calculated value (it therefore encapsulates a Lazy<T>). Unlike Lazy<T> or LazyInitializer<T>, it allows the value to be refreshed by replacing the supplier (and, with it, the internal Lazy<T> altogether).
The code is equally simple:
package enumj;

import java.util.function.Supplier;
import org.apache.commons.lang3.concurrent.ConcurrentException;

class LazySupplier<T> implements Supplier<T> {

    volatile Lazy<T> lazy;

    LazySupplier(Supplier<T> supplier) {
        refresh(supplier);
    }

    void refresh(Supplier<T> supplier) {
        lazy = new Lazy<>(supplier);
    }

    void clear() {
        lazy = null;
    }

    @Override
    public T get() {
        try {
            Lazy<T> o = lazy;
            return o.get();
        } catch(ConcurrentException ex) {
            throw new UnsupportedOperationException(ex);
        }
    }
}
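Again, a small sketch of my own (the demo class is hypothetical and lives in package enumj because LazySupplier<T> is package-private) showing that refresh() replaces the lazily computed value:

package enumj;

// Illustrative demo only, not part of EnumJ.
class LazySupplierDemo {
    public static void main(String[] args) {
        final LazySupplier<Long> stamp = new LazySupplier<>(System::nanoTime);

        final Long first = stamp.get();
        final Long second = stamp.get();           // same cached value
        System.out.println(first.equals(second));  // true

        stamp.refresh(System::nanoTime);           // replaces the internal Lazy<T>
        final Long third = stamp.get();            // newly computed value
        System.out.println(first.equals(third));   // false, in practice
    }
}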

A buffer that grows lazily

Once we have the basic mechanism for laziness, let’s see how we can build a caching buffer that grows lazily.
The best solution I found is the age-old linked list, with a twist: each node knows how to generate its successor on an as-needed basis. This technique of lists growing indefinitely at the tail is common in Prolog and Lisp1) so it gets imported here for lazy caching.
Each element of the linked list must store the cached element, so it is aptly named CachedElementWrapper<E>. It has a field elem of type E and a field next of type … oops, that is the problem: next cannot be of type CachedElementWrapper<E>, because that would prevent the node from generating its successor lazily.
As the successor needs to be generated lazily, a more suitable type would be Lazy<CachedElementWrapper<E>>, and because the successor may not exist at all (if we reached the end of the enumeration), it must be of type Lazy<Optional<CachedElementWrapper<E>>>:

   
    private final E elem;
    private final Lazy<Optional<CachedElementWrapper<E>>> next;
Once the fields are defined, let’s define the constructor:
  • it needs the element to store, of type E
  • it needs a supplier for the next element that contains (internally) an enumerator. A Supplier<Nullable<E>>2) does the job
  • it needs the limit of the caching buffer. We need this to notify the user that the limit has been reached and to cancel caching (I explained the reasons in my previous post)
  • it needs the ordinal of the element in the linked list, which equals the number of elements in the list when the current node is the last. This is necessary to know how long the linked list is without traversing it
  • it needs a function to disable enumeration when the consumer attempts to cache elements beyond the limit
The constructor of CachedElementWrapper<E> has a definition like this:

   
    CachedElementWrapper(E elem,
                         Supplier<Nullable<E>> nextSupplier,
                         long limit,
                         long ordinal,
                         Runnable disableProc) { … }
In addition, CachedElementWrapper<E> needs two more methods, one to retrieve the cached element and the other one to return the lazily generated successor:

   
    public E getElement() {
        return elem;
    }

    public Optional<CachedElementWrapper<E>> getNextWrapper() {
        try {
            return next.get();
        } catch(ConcurrentException ex) {
            throw new UnsupportedOperationException(ex);
        }
    }
The key is how to define the supplier to pass on to the next field. That supplier must:
  1. retrieve the next element to cache, which will go into the successor node of type CachedElementWrapper<E>. Note: this next element may be Nullable.empty()!
  2. if the next element is Nullable.empty() then set the result to Optional.empty()
  3. if the next element is present then set the result to an Optional of the successor node of type CachedElementWrapper<E>. That new node must be initialized with the next element value, same nextSupplier, same limit, ordinal+1 as ordinal and same disabling procedure
  4. if the number of elements in the list equals limit, then call the disabling procedure disableProc
  5. return the result
The full code of the constructor is as follows:

   
    CachedElementWrapper(E elem,
                         Supplier<Nullable<E>> nextSupplier,
                         long limit,
                         long ordinal,
                         Runnable disableProc) {
        this.elem = elem;
        this.next = new Lazy<>(() -> {
            final long count = ordinal;
            final Nullable<E> e = nextSupplier.get();
            final Optional<CachedElementWrapper<E>> result = e.isPresent()
                    ? Optional.of(new CachedElementWrapper<>(e.get(),
                                                             nextSupplier,
                                                             limit,
                                                             ordinal+1,
                                                             disableProc))
                    : Optional.empty();
            // the list just reached its limit: disable caching for new enumerators
            if (count == limit && result.isPresent()) {
                disableProc.run();
            }
            return result;
        });
    }
The beauty of this design is that all the references to nextSupplier, disableProc and the internal objects therein are dropped upon initialization, so that all that remains is a simple node holding the cached element and a successor, with a very small footprint.
In other words, the enumeration cache is, in terms of memory footprint, almost as efficient as a plain vanilla linked list. This magic is carried out by the Lazy<T> class, which drops the reference to its supplier once its internal value gets initialized.
For the full code of the class, see Appendix A at the end of the article.

An Enumerable<T> with a lazy buffer

Once the caching buffer that grows lazily exists, the CachedEnumerable<E> class just needs to initialize the buffer properly and pass it on to the enumerators it spawns.
CachedEnumerable<E> needs the following:
  • a source field named source, of type Enumerable<E>. This represents the data source whose elements we want to cache
  • a generator field named enumerator, of type LazySupplier<Enumerator<E>>. This field provides the single enumerator whose elements get cached. It is lazily computed as source.enumerator()
  • a field named cache holding the head of the linked list of cached elements. As the list is built lazily and may be empty, its type is LazySupplier<Optional<CachedElementWrapper<E>>>
  • a field named limit of type long, storing the maximum size of the cache (Long.MAX_VALUE means no limit)
  • a field named callback of type Runnable, storing the user callback invoked when the cache limit is reached
  • a field named disabled of type AtomicBoolean, telling whether caching is disabled or not. It is essential that disabled is an AtomicBoolean and not a volatile boolean, for reasons that I will explain below
Here are the field declarations:
    
   
    private final Enumerable<E> source;
    private final LazySupplier<Enumerator<E>> enumerator;
    private final LazySupplier<Optional<CachedElementWrapper<E>>> cache;
    private final Runnable callback;

    private volatile long limit;
    private volatile AtomicBoolean disabled;
CachedEnumerable<E> requires the following methods:
  • internalEnumerator() which yields a new Enumerator<E>. This protected method does the work of AbstractEnumerable.enumerator()
  • getCacheSupplier() which returns the supplier for cache
  • disable() which disables caching altogether
  • enable() which enables caching
  • reset() which refreshes the internal lazy evaluators while maintaining the cache size
  • resize() which refreshes the internal lazy evaluators while growing the cache size
disable(), enable(), reset() and resize(long) all end up calling an internal resize(long, boolean, boolean):

   
    public void disable() {
        resize(0, true, true);
    }

    public void enable() {
        reset();
    }

    public long reset() {
        return resize(0, true, false);
    }

    public long resize(long newLimit) {
        return resize(newLimit, false, false);
    }
Finally, internalEnumerator() quickly bypasses caching if disabled is true:

   
    @Override
    protected Enumerator<E> internalEnumerator() {
        final AtomicBoolean dis = disabled;
        if (dis.get()) {
            return source.enumerator();   // fast path: caching is off
        }
        synchronized(cache) {
            final AtomicBoolean dis1 = disabled;
            return dis1.get()
                    ? source.enumerator()
                    : new CacheEnumerator<>(cache.get());
        }
    }
The internal method resize(long, boolean, boolean) refreshes everything:

   
    private long resize(long newLimit, boolean resetting, boolean disable) {
        synchronized(cache) {
            final long result = limit;
            if (resetting) {
                newLimit = result;   // a reset keeps the current limit
            } else {
                Utils.ensureLessThan(result,
                                     newLimit,
                                     Messages.ILLEGAL_ENUMERATOR_STATE);
            }

            // replace the disabled flag so that detached buffers keep their
            // own copy, then refresh the lazy evaluators
            disabled = new AtomicBoolean(true);
            try {
                limit = newLimit;

                enumerator.refresh(() -> source.enumerator());
                cache.refresh(getCacheSupplier());

                return result;
            } finally {
                disabled.set(disable);
            }
        }
    }
Once all the elements are in place, we can face the challenging task of initializing the cached enumerable.

The constructor

The biggest challenge is defining the supplier for cache:

   
    private Supplier<Optional<CachedElementWrapper<E>>> getCacheSupplier() {
        return () -> {
            // work on local copies so the cache keeps growing even if the
            // enumerable gets reset, resized or disabled in the meantime
            final Enumerator<E> en = this.enumerator.get();
            final long lim = this.limit;
            final AtomicBoolean dis = this.disabled;
            final Runnable clbk = this.callback;

            if (!en.hasNext()) {
                return Optional.empty();
            }
            final E e = en.next();
            return Optional.of(
                    new CachedElementWrapper<>(
                            e,
                            () -> en.hasNext()
                                    ? Nullable.of(en.next())
                                    : Nullable.empty(),
                            lim,
                            1,
                            () -> {
                                dis.set(true);
                                try {
                                    clbk.run();
                                } catch (Exception ex) {
                                    // do nothing
                                }
                            }));
        };
    }

As one can easily see, the function returned by getCacheSupplier fetches the field values into local variables and then returns a CachedElementWrapper<E> initialized with functions that use those local copies instead of the CachedEnumerable<E> fields.

This means that CachedElementWrapper<E> has everything it needs to grow the buffer lazily: the enumerable can change behind the scenes but the enumeration may still carry on. In other words, enumeration in the presence of caching is decoupled from the cached enumerable and has a life of its own. The cached enumerable can spawn other enumerators, it can be reset, resized or disabled at will, and the enumeration still continues unhindered. This ensures maximum parallelism without making enumeration a thread-blocking operation.
Once the supplier for cache is defined, the constructor is easy to implement:


   
    CachedEnumerable(Enumerable<E> source,
                     long limit,
                     Runnable onLimitCallback) {
        Utils.ensureNotNull(source, Messages.NULL_ENUMERATOR_SOURCE);
        Utils.ensureLessThan(0, limit, Messages.ILLEGAL_ENUMERATOR_STATE);
        Utils.ensureNotNull(onLimitCallback, Messages.NULL_ENUMERATOR_HANDLER);

        this.source = source;
        this.enumerator = new LazySupplier<>(() -> this.source.enumerator());
        this.callback = onLimitCallback;
        this.limit = limit;
        this.disabled = new AtomicBoolean(false);
        this.cache = new LazySupplier<>(getCacheSupplier());
    }
The last thing to observe is the use of .callback. This callback allows the user to take some action when the cache attempts to go over its limit. The user can do nothing in that callback, or resize the cache for enlarged caching. This is not a problem because the methods of CachedElementWrapper<E> are fully re-entrant.
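To make the callback idea concrete, here is an illustrative wiring of my own, not library code. It assumes an Enumerable<Integer> obtained elsewhere, and the hypothetical demo class sits in package enumj because the constructor shown above is package-private:

package enumj;

import java.util.concurrent.atomic.AtomicReference;

// Illustrative demo only, not part of EnumJ.
class CachedEnumerableDemo {
    static CachedEnumerable<Integer> cacheWithGrowth(Enumerable<Integer> source) {
        // the callback needs the enumerable it belongs to, so a holder
        // breaks the construction cycle
        final AtomicReference<CachedEnumerable<Integer>> holder =
                new AtomicReference<>();
        final CachedEnumerable<Integer> cached = new CachedEnumerable<>(
                source,
                1000,                          // initial cache limit
                () -> {
                    final CachedEnumerable<Integer> ce = holder.get();
                    if (ce != null) {
                        ce.resize(10_000);     // grow the cache when the limit is hit
                    }
                });
        holder.set(cached);
        return cached;
    }
}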
For the full code of the class, see Appendix B below.

Last, but not least: CacheEnumerator<E>

We need one more thing to complete the picture: an enumerator that knows how to yield the enumerated elements while buffering them. This enumerator, aptly named CacheEnumerator<E>, is internal to EnumJ and it has a deceptively simple implementation. This simplicity is possible thanks to CachedElementWrapper<E> and CachedEnumerable<E> that make the internal caching buffer work efficiently, seamlessly and concurrently:
package enumj;

import java.util.Optional;

final class CacheEnumerator<E> extends AbstractEnumerator<E> {

    Optional<CachedElementWrapper<E>> cached;

    CacheEnumerator(Optional<CachedElementWrapper<E>> cached) {
        this.cached = cached;
    }

    @Override
    protected boolean internalHasNext() {
        return cached.isPresent();
    }

    @Override
    protected E internalNext() {
        final E result = cached.get().getElement();
        cached = cached.get().getNextWrapper();
        return result;
    }

    @Override
    protected void cleanup() {
        cached = null;
    }
}
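The enumerator simply walks the lazy linked list. The following sketch of mine shows the equivalent manual traversal; printAll is a hypothetical helper (assuming java.util.Optional is imported and the code lives in package enumj), not part of the library:

    // Illustrative only: walking the cached list by hand. Each call to
    // getNextWrapper() creates the successor node lazily if needed.
    static <E> void printAll(Optional<CachedElementWrapper<E>> head) {
        Optional<CachedElementWrapper<E>> node = head;
        while (node.isPresent()) {
            System.out.println(node.get().getElement());
            node = node.get().getNextWrapper();
        }
    }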

What if?

Before publishing the full source code I would like to outline a few “what if?” scenarios.

What if the source has fewer elements than the cache limit?

Then CachedElementWrapper<E> never disables the enumerable and all the elements fit in the cache. The buffer gets built on the first traversal; all subsequent traversals pick the elements from the cache instead of the source.

What if the source has exactly the same number of elements as cache limit?

Same as when the number of elements is smaller than the cache limit.

What if the source has more elements than the cache limit?

Then CachedElementWrapper<E> disables the enumerable when retrieving the (limit+1)th element. This sets CachedEnumerable<E>.disabled to true and calls the user callback passed to the CachedEnumerable<E> constructor.
It is important to note that the caching buffer continues to grow beyond the limit, even though any newly spawned enumerators will ignore caching. The user can call CachedEnumerable<E>.disable(), .enable(), .reset() or .resize() in the callback to detach the cache and free the memory.

What if several enumerations are in progress when the cache limit is exceeded?

Then they all continue enumerating over the same caching buffer. If the caching buffer gets detached from the enumerable, the elements behind them become eligible for garbage collection; the elements in front of them get created lazily, as usual.

What if the cached buffer gets traversed concurrently?

The lazy retrieval of the next node in the list is thread-safe, so the first traversal that hits the end of the list triggers the creation of the next node. Subsequent traversals simply navigate to the already created node without creating anything.

What if the enumerable gets reset by the user while an enumeration is under way?

Then the caching buffer gets detached from the enumerable but the enumeration continues as before, because it uses copies of the enumerable’s fields, so the refresh doesn’t affect what is being enumerated. The only difference is that any caching node which is not referred to by a preceding node or by an enumerator becomes eligible for garbage collection.

What if a new enumeration gets triggered at the same time as a reset?

Then one of them will go first and the other second, because they are mutually exclusive operations. If the reset comes last, the enumerator will work on a detached caching buffer. If spawning the enumerator comes last, it will work on an empty cache that needs to be filled.

What if the enumeration exceeds the limit at the same time as a reset?

Exceeding the limit disables caching. Within CachedElementWrapper<E>, this disabling acts upon a copy of CachedEnumerable<E>.disabled.
If the reset happens before, and the .disabled field gets replaced, the old copy remains set but the enumerator will work upon a detached buffer. If the reset happens after, it refreshes all the fields anyhow, so the enumerator will work upon a detached buffer.

What if the source has too many elements?

The caching limit does not guarantee that the caching buffer will stay within the limit. It only guarantees that:
  • caching is disabled when the limit is exceeded
  • the user gets notified via the callback
It is the responsibility of the user to reset or resize the cache if they see fit. A reset will detach the old cache buffer, making it eligible for garbage collection if it is not being enumerated.
The only possibility of memory overflow after detachment is when the detached caching buffer is referred to by two enumerators, one pointing at the beginning of the buffer and the other at the end, with the distance between them growing without limit. This floods the memory because all the elements in between are kept alive.
However, as enumerators are designed to advance rather than lag behind, this is a rather unlikely scenario.

What if .disabled were not an AtomicBoolean but a volatile boolean instead?

Then the only way to change it when exceeding the caching limit would be to pass a reference to the enumerable to each CachedElementWrapper<E> instance. That’s a bad idea for two reasons:
  • keeping a back reference is simply not good from the perspective of garbage collection: any detached buffer still being enumerated would keep the CachedEnumerable<E> object alive. While this can be alleviated by using a weak reference, it leads to an undesirable intrusion of the enumerable’s lifetime into the life of the CachedElementWrapper<E> instances.

    In other words, the enumerable would become reciprocally tied to each cache it ever had, detached or not. Not good!
  • allowing a detached buffer to influence the behaviour of the original enumerable is really bad. Once a caching buffer is detached, it is detached – it should not purposely influence the original enumerable. That’s why disabling acts upon a copy of .disabled – which may, or may not, be the one held by the enumerable when the limit gets hit.

Conclusion and beyond

I wrote this code over the weekend, so it will most likely undergo some changes before the final version of EnumJ goes public. However, it is (hopefully) my last addition to the library before I embark on full testing, documentation and the release of EnumJ 1.0 to Maven Central.
The next post will (I hope) not be about extra features but about getting back to 100% code coverage – a target which I left behind and need to conquer again.

Appendix A: CachedElementWrapper<E>

package enumj;

import java.util.Optional;
import java.util.function.Supplier;
import org.apache.commons.lang3.concurrent.ConcurrentException;

final class CachedElementWrapper<E> {

    private final E elem;
    private final Lazy<Optional<CachedElementWrapper<E>>> next;

    CachedElementWrapper(E elem,
                         Supplier<Nullable<E>> nextSupplier,
                         long limit,
                         long ordinal,
                         Runnable disableProc) {
        this.elem = elem;
        this.next = new Lazy<>(() -> {
            final long count = ordinal;
            final Nullable<E> e = nextSupplier.get();
            final Optional<CachedElementWrapper<E>> result = e.isPresent()
                    ? Optional.of(new CachedElementWrapper<>(e.get(),
                                                             nextSupplier,
                                                             limit,
                                                             ordinal+1,
                                                             disableProc))
                    : Optional.empty();
            if (count == limit && result.isPresent()) {
                disableProc.run();
            }
            return result;
        });
    }

    public E getElement() {
        return elem;
    }

    public Optional<CachedElementWrapper<E>> getNextWrapper() {
        try {
            return next.get();
        } catch(ConcurrentException ex) {
            throw new UnsupportedOperationException(ex);
        }
    }
}

Appendix B: CachedEnumerable<E>

package enumj;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public final class CachedEnumerable<E> extends AbstractEnumerable<E> {

    private final Enumerable<E> source;
    private final LazySupplier<Enumerator<E>> enumerator;
    private final LazySupplier<Optional<CachedElementWrapper<E>>> cache;
    private final Runnable callback;

    private volatile long limit;
    private volatile AtomicBoolean disabled;

    CachedEnumerable(Enumerable<E> source) {
        this(source, Long.MAX_VALUE);
    }

    CachedEnumerable(Enumerable<E> source, long limit) {
        this(source, limit, () -> {});
    }

    CachedEnumerable(Enumerable<E> source,
                     long limit,
                     Runnable onLimitCallback) {
        Utils.ensureNotNull(source, Messages.NULL_ENUMERATOR_SOURCE);
        Utils.ensureLessThan(0, limit, Messages.ILLEGAL_ENUMERATOR_STATE);
        Utils.ensureNotNull(onLimitCallback, Messages.NULL_ENUMERATOR_HANDLER);

        this.source = source;
        this.enumerator = new LazySupplier<>(() -> this.source.enumerator());
        this.callback = onLimitCallback;
        this.limit = limit;
        this.disabled = new AtomicBoolean(false);
        this.cache = new LazySupplier<>(getCacheSupplier());
    }

    public void disable() {
        resize(0, true, true);
    }

    public void enable() {
        reset();
    }

    public long reset() {
        return resize(0, true, false);
    }

    public long resize(long newLimit) {
        return resize(newLimit, false, false);
    }

    @Override
    protected Enumerator<E> internalEnumerator() {
        final AtomicBoolean dis = disabled;
        if (dis.get()) {
            return source.enumerator();
        }
        synchronized(cache) {
            final AtomicBoolean dis1 = disabled;
            return dis1.get()
                    ? source.enumerator()
                    : new CacheEnumerator<>(cache.get());
        }
    }

    private Supplier<Optional<CachedElementWrapper<E>>> getCacheSupplier() {
        return () -> {
            final Enumerator<E> en = this.enumerator.get();
            final long lim = this.limit;
            final AtomicBoolean dis = this.disabled;
            final Runnable clbk = this.callback;

            if (!en.hasNext()) {
                return Optional.empty();
            }
            final E e = en.next();
            return Optional.of(
                    new CachedElementWrapper<>(
                            e,
                            () -> en.hasNext()
                                    ? Nullable.of(en.next())
                                    : Nullable.empty(),
                            lim,
                            1,
                            () -> {
                                dis.set(true);
                                try {
                                    clbk.run();
                                } catch (Exception ex) {
                                    // do nothing
                                }
                            }));
        };
    }

    private long resize(long newLimit, boolean resetting, boolean disable) {
        synchronized(cache) {
            final long result = limit;
            if (resetting) {
                newLimit = result;
            } else {
                Utils.ensureLessThan(result,
                                     newLimit,
                                     Messages.ILLEGAL_ENUMERATOR_STATE);
            }

            disabled = new AtomicBoolean(true);
            try {
                limit = newLimit;

                enumerator.refresh(() -> source.enumerator());
                cache.refresh(getCacheSupplier());

                return result;
            } finally {
                disabled.set(disable);
            }
        }
    }
}

1) The difference being that the growth is carried out by the runtime, not by the node.
2) Nullable<T> is internal to EnumJ. It works exactly like Optional<T> but it can store null as a valid value. Nullable.empty() means the absence of any value, even null.
