ETL 1 Billion Rows in 2.5 Hours Without Paying on 4 cores and 7gb of RAM

There are a ton of ETL tools in the world. Alteryx, Tableau, Pentaho. This list goes on. Out of each, only Pentaho offers a quality free version. Alteryx prices can reach as high as $100,000 per year for a six person company and it is awful and awfully slow. Pentaho is not the greatest solution for streaming ETL either as it is not reactive but is a solid choice over the competitors.

How then, is it possible to ETL large datasets, stream on the same system from a TCP socket, or run flexible computations at speed. Surprisingly, this article will describe how to do just that using Celery and a tool which I am currently working on, CeleryETL.

Celery

Python is clearly an easy language to learn over others such as Scala, Java, and, of course, C++. These languages handle the vast majority of tasks for data science, AI, and mathematics outside of specialized languages such as R. They are likely the front runners in building production grade systems.

In place of the actor model popular with other languages, Python, being more arcane and outdated than any of the popular languages, requires task queues. My own foray into actor systems in Python led to a design which was, in fact, Celery backed by Python’s Thespian.

Celery handles tasks through RabbitMQ or other brokers claiming that the former can achieve up to 50 million messages per second. That is beyond the scope of this article but would theoretically cause my test case to outstrip the capacity of my database to write records. I only hazard to guess at what that would do to my file system.

Task queues are clunky, just like Python. Still, especially with modern hardware, they get the job done fast, blazingly fast. A task is queued with a module name specified as modules are loaded into a registry at run time. The queues, processed by a distributed set of workers running much like an actor in Akka, can be managed externally.

Celery allows for task streaming through chains and chords. The technical documentation is quite extensive and requires a decent chunk of time to get through.

Processing at Speed

Processing in Python at speed requires little more than properly chunking operations, batching record processing appropriately to remove latency, and performing other simple tasks as described in the Akka streams documentation. In fact, I wrote my layer on Celery using the Akka streams play book.

The only truly important operation, chunk your records. When streaming over TCP, this may not be necessary unless TCP connections happen extremely rapidly. Thresholding in this case may be an appropriate solution. If there are more connection attempts than can be completed at once, buffer requests and empty the buffer appropriately upon completion of each chain. I personally found that a maximum bucket size of 1000 for typical records was appropriate and 100 for large records including those containing text blobs was appropriate.

Take a look at my tool for implementation. However, I was able to remap,  split fields to rows, perform string operations, and write to my Neo4J graph database at anywhere from 80,000 to 120,000 records per second.

Conclusion

While this article is shorter than my others, it is something I felt necessary to write in the short time I have to write it. This discovery allows me to write a single language system through Celery, Neo4J, Django, PyQt, and PyTorch for an entire company. That, is phenomenal and only rivaled by Scala which is, sadly, dying despite being a far superior, faster, and less arcane language. By all measures, Scala should have won over the data science community but people detest the JVM. Until this changes, there is Celery.

 

Advertisements

Messaging, ETL, and an AKKA Proposal

Data sources are becoming many. NoSQL can help aggregate multiple sources into a more coherent whole. Akka, which can split data across multiple sources, servers as a perfect way of writing distributed systems. The combination with messaging via Queues or Topics and the Master-Slave pattern could provide a significant boost to ETL. Using databases as messaging systems, it is easy to see how processes can kick start. My goal will be to create a highly concurrent system that takes data from a scraper, from any source as can be done with my Python crawl modules, write the data to a NoSQL based JSONB store in PostgreSQL, notify a set of parsers which then look at patterns in the data to determine how to ETL the data. This is not really revolutionary but a good test of concurrency and automation.

Results will be reported.

Collection with NoSQL and Storage with SQL

There are four really well known forms of NoSQL databases. They are key-value, document, column-family, and graph databases. In the case of ETL, key-value is a good way to expand data without worrying about what if anything is present. However, even in demoralized form, this is not the best storage solution for customer facing solutions. Therefore, data will be placed into a client facing database configured with relational PostgreSQL tables.

Messaging and Building Patterns for AKKA and Scala

With messaging and state machines, actual uses for an actor do not need to be known at runtime. During runtime, interactions or patterns force the actor to take on a different role. This can be accomplished with a simple case-switch statement. From here a message with the data to be transformed can be passed to an actor. This data, with a rowID, can then be parsed after an Actor reads a message from a Queue. The queue specifies conditions such as which Parser-Combinator to use and then completes an activity based on this. This is not incredibly different from the Message slip Pattern, just that no re-routing occurs.

The data would be aggregated using the available row ideas in batches of a certain size. Perhaps batch iterators would best do the trick in determining the size of the batch to process.

Returning Data back to the original Actor

Returning the data requires messaging as well. The message returns from the initial actor where it needs to be matched with the appropriate row.

Recap

To recap, the question is, can AKKA perform more generic ETL than comes in currently available Open Source Tools?

To test this question I am developing Akka ETL. The tool will take in scraped data (from processes that can be managed with the same messaging technique but not easily distributed due to statefullness and security). The design includes taking in completed sources from a database, acquiring data, messaging an Actor with the appropriate parsing information, receiving the transformed data from these actors and posting to a relational database.

The real tests will be maintaining data-deduplication, non-mixed data, and unique identifiers.