Towards Python in an Actor Environment

This is a call to action for the Python programming community, which I am rejoining after a long hiatus. (Yes, I have reviewed the PEP 8 standards since my last employment code tests.) Serious and innovative production environments in ETL, ingestion, AI, analytics, and IoT still desperately need an alternative to the GIL and a strong distributed framework. This call to action supports that.

Spark is not an end-all, and while the Apache ecosystem is far superior in terms of distribution and infrastructure offerings, the Java environment lags in tool support.

Despite its strong tool support, Python was the injured man at the party not long ago. It was easy to learn but terrible to run. Until Python 3.2, the GIL had a tendency to cause thrashing, and third-party libraries such as Twisted entered the concurrency lingo. Python was for pet projects, a novelty used at a few companies for large systems; Java was king and Scala was popular. Distribution was nowhere to be seen until PySpark came out, and even then it was a secondary tool for quite a while, with less than cutting-edge features.

That has changed. Since Python 3.5 (late 2015 to late 2016), asynchronous behavior has taken center stage and transformed the way Python works. In 3.2, the GIL was reworked, and the folks at the Python Software Foundation seem to be pushing for its eradication. The wonderful people at the Spark project also began supporting Python in early releases.

Sadly, a turn away from Scala by many projects has dashed my hopes that this extremely strong language and its support network would become the JVM equivalent of Python. I still believe strongly that Scala will become a top-tier language with the backing of groups such as Lightbend and a powerful quasi-open-source foundation of its own. However, hopes for a NumPy-like library from ND4J and first-class OpenCV support are floundering, and Scala is also harder to teach than Python.

Therefore, to support large projects that benefit from the constant uptime and serious concurrency of actors, I have created CompAktor. The aim is to support nearly all of the features of Akka, in addition to strong peer grid support. It will hopefully grow into a strong tool for scaling enormous clusters that perform everything from ingestion and camera feed processing to string manipulation. Come help Python grow.

We have a Trello-based scrum board, so email me at aevans48@simplrinsites.com if you want to participate and I will send you an invite. Thanks!

 


Akka: Properly build a New Stream Source with Video Stream Output

The Akka documentation is extensive, yet it is limited in some ways. One such way is in describing the breadth of issues involved in building a functional streaming source in Akka Streams. This article covers source generation in more detail to help you avoid a major catastrophe.

We will use video processing with FFmpeg in our example source, since we need to break a video apart into frames and then emit every frame in a stream, an intensive real-world task. There is no guarantee that our video stream will be production grade, but it can start you on the path to success.


Building A Source

Building a source does not need to be a struggle, but there are several actions that each source must account for.

It is recommended to read Lightbend’s custom stream processing documentation.

These actions are:

  • Handling the Completion Downstream
  • Handling a Success
  • Handling a Failure

The Akka GraphStage, used to create a custom stream stage, offers a downstream finish method (onDownstreamFinish on the OutHandler), as shown in the example below. This is not well documented.
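To isolate the three actions from the video logic, here is a minimal sketch of a plain GraphStage source, assuming Akka 2.6 (where OutHandler offers onDownstreamFinish(cause: Throwable)). The class IteratorSource and its makeIterator parameter are hypothetical names for illustration: each pull emits one element, an exhausted iterator completes the stage (success), an exception fails it (failure), and a downstream cancellation completes it early (downstream completion).

```scala
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}

import scala.util.control.NonFatal

// Hypothetical source: emits the elements of a fresh iterator, one per downstream pull.
// makeIterator is called once per materialization so every run gets its own state.
class IteratorSource[T](makeIterator: () => Iterator[T]) extends GraphStage[SourceShape[T]] {
  val out: Outlet[T] = Outlet("IteratorSource.out")
  override val shape: SourceShape[T] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private val it = makeIterator()

      setHandler(out, new OutHandler {
        override def onPull(): Unit =
          try {
            if (it.hasNext) push(out, it.next()) // emit exactly one element per pull
            else completeStage()                  // success: nothing left to emit
          } catch {
            case NonFatal(e) => failStage(e)      // failure: propagate the error downstream
          }

        // Downstream completion: downstream cancelled, so stop producing and complete.
        override def onDownstreamFinish(cause: Throwable): Unit = completeStage()
      })
    }
}
```

Used as Source.fromGraph(new IteratorSource(() => Iterator(1, 2, 3))), it behaves like any other source; combined with take(2), the onDownstreamFinish handler runs after the second element.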

Using a Materialized Value

Often, we want to create a future from a stream to confirm that the stream completed as expected. Promises in Scala offer a way to create futures while notifying the programmer of failures and successes.

A promise is easy to create:

   
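     import scala.concurrent.{Future, Promise}

     // Stand-in for whatever condition decides the outcome
     val succeeded : Boolean = true
     val promise : Promise[Long] = Promise[Long]()
     if(succeeded){
          promise.trySuccess(1L)
     }else{
          promise.tryFailure(new Exception("Test Failed"))
     }
     val future : Future[Long] = promise.future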

A promise is created and completed with either a success or a failure. Its future can then be handled like any other future.
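A slightly fuller sketch, using only the Scala standard library (2.12 or later, for Future.transform), shows how the resulting future can be consumed without blocking and why the try* methods are used: a second attempt to complete a promise returns false instead of throwing. The object PromiseDemo and its methods are illustrative names, not part of any library.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object PromiseDemo {
  // Completes a promise according to the check and hands back its future.
  def checked(ok: Boolean): Future[Long] = {
    val promise = Promise[Long]()
    if (ok) promise.trySuccess(1L)
    else promise.tryFailure(new Exception("Test Failed"))
    promise.future
  }

  // Maps either outcome to a message without blocking the caller.
  def describe(f: Future[Long])(implicit ec: ExecutionContext): Future[String] =
    f.transform {
      case Success(n)  => Success(s"succeeded with $n")
      case Failure(ex) => Success(s"failed: ${ex.getMessage}")
    }
}
```

Because trySuccess and tryFailure only report whether they won the race, a stage can call them from several code paths (end of input, cancellation, errors) and the first outcome sticks.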

Putting It All Together

Each concept can be applied to the generation of a custom source:

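import java.io.FileNotFoundException
import java.nio.file.Path

import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, OutHandler}
import org.apache.commons.lang3.exception.ExceptionUtils
import org.bytedeco.javacv.{FFmpegFrameGrabber, Frame, OpenCVFrameConverter}
// JavaCV 1.5+; older JavaCV releases expose this as org.bytedeco.javacpp.opencv_core.IplImage
import org.bytedeco.opencv.opencv_core.IplImage
import org.joda.time.DateTime

import scala.concurrent.{Future, Promise}
import scala.util.control.NonFatal

case class VFrame(id : Long, path : Path, frame : IplImage, captureDate : Long = DateTime.now().getMillis)

class VideoSource(path : Path) extends GraphStageWithMaterializedValue[SourceShape[VFrame], Future[Boolean]]{
  val out : Outlet[VFrame] = Outlet("VideoFrameSource")
  override def shape: SourceShape[VFrame] = SourceShape(out)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Boolean]) = {
    // Completed with true on success or with the error on failure; its future is the materialized value
    val promise : Promise[Boolean] = Promise[Boolean]()

    val logic = new GraphStageLogic(shape){
      // Per-materialization state: one grabber, one converter, one frame counter
      private var readImages : Long = 0L
      private var grabber : FFmpegFrameGrabber = null
      private val converter = new OpenCVFrameConverter.ToIplImage()

      // Stop and close the grabber, logging (not rethrowing) any error so the stage can still finish
      private def releaseGrabber(): Unit = {
        if(grabber != null){
          try {
            grabber.stop()
            grabber.close()
          }catch{
            case NonFatal(t) =>
              println(t.getMessage)
              println(ExceptionUtils.getStackTrace(t))
          }finally{
            grabber = null
          }
        }
      }

      private def success(): Unit = {
        releaseGrabber()
        promise.trySuccess(true)
        completeStage()
      }

      private def fail(ex : Throwable): Unit = {
        releaseGrabber()
        promise.tryFailure(ex)
        failStage(ex)
      }

      setHandler(out, new OutHandler{
        override def onPull(): Unit = {
          try {
            // Open the video lazily on the first pull
            if(grabber == null){
              if(!path.toFile.exists()){
                throw new FileNotFoundException(s"Path to ${path.toFile.getAbsolutePath} does not exist")
              }
              grabber = new FFmpegFrameGrabber(path.toFile.getAbsolutePath)
              grabber.start()
            }

            // A grab() error reaches the outer catch and fails the stage,
            // rather than being swallowed and the previous frame pushed again
            val frame : Frame = grabber.grab()
            if(frame != null){
              // JavaCV may reuse frame buffers between grab() calls; copy the image
              // if downstream stages hold on to earlier frames
              val im = converter.convert(frame)
              readImages += 1
              push(out, VFrame(readImages, path, im))
            }else{
              // grab() returns null once the video is exhausted
              success()
            }
          }catch{
            case NonFatal(t) =>
              println(t.getMessage)
              println(ExceptionUtils.getStackTrace(t))
              fail(t)
          }
        }

        // Downstream cancelled: release the grabber and complete normally.
        // (Akka 2.6 deprecates this overload in favour of onDownstreamFinish(cause: Throwable).)
        override def onDownstreamFinish(): Unit = {
          success()
        }
      })

      // Runs however the stage stops; releases the grabber and fails the promise
      // if it was not already completed (for example, on abrupt materializer shutdown)
      override def postStop(): Unit = {
        releaseGrabber()
        promise.tryFailure(new RuntimeException("VideoSource stopped before the video was fully read"))
      }
    }

    logic -> promise.future
  }
}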

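This class creates a source with a materialized value, a topic not fully covered in the Lightbend documentation. The materialized value is a Future[Boolean] that completes with true once the video has been fully read or downstream cancels, and fails with the underlying exception if the file is missing or the FFmpeg grabber throws. The source overrides onDownstreamFinish and implements custom success and failure processing.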

The anatomy of the above source is simple. A class is created which takes in a Path to a video file. FFmpeg is used to obtain frames from the video. After the source shape and output type are specified, the source logic itself is built in createLogicAndMaterializedValue. The Promise named promise holds the result of the materialized value, and the future returned alongside the logic completes when success() or fail() is called. The method completeStage is called to complete the source.
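If the materialized value should report how many elements were emitted rather than a Boolean, the same pattern carries over. The following is a hypothetical CountingSource, assuming Akka 2.6 and no FFmpeg dependency: it emits 1 to limit, completes its Promise[Long] with the count when it finishes or when downstream cancels, and fails the promise only if the stage stops any other way.

```scala
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, OutHandler}

import scala.concurrent.{Future, Promise}

// Hypothetical example: emits 1..limit and materializes the number of elements pushed.
class CountingSource(limit: Long) extends GraphStageWithMaterializedValue[SourceShape[Long], Future[Long]] {
  val out: Outlet[Long] = Outlet("CountingSource.out")
  override val shape: SourceShape[Long] = SourceShape(out)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Long]) = {
    val promise = Promise[Long]()

    val logic = new GraphStageLogic(shape) {
      private var emitted = 0L

      setHandler(out, new OutHandler {
        override def onPull(): Unit =
          if (emitted < limit) {
            emitted += 1
            push(out, emitted)
          } else {
            promise.trySuccess(emitted) // all elements emitted
            completeStage()
          }

        // Downstream cancelled early: report how many elements made it out.
        override def onDownstreamFinish(cause: Throwable): Unit = {
          promise.trySuccess(emitted)
          completeStage()
        }
      })

      // Covers abrupt termination; a no-op if the promise is already completed.
      override def postStop(): Unit =
        promise.tryFailure(new RuntimeException("CountingSource stopped before completion"))
    }

    (logic, promise.future)
  }
}
```

When running such a source, Keep.both retains the source's materialized count alongside the sink's result, while Keep.left would keep only the count.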

Conclusion

This article examined the creation of new sources in Akka. We reviewed handling downstream completion, success, failure, and the return of a materialized value. Creating sources is easier than Lightbend lets on.