Akka: Properly build a New Stream Source with Video Stream Outpu

The Akka documentation is extensive and yet is limited in some ways. One such way is in describing the breadth of issues required to build a functional streaming source in Akka Streams. This article covers source generation in more detail to help you avoid a major catastrophe.

We will use video processing with FFMPEG in our example source as we need to break apart frames and then emit every frame in a stream, an intensive real world task. There is no guarantee that our video stream will be production grade but it can start you on the path to success.

See also:

Building A Source

Building a source does not need to be a struggle but there are several actions that each source must account for.

It is recommended to read Lightbend’s customer stream processing documentation.

These actions are:

  • Handling the Completion Downstream
  • Handling a Success
  • Handling a Failure

The Akka graph stage, used to create a custom stream offers a downstream finish method as shown in the example below. This is not well documented.

Using a Materialized Value

Often, we want to create a future from a stream to ensure a stream completed as wanted. Promises in Scala offer a way to create futures while notifying the programmer of failures and successes.

A promise is easy to create:

   
     var promise : Promise[Long] = Promise[Long]()
     if(true){
          promise.trySuccess(1L)
     }else{
          promise.tryFailure(new Exception("Test Failed"))
     }
     promise.future

A promise is generated and given a success or failure. A future is then generated which can be handled normally.

Putting It All Together

Each concept can be applied to the generation of a custom source:

case class VFrame(id : Long, path : Path, frame : IplImage,captureDate : Long = DateTime.now().getMillis)

class VideoSource(path : Path) extends GraphStageWithMaterializedValue[SourceShape[VFrame],Future[Boolean]]{
  val out : Outlet[VFrame] = Outlet("VideoFrameSource")
  override def shape: SourceShape[VFrame] = SourceShape(out)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Boolean]) = {
    val promise : Promise[Boolean] = Promise[Boolean]()
    var readImages : Long  = 0L
    var convertToIpl : OpenCVFrameConverter.ToIplImage = new OpenCVFrameConverter.ToIplImage()
    var grabber : FFmpegFrameGrabber = null
    var frame : Frame = null
    var converter = new OpenCVFrameConverter.ToIplImage

    val logic = new GraphStageLogic(shape){
      setHandler(out, new OutHandler{
        override def onPull(): Unit = {
          try {
            if(grabber == null){
              if(!path.toFile.exists()){
                throw new FileNotFoundException(s"Path to ${path.toFile.getAbsolutePath} does not Exist")
              }

              grabber = new FFmpegFrameGrabber(path.toFile.getAbsolutePath)
              grabber.start()
            }

            if(grabber != null){
              try {
                frame = grabber.grab()
              }catch{
                case t : Throwable =>{
                  println(t.getMessage)
                  println(ExceptionUtils.getStackTrace(t))
                }
              }
              if(frame != null) {
                val im =  converter.convert(frame)
                readImages += 1
                push(out,VFrame(readImages,path,im))
              }else{
                success()
              }
            }
          }catch{
            case t : Throwable =>{
              println(t.getMessage)
              println(ExceptionUtils.getStackTrace(t))
              fail(t)
            }
          }
        }

        def fail(ex : Throwable)={
          if(grabber != null){
            grabber.stop()
            grabber.close()
          }
          promise.tryFailure(ex)
          failStage(ex)
        }

        def success()={
          if(grabber != null){
            try {
              grabber.stop()
            }catch {
              case t : Throwable =>{
                println(t.getMessage)
                println(ExceptionUtils.getStackTrace(t))
              }
            }finally
            {
              grabber.close()
            }
          }
          promise.trySuccess(true)
          completeStage()
        }

        override def onDownstreamFinish(): Unit = {
          success()
        }

      })
    }

    logic -> promise.future
  }
}

This class creates a source with a materialized value, a topic not fully covered in the Lightbend documentation. The number of frames is returned. The source overrides onDownStreamFinish and implements custom success and failure processing.

The anatomy of the above source is simple. A class is created which takes in a Path to a file source. FFMpeg is used to stream the obtain frames from the video. The source requires the creation of specialized source logic after specifying the source shape and type through createLogicAndMaterializedValue. The Promise promise contains the results of the materialized values. The resulting logic method yields a future when success  or failure are called. The method completeStage is called to complete the source.

Conclusion

This article examined the creation of new sources in Akka. We reviewed handling the completion of downstream sources, success, failure, and the return of a materialized value. Creating sources is easier than Lightbend lets on.

Advertisements

Akka: An Introduction

Akkas documentation is immense. This series helps tackle the many components by providing a working example of the master slave design pattern built with this powerful tool. The following article reviews the higher level concepts behind Akka and its usage.

Links are provided to different parts of the Akka documentation throughout the article.

See also:

Akka

Akka is a software tool  used to build multi-threaded and distributed systems based on the actor model. It takes care of lower level systems building by providing high level APIs for node and actor generation.

Actors are the primitives behind Akka. They are useful for performing repeated tasks concurrently.  Actors run until terminated, receiving work through message passing.

actorsys

Resource Usage

Akka is extremely lightweight. The creators boast that the tool can handle thousands of actors on a single machine.

Message passing occurs through mailboxes. The maximum number of messages a mailbox holds is configurable with a 1000 messages default but messages must be under one megabyte.

The Actor

The actor is the universal primitive used in Akka. Unlike when using threading in a program language, this primitive runs like a daemon server. As such, it should be shut down gracefully.

Actors are user created.

class MyActor extends Actor with ActorLogging{

     override def preStart()= log.debug("Starting")
     override def postStop()= log.debug("Stopping")
     override def preRestart(reason: Throwable, message: Option[Any]) = log.error(s"Restarting because of ${reason.message}. ${message}")     
     override def postRestart(reason : Throwable) = 

     override def receive():Receive={
         case _ => sender ! "Hello from Actor"
     }
}

object MyActor{
   def setupMyActor()={
        val conf = ConfigFactory.load()
        val system = ActorSystem("MySystem",conf)
        val actor : ActorRef = system.actorOf(Props[MyActor],name = "myactor") 
   }
}

 

The example above creates an actor and a Scala companion class for instantiation.

Actors must extend Actor. ActorLogging provides the log library. The optional functions preRestart and postRestart handle exceptions, while the optional preStart and postStop methods handle setup and tear down tasks. The basic actor above incorporates logging and error processing.

An actor can:

  • Create and supervise other actors
  • Maintain a State
  • Control the flow of work in a system
  • Perform a unit of work on request or repeatably
  • Send and receive messages
  • Return the results of a computation

Akka’s serialization is extremely powerful. Anything available across a cluster or on the classpath that implements Serializable can be sent to and from an actor. Instances of classes are de-serialized without having the programmer recast them.

When to Use Akka

Actor systems are not a universal solution. When not performing repeated tasks and not benefiting from high levels of concurrency, they are a hindrance.

State persistence also weighs heavily in the use of an actor system. Take the example of cookies in network requests. Maintaining different network sessions across different remote actors can be highly beneficial in ingestion.

Any task provided to an actor should contain very little new code and a limited number of configuration variables.

Questions that should be asked based on these concepts include:

  • Can I break up tasks into sufficiently large quantities of work to benefit from concurrency?
  • How often will  tasks be repeated in the system?
  • How minimal can I make the configuration for the actor if necessary?
  • Is there a need for state persistence?
  • Is there a significant need for concurrency or is it a nice thought?
  • Is there a resource constraint that distribution can solve or that will limit threading?

State is definitely a large reason to use Akka. This could be in the form of actually  maintaining variables or in the actor itself.

In some distributed use cases involving the processing of enormous numbers of short lived requests, the actors own state and Akka’s mailbox capabilities are what is most important. This is the reasoning behind tools built on Akka such as Spark.

As is always the case when deciding to create a new system, the following should be asked as well:

  • Can I use an existing tools such as Spark or Tensor Flow?
  • Does the time required to build the system outweigh the overall benefit it will provide?

Clustering

Clustering is available in Akka. Akka provides high level APIs for generating distributed clusters. Specified seed nodes handle communications, serving as the system’s entry point.

Network design is provided entirely by the developer. Since node generation, logging, fault tolerance, and basic communication are the only pieces of a distributed system Akka handle’s, any distribution model will suffice. Two common models are the master-slave and graph based models.

Akka clusters are resilient with well developed fault tolerance.

Configuration

Configuration is performed either through a file or in a class. Many components can be configured including logging levels, cluster components, metrics collection, and routers.

Conclusion

This article is the entry point for the Akka series, providing the basic understanding needed as we begin to build a cluster.