Akka: Properly build a New Stream Source with Video Stream Outpu

The Akka documentation is extensive and yet is limited in some ways. One such way is in describing the breadth of issues required to build a functional streaming source in Akka Streams. This article covers source generation in more detail to help you avoid a major catastrophe.

We will use video processing with FFMPEG in our example source as we need to break apart frames and then emit every frame in a stream, an intensive real world task. There is no guarantee that our video stream will be production grade but it can start you on the path to success.

See also:

Building A Source

Building a source does not need to be a struggle but there are several actions that each source must account for.

It is recommended to read Lightbend’s customer stream processing documentation.

These actions are:

  • Handling the Completion Downstream
  • Handling a Success
  • Handling a Failure

The Akka graph stage, used to create a custom stream offers a downstream finish method as shown in the example below. This is not well documented.

Using a Materialized Value

Often, we want to create a future from a stream to ensure a stream completed as wanted. Promises in Scala offer a way to create futures while notifying the programmer of failures and successes.

A promise is easy to create:

     var promise : Promise[Long] = Promise[Long]()
          promise.tryFailure(new Exception("Test Failed"))

A promise is generated and given a success or failure. A future is then generated which can be handled normally.

Putting It All Together

Each concept can be applied to the generation of a custom source:

case class VFrame(id : Long, path : Path, frame : IplImage,captureDate : Long = DateTime.now().getMillis)

class VideoSource(path : Path) extends GraphStageWithMaterializedValue[SourceShape[VFrame],Future[Boolean]]{
  val out : Outlet[VFrame] = Outlet("VideoFrameSource")
  override def shape: SourceShape[VFrame] = SourceShape(out)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Boolean]) = {
    val promise : Promise[Boolean] = Promise[Boolean]()
    var readImages : Long  = 0L
    var convertToIpl : OpenCVFrameConverter.ToIplImage = new OpenCVFrameConverter.ToIplImage()
    var grabber : FFmpegFrameGrabber = null
    var frame : Frame = null
    var converter = new OpenCVFrameConverter.ToIplImage

    val logic = new GraphStageLogic(shape){
      setHandler(out, new OutHandler{
        override def onPull(): Unit = {
          try {
            if(grabber == null){
                throw new FileNotFoundException(s"Path to ${path.toFile.getAbsolutePath} does not Exist")

              grabber = new FFmpegFrameGrabber(path.toFile.getAbsolutePath)

            if(grabber != null){
              try {
                frame = grabber.grab()
                case t : Throwable =>{
              if(frame != null) {
                val im =  converter.convert(frame)
                readImages += 1
            case t : Throwable =>{

        def fail(ex : Throwable)={
          if(grabber != null){

        def success()={
          if(grabber != null){
            try {
            }catch {
              case t : Throwable =>{

        override def onDownstreamFinish(): Unit = {


    logic -> promise.future

This class creates a source with a materialized value, a topic not fully covered in the Lightbend documentation. The number of frames is returned. The source overrides onDownStreamFinish and implements custom success and failure processing.

The anatomy of the above source is simple. A class is created which takes in a Path to a file source. FFMpeg is used to stream the obtain frames from the video. The source requires the creation of specialized source logic after specifying the source shape and type through createLogicAndMaterializedValue. The Promise promise contains the results of the materialized values. The resulting logic method yields a future when success  or failure are called. The method completeStage is called to complete the source.


This article examined the creation of new sources in Akka. We reviewed handling the completion of downstream sources, success, failure, and the return of a materialized value. Creating sources is easier than Lightbend lets on.

JavaCV Basics: Basic Image Processing

Here, we analyze some of the basic image processing tools in OpenCV and their use in GoatImage.

All code is available on GitHub under GoatImage. To fully understand this article, read the related articles and look at this code.

Select functions are exemplified here. In GoatImage, JavaDocs can be generated further explaining the functions. The functions explained are:

  • Sharpen
  • Contrast
  • Blur

Dilate, rotate, erode, min thresholding, and max thresholding are left to the code. Thresholding in OpenCV is described in depth with graphs and charts via the documentation.

Related Articles:

Basic Processing in Computer Vision

Basic processing is the key to successful recognition. Training sets come in a specific form. Pre-processing is usually required to ensure the accuracy and quality of a program. JavaCV and OpenCV are fast enough to work in a variety of circumstances to improve algorithmic performance at a much lower speed reduction cost. Each transform applied to an image takes time and memory but will pay off handsomely if done correctly.

Kernel Processing

Most of these functions are linear transformations. A linear transformation uses a function to map one matrix to another (Ax = b). In image processing, the matrix kernel is used to do this. Basically a weighted matrix can be used to map a certain point or pixel value.

For an overview of image processing kernels, see wikipedia.

Kernels may be generated in JavaCV.

    * Create a kernel from a double array (write large kernels more understandably)
    * @param kernelArray      The double array of doubles with the kernel values as signed ints
    * @return                 The kernel mat
  def generateKernel(kernelArray: Array[Array[Int]]):Mat={
    val m = if(kernelArray != null) kernelArray.length else 0
    if(m == 0 ){
      throw new IllegalStateException("Your Kernel Array Must be Initialized with values")

    if(kernelArray(0).length != m){
      throw new IllegalStateException("Your Kernel Array Must be Square and not sparse.")

    val kernel = new Mat(m,m,CV_32F,new Scalar(0))
    val ki = kernel.createIndexer().asInstanceOf[FloatIndexer]

    for(i <- 0 until m){
      for(j <- 0 until m){

More reliably, there is a function for generating a Gaussian Kernel.

    * Generate the square gaussian kernel. I think the pattern is a(0,0)=1 a(1,0) = n a(2,0) = n+2i with rows as a(2,1) = a(2,0) * n and adding two to the middle then subtracting.
    * However, there were only two examples on the page I found so do not use that without verification.
    * @param kernelMN    The m and n for our kernel matrix
    * @param sigma       The sigma to multiply by (kernel standard deviation)
    * @return            The resulting kernel matrix
  def generateGaussianKernel(kernelMN : Int, sigma : Double):Mat={

Sharpen with A Cutom Kernel

Applying a kernel in OpenCV can be done with the filter2D method.


Here a sharpening kernel using the function above is applied.

    * Sharpen an image with a standard sharpening kernel.
    * @param image    The image to sharpen
    * @return         A new and sharper image
  def sharpen(image : Image):Image={
    val srcMat = new Mat(image.image)
    val outMat = new Mat(srcMat.rows(),srcMat.cols(),srcMat.`type`())

    val karr : Array[Array[Int]] = Array[Array[Int]](Array(0,-1,0),Array(-1,5,-1),Array(0,-1,0))
    val kernel : Mat = this.generateKernel(karr)
    new Image(new IplImage(outMat),image.name,image.itype)


Contrast kicks up the color intensity in images by equation, equalization, or based on neighboring pixels.

One form of Contrast applies a direct function to an image:

    * Use an equation applied to the pixels to increase contrast. It appears that
    * the majority of the effect occurs from converting back and forth with a very
    * minor impact for the values. However, the impact is softer than with equalizing
    * histograms. Try sharpen as well. The kernel kicks up contrast around edges.
    * (maxIntensity/phi)*(x/(maxIntensity/theta))**0.5
    * @param image                The image to use
    * @param maxIntensity         The maximum intensity (numerator)
    * @param phi                  Phi value to use
    * @param theta                Theta value to use
    * @return
  def contrast(image : Image, maxIntensity : Double, phi : Double = 0.5, theta : Double = 0.5):Image={
    val srcMat = new Mat(image.image)
    val outMat = new Mat(srcMat.rows(),srcMat.cols(),srcMat.`type`())

    val usrcMat = new Mat()
    val dest = new Mat(srcMat.rows(),srcMat.cols(),usrcMat.`type`())

    multiply(dest,(maxIntensity / phi))
    val fm = 1 / Math.pow(maxIntensity / theta,0.5)
    multiply(dest, fm)

    new Image(new IplImage(outMat),image.name,image.itype)

Here the image is manipulated using matrix equations to form a new image where pixel intensities are improved for clarity.

Another form of contrast equalizes the image histogram:

* A form of contrast based around equalizing image histograms.
* @param image The image to equalize
* @return A new Image
def equalizeHistogram(image : Image):Image={
val srcMat = new Mat(image.image)
val outMat = new Mat(srcMat.rows(),srcMat.cols(),srcMat.`type`())
new Image(new IplImage(outMat),image.name,image.itype)

The JavaCV method equalizeHist is used here.


Blurring uses averaging to dull images.

Gaussian blurring uses a Gaussian derived kernel to blur. This kernel uses an averaging function as opposed to equal weighting of neighboring pixels.

    * Perform a Gaussian blur. The larger the kernel the more blurred the image will be.
    * @param image              The image to use
    * @param degree             Strength of the blur
    * @param kernelMN           The kernel height and width should match (for instance 5x5)
    * @param sigma              The sigma to use in generating the matrix
    * @param depth              The depth to use
    * @param brightenFactor     A factor to brighten the result by with  0){
      outImage = this.brighten(outImage,brightenFactor)

A box blur uses a straight kernel to blur, often weighting pixels equally.

    * Perform a box blur and return a new Image. Increasing the factor has a significant impact.
    * This algorithm tends to be overly powerful. It wiped the lines out of my test image.
    * @param image   The Image object
    * @param depth   The depth to use with -1 as default corresponding to image.depth
    * @return        A new Image
  def boxBlur(image : Image,factor: Int = 1,depth : Int = -1):Image={
    val srcMat = new Mat(image.image)
    val outMat = new Mat(srcMat.rows(),srcMat.cols(),srcMat.`type`())

    //build kernel
    val kernel : Mat = this.generateKernel(Array(Array(factor,factor,factor),Array(factor,factor,factor),Array(factor,factor,factor)))

    //apply kernel
    filter2D(srcMat,outMat, depth, kernel)

    new Image(new IplImage(outMat),image.name,image.itype)

Unsharp Masking

Once a blurred Mat is achieved, it is possible to perform an unsharp mask. The unsharp mask brings out certain features by subtracting the blurred image from the original while taking into account an aditional factor.

def unsharpMask(image : Image, kernelMN : Int = 3, sigma : Double = 60,alpha : Double = 1.5, beta : Double= -0.5,gamma : Double = 2.0,brightenFactor : Int = 0):Image={
    val srcMat : Mat = new Mat(image.image)
    val outMat = new Mat(srcMat.rows(),srcMat.cols(),srcMat.`type`())
    val retMat = new Mat(srcMat.rows(),srcMat.cols(),srcMat.`type`())

    //using htese methods allows the matrix kernel size to grow
    GaussianBlur(srcMat,outMat,new Size(kernelMN,kernelMN),sigma)

    var outImage : Image = new Image(new IplImage(outMat),image.name,image.itype)

    if(brightenFactor > 0){
      outImage = this.brighten(outImage,brightenFactor)



This article examined various image processing techniques.

JavaCV Basics: Splitting Objects

Here we put together functions from previous articles to describe a use case where objects are discovered in an image and rotated.

All code is available on GitHub under the GoatImage project.

Related Articles:

Why Split Objects

At times, objects need to be tracked reliably, OCR needs to be broken down to more manageable tasks, or there is another task requiring splitting and rotation. Particularly, recognition and other forms of statistical computing benefit from such standardization.

Splitting allows object by object recognition which may or may not improve accuracy depending on the data used to train an algorithm and even the type of algorithm used. Bayesian based networks, including RNNs, benefit from this task significantly.

Splitting and Rotating

The following function in GoatImage performs contouring to find objects, creates minimum area rect, and finally rotates objects based on their skew angle.

    * Split an image using an existing contouring function. Take each RIO, rotate, and return new Images with the original,
    * @param image              The image to split objects from
    * @param contourType        The contour type to use defaulting to CV_RETR_EXTERNAL
    * @param minBoxArea         Minumum box area to accept (-1 means everything and is default)
    * @param maxBoxArea         Maximum box area to accept (-1 means everything and is default)
    * @param show               Whether or not to show the image. Default is false.
    * @param xPosSort           Whether or not to sort the objects by their x position. Default is true. This is faster than a full sort
    * @return                   A tuple with the original Image and a List of split out Image objects named by the original_itemNumber
  def splitObjects(image : Image, contourType : Int=  CV_RETR_LIST,minBoxArea : Int = -1, maxBoxArea : Int = -1, show : Boolean= false,xPosSort : Boolean = true):(Image,List[(Image,BoundingBox)])={
    val imTup : (Image, List[BoundingBox]) = this.contour(image,contourType)

    var imObjs : List[(Image,BoundingBox)] = List[(Image,BoundingBox)]()

    var boxes : List[BoundingBox] = imTup._2

    //ensure that the boxes are sorted by x position
      boxes = boxes.sortBy(_.x1)

    if(minBoxArea > 0){
        boxes = boxes.filter({x => (x.width * x.height) > minBoxArea})

    if(maxBoxArea > 0){
      boxes = boxes.filter({x => (x.width * x.height) < maxBoxArea})

    //get and rotate objects
    var idx : Int = 0
    for(box <-  boxes){
      val im = this.rotateImage(box.image,box.skewAngle)
        im.showImage(s"My Box ${idx}")
      imObjs = imObjs :+ (im,box)
      idx += 1


Contours are filtered after sorting if desired. For each box, rotation is performed and the resulting image returned as a new Image.


Here the splitObjects function of GoatImage is reviewed, revealing how the library and OpenCV splits and rotates objects as part of standardization for object recognition and OCR.

JavaCV Basics: Cropping

The ROI code is  broken on the JavaCV example site. Here we will look at cropping an image by defining a region of interest. The remaining JavaCV example code should work.

All code is available on GitHub under the GoatImage project.

Related Articles:

Defining an ROI

Setting a Region of Interest (ROI) requires using the cvSetImageROI function which takes an IplImages and a Rect representing the region of interest.

cvSetImageROI(image, rect)

Putting it all Together By Cropping

Cropping takes our ROI and generates a new image fairly directly.

    * Crop an existing image.
    * @param image      The image to crop
    * @param x          The starting x coordinate
    * @param y          The starting y coordinate
    * @param width      The width
    * @param height     The height
    * @return           A new Image
  def crop(image : Image, x : Int, y : Int, width : Int, height : Int): Image={
    val rect = new CvRect(x,y,width,height)
    val uImage : IplImage = image.image.clone()
    cvSetImageROI(uImage, rect)
    new Image(cvCreateImage(cvGetSize(uImage),image.image.depth(),image.image.nChannels()),image.name,image.itype)


Simple cropping was introduced to rectify an issue with the ROI example from JavaCV.

JavaCV Basics: Rotating

Rotating an image is a common task. This article reviews how to rotate a matrix using JavaCV.

These tutorials utilize GoatImage. The Image object used in the code examples comes from this library.

Related Articles:

Rotation Matrix

The rotation matrix  is used to map from one pixel position to another. The matrix, shown below, uses trigonometric functions.


Rotation is a linear transformation. A linear transformation uses a function to map from one matrix to another. In image processing, the matrix kernel is used to perform this mapping.

Rotation in JavaCV 3

Rotation in JavaCV 3 utilizes a generated rotation matrix and the warp affine function. The function getRotationMatrix2D generates a two dimensional matrix using a center Point2f, angle, and a scale.

    * Rotate an image by a specified angle using an affine transformation.
    * @param image      The image to rotate
    * @param angle      The angle to rotate by
    * @return           A rotated Image
  def rotateImage(image : Image,angle : Double):Image={
    val srcMat = new Mat(image.image)
    val outMat = new Mat(srcMat.cols(),srcMat.rows(),srcMat.`type`())
    val cosv = Math.cos(angle)
    val sinv = Math.sin(angle)
    val width = image.image.width
    val height = image.image.height
    val cx = width/2
    val cy = height/2

    //(image.image.width*cosv + image.image.height*sinv, image.image.width*sinv + image.image.height*cosv);
    val rotMat : Mat = getRotationMatrix2D(new Point2f(cx.toInt,cy.toInt),angle,1)

    new Image(new IplImage(outMat),image.name,image.itype)

The Angle

The angle in OpenCV and thus JavaCV is centered at -45 degrees due to the use of vertical contouring. If an image is less than -45 degrees, adding 90 degrees will correct this offset.

val angle = if(minAreaRect.angle < -45.0) minAreaRect.angle + 90 else minAreaRect.angle


In this tutorial we reviewed the function in GoatImage for rotating images using OpenCv. The functions getRotationMatrix2D and warpAffine were introduced. Basic kernel processing was introduced as well.

Java CV Basics: IplImage

Java CV  documentation is not always useful or is already out of date. To rectify this, I am creating some basic tutorials with links to the JavaDocs. This article explores loading and saving images and the basics behind an IplImage. These tutorials are written in Java and Scala.

These tutorials utilize GoatImage. The Image object is from this library.

Related Articles:

Why Java and Scala

Java and Scala are popular and make up a huge portion of data science libraries that exist today. Tools such as Akka allow for the stream processing of images as well.

Java CV and OpenCV

Java CV is a set of bindings for the C classes in OpenCV. Method calls are similar to their OpenCV equivalents.


The JNI is used to allow access to the OpenCV api in Java.

The Ipl Image

The IplImage stores image bytes, channel numbers, sizes and other bits of useful information. An IPL image exists at org.bytedeco.javacpp.opencv_core.IplImage.

Load an Ipl Image

Preferably, an IplImage is loaded from a file.

import org.bytedeco.javacpp.opencv_imgcodecs.cvLoadImage
val impl : IplImage = cvLoadImage(fpath.getAbsolutePath)

The IplImage may be loaded from a matrix or mat, from a byte pointer, or may be specified later by using the no option constructor or the size of the raster as a Long.

import org.bytedeco.javacpp.BytePointer
import org.bytedeco.javacpp.opencv_core.IplImage

val bp : BytePointer = new BytePointer(ByteBuffer.wrap(bytes))
this.image = new IplImage(bp)

The BytePointer is the native pointer to an underlying C based char array. A BytePointer is loaded using a ByteBuffer. This type of Pointer can be converted to a ByteBuffer using asBuffer().

Unfortunately, the BytePointer method is not always reliable. An alternate way to load an IplImage would be to use a BufferedImage.

val m : Mat = new Mat(image.getHeight,image.getWidth,image.getType,new BytePointer(ByteBuffer.wrap(image.getRaster.getDataBuffer.asInstanceOf[DataBufferByte].getData)))
new Image(new IplImage(m),name,itype)

Here, the Mat class is used to create a new image matrix.

Image Matrix

The image matrix, Mat, located under the opencv_core is a data structure wrapped around a pointer to an array of bytes with related methods including a set of mathematical operations.

Cross products, dot products, inverse, and the ability to divide values, a non-linear operation, are all available.

import org.bytedeco.javacpp.opencv_core.cvLoad
import org.bytedeco.javacpp.opencv_core.{Mat,IplImage}

new IplImage(new Mat(cvLoad(fpath.getAbsolutePath)))

The conversion process is shown above in addition to how to load the Mat.

Load an Image

Loading an image or image matrix is possible using the methods from org.bytedeco.javacpp.opencv_imgcodecs. The methods take a BytePointer object or file name.

import org.bytedeco.javacpp.opencv_imgcodecs._

path : File = new File("out/image.jpg")

mpath : File =new File("out/imageMat.mat")

Save an IplImage

The save functions exist in the opencv_core and imagecodecs libraries. Most are static as direct wrappers around the OpenCV library.  They can be imported directly.

import org.bytedeco.javacpp.opencv_core._

directory : File = new File("out/")
cvSave(new File(directory.getAbsolutePath,this.name).getAbsolutePath,myIplImage)

Constructors for cvSave include (filename : String, image : IplImage), and (filename : String, struct_ptr : Pointer).

Useful Functions

Some useful functions exist for the base IplImage. These include:

  • imageData() – Get the BytePointer froom the IplImage. The resulting structure opens asBuffer()
  • width() – Get the image width
  • height() – Get the image height
  • nchannels() – Get the number of channels with 3 corresponding to RGB and 4 to RGBA

Create an Image

At times, it is necessary to create an image from scratch. This is useful to avoid overwriting information. This requires using the cvCreateImage function. This function from opencv_core.IplImage takes the width, height, depth, and channels as arguments.

val src : IplImage = ....
val dst : IplImage = cvCreateImage(cvSize((src.width() * scale).toInt,(src.height * scale).toInt),src.depth,src.nChannels)


Here, we discussed the basic image classes and packages in OpenCV. Some useful functions were mentioned.

Mornging Joe: Can Computer Vision Technology Help De-Militarize the Police and Provide Assistance?

There ha been an explosion of computer vision technology in the past few years or even the last decade or so considering OpenCV has been around that long. The recent events in Ferguson have created a need for keeping the police in line as well as the need to present credible evidence regarding certain situations.

Many police departments are starting to test programs that place snake cams like those used in the military on officers. While this could be viewed as more militarization, it also can present departments with a black eye if power is abused.

What if the lawyers, police, and ethics commissions could have a way of recognizing potentially dangerous situations before they happen? What if there was a light weight solution that allowed data programs to monitor situations in real or near real time, spot troublesome incidents, and provide alerts when situations were likely to get out of hand? What if potentially unethical situations could be flagged?

The answer is that this is possible without too much development already.

Statistical patterns can be used to predict behaviour long before anything happens. Microsoft and Facebook can accurately predict what you will be doing a year from now. The sad state of the current near police state is that the government has as much or more data on officers and citizens than Microsoft and Facebook.

These patterns can be used to narrow the video from those snake cams to potentially harmful situations for real time monitoring.

From there, a plethora of strong open source tools can be used to spot everything from weapons and the potential use of force, using the training capabilities of OpenCV and some basic kinematics (video is just a bunch of really quickly taken photos played in order), speech using Sphinx4 (a work in progress for captchas but probably not for clear speech), and even optical character recognition with pytesser. A bit of image pre-processing and OCR in Tesseract can already break nearly every captcha on the market in under one second with a single core and less than 2 gb of RAM. The same goes for using corner detection and OCR on a pdf table. Why can’t it be used in this situation?

The result in this case should be a more ethical police force and better safety to qualm the fears of officers and civilians alike.

Call me crazy but we can go deeper than just using snake cams on officers to police officers and provide assistance.  Quantum computing and/or better processors and graphics cards will only make this more of a reality.