Avoid Race Conditions with a Python3 Manager

 

race

In this short article, we examine the potential for race conditions in Python’s multiprocessing queue and how to potentially avoid this problem.

Race Condition

At times, the standard multi-processing queue can infinitely block despite the success of similar code. It appears that the reader-writer lock issue plagues the standard multiprocessing Queue. There is a bad egg at the table.

The following code runs without issue:

class OutMe(object):

    def __init__(self, q):
        self.q = q

    def do_put(self):
        self.q.put("Received")

    def rec(self):
        pass


class QMe(OutMe):

    def __init__(self, q, iq):
        self.q = q
        self.iq = iq
        OutMe.__init__(self, q)

    def rec(self):
        msg = self.iq.get(timeout=20)
        return msg

    def run(self):
        pass


class MProc(Process, QMe):

    def __init__(self, queue, oqueue):
        self.queue = queue
        self.oqueue = oqueue.get('oq')
        print(self.queue)
        Process.__init__(self)
        QMe.__init__(self, self.oqueue, self.queue)

    def run(self):
        while True:
            msg = self.rec()
            print(msg)
            self.do_put()


if __name__ == "__main__":
    mq = Queue()
    oq = {'oq': Queue()}
    mp = MProc(mq, oq)
    mp.start()
    while True:
        mq.put('Put')
        sleep(random.randint(0,3))
        print(oq.get('oq').get())

However, there are similar circumstances where the Queue blocks despite having a queue size larger than 0. This issue often appears when inheriting other objects and was submitted as a bug to the Python Foundation in 2013.

Python Manager

A python Manager coordinates data between different processes.  The Manager is a server process that is better capable of handling race conditions.

Where the above approach fails, it is more likely that the following will succeed:

if __name__ == "__main__":
    mgr = Manager()
    mgr2 = Manager()
    mq = mgr.Queue()
    oq = {'oq': mgr2.Queue()}
    mp = MProc(mq, oq)
    mp.start()
    while True:
        mq.put('Put')
        sleep(random.randint(0,3))
        print(oq.get('oq').get())

In this example, the queue from the previous code is created using a manager. The manager contains a proxy object in place of the queue.  The server process handles access to the queue.

Of note, the actual queue may contain nested yet shared items since Python 3.6.

Conclusion

Python is decades old.  This creates some problems. The standard multi-processing queue is plagued by race conditions. A python Manager helps resolve this issue.

Running a Gevent StreamServer in a Thread for Maximum Control

There are times when serving requests needs to be controlled without forcibly killing a running server instance. It is possible to do this with Python’s gevent library. In this article, we will examine how to control the gevent StreamServer through an event.

All code is available on my Github through an actor system project which I intend to use in production (e.g. it will be completed).

Greenlet Threads

Gevent utilizes green threads through the gevent.greenlet package. A greenlet, a Green thread in gevent, utilizes a similar API to the Python asyncio library but with more control over scheduling.

The greenlet is scheduled by the program instead of the OS and works more akin to threading in the JVM. Greenlet threads run code concurrently but not in parallel, although this could be achieved by starting greenlets in a new process.

In this manner, it is possible to schedule Greenlet threads on a new operating system based thread or, more likely due to the GIL, in a new process to achieve a similar level of concurrency and even parallelism to Java. Of course, the cost of parallelism must be considered.

StreamServer

Gevent maintains a server through gevent.server.StreamServer. This server utilizes greenlet threads to concurrently handle requests.

A StreamServer takes an address and port and can optionally be given a pool for controlling the number of connections created:

pool = Pool(MAX_THREADS)
server = StreamServer((self.host, self.port), handle_connect, spawn=pool)

This concurrency allows for faster communication in the bi-directional way that troubles asyncio.

 Gracefully Exiting a StreamServer

In instances where we want to gracefully exit a server while still running code, it is possible to use the gevent.event.Event class to communicate between threads and force the server to stop.

As a preliminary note, it is necessary to call the gevent.monkey.patch_all method in the native thread to allow for cross-thread communications.

from gevent import monkey

monkey.patch_all()

Instead of using the serve_forever method, we must utilize the start and stop methods. this must also be accompanied by the use of the Event class:

evt = Event()
pool = Pool(MAX_THREADS)
server = StreamServer((self.host, self.port), handle_connect, spawn=pool)
server.start()
gevent.signal(signal.SIGQUIT, self.evt.set)
gevent.signal(signal.SIGTERM, self.evt.set)
gevent.signal(signal.SIGINT, self.evt.set)
self.signal_queue.put(ServerStarted())
atexit.register(self.__server.stop)
evt.wait()
server.stop(10)

For good measure, termination signals are handled through gevent.signal to allow the server to close and kill the thread in case of user based termination. This hopefully leaves no loose ends.

The event can be set externally to force the server to stop with the preset 10 second timeout:

evt.set()

Conclusion

In this article, we examined how a greenlet thread or server can be controlled using a separate thread for graceful termination without exiting a running program. An Event was used to achieve our goal.

Akka: An Introduction

Akkas documentation is immense. This series helps tackle the many components by providing a working example of the master slave design pattern built with this powerful tool. The following article reviews the higher level concepts behind Akka and its usage.

Links are provided to different parts of the Akka documentation throughout the article.

See also:

Akka

Akka is a software tool  used to build multi-threaded and distributed systems based on the actor model. It takes care of lower level systems building by providing high level APIs for node and actor generation.

Actors are the primitives behind Akka. They are useful for performing repeated tasks concurrently.  Actors run until terminated, receiving work through message passing.

actorsys

Resource Usage

Akka is extremely lightweight. The creators boast that the tool can handle thousands of actors on a single machine.

Message passing occurs through mailboxes. The maximum number of messages a mailbox holds is configurable with a 1000 messages default but messages must be under one megabyte.

The Actor

The actor is the universal primitive used in Akka. Unlike when using threading in a program language, this primitive runs like a daemon server. As such, it should be shut down gracefully.

Actors are user created.

class MyActor extends Actor with ActorLogging{

     override def preStart()= log.debug("Starting")
     override def postStop()= log.debug("Stopping")
     override def preRestart(reason: Throwable, message: Option[Any]) = log.error(s"Restarting because of ${reason.message}. ${message}")     
     override def postRestart(reason : Throwable) = 

     override def receive():Receive={
         case _ => sender ! "Hello from Actor"
     }
}

object MyActor{
   def setupMyActor()={
        val conf = ConfigFactory.load()
        val system = ActorSystem("MySystem",conf)
        val actor : ActorRef = system.actorOf(Props[MyActor],name = "myactor") 
   }
}

 

The example above creates an actor and a Scala companion class for instantiation.

Actors must extend Actor. ActorLogging provides the log library. The optional functions preRestart and postRestart handle exceptions, while the optional preStart and postStop methods handle setup and tear down tasks. The basic actor above incorporates logging and error processing.

An actor can:

  • Create and supervise other actors
  • Maintain a State
  • Control the flow of work in a system
  • Perform a unit of work on request or repeatably
  • Send and receive messages
  • Return the results of a computation

Akka’s serialization is extremely powerful. Anything available across a cluster or on the classpath that implements Serializable can be sent to and from an actor. Instances of classes are de-serialized without having the programmer recast them.

When to Use Akka

Actor systems are not a universal solution. When not performing repeated tasks and not benefiting from high levels of concurrency, they are a hindrance.

State persistence also weighs heavily in the use of an actor system. Take the example of cookies in network requests. Maintaining different network sessions across different remote actors can be highly beneficial in ingestion.

Any task provided to an actor should contain very little new code and a limited number of configuration variables.

Questions that should be asked based on these concepts include:

  • Can I break up tasks into sufficiently large quantities of work to benefit from concurrency?
  • How often will  tasks be repeated in the system?
  • How minimal can I make the configuration for the actor if necessary?
  • Is there a need for state persistence?
  • Is there a significant need for concurrency or is it a nice thought?
  • Is there a resource constraint that distribution can solve or that will limit threading?

State is definitely a large reason to use Akka. This could be in the form of actually  maintaining variables or in the actor itself.

In some distributed use cases involving the processing of enormous numbers of short lived requests, the actors own state and Akka’s mailbox capabilities are what is most important. This is the reasoning behind tools built on Akka such as Spark.

As is always the case when deciding to create a new system, the following should be asked as well:

  • Can I use an existing tools such as Spark or Tensor Flow?
  • Does the time required to build the system outweigh the overall benefit it will provide?

Clustering

Clustering is available in Akka. Akka provides high level APIs for generating distributed clusters. Specified seed nodes handle communications, serving as the system’s entry point.

Network design is provided entirely by the developer. Since node generation, logging, fault tolerance, and basic communication are the only pieces of a distributed system Akka handle’s, any distribution model will suffice. Two common models are the master-slave and graph based models.

Akka clusters are resilient with well developed fault tolerance.

Configuration

Configuration is performed either through a file or in a class. Many components can be configured including logging levels, cluster components, metrics collection, and routers.

Conclusion

This article is the entry point for the Akka series, providing the basic understanding needed as we begin to build a cluster.

Messaging, ETL, and an AKKA Proposal

Data sources are becoming many. NoSQL can help aggregate multiple sources into a more coherent whole. Akka, which can split data across multiple sources, servers as a perfect way of writing distributed systems. The combination with messaging via Queues or Topics and the Master-Slave pattern could provide a significant boost to ETL. Using databases as messaging systems, it is easy to see how processes can kick start. My goal will be to create a highly concurrent system that takes data from a scraper, from any source as can be done with my Python crawl modules, write the data to a NoSQL based JSONB store in PostgreSQL, notify a set of parsers which then look at patterns in the data to determine how to ETL the data. This is not really revolutionary but a good test of concurrency and automation.

Results will be reported.

Collection with NoSQL and Storage with SQL

There are four really well known forms of NoSQL databases. They are key-value, document, column-family, and graph databases. In the case of ETL, key-value is a good way to expand data without worrying about what if anything is present. However, even in demoralized form, this is not the best storage solution for customer facing solutions. Therefore, data will be placed into a client facing database configured with relational PostgreSQL tables.

Messaging and Building Patterns for AKKA and Scala

With messaging and state machines, actual uses for an actor do not need to be known at runtime. During runtime, interactions or patterns force the actor to take on a different role. This can be accomplished with a simple case-switch statement. From here a message with the data to be transformed can be passed to an actor. This data, with a rowID, can then be parsed after an Actor reads a message from a Queue. The queue specifies conditions such as which Parser-Combinator to use and then completes an activity based on this. This is not incredibly different from the Message slip Pattern, just that no re-routing occurs.

The data would be aggregated using the available row ideas in batches of a certain size. Perhaps batch iterators would best do the trick in determining the size of the batch to process.

Returning Data back to the original Actor

Returning the data requires messaging as well. The message returns from the initial actor where it needs to be matched with the appropriate row.

Recap

To recap, the question is, can AKKA perform more generic ETL than comes in currently available Open Source Tools?

To test this question I am developing Akka ETL. The tool will take in scraped data (from processes that can be managed with the same messaging technique but not easily distributed due to statefullness and security). The design includes taking in completed sources from a database, acquiring data, messaging an Actor with the appropriate parsing information, receiving the transformed data from these actors and posting to a relational database.

The real tests will be maintaining data-deduplication, non-mixed data, and unique identifiers.