After browsing through Designing Data-Intensive Applications by Martin Kleppmann a few times, I kept coming back to the fencing token example. I've messed around with it for a while and have gone back and forth on how complex an example to provide, but finally settled on starting with a much simpler example than the one outlined in the book: first because it is more approachable, and second because it gives a good jumping-off point to expand on later.
The main simplifications were removing any kind of HTTP or service calls, removing the lease/lock, and reducing the responsibilities down to a fence-like object and a storage object. The fence only produces tokens, while the storage validates a token and "writes" (prints, in this case). A delay can be passed into the write method to simulate a pause between acquiring a token and writing, standing in for something like the "stop-the-world" garbage collection pause mentioned in the book. See the code examples below.
package fence

import "sync"

// Service hands out monotonically increasing fencing tokens.
type Service struct {
	mu    sync.Mutex
	token int
}

// AcquireToken returns the next token; the mutex keeps the
// counter safe for concurrent callers.
func (s *Service) AcquireToken() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.token++
	return s.token
}
The fence doesn't do anything other than hand out a new token, incremented by one on each call.
package storage

import (
	"fencing-tokens/fence"
	"sync"
	"time"
)

// Storage remembers the highest token it has seen and rejects
// writes carrying an older one.
type Storage struct {
	tokenService fence.Service
	mu           sync.Mutex
	token        int
}

func (store *Storage) isValidToken(t int) bool {
	return t > store.token
}

func (store *Storage) write(s string, delay time.Duration, writeToken int) {
	// simulate some kind of delay between token assignment and write
	time.Sleep(delay)
	// lock so the check and the update happen atomically; otherwise
	// two concurrent writers could both pass validation
	store.mu.Lock()
	defer store.mu.Unlock()
	if store.isValidToken(writeToken) {
		store.token = writeToken
		println("Write:", s)
	} else {
		println("Write:", s, "--> BLOCKED, expired token:", writeToken)
	}
}
And the storage keeps track of the highest token it has seen, using it to validate new write calls as they come in.
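One consequence of the strictly-greater check is worth calling out: replaying the same token, say a client retrying its own write, is rejected just like an older one. A standalone sketch (inlining the validation rather than importing the packages above, and returning a bool purely for illustration):

```go
package main

import "fmt"

// store mirrors the validation above: a write is accepted only when
// its token is strictly greater than the highest token seen so far.
type store struct {
	token int
}

func (s *store) write(msg string, writeToken int) bool {
	if writeToken > s.token {
		s.token = writeToken
		fmt.Println("Write:", msg)
		return true
	}
	fmt.Println("Write:", msg, "--> BLOCKED, expired token:", writeToken)
	return false
}

func main() {
	var s store
	s.write("first attempt", 1)         // accepted
	s.write("retry with same token", 1) // blocked: 1 is not > 1
	s.write("next token", 2)            // accepted
}
```

Whether that is a bug or a feature depends on the system: it prevents duplicate writes, but it also means a client cannot idempotently retry without acquiring a fresh token.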
Now we can test it like so:
package storage

import (
	"fencing-tokens/fence"
	"sync"
	"testing"
	"time"
)

func TestStorageWrite_Failure_OneThread(t *testing.T) {
	var wg sync.WaitGroup
	storage := Storage{tokenService: fence.Service{}}
	token1 := storage.tokenService.AcquireToken()
	token2 := storage.tokenService.AcquireToken()
	storage.write("This should pass", time.Nanosecond, token2)
	wg.Add(1)
	go func() {
		defer wg.Done()
		storage.write("This should fail", time.Second, token1)
	}()
	wg.Wait()
}
/* this prints the following:
Write: This should pass
Write: This should fail --> BLOCKED, expired token: 1
*/
func TestStorageWrite_Failure_TwoThreads(t *testing.T) {
	var wg sync.WaitGroup
	storage := Storage{tokenService: fence.Service{}}
	token1 := storage.tokenService.AcquireToken()
	token2 := storage.tokenService.AcquireToken()
	storage.write("This should pass", time.Nanosecond, token2)
	wg.Add(2)
	go func() {
		defer wg.Done()
		storage.write("This should fail", time.Second, token1)
	}()
	go func() {
		defer wg.Done()
		storage.write("This should pass, too", time.Second/2, storage.tokenService.AcquireToken())
	}()
	storage.write("This should also pass", time.Nanosecond, storage.tokenService.AcquireToken())
	wg.Wait()
}
/* this typically prints the following (the last two tokens are
acquired concurrently, so exactly which token each of those two
writes carries depends on scheduling):
Write: This should pass
Write: This should also pass
Write: This should pass, too
Write: This should fail --> BLOCKED, expired token: 1
*/
So the "bad" writes are properly rejected! There are some drawbacks to this approach, and they are covered well in the book, so please do check it out if any of this seems interesting; it fleshes out the solution far beyond this reductive sketch. I plan on circling back at some point to do the outlined solution more justice, but wanted to write this up because it is such an elegant solution to a thorny problem in distributed systems.