Two Step Verification in a Flask REST App

Flask is great. It is simple, easy, and allows for lightning fast deployment. However, there are a few security problems that should be worked out before using it in production.

This article examines how to deploy two step verification and IP and MAC address tracking alongside JWT tokens in Flask.

Code for this article is on my Github.

OS and Hardware Security

Software is just a series of electrons floating around the Internet. Fans, special devices for man-in-the-middle attacks, and general human ignorance can all circumvent good practices.

Some things that should be done prior to development are:

  • Assign proper roles to users with appropriate security measures
  • Set up iptables and other forms of firewall protection
  • Don’t randomly open ports to the world
  • Isolate unprotected devices from those handling highly secure data (a web server from your ETL servers for instance)
  • Ensure passwords are fairly secure (8-20 memorable characters avoiding certain others)
  • Use endpoint security such as RSA keys where appropriate

Proper Security

Like all things security, articles should not promote a particular encryption algorithm as secure or make claims about algorithms that could be rendered useless even as I write. All good algorithms sour with age.

I can, however, provide a list of algorithms not to use:

  • bcrypt
  • blowfish

Remember that most algorithms are eventually broken. The US government currently lists AES as acceptable, and PBKDF2 can make SHA-512 suitable for password storage. SHA-512 is currently promoted as a good algorithm by NIST.

JWT in Flask

JWT tokens are useful in that they store the information necessary to keep a user logged in. They are great for single page applications where session tracking might be inappropriate. Know your use case.

A strong and configurable tool for implementing JWT in Flask is flask_jwt_extended, which can be used alongside the Flask-Security module.

Implementing JWT is fairly simple:

from flask import Flask, jsonify, request
from flask_jwt_extended import JWTManager, create_access_token, jwt_required

app = Flask(__name__)
app.config['JWT_SECRET_KEY'] = 'change-me'  # load from configuration in production
jwt = JWTManager(app)

@app.route('/login', methods=['POST'])
def login():
    username = request.json.get('username')
    access_token = create_access_token(identity=username)
    return jsonify(access_token=access_token)

@app.route('/is_working')
@jwt_required  # use @jwt_required() in flask_jwt_extended 4.x and later
def is_working():
    return jsonify({'Success': True}), 200

It appears that Flask-Security was recently fixed so that password hashing works appropriately once again.

from flask_security import Security
from flask_security.utils import encrypt_password, verify_password

# 'datastore' is assumed to be a configured user datastore,
# e.g. SQLAlchemyUserDatastore(db, User, Role)
security = Security(app, datastore)
pwd = encrypt_password("test")  # hash_password() in newer Flask-Security releases
if verify_password("test", pwd):
    print("Verified")

Email Server

Before discussing two step verification, it is necessary to set up a test email server and be able to send emails. The standard library's smtpd module offers a simple, configurable test server, while smtplib provides the sending client. I personally just printed the incoming messages, so my own server code is not worth posting, but a minimal sketch follows; the Python docs are a good place to get started.
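
A minimal debugging server using the standard library smtpd module (standard at the time of writing, though deprecated in newer Python releases); it prints each received message to stdout instead of delivering it:

import asyncore
import smtpd

# DebuggingServer dumps every received message to the console.
server = smtpd.DebuggingServer(('localhost', 1025), None)
asyncore.loop()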

Sending emails can be done through smtplib or Flask-Mail. The smtplib library will be more flexible.

The following sets up smtplib for sending an email:

import smtplib

# ... load email_config and build the MIME message (msg) ...

host = email_config['host']
port = email_config['port']
email_server = smtplib.SMTP(host, port)
if email_config.get('ehlo', False):
    email_server.ehlo()
if email_config.get('start_tls', False):
    certfile = email_config.get('tls_cert', None)
    keyfile = email_config.get('tls_key', None)
    context = email_config.get('context', None)
    email_server.starttls(keyfile, certfile, context)
if email_config.get('user') and email_config.get('password'):
    user = email_config.get('user')
    password = email_config.get('password')
    email_server.login(user, password)
# ...
email_server.sendmail(sender, [recipient], msg.as_string())  # from_addr, to_addrs, message

Many different options are configurable using smtplib. The same settings can be set through Flask-Mail, but any custom code needed during setup can be harder to work in.
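
The sendmail call above assumes a MIME message named msg has already been built. A minimal sketch, assuming plain-text content and the sender and recipient variables from the snippet above:

from email.mime.text import MIMEText

# Build a simple plain-text message; sender and recipient are assumed
# to be plain address strings.
msg = MIMEText('Hello from the Flask app')
msg['Subject'] = 'Verification'
msg['From'] = sender
msg['To'] = recipient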

Two Step Verification

It is now possible to extend the login function to include multi step authorization. The important pieces of the puzzle are obtaining an IP and/or MAC address, verifying a password as shown, sending an email with a verification code, handling receipt of the code, and persistence.
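
Obtaining the client IP in Flask is straightforward; a MAC address is only visible on the local network, so it generally has to be supplied by the client or resolved at the network layer. A minimal sketch of the IP side, assuming the app may sit behind a reverse proxy:

from flask import request

def get_client_ip():
    # X-Forwarded-For is set by most reverse proxies; fall back to the
    # direct socket address when the header is absent.
    forwarded = request.headers.get('X-Forwarded-For', '')
    if forwarded:
        return forwarded.split(',')[0].strip()
    return request.remote_addr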

Most of this is shown in my own open source project. This code uses uuid to generate a unique code:

import uuid
...
code = uuid.uuid4()

This code is hashed as before and stored using SQLAlchemy.
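
A minimal sketch of that step, reusing the Flask-Security hashing helper shown earlier; the VerificationCode model and its fields are hypothetical, not the actual schema from the repository:

from flask_security.utils import encrypt_password

# Hypothetical SQLAlchemy model and session; field names are assumptions.
verification = VerificationCode(
    user_id=user.id,
    code_hash=encrypt_password(str(code)),
)
db.session.add(verification)
db.session.commit()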

The basic process followed in my Github code is:

  1. Use login() to retrieve the JWT key and check for a matching MAC address and IP
  2. Send an email verification code as needed
  3. Through verify_ip_code and verify_mac_code, the code is validated and the database updated

The login function contains the majority of calls for two step verification.
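
A condensed sketch of that flow; the helpers find_user, known_address, and send_verification_email are hypothetical stand-ins for the functions in the repository:

@app.route('/login', methods=['POST'])
def login():
    username = request.json.get('username')
    password = request.json.get('password')
    user = find_user(username)  # hypothetical lookup
    if user is None or not verify_password(password, user.password_hash):
        return jsonify(msg='Bad credentials'), 401
    ip = get_client_ip()  # from the earlier sketch
    if not known_address(user, ip):
        send_verification_email(user, ip)  # emails a uuid4 code as above
        return jsonify(msg='Verification code sent'), 202
    return jsonify(access_token=create_access_token(identity=username))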

Conclusion

This article examined the basics required to create two step verification in a Flask application, using examples and code from my Github repository.

It is important to use the most up to date algorithms. This article made no attempt to recommend an encryption algorithm.

Getting a Dictionary from ConfigParser

Something that might be little known and yet immensely useful is the ability to obtain a dictionary from a parsed configuration's sections. I will simply leave this tidbit here for anyone interested.

import configparser
import os

# 'file' holds the path to the .ini configuration
config = None
if file and os.path.exists(file):
    with open(file) as fp:
        config = configparser.ConfigParser()
        config.read_file(fp)  # readfp() is deprecated
        if config.sections():
            # convert each SectionProxy into a plain dict
            config = {section: dict(config[section])
                      for section in config.sections()}

I hope that helps. Cheers!

Which to Use: Microservices or Actor Systems?

Which is better, an actor system handling requests and registering to a network or a microservice? That question is probably not asked often enough. This article examines this question from a theoretical standpoint prior to deployment in a production system.

Benefits of Microservices

We often decide to incorporate microservices simply because they have been the go-to for quite some time, forgetting about the actor model altogether. To frame the debate, we need to lay out the benefits of the microservice.

Microservices:

  • Allow for flexibility by allowing code to change in an application
  • Allow different applications to scale autonomously based on need
  • Allow for different teams to focus on different tasks without needing to know the implementation of another team’s tasks

However, they also:

  • Can grow complicated and fail to account for direct interaction between services, slowing the system down
  • Require more knowledge of other teams' APIs (a weak con)
  • Require a layer of abstraction that leaves them somewhat vulnerable and that is often implemented insecurely

Benefits of Actor Models

Actor models are fast, efficient, and have benefits as well. In particular, they:

  • Allow flexibility by allowing a node to change behavior or state
  • Allow different parts of a system to scale automatically based on need
  • Take into account different interactions between services
  • Tend to be more secure. Think of blockchain as a database version of an actor system: it maintains state and is contained.
  • Simplify different components (both a pro and a con)

However, actor models:

  • May or may not require knowledge of the implementation produced by other teams
  • Require a great deal more work to become as flexible and abstract as microservices
  • Are more difficult to change
  • Tend to perform blocking requests poorly
  • Require extra work to stabilize

Conclusion

Microservices are clearly great for company-wide tasks, but what about internally to each team, where microservices can complicate matters significantly due to their inherent complexity? This is where actor models shine. For instance, if we want to manage different machines in a non-blocking manner to kick off different tasks, actor models perform well. Perhaps we are creating a set of nodes in the system to manage Celery workers and producers. This is the debate going on in my open source CeleryETL system at the moment.

Feel free to comment!

 

Ways Uber Can Improve Their App: A Pro-Uber Position

I recently landed a lucrative opportunity. With all such opportunities, the question comes up as to what to do next. The answer, sadly, was to look for part time work and build an extensive system. As anyone seeking to stay out of a homeless shelter after leaving a position might do, I decided to try Uber as a bike courier.

If you are already wondering why I chose to be a bike courier, I need the exercise after sitting 10-12 hours per day working on my backend to establish best practices, working on agreements, and having meetings.

Being a bike courier gives me a unique perspective on the actual Uber driver app that might pique the interest of the folks who ordered sushi from Hai Sushi last night. My roommate and I give the restaurant a 5/5, and I really do not intend to have a delivery cancelled again, especially mid-delivery when I need to continue eating.

The Issues

Uber's promises are overblown; you really need to come down to earth and realize that actual bike couriers earn about $10.50-11 per hour and car couriers do not fare much better. I already knew this after doing my research.

There are other problems. Instead of merely listing them, why not give examples:

  • The application only considers cars: The major street on my alluded-to pickup (which only took 5 minutes) allows cars to travel 50 mph. Uber failed to take into account (it does not even show the delivery partner the distance) that the capped speed for the app to work on a bicycle is 15 mph. So, a 5 minute pickup led to a 4 minute wait and a nasty 35 minute bicycle ride at 15 mph, praying the customer would enjoy cold sushi. You can see that a 15 minute delivery now takes 45 minutes, both false advertising and bad data engineering. I know the latter because that is my trade, and this lack of consideration is a fireable offense.
  • The application fails to work with the Android device's gyroscope correctly: This one really has me beat. Google Maps, Uber's old map vendor, works perfectly. Considering Google's ability to charge for their services, the decision to build their own app was not surprising. However, when I see a long journey, know the road, and quickly pack my phone away, I really don't want to find out 2-3 minutes later that the app was pointing in the wrong direction. Worst of all, moving five to ten feet does not resolve this problem.
  • The application fails to live up to a Google replacement in route finding: Beyond traffic information, the application fails to consider the type of vehicle it should find routes for. As I learn the bike routes I get faster, but even then the application eats battery and probably keeps re-estimating my time to arrival as I take them.

All of these problems lead to some glaringly bad quality issues. If Uber were Postmates, it would be dead.

Why are these Problems

The perception of quality at UberEATS and Uber depends on three to five things: timely service, the degradation of food/delivery items if applicable, the connections with partners allowing faster food/item preparation if applicable, the knowledge of the driver, and the personality and customer service presented by the driver.

These problems have serious side effects:

  • They result in less timely service
  • They result in partners (businesses) questioning your service
  • They result in angrier drivers and thus worse experiences
  • All of this leads to unloyal customers and higher turnover, hurting the bottom line entirely

It is not uncommon for the CAQ quality score to come off as a 2/5 when many of your drivers start to badmouth the company and complain about the application.

The Solutions

For those wondering how Uber can improve its application, my take follows:

  1. Peg speed limits to distance when estimating delivery times. This will greatly improve time estimates.
  2. Limit the radius of operation by vehicle type and warn customers outside of it about time problems. It is easy to accomplish this on a graph structure. Graph databases like Neo4J are quite fast and can easily accommodate distance calculations and route finding within these distances; there is even a geo-spatial tool with a haversine function for Neo4J (see the sketch after this list). This could be accomplished quickly on Postgres as well. FYI, Postgres is used for all of the real time satellite data at the multi-billion dollar in REVENUE (not the bullshit valuations most tech companies are based on) really awesome Digital Globe. If you want to work for a super-awesome version of Google that feels like NASA, Digital Globe is it. Heuristics, thresholds, and genetic algorithms can also help. A simple distance-related cutoff in Neo4J and a fast program, even in Python, should be sufficient for millions to possibly billions of requests. Think Celery and Flask. It no longer takes C++ to accomplish such tasks.
  3. Figure out why the application is not working as well as Google Maps. I know it eats my battery due to the GPS requirement, which is an inevitability and why I carry a backup battery, but please generate some tickets for the gyroscope and look into battery use, likely with some less intensive client code and more server-side processing.
  4. Add route information per vehicle. I can travel faster when I am not pushed to a major state highway in a new area, forcing me up on the sidewalk. Most cities offer bike route information and, in a graph database, is_bikeroute is really just an edge attribute.
  5. Add traffic information. Please add traffic information.
  6. Push the feedback form and incorporate the feedback. Quality is an iterative process. It is good to have responsive customer service for the driver application, but these people are the bread and butter. Make sure they know it exists and treat the feedback representative as a typical organization treats a shift manager in terms of said feedback. This should bubble up to the appropriate position with the filters turned on and make it to the data, application, and front end teams at some point.
  7. As always, keep working to improve battery life.
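
As referenced in point 2, a distance cutoff needs little more than the haversine formula. A minimal sketch in Python; the radius threshold and coordinates are made-up example values:

from math import asin, cos, radians, sin, sqrt

def haversine_km(lat1, lon1, lat2, lon2):
    # Great-circle distance between two points in kilometers.
    lat1, lon1, lat2, lon2 = map(radians, (lat1, lon1, lat2, lon2))
    a = sin((lat2 - lat1) / 2) ** 2 + cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2) ** 2
    return 2 * 6371 * asin(sqrt(a))

# Example cutoff: only offer a delivery to bike couriers within ~5 km
BIKE_RADIUS_KM = 5
if haversine_km(39.7392, -104.9903, 39.7560, -104.9942) <= BIKE_RADIUS_KM:
    print("Offer this delivery to bike couriers")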

Conclusion

Despite the size and revenue of this very real company, they still feel a bit off in the development category. I am not at odds with working for the company to gain side money, but I really just hope that iterative software development ideologies make their way to Uber, Postmates, and the other companies. As always, consider promoting the engineers to positions of architecture and power. They are more knowledgeable on achieving customer goals with smooth, scalable systems.

ETL 1 Billion Rows in 2.5 Hours Without Paying on 4 cores and 7gb of RAM

There are a ton of ETL tools in the world: Alteryx, Tableau, Pentaho; the list goes on. Of these, only Pentaho offers a quality free version. Alteryx prices can reach as high as $100,000 per year for a six person company, and it is awful and awfully slow. Pentaho is not the greatest solution for streaming ETL either, as it is not reactive, but it is a solid choice over the competitors.

How, then, is it possible to ETL large datasets, stream on the same system from a TCP socket, or run flexible computations at speed? This article describes how to do just that using Celery and a tool I am currently working on, CeleryETL.

Celery

Python is clearly an easier language to learn than others such as Scala, Java, and, of course, C++. Together, these languages handle the vast majority of tasks for data science, AI, and mathematics outside of specialized languages such as R, and they are the front runners for building production grade systems.

In place of the actor model popular with other languages, Python, being more arcane and outdated than any of the popular languages, requires task queues. My own foray into actor systems in Python led to a design which was, in fact, Celery backed by Python’s Thespian.

Celery handles tasks through RabbitMQ or other brokers; RabbitMQ claims to achieve up to 50 million messages per second. That is beyond the scope of this article, but it would theoretically cause my test case to outstrip the capacity of my database to write records. I only hazard to guess at what that would do to my file system.

Task queues are clunky, just like Python. Still, especially with modern hardware, they get the job done fast, blazingly fast. A task is queued with its module name specified, since modules are loaded into a registry at run time. The queues, processed by a distributed set of workers running much like actors in Akka, can be managed externally.

Celery allows for task streaming through chains and chords. The technical documentation is quite extensive and requires a decent chunk of time to get through.
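
A minimal sketch of a chain and a chord, assuming a RabbitMQ broker on localhost and hypothetical task names; the real pipelines in CeleryETL are more involved:

from celery import Celery, chain, chord

# Hypothetical app and tasks; the broker URL is an assumption.
app = Celery('etl', broker='amqp://guest@localhost//')

@app.task
def extract(record_id):
    return {'id': record_id}

@app.task
def transform(record):
    record['transformed'] = True
    return record

@app.task
def load(records):
    return len(records)

# A chain pipes each task's result into the next task.
chain(extract.s(1), transform.s()).delay()

# A chord runs a group in parallel and feeds all results to a callback.
chord([chain(extract.s(i), transform.s()) for i in range(10)])(load.s())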

Processing at Speed

Processing in Python at speed requires little more than properly chunking operations, batching record processing appropriately to remove latency, and performing other simple tasks as described in the Akka Streams documentation. In fact, I wrote my layer on Celery using the Akka Streams playbook.

The only truly important operation: chunk your records. When streaming over TCP, this may not be necessary unless TCP connections happen extremely rapidly; thresholding in that case may be an appropriate solution. If there are more connection attempts than can be completed at once, buffer requests and empty the buffer appropriately upon completion of each chain. I personally found that a maximum bucket size of 1000 was appropriate for typical records and 100 for large records, including those containing text blobs.
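
A simple batching helper along those lines, using the bucket sizes mentioned above:

def chunk(records, bucket_size=1000):
    # Yield lists of at most bucket_size records from any iterable.
    bucket = []
    for record in records:
        bucket.append(record)
        if len(bucket) >= bucket_size:
            yield bucket
            bucket = []
    if bucket:
        yield bucket

# Each bucket can then be handed to a worker as a single message,
# e.g. transform_batch.delay(batch) with a hypothetical Celery task.
for batch in chunk(range(10500), bucket_size=1000):
    pass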

Take a look at my tool for the implementation. With it, I was able to remap, split fields to rows, perform string operations, and write to my Neo4J graph database at anywhere from 80,000 to 120,000 records per second.

Conclusion

While this article is shorter than my others, it is something I felt necessary to write in the short time I have. This discovery allows me to write a single-language system through Celery, Neo4J, Django, PyQt, and PyTorch for an entire company. That is phenomenal and only rivaled by Scala, which is, sadly, dying despite being a far superior, faster, and less arcane language. By all measures, Scala should have won over the data science community, but people detest the JVM. Until this changes, there is Celery.

 

Opinion: Does the End of Net Neutrality Signal the Start of Public Internet

Companies are already selling overpriced Internet services with terrible performance and awful customer satisfaction ratings. The end of net neutrality as the rule of the land may exacerbate the problems that plague companies like Comcast. A lack of stable systems, a quilt of poorly deployed technologies, and the hell on earth it is to deal with the company’s technical nightmare (think MySQL database being used for all equipment monitoring) can be easily glossed over by simply forcing certain customers to pay more.

The end of net neutrality could become the equivalent of ObamaCare in the Trump era. It is a chance for another industry to gouge the private sector and the individual under the guise of open markets. The idea of an entirely open market was, by the way, brushed aside by Adam Smith himself.

In the midst of this nightmare, Longmont is providing a model for public broadband service at low prices. Despite the FCC ruling, the city’s NextLight broadband utility promises no tiered payment plans. Businesses, organizations, and even your local school will rejoice in this type of service. That is, of course, if you are lucky enough to live in Longmont.

NextLight is the type of service that provides impact and reduces the pressure Adam Smith referred to when disavowing companies such as the East India Company, claiming they could never really be a private venture. That is something that should be noticed in today’s world where services are increasingly produced by fewer and fewer companies whose increasing size makes Karl Marx look like a prophet.

Is this experiment profitable? The customer gets 1 gigabit per second download speeds for $45 per month. How, when a similar service costs over $100 per month from a competitor, is this profitable? How can a $40.3 million project ever pay off for poorer communities? How can the service be of quality? How can cities without the infrastructure Longmont had afford such a project?

The current project is running at only 17 percent over the estimated cost. While small municipalities like Longmont do not always post cost estimates, in three to four years of operation there have been no price increases and no extra funding granted by taxation. The cost overrun is also relatively small for a public works project. In the same period, I personally have seen my Comcast service, 10 Mbps download speeds that rarely deliver their promised performance, go from $80 to $116 per month. Comcast does not even have the add-on features that $45 per month NextLight offers. Even CenturyLink offers far slower service for $10 more per month. Only Charter compares.

When it comes to poor communities, voting is more powerful at a local level. However, dealing with the misfortunes of rural and poor communities is an enormous problem in this country. Still, another source of funding may be technology companies themselves. Google and Facebook have already shown interest in divorcing the middleman from the process of getting their services to the consumer. Instead of balloons and airplanes, why not invest in public projects prior to the development of space-delivered quantum communications?

It is also important to remember that, as with many public projects, middle-class communities (Longmont at the beginning of NextLight) and rich ones (Longmont now, due to the Californication of the Colorado Front Range) are often the jumping-off point for all communities. Construction costs come down with demand, neighboring communities benefit, and knowledge is shared regarding operations.

As for quality, Longmont's NextLight just beat out Google Fiber; the city is ranked third for customer service and Internet service. And, while a weak argument, if you dislike your provider, you the taxpayer could literally fire them by voting.

More interestingly, municipalities can spur innovation as well in the search for better service at lower cost. Denver-based LayerTV just partnered with NextLight. Apparently, public Internet can actually increase competition.

So, will the end of net neutrality spur a public movement to reopen the Internet as an endless frontier of experimentation and cost savings compared to unregulated Internet? The future will tell.

Do you know the best part about NextLight? No unscheduled maintenance. It almost seems like Comcast violates other FCC rules regarding throttling and content provision already.

 

 

Reactive Streaming with Thespian

In this article I will cover my new attempts at building reactive software for the world in Python, helping to streamline the tasks that hinder scale in this operationally efficient language. We will review Thespian and my reactive software based on this actively developed project.

Problem

Well folks, my attempts at building reactive software in asyncio have stalled. After beating my head against a wall when my loops became stuck, even when using a future instead of solely a loop, I have given up for now.

After a search I should have done weeks ago, I discovered I am not the only one to give up on an asyncio actor model (see Xudd and Cleveland). Thankfully, I stumbled on Thespian.

Thespian

Thespian is a basic actor system written in Python. While it is possible to wrap CAF, Thespian allows for wildly popular Python tools to be spread across an actor system while only minimally dealing with serialization or data packaging.

The system is stable, having originally been utilized in a production environment at GoDaddy, and it is well documented.
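
A minimal hello-world actor gives a flavor of the API; the one-second ask timeout is an arbitrary choice:

from thespian.actors import Actor, ActorSystem

class Greeter(Actor):
    def receiveMessage(self, message, sender):
        # Reply to whoever sent the message.
        self.send(sender, 'Hello, {}!'.format(message))

if __name__ == '__main__':
    asys = ActorSystem()
    greeter = asys.createActor(Greeter)
    # ask() sends a message and waits up to one second for the reply.
    print(asys.ask(greeter, 'world', 1))
    asys.shutdown()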

Reactive Software

Despite being stable, Thespian is fairly light in terms of features. Compared with Akka, the system makes little to no attempt at implementing Reactive Streams, lacks decent cluster support, and is even missing basic routers.

To resolve this, I am building a series of reactive software programs that implement streams, routers, and the other basic features of Akka. I am additionally focusing on building ETL, ingestion, and networking tools based around the reactive streams model and Thespian.

Help Build the Future

We need your help. Our platforms should help streamline the monstrosities that Python based backends and systems can become. Help build the kind of scale that will hopefully power future Python applications, hone your skills, and learn a new framework.

If interested, contact me at aevans48@simplrinsites.com. I can invite you to our Trello boards.

CompAktor: Python Actor System

While interest in Scala wanes, many of the tools that remain popular, and a reason the language persists, will likely be sidelined or moved to Java. While Java is a popular language, it may not be appropriate for some use cases.

Imagine building a series of interconnected, reactive robots based on Raspberry Pi with their own ‘nervous system’. Asyncio in Python 3.5+ gives the programmer this unique capability by allowing them to create several actor systems.

This article explores some of the basics behind actors in Python while serving as conceptual documentation for CompAktor. All code comes from CompAktor.

Message Passing

Actor systems utilize message passing to perform units of work. Queues store messages that the actor uses to perform certain tasks, as illustrated in the diagram below (image courtesy of Petabridge):

[Figure: how actors process messages]

In CompAktor, this is represented by the following code:

    @abstractmethod    
    async def _task(self):
        """
        The running task.  It is not recommended to override this function.
        """
        message = await self.__inbox.get()
        try:
            handler = self._handlers[type(message)]
            is_query = isinstance(message, QueryMessage)
            try:
                if handler:
                    response = await handler(message)
                else:
                    logging.warning("Handler is NoneType")
                    self.handle_fail()
            except Exception as ex:
                if is_query:
                    message.result.set_exception(ex)
                else:
                    logging.warning('Unhandled exception from handler of '
                                    '{0}'.format(type(message)))
                    self.handle_fail()
            else:
                if is_query:
                    message.result.set_result(response)
        except KeyError as ex:
            self.handle_fail()
            raise HandlerNotFoundError(type(message)) from ex
.......

Actor in Python

An actor is, of course, the core consumer and producer of messages in the actor system. The actor maintains a queue and in non-asyncio environments typically runs on its own thread.

Actors maintain and can change state, another critical component of actor systems. They use provided messages to perform units of work.

State is handled in CompAktor with the following code:

self._handlers = {}
self.register_handler(PoisonPill, self._stop_message_handler)

The PoisonPill kills an actor and is a common construct.
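
Registering a handler for a custom message follows the same pattern. A sketch only: the base class name, constructor details, and message fields below are assumptions, not CompAktor's actual API:

class EchoMessage(QueryMessage):
    # Hypothetical query message carrying a text payload.
    def __init__(self, text):
        super().__init__()
        self.text = text

class EchoActor(BaseActor):  # the BaseActor name is an assumption
    def __init__(self):
        super().__init__()
        self.register_handler(EchoMessage, self._handle_echo)
        self.register_handler(PoisonPill, self._stop_message_handler)

    async def _handle_echo(self, message):
        # For QueryMessage instances, the returned value becomes the
        # result of the caller's future (see _task above).
        return message.text.upper()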

Asyncio Loop

Asyncio runs on event loops. Multiple loops can be run in a program.

Rather than fighting Python's GIL with threads, the loop uses generator-like coroutines to provide cooperative multitasking in a single thread. Coroutines yield instead of blocking on I/O, allowing multiple tasks to make progress at once.

Straight from python.org:

[Figure: tulip_coro, the coroutine diagram from the asyncio documentation]
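
A minimal example of running coroutines on a loop, using the Python 3.5+ syntax targeted above:

import asyncio

async def work(name, delay):
    # Yield control to the loop instead of blocking.
    await asyncio.sleep(delay)
    return '{} finished'.format(name)

loop = asyncio.get_event_loop()
results = loop.run_until_complete(
    asyncio.gather(work('a', 0.1), work('b', 0.2)))
print(results)
loop.close()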

Notes on Asynchrony and Event Loops

The actor model is safer internally than most other systems. Actors themselves perform a single task at a time.

Despite being safer, they still should not block the loop for a long time. If a task will take a while, it is recommended to use a separate thread or process to complete a task as opposed to blocking the loop and wreaking potential havoc on the system.
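
For example, offloading a slow, blocking call to a process pool keeps the loop responsive. A small sketch using run_in_executor; the sleep stands in for any blocking work:

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def blocking_work(seconds):
    # Any CPU-bound or otherwise blocking call would go here.
    time.sleep(seconds)
    return seconds

async def main(loop):
    # run_in_executor schedules the call off the event loop; passing
    # None as the executor would use the default thread pool instead.
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, blocking_work, 1)
        print('done after', result, 'second(s)')

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()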

Conclusion

This article explored the creation of actors in Python using asyncio. The actor is the basic object in CompAktor.

Akka: Properly Build a New Stream Source with Video Stream Output

The Akka documentation is extensive and yet is limited in some ways. One such way is in describing the breadth of issues required to build a functional streaming source in Akka Streams. This article covers source generation in more detail to help you avoid a major catastrophe.

We will use video processing with FFMPEG in our example source as we need to break apart frames and then emit every frame in a stream, an intensive real world task. There is no guarantee that our video stream will be production grade but it can start you on the path to success.

Building A Source

Building a source does not need to be a struggle but there are several actions that each source must account for.

It is recommended to read Lightbend's custom stream processing documentation.

These actions are:

  • Handling the Completion Downstream
  • Handling a Success
  • Handling a Failure

The Akka GraphStage, used to create a custom stream stage, offers a downstream finish handler as shown in the example below. This is not well documented.

Using a Materialized Value

Often, we want to create a future from a stream to ensure the stream completed as intended. Promises in Scala offer a way to create futures while notifying the programmer of failures and successes.

A promise is easy to create:

   
     var promise : Promise[Long] = Promise[Long]()
     if(true){
          promise.trySuccess(1L)
     }else{
          promise.tryFailure(new Exception("Test Failed"))
     }
     promise.future

A promise is generated and given a success or failure. A future is then generated which can be handled normally.

Putting It All Together

Each concept can be applied to the generation of a custom source:

case class VFrame(id : Long, path : Path, frame : IplImage,captureDate : Long = DateTime.now().getMillis)

class VideoSource(path : Path) extends GraphStageWithMaterializedValue[SourceShape[VFrame],Future[Boolean]]{
  val out : Outlet[VFrame] = Outlet("VideoFrameSource")
  override def shape: SourceShape[VFrame] = SourceShape(out)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Boolean]) = {
    val promise : Promise[Boolean] = Promise[Boolean]()
    var readImages : Long  = 0L
    var convertToIpl : OpenCVFrameConverter.ToIplImage = new OpenCVFrameConverter.ToIplImage()
    var grabber : FFmpegFrameGrabber = null
    var frame : Frame = null
    var converter = new OpenCVFrameConverter.ToIplImage

    val logic = new GraphStageLogic(shape){
      setHandler(out, new OutHandler{
        override def onPull(): Unit = {
          try {
            if(grabber == null){
              if(!path.toFile.exists()){
                throw new FileNotFoundException(s"Path to ${path.toFile.getAbsolutePath} does not Exist")
              }

              grabber = new FFmpegFrameGrabber(path.toFile.getAbsolutePath)
              grabber.start()
            }

            if(grabber != null){
              try {
                frame = grabber.grab()
              }catch{
                case t : Throwable =>{
                  println(t.getMessage)
                  println(ExceptionUtils.getStackTrace(t))
                }
              }
              if(frame != null) {
                val im =  converter.convert(frame)
                readImages += 1
                push(out,VFrame(readImages,path,im))
              }else{
                success()
              }
            }
          }catch{
            case t : Throwable =>{
              println(t.getMessage)
              println(ExceptionUtils.getStackTrace(t))
              fail(t)
            }
          }
        }

        def fail(ex : Throwable)={
          if(grabber != null){
            grabber.stop()
            grabber.close()
          }
          promise.tryFailure(ex)
          failStage(ex)
        }

        def success()={
          if(grabber != null){
            try {
              grabber.stop()
            }catch {
              case t : Throwable =>{
                println(t.getMessage)
                println(ExceptionUtils.getStackTrace(t))
              }
            }finally
            {
              grabber.close()
            }
          }
          promise.trySuccess(true)
          completeStage()
        }

        override def onDownstreamFinish(): Unit = {
          success()
        }

      })
    }

    logic -> promise.future
  }
}

This class creates a source with a materialized value, a topic not fully covered in the Lightbend documentation. The materialized value is a Future[Boolean] that completes when the stream succeeds or fails. The source overrides onDownstreamFinish and implements custom success and failure processing.

The anatomy of the above source is simple. A class is created which takes in a Path to a video file. FFmpeg, through FFmpegFrameGrabber, is used to obtain frames from the video. The source requires the creation of specialized stage logic after specifying the source shape and materialized value type through createLogicAndMaterializedValue. The promise holds the result of the materialized value, yielding a future that completes when success or fail is called. The method completeStage is called to complete the source.

Conclusion

This article examined the creation of new sources in Akka. We reviewed handling the completion of downstream sources, success, failure, and the return of a materialized value. Creating sources is easier than Lightbend lets on.

Checking and Increasing the Connection Limit in PostgreSQL

It finally happened. I blew the max connection limit in my new  PostgreSQL install by starting too many grid nodes  with associated connection pools for a system I am writing.

The default limit of 100 connections is far too few, especially with two people doing database intensive tasks. This article explains how to check the number of connections used as a standard user and administrator and how the administrator can change the connection limit.

Default Connection Limit and Buffer Size

By default, PostgreSQL has a relatively low number of maximum allowed connections. The default limit is 100.

The limit is related to the size of the shared buffers, since connections utilize memory in the shared buffers. The stock default for shared_buffers is quite small (128MB in recent releases); my install was configured with 8 gigabytes, as shown below.

PostgreSQL is a versatile database; Amazon built Redshift on it. However, the default limits are low, and arguably should remain so to encourage developers and students not to abuse their system's resources.

Common Error When Exceeding Size Limit

With an ever increasing amount of data available and still growing need for technology, connection limits will be exceeded on many systems. PostgreSQL throws the following error when this occurs:

psql: FATAL: remaining connection slots are reserved for non-replication superuser connections

In my case, the connection pool failed to reserve the required ten to twenty connections per highly database intensive job and instantly released what connections it did acquire back to the database. This left me with 96 used connections and four that were unattainable. Some of the 96 connections were held by an ETL tool or PG Admin.

Querying the Connection Limit

It is possible for non-administrators to check the number of connections in use with the following query:

SELECT sum(numbackends) FROM pg_stat_database;

This query reads the database statistics view and sums the currently active connections across all databases.

The current connection limit can also be queried from the psql command line using:

SHOW max_connections

This query asks the database to return the value of the max_connections configuration variable described below.

Most configuration variables can be queried in psql. Some can even be set from the command line console. The max_connections variable cannot be set from the command line due to an associated increase or decrease in memory usage.

Setting Connection Related Variables

While the initial settings are low by today's standards, they can be changed in your postgresql.conf file. The file is typically found at a location such as:

/var/lib/pgsql/X.X/data/postgresql.conf  or /usr/local/postgres/postgresql.conf

The default location is the data directory under your PostgreSQL installation.

Look for the following variables in the file:

shared_buffers = 8000MB
max_connections = 100

Choosing Size Limits

It is important to set size limits relevant to the size of the shared buffers. Dividing the RAM usage by the current number of connections in use or the maximum number allotted will give a nice over-estimate of total memory use per connection. Ensure that this multiple does not exceed the shared buffer size and the capacity of your RAM. The shared buffer size should generally be less than the amount of RAM in your machine.

The inequalities to check are:

shared_buffers < (RAM - padding)
(current_ram_use / max_connections) * desired_connections < (RAM - padding)
(current_ram_use / max_connections) * desired_connections < shared_buffers

Basically, ensure that you have enough RAM and that the potential memory use does not exceed RAM and the shared buffer size. These inequalities use the max_connections setting instead of the actual number of used connections for some extra assurance.
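
A quick way to sanity-check those numbers; all values below are made-up examples in megabytes, not recommendations:

ram = 16000
padding = 2000            # headroom for the OS and other processes
shared_buffers = 4000
current_ram_use = 2500    # PostgreSQL's current memory footprint
max_connections = 100
desired_connections = 300

per_connection = current_ram_use / max_connections
projected = per_connection * desired_connections

ok = (shared_buffers < ram - padding
      and projected < ram - padding
      and projected < shared_buffers)
print('Safe to raise max_connections' if ok else 'Revisit the limits')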

Reloading and Restarting

The configuration file can be reloaded in an active session without a restart using the sql command line console or through PG Admin with the query:

SELECT pg_reload_conf()

By default this function requires superuser privileges, and in any case it will not apply changes to the maximum number of allotted connections or the buffer size. Any variable marked in the configuration file with the following comment requires restarting the database server:

# (change requires restart)

Both max_connections and shared_buffers cause a change in memory usage which cannot be reworked on the fly. From the Linux command line, restart the server with the command:

./postgresql restart

On Ubuntu, the restart script normally resides at /etc/init.d.

If PostgreSQL is setup as a service, use:

service postgresql restart

Conclusion

In today's world where data is bountiful, it is easy to exceed the connection limit in PostgreSQL. This article reviewed how to check the number of connections in use and the considerations involved in raising the limit and the associated shared buffer size.