If you use Akka, you probably already know what Akka persistence is. It's an additional module, experimental before version 2.4, that lets you save the state of an actor to a persistent store. Akka persistence enables stateful actors to persist their internal state so that it can be recovered when an actor is started, restarted after a JVM crash or by a supervisor, or migrated in a cluster. The key concept behind Akka persistence is that only changes to an actor's internal state are persisted, never its current state directly.

This concept is tightly related to event sourcing and CQRS. In fact, we do not save the state of an actor but the events that allow us to restore that specific state. For example, if a bank account is represented by an actor, we don't save the balance but the list of transactions (the events) that led to it.

The documentation explains very well how to make an existing actor persistent, and it isn't worth repeating it here at that level of detail. However, when it comes to the serialization format, the documentation leaves the choice of the “right” format to the developer. Four are mentioned:

  1. Avro
  2. JSON
  3. Protobuf
  4. Thrift

There's actually a fifth: standard Java serialization. It could be used at the very beginning of a project just to try things out but, as Konrad Malawski says, stay away from it (or be prepared to pay for that choice later).

Java Serialization is a horrible idea if you're going to store the messages for a long time.

Avro

I decided to investigate using Avro to manage persistence in Akka for these personal reasons:

  1. Avro is widely used with Kafka, another piece of the SMACK stack, so knowing more about it could be useful in the future.
  2. Avro promises easy schema evolution. I won't trust this until I've done my own tests.
  3. At the moment there's no example around that shows Avro in combination with Akka.
  4. JSON is quite verbose, I'm not sure about its performance, and in any case it has no built-in schema.

For the same reasons, I decided to give lower priority to the other formats for now (Kryo could also be an option), even though Protobuf is used inside Akka itself (and I usually trust the Akka team's choices ;-) ). There's one more reason in the next section.

The example

Looking for something already done with Avro, I found this repository, which uses Protobuf, JSON and Kryo. I added some Avro examples using the avro4s library.

Let's say I want to model a music album (such a '90s concept) as an actor. This actor can receive two messages:

sealed trait AlbumCommand
final case class ChangeAlbumTitle(title: Title) extends AlbumCommand
final case class ChangeAlbumYear(year: Year) extends AlbumCommand

We can model the events like this:

sealed trait AlbumEvent
final case class TitleChanged(title: Title) extends AlbumEvent
final case class YearChanged(year: Year) extends AlbumEvent

The corresponding actor is quite simple to understand if you have already read the persistence documentation:

import akka.actor.ActorLogging
import akka.event.LoggingReceive
import akka.persistence.PersistentActor
import com.github.dnvriend.domain.Music

class Album(val persistenceId: String) extends PersistentActor with ActorLogging {

  import Music._

  // Current state: rebuilt from events, never persisted directly
  var title: Title = _
  var year: Year = _

  // Invoked during recovery with every event previously persisted for this persistenceId
  override def receiveRecover: Receive = LoggingReceive {
    case e: TitleChanged ⇒ handleEvent(e)
    case e: YearChanged  ⇒ handleEvent(e)
  }

  // Event handlers: the only place where the state changes
  def handleEvent(event: TitleChanged): Unit = {
    this.title = event.title
    log.debug(s"[TitleChanged]: Album $persistenceId => title: $title, year: $year")
  }

  def handleEvent(event: YearChanged): Unit = {
    this.year = event.year
    log.debug(s"[YearChanged]: Album $persistenceId => title: $title, year: $year")
  }

  // Commands become events; the handler runs only after the event has been stored
  override def receiveCommand: Receive = LoggingReceive {
    case ChangeAlbumTitle(newTitle) ⇒
      persist(TitleChanged(newTitle))(e ⇒ handleEvent(e))
    case ChangeAlbumYear(newYear) ⇒
      persist(YearChanged(newYear))(e ⇒ handleEvent(e))
  }

  override def postStop(): Unit = {
    log.debug(s"Stopped $persistenceId")
    super.postStop()
  }
}

receiveCommand handles the normal message flow of the actor, while receiveRecover is responsible for restoring the actor state by replaying every event that was persisted before.
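The split between the two handlers can be modeled without Akka to see why recovery works. The following is a plain-Scala sketch, not Akka code. `AlbumState`, `decide`, `applyEvent`, `recover` and `run` are hypothetical names, and `Title`/`Year` are assumed to be `String`/`Int`. Commands produce events, and events are the only thing that changes state. Folding the stored journal from an empty state therefore rebuilds exactly the state reached during live processing.

```scala
object AlbumModel {
  // Assumption: the real Music object may define these types differently
  type Title = String
  type Year = Int

  sealed trait AlbumCommand
  final case class ChangeAlbumTitle(title: Title) extends AlbumCommand
  final case class ChangeAlbumYear(year: Year) extends AlbumCommand

  sealed trait AlbumEvent
  final case class TitleChanged(title: Title) extends AlbumEvent
  final case class YearChanged(year: Year) extends AlbumEvent

  // Immutable equivalent of the vars kept inside the Album actor
  final case class AlbumState(title: Option[Title], year: Option[Year])
  object AlbumState { val empty: AlbumState = AlbumState(None, None) }

  // Command side (like receiveCommand): decide which events a command produces
  def decide(cmd: AlbumCommand): List[AlbumEvent] = cmd match {
    case ChangeAlbumTitle(t) => List(TitleChanged(t))
    case ChangeAlbumYear(y)  => List(YearChanged(y))
  }

  // Event side (like handleEvent): the only function that changes state
  def applyEvent(state: AlbumState, event: AlbumEvent): AlbumState = event match {
    case TitleChanged(t) => state.copy(title = Some(t))
    case YearChanged(y)  => state.copy(year = Some(y))
  }

  // Recovery (like receiveRecover): replay the whole journal from an empty state
  def recover(journal: Seq[AlbumEvent]): AlbumState =
    journal.foldLeft(AlbumState.empty)(applyEvent)

  // Live processing: append each command's events to a journal, then apply them
  def run(commands: Seq[AlbumCommand]): (Vector[AlbumEvent], AlbumState) =
    commands.foldLeft((Vector.empty[AlbumEvent], AlbumState.empty)) {
      case ((journal, state), cmd) =>
        val events = decide(cmd)
        (journal ++ events, events.foldLeft(state)(applyEvent))
    }
}
```

For example, `run(List(ChangeAlbumTitle("Nevermind"), ChangeAlbumYear(1990), ChangeAlbumYear(1991)))` stores three events and ends with `AlbumState(Some("Nevermind"), Some(1991))`. Calling `recover` on those three events returns the same state. Only the events reach the journal, never the final state.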

The serializer

To use Avro, we need to write a specific serializer for each event we want to manage. The same class also acts as the deserializer. To do this, we extend SerializerWithStringManifest, which declares the toBinary and fromBinary methods, along with manifest and identifier.

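import java.io.ByteArrayOutputStream

import akka.serialization.SerializerWithStringManifest
import com.github.dnvriend.domain.Music.TitleChanged
import com.sksamuel.avro4s.{ AvroInputStream, AvroOutputStream }

class TitleChangedSerializer extends SerializerWithStringManifest {

  // Unique id of this serializer, stored with every persisted event
  override def identifier: Int = 100010

  final val Manifest = classOf[TitleChanged].getName

  // Stored next to the payload and handed back to fromBinary during recovery
  override def manifest(o: AnyRef): String = o.getClass.getName

  override def toBinary(o: AnyRef): Array[Byte] = {
    val output = new ByteArrayOutputStream
    // avro4s writes the event using the schema derived at compile time
    val avro = AvroOutputStream[TitleChanged](output)
    avro.write(o.asInstanceOf[TitleChanged])
    avro.close()
    output.toByteArray
  }

  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
    if (Manifest == manifest) {
      val is = AvroInputStream[TitleChanged](bytes)
      val events = is.iterator.toList
      is.close()
      // toBinary writes exactly one event per payload
      events.head
    } else throw new IllegalArgumentException(s"Unable to handle manifest $manifest, required $Manifest")
}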

The hard work is done by the avro4s AvroOutputStream and AvroInputStream classes, which handle the conversion. How do they do it? avro4s relies on macros, so the Avro schema is generated at compile time, which eliminates boilerplate. You can find more details on the avro4s project page.
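TitleChangedSerializer covers a single event, so each new event would need its own class and binding. One alternative is a single serializer for all album events that switches on the manifest. The sketch below shows that dispatch using only the JDK. It is deliberately not Avro: DataOutputStream stands in for AvroOutputStream so the code runs without dependencies. StringManifestCodec is a hypothetical trait that only mirrors the four members of Akka's SerializerWithStringManifest. The Music object, the manifest strings and the identifier are also illustrative assumptions.

```scala
import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream }

// Assumption: stand-ins for the real domain types
object Music {
  type Title = String
  type Year = Int
  sealed trait AlbumEvent
  final case class TitleChanged(title: Title) extends AlbumEvent
  final case class YearChanged(year: Year) extends AlbumEvent
}

// Hypothetical trait with the same shape as SerializerWithStringManifest
trait StringManifestCodec {
  def identifier: Int
  def manifest(o: AnyRef): String
  def toBinary(o: AnyRef): Array[Byte]
  def fromBinary(bytes: Array[Byte], manifest: String): AnyRef
}

object AlbumEventCodec extends StringManifestCodec {
  import Music._

  // Short, versioned manifests, independent of the class names (illustrative values)
  val TitleChangedManifest = "TitleChanged.v1"
  val YearChangedManifest = "YearChanged.v1"

  override val identifier: Int = 100011

  override def manifest(o: AnyRef): String = o match {
    case _: TitleChanged => TitleChangedManifest
    case _: YearChanged  => YearChangedManifest
    case other           => throw new IllegalArgumentException(s"Unsupported ${other.getClass.getName}")
  }

  override def toBinary(o: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream
    val out = new DataOutputStream(bytes)
    o match {
      case TitleChanged(title) => out.writeUTF(title) // length-prefixed modified UTF-8
      case YearChanged(year)   => out.writeInt(year)  // 4 bytes, big-endian
      case other               => throw new IllegalArgumentException(s"Unsupported ${other.getClass.getName}")
    }
    out.close() // flushes into the byte array
    bytes.toByteArray
  }

  // The manifest alone tells us how to read the bytes back
  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    try manifest match {
      case TitleChangedManifest => TitleChanged(in.readUTF())
      case YearChangedManifest  => YearChanged(in.readInt())
      case other                => throw new IllegalArgumentException(s"Unable to handle manifest $other")
    } finally in.close()
  }
}
```

Using short, versioned manifests instead of class names is one way to keep stored events readable after a class is renamed or moved. That becomes relevant as soon as schema evolution enters the picture.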

Finally, you just need to bind the serializer to your class in the configuration, and Akka will do the rest.

akka.actor {
  serializers {
    titleChanged = "com.github.dnvriend.serializer.avro.TitleChangedSerializer"
  }

  serialization-bindings {
    "com.github.dnvriend.domain.Music$TitleChanged" = titleChanged
  }
}
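The `$` in the binding key is not a typo. TitleChanged is nested in the Music object, and the JVM binary name joins a nested class to its owner with `$`. This is also the string that `o.getClass.getName` returns in the serializer's manifest method. The small check below uses a hypothetical Music object without a package. In the real project, the same call would give com.github.dnvriend.domain.Music$TitleChanged.

```scala
// Hypothetical stand-in for com.github.dnvriend.domain.Music (no package here)
object Music {
  final case class TitleChanged(title: String)
}

// JVM binary name: the nested class is joined to its enclosing object with '$'
val bindingKey: String = classOf[Music.TitleChanged].getName

// What manifest(o) = o.getClass.getName returns for an instance at runtime
val runtimeName: String = Music.TitleChanged("Nevermind").getClass.getName
```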

Conclusions and further work

Adding Avro serialization to Akka persistence was not difficult, since avro4s is a great library that does most of the work. Now the interesting test begins: understanding how to use Avro to manage schema evolution in the context of Akka persistence. That will be the subject of the next post.