2019 Trend: Data Engineering Becomes a Household Name

There will be many 2019 trends that last well beyond the year. With Tableau now a household name, Salesforce serving as an analytics workhorse, SAS continuing to grow through JMP, and smaller players such as Panoply acquiring funding, one hot technology trend in 2019 will be data engineering.

This trend points to a massive problem in the field of data: data management tools and frameworks are severely deficient. Many merely perform materialization.

That is changing this year, and it means that data engineering will be an important term over the next few years. Automation will become a reality.

What is a data engineer?

Data engineers create pipelines. This means automating the handling of data from aggregation and ingestion to the modeling and reporting process. These professionals handle big data or even small data loads with streaming playing an important role in their work.

Because data engineers cover the entire pipeline for your data and often implement analytics in a repeatable manner, their work is broad. Terms such as ETL, ELT, verification, testing, reporting, materialization, standardization, normalization, distributed programming, crontab, Kubernetes, microservices, Docker, Akka, Spark, AWS, REST, Postgres, Kafka, and statistics are slung with ease by data engineers.

Until 2019, integrating systems often meant combining a variety of tools into a cluttered wreck. A company might deploy Python scripts for visualization, use Vantara (formerly Pentaho) for ETL, combine a variety of aggregation tools through Kafka, keep data warehouses in PostgreSQL, and may even still use Microsoft Excel to store data.

The typical company spends $4,000 to $8,000 per employee providing these pipelines. This cost is unacceptable and can be avoided in the coming years.

Why won’t ELT kill data engineers?

ELT applications promise to get rid of data engineers. That is pure nonsense meant to attract ignorant investors' money:

  • ELT is horrible for big data, with continuous ETL proving more effective
  • ELT is often performed on data sources that already underwent ETL at the companies they were purchased from, such as Acxiom, Nasdaq, and TransUnion
  • ELT eats resources in a significant way, which often limits its use to small data sets
  • ELT ignores issues related to streaming from surveys and other sources, which greatly benefit from the requirements analysis and transformations of ETL
  • ELT is horrible for integration tasks where data standards differ or are non-existent
  • You cannot run good AI or build models on poorly standardized or non-standardized data

This means that ETL will continue to be a major part of a data engineer's job.

Of course, since data engineers translate business analyst requirements into reality, the job will continue to be secure. Coding may become less important as new products are released but will never go away in the most efficient organizations.

Why is Python likely to become less popular?

Many people point to Python as a means for making data engineers redundant. This is simply false.

Python is limited. This means that the JVM will rise in popularity with data scientists and even analysts as companies look to make money on the backs of their algorithms. This benefits data engineers, who are typically proficient in at least one of Java, Go, or Scala.

Python works for developers, analysts, and data scientists who want to control tools written in a more powerful language such as C++ or Java. Pentaho experimented with the language before being bought by Hitachi. However, being as much as 60 times slower than the JVM and often requiring three times the resources, it is not an enterprise-grade language.

Python does not provide power. It is not great at parallelism: the interpreter is effectively single threaded, and while any language can achieve some form of parallelism, Python falls back on heavyweight OS threads and processes to perform anything asynchronously. This is horrendous.

Consider the case of using Python’s Celery versus Akka, a Scala and Java-based tool. Celery and Akka perform the same tasks across a distributed system.

Parsing millions of records in Celery can quickly eat up more than fifty percent of a typical server's resources with a mere ten processes. RabbitMQ, the messaging framework behind Celery, can only handle around 50 million messages per second on a cluster. Depending on the use case, Celery may also require Redis to run effectively. This means that an 18-logical-core server with 31 gigabytes of RAM can be severely bogged down processing tasks.
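
For reference, here is a minimal sketch of the Celery setup being compared; the broker and backend URLs and the parse_record task are illustrative assumptions, not code from the project described above:

from celery import Celery

# Broker and results backend are placeholders.
app = Celery('parser',
             broker='amqp://guest@localhost//',
             backend='redis://localhost:6379/0')

@app.task
def parse_record(record: dict) -> dict:
    # Each task lands on a separate worker process, so fan-out is bounded
    # by process count and broker throughput rather than lightweight actors.
    return {k: str(v).strip() for k, v in record.items()}

# Fan a batch of records out to the worker pool:
# results = [parse_record.delay(r) for r in records]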

Akka, on the other hand, is the basis for Apache Spark. It is lightweight and all-inclusive: 50 million messages per second are attainable with 10 million actors running concurrently at much less than fifty percent of a typical server's resources. Since not every use case requires Spark, even in data engineering, this is an outstanding difference. Not needing a separate message router and results backend also means that less skill is required for deployment.

Will Scala become popular again?

When I started programming in Scala, the language was fairly unheard of. Many co-workers merely looked at this potent language as a curiosity. Eventually, Scala's popularity started to wane as Java developers stayed focused on websites and ignored building the frameworks for Scala that already existed in Python.

That is changing. With the rise of R, whose syntax is incredibly similar to Scala, mathematicians and analysts are gaining skill in increasingly complex languages.

Perhaps because of this, Scala is making its way back into the lexicon of developers. Python's edge was greatly reduced starting in 2017 as previously non-existent or non-production-level tools were released for the JVM.

Consider what is now at least version 1.0 on the JVM:

  • Nd4j and Nd4s: a Scala- and Java-based n-dimensional array framework that boasts speeds faster than NumPy
  • Dl4j: Skymind is a terrific company producing tools comparable to Torch
  • TensorFlow: contains APIs for both Java and Scala
  • Neanderthal: a Clojure-based linear algebra system that is blazingly fast
  • OpenNLP: a framework that, unlike the Stanford NLP tools, is actively developed and includes named entity recognition and other powerful transformation tools
  • Bytedeco: this project is filled with angels (I actually think they came from heaven) whose innovative and nearly automated JNI generator has created bindings to everything from Python code to Torch, libpostal, and OpenCV
  • Akka: Lightbend continues to produce distributed tools for Scala, with now open-sourced split-brain resolvers that go well beyond my majority resolver
  • MongoDB connectors: Python's MongoDB connectors are resource intensive due to the rather terrible nature of Python bytecode
  • Spring Boot: Scala and Java are interoperable, and benchmarks of Spring Boot show at least a 10,000 requests-per-second improvement over Django
  • Apereo CAS: a single sign-on system that adds terrific security to disparate applications

Many of these frameworks are available in Java. Since Scala runs on the JVM and can call any Java library, the languages are interoperable. Scala is cleaner, functional, highly modular, and requires much less code than Java, which puts these tools within reach of analysts.

What do new tools mean for a data engineer?

The new Java, Scala, and Go tools and frameworks mean that attaining 1,000 times the speed on a single machine, with a significant cost reduction over Python, is possible. It also means chaining millions of moving parts into a solid microservices architecture instead of a cluttered monolithic wreck.

The result is clear. My own company is switching off of Python everywhere except our responsive, front-end-heavy web application, for a fifty percent reduction in hardware costs.

How will new tools help data engineers?

With everything that Scala and the JVM offer, data engineers now have a potent tool for automation. These valuable employees may not be creating the algorithms, but they will be transforming data in smart ways that produce real value.

Companies no longer have to rely on archaic languages to produce messy systems, and this will translate directly into value. Data engineers will be behind this increase in value as they can more easily combine tools into a coherent and flexible whole.

Conclusion

The continued rise of JVM-backed tools that began in 2018 will make data pipeline automation a significant part of a company's IT budget. Data engineers will be behind the evolution of data pipelines from disparate systems into a streamlined whole backed by custom code and new products.

Data engineering will be a hot 2019 trend. After this year, we may just be seeing the creation of Skynet.

Fluff Stuff: Better Governments, Better Processes, Simplr Insites

Cities are heading toward bankruptcy. The credit rating of Stockton, CA was downgraded. Harrisburg, PA is actually bankrupt. It is only a matter of time before Chicago implodes. Since 1995, city debt has risen by between $1.3 and $1.8 trillion. While a large chunk of this cost is waste, more of it results from using intuition over data when tackling infrastructure and new projects. Think of your city government as the boss who likes football more than his job, so he builds a football stadium despite your company being in the submarine industry.

This is not an unsolvable nightmare.

Take an effective use case where technology and government processes were outsourced. As costs rose in Sandy Springs, GA, the city outsourced and achieved more streamlined processes, better technology, and lower costs. Today, without raising taxes, the city is in the green. While Sandy Springs residents are wealthy, even poorer cities can learn from this experience.

Running city projects in a scientific manner requires an immense amount of clean, quality, well-managed data organized into individual projects. With an average of $8,000 spent per employee on technology each year, and with immense effort spent acquiring analysts and modernizing infrastructure, cities are struggling to modernize.

It is my opinion, one I am basing a company on, that providing quality data management, sharing and collaboration tools, IT infrastructure, and powerful project and statistical management systems in a single SaaS tool can greatly reduce that $8,000-per-employee cost and improve budgets. These systems can even reduce the amount of administrative staff, allowing money to flow to where it is needed most.

How can a collaborative tool tackle the cost problem? Through:

  • collaborative knowledge sharing of working, ongoing, and even failed solutions
  • public facing project blogs and information on organizations, projects, statistical models, results, and solutions that allow even non-mathematical members of an organization to learn about a project
  • a security-minded institutional resource manager (IRM, better thought of as a large, securable, shared file storage system) capable of expanding into the petabytes while maintaining FERPA, HIPAA, and state and local regulations
  • the possibility to share data externally, keep it internal, or keep the information completely private while obfuscating names and other protected information
  • complexity analysis (graph based analysis) systems for people, projects, and organizations clustered for comparison
  • strong comparison tools
  • potent and learned aggregation systems with validity in mind ranging from streamed data from sensors and the internet to surveys to uploads
  • powerful drag and drop integration and ETL with mostly automated standardization
  • deep diving upload, data set, project, and model exploration using natural language searching
  • integration with everything from a phone to a tablet to a powerful desktop
  • access controls for sharing the bare minimum amount of information
  • outsourced IT infrastructure including infrastructure for large model building
  • validation using proven algorithms and increased awareness of what that actually means
  • anomaly detection
  • organization of models, data sets, people, and statistical elements into a single space for learning
  • connectors to popular visualizers such as Tableau and Vantara, with a customizable dashboard for general reporting
  • downloadable data sets, with two-entity verification if required, that can be streamed or used in Python and R

Tools such as ours can reduce the cost of IT by as much as 65%. We eliminate much of the waste in the data science pipeline while trying to be as simple as possible.

We should consider empowering and streamlining the companies, non-profits, and government entities such as schools and planning departments that perform vital services before our own lives are greatly affected. Debt and credit are not solutions to complex problems.

Take a look, or don’t. This is a fluff piece on something I am passionately building. Contact us if you are interested in a beta test.

Automating Django Database Re-Creation on PostgreSQL


The Django database migration system is a mess. This mess is made more difficult when using PostgreSQL.

Schemas go unrecognized without a workaround, indices can break when using the workaround, and processes are very difficult to automate with the existing code base.

The authors of Django, or someone such as myself who occasionally finds the time to dip in and unclutter a mess or two, will eventually resolve this. However, the migration system today is a train wreck.

My current solution to migrating a Postgres based Django application is presented in this article.

Benefits of Getting Down and Dirty

The benefits of taking the time to master this process for your own projects are not limited to the ability to automate Django. They include the ability to easily manage migrations and even automate builds.

Imagine having a DDL that is defined in your application and changed with one line of code without ever touching your database.

Django is a powerful and concurrent web framework with an enormous amount of add-ons and features. Mastering this task can open your code base to amazing amounts of abstraction and potential.

Well Defined Problems

While Django is powerful, its integration with Postgres is horrible. A schema detection problem can cause a multitude of issues, as can problems related to using multiple databases. Combined, these problems sap hours of valuable time.

The major issues are:

  • lack of migration support for multiple databases
  • lack of schema support (an issue recognized over 13 years ago)
  • some indices (PostGIS polygon ids here) break when using the schema workaround

Luckily, the solutions for these problems are simple and require hacking only a few lines of your configuration.

Solving the Multiple Database Issue

This issue is more annoying than it is a major problem. Simply obtain a list of your applications and, when migrating, use the database names defined by your database router.

If there are migrations you want to reset, use the following article instead of the migrate command to delete your migrations and make sure to drop tables from your database without deleting any required schemas.

Using python[version] manage.py migrate [app] zero  did not actually drop and recreate my tables for some reason.

For thoroughness, discover your applications using:

python[version] manage.py showmigrations

With your applications defined, make the migrations and run the second line of code in an appropriate order for each application:

python[version] manage.py makemigrations [--settings=my_settings.py]
python[version] manage.py migrate [app] --database= [--settings=my_settings.py]
....

As promised, the solution is simple. The --database switch matches the database name in your configuration and must be present; Django only recognizes your default configuration when it is omitted, which can mark migrations as complete without actually performing them.

Solving the Schema Problem without Breaking PostGIS Indices

Django offers terrific Postgres support, including support for PostGIS. However, Postgres schemas are not supported. Tickets were opened over 13 years ago but were merged and forgotten.

To ensure that schemas are read properly, setup a database router and add a database configuration utilizing the following template:

"default": {
        "ENGINE": "django.contrib.gis.db.backends.postgis",
        "NAME": "test",
        "USER": "test",
        "PASSWORD": "test",
        "HOST": "127.0.0.1",
        "PORT": "5432",
        "OPTIONS": {
            "options": "-csearch_path=my_schema,public"
        }
 }

A set of options appended to your DSN, including a schema and a backup schema used if the first schema is not present, is added via this configuration. Note the lack of spacing in the options string.
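
The database router referenced above can be a small class registered in the DATABASE_ROUTERS setting. A minimal sketch, with hypothetical app labels, database aliases, and module path:

# myproject/routers.py (path and names are placeholders)
class AuthRouter:
    """Send authentication-related apps to the 'auth' database, everything else to 'default'."""

    route_app_labels = {'auth', 'admin', 'sessions'}

    def db_for_read(self, model, **hints):
        if model._meta.app_label in self.route_app_labels:
            return 'auth'
        return 'default'

    def db_for_write(self, model, **hints):
        return self.db_for_read(model, **hints)

    def allow_migrate(self, db, app_label, model_name=None, **hints):
        if app_label in self.route_app_labels:
            return db == 'auth'
        return db == 'default'

Register it in settings.py with DATABASE_ROUTERS = ['myproject.routers.AuthRouter'].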

Now that settings.py is configured and a database router is established, add the following Meta class to each Model:

    class Meta:
        # The escaped quotes make Django emit schema"."table as the table name
        db_table = u'schema\".\"table'

Notice the awkward value for db_table. This is due to the way table names are quoted in Django. It is possible to leave managed set to True, allowing Django to perform migrations, as long as the database is cleaned up a bit.

If any indices break on migration at this point, simply drop the table definition and use whichever schema the table ends up in. There is no apparent workaround for this.

Now Your Migration Can Be Run In a Script, Even in a CI Tool

After quite a bit of fiddling, I found that it is possible to script and thus automate database builds. This is incredibly useful for testing.

My script, written to recreate a test environment, included a few direct SQL statements as well as calls to manage.py:

echo "DROP SCHEMA IF EXISTS auth CASCADE" | python3 ./app_repo/simplred/manage.py dbshell --settings=test_settings
echo "DROP SCHEMA IF EXISTS filestorage CASCADE" | python3 ./app_repo/simplred/manage.py dbshell --settings=test_settings
echo "DROP SCHEMA IF EXISTS licensing CASCADE" | python3 ./app_repo/simplred/manage.py dbshell --settings=test_settings
echo "DROP SCHEMA IF EXISTS simplred CASCADE" | python3 ./app_repo/simplred/manage.py dbshell --settings=test_settings
echo "DROP SCHEMA IF EXISTS fileauth CASCADE" | python3 ./app_repo/simplred/manage.py dbshell --settings=test_settings
echo "DROP SCHEMA IF EXISTS licenseauth CASCADE" | python3 ./app_repo/simplred/manage.py dbshell --settings=test_settings
echo "DROP OWNED BY simplrdev" | python3 ./app_repo/simplred/manage.py dbshell --settings=test_settings
echo "CREATE SCHEMA IF NOT EXISTS auth" | python3 ./app_repo/simplred/manage.py dbshell --settings=test_settings
echo "CREATE SCHEMA IF NOT EXISTS filestorage" | python3 ./app_repo/simplred/manage.py dbshell --settings=test_settings
echo "CREATE SCHEMA IF NOT EXISTS licensing" | python3 ./app_repo/simplred/manage.py dbshell --settings=test_settings
echo "CREATE SCHEMA IF NOT EXISTS simplred" | python3 ./app_repo/simplred/manage.py dbshell --settings=test_settings
echo "CREATE SCHEMA IF NOT EXISTS licenseauth" | python3 ./app_repo/simplred/manage.py dbshell --settings=test_settings
echo "CREATE SCHEMA IF NOT EXISTS fileauth" | python3 ./app_repo/simplred/manage.py dbshell --settings=test_settings
find ./app_repo -path "*/migrations/*.py" -not -name "__init__.py" -delete
find ./app_repo -path "*/migrations/*.pyc"  -delete
python3 ./app_repo/simplred/manage.py makemigrations --settings=test_settings
python3 ./app_repo/simplred/manage.py migrate auth --settings=test_settings --database=auth
python3 ./app_repo/simplred/manage.py migrate admin --settings=test_settings --database=auth
python3 ./app_repo/simplred/manage.py migrate registration --settings=test_settings --database=auth
python3 ./app_repo/simplred/manage.py migrate sessions --settings=test_settings --database=auth
python3 ./app_repo/simplred/manage.py migrate oauth2_provider --settings=test_settings --database=auth
python3 ./app_repo/simplred/manage.py migrate auth_middleware --settings=test_settings --database=auth
python3 ./app_repo/simplred/manage.py migrate simplredapp --settings=test_settings --database=data
python3 ./app_repo/simplred/manage.py loaddata --settings=test_settings --database=data fixture

Assuming the appropriate security measures are followed, this script works well in a Bamboo job. It drops and recreates any necessary database components, clears migrations, and then creates and performs migrations. Remember, this script recreates a test environment.

The running Django application, which is updated via a webhook, doesn’t actually break when I do this. I now have a completely automated test environment predicated on merely merging pull requests into my test branch and ignoring any migrations folders through .gitignore.

Conclusion

PostgreSQL is powerful, as is Django, but combining the two requires some finagling to achieve maximum efficiency. Here, we examined a few issues encountered when using Django and Postgres together and discussed how to solve them.

We discovered that it is possible to automate database migration without too much effort if you already know the hacks that make this process work.

 

Encrypting Data in Django with the Fernet Algorithm


At some point, it will be necessary to encrypt data. While most queries are performed raw, there is still a use for the models Django provides in encrypting and decrypting data, or even just in obtaining a model from a raw query.

This article examines how to apply the Fernet algorithm to save data in an encrypted format using Django.



 

Fernet Encryption

Fernet encryption utilizes the AES method at its core. This symmetric method is widely accepted and more practical than RSA when there is no need for communication between parties.

RSA is a powerful tool when requiring that data be passed over the wire. In this example, we are more concerned with data storage.

Secret Key

The Fernet algorithm requires using a secret key to store data.

from cryptography.fernet import Fernet

Fernet.generate_key()

This key should be stored in a secure fashion. Options for retrieving it include loading the key from a file whose path is passed through an environment variable.
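
One minimal sketch of that approach: generate the key once, persist it to a file, and point the FERNET_KEY environment variable (the same variable read by the field below) at that file. The default path here is only an example:

import os

from cryptography.fernet import Fernet

# The path comes from the environment; the fallback is a placeholder.
key_path = os.environ.get('FERNET_KEY', '/etc/myapp/fernet.key')

if not os.path.exists(key_path):
    # Generate the key exactly once and keep the file permissions restrictive.
    with open(key_path, 'wb') as fp:
        fp.write(Fernet.generate_key())
    os.chmod(key_path, 0o600)

with open(key_path, 'rb') as fp:
    fernet = Fernet(fp.read())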

Encrypted Field

Django provides an excellent tutorial for writing custom fields. By overriding the from_db_value and get_db_prep_value methods, it is possible to achieve decryption and encryption respectively.

import base64
import os
from pathlib import Path

from cryptography.fernet import Fernet
from django.db import models

HOME = str(Path.home())


class EncryptedFernetField(models.TextField):
    """
    A field where data is encrypted using the Fernet Algorithm
    """

    description = "An encrypted field for storing information using Fernet Encryption"

    def __init__(self, *args, **kwargs):
        self.__key_path = os.environ.get('FERNET_KEY', None)
        if self.__key_path is None:
            self.__key_dir = os.environ.get('field_key_dir', None)
            if self.__key_dir is None:
                self.__key_dir = os.path.sep.join([HOME, 'field_keys'])
                if os.path.exists(self.__key_dir) is False:
                    os.mkdir(self.__key_dir)
            key = Fernet.generate_key()
            with open(os.path.sep.join([self.__key_dir, 'fernet.key']), 'w') as fp:
                fp.write(key.decode('utf-8'))
            self.__key_path = os.path.sep.join([self.__key_dir, 'fernet.key'])
        super().__init__(*args, **kwargs)

    def deconstruct(self):
        name, path, args, kwargs = super().deconstruct()
        return name, path, args, kwargs

    def from_db_value(self, value, expression, connection):
        if self.__key_path and value:
            value = base64.b64decode(value)
            with open(self.__key_path, 'r') as fp:
                key = fp.read().encode()
            f = Fernet(key)
            value = f.decrypt(value).decode('utf-8')
        return value

    def to_python(self, value):
        return value

    def get_db_prep_value(self, value, connection, prepared=False):
        if value:
            if self.__key_path:
                with open(self.__key_path, 'r') as fp:
                    key = fp.read().encode()
                f = Fernet(key)
                value = f.encrypt(value.encode())
                value = base64.b64encode(value).decode('utf-8')
        return value

Encrypted values are stored in base 64 to avoid potential byte-related issues. This class extends models.TextField so the ciphertext is saved as text.

Using the Field

The field is used in the same way as any other.

class License(models.Model):
    username = models.CharField(max_length=1024)
    application = models.CharField(max_length=1024)
    unique_id = models.IntegerField(unique=True)
    license = EncryptedFernetField()
    active = models.BooleanField(default=False)
    expiration_date = models.DateField()

    class Meta:
        unique_together = (('username', 'application'), )

 

Conclusion

Data encryption is fairly straightforward in Django. This tutorial examined how to create an encrypted field using the Fernet algorithm.

Checking and Increasing the Connection Limit in PostgreSQL

It finally happened. I blew the max connection limit in my new  PostgreSQL install by starting too many grid nodes  with associated connection pools for a system I am writing.

The default limit of 100 connections is far too few, especially with two people doing database intensive tasks. This article explains how to check the number of connections used as a standard user and administrator and how the administrator can change the connection limit.

Default Connection Limit and Buffer Size

By default, PostgreSQL has a relatively low number of maximum allowed connections. The default limit is 100.

The limit is related to the size of the shared buffers, since connections utilize shared-buffer memory. In the installation discussed here, the shared buffer size is set to 8 gigabytes.

PostgreSQL is a versatile database; Amazon built Redshift on top of it. However, the default limits are low and should remain so to encourage developers and students not to abuse their system's resources.

Common Error When Exceeding Size Limit

With an ever increasing amount of data available and a still growing need for technology, connection limits will be exceeded on many systems. PostgreSQL throws the following error when this occurs:

psql: FATAL: remaining connection slots are reserved for non-replication superuser connections

In my case, the connection pool failed to reserve the required ten to twenty connections per highly database-intensive job and instantly released what connections it did acquire back to the database. This left me with 96 used connections and four that were unattainable; some of the 96 were held by an ETL tool or pgAdmin.

Querying the Connection Limit

It is possible for non-administrators to check the number of connections in use with the following query:

SELECT sum(numbackends) FROM pg_stat_database

This query sums the per-database backend counts from the statistics view, giving the number of currently active connections.

The configured limit can be queried from the psql command line using:

SHOW max_connections

This query asks the database to return the value of the max_connections configuration variable described below.

Most configuration variables can be queried in psql, and some can even be set from the command line console. The max_connections variable cannot be set this way due to the associated increase or decrease in memory usage.
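
Both checks can be combined into a short monitoring snippet. A sketch using psycopg2, with a placeholder connection string:

import psycopg2

with psycopg2.connect('dbname=test user=test host=127.0.0.1') as conn:
    with conn.cursor() as cur:
        cur.execute('SELECT sum(numbackends) FROM pg_stat_database')
        in_use = cur.fetchone()[0]
        cur.execute('SHOW max_connections')
        limit = int(cur.fetchone()[0])

print(f'{in_use} of {limit} connections in use')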

Setting Connection Related Variables

While the initial settings are low by today's standards, they can be changed in your postgresql.conf file. The file is typically found at a location such as:

/var/lib/pgsql/X.X/data/postgresql.conf  or /usr/local/postgres/postgresql.conf

The default location is the data directory under your PostgreSQL installation.

Look for the following variables in the file:

shared_buffers = 8000MB
max_connections = 100

Choosing Size Limits

It is important to set size limits relative to the size of the shared buffers. Dividing current RAM usage by the number of connections in use, or by the maximum number allotted, gives a rough over-estimate of memory use per connection. Ensure that this estimate multiplied by the desired number of connections exceeds neither the shared buffer size nor the capacity of your RAM. The shared buffer size itself should generally be less than the amount of RAM in your machine.

The inequalities to check are:

shared_buffers < (RAM - padding)
(current_ram_use / max_connections) * desired_connections < (RAM - padding)
(current_ram_use / max_connections) * desired_connections < shared_buffers

Basically, ensure that you have enough RAM and that the potential memory use exceeds neither the RAM nor the shared buffer size. These inequalities use the max_connections setting instead of the actual number of used connections for some extra assurance.
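
As a back-of-the-envelope check, the rule can be expressed in a few lines of Python; every figure below is an illustrative assumption:

ram_mb = 32000              # total RAM on the host
padding_mb = 4000           # reserved for the OS and other services
shared_buffers_mb = 8000    # from postgresql.conf
current_ram_use_mb = 12000  # observed PostgreSQL memory use
max_connections = 100       # current setting
desired_connections = 300   # proposed new setting

per_connection_mb = current_ram_use_mb / max_connections
projected_mb = per_connection_mb * desired_connections

safe = (shared_buffers_mb < ram_mb - padding_mb
        and projected_mb < ram_mb - padding_mb
        and projected_mb < shared_buffers_mb)
print(f'projected use: {projected_mb:.0f} MB; safe to raise max_connections: {safe}')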

Reloading and Restarting

The configuration file can be reloaded in an active session, without a restart, from the SQL command line console or through pgAdmin with the query:

SELECT pg_reload_config()

This query does not require administrator privileges but will not work for settings such as the maximum number of allotted connections or the buffer size. Any variable in the configuration file annotated with the following comment requires restarting the database server:

# (change requires restart)

Both max_connections and shared_buffers change memory usage, which cannot be reworked on the fly. From the Linux command line, restart the server with the command:

./postgresql restart

On Ubuntu, the restart script normally resides at /etc/init.d.

If PostgreSQL is setup as a service, use:

service postgresql restart

Conclusion

In today's world where data is bountiful, it is easy to exceed the connection limit in PostgreSQL. This article reviewed how to check the number of connections in use, how to raise the limit, and the considerations for the associated shared buffer size.

PostgreSQL for Converting NoSQL to SQL

Needing a solution to prep data for ETL, and recognizing that Drill may not be as comprehensive as what I can build with PL/pgSQL since it only goes one layer deep, it was time to find a way to move from dynamically created JSONB in PostgreSQL to relational PostgreSQL tables.

The solution was this little function. It can be extended with jsonb_array_elements and other functions to easily and quickly build routines that delve deeper than Drill. Add the coming master replication and steadily improving distribution and threading to EnterpriseDB's growing set of accomplishments with Postgres, and why use Drill at all?

breakoutNoSQL(inputTable text, outputTable text, jsonColumn text, otherColumns text[], condition text, splitData boolean)

Only otherColumns and condition can be null.

Code

CREATE OR REPLACE FUNCTION breakoutNoSQL(inputTable text,outputTable text,jsonColumn text,otherColumns text[],condition text,splitData boolean) RETURNS text[] AS
$BODY$
DECLARE
    k text;
    keys text[];
    stmt text;
    insertKeys text;
BEGIN
    IF outputTable IS NULL THEN
        RAISE EXCEPTION 'OUTPUT TABLE CANNOT BE NULL';	
    END IF;

    if inputTable IS NULL THEN
        RAISE EXCEPTION 'INPUT TABLE CANNOT BE NULL';
    END IF;

    --get the initial keys
    if condition IS NOT NULL THEN
       IF splitData IS TRUE THEN
	  execute 'SELECT array_agg(key) FROM (SELECT distinct(jsonb_object_keys(jsonb_array_elements('||jsonColumn||'::jsonb))) as key FROM '||inputTable||') as q1 WHERE '||condition into keys;
       ELSE
	execute 'SELECT array_agg(key) FROM (SELECT distinct(jsonb_object_keys('||jsonColumn||'::jsonb)) as key FROM '||inputTable||') as q1 WHERE '||condition into keys;
       END IF;
    else
       IF splitData IS TRUE THEN
	execute 'SELECT array_agg(key) FROM (SELECT distinct(jsonb_object_keys(jsonb_array_elements('||jsonColumn||'::jsonb))) as key FROM '||inputTable||') as q1' into keys;
       ELSE
	execute 'SELECT array_agg(key) FROM (SELECT distinct(jsonb_object_keys('||jsonColumn||'::jsonb)) as key FROM '||inputTable||') as q1' into keys;
       END IF;
    end if;

    IF keys IS NULL OR array_length(keys,1) = 0 THEN
	RAISE EXCEPTION 'NUMBER OF DISCOVERED KEYS WAS 0';
    END IF;

    --build the statement
    stmt = 'CREATE TABLE '||outputTable||' AS SELECT ';

    --build the insert keys statement 
    insertKeys = NULL;
    FOREACH k IN ARRAY keys LOOP
      if insertKeys IS NULL THEN
         insertKeys = '';
      else
         insertKeys = insertKeys||',';
      end if;
      insertKeys = insertKeys||'btrim(cast('||'j'||jsonColumn||'::jsonb->'||''''||k||''''||'as text),''"'') as '||k;
    END LOOP;

    if otherColumns IS NOT NULL THEN
	FOREACH k IN ARRAY otherColumns LOOP
           if insertKeys IS NULL THEN
            insertKeys = '';
           else
             insertKeys = insertKeys||',';
           end if;  
           insertKeys = insertKeys||k;
       END LOOP;
     END IF;
     	
    --concat to make full statement
    stmt = stmt||' '||insertKeys||' FROM '||' (SELECT *,';
    IF splitData IS TRUE THEN
      stmt = stmt||'jsonb_array_elements('||jsonColumn||'::jsonb) as j'||jsonColumn||' FROM '||inputTable||') as q1';
    ELSE
      stmt = stmt||jsonColumn||' as j'||jsonColumn||' FROM '||inputTable||') as q1';
    END IF;

    --print and execute the statement
    RAISE NOTICE 'QUERY: %', stmt;
    execute stmt;
    
    --return the keys from json
    return keys;
END;
$BODY$
LANGUAGE plpgsql;
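
A hypothetical invocation from Python via psycopg2, with placeholder table, schema, and column names; psycopg2 adapts the Python list to the text[] argument:

import psycopg2

conn = psycopg2.connect('dbname=test user=test password=test host=127.0.0.1')
with conn, conn.cursor() as cur:
    cur.execute(
        'SELECT breakoutNoSQL(%s, %s, %s, %s, %s, %s)',
        ('staging.raw_events',      # inputTable
         'staging.events_flat',     # outputTable
         'payload',                 # jsonColumn
         ['source', 'loaded_at'],   # otherColumns
         None,                      # condition
         True),                     # splitData
    )
    print(cur.fetchone()[0])        # the list of discovered JSON keys
conn.close()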

Tune Postgres for Faster Inserts

Insertions in PostgreSQL can take a while, especially for large amounts of data. One project of mine at work sees large batch updates while another issues single insert statements, each preferable in its own context. There are ways to tune PostgreSQL to make sure that you are getting the most out of your database.



These variables should be set in postgresql.conf.

  • Tune write-ahead logging (WAL) by setting wal_level to minimal and choosing a sensible wal_writer_delay: when WAL flushes are frequent and the content large, this can greatly affect servers. In my case, 10,000-record inserts went from 30 minutes to 2 seconds after extending wal_writer_delay from per-transaction flushing (0 seconds) to 100 milliseconds; the default delay is 200 milliseconds. A sketch for applying these settings follows this list.
  • Increase thread limits: increased thread limits help large transactions. A 10,000-record batch update in my case went from 15 minutes to 3 seconds when thread limits went from 10,000 to 50,000 threads.
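
If you would rather not edit the file by hand, the same settings can be applied with ALTER SYSTEM from a superuser session. A sketch using psycopg2; the connection details are placeholders, and wal_level still requires a restart to take effect:

import psycopg2

conn = psycopg2.connect('dbname=test user=postgres host=127.0.0.1')
conn.autocommit = True  # ALTER SYSTEM cannot run inside a transaction block
with conn.cursor() as cur:
    cur.execute("ALTER SYSTEM SET wal_level = 'minimal'")
    cur.execute("ALTER SYSTEM SET wal_writer_delay = '100ms'")
    cur.execute('SELECT pg_reload_config()')  # picks up the delay; wal_level waits for a restart
conn.close()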

Be aware that other configuration settings will also affect your system, and that WAL logging is necessary for replication, since WAL segments are used to recreate state on a hot standby.

Lack of Nested and Implicit Data Support in Drill, PostgreSQL, and Pentaho when Working with JSON Data

JSON is great. It can contain a variety of data types in an expected format, and it is becoming easier and easier to work with in existing tools, making it a future workhorse for NoSQL-based ETL. However, not least because NoSQL ingestion needs to end up in relational tables that follow SQL standards, there is still one bug to work out: ingesting JSON will not break out nested tables and requires direct knowledge of the data to complete tasks.

This may seem petty, but when millions of records are being read, it clearly is not.

In Drill, this could potentially be overcome by creating a table for every single submap we wish to analyze, but CREATE TABLE from the tool itself will not work. Instead, it is necessary to limit use cases to the data we want to use.

In PostgreSQL, it is possible to trim and concatenate JSON data using a query whose individual results can then be saved. It is also possible to 'pop' keys that are unneeded. However, this approach requires many different tables, roughly one per somewhat normalized form, and it requires recombining the data afterwards.


SELECT row_to_json(r.*)
FROM (
    SELECT nonJson AS nonJson,
           ((((data->'level1')::jsonb - 'bkey1')::jsonb - 'bkey2')::jsonb - 'bkey3')::jsonb AS jdata
    FROM table
    WHERE data::jsonb ?| array['mustHaveKey']
      AND (data::jsonb ?| array['notHaveKey']) IS FALSE
) r

Drill is still much further ahead of the game than Pentaho and PostgreSQL in terms of conversion, though. PostgreSQL can guess types but has no function to automatically generate tables. Pentaho requires explicit conversion as well.

Of course, if one already knows every key that will be present, this is not a problem. That, however, means more maintenance, as it then becomes impossible to write programs that automatically handle changes to the data. Perhaps implicit conversion will arrive soon, but any argument about data type issues should really look at the depth of the standards and conform to them instead of complaining.

Building Indexes for Full Text on a Database

Text is difficult to store in a database. Math is much easier, since the core separating variables are usually far more distinguishable (e.g., form a primary key on age, income, and zip code). Text has the issue of being fuzzy and often contains multiple topics. There is an approach based on clustering that could alleviate most, if not all, of this problem. Using topic modeling, representative documents (as would be used to choose the initial k clusters in the k-means++ algorithm), and the notion behind a blocking key (think blocks of objects), it is possible to develop a valid index and even a primary key for storing whole text documents in a database.

Topic Modelling

Topic modelling encompasses quite a few algorithms, from Bayesian methods to neural nets, and can even use LDA. Given a block of documents and a set number of topics, LDA can essentially form a generic model using Singular Value Decomposition and some basic magic. If the document set is somewhat better defined, speed is important due to re-indexing, and documents are plentiful, training a Naive Bayes model might be preferable; NLTK has a decent trainer for Python programmers. The advantage of Bayes is not having to choose a single category: a probability is determined for each category based on the 'features' provided, with feature frequencies determining the probabilities. Still, with more hard-line modelling it is possible to use cosines against a group's most representative document, or complete other tasks, to develop a category ranking.

Either way, having the best category choice as an attribute is likely a good solution.

Choosing Representative Documents

A drawback to choosing a single category is that the user or program may want to glean information not about the general category but about specific pieces. For this, individual clusterings within the grouping are useful.

It is possible to use the idea behind the k-means++ algorithm to find representative texts in each category. Start by finding the most representative document, as this will also be needed. Next, compute the cosines and find the document farthest from this one; that is the next representative document. Then take the average cosines from the documents chosen so far and pick the document with the largest distance. Continue this process up to an optimal number of documents, perhaps four or five. Vectorize these documents for cosine analysis and save them in an appropriate table, likely providing a lookup table with category name and key, and using that key as a foreign key in this table. PostgreSQL allows NoSQL data to be stored in a more relational format using the hstore extension or, since 9.4, jsonb.

The representative documents should then be used to find the cosines of each appropriately topic-clustered document. All of this data will be stored for the blocking key.
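
A sketch of this selection process using scikit-learn for the TF-IDF vectors and cosine similarities; the library choice and the value of k are assumptions:

import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

def representative_docs(texts, k=4):
    vectors = TfidfVectorizer().fit_transform(texts)
    sims = cosine_similarity(vectors)
    # Start with the document most similar to all others in the category.
    chosen = [int(np.argmax(sims.sum(axis=1)))]
    while len(chosen) < min(k, len(texts)):
        # Pick the document least similar, on average, to those already chosen.
        avg_sim = sims[:, chosen].mean(axis=1)
        avg_sim[chosen] = np.inf  # never re-pick a selected document
        chosen.append(int(np.argmin(avg_sim)))
    return chosen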

Building a Blocking Key

Building the blocking key is a crucial step. It speeds up indexing and querying by limiting large document sets to an appropriate number for further analysis. It also has the added advantage of encompassing more or less data easily (again, see PostgreSQL's NoSQL capabilities). The key can be built by mashing together all of the discovered data, formatting numbers to a fixed length, or through a more complicated process. Another option, still treating this data as a string, is to use something akin to Horner's hashing mechanism to generate a hash key, as sketched below. Key generation should avoid topic overlap as much as possible; think of the new table more like a hash table. Since cosine similarity is a vector measure with magnitude and direction but does not provide true distance for this data, using distance directly for the blocking key is more difficult. ASN.1 can hash multiple strings, but simply throwing the strings together may produce a better numeric solution.
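
A sketch of such a Horner-style key built over a category name and its representative-document cosines; the field choices, precision, and modulus are illustrative assumptions:

def blocking_key(category, cosines, mod=2**61 - 1):
    # Fix the precision so identical profiles always hash into the same block.
    parts = [category] + ['%.3f' % c for c in cosines]
    h = 0
    for ch in '|'.join(parts):
        h = (h * 31 + ord(ch)) % mod  # Horner's rule over the character codes
    return h

print(blocking_key('finance', [0.82, 0.13, 0.47]))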

Messaging, ETL, and an AKKA Proposal

Data sources are becoming many. NoSQL can help aggregate multiple sources into a more coherent whole, and Akka, which can split work across multiple machines, serves as a perfect way of writing distributed systems. Combining it with messaging via queues or topics and the master-slave pattern could provide a significant boost to ETL. Using databases as messaging systems, it is easy to see how processes can kick-start. My goal is to create a highly concurrent system that takes data from a scraper (from any source, as can be done with my Python crawl modules), writes the data to a NoSQL-style JSONB store in PostgreSQL, and notifies a set of parsers which then look at patterns in the data to determine how to ETL it. This is not really revolutionary, but it is a good test of concurrency and automation.

Results will be reported.

Collection with NoSQL and Storage with SQL

There are four well-known forms of NoSQL database: key-value, document, column-family, and graph. In the case of ETL, key-value is a good way to expand data without worrying about what, if anything, is present. However, even in denormalized form, this is not the best storage solution for customer-facing use. Therefore, data will be placed into a client-facing database configured with relational PostgreSQL tables.

Messaging and Building Patterns for AKKA and Scala

With messaging and state machines, the actual use for an actor does not need to be known before runtime. During runtime, interactions or patterns force the actor to take on a different role. This can be accomplished with a simple case/match statement. From here, a message with the data to be transformed can be passed to an actor. This data, carrying a row ID, can then be parsed after an actor reads a message from a queue. The queue specifies conditions such as which parser-combinator to use, and the actor completes an activity based on this. This is not incredibly different from the routing-slip pattern, except that no re-routing occurs.

The data would be aggregated using the available row IDs in batches of a certain size. Perhaps batch iterators would best determine the size of the batch to process.

Returning Data back to the original Actor

Returning the data requires messaging as well. The message returns to the initial actor, where it needs to be matched with the appropriate row.

Recap

To recap, the question is: can Akka perform more generic ETL than currently available open source tools?

To test this question I am developing Akka ETL. The tool will take in scraped data (from processes that can be managed with the same messaging technique but are not easily distributed due to statefulness and security). The design includes taking in completed sources from a database, acquiring data, messaging an actor with the appropriate parsing information, receiving the transformed data from these actors, and posting it to a relational database.

The real tests will be maintaining data-deduplication, non-mixed data, and unique identifiers.