Running a Gevent StreamServer in a Thread for Maximum Control

There are times when serving requests needs to be controlled without forcibly killing a running server instance. It is possible to do this with Python’s gevent library. In this article, we will examine how to control the gevent StreamServer through an event.

All code is available on my GitHub as part of an actor system project which I intend to use in production (i.e. it will be completed).

Greenlet Threads

Gevent provides green threads through the gevent.greenlet module. A greenlet, gevent's green thread, serves a purpose similar to a coroutine in the Python asyncio library but gives the program more control over scheduling.

A greenlet is scheduled by the program instead of the OS, much like the green threads of early JVMs. Greenlets run code concurrently but not in parallel, although parallelism can be achieved by starting greenlets in a new process.

In this manner, it is possible to schedule greenlets on a new operating-system thread or, more likely because of the GIL, in a new process to achieve a level of concurrency and even parallelism similar to Java's. Of course, the cost of parallelism must be considered.

StreamServer

Gevent provides a TCP server through gevent.server.StreamServer. This server uses greenlets to handle requests concurrently.

A StreamServer takes a (host, port) address tuple and a handler function, and can optionally be given a pool that limits the number of connections handled at once:

pool = Pool(MAX_THREADS)
server = StreamServer((self.host, self.port), handle_connect, spawn=pool)
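The handle_connect function is not shown in this article. As a minimal sketch, assuming a simple echo protocol, a handler could look like the following. StreamServer calls the handler with the client socket and address; the echo behavior, the build_server helper, the default host and port, and the MAX_THREADS value are all assumptions made for illustration.

```python
import socket

MAX_THREADS = 100  # assumed cap on concurrent connections


def handle_connect(sock, address):
    """Echo each chunk back to the client until the client disconnects."""
    try:
        while True:
            data = sock.recv(4096)
            if not data:  # empty bytes: the peer closed the connection
                break
            sock.sendall(data)
    finally:
        sock.close()


def build_server(host="127.0.0.1", port=9000):
    """Hypothetical helper that wires the handler into a pooled StreamServer."""
    # gevent is imported here so the handler above can be exercised without it
    from gevent.pool import Pool
    from gevent.server import StreamServer

    pool = Pool(MAX_THREADS)
    return StreamServer((host, port), handle_connect, spawn=pool)
```

Because the handler only relies on recv, sendall, and close, it can be tried out against an ordinary socket pair before it is attached to a server.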

This concurrency allows for fast bi-directional communication, which can be awkward to express with asyncio.

Gracefully Exiting a StreamServer

In instances where we want to gracefully exit a server while still running code, it is possible to use the gevent.event.Event class to communicate between threads and force the server to stop.

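As a preliminary note, it is necessary to call the gevent.monkey.patch_all method in the native thread, as early as possible and before other modules are imported, so that blocking standard library calls cooperate with gevent and cross-thread communication is possible.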

from gevent import monkey

monkey.patch_all()

Instead of using the serve_forever method, we must use the start and stop methods. This must also be accompanied by the use of the Event class:

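import atexit
import signal

import gevent
from gevent.event import Event
from gevent.pool import Pool
from gevent.server import StreamServer

evt = Event()
pool = Pool(MAX_THREADS)
server = StreamServer((self.host, self.port), handle_connect, spawn=pool)
server.start()  # returns immediately, unlike serve_forever()
# Set the event on any termination signal. Recent gevent releases name this
# function gevent.signal_handler; older releases exposed it as gevent.signal.
gevent.signal_handler(signal.SIGQUIT, evt.set)
gevent.signal_handler(signal.SIGTERM, evt.set)
gevent.signal_handler(signal.SIGINT, evt.set)
self.signal_queue.put(ServerStarted())  # project-specific start notification
atexit.register(server.stop)
evt.wait()  # block here until the event is set
server.stop(10)  # give open connections up to 10 seconds to finish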

For good measure, termination signals are handled through gevent's signal handling so that the server can close and the thread can exit in case of user-initiated termination. This hopefully leaves no loose ends.

The event can be set externally to force the server to stop with the preset 10 second timeout:

evt.set()
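The start, wait, stop sequence can also be packaged into a small helper. The sketch below is an illustration rather than code from the project: the name serve_until is hypothetical, and the helper only assumes that the server offers start() and stop(timeout) and that the event offers wait(), which gevent's StreamServer and Event both do.

```python
def serve_until(server, stop_event, timeout=10):
    """Start `server`, block until `stop_event` is set, then stop it.

    `server` needs start() and stop(timeout); `stop_event` needs wait().
    """
    server.start()
    try:
        stop_event.wait()
    finally:
        # Always attempt a clean shutdown, even if wait() is interrupted
        server.stop(timeout)
```

With the objects from the snippet above, this would be called as serve_until(server, evt), and any other part of the program could then call evt.set() to trigger the shutdown.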

Conclusion

In this article, we examined how a greenlet thread or server can be controlled using a separate thread for graceful termination without exiting a running program. An Event was used to achieve our goal.

Akka: An Introduction

Akka's documentation is immense. This series helps tackle the many components by providing a working example of the master-slave design pattern built with this powerful tool. The following article reviews the higher-level concepts behind Akka and its usage.

Links are provided to different parts of the Akka documentation throughout the article.

Akka

Akka is a toolkit used to build multi-threaded and distributed systems based on the actor model. It takes care of lower-level systems building by providing high-level APIs for node and actor generation.

Actors are the primitives behind Akka. They are useful for performing repeated tasks concurrently. Actors run until terminated, receiving work through message passing.

Resource Usage

Akka is extremely lightweight. The creators boast that the tool can handle millions of actors on a single machine.

Message passing occurs through mailboxes. The default mailbox is unbounded; when a bounded mailbox is used, its capacity is configurable and defaults to 1000 messages. Messages sent between nodes are also subject to a configurable maximum size.
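To make the mailbox idea concrete without depending on Akka, here is a minimal sketch in plain Python rather than Akka's Scala API. It is a toy model, not how Akka is implemented: the EchoActor class, its tell and stop methods, and the choice to raise an error when the mailbox is full are all assumptions made for illustration.

```python
import queue
import threading

_STOP = object()  # sentinel telling the actor loop to terminate


class EchoActor:
    """Toy actor: one thread, one bounded mailbox, messages handled in order."""

    def __init__(self, capacity=1000):
        self.mailbox = queue.Queue(maxsize=capacity)
        self.handled = []  # state owned by the actor thread
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def tell(self, message):
        # Fire-and-forget send; raises queue.Full once the mailbox is at capacity
        self.mailbox.put_nowait(message)

    def stop(self):
        # Blocks until there is room, so the stop request is never dropped
        self.mailbox.put(_STOP)
        self._thread.join()

    def _run(self):
        # The actor runs until terminated, taking one message at a time
        while True:
            message = self.mailbox.get()
            if message is _STOP:
                break
            self.receive(message)

    def receive(self, message):
        self.handled.append(message)
```

Messages queued before the stop request are still processed, which mirrors the idea of shutting an actor down gracefully rather than killing it.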

The Actor

The actor is the universal primitive used in Akka. Unlike a thread in a programming language, this primitive runs like a daemon server. As such, it should be shut down gracefully.

Actors are user created.

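import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

class MyActor extends Actor with ActorLogging {

  // Lifecycle hooks for setup and tear down
  override def preStart(): Unit = log.debug("Starting")
  override def postStop(): Unit = log.debug("Stopping")

  // Restart hooks, called when a failure causes the actor to be restarted
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.error(s"Restarting because of ${reason.getMessage}. $message")
    super.preRestart(reason, message)
  }
  override def postRestart(reason: Throwable): Unit =
    super.postRestart(reason)

  // Reply to every message with a fixed greeting
  override def receive: Receive = {
    case _ => sender() ! "Hello from Actor"
  }
}

object MyActor {
  // Creates the actor system and returns a reference to a new MyActor
  def setupMyActor(): ActorRef = {
    val conf = ConfigFactory.load()
    val system = ActorSystem("MySystem", conf)
    system.actorOf(Props[MyActor](), name = "myactor")
  }
}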

 

The example above creates an actor and a Scala companion object for instantiation.

Actors must extend Actor. ActorLogging provides the log library. The optional preRestart and postRestart methods run around a restart triggered by an exception, while the optional preStart and postStop methods handle setup and tear down tasks. The basic actor above incorporates logging and error processing.

An actor can:

  • Create and supervise other actors
  • Maintain state
  • Control the flow of work in a system
  • Perform a unit of work on request or repeatedly
  • Send and receive messages
  • Return the results of a computation

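Akka's serialization is extremely powerful. Any message class with a configured serializer that is on the classpath across the cluster can be sent to and from an actor. Classes that implement Serializable work with Java serialization, although recent Akka versions disable it by default in favor of other serializers. Instances of classes are de-serialized without having the programmer recast them.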

When to Use Akka

Actor systems are not a universal solution. When not performing repeated tasks and not benefiting from high levels of concurrency, they are a hindrance.

State persistence also weighs heavily in the use of an actor system. Take the example of cookies in network requests. Maintaining different network sessions across different remote actors can be highly beneficial in ingestion.

Any task provided to an actor should contain very little new code and a limited number of configuration variables.

Questions that should be asked based on these concepts include:

  • Can I break up tasks into sufficiently large quantities of work to benefit from concurrency?
  • How often will tasks be repeated in the system?
  • How minimal can I make the configuration for the actor if necessary?
  • Is there a need for state persistence?
  • Is there a significant need for concurrency or is it a nice thought?
  • Is there a resource constraint that distribution can solve or that will limit threading?

State is definitely a large reason to use Akka. This state could take the form of variables maintained inside an actor or of the actor itself, which persists between messages.

In some distributed use cases involving the processing of enormous numbers of short-lived requests, the actor's own state and Akka's mailbox capabilities are what is most important. This is the reasoning behind tools built on Akka, such as early versions of Spark.

As is always the case when deciding to create a new system, the following should be asked as well:

  • Can I use an existing tool such as Spark or TensorFlow?
  • Does the time required to build the system outweigh the overall benefit it will provide?

Clustering

Akka provides high-level APIs for generating distributed clusters. Specified seed nodes serve as the system's entry point, giving joining nodes a first point of contact.

Network design is provided entirely by the developer. Since node generation, logging, fault tolerance, and basic communication are the only pieces of a distributed system Akka handles, any distribution model will suffice. Two common models are the master-slave and graph-based models.

Akka clusters are resilient with well developed fault tolerance.

Configuration

Configuration is performed either through a file or in a class. Many components can be configured including logging levels, cluster components, metrics collection, and routers.

Conclusion

This article is the entry point for the Akka series, providing the basic understanding needed as we begin to build a cluster.