Parallel programming code in Python to execute tasks asynchronously

This post will show how to write Python code to run multiple processes asynchronously in parallel. I will use the concurrent.futures module of Python that provides a high-level interface for asynchronously executing callables. The asynchronous execution can be performed with threads, using ThreadPoolExecutor, or separate processes, using ProcessPoolExecutor. Both implement the same interface, which is defined by the abstract Executor class.

The Executor class provides methods to execute calls asynchronously. You can use either Executor.submit() or Executor.map() to call a function. Executor.submit() returns a Future object representing the execution of the function; you can use result() to get the value returned by the function. On the other hand, if you use Executor.map() to call a function, you can access values returned by the function without using result(). If you want to know more details about these, you can refer to the link given above.

The following code finds prime numbers in a given range of numbers. I am using ProcessPoolExecutor as this is a CPU-intensive task. I am using max_workers=4, but you can change this value if running this code on a high-end machine.

The code has two variants of Executor.submit(): one with as_compeleted() and another without as_completed(). The as_completed() version will show results in the order as futures finished or canceled. Without as_completed() version will show results in the original order. You can use any of the three approaches. There is no significant difference in their runtime.

import concurrent.futures as cf
from sympy import isprime
import time


def find_prime_numbers(num_range):
    """
    Find all prime number in the give range
    """
    pnum = []
    print("Given range is: {0} - {1}".format(num_range[0], num_range[1]))
    # check prime
    for v in range(num_range[0], num_range[1], 1):
        if v == 1:
            continue
        elif v == 2:
            pnum.append(2)
        elif isprime(v):
            pnum.append(v)
    return pnum


def main():
    """
    Call function to run in parallel from this main function
    I am running a CPU intensive task to find prime numbers between a given range
    """
    start_time = time.time()
    intervals = [[1, 100], [200, 700], [300, 800], [400, 900], [500, 1000], [600, 1100], [700, 1200]]

    # run parallel processes asynchronously
    # approach 1 starts
    with cf.ProcessPoolExecutor(max_workers=4) as executor:
        prime_nums = [executor.submit(find_prime_numbers, iv) for iv in intervals]
        for p_num in cf.as_completed(prime_nums):
            try:
                print("Prime numbers in the range are: {0}".format(p_num.result()))
            except Exception as exc:
                print("Exception generated: ", exc)
    # approach 1 ends

    '''
    # approach 2 starts
    with cf.ProcessPoolExecutor(max_workers=4) as executor:
        for prime in executor.map(find_prime_numbers, intervals):
            print("Prime numbers in the range are: {0}".format(prime))
    # approach 2 ends
    
    # approach 3 starts
    with cf.ProcessPoolExecutor(max_workers=4) as executor:
        prime_nums = [executor.submit(find_prime_numbers, iv) for iv in intervals]
        for p_num in prime_nums:
            try:
                print("Prime numbers in the range are: {0}".format(p_num.result()))
            except Exception as exc:
                print("Exception generated: ", exc)
    # approach 3 ends
    '''
    print("Time taken by the code: {0}".format(time.time() - start_time))


if __name__ == "__main__":
    main()

Similar Posts

Leave a Reply

Your email address will not be published.

This site uses Akismet to reduce spam. Learn how your comment data is processed.