PEP 3148 – futures - execute computations asynchronously
- Author: Brian Quinlan <brian at sweetapp.com>
- Status: Final
- Type: Standards Track
- Created: 16-Oct-2009
- Python-Version: 3.2
- Post-History:
Abstract
This PEP proposes a design for a package that facilitates the evaluation of callables using threads and processes.
Motivation
Python currently has powerful primitives to construct multi-threaded and multi-process applications, but parallelizing simple operations requires a lot of work, i.e. explicitly launching processes/threads, constructing a work/results queue, and waiting for completion or some other termination condition (e.g. failure, timeout). It is also difficult to design an application with a global process/thread limit when each component invents its own parallel execution strategy.
Specification
Naming
The proposed package would be called “futures” and would live in a new “concurrent” top-level package. The rationale behind pushing the futures library into a “concurrent” namespace has multiple components. The first and simplest is to prevent any confusion with the existing “from __future__ import x” idiom, which has been in use for a long time within Python. Additionally, it is felt that adding the “concurrent” prefix to the name fully denotes what the library is related to, namely concurrency. This should clear up any additional ambiguity, as it has been noted that not everyone in the community is familiar with Java Futures, or with the term “futures” except as it relates to the US stock market.
Finally, we are carving out a new namespace for the standard library, named “concurrent”. We hope to either add, or move existing, concurrency-related libraries to this namespace in the future. A prime example is the multiprocessing.Pool work, as well as other “addons” included in that module, which work across thread and process boundaries.
Interface
The proposed package provides two core classes: Executor and Future. An Executor receives asynchronous work requests (in terms of a callable and its arguments) and returns a Future to represent the execution of that work request.
Executor
Executor is an abstract class that provides methods to execute calls asynchronously.
submit(fn, *args, **kwargs)
Schedules the callable to be executed as fn(*args, **kwargs) and returns a Future instance representing the execution of the callable.
This is an abstract method and must be implemented by Executor subclasses.
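As a rough illustration of submit(), the sketch below uses ThreadPoolExecutor (specified later in this document) as the concrete executor. The helper name add is hypothetical and exists only for this example.

```python
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    # Hypothetical work function used only for illustration.
    return x + y


executor = ThreadPoolExecutor(max_workers=2)

# submit() returns a Future right away; positional and keyword
# arguments are forwarded to the callable.
future = executor.submit(add, 2, y=3)

# result() blocks until add(2, y=3) has finished.
value = future.result()
executor.shutdown()
print(value)
```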
map(func, *iterables, timeout=None)
Equivalent to map(func, *iterables) but func is executed asynchronously and several calls to func may be made concurrently. The returned iterator raises a TimeoutError if __next__() is called and the result isn’t available after timeout seconds from the original call to map(). If timeout is not specified or None then there is no limit to the wait time. If a call raises an exception then that exception will be raised when its value is retrieved from the iterator.
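The following sketch shows how an exception raised by one call might surface while iterating over the results of map(). It assumes a ThreadPoolExecutor as the concrete executor; the function name reciprocal is hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def reciprocal(x):
    # Hypothetical work function; raises ZeroDivisionError for x == 0.
    return 1 / x


results = []
error = None
with ThreadPoolExecutor(max_workers=3) as executor:
    iterator = executor.map(reciprocal, [1, 2, 0, 4])
    try:
        # Results come back in input order, regardless of which call
        # happened to finish first.
        for value in iterator:
            results.append(value)
    except ZeroDivisionError as exc:
        # The exception is raised when the failing call's value is
        # retrieved from the iterator, not when map() is called.
        error = exc

print(results, repr(error))
```

Results for inputs after the failing one are not retrieved in this sketch, because the iteration stops at the first exception.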
shutdown(wait=True)
Signal the executor that it should free any resources that it is using when the currently pending futures are done executing. Calls to Executor.submit and Executor.map made after shutdown will raise RuntimeError.
If wait is True then this method will not return until all the pending futures are done executing and the resources associated with the executor have been freed. If wait is False then this method will return immediately and the resources associated with the executor will be freed when all pending futures are done executing. Regardless of the value of wait, the entire Python program will not exit until all pending futures are done executing.
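A minimal sketch of the shutdown behavior described above, assuming a ThreadPoolExecutor as the concrete executor: work submitted before shutdown completes normally, and a later submit is rejected.

```python
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(pow, 2, 10)

# With wait=True this returns only after pending futures have finished.
executor.shutdown(wait=True)

try:
    executor.submit(pow, 2, 3)
    rejected = False
except RuntimeError:
    # Submitting after shutdown is an error.
    rejected = True

print(future.result(), rejected)
```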
__enter__()
__exit__(exc_type, exc_val, exc_tb)
When using an executor as a context manager, __exit__ will call Executor.shutdown(wait=True).
ProcessPoolExecutor
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. The callable objects and arguments passed to ProcessPoolExecutor.submit must be pickleable according to the same limitations as the multiprocessing module.
Calling Executor or Future methods from within a callable submitted to a ProcessPoolExecutor will result in deadlock.
__init__(max_workers)
Executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given then as many worker processes will be created as the machine has processors.
ThreadPoolExecutor
The ThreadPoolExecutor class is an Executor subclass that uses a pool of threads to execute calls asynchronously.
Deadlock can occur when the callable associated with a Future waits on the results of another Future. For example:
    import time
    from concurrent.futures import ThreadPoolExecutor

    def wait_on_b():
        time.sleep(5)
        print(b.result())  # b will never complete because it is waiting on a.
        return 5

    def wait_on_a():
        time.sleep(5)
        print(a.result())  # a will never complete because it is waiting on b.
        return 6

    executor = ThreadPoolExecutor(max_workers=2)
    a = executor.submit(wait_on_b)
    b = executor.submit(wait_on_a)
And:
    def wait_on_future():
        f = executor.submit(pow, 5, 2)
        # This will never complete because there is only one worker thread and
        # it is executing this function.
        print(f.result())

    executor = ThreadPoolExecutor(max_workers=1)
    executor.submit(wait_on_future)
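One possible way to sidestep the second deadlock, offered as a sketch rather than a recommendation made by this PEP, is to submit the nested work to a separate executor so that the outer worker thread is not waiting on work queued behind itself. The names inner and outer are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

# Two independent single-worker pools (an assumption of this sketch).
inner = ThreadPoolExecutor(max_workers=1)
outer = ThreadPoolExecutor(max_workers=1)


def wait_on_future():
    # The nested call runs on inner's worker, so blocking here does not
    # starve the thread that has to execute it.
    f = inner.submit(pow, 5, 2)
    return f.result()


result = outer.submit(wait_on_future).result(timeout=10)
outer.shutdown()
inner.shutdown()
print(result)
```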
__init__(max_workers)
Executes calls asynchronously using a pool of at most max_workers threads.
Future Objects
The Future class encapsulates the asynchronous execution of a callable. Future instances are returned by Executor.submit.
cancel()
Attempt to cancel the call. If the call is currently being executed then it cannot be cancelled and the method will return False, otherwise the call will be cancelled and the method will return True.
cancelled()
Return True if the call was successfully cancelled.
running()
Return True if the call is currently being executed and cannot be cancelled.
done()
Return True if the call was successfully cancelled or finished running.
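The state-inspection methods above can be exercised with a single-worker pool, where a second call stays pending while the first one runs. This is a sketch under that assumption; the events and the blocker function are hypothetical helpers used to make the timing deterministic.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()


def blocker():
    # Hypothetical task that signals it is running, then waits.
    started.set()
    release.wait()
    return "finished"


executor = ThreadPoolExecutor(max_workers=1)
first = executor.submit(blocker)
second = executor.submit(pow, 2, 5)  # queued behind first

started.wait()  # first is now executing on the only worker

first_running = first.running()      # the call is being executed
first_cancel = first.cancel()        # cannot cancel a running call
second_cancel = second.cancel()      # still pending, so it is cancelled
second_cancelled = second.cancelled()
second_done = second.done()          # cancelled futures count as done

release.set()
executor.shutdown(wait=True)
```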
result(timeout=None)
Return the value returned by the call. If the call hasn’t yet completed then this method will wait up to timeout seconds. If the call hasn’t completed in timeout seconds then a TimeoutError will be raised. If timeout is not specified or None then there is no limit to the wait time.
If the future is cancelled before completing then CancelledError will be raised.
If the call raised then this method will raise the same exception.
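A short sketch of the timeout behavior of result(), assuming a ThreadPoolExecutor and a hypothetical task that blocks on an event until the caller releases it:

```python
import threading
from concurrent.futures import ThreadPoolExecutor, TimeoutError

gate = threading.Event()


def wait_for_gate():
    # Hypothetical task that cannot finish until gate is set.
    gate.wait()
    return 42


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(wait_for_gate)
    try:
        future.result(timeout=0.05)
        timed_out = False
    except TimeoutError:
        # The call is still running; the future itself is unaffected.
        timed_out = True

    gate.set()
    value = future.result()  # no timeout: wait as long as needed

print(timed_out, value)
```

A timeout only abandons the wait. It does not cancel the call, so result() can be called again later.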
exception(timeout=None)
Return the exception raised by the call. If the call hasn’t yet completed then this method will wait up to timeout seconds. If the call hasn’t completed in timeout seconds then a TimeoutError will be raised. If timeout is not specified or None then there is no limit to the wait time.
If the future is cancelled before completing then CancelledError will be raised.
If the call completed without raising then None is returned.
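The difference between exception() and result() for a failed call can be sketched as follows, assuming a ThreadPoolExecutor; the function fail is a hypothetical helper.

```python
from concurrent.futures import ThreadPoolExecutor


def fail():
    # Hypothetical task that always raises.
    raise ValueError("boom")


with ThreadPoolExecutor(max_workers=2) as executor:
    ok = executor.submit(pow, 3, 2)
    bad = executor.submit(fail)

    ok_exc = ok.exception()    # None: the call completed without raising
    bad_exc = bad.exception()  # the exception object is returned, not raised

try:
    bad.result()
    reraised = False
except ValueError:
    # result() raises the same exception that the call raised.
    reraised = True
```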
add_done_callback(fn)
Attaches a callable fn to the future that will be called when the future is cancelled or finishes running. fn will be called with the future as its only argument.
Added callables are called in the order that they were added and are always called in a thread belonging to the process that added them. If the callable raises an Exception then it will be logged and ignored. If the callable raises another BaseException then behavior is not defined.
If the future has already completed or been cancelled then fn will be called immediately.
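The ordering and "called immediately" rules can be sketched as below. The example assumes a ThreadPoolExecutor and holds the task back with an event so that both callbacks are attached before the future completes; all helper names are hypothetical.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

gate = threading.Event()
calls = []


def work():
    # Hypothetical task held back until the callbacks are attached.
    gate.wait()
    return 16


def record_first(future):
    calls.append(("first", future.result()))


def record_second(future):
    calls.append(("second", future.result()))


def record_late(future):
    calls.append(("late", future.result()))


executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(work)
future.add_done_callback(record_first)
future.add_done_callback(record_second)

gate.set()
executor.shutdown(wait=True)  # callbacks have run once this returns

# The future is already done, so this callback runs immediately.
future.add_done_callback(record_late)
print(calls)
```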
Internal Future Methods
The following Future methods are meant for use in unit tests and Executor implementations.
set_running_or_notify_cancel()
Should be called by Executor implementations before executing the work associated with the Future.
If the method returns False then the Future was cancelled, i.e. Future.cancel was called and returned True. Any threads waiting on the Future completing (i.e. through as_completed() or wait()) will be woken up.
If the method returns True then the Future was not cancelled and has been put in the running state, i.e. calls to Future.running() will return True.
This method can only be called once and cannot be called after Future.set_result() or Future.set_exception() have been called.
set_result(result)
Sets the result of the work associated with the Future.
set_exception(exception)
Sets the result of the work associated with the Future to the given Exception.
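To illustrate how an Executor implementation might use these internal methods, here is a minimal sketch of a hypothetical InlineExecutor that runs each call synchronously in the submitting thread. It is not part of the proposed package and omits shutdown handling and thread safety.

```python
from concurrent.futures import Executor, Future


class InlineExecutor(Executor):
    """Hypothetical executor that runs each call in the calling thread."""

    def submit(self, fn, *args, **kwargs):
        future = Future()
        # Put the future into the running state. A False return would
        # mean it had been cancelled, in which case no work is done.
        if future.set_running_or_notify_cancel():
            try:
                result = fn(*args, **kwargs)
            except Exception as exc:
                future.set_exception(exc)
            else:
                future.set_result(result)
        return future


executor = InlineExecutor()
eight = executor.submit(pow, 2, 3)
broken = executor.submit(int, "not a number")
print(eight.result(), repr(broken.exception()))
```

Because the base class implements map() in terms of submit(), the sketch also supports executor.map without further code.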
Module Functions
wait(fs, timeout=None, return_when=ALL_COMPLETED)
Wait for the Future instances (possibly created by different Executor instances) given by fs to complete. Returns a named 2-tuple of sets. The first set, named “done”, contains the futures that completed (finished or were cancelled) before the wait completed. The second set, named “not_done”, contains uncompleted futures.
timeout can be used to control the maximum number of seconds to wait before returning. If timeout is not specified or None then there is no limit to the wait time.
return_when indicates when the method should return. It must be one of the following constants:

===============  ================================================================
Constant         Description
===============  ================================================================
FIRST_COMPLETED  The method will return when any future finishes or is cancelled.
FIRST_EXCEPTION  The method will return when any future finishes by raising an
                 exception. If no future raises an exception then it is
                 equivalent to ALL_COMPLETED.
ALL_COMPLETED    The method will return when all calls finish.
===============  ================================================================
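A sketch of wait() with FIRST_COMPLETED and with the default ALL_COMPLETED, assuming a ThreadPoolExecutor. The slow task is held back by an event so that the outcome of the first wait is predictable; the function names are hypothetical.

```python
import threading
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

gate = threading.Event()


def slow():
    # Hypothetical task that cannot finish until gate is set.
    gate.wait()
    return "slow"


def fast():
    return "fast"


executor = ThreadPoolExecutor(max_workers=2)
slow_future = executor.submit(slow)
fast_future = executor.submit(fast)

# Returns as soon as any one future is done.
first = wait([slow_future, fast_future], return_when=FIRST_COMPLETED)

gate.set()
# Default return_when=ALL_COMPLETED: returns once every future is done.
everything = wait([slow_future, fast_future])
executor.shutdown()

print(len(first.done), len(first.not_done), len(everything.done))
```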
as_completed(fs, timeout=None)
Returns an iterator over the Future instances given by fs that yields futures as they complete (finished or were cancelled). Any futures that completed before as_completed() was called will be yielded first. The returned iterator raises a TimeoutError if __next__() is called and the result isn’t available after timeout seconds from the original call to as_completed(). If timeout is not specified or None then there is no limit to the wait time.
The Future instances can have been created by different Executor instances.
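The sketch below illustrates that as_completed() yields futures in completion order rather than submission order. It assumes a ThreadPoolExecutor, and uses an event so that the first-submitted task can only finish after the other one has been yielded; the function names are hypothetical.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

gate = threading.Event()


def delayed_task():
    # Hypothetical task that waits until the quick one has been seen.
    gate.wait()
    return "delayed"


def quick_task():
    return "quick"


order = []
with ThreadPoolExecutor(max_workers=2) as executor:
    delayed = executor.submit(delayed_task)  # submitted first
    quick = executor.submit(quick_task)
    for future in as_completed([delayed, quick]):
        order.append(future.result())
        gate.set()  # release the delayed task after the first yield

print(order)
```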
Check Prime Example
    from concurrent import futures
    import math

    PRIMES = [
        112272535095293,
        112582705942171,
        112272535095293,
        115280095190773,
        115797848077099,
        1099726899285419]

    def is_prime(n):
        # Numbers below 2 are not prime, and 2 is the only even prime.
        if n < 2:
            return False
        if n == 2:
            return True
        if n % 2 == 0:
            return False

        # Trial division by odd numbers up to the square root of n.
        sqrt_n = int(math.floor(math.sqrt(n)))
        for i in range(3, sqrt_n + 1, 2):
            if n % i == 0:
                return False
        return True

    def main():
        with futures.ProcessPoolExecutor() as executor:
            for number, prime in zip(PRIMES, executor.map(is_prime,
                                                          PRIMES)):
                print('%d is prime: %s' % (number, prime))

    if __name__ == '__main__':
        main()
Web Crawl Example
    from concurrent import futures
    import urllib.request

    URLS = ['http://www.foxnews.com/',
            'http://www.cnn.com/',
            'http://europe.wsj.com/',
            'http://www.bbc.co.uk/',
            'http://some-made-up-domain.com/']

    def load_url(url, timeout):
        return urllib.request.urlopen(url, timeout=timeout).read()

    def main():
        with futures.ThreadPoolExecutor(max_workers=5) as executor:
            future_to_url = dict(
                (executor.submit(load_url, url, 60), url)
                 for url in URLS)

            for future in futures.as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    print('%r page is %d bytes' % (
                              url, len(future.result())))
                except Exception as e:
                    print('%r generated an exception: %s' % (
                              url, e))

    if __name__ == '__main__':
        main()
Rationale
The proposed design of this module was heavily influenced by the Java java.util.concurrent package [1]. The conceptual basis of the module, as in Java, is the Future class, which represents the progress and result of an asynchronous computation. The Future class makes little commitment to the evaluation mode being used, e.g. it can be used to represent lazy or eager evaluation, or evaluation using threads, processes or remote procedure call.
Futures are created by concrete implementations of the Executor class (called ExecutorService in Java). The reference implementation provides classes that use either a process or a thread pool to eagerly evaluate computations.
Futures have already been seen in Python as part of a popular Python cookbook recipe [2] and have been discussed on the Python-3000 mailing list [3].
The proposed design is explicit, i.e. it requires that clients be aware that they are consuming Futures. It would be possible to design a module that would return proxy objects (in the style of weakref) that could be used transparently. It is possible to build a proxy implementation on top of the proposed explicit mechanism.
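As a rough indication of what such a proxy might look like, the sketch below wraps a Future in a hypothetical FutureProxy class that forwards attribute access to the eventual result. It is an illustration only and is not part of the proposed package.

```python
from concurrent.futures import ThreadPoolExecutor


class FutureProxy:
    """Hypothetical proxy that forwards attribute access to a result."""

    def __init__(self, future):
        self._future = future

    def __getattr__(self, name):
        # Only invoked for attributes not found on the proxy itself.
        # Blocks until the underlying call has completed.
        return getattr(self._future.result(), name)


with ThreadPoolExecutor(max_workers=1) as executor:
    text = FutureProxy(executor.submit(str.upper, "a,b,c"))
    # The caller uses the proxy as if it were the string itself.
    parts = text.split(",")

print(parts)
```

This simple form only covers ordinary attribute and method access. Operators and other special methods are looked up on the type and would need explicit forwarding.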
The proposed design does not introduce any changes to Python language syntax or semantics. Special syntax could be introduced [4] to mark function and method calls as asynchronous. A proxy result would be returned while the operation is eagerly evaluated asynchronously, and execution would only block if the proxy object were used before the operation completed.
Anh Hai Trinh proposed a simpler but more limited API concept [5] and the API has been discussed in some detail on stdlib-sig [6].
The proposed design was discussed on the Python-Dev mailing list [7]. Following those discussions, the following changes were made:
- The Executor class was made into an abstract base class
- The Future.remove_done_callback method was removed due to a lack of convincing use cases
- The Future.add_done_callback method was modified to allow the same callable to be added many times
- The Future class’s mutation methods were better documented to indicate that they are private to the Executor that created them
Reference Implementation
The reference implementation [8] contains a complete implementation of the proposed design. It has been tested on Linux and Mac OS X.
References
Copyright
This document has been placed in the public domain.
Source: https://github.com/python/peps/blob/main/peps/pep-3148.rst
Last modified: 2023-09-09 17:39:29 GMT