Interprocess Communication

Two primary forms of interprocess communication are supported by the multiprocessing module: pipes and queues. Both are implemented using message passing. However, the queue interface is meant to mimic the queues commonly used in threaded programs.

Queue([maxsize])

Creates a shared process queue. maxsize is the maximum number of items allowed in the queue. If omitted, there is no size limit. The underlying queue is implemented using pipes and locks. In addition, a support thread is launched in order to feed queued data into the underlying pipe.

An instance q of Queue has the following methods:

q.cancel_join_thread()

Don't automatically join the background thread on process exit. This prevents the join_thread() method from blocking.

q.close()

Closes the queue, preventing any more data from being added to it. When this is called, the background thread will continue to write any queued data not yet written but will shut down as soon as this is complete. This method is called automatically if q is garbage-collected. Closing a queue does not generate any kind of end-of-data signal or exception in queue consumers. For example, if a consumer is blocking on a get() operation, closing the queue in the producer does not cause the get() to return with an error.

q.empty()

Returns True if q is empty at the time of the call. If other processes or threads are being used to add queue items, be aware that the result is not reliable (e.g., new items could have been added to the queue in between the time that the result is returned and used).

q.full()

Returns True if q is full. The result is likewise unreliable when other processes or threads are using the queue (see q.empty()).

q.get([block [, timeout]])

Returns an item from q. If q is empty, blocks until a queue item becomes available. block controls the blocking behavior and is True by default. If set to False, a Queue.Empty exception (defined in the Queue library module, which is named queue in Python 3) is raised if the queue is empty. timeout is an optional timeout to use in blocking mode. If no items become available in the specified time interval, a Queue.Empty exception is raised.

q.get_nowait()

The same as q.get(False).

q.join_thread()

Joins the queue's background thread. This can be used only after q.close() has been called, and it blocks until the background thread has flushed all buffered items to the underlying pipe. This method is called by default on exit in all processes that are not the original creator of q. This behavior can be disabled by calling q.cancel_join_thread().

q.put(item [, block [, timeout]])

Puts item onto the queue. If the queue is full, blocks until space becomes available. block controls the blocking behavior and is True by default. If set to False, a Queue.Full exception (defined in the Queue library module, which is named queue in Python 3) is raised if the queue is full. timeout specifies how long to wait for space to become available in blocking mode. A Queue.Full exception is raised on timeout.

q.put_nowait(item)

The same as q.put(item, False).

q.qsize()

Returns the approximate number of items currently in the queue. The result of this function is not reliable because items may have been added or removed from the queue in between the time the result is returned and later used in a program. On some systems, this method may raise a NotImplementedError.
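As a rough illustration of the blocking and timeout rules for get(), put(), and qsize(), the following sketch wraps each call so that a full or empty queue is reported through a return value instead of an exception. The helper names try_put, try_get, and safe_qsize are hypothetical and not part of multiprocessing. The import assumes Python 3, where the exception classes live in the queue module.

```python
import multiprocessing
import queue  # Assumption: Python 3 name; the module is Queue in Python 2

# try_put, try_get, and safe_qsize are hypothetical helpers for illustration.

def try_put(q, item, timeout=0.1):
    """Return True if item was queued within timeout seconds, else False."""
    try:
        q.put(item, True, timeout)
        return True
    except queue.Full:
        return False

def try_get(q, timeout=0.1, default=None):
    """Return the next item, or default if nothing arrives within timeout."""
    try:
        return q.get(True, timeout)
    except queue.Empty:
        return default

def safe_qsize(q):
    """Return the approximate size, or None where qsize() is unsupported."""
    try:
        return q.qsize()
    except NotImplementedError:
        return None
```

With a queue created as multiprocessing.Queue(1), a first try_put() succeeds and a second one returns False until some process removes the first item. If None is a legitimate item in your program, pass a different default to try_get().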

JoinableQueue([maxsize])

Creates a joinable shared process queue. This is just like a Queue except that the queue allows a consumer of items to notify the producer that the items have been successfully processed. The notification process is implemented using a shared semaphore and condition variable.

An instance q of JoinableQueue has the same methods as Queue, but it has the following additional methods:

q.task_done()

Used by a consumer to signal that an enqueued item returned by q.get() has been processed. A ValueError exception is raised if this is called more times than items have been removed from the queue.

q.join()

Used by a producer to block until all items placed in a queue have been processed. This blocks until q.task_done() is called for every item placed into the queue.
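The bookkeeping behind task_done() and join() can be seen in a single process. This is only a sketch: the function name demo_accounting is hypothetical, and a real program would call get() and task_done() in a separate consumer process.

```python
import multiprocessing

def demo_accounting():
    """Hypothetical helper: show how task_done() and join() keep count."""
    q = multiprocessing.JoinableQueue()
    q.put("job")          # One item is now outstanding
    item = q.get()        # Blocks until the item has passed through the pipe
    q.task_done()         # Acknowledge the item
    q.join()              # Returns at once because nothing is outstanding
    try:
        q.task_done()     # One acknowledgement too many
    except ValueError:
        return item, "extra task_done() rejected"
    return item, "extra task_done() accepted"
```

The second task_done() call raises ValueError because only one item was ever placed on the queue.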

The following example shows how you set up a process that runs forever, consuming and processing items on a queue. The producer feeds items into the queue and waits for them to be processed.

    import multiprocessing

    def consumer(input_q):
        while True:
            item = input_q.get()
            # Process item
            print(item)    # Replace with useful work
            # Signal task completion
            input_q.task_done()

    def producer(sequence, output_q):
        for item in sequence:
            # Put the item on the queue
            output_q.put(item)

    # Set up
    if __name__ == '__main__':
        q = multiprocessing.JoinableQueue()

        # Launch the consumer process
        cons_p = multiprocessing.Process(target=consumer, args=(q,))
        cons_p.daemon = True
        cons_p.start()

        # Produce items. sequence represents a sequence of items to
        # be sent to the consumer. In practice, this could be the output
        # of a generator or produced in some other manner.
        sequence = [1, 2, 3, 4]
        producer(sequence, q)

        # Wait for all items to be processed
        q.join()

In this example, the consumer process is set to daemonic because it runs forever and we want it to terminate when the main program finishes (if you forget this, the program will hang). A JoinableQueue is being used so that the producer actually knows when all of the items put in the queue have been successfully processed. The join() operation ensures this; if you forget this step, the consumer will be terminated before it has had time to complete all of its work.

If desired, multiple processes can put and get items from the same queue. For example, if you wanted to have a pool of consumer processes, you could just write code like this:

    if __name__ == '__main__':
        q = multiprocessing.JoinableQueue()

        # Launch some consumer processes
        cons_p1 = multiprocessing.Process(target=consumer, args=(q,))
        cons_p1.daemon = True
        cons_p1.start()

        cons_p2 = multiprocessing.Process(target=consumer, args=(q,))
        cons_p2.daemon = True
        cons_p2.start()

        # Produce items. sequence represents a sequence of items to
        # be sent to the consumer. In practice, this could be the output
        # of a generator or produced in some other manner.
        sequence = [1, 2, 3, 4]
        producer(sequence, q)

        # Wait for all items to be processed
        q.join()

When writing code such as this, be aware that every item placed into the queue is pickled and sent to the process over a pipe or socket connection. As a general rule, it is better to send fewer large objects than many small objects.
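One way to follow this rule is to group items into lists before putting them on the queue, so that each put() pickles one larger object. The sketch below assumes a JoinableQueue. The names chunked, batch_producer, and batch_consumer, and the default chunk size of 100, are hypothetical choices for illustration, not tuned values.

```python
import multiprocessing

def chunked(sequence, size):
    """Yield successive lists of at most size items from sequence."""
    chunk = []
    for item in sequence:
        chunk.append(item)
        if len(chunk) == size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk

def batch_producer(sequence, output_q, size=100):
    """Put lists of items on the queue instead of individual items."""
    for chunk in chunked(sequence, size):
        output_q.put(chunk)

def batch_consumer(input_q):
    """Runs forever; each get() unpickles one whole list of items."""
    while True:
        chunk = input_q.get()
        for item in chunk:
            print(item)        # Replace with useful work
        input_q.task_done()    # One acknowledgement per chunk, not per item
```

batch_consumer() is meant to be the target of a daemonic Process, as in the earlier consumer example. Note that task_done() is called once per chunk, matching the one put() that delivered it.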

In certain applications, a producer may want to signal consumers that no more items will be produced and that they should shut down. To do this, you should write code that uses a sentinel—a special value that indicates completion. Here is an example that illustrates this concept using None as a sentinel:

    import multiprocessing

    def consumer(input_q):
        while True:
            item = input_q.get()
            if item is None:
                break
            # Process item
            print(item)    # Replace with useful work
        # Shutdown
        print("Consumer done")

    def producer(sequence, output_q):
        for item in sequence:
            # Put the item on the queue
            output_q.put(item)

    if __name__ == '__main__':
        q = multiprocessing.Queue()

        # Launch the consumer process
        cons_p = multiprocessing.Process(target=consumer, args=(q,))
        cons_p.start()

        # Produce items
        sequence = [1, 2, 3, 4]
        producer(sequence, q)

        # Signal completion by putting the sentinel on the queue
        q.put(None)

        # Wait for the consumer process to shutdown
        cons_p.join()

If you are using sentinels as shown in this example, be aware that you will need to put a sentinel on the queue for every single consumer. For example, if there were three consumer processes consuming items on the queue, the producer needs to put three sentinels on the queue to get all of the consumers to shut down.
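The one-sentinel-per-consumer rule might be coded as follows. This is a sketch, not a complete worker pool: run_pool and the consumer names are hypothetical, and the consumer here takes an extra name argument and returns a count purely to make its behavior visible.

```python
import multiprocessing

SENTINEL = None    # Value that tells a consumer to shut down

def consumer(input_q, name):
    """Consume until the sentinel arrives; return the number of items seen."""
    count = 0
    while True:
        item = input_q.get()
        if item is SENTINEL:
            break
        print(name, item)    # Replace with useful work
        count += 1
    print(name, "done")
    return count

def run_pool(sequence, nconsumers=3):
    """Hypothetical driver: start consumers, feed them, then stop them all."""
    q = multiprocessing.Queue()
    workers = [multiprocessing.Process(target=consumer,
                                       args=(q, "consumer-%d" % n))
               for n in range(nconsumers)]
    for w in workers:
        w.start()
    for item in sequence:
        q.put(item)
    # One sentinel per consumer; each consumer removes exactly one
    for w in workers:
        q.put(SENTINEL)
    for w in workers:
        w.join()
    return [w.exitcode for w in workers]
```

Call run_pool([1, 2, 3, 4]) from under an if __name__ == '__main__': guard. If fewer sentinels than consumers are put on the queue, the remaining consumers block forever in get() and the final join() never returns.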

As an alternative to using queues, a pipe can be used to perform message passing between processes.

Pipe([duplex])

Creates a pipe between processes and returns a tuple (conn1, conn2) where conn1 and conn2 are Connection objects representing the ends of the pipe. By default, the pipe is bidirectional. If duplex is set to False, then conn1 can only be used for receiving and conn2 can only be used for sending. Pipe() must be called prior to creating and launching any Process objects that use the pipe.
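The effect of duplex=False can be checked within a single process. In this sketch the function name one_way_roundtrip is hypothetical, and the code assumes Python 3, where sending on the receive-only end raises OSError.

```python
import multiprocessing

def one_way_roundtrip(obj):
    """Hypothetical helper: send obj through a one-way pipe and read it back."""
    recv_end, send_end = multiprocessing.Pipe(False)
    try:
        send_end.send(obj)            # conn2 is the sending end
        received = recv_end.recv()    # conn1 is the receiving end
        try:
            recv_end.send(obj)        # Wrong direction on a one-way pipe
            wrong_way_allowed = True
        except OSError:
            wrong_way_allowed = False
        return received, wrong_way_allowed
    finally:
        recv_end.close()
        send_end.close()
```

The first element of the result is a copy of obj that has been pickled and unpickled. The second is False, showing that the receiving end rejects send().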

An instance c of a Connection object returned by Pipe() has the following methods and attributes:

c.close()

Closes the connection. Called automatically if c is garbage collected.

c.fileno()

Returns the integer file descriptor used by the connection.

c.poll([timeout])

Returns True if data is available on the connection. timeout specifies the maximum amount of time to wait. If omitted, the method returns immediately with a result. If timeout is set to None, then the operation will wait indefinitely for data to arrive.

c.recv()

Receives an object sent by c.send(). Raises EOFError if the other end of the connection has been closed and there is no more data.

c.recv_bytes([maxlength])

Receives a complete byte message sent by c.send_bytes(). maxlength specifies the maximum number of bytes to receive. If an incoming message exceeds this, an IOError is raised and no further reads can be made on the connection. Raises EOFError if the other end of the connection has been closed and there is no more data.

c.recv_bytes_into(buffer [, offset])

Receives a complete byte message and stores it in the object buffer, which supports the writable buffer interface (e.g., a bytearray object or similar). offset specifies the byte offset into the buffer where to place the message. Returns the number of bytes received. Raises BufferTooShort if the length of the message exceeds available buffer space.

c.send(obj)

Sends an object through the connection. obj is any object that is compatible with pickle.

c.send_bytes(buffer [, offset [, size]])

Sends a buffer of byte data through the connection. buffer is any object that supports the buffer interface, offset is the byte offset into the buffer, and size is the number of bytes to send. The resulting data is sent as a single message to be received using a single call to c.recv_bytes().
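A few of these methods can be combined as in the following sketch. The helper names recv_if_ready and recv_into_fixed are hypothetical. The fallback in recv_into_fixed relies on the complete message being available as the first argument of the BufferTooShort exception.

```python
import multiprocessing

def recv_if_ready(conn, timeout=0.0):
    """Return (True, obj) if a message arrives within timeout, else (False, None)."""
    if conn.poll(timeout):
        return True, conn.recv()
    return False, None

def recv_into_fixed(conn, size):
    """Receive one byte message using a preallocated buffer of size bytes."""
    buf = bytearray(size)
    try:
        nbytes = conn.recv_bytes_into(buf)
    except multiprocessing.BufferTooShort as exc:
        # The message did not fit; the exception carries the whole message
        return bytes(exc.args[0])
    return bytes(buf[:nbytes])
```

For example, after b.send_bytes(b"hello world", 6, 5) on one end of a pipe, recv_into_fixed(a, 16) on the other end returns the five bytes starting at offset 6.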

Pipes can be used in a similar manner as queues. Here is an example that shows the previous producer-consumer problem implemented using pipes:

    import multiprocessing

    # Consume items on a pipe.
    def consumer(pipe):
        output_p, input_p = pipe
        input_p.close()    # Close the input end of the pipe
        while True:
            try:
                item = output_p.recv()
            except EOFError:
                break
            # Process item
            print(item)    # Replace with useful work
        # Shutdown
        print("Consumer done")

    # Produce items and put them on a pipe. sequence is an
    # iterable representing items to be processed.
    def producer(sequence, input_p):
        for item in sequence:
            # Send the item through the pipe
            input_p.send(item)

    if __name__ == '__main__':
        (output_p, input_p) = multiprocessing.Pipe()

        # Launch the consumer process
        cons_p = multiprocessing.Process(target=consumer,
                                         args=((output_p, input_p),))
        cons_p.start()

        # Close the output pipe in the producer
        output_p.close()

        # Produce items
        sequence = [1, 2, 3, 4]
        producer(sequence, input_p)

        # Signal completion by closing the input pipe
        input_p.close()

        # Wait for the consumer process to shutdown
        cons_p.join()

Great attention should be given to proper management of the pipe endpoints. If one of the ends of the pipe is not used in either the producer or consumer, it should be closed. This explains, for instance, why the output end of the pipe is closed in the producer and the input end of the pipe is closed in the consumer. If you forget one of these steps, the program may hang on the recv() operation in the consumer. Pipes are reference counted by the operating system and have to be closed in all processes to produce the EOFError exception. Thus, closing the pipe in the producer doesn't have any effect unless the consumer also closes the same end of the pipe.
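The EOFError behavior is easiest to observe in a single process, where each end of the pipe has exactly one open handle. In this sketch the function name drain is hypothetical.

```python
import multiprocessing

def drain(conn):
    """Collect objects from conn until every copy of the sending end is closed."""
    items = []
    while True:
        try:
            items.append(conn.recv())
        except EOFError:
            break
    return items
```

If input_p.send(1) and input_p.send(2) are followed by input_p.close() in the same process, drain(output_p) returns the two items and then stops. When a child process also holds a copy of the sending end and has not closed it, the same call blocks in recv() instead of finishing.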

Pipes can be used for bidirectional communication. This can be used to write programs that interact with a process using a request/response model typically associated with client/server computing or remote procedure call. Here is an example:

    import multiprocessing

    # A server process
    def adder(pipe):
        server_p, client_p = pipe
        client_p.close()
        while True:
            try:
                x, y = server_p.recv()
            except EOFError:
                break
            result = x + y
            server_p.send(result)
        # Shutdown
        print("Server done")

    if __name__ == '__main__':
        (server_p, client_p) = multiprocessing.Pipe()

        # Launch the server process
        adder_p = multiprocessing.Process(target=adder,
                                          args=((server_p, client_p),))
        adder_p.start()

        # Close the server pipe in the client
        server_p.close()

        # Make some requests on the server
        client_p.send((3, 4))
        print(client_p.recv())

        client_p.send(('Hello', 'World'))
        print(client_p.recv())

        # Done. Close the pipe
        client_p.close()

        # Wait for the server process to shutdown
        adder_p.join()

In this example, the adder() function runs as a server waiting for messages to arrive on its end of the pipe. When received, it performs some processing and sends the result back on the pipe. Keep in mind that send() and recv() use the pickle module to serialize objects. In the example, the server receives a tuple (x, y) as input and returns the result x + y. For more advanced applications that use remote procedure call, however, you should use a process pool as described next.
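The request/response pattern of the adder example can be generalized slightly by sending an operation name along with the arguments and by returning errors as replies. The following is only a sketch under assumptions of its own: the (name, args) message layout, the ("ok", value) and ("error", text) replies, and the names HANDLERS, serve, and call are all hypothetical.

```python
import multiprocessing
import operator

# Hypothetical table mapping operation names to functions
HANDLERS = {"add": operator.add, "mul": operator.mul}

def serve(conn, handlers):
    """Answer (name, args) requests until the other end is closed."""
    while True:
        try:
            name, args = conn.recv()
        except EOFError:
            break
        try:
            reply = ("ok", handlers[name](*args))
        except Exception as exc:
            # Report the failure to the client instead of stopping the server
            reply = ("error", repr(exc))
        conn.send(reply)

def call(conn, name, *args):
    """Send one request and wait for its reply."""
    conn.send((name, args))
    status, value = conn.recv()
    if status == "error":
        raise RuntimeError(value)
    return value
```

Here serve() would run in a server process on one end of a pipe, after that process has closed its copy of the client end as adder() does, while the client uses call(client_p, "add", 3, 4) on the other end. Everything sent in either direction must still be compatible with pickle.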
