Java ThreadPoolExecutor with BlockingQueue
Learn to use Java ThreadPoolExecutor in combination with BlockingQueue.
1. Creating ThreadPoolExecutor
A ThreadPoolExecutor is a type of ExecutorService that executes each submitted task using one of the threads from a thread pool. This class provides many flexible ways to create a pool of threads in different contexts.
The following constructors can be used to create a thread pool executor instance based on our requirements.
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler) ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
The constructor arguments are:
- corePoolSize – the number of threads to keep in the pool, even if they are idle.
- maximumPoolSize – the maximum number of threads to allow in the pool.
- keepAliveTime – when the number of threads is greater than the core, this is the maximum time an idle thread will wait for the new task.
- unit – the time unit for the keepAliveTime argument.
- workQueue – the queue to use for holding Runnable tasks before they are executed.
- threadFactory – an optional factory to use when the executor creates a new thread.
- handler – rejected task execution handler.
1.2. Custom ThreadPoolExecutor
Even without extending the ThreadPoolExecutor, we can use it very effectively. But, we will miss some extremely useful features in terms of controlling the execution flow.
For example, ThreadPoolExecutor class provides two excellent methods which I will highly recommend to override. These methods provide a very good handle on the execution lifecycle of a Runnable to be executed.
import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class CustomThreadPoolExecutor extends ThreadPoolExecutor < public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue) < super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); >@Override protected void beforeExecute(Thread t, Runnable r) < super.beforeExecute(t, r); System.out.println("Perform beforeExecute() logic"); >@Override protected void afterExecute(Runnable r, Throwable t) < super.afterExecute(r, t); if (t != null) < System.out.println("Perform exception handler logic"); >System.out.println("Perform afterExecute() logic"); > >
A BlockingQueue is like another Queue implementations with additional capabilities. Any attempt, to retrieve something out of it, can be seen safe as it will not return empty. The consumer thread will automatically wait until BlockingQueue is not populated with some data. Once it fills, the thread will consume the resource.
A BlockingQueue may be used to transfer and hold the tasks to be executed by the thread pool. Blocking queues helps in many ways:
- If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
- If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
- If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
The ThreadPoolExecutor support different kind of blocking queues. Each queue provides a different behavior to the processing of the tasks.
2.1.1. Direct Handoffs
This can be achieved with SynchronousQueue that does not have any internal capacity. We cannot insert a task (using any method) unless another thread is trying to take it.
When using the synchronous queue, when we attempt to queue a task then this will fail if no threads are immediately available to run it. If it still has not reached to maximumPoolSize thread then a new thread will be constructed. Else, the task will be rejected immediately.
2.1.2. Unbounded Queues
An unbounded queue (for example, LinkedBlockingQueue ) causes new submitted tasks to wait in the queue when all (corePoolSize) threads are busy. Because tasks can wait for unlimited time, the executor needs not create new threads. So maximumPoolSize has no effect if this queue is used.
This style of queuing can be useful when a sudden burst of requests comes to the server. Although, this may lead to memory issues if requests continue to come faster than they are processed.
2.1.3. Bounded Queues
Bounded queues (for example, ArrayBlockingQueue ) helps in managing the resources in a much better way. It provides mechanisms to control the number of threads as well as the tasks in the queues to prevent resource exhaustion.
For different scenarios, we can test custom pool sizes and queue sizes, and finally, use what is best suited for our usecase.
- Using large queues and small pools minimizes the system overhead, but leads to low throughput.
- Using small queues and large pools also keeps the CPU busy which also can lead to low throughput.
- So finding a right balance between the queue size and pool size is important.
2.2. Handling Rejected Tasks
There may be situations when the submitted tasks cannot be executed by the executor service and thus have been rejected. Task rejection may occur when no more threads or queue slots are available because their bounds have been exceeded, or the executor has been shut down.
ThreadPoolExecutor provides the following 4 inbuild handlers to handle these rejected tasks. We can create our own custom handler as well.
- AbortPolicy : This is the default policy. It causes the executor to throw a RejectedExecutionException.
- CallerRunsPolicy : This policy runs the rejected task directly in the calling thread of the execute method. If the executor has been shut down, the task will be discarded.
- DiscardOldestPolicy : This policy discards the oldest unhandled request and then retries execute . If the executor has been shut down, the task will be discarded.
- DiscardPolicy : This policy silently discards the rejected task.
- Custom Policy : We can implement the RejectedExecutionHandler interface and provide our own logic to handle the rejected tasks.
3. Using ThreadPoolExecutor with BlockingQueue
To demonstrate the usage of ThreadPoolExecutor with BlockingQueue, we have created one task DemoTask . This task does nothing. It simply waits for 500ms and then completes.
public class DemoTask implements Runnable < private String name = null; public DemoTask(String name) < this.name = name; >public String getName() < return this.name; >@Override public void run() < try < Thread.sleep(500); >catch (InterruptedException e) < e.printStackTrace(); >System.out.println("Executing : " + name); > >
Now lets suppose we have total 100 tasks. We want to run them using ideally 10, and the maximum of 20 threads.
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class DemoExecutor < public static void main(String[] args) throws InterruptedException < BlockingQueueblockingQueue = new LinkedBlockingQueue(); CustomThreadPoolExecutor executor = new CustomThreadPoolExecutor(10, 20, 5, TimeUnit.SECONDS, blockingQueue, new ThreadPoolExecutor.AbortPolicy()); // Let start all core threads initially executor.prestartAllCoreThreads(); for (int i = 1; i executor.shutdown(); executor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); > >
Execute the above code and you will see all the tasks gets executed one by one.
Как это работает в мире java. Пул потоков
Основной принцип программирования гласит: не изобретать велосипед. Но иногда, чтобы понять, что происходит и как использовать инструмент неправильно, нам нужно это сделать. Сегодня изобретаем паттерн многопоточного выполнения задач.
Представим, что у вас которая вызывает большую загрузку процессора:
public class Counter < public Double count(double a) < for (int i = 0; i < 1000000; i++) < a = a + Math.tan(a); >return a; > >
Мы хотим как можно быстрее обработать ряд таких задач, попробуем*:
public class SingleThreadClient < public static void main(String[] args) < Counter counter = new Counter(); long start = System.nanoTime(); double value = 0; for (int i = 0; i < 400; i++) < value += counter.count(i); >System.out.println(format("Executed by %d s, value : %f", (System.nanoTime() - start) / (1000_000_000), value)); > >
На моей тачке с 4 физическими ядрами использование ресурсов процессора top -pid :
Как вы заметили, загрузка одного процессора на один java-процесс с одним выполняемым потоком составляет 100%, но общая загрузка процессора в пользовательском пространстве составляет всего 2,5%, и у нас есть много неиспользуемых системных ресурсов.
Давайте попробуем использовать больше, добавив больше рабочих потоков:
public class MultithreadClient < public static void main(String[] args) throws ExecutionException, InterruptedException < ExecutorService threadPool = Executors.newFixedThreadPool(8); Counter counter = new Counter(); long start = System.nanoTime(); List> futures = new ArrayList<>(); for (int i = 0; i < 400; i++) < final int j = i; futures.add( CompletableFuture.supplyAsync( () ->counter.count(j), threadPool )); > double value = 0; for (Future future : futures) < value += future.get(); >System.out.println(format("Executed by %d s, value : %f", (System.nanoTime() - start) / (1000_000_000), value)); threadPool.shutdown(); > >
ThreadPoolExecutor
Для ускорения мы использовали ThreadPool — в java его роль играет ThreadPoolExecutor, который может быть реализован непосредственно или из одного из методов в классе Utilities. Если мы заглянем внутрь ThreadPoolExecutor, мы можем найти очередь:
private final BlockingQueue workQueue;
в которой задачи собираются, если запущено больше потоков чем размер начального пула. Если запущено меньше потоков начального размера пула, пул попробует стартовать новый поток:
public void execute(Runnable command) < . if (workerCountOf(c) < corePoolSize) < if (addWorker(command, true)) return; . if (isRunning(c) && workQueue.offer(command)) < . addWorker(null, false); . >>
Каждый addWorker запускает новый поток с задачей Runnable, которая опрашивает workQueue на наличие новых задач и выполняет их.
final void runWorker(Worker w)
ThreadPoolExecutor имеет очень понятный javadoc, поэтому нет смысла его перефразировать. Вместо этого, давайте попробуем сделать наш собственный:
public class ThreadPool implements Executor < private final QueueworkQueue = new ConcurrentLinkedQueue<>(); private volatile boolean isRunning = true; public ThreadPool(int nThreads) < for (int i = 0; i < nThreads; i++) < new Thread(new TaskWorker()).start(); >> @Override public void execute(Runnable command) < if (isRunning) < workQueue.offer(command); >> public void shutdown() < isRunning = false; >private final class TaskWorker implements Runnable < @Override public void run() < while (isRunning) < Runnable nextTask = workQueue.poll(); if (nextTask != null) < nextTask.run(); >> > > >
Теперь давайте выполним ту же задачу, что и выше, с нашим пулом.
Меняем строку в MultithreadClient:
// ExecutorService threadPool = Executors.newFixedThreadPool (8); ThreadPool threadPool = new ThreadPool (8);
Время выполнения практически одинаковое — 15 секунд.
Размер пула потоков
Попробуем еще больше увеличить количество запущенных потоков в пуле — до 100.
ThreadPool threadPool = new ThreadPool(100);
Мы можем видеть, что время выполнения увеличилось до 28 секунд — почему это произошло?
Существует несколько независимых причин, по которым производительность могла упасть, например, из-за постоянных переключений контекста процессора, когда он приостанавливает работу над одной задачей и должен переключаться на другую, переключение включает сохранение состояния и восстановление состояния. Пока процессор занято переключением состояний, оно не делает никакой полезной работы над какой-либо задачей.
Количество переключений контекста процесса можно увидеть, посмотрев на csw параметр при выводе команды top.
На 8 потоках:
На 100 потоках:
Размер зависит от типа выполняемых задач. Разумеется, размер пула потоков редко должен быть захардокожен, скорее он должен быть настраиваемый а оптимальный размер выводится из мониторинга пропускной способности исполняемых задач.
Предполагая, что потоки не блокируют друг друга, нет циклов ожидания I/O, и время обработки задач одинаково, оптимальный пул потоков = Runtime.getRuntime().availableProcessors() + 1.
Если потоки в основном ожидают I/O, то оптимальный размер пула должен быть увеличен на отношение между временем ожидания процесса и временем вычисления. Например. У нас есть процесс, который тратит 50% времени в iowait, тогда размер пула может быть 2 * Runtime.getRuntime().availableProcessors() + 1.
Другие виды пулов
- Пул потоков с ограничением по памяти, который блокирует отправку задачи, когда в очереди слишком много задач MemoryAwareThreadPoolExecutor
- Пул потоков, который регистрирует JMX-компонент для контроля и настройки размера пула в runtime.
JMXEnabledThreadPoolExecutor