Akka: Properly Build a New Stream Source with Video Stream Output

The Akka documentation is extensive, yet it is limited in some ways. One such way is in describing the breadth of issues involved in building a functional streaming source in Akka Streams. This article covers source generation in more detail to help you avoid a major catastrophe.

We will use video processing with FFmpeg in our example source, as we need to break a video into frames and then emit each frame to the stream, an intensive real-world task. There is no guarantee that our video stream will be production grade, but it can start you on the path to success.

Building A Source

Building a source does not need to be a struggle, but there are several actions that every source must account for.

It is recommended to read Lightbend’s custom stream processing documentation.

These actions are:

  • Handling downstream completion
  • Handling success
  • Handling failure

The Akka GraphStage, used to create a custom stream stage, offers a downstream finish callback, as shown in the example below. This is not well documented.

Using a Materialized Value

Often, we want to obtain a future from a stream to ensure the stream completed as expected. Promises in Scala offer a way to create futures while notifying the programmer of failure or success.

A promise is easy to create:

   
     import scala.concurrent.{Future, Promise}

     val promise: Promise[Long] = Promise[Long]()
     if (true) {  // placeholder condition; real code would test an actual result
       promise.trySuccess(1L)
     } else {
       promise.tryFailure(new Exception("Test Failed"))
     }
     val future: Future[Long] = promise.future

A promise is created and later completed with a success or a failure. The promise's future can then be handled like any other future.
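As a brief sketch of consuming that future (assuming an execution context is in scope; the global one is used here purely for illustration), the standard callbacks apply:

     import scala.concurrent.ExecutionContext.Implicits.global
     import scala.util.{Failure, Success}

     promise.future.onComplete {
       case Success(value) => println(s"Completed with $value")
       case Failure(t)     => println(s"Failed: ${t.getMessage}")
     }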

Putting It All Together

Each concept can be applied to the generation of a custom source:

import java.io.FileNotFoundException
import java.nio.file.Path

import scala.concurrent.{Future, Promise}

import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, OutHandler}

// JavaCV and helper classes; package paths vary with the JavaCV version in use
import org.apache.commons.lang3.exception.ExceptionUtils
import org.bytedeco.javacpp.opencv_core.IplImage
import org.bytedeco.javacv.{FFmpegFrameGrabber, Frame, OpenCVFrameConverter}
import org.joda.time.DateTime

case class VFrame(id: Long, path: Path, frame: IplImage, captureDate: Long = DateTime.now().getMillis)

class VideoSource(path: Path) extends GraphStageWithMaterializedValue[SourceShape[VFrame], Future[Boolean]] {
  val out: Outlet[VFrame] = Outlet("VideoFrameSource")
  override def shape: SourceShape[VFrame] = SourceShape(out)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Boolean]) = {
    val promise: Promise[Boolean] = Promise[Boolean]()
    var readImages: Long = 0L
    var grabber: FFmpegFrameGrabber = null
    var frame: Frame = null
    val converter = new OpenCVFrameConverter.ToIplImage

    val logic = new GraphStageLogic(shape) {
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          try {
            // Open the grabber lazily on the first pull
            if (grabber == null) {
              if (!path.toFile.exists()) {
                throw new FileNotFoundException(s"Path to ${path.toFile.getAbsolutePath} does not exist")
              }
              grabber = new FFmpegFrameGrabber(path.toFile.getAbsolutePath)
              grabber.start()
            }

            try {
              frame = grabber.grab()
            } catch {
              case t: Throwable =>
                println(t.getMessage)
                println(ExceptionUtils.getStackTrace(t))
            }

            if (frame != null) {
              // Convert the frame and push it downstream
              val im = converter.convert(frame)
              readImages += 1
              push(out, VFrame(readImages, path, im))
            } else {
              // A null frame means the video is exhausted
              success()
            }
          } catch {
            case t: Throwable =>
              println(t.getMessage)
              println(ExceptionUtils.getStackTrace(t))
              fail(t)
          }
        }

        def fail(ex: Throwable): Unit = {
          if (grabber != null) {
            grabber.stop()
            grabber.close()
          }
          promise.tryFailure(ex)
          failStage(ex)
        }

        def success(): Unit = {
          if (grabber != null) {
            try {
              grabber.stop()
            } catch {
              case t: Throwable =>
                println(t.getMessage)
                println(ExceptionUtils.getStackTrace(t))
            } finally {
              grabber.close()
            }
          }
          promise.trySuccess(true)
          completeStage()
        }

        // Clean up when the downstream cancels
        override def onDownstreamFinish(): Unit = success()
      })
    }

    logic -> promise.future
  }
}

This class creates a source with a materialized value, a topic not fully covered in the Lightbend documentation. The materialized value is a Future[Boolean] that completes when the stream does; the running frame count is carried in each VFrame's id. The source overrides onDownstreamFinish and implements custom success and failure handling.

The anatomy of the above source is simple. A class is created which takes a Path to a video file. FFmpeg, via the JavaCV frame grabber, is used to obtain frames from the video. After specifying the source's shape and element type, the stage builds its specialized logic in createLogicAndMaterializedValue. The promise captures the outcome of the stream and supplies the materialized future, which completes when success or fail is called. The method completeStage is called to complete the source.
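A brief usage sketch follows; the actor system name, file path, and printing sink are illustrative assumptions, and the setup uses the same pre-Akka-2.6 APIs as the source above:

import java.nio.file.Paths

import scala.concurrent.Future

import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}

implicit val system: ActorSystem = ActorSystem("video")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Keep.both retains the source's Future[Boolean] and the sink's Future[Done]
val (finished: Future[Boolean], consumed: Future[Done]) =
  Source.fromGraph(new VideoSource(Paths.get("/tmp/example.mp4")))
    .toMat(Sink.foreach(v => println(s"frame ${v.id}")))(Keep.both)
    .run()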

Conclusion

This article examined the creation of new sources in Akka. We reviewed handling the completion of downstream sources, success, failure, and the return of a materialized value. Creating sources is easier than Lightbend lets on.

Akka: An Introduction

Akka’s documentation is immense. This series helps tackle its many components by providing a working example of the master-slave design pattern built with this powerful tool. The following article reviews the higher-level concepts behind Akka and its usage.

Links are provided to different parts of the Akka documentation throughout the article.

Akka

Akka is a software toolkit used to build multi-threaded and distributed systems based on the actor model. It takes care of lower-level systems concerns by providing high-level APIs for node and actor generation.

Actors are the primitives behind Akka. They are useful for performing repeated tasks concurrently. Actors run until terminated, receiving work through message passing.

Resource Usage

Akka is extremely lightweight. The creators boast that the tool can handle millions of actors on a single machine.

Message passing occurs through mailboxes. The maximum number of messages a mailbox holds is configurable, with a default of 1,000 messages, but messages must be under one megabyte.
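As a sketch of mailbox configuration (the mailbox name bounded-mailbox is an illustrative assumption; the keys mirror Akka's classic bounded-mailbox configuration, and MyActor is the example actor defined later in this article):

import akka.actor.Props
import com.typesafe.config.ConfigFactory

// Hypothetical bounded mailbox definition, typically placed in application.conf
val mailboxConf = ConfigFactory.parseString(
  """
  bounded-mailbox {
    mailbox-type = "akka.dispatch.BoundedMailbox"
    mailbox-capacity = 1000
    mailbox-push-timeout-time = 10s
  }
  """)

// An actor can then be asked to use the mailbox by name
val props = Props[MyActor].withMailbox("bounded-mailbox")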

The Actor

The actor is the universal primitive used in Akka. Unlike a thread in a programming language, this primitive runs like a daemon server. As such, it should be shut down gracefully.

Actors are user created.

import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

class MyActor extends Actor with ActorLogging {

  override def preStart(): Unit = log.debug("Starting")
  override def postStop(): Unit = log.debug("Stopping")
  override def preRestart(reason: Throwable, message: Option[Any]): Unit =
    log.error(s"Restarting because of ${reason.getMessage}. $message")
  override def postRestart(reason: Throwable): Unit = log.debug("Restarted")

  override def receive: Receive = {
    case _ => sender() ! "Hello from Actor"
  }
}

object MyActor {
  def setupMyActor(): ActorRef = {
    val conf = ConfigFactory.load()
    val system = ActorSystem("MySystem", conf)
    val actor: ActorRef = system.actorOf(Props[MyActor], name = "myactor")
    actor
  }
}

The example above creates an actor and a Scala companion object for instantiation.

Actors must extend Actor. ActorLogging provides the log member. The optional preRestart and postRestart hooks handle restarts after exceptions, while the optional preStart and postStop methods handle setup and teardown tasks. The basic actor above incorporates logging and error processing.

An actor can:

  • Create and supervise other actors
  • Maintain state
  • Control the flow of work in a system
  • Perform a unit of work on request or repeatedly
  • Send and receive messages
  • Return the results of a computation (see the ask-pattern sketch below)
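For the last two points, a minimal sketch of request/response messaging using Akka's ask pattern; the actor is the MyActor example above, and the timeout value is an arbitrary assumption:

import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.Future
import scala.concurrent.duration._

implicit val timeout: Timeout = Timeout(5.seconds)

// '?' sends the message and returns a Future of the eventual reply
val actor: ActorRef = MyActor.setupMyActor()
val reply: Future[Any] = actor ? "any message"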

Akka’s serialization is extremely powerful. Anything available across a cluster or on the classpath that implements Serializable can be sent to and from an actor. Instances of classes are deserialized without the programmer having to recast them.

When to Use Akka

Actor systems are not a universal solution. Where work is not repetitive and does not benefit from high levels of concurrency, they are a hindrance.

State persistence also weighs heavily in the decision to use an actor system. Take the example of cookies in network requests: maintaining different network sessions across different remote actors can be highly beneficial in ingestion.

Any task provided to an actor should contain very little new code and a limited number of configuration variables.

Questions that should be asked based on these concepts include:

  • Can I break up tasks into sufficiently large quantities of work to benefit from concurrency?
  • How often will tasks be repeated in the system?
  • How minimal can I make the configuration for the actor if necessary?
  • Is there a need for state persistence?
  • Is there a significant need for concurrency or is it a nice thought?
  • Is there a resource constraint that distribution can solve or that will limit threading?

State is definitely a large reason to use Akka, whether in the form of variables actually being maintained or of state held in the actor itself.

In some distributed use cases involving the processing of enormous numbers of short-lived requests, the actor's own state and Akka’s mailbox capabilities are what matter most. This is the reasoning behind tools built on Akka such as Spark.

As is always the case when deciding to create a new system, the following should be asked as well:

  • Can I use an existing tool such as Spark or TensorFlow?
  • Does the time required to build the system outweigh the overall benefit it will provide?

Clustering

Clustering is available in Akka. Akka provides high-level APIs for generating distributed clusters. Specified seed nodes handle initial communications, serving as the system’s entry point.

Network design is left entirely to the developer. Since node generation, logging, fault tolerance, and basic communication are the only pieces of a distributed system Akka handles, any distribution model will suffice. Two common models are the master-slave and graph-based models.

Akka clusters are resilient, with well-developed fault tolerance.
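As a sketch of seed-node configuration (hostnames, ports, and the system name MySystem are illustrative assumptions; the keys follow Akka's classic remoting, matching the pre-2.6 APIs used in this series, and the akka-cluster module must be on the classpath):

import com.typesafe.config.ConfigFactory

val clusterConf = ConfigFactory.parseString(
  """
  akka {
    actor.provider = "cluster"
    remote.netty.tcp {
      hostname = "127.0.0.1"
      port = 2551
    }
    cluster.seed-nodes = [
      "akka.tcp://MySystem@127.0.0.1:2551",
      "akka.tcp://MySystem@127.0.0.1:2552"
    ]
  }
  """)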

Configuration

Configuration is performed either through a file or in a class. Many components can be configured, including logging levels, cluster components, metrics collection, and routers.
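A brief sketch of both approaches, assuming an application.conf on the classpath: parseString supplies in-class overrides, and withFallback layers them over the file-based configuration.

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// In-class overrides layered over the application.conf file
val conf = ConfigFactory
  .parseString("""akka.loglevel = "DEBUG"""")
  .withFallback(ConfigFactory.load())

val system = ActorSystem("MySystem", conf)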

Conclusion

This article is the entry point for the Akka series, providing the basic understanding needed as we begin to build a cluster.

Messaging, ETL, and an AKKA Proposal

Data sources are multiplying. NoSQL can help aggregate multiple sources into a more coherent whole, and Akka, which can split work across multiple nodes, serves as a perfect way of writing distributed systems. Combining it with messaging via queues or topics and the master-slave pattern could provide a significant boost to ETL. Using databases as messaging systems, it is easy to see how processes can kick-start. My goal is to create a highly concurrent system that takes data from a scraper (from any source, as can be done with my Python crawl modules), writes the data to a NoSQL-style JSONB store in PostgreSQL, and notifies a set of parsers, which then look at patterns in the data to determine how to ETL it. This is not really revolutionary, but it is a good test of concurrency and automation.

Results will be reported.

Collection with NoSQL and Storage with SQL

There are four well-known forms of NoSQL databases: key-value, document, column-family, and graph databases. In the case of ETL, key-value stores are a good way to expand data without worrying about what, if anything, is present. However, even in denormalized form, this is not the best storage solution for customer-facing applications. Therefore, data will be placed into a client-facing database configured with relational PostgreSQL tables.

Messaging and Building Patterns for AKKA and Scala

With messaging and state machines, the actual uses for an actor do not need to be known in advance. At runtime, interactions or patterns force the actor to take on a different role. This can be accomplished with a simple pattern match in the actor's receive block, as sketched below. From here, a message with the data to be transformed can be passed to an actor. This data, with a row ID, can then be parsed after an actor reads a message from a queue. The queue specifies conditions, such as which parser combinator to use, and the actor completes an activity based on them. This is not terribly different from the routing-slip pattern, except that no re-routing occurs.
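As a minimal sketch of that role selection, with hypothetical message types and stub parsing helpers standing in for real parser combinators:

import akka.actor.Actor

// Hypothetical messages: a queue entry naming the parser plus the payload and row ID
case class ParseCsv(rowId: Long, payload: String)
case class ParseJson(rowId: Long, payload: String)
case class Parsed(rowId: Long, fields: Map[String, String])

class ParserActor extends Actor {
  override def receive: Receive = {
    case ParseCsv(id, payload)  => sender() ! Parsed(id, parseCsv(payload))
    case ParseJson(id, payload) => sender() ! Parsed(id, parseJson(payload))
  }

  // Stub transformations standing in for real parser combinators
  private def parseCsv(s: String): Map[String, String] =
    s.split(",").zipWithIndex.map { case (v, i) => s"col$i" -> v }.toMap
  private def parseJson(s: String): Map[String, String] =
    Map("raw" -> s)
}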

The data would be aggregated using the available row IDs in batches of a certain size. Perhaps batch iterators would best do the trick in determining the size of the batch to process.
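A tiny sketch of that batching idea (the batch size of 500 and the processBatch helper are assumptions); Scala's grouped iterator yields fixed-size batches:

// Hypothetical: walk row IDs in batches of 500
val rowIds: Iterator[Long] = (1L to 10000L).iterator

def processBatch(batch: Seq[Long]): Unit =
  println(s"processing ${batch.size} rows starting at ${batch.head}")

rowIds.grouped(500).foreach(processBatch)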

Returning Data Back to the Original Actor

Returning the data requires messaging as well. The message returns to the initial actor, where it needs to be matched with the appropriate row.

Recap

To recap, the question is: can Akka perform more generic ETL than currently available open source tools?

To test this question I am developing Akka ETL. The tool will take in scraped data (from processes that can be managed with the same messaging technique but are not easily distributed due to statefulness and security). The design includes taking completed sources from a database, acquiring the data, messaging an actor with the appropriate parsing information, receiving the transformed data from these actors, and posting it to a relational database.

The real tests will be maintaining data deduplication, non-mixed data, and unique identifiers.