Reactive Streaming with Thespian

In this article I cover my latest attempts at building reactive software in Python, aiming to streamline the tasks that hinder scale in this developer-friendly language. We will review Thespian and the reactive software I am building on top of this actively developed project.

Problem

Well folks, my attempts at building reactive software on asyncio have stalled. After beating my head against a wall with event loops that kept getting stuck, even when driving them with a future instead of the loop alone, I have given up for now.

After a search I should have done weeks ago, I discovered I am not the only one to give up on an asyncio actor model (see Xudd and Cleveland). Thankfully, I stumbled on Thespian.

Thespian

Thespian is a basic actor system written in Python. While it is possible to wrap CAF (the C++ Actor Framework), Thespian lets the wildly popular Python tooling ecosystem be spread across an actor system while only minimally dealing with serialization or data packaging.

The system is stable, having originally been used in a production environment at GoDaddy, and it is well documented.
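
To give a feel for the library, here is a minimal sketch of a Thespian actor (illustrative only, not taken from the projects discussed here):

from thespian.actors import Actor, ActorSystem

class Greeter(Actor):
    def receiveMessage(self, message, sender):
        # Every actor implements receiveMessage; reply by sending a message back.
        if isinstance(message, str):
            self.send(sender, 'Hello, ' + message)

if __name__ == '__main__':
    system = ActorSystem()                     # in-process system base by default
    greeter = system.createActor(Greeter)
    print(system.ask(greeter, 'Thespian', 1))  # -> 'Hello, Thespian'
    system.shutdown()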

Reactive Software

Despite being stable, Thespian is fairly light in terms of features. Compared with Akka, the system makes little to no attempt at implementing Reactive Streams, lacks decent cluster support, and is even missing basic routers.

To resolve this, I am building a series of reactive software programs that implement streams, routers, and the other basic features of Akka. I am additionally focusing on building ETL, ingestion, and networking tools based around the reactive streams model and Thespian.

Help Build the Future

We need your help. Our platforms should help streamline the monstrosities that Python-based backends and systems can become. Help build the kind of scale that will hopefully power future Python applications, hone your skills, and learn a new framework.

If interested, contact me at aevans48@simplrinsites.com. I can invite you to our Trello boards.


CompAktor: Python Actor System

While interest in Scala wanes, many of the tools that remain popular and a reason the language persists will likely be sidelined or moved to Java. While Java is a popular language, it may not be appropriate for some use cases.

Imagine building a series of interconnected, reactive robots based on Raspberry Pi with their own ‘nervous system’. Asyncio in Python 3.5+ gives the programmer this unique capability by allowing them to create several actor systems.

This article explores some of the basics behind actors in Python while serving as conceptual documentation for CompAktor. All code comes from CompAktor.

Message Passing

Actor systems utilize message passing to perform units of work. Queues store the messages that an actor uses to perform its tasks (image courtesy of Petabridge):

[Image: how actors process messages]

In CompAktor, this is represented by the following code:

    @abstractmethod    
    async def _task(self):
        """
        The running task.  It is not recommended to override this function.
        """
        message = await self.__inbox.get()
        try:
            handler = self._handlers[type(message)]
            is_query = isinstance(message, QueryMessage)
            try:
                if handler:
                    response = await handler(message)
                else:
                    logging.warning("Handler is NoneType")
                    self.handle_fail()
            except Exception as ex:
                if is_query:
                    message.result.set_exception(ex)
                else:
                    logging.warning('Unhandled exception from handler of '
                                    '{0}'.format(type(message)))
                    self.handle_fail()
            else:
                if is_query:
                    message.result.set_result(response)
        except KeyError as ex:
            self.handle_fail()
            raise HandlerNotFoundError(type(message)) from ex
.......

Actor in Python

An actor is, of course, the core consumer and producer of messages in the actor system. The actor maintains a queue and in non-asyncio environments typically runs on its own thread.

Actors maintain and can change state, another critical component of actor systems. They use provided messages to perform units of work.

State is handled in CompAktor with the following code:

self._handlers = {}
self.register_handler(PoisonPill, self._stop_message_handler)

The PoisonPill kills an actor and is a common construct.
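
To make the pattern concrete, here is a self-contained sketch of the handler-registry idea; the class names are deliberately simplified illustrations, not CompAktor's actual API:

import asyncio

class PoisonPill: pass
class Increment: pass

class Counter:
    def __init__(self):
        self.count = 0                                   # actor-private state
        self._inbox = asyncio.Queue()
        self._handlers = {Increment: self._handle_increment,
                          PoisonPill: self._handle_stop}
        self._running = True

    async def _handle_increment(self, message):
        self.count += 1                                  # state changes only via messages

    async def _handle_stop(self, message):
        self._running = False                            # the poison pill stops the loop

    async def tell(self, message):
        await self._inbox.put(message)

    async def run(self):
        while self._running:
            message = await self._inbox.get()
            await self._handlers[type(message)](message)

async def main():
    actor = Counter()
    task = asyncio.ensure_future(actor.run())
    await actor.tell(Increment())
    await actor.tell(PoisonPill())
    await task
    print(actor.count)                                   # -> 1

asyncio.get_event_loop().run_until_complete(main())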

Asyncio Loop

Asyncio runs on event loops, and multiple loops can be run in a program (one per thread).

The loop avoids the need for threads, and the GIL contention they bring, by using generator-like coroutines to mimic fully asynchronous behavior. Loops do not block on I/O, allowing multiple tasks to make progress at once.

Straight from python.org:

[Diagram: tulip coroutine and event loop interaction, from python.org]
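
For example, a single loop can interleave several coroutines; a minimal sketch using the Python 3.5-era API:

import asyncio

async def worker(name, delay):
    # await yields control to the loop, so other coroutines run while this one sleeps.
    await asyncio.sleep(delay)
    print('{0} finished after {1}s'.format(name, delay))

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
    worker('a', 1),
    worker('b', 1),
))  # both finish in roughly one second, not two
loop.close()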

Notes on Asynchronicity and Event Loops

The actor model is safer internally than most other systems. Actors themselves perform a single task at a time.

Despite being safer, actors still should not block the loop for long. If a task will take a while, it is recommended to run it on a separate thread or process rather than blocking the loop and wreaking potential havoc on the system.
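
A common way to do this is to hand the blocking work to an executor; a minimal sketch (names are illustrative):

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_task(seconds):
    # Stands in for a long, blocking call (disk, a C library, heavy computation).
    time.sleep(seconds)
    return seconds

async def handle_message(loop, executor):
    # Off-load the blocking work so the loop can keep processing other messages.
    result = await loop.run_in_executor(executor, blocking_task, 2)
    print('finished after {0}s'.format(result))

loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=4)  # a ProcessPoolExecutor works similarly for CPU-bound work
loop.run_until_complete(handle_message(loop, executor))
executor.shutdown()
loop.close()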

Conclusion

This article explored the creation of actors in Python using asyncio. The actor is the basic object in CompAktor.

Akka: Properly Build a New Stream Source with Video Stream Output

The Akka documentation is extensive and yet is limited in some ways. One such way is in describing the breadth of issues required to build a functional streaming source in Akka Streams. This article covers source generation in more detail to help you avoid a major catastrophe.

We will use video processing with FFmpeg in our example source, as we need to break apart frames and then emit every frame in a stream, an intensive real-world task. There is no guarantee that our video stream will be production grade, but it can start you on the path to success.


Building A Source

Building a source does not need to be a struggle but there are several actions that each source must account for.

It is recommended to read Lightbend’s custom stream processing documentation.

These actions are:

  • Handling the Completion Downstream
  • Handling a Success
  • Handling a Failure

The Akka GraphStage, used to create a custom stream, offers a downstream finish handler, as shown in the example below. This is not well documented.

Using a Materialized Value

Often, we want to obtain a future from a stream to ensure the stream completed as expected. Promises in Scala offer a way to create futures while notifying the programmer of failures and successes.

A promise is easy to create:

   
     var promise : Promise[Long] = Promise[Long]()
     if(true){
          promise.trySuccess(1L)
     }else{
          promise.tryFailure(new Exception("Test Failed"))
     }
     promise.future

A promise is generated and given a success or failure. A future is then generated which can be handled normally.
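
Continuing the snippet above, the resulting future can be consumed like any other; a small illustrative example, assuming the global execution context:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

promise.future.onComplete {
  case Success(value) => println(s"completed with $value")
  case Failure(ex)    => println(s"failed: ${ex.getMessage}")
}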

Putting It All Together

Each concept can be applied to the generation of a custom source:

case class VFrame(id : Long, path : Path, frame : IplImage,captureDate : Long = DateTime.now().getMillis)

class VideoSource(path : Path) extends GraphStageWithMaterializedValue[SourceShape[VFrame],Future[Boolean]]{
  val out : Outlet[VFrame] = Outlet("VideoFrameSource")
  override def shape: SourceShape[VFrame] = SourceShape(out)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Boolean]) = {
    val promise : Promise[Boolean] = Promise[Boolean]()
    var readImages : Long  = 0L
    var convertToIpl : OpenCVFrameConverter.ToIplImage = new OpenCVFrameConverter.ToIplImage()
    var grabber : FFmpegFrameGrabber = null
    var frame : Frame = null
    var converter = new OpenCVFrameConverter.ToIplImage

    val logic = new GraphStageLogic(shape){
      setHandler(out, new OutHandler{
        override def onPull(): Unit = {
          try {
            if(grabber == null){
              if(!path.toFile.exists()){
                throw new FileNotFoundException(s"Path to ${path.toFile.getAbsolutePath} does not Exist")
              }

              grabber = new FFmpegFrameGrabber(path.toFile.getAbsolutePath)
              grabber.start()
            }

            if(grabber != null){
              try {
                frame = grabber.grab()
              }catch{
                case t : Throwable =>{
                  println(t.getMessage)
                  println(ExceptionUtils.getStackTrace(t))
                }
              }
              if(frame != null) {
                val im =  converter.convert(frame)
                readImages += 1
                push(out,VFrame(readImages,path,im))
              }else{
                success()
              }
            }
          }catch{
            case t : Throwable =>{
              println(t.getMessage)
              println(ExceptionUtils.getStackTrace(t))
              fail(t)
            }
          }
        }

        def fail(ex : Throwable)={
          if(grabber != null){
            grabber.stop()
            grabber.close()
          }
          promise.tryFailure(ex)
          failStage(ex)
        }

        def success()={
          if(grabber != null){
            try {
              grabber.stop()
            }catch {
              case t : Throwable =>{
                println(t.getMessage)
                println(ExceptionUtils.getStackTrace(t))
              }
            }finally
            {
              grabber.close()
            }
          }
          promise.trySuccess(true)
          completeStage()
        }

        override def onDownstreamFinish(): Unit = {
          success()
        }

      })
    }

    logic -> promise.future
  }
}

This class creates a source with a materialized value, a topic not fully covered in the Lightbend documentation. The materialized value is a Future[Boolean] that completes when the stream terminates. The source overrides onDownstreamFinish and implements custom success and failure handling.

The anatomy of the above source is simple. A class is created which takes in a Path to a file. FFmpeg is used to obtain frames from the video. The source requires the creation of specialized stage logic, via createLogicAndMaterializedValue, after specifying the source shape and outlet. The promise holds the materialized result: its future completes when success or fail is called, and completeStage or failStage then terminates the stage itself.
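
For reference, a hedged sketch of how such a source might be materialized and run (the sink, file path, and object name here are illustrative assumptions, not part of the original article):

import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Future

object RunVideoSource extends App {
  implicit val system = ActorSystem("video")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  // Keep both the source's Future[Boolean] and a frame count folded by the sink.
  val (done: Future[Boolean], frames: Future[Int]) =
    Source.fromGraph(new VideoSource(Paths.get("/tmp/example.mp4")))
      .toMat(Sink.fold(0)((count, _: VFrame) => count + 1))(Keep.both)
      .run()

  done.foreach(_ => println("stream completed cleanly"))
  frames.foreach(n => println(s"$n frames emitted"))
}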

Conclusion

This article examined the creation of new sources in Akka. We reviewed handling the completion of downstream sources, success, failure, and the return of a materialized value. Creating sources is easier than Lightbend lets on.

Checking and Increasing the Connection Limit in PostgreSQL

It finally happened. I blew the max connection limit in my new PostgreSQL install by starting too many grid nodes with associated connection pools for a system I am writing.

The default limit of 100 connections is far too few, especially with two people doing database intensive tasks. This article explains how to check the number of connections used as a standard user and administrator and how the administrator can change the connection limit.

Default Connection Limit and Buffer Size

By default, PostgreSQL has a relatively low number of maximum allowed connections. The default limit is 100.

The limit is related to the size of the shared buffers, since connections consume memory alongside them. The stock shared_buffers setting is actually quite small (128 MB on recent releases); the 8 gigabyte figure shown later in this article comes from my own configuration.

PostgreSQL is a versatile database; Amazon built Redshift on top of it. However, the default limits are low and arguably should remain so, to discourage developers and students from abusing their system’s resources.

Common Error When Exceeding Size Limit

With an ever-increasing amount of data available and a still-growing reliance on technology, connection limits will be exceeded on many systems. PostgreSQL throws the following error when this occurs:

psql: FATAL: remaining connection slots are reserved for non-replication superuser connections

In my case, the connection pool failed to reserve the required ten to twenty connections per highly database-intensive job and instantly released the connections it did acquire back to the database. This left me with 96 used connections and four that were unattainable. Some of the 96 connections were held by an ETL tool or pgAdmin.

Querying the Connection Limit

It is possible for non-administrators to check the number of connections in use with the following query:

SELECT sum(numbackends) FROM pg_stat_database;

This query reads the database statistics view and sums the backends currently connected to each database, giving the number of active connections.

The configured limit itself can be queried from the psql command line using:

SHOW max_connections

This query asks the database to return the value of the max_connections configuration variable described below.

Most configuration variables can be queried in psql, and some can even be set from the command line console. The max_connections variable cannot be changed at runtime because raising or lowering it changes the server’s memory usage.
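
For example, both of the settings discussed here can be inspected through the pg_settings view:

-- Inspect the current connection limit and shared buffer size.
SELECT name, setting, unit
FROM pg_settings
WHERE name IN ('max_connections', 'shared_buffers');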

Setting Connection Related Variables

While the initial settings are low by today’s standards, they can be changed in your postgresql.conf file. The file is typically found at a location such as:

/var/lib/pgsql/X.X/data/postgresql.conf  or /usr/local/postgres/postgresql.conf

The default location is the data directory under your PostgreSQL installation.

Look for the following variables in the file:

shared_buffers = 8000MB
max_connections = 100

Choosing Size Limits

It is important to set size limits relative to the size of the shared buffers. Dividing current RAM usage by the number of connections in use, or by the maximum number allotted, gives a rough over-estimate of memory use per connection. Ensure that this estimate, multiplied by the desired number of connections, does not exceed the shared buffer size or the capacity of your RAM. The shared buffer size should generally be less than the amount of RAM in your machine.

The inequalities to check are:

shared_buffers < (RAM - padding)
(current_ram_use / max_connections) * desired_connections < (RAM - padding)
(current_ram_use / max_connections) * desired_connections < shared_buffers

Basically, ensure that you have enough RAM and that the potential memory use does not exceed either RAM or the shared buffer size. These inequalities use the max_connections setting instead of the actual number of used connections for some extra headroom.
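
As a worked example with made-up numbers: on a machine with 32 GB of RAM, 2 GB of padding, and shared_buffers at 8 GB, suppose roughly 1 GB is currently used across a 100-connection limit, or about 10 MB per connection. Raising the limit to 300 connections implies roughly 3 GB of potential connection memory, which sits comfortably under both the 30 GB ceiling and the 8 GB shared buffer size, so the new limit is reasonable.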

Reloading and Restarting

The configuration file can be reloaded in an active session, without a restart, from the SQL command line console or through pgAdmin with the query:

SELECT pg_reload_conf();

Reloading the configuration is restricted to superusers by default, and it will not apply changes to the maximum number of allotted connections or the buffer size. Any variable in the configuration file annotated with the following comment requires restarting the database server:

# (change requires restart)

Both max_connections and shared_buffers cause a change in memory usage which cannot be reworked on the fly. From the Linux command line, restart the server with the command:

./postgresql restart

On Ubuntu, the restart script normally resides at /etc/init.d.

If PostgreSQL is setup as a service, use:

service postgresql restart

Conclusion

In today’s world, where data is bountiful, it is easy to exceed the connection limit in PostgreSQL. This article reviewed how to check connection usage, how to raise the connection limit, and the considerations around the limit and the associated shared buffer size.

A Guide to Defining Business Objectives

It can be said that, in the world of the working developer, when the client or boss stops complaining and critiquing, the developer has been automated to the point of redundancy. Basically, if no one is complaining, the developer is out of work.

Everyone wants a flying car, even when the job involves ingestion. However, there is a fine line in a business between a business requirement and idealism or ignorance. This is especially true when reporting to a boss whose technical competence and knowledge are less than stellar. So what rules have I discovered that will keep you employed? How do you avoid going too far off track when creating SCRUM tasks? Read on.

This article is an opinion piece on defining objectives and, to some extent, building the infrastructure required for a project.

Defining Core Objectives

All requirements should be tightly cropped to the end client’s needs. This means being social with the end user. The first thing to do is find the most pessimistic person in the office and the person who will be using your product the most; they may be the same person. Strike up a conversation, get to know them, and then start asking questions about what they absolutely need. If multiple people in the office interact directly with your end product, work from the most pessimistic and necessary to the least, establishing an absolute core of necessity.

By default, your boss will usually be the most optimistic person in the office. They are often less knowledgeable about the technical requirements, and often about technology in general. Your goal is to temper their expectations to something reasonable. I have found this to be true of business partners as well. You should understand what they want, as they will provide a small set of absolute requirements, but also keep in mind that if they can squeeze gold from water, they will.

If you find yourself limiting your objectives, use memos and presentations to provide a solid line of reasoning for why something cannot be done. Learn the IEEE standards for documentation and get familiar with Microsoft Office or LibreOffice. Always offer a solution, and state what would be needed to actually accomplish an objective and why it may be infeasible. In doing so, you may find a compromise or a solution; offer them as alternatives. Do not be overly technical with the less technical.

My line of work requires providing relatively sensitive information in bulk, at speed, with a fair degree of normalization and quality. Basically, I am developing and building a large distributed ingestion and ETL engine with unique requirements that do not fit existing tools. This process has been a wreck, as I came in new to development, was given a largely inappropriate set of technology, was ignored, and was asked to build a Netflix-style stack from the hulk of GeoCities.

Defining business requirements was the first task I struggled with. Competing and even conflicting interests came from everywhere. My boss wanted, and to a large degree still wants, an auto-scaling system with a great degree of statistical prediction on top of a massive number of sources, all from just one person. My co-workers, clients in the strictest sense, want normalization, de-duplication, anomaly detection, and a large number of enhancements on top of a high degree of speed. No one seemed to grasp the technical requirements, but everyone had an idea of what they needed.

In solving this mess, I found that my most valuable resource was the end user who was both the most pessimistic about what we could deliver and less technically skilled than hoped for. She is extremely bright and quite amazing, so bringing her up to speed was a simple task. However, she was very vague about what she wanted. In this case, I was able to discern requirements from my boss’s optimism and a set of questions posed to her. As she also creates the tickets stemming from issues in the system, she indirectly defines our objectives as well.

Available Technology

The availability of technology will determine how much you can do. Your company will often try to provide less than the required amount of technology. Use your standards based documentation, cost models, and business writing to jockey for more. If you are under-respected, find someone who has clout and push them to act on your behalf.

As a junior employee several years ago, I found myself needing to push for basic technologies. My boss wanted me to use Pentaho for highly concurrent yet state-based networking tasks on large documents ranging from HTML to PDF. In addition, he wanted automation and a tool that could scale easily. Pentaho was a poor tool choice. Worse, I had no server access. It took one year before I was able to start leaning on a more senior employee to lobby for more leniency and, after another year and a half, servers. It took another year before I had appropriate access. If I were not developing a company, one that now has clients, I would have quit. The important takeaway: get to know your senior employees and use them on your behalf when you need to. Bribes can be paid in donuts where I work.

Promise Appropriately, Deliver With Quality

Some organizations require under-promising and over-delivering. These tend to be large organizations with performance review systems in desperate need of an overhaul. If you find yourself in such a situation, lobby to change the system. A solid set of reasoning goes a long way.

Most of us are in a position to promise an appropriate number of features with improvements over time. If you use SCRUM, this model fits perfectly; devise your tasks around it. Know who is on your team and promise an appropriate unit of work. Sales targets are built around what you can deliver, and they are kept through the quality and ease of handling of your product. Do not deliver too little, or you will be fired, but do not define so much as to raise exuberance to an unsatisfiable level.

In my ingestion job, promises are made based on quality and quantity. I use the SCRUM model to refine our promises. Knowing my new co-worker’s capacity (fairly dismal) and my own (swamped with creating the tool), I can temper our tasks to meet business goals. Over time, we are able to layer more business requirements on top of the number of sources being output and keep improving existing tools.

Hire Talent

If you are in the position of being able to hire people to expand what you can achieve, I do not recommend telling your boss that an entry-level position will suffice, as they will then find someone with no skill. Also, push to be in the loop on the hiring process. The right person can make or break a project. My current co-worker is stuck re-running old tasks, as he had no knowledge of our required tools and concepts despite my memo to my boss. Over time he will get better, but with little skill, that may take too long. Sometimes the most difficult higher-ups are those who are nice at heart but ignorant in practice.

Tickets

Your core requirements are not the Ten Commandments. You are not defining a society and universal morals but a more organic project. Requirements and objectives will change over time. The best thing you can do is establish a ticketing system; choose a solid one, as changing to a different tool later is difficult. Patterns in these tickets will create new tools, define more requirements, and help you better define your process.

In finding an appropriate system, ask:

  • Do I need an API that can interact with my or my clients’ tools?
  • Do I need SCRUM and Kanban capabilities on top of ticketing?
  • How hard is it to communicate with the client or end user?

At my work, I implemented a manual SCRUM board for certain tasks, which had a positive impact on my overwhelmed co-worker, who found JIRA cumbersome and full of lag. It is. We use JIRA for bug reporting and the associated Kanban capabilities.

Cost

Cost is the lurking issue many will ignore. You need to document cost and use it to explain the feasibility of an objective. When possible, create statistical tools that you can use to predict the burden on profitability and justify decisions. Money is the most powerful reasoning tool you have.

Conclusion

This opinion piece reviewed my lessons for entry level software developers looking to learn how to define business objectives. Overall, my advice is to:

  • Define core objectives starting with the most important and pessimistic users
  • Dive into your boss’s core requirements and use their optimism to define the icing on the cake
  • Build on your objectives and requirements over time
  • Be involved in your boss’s decisions
  • Define an appropriate number of objectives that allow you to deliver quality work (you will build on your past work over time)
  • Communicate and use an appropriate project management framework
  • Track costs and build statistical tools
  • Learn IEEE standards based documentation such as Software Design Documents and Database Design Documents, get familiar with business writing
  • Make sure you hire the right people

 

Akka: Resolving a Split Brain Without a Headache

Anyone who develops distributed systems knows that there are many issues to resolve before reaching stability. Akka does not entirely avoid these issues and, while many can be handled through configuration or a few lines of code, some problems require extra legwork. One major problem is the split brain. This article explains what a split brain is and examines a solution that does not involve paying Lightbend.


Split Brains

Split brains are the cell division of the concurrent programming world. When different nodes in a cluster cannot reach one another, they must decide how to handle the nodes they cannot reach. Without proper configuration in Akka, the nodes merely assume the other nodes are down and remove or gate them. The previously single cluster has divided into two separate clusters.

In the world of concurrent programming, the question is not whether a split brain will occur but when. Networks crash. Hardware fails, needs to be upgraded or updated, or requires replacement every so often.

Unfortunately, there is no free way to automatically handle the problem in Akka. Auto-downing, the only freely available method for resolving the unreachable state, is not actually a solution to the split brain problem and will result in the separation of nodes into different clusters.

The following graphic on cell division illustrates the split brain problem. Notice how the two cells are completely independent of each other and yet perform the same role.

[Image: major events in mitosis]

Strategies for Resolving a Split Brain

Lightbend, the company behind Akka, lays out several strategies for resolving a split brain. In a nutshell, they are:

  • Static quorum: down the side that cannot muster a fixed minimum number of nodes
  • Keep majority: down the side holding fewer than half of the known members
  • Keep oldest: keep the side containing the oldest member of the cluster
  • Keep referee: keep the side that can still reach a designated referee node
  • Down all: down every node and restart the cluster

Unfortunately, Lightbend requires a paid subscription to access implementations of these strategies.

Custom Majority Split Brain Resolver

While the folks behind Akka do not provide free solutions to the split brain problem, they do provide the tools to implement one of the aforementioned strategies.

The following sketch illustrates the majority strategy; the class name, stability timeout, and exact downing details are illustrative rather than a drop-in implementation:

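import akka.actor.{Actor, ActorLogging, Address, Cancellable}
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import scala.concurrent.duration._

// Illustrative majority-based downing actor; names and timing are assumptions.
class MajoritySplitBrainResolver(stableAfter: FiniteDuration = 10.seconds)
  extends Actor with ActorLogging {

  val cluster = Cluster(context.system)
  var unreachable: Set[Address] = Set.empty
  var pendingDown: Option[Cancellable] = None

  import context.dispatcher

  // Ask the cluster to deliver reachability and membership events to this actor.
  override def preStart(): Unit =
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
      classOf[ReachabilityEvent], classOf[MemberEvent])

  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive: Receive = {
    case UnreachableMember(member) =>
      unreachable += member.address
      scheduleDowning()
    case ReachableMember(member) =>
      unreachable -= member.address
    case MemberRemoved(member, _) =>
      unreachable -= member.address
    case _: MemberEvent => // other membership changes are ignored here
  }

  // After the cluster has been unstable for `stableAfter`, act on the partition.
  private def scheduleDowning(): Unit = {
    pendingDown.foreach(_.cancel())
    pendingDown = Some(context.system.scheduler.scheduleOnce(stableAfter) {
      val total = cluster.state.members.size
      val reachableCount = total - unreachable.size
      if (reachableCount > total / 2) {
        // This side holds the majority: down the unreachable members.
        unreachable.foreach(cluster.down)
      } else {
        // The minority side downs itself so a second cluster never forms.
        cluster.down(cluster.selfAddress)
      }
    })
  }
}
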
The preStart method subscribes the actor to reachability events in the cluster. When an UnreachableMember message is caught, the code stores the relevant member's address in a collection of unreachable nodes and schedules the removal of all unreachable nodes after a period of time, provided the reachable nodes still constitute a majority of the cluster. If they do not, the minority side downs itself instead, preventing two independent clusters from forming.

Conclusion

A split brain is a serious problem. We reviewed ways to solve the issue and presented a free solution using the majority strategy.

 

PostgreSQL Faster Way to Check Column Existence

We often need to check that a column exists before creating it, especially when dealing with dynamically generated data being inserted into PostgreSQL. This brief article shows how to perform this check quickly and offers a PL/pgSQL solution for creating new columns in databases older than PostgreSQL 9.6.

Querying The Catalog

There are several ways to search the PostgreSQL catalog tables. The easiest for the programmer is to use the information_schema. The fastest is to build a custom query.

While it is enticing to query the information schema, it is not a fast operation.  Multiple nested loops are created to pull from the catalog tables even when looking for a single attribute.

The following query on the information_schema results in running a rather large set of operations nested within several loops:

EXPLAIN SELECT count(*) > 0 FROM information_schema.columns WHERE table_schema LIKE 'us_ut_sor' AND table_name LIKE 'dirtyrecords' AND column_name LIKE 'bullage'

When run, the output is as follows:

"Aggregate  (cost=3777.32..3777.33 rows=1 width=0)"
"  ->  Nested Loop Left Join  (cost=2.39..3777.32 rows=1 width=0)"
"        ->  Nested Loop  (cost=1.83..3775.73 rows=1 width=4)"
"              ->  Nested Loop Left Join  (cost=1.54..3775.42 rows=1 width=8)"
"                    Join Filter: (t.typtype = 'd'::"char")"
"                    ->  Nested Loop  (cost=0.84..3774.50 rows=1 width=13)"
"                          ->  Nested Loop  (cost=0.42..3770.19 rows=1 width=8)"
"                                ->  Nested Loop  (cost=0.00..3740.84 rows=1 width=8)"
"                                      Join Filter: (c.relnamespace = nc.oid)"
"                                      ->  Seq Scan on pg_namespace nc  (cost=0.00..332.06 rows=1 width=4)"
"                                            Filter: ((NOT pg_is_other_temp_schema(oid)) AND (((nspname)::information_schema.sql_identifier)::text ~~ 'us_ut_sor'::text))"
"                                      ->  Seq Scan on pg_class c  (cost=0.00..3407.26 rows=121 width=12)"
"                                            Filter: ((relkind = ANY ('{r,v,f}'::"char"[])) AND (((relname)::information_schema.sql_identifier)::text ~~ 'dirtyrecords'::text))"
"                                ->  Index Scan using pg_attribute_relid_attnum_index on pg_attribute a  (cost=0.42..29.35 rows=1 width=14)"
"                                      Index Cond: ((attrelid = c.oid) AND (attnum > 0))"
"                                      Filter: ((NOT attisdropped) AND (((attname)::information_schema.sql_identifier)::text ~~ 'bullage'::text) AND (pg_has_role(c.relowner, 'USAGE'::text) OR has_column_privilege(c.oid, attnum, 'SELECT, INSERT, UPDATE, REFE (...)"
"                          ->  Index Scan using pg_type_oid_index on pg_type t  (cost=0.42..4.30 rows=1 width=13)"
"                                Index Cond: (oid = a.atttypid)"
"                    ->  Nested Loop  (cost=0.70..0.90 rows=1 width=4)"
"                          ->  Index Scan using pg_type_oid_index on pg_type bt  (cost=0.42..0.58 rows=1 width=8)"
"                                Index Cond: (t.typbasetype = oid)"
"                          ->  Index Only Scan using pg_namespace_oid_index on pg_namespace nbt  (cost=0.29..0.31 rows=1 width=4)"
"                                Index Cond: (oid = bt.typnamespace)"
"              ->  Index Only Scan using pg_namespace_oid_index on pg_namespace nt  (cost=0.29..0.31 rows=1 width=4)"
"                    Index Cond: (oid = t.typnamespace)"
"        ->  Nested Loop  (cost=0.56..1.57 rows=1 width=4)"
"              ->  Index Scan using pg_collation_oid_index on pg_collation co  (cost=0.28..0.35 rows=1 width=72)"
"                    Index Cond: (a.attcollation = oid)"
"              ->  Index Scan using pg_namespace_oid_index on pg_namespace nco  (cost=0.29..1.21 rows=1 width=68)"
"                    Index Cond: (oid = co.collnamespace)"
"                    Filter: ((nspname  'pg_catalog'::name) OR (co.collname  'default'::name))"

This is truly nasty. In fact, any program running in O(n^2) or larger time will be less than ideal in this situation.

Limiting the query time can be done by directly querying the catalog tables. The previous query was merely checking whether a column exists under a given table and schema. The following custom query performs the same check much faster:

EXPLAIN SELECT count(*) > 0 FROM (SELECT q1.oid,q1.relname,q1.relowner,q1.relnamespace,q2.nspname FROM (SELECT oid,relname,relowner,relnamespace FROM pg_class) as q1 INNER JOIN (SELECT oid, * FROM pg_catalog.pg_namespace) as q2 ON q1.relnamespace = q2.oid WHERE q1.relname LIKE 'dirtyrecords' AND q2.nspname LIKE 'us_ut_sor') as oq1 INNER JOIN (SELECT attrelid,attname FROM pg_attribute) as oq2 ON oq1.oid = oq2.attrelid WHERE oq2.attname LIKE 'bullage'

While the query is longer, far fewer operations are performed, at a comparatively lower cost:

"Aggregate  (cost=292.44..292.45 rows=1 width=0)"
"  ->  Nested Loop  (cost=0.84..292.43 rows=1 width=0)"
"        ->  Nested Loop  (cost=0.42..289.64 rows=1 width=4)"
"              ->  Seq Scan on pg_namespace  (cost=0.00..281.19 rows=1 width=4)"
"                    Filter: (nspname ~~ 'us_ut_sor'::text)"
"              ->  Index Scan using pg_class_relname_nsp_index on pg_class  (cost=0.42..8.44 rows=1 width=8)"
"                    Index Cond: ((relname = 'dirtyrecords'::name) AND (relnamespace = pg_namespace.oid))"
"                    Filter: (relname ~~ 'dirtyrecords'::text)"
"        ->  Index Only Scan using pg_attribute_relid_attnam_index on pg_attribute  (cost=0.42..2.79 rows=1 width=4)"
"              Index Cond: ((attrelid = pg_class.oid) AND (attname = 'bullage'::name))"
"              Filter: (attname ~~ 'bullage'::text)"

 

Notice how the estimated cost of the first query was 3777.32 while the second was merely 292.44: an order of magnitude better.

PL/pgSQL Function

For database versions prior to PostgreSQL 9.6, which introduced the syntax ALTER TABLE x ADD COLUMN IF NOT EXISTS y TYPE, the following PL/pgSQL function performs the desired table alteration:

 

CREATE OR REPLACE FUNCTION add_column_if_not_exists(schema_name varchar(63), table_name varchar(63), column_name varchar(63), column_type varchar(1024)) RETURNS void AS
$BODY$
DECLARE
    column_exists BOOLEAN;
BEGIN
    IF schema_name IS NOT NULL THEN
        SELECT count(*) > 0 INTO column_exists FROM (SELECT q1.oid,q1.relname,q1.relowner,q1.relnamespace,q2.nspname FROM (SELECT oid,relname,relowner,relnamespace FROM pg_class) as q1 INNER JOIN (SELECT oid, * FROM pg_catalog.pg_namespace) as q2 ON q1.relnamespace = q2.oid WHERE q1.relname LIKE table_name AND q2.nspname LIKE schema_name) as oq1 INNER JOIN (SELECT attrelid,attname FROM pg_attribute) as oq2 ON oq1.oid = oq2.attrelid WHERE oq2.attname LIKE column_name;
        IF column_exists IS FALSE THEN
            EXECUTE 'ALTER TABLE '||schema_name||'.'||table_name||' ADD COLUMN '||column_name||' '||column_type;
        END IF;
    ELSE
        SELECT count(*) > 0 INTO column_exists FROM (SELECT oid,relname,relowner,relnamespace FROM pg_class WHERE relname LIKE table_name) as oq1 INNER JOIN (SELECT attrelid,attname FROM pg_attribute) as oq2 ON oq1.oid = oq2.attrelid WHERE oq2.attname LIKE column_name;
        IF column_exists IS FALSE THEN
            EXECUTE 'ALTER TABLE '||table_name||' ADD COLUMN '||column_name||' '||column_type;
        END IF;
    END IF;
END;
$BODY$
LANGUAGE plpgsql;

We did not create a trigger that fires on all ALTER statements, to avoid adding cost when it is not desired. The provided function also avoids a costly join when no schema is given.
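
As a usage sketch (reusing the illustrative schema, table, and column names from earlier in the article):

-- Add the column only if it does not already exist.
SELECT add_column_if_not_exists('us_ut_sor', 'dirtyrecords', 'bullage', 'NUMERIC');

-- Without a schema, only the table name is matched.
SELECT add_column_if_not_exists(NULL, 'dirtyrecords', 'bullage', 'NUMERIC');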

Conclusion

In this article, we discovered that the information schema is not as ideal as it seems. Armed with this information, we created a better function to add columns to a table only if they do not exist.