Design Pattern: fast concurrency with detached state
The basic equation of multi-threading can be summarised as follows:
In this article I’ll show a design pattern where the equation changes into:
In other words, the property values are correlated and any object of type Thing must be consistent. The “canonical” way to maintain consistency in presence of concurrency is by enclosing state changes in synchronized blocks:
class Thing {
private double mass;
private double volume;
private double density;
public double getMass() { synchronized(this) { return mass; } }
public void setMass(double newMass) {
synchronized(this) {
mass = newMass;
density = volume > 0 ? mass / volume : 0;
}
}
public double getVolume() { synchronized(this) { return volume; } }
public void setVolume(double newVolume) {
synchronized(this) {
volume = newVolume;
density = volume > 0 ? mass / volume : 0;
}
}
public double getDensity() { synchronized(this) { return density; } } }
My experience has taught me that using asymmetric locks makes concurrent programming harder, not easier. Sometimes it’s useful to pay the price for gain in speed but other times we risk to introduce even more subtle and hard-tracking bugs than when using full locks.
Concurrency + Mutability = Performance Penalty + Error Pronenesswhich is a direct result of context switches and the explicit management of state consistency in the presence of concurrency.
In this article I’ll show a design pattern where the equation changes into:
Concurrency + Mutability = Efficiency + SimplicityDisclaimer: this pattern almost surely exists somewhere else. Like many good things in life, it can bear the burden of many parents. I mention it here because it is related to my previous article.
Example
Let’s consider a class named Thing with properties volume and mass and another property named density derived from the previous two. Because it’s very hard to calculate density from mass and volume (tongue-in-cheek), its value needs to be cached so that we don’t repeat the complex calculation density = mass / volume every single time.In other words, the property values are correlated and any object of type Thing must be consistent. The “canonical” way to maintain consistency in presence of concurrency is by enclosing state changes in synchronized blocks:
class Thing {
private double mass;
private double volume;
private double density;
public double getMass() { synchronized(this) { return mass; } }
public void setMass(double newMass) {
synchronized(this) {
mass = newMass;
density = volume > 0 ? mass / volume : 0;
}
}
public double getVolume() { synchronized(this) { return volume; } }
public void setVolume(double newVolume) {
synchronized(this) {
volume = newVolume;
density = volume > 0 ? mass / volume : 0;
}
}
public double getDensity() { synchronized(this) { return density; } } }
The problem
Every synchronized declaration involves a performance penalty so it should be avoided wherever possible. In addition, restoring the consistency is also error prone – which makes concurrency-related bugs notoriously difficult to spot and/or fix.The solution
If the trouble comes from synchronized + keeping consistency under change, the solution is simply:- get rid of synchronized
- get rid of state changes
The state
Keeping the state into an external immutable object is trivial:class ThingState {
private final double mass;
private final double volume;
private final double density;
public ThingState(double mass, double volume) {
this.mass = mass;
this.volume = volume;
this.density = volume > 0 ? mass / volume : 0;
}
public double getMass() { return mass; }
public ThingState setMass(double newMass) {
return new Thing(newMass, volume);
}
public double getVolume() { return volume; }
public ThingState setVolume(double newVolume) {
return new Thing(mass, newVolume);
}
public double getDensity() { return density; } }
As one can easily see, the setters do not change properties, they create and return a new object and the original object stays immutable. private final double mass;
private final double volume;
private final double density;
public ThingState(double mass, double volume) {
this.mass = mass;
this.volume = volume;
this.density = volume > 0 ? mass / volume : 0;
}
public double getMass() { return mass; }
public ThingState setMass(double newMass) {
return new Thing(newMass, volume);
}
public double getVolume() { return volume; }
public ThingState setVolume(double newVolume) {
return new Thing(mass, newVolume);
}
public double getDensity() { return density; } }
The atomic replacement
The class Thing can how get rid of its fields and have only one volatile field aptly named state:
class Thing {
private volatile ThingState state;
public ThingState state() { return state; }
public ThingState setMass(double newMass) {
final ThingState s = state().setMass(newMass);
state = s;
return s;
}
public ThingState setVolume(double newVolume) {
final ThingState s = state().setVolume(newVolume);
state = s;
return s;
} }
private volatile ThingState state;
public ThingState state() { return state; }
public ThingState setMass(double newMass) {
final ThingState s = state().setMass(newMass);
state = s;
return s;
}
public ThingState setVolume(double newVolume) {
final ThingState s = state().setVolume(newVolume);
state = s;
return s;
} }
There are a few things to observe:
- class Thing has no getters except a method named state() which returns the consistent state of the object
- each setter returns the state it has just set which may not be the same as the state the object will end up with
- there is no critical section or thread context switch. Atomic replacement of state is ensured by volatile and since each state object is consistent, our class is guaranteed to transit from consistent to consistent state. Important: state order in a chain of transitions is not guaranteed, but order of execution is never guaranteed in true parallelism
The glitch
If detached state is so great, should we change all our concurrent classes according to this pattern? Not really, because there are also caveats:- each state change, no matter how small, creates a new object. This may be more expensive than using a critical section
- creating a new object may be a thread-blocking operation, too (depending on how the memory allocator works)
When to use?
This pattern can be useful in the following conditions:- the number of state changes is much smaller than state reads. This is not always the case
- immutability makes keeping state consistency much easier and safer. This is almost always the case
Why not using asymmetric locks?
Asymmetric locks allow multiple reads while keeping exclusion around writes. So, they address case 1. from above rather well. Yet, they do not cater for case 2. since the programmer still has to manage mutable state under concurrency.My experience has taught me that using asymmetric locks makes concurrent programming harder, not easier. Sometimes it’s useful to pay the price for gain in speed but other times we risk to introduce even more subtle and hard-tracking bugs than when using full locks.
With detached state we don’t have to make this compromise: state consistency and state update are clearly separated and the liaison between the two is a simple volatile variable, easy to understand. Code maintenance is easier, too: the programmer needs to update just the state class which is plain-vanilla single-threaded programming. This is quite different from usual concurrent programming when adding a field sometimes requires to half-redesign the whole class – with all the great potential to introduce new bugs.
Why all the fuss?
The CachedEnumerable<E> class shown in my previous article becomes much more concise and safe with the help of a detached state class:package enumj;
import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean;
public final class CachedEnumerableState<E> {
public final Enumerable<E> source;
public final long limit;
public final Runnable callback;
private final AtomicBoolean disabled;
private final Lazy<Enumerator<E>> enumerator;
private final Lazy<Optional<CachedElementWrapper<E>>> cache;
public CachedEnumerableState(Enumerable<E> source,
long limit,
Runnable callback) {
this(source, limit, callback, false);
}
private CachedEnumerableState(Enumerable<E> source,
long limit,
Runnable callback,
boolean disabled) {
this.source = source;
this.limit = limit;
this.callback = callback;
this.disabled = new AtomicBoolean(disabled);
this.enumerator = new Lazy(() -> this.source.enumerator());
this.cache = new Lazy(() ->
{
final Enumerator<E> en = this.enumerator.get();
final long lim = this.limit;
final AtomicBoolean dis = this.disabled;
final Runnable clbk = this.callback;
if (!en.hasNext()) {
return Optional.empty();
}
final E e = en.next();
return Optional.of(new CachedElementWrapper(
e,
() -> en.hasNext()
? Nullable.of(en.next())
: Nullable.empty(),
lim,
1,
() -> {
dis.set(true);
try {
clbk.run();
} catch(Exception ex) {
// do nothing }
}));
});
}
public boolean isDisabled() {
return disabled.get();
}
public CachedEnumerableState<E> enable() {
return new CachedEnumerableState(
source,
limit,
callback,
false);
}
public CachedEnumerableState<E> disable() {
return new CachedEnumerableState(
source,
limit,
callback,
true);
}
public CachedEnumerableState<E> reset() {
return new CachedEnumerableState(source, limit, callback);
}
public CachedEnumerableState<E> resize(long newLimit) {
Utils.ensureLessThan(limit,
newLimit,
Messages.ILLEGAL_ENUMERATOR_STATE);
return new CachedEnumerableState(source, newLimit, callback);
}
public Enumerator<E> enumerator() {
return disabled.get()
? source.enumerator()
: new CacheEnumerator(cache.get());
} }
import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean;
public final class CachedEnumerableState<E> {
public final Enumerable<E> source;
public final long limit;
public final Runnable callback;
private final AtomicBoolean disabled;
private final Lazy<Enumerator<E>> enumerator;
private final Lazy<Optional<CachedElementWrapper<E>>> cache;
public CachedEnumerableState(Enumerable<E> source,
long limit,
Runnable callback) {
this(source, limit, callback, false);
}
private CachedEnumerableState(Enumerable<E> source,
long limit,
Runnable callback,
boolean disabled) {
this.source = source;
this.limit = limit;
this.callback = callback;
this.disabled = new AtomicBoolean(disabled);
this.enumerator = new Lazy(() -> this.source.enumerator());
this.cache = new Lazy(() ->
{
final Enumerator<E> en = this.enumerator.get();
final long lim = this.limit;
final AtomicBoolean dis = this.disabled;
final Runnable clbk = this.callback;
if (!en.hasNext()) {
return Optional.empty();
}
final E e = en.next();
return Optional.of(new CachedElementWrapper(
e,
() -> en.hasNext()
? Nullable.of(en.next())
: Nullable.empty(),
lim,
1,
() -> {
dis.set(true);
try {
clbk.run();
} catch(Exception ex) {
// do nothing }
}));
});
}
public boolean isDisabled() {
return disabled.get();
}
public CachedEnumerableState<E> enable() {
return new CachedEnumerableState(
source,
limit,
callback,
false);
}
public CachedEnumerableState<E> disable() {
return new CachedEnumerableState(
source,
limit,
callback,
true);
}
public CachedEnumerableState<E> reset() {
return new CachedEnumerableState(source, limit, callback);
}
public CachedEnumerableState<E> resize(long newLimit) {
Utils.ensureLessThan(limit,
newLimit,
Messages.ILLEGAL_ENUMERATOR_STATE);
return new CachedEnumerableState(source, newLimit, callback);
}
public Enumerator<E> enumerator() {
return disabled.get()
? source.enumerator()
: new CacheEnumerator(cache.get());
} }
Replacing all the fields with a single state field makes CachedEnumerable<E> concise and safe:
package enumj;
public final class CachedEnumerable<E> extends AbstractEnumerable<E> {
private volatile CachedEnumerableState<E> state;
CachedEnumerable(Enumerable<E> source) {
this(source, Long.MAX_VALUE, () -> {});
}
CachedEnumerable(Enumerable<E> source,
long limit,
Runnable onLimitCallback) {
Utils.ensureNotNull(source, Messages.NULL_ENUMERATOR_SOURCE);
Utils.ensureLessThan(0, limit, Messages.ILLEGAL_ENUMERATOR_STATE);
Utils.ensureNotNull(onLimitCallback, Messages.NULL_ENUMERATOR_HANDLER);
this.state = new CachedEnumerableState(source, limit, onLimitCallback);
}
public CachedEnumerableState<E> state() {
return this.state;
}
public CachedEnumerableState<E> disable() {
final CachedEnumerableState<E> disabled = state().disable();
this.state = disabled;
return disabled;
}
public CachedEnumerableState<E> enable() {
final CachedEnumerableState<E> enabled = state().enable();
this.state = enabled;
return enabled;
}
public CachedEnumerableState<E> reset() {
final CachedEnumerableState<E> res = state().reset();
this.state = res;
return res;
}
public CachedEnumerableState<E> resize(long newLimit) {
final CachedEnumerableState<E> res = state().resize(newLimit);
this.state = res;
return res;
}
@Override
protected Enumerator<E> internalEnumerator() {
return state().enumerator();
} }
public final class CachedEnumerable<E> extends AbstractEnumerable<E> {
private volatile CachedEnumerableState<E> state;
CachedEnumerable(Enumerable<E> source) {
this(source, Long.MAX_VALUE, () -> {});
}
CachedEnumerable(Enumerable<E> source,
long limit,
Runnable onLimitCallback) {
Utils.ensureNotNull(source, Messages.NULL_ENUMERATOR_SOURCE);
Utils.ensureLessThan(0, limit, Messages.ILLEGAL_ENUMERATOR_STATE);
Utils.ensureNotNull(onLimitCallback, Messages.NULL_ENUMERATOR_HANDLER);
this.state = new CachedEnumerableState(source, limit, onLimitCallback);
}
public CachedEnumerableState<E> state() {
return this.state;
}
public CachedEnumerableState<E> disable() {
final CachedEnumerableState<E> disabled = state().disable();
this.state = disabled;
return disabled;
}
public CachedEnumerableState<E> enable() {
final CachedEnumerableState<E> enabled = state().enable();
this.state = enabled;
return enabled;
}
public CachedEnumerableState<E> reset() {
final CachedEnumerableState<E> res = state().reset();
this.state = res;
return res;
}
public CachedEnumerableState<E> resize(long newLimit) {
final CachedEnumerableState<E> res = state().resize(newLimit);
this.state = res;
return res;
}
@Override
protected Enumerator<E> internalEnumerator() {
return state().enumerator();
} }
Comments
Post a Comment