[Python] How To Use Multiprocessing Pool And Display Progress Bar
Python is a popular, easy, and elegant programming language, but its performance has always been criticized by users of other languages. So for data pre-processing, it is very important to use multi-threading and multi-processing.
What I want to record today is how to use a process pool in Python. On multi-core CPUs, it often achieves higher utilization than plain threading, and the program will not crash just because one process dies.
I have heard that this can happen with threading, but I have not encountered it personally.
It is worth noting that you should be careful when multiple processes access the same files and variables.
If the situation permits, it is recommended to collect the data through each task's return value, to avoid the problem of missing data.
How To Use Pool
First of all, multiprocessing is part of Python's standard library and requires no additional installation. We also need to write the work we want to parallelize as a function.
The following is the simplest example program:
```python
# coding: utf-8
import multiprocessing as mp

# Task
def task(item):
    return item % 10

if __name__ == '__main__':
    pool = mp.Pool(processes=4)
    inputs = range(10)
    results = pool.map(task, inputs)
    print(results)
```
mp.Pool() creates the process pool. The processes argument sets the number of worker processes; if it is not set, all CPU cores on the system are used by default. Then call:
```python
results = pool.map(task, inputs)
```
Here inputs is a Python iterable; each of its elements is passed to the task() function we defined, and the tasks are processed in parallel across the configured number of processes to improve efficiency.
results holds the return values after all tasks are completed, in the same order as the inputs.
The above is the simplest Python pool program.
Display Progress Bar
Sometimes, when the job is very large, we need a progress bar so that we can confirm at any time that the program is still running normally. In that case, you can refer to the following approach.
First of all, it is recommended to install tqdm, a Python package that visualizes iterations:
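```
pip install tqdm
```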
Then, change the program to:
```python
# coding: utf-8
import multiprocessing as mp
import tqdm

# Task
def task(item):
    return item % 10

if __name__ == '__main__':
    pool = mp.Pool(processes=4)
    inputs = range(10)
    results = []
    for result in tqdm.tqdm(pool.imap_unordered(task, inputs), total=len(inputs)):
        results.append(result)
    print(results)
```
```
100%|████████████████████████████████████████████| 10/10 [00:00<00:00, ...it/s]
[0, 1, 2, 4, 3, 5, 6, 7, 8, 9]
```
In this way, we can enjoy the speed of multiprocessing while still clearly seeing the current progress.
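Note that imap_unordered yields results in completion order, which is why the printed list above is not sorted. If the output order must match the input order, pool.imap can be wrapped with tqdm in exactly the same way; the following is a small sketch of that variant:

```python
# coding: utf-8
import multiprocessing as mp
import tqdm

def task(item):
    return item % 10

if __name__ == '__main__':
    pool = mp.Pool(processes=4)
    inputs = range(10)
    # imap preserves input order: results are yielded in order even if a
    # later item happens to finish first.
    results = list(tqdm.tqdm(pool.imap(task, inputs), total=len(inputs)))
    print(results)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```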
Checking progress of Python multiprocessing pools
Use tqdm or roll your own code snippets to quickly check the progress of your Python multiprocessing pools!
Process pools, such as those afforded by Python's multiprocessing.Pool class, are often used to parallelize loops or map a function over an iterable. It can sometimes be helpful to monitor progress over the loop or iterable, and we demonstrate several ways to do so below.
In the sample code below, we consider a situation where you want to apply a function my_func to each value in a dictionary data.
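Neither my_func, data, nor nproc is defined in the post, so as a purely hypothetical stand-in (the failure on 'key3' merely mirrors the sample output shown later), the snippets below can be run against something like:

```python
import time

nproc = 4  # number of worker processes used by the snippets below

# Illustrative stand-ins for the post's `data` and `my_func`.
data = {'key1': 1, 'key2': 2, 'key3': 3, 'key4': 4, 'key5': 5}

def my_func(value):
    time.sleep(value)  # simulate work
    if value == 3:     # simulate the failing 'key3' task
        raise RuntimeError('simulated failure')
    return value * value
```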
Option 1: Manually check status of AsyncResult objects
This option assumes you are working with one of the _async pool methods (apply_async, map_async, or starmap_async). These are non-blocking and return AsyncResult objects, which allow you to check on the status of results.
Specifically, we take advantage of AsyncResult.successful(), which does one of the following:
- Returns True : the call completed successfully
- Returns False : the call completed but raised an exception
- Raises ValueError : the call is still running
- Note: the current Python 3.7 documentation incorrectly states that an AssertionError is raised; this will presumably be fixed for Python 3.8. See https://github.com/python/cpython/pull/13721.
By keeping track of calls that completed but raised an exception, this option makes it easier to debug what went wrong. For example, if we see that an error occurred when calling my_func on the value corresponding to 'key3', we can run results['key3'].get() to recover and analyze the exception.
Pros:
- Non-blocking
- Easy to debug exceptions
- Supports any _async method
Cons:
- Does not support using the process pool as a context manager
Sample code
```python
import multiprocessing
import time

# Start the pool.
pool = multiprocessing.Pool(nproc)
results = {}
start_time = time.time()
for key, value in data.items():
    results[key] = pool.apply_async(my_func, (value,))

# This code block can be (re)run whenever you want to check on the
# progress of the pool.
running, successful, error = [], [], []
current_time = time.time()
for key, result in results.items():
    try:
        if result.successful():
            successful.append(key)
        else:
            error.append(key)
    except ValueError:
        running.append(key)
rate = (len(successful) + len(error)) / (current_time - start_time)
print('Running:', sorted(running))
print('Successful:', sorted(successful))
print('Error:', sorted(error))
print('Rate:', round(rate, 3))
print('Estimated time to completion:',
      time.strftime('%H:%M:%S', time.gmtime(len(running) / rate)))

# Don't forget to join the pool after it finishes running!
pool.close()
pool.join()
```
```
Running: ['key4', 'key5']
Successful: ['key1', 'key2']
Error: ['key3']
Rate: 1.234
Estimated time to completion: 00:01:23
```
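As described above, an errored key's exception can then be recovered with .get(); for example, with the hypothetical my_func sketched earlier:

```python
# Re-raise and inspect the exception that my_func raised for 'key3'.
try:
    results['key3'].get()
except Exception as exc:
    print(type(exc).__name__, exc)  # e.g. RuntimeError simulated failure
```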
Option 2: Using tqdm
We take advantage of the very helpful progress bar provided by the tqdm package. How to use tqdm, however, depends on the pool method we choose.
- Each _async method includes callback and error_callback arguments that take functions to be called when the result completes, with or without error, respectively. We can use these callback functions to update our tqdm progress bar.
- imap_unordered and imap each return an iterator, so we can simply wrap it with tqdm.
Pros:
- Succinct progress bar
- Supports any non-blocking pool method (i.e., any method except apply, map, and starmap)
- Supports using the process pool as a context manager
Cons:
- Blocking
Sample code
```python
import multiprocessing
from tqdm import tqdm

# imap_unordered example
results = []
with multiprocessing.Pool(nproc) as pool:
    for result in tqdm(pool.imap_unordered(my_func, data.values()), total=len(data)):
        results.append(result)
    pool.close()
    pool.join()

# imap example
with multiprocessing.Pool(nproc) as pool:
    results = list(tqdm(pool.imap(my_func, data.values()), total=len(data)))
    pool.close()
    pool.join()

# apply_async example
pbar = tqdm(total=len(data))

def update(*a):
    pbar.update()

results = []
with multiprocessing.Pool(nproc) as pool:
    for key, value in data.items():
        pool.apply_async(my_func, (value,), callback=update)
    pool.close()
    pool.join()
```
tqdm-multiprocess 0.0.11
Easy multiprocessing with tqdm and logging redirected to main process.
License: MIT License
Requires: Python >=3.6
Using queues, tqdm-multiprocess supports multiple worker processes, each with multiple tqdm progress bars, displaying them cleanly through the main process. The worker processes also have access to a single global tqdm for aggregate progress monitoring.
Logging is also redirected from the subprocesses to the root logger in the main process.
It currently doesn't support tqdm(iterator); you will need to initialize your worker tqdms with a total and update them manually.
Due to the performance limits of the default Python multiprocessing queue, you need to update your global and worker-process tqdms infrequently to avoid flooding the main process. I will attempt to implement a lock-free ring buffer at some point to see if things can be improved.
Installation
```
pip install tqdm-multiprocess
```
Usage
TqdmMultiProcessPool creates a standard Python multiprocessing pool with the desired number of processes. Under the hood it uses apply_async with an event loop to monitor a tqdm and logging queue, allowing the worker processes to redirect both their tqdm objects and logging messages to your main process. There is also a queue the workers use to update the single global tqdm.
As shown below, you create a list of tasks, each containing a function and a tuple of its parameters. The functions you pass in need two extra arguments at the end: tqdm_func and global_tqdm. You must use tqdm_func when initializing your tqdms for the redirection to work. As mentioned above, passing iterators into the tqdm function is currently not supported, so set total=total_steps when setting up your tqdm and then update the progress manually with the update() method. All other arguments to tqdm should work fine.
Once you have your task list, call the map() method on your pool, passing in the process count, global_tqdm (or None), and the task list, as well as the error and done callback functions. The error callback is triggered if your task function returns anything that evaluates as False (if not task_result in the source code). The done callback is called when a task successfully completes.
The map() method returns a list containing the returned results for all your tasks, in their original order.
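Putting this together, a minimal sketch based only on the description above might look as follows. The task function, step counts, and callback bodies are illustrative assumptions, and the exact signatures should be checked against the project README: the text above passes the process count to map(), though some releases may take it in the constructor instead.

```python
import tqdm
from tqdm_multiprocess import TqdmMultiProcessPool

# Hypothetical task: tqdm_func and global_tqdm are appended to the
# arguments by the pool, per the usage notes above.
def some_task(total_steps, tqdm_func, global_tqdm):
    # Worker tqdms must be created via tqdm_func with an explicit total;
    # tqdm(iterator) is not supported.
    with tqdm_func(total=total_steps) as progress:
        for step in range(1, total_steps + 1):
            # ... real work would go here ...
            if step % 100 == 0:  # update infrequently (see performance note)
                progress.update(100)
                global_tqdm.update(100)
    return True  # a falsy return value triggers the error callback

def on_error(result):
    print('Task failed:', result)

def on_done(result):
    print('Task done:', result)

if __name__ == '__main__':
    process_count = 4
    # Each task is a (function, args-tuple) pair; the pool appends
    # tqdm_func and global_tqdm itself.
    tasks = [(some_task, (1000,)) for _ in range(8)]
    pool = TqdmMultiProcessPool()
    with tqdm.tqdm(total=8 * 1000, dynamic_ncols=True) as global_progress:
        global_progress.set_description('global')
        # NOTE: argument order follows the description above; verify
        # against the README for your installed version.
        results = pool.map(process_count, global_progress, tasks,
                           on_error, on_done)
    print(results)  # returned values, in original task order
```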