Что такое пул потоков Java?
Всякий раз, когда вы создаете серверное приложение, потребность в создании нового потока возникает каждый раз, когда поступает запрос. Этот новый запрос является новым потоком, который создается. Рассмотрим пул потоков в Java, показывая его преимущества и недостатки.
В простом смысле он содержит группу рабочих потоков, ожидающих выполнения задания. Они повторно используются во всем процессе.
В пуле потоков создается группа потоков фиксированного размера. Всякий раз, когда задача должна быть предоставлена, один из потоков извлекается и назначается этой задачей поставщиком услуг, как только задание завершается, поток возвращается обратно в пул потоков. Пул потоков предпочтительно используется, потому что активные потоки потребляют системные ресурсы, когда JVM создает слишком много потоков одновременно, системе может не хватить памяти.
Следовательно, количество создаваемых потоков должно быть ограничено. Поэтому концепция пула потоков является предпочтительной!
Риски
Есть несколько рисков, когда вы имеете дело с пулом потоков, например;
- Утечка потока: Если поток удален из пула для выполнения задачи, но не возвращен обратно к нему, когда задача завершена, происходит утечка потока.
- Дедлок: в пуле потоков выполняется поток, ожидающий вывода из блока, который поток ожидает в очереди из-за недоступности потока для выполнения, есть случай тупика.
- Перерасход ресурсов: большее количество потоков, чем требуется оптимальное количество, может вызвать проблемы с истощением, приводящие к перерасходу ресурсов.
Преимущества
Некоторые из преимуществ использования пула потоков при программировании на Java:
- Лучшая производительность
- Экономит время
- Не нужно создавать тему снова и снова
- Легкий доступ
- Использование в реальном времени
Недостатки
Некоторые недостатки использования пула потоков при программировании:
- Нет контроля над приоритетом и состоянием потока, с которым вы работаете.
- Нет стабильной идентификации, данной потоку, трек не может быть сохранен.
- Когда существует высокий спрос на пул потоков, процесс может быть удален.
- Пул потоков может работать некорректно, когда два потока работают параллельно.
- Существует несколько ситуаций, когда код приложения может зависеть от кода другого приложения, несмотря на надежную изоляцию приложения.
Посмотрите код ниже, чтобы понять концепцию пула потоков в Java.
package MyPackage; import java.util.concurrent.LinkedBlockingQueue; public class ThreadPool < private final int nThreads; private final PoolWorker[] threads; private final LinkedBlockingQueuequeue; public ThreadPool(int Threads) < this.nThreads = Threads; queue = new LinkedBlockingQueue(); threads = new PoolWorker[Threads]; for (int i = 0; i < nThreads; i++) < threads[i] = new PoolWorker(); threads[i].start(); >> public void execute(Runnable task) < synchronized (queue) < queue.add(task); queue.notify(); >> private class PoolWorker extends Thread < public void run() < Runnable task; while (true) < synchronized (queue) < while (queue.isEmpty()) < try < queue.wait(); >catch (InterruptedException e) < System.out.println("An error occurred while queue is waiting: " + e.getMessage()); >> task = (Runnable)queue.poll(); > // If we don't catch RuntimeException, // the pool could leak threads try < task.run(); >catch (RuntimeException e) < System.out.println("Thread pool is interrupted due to an issue: " + e.getMessage()); >> > > >
Как это работает в мире java. Пул потоков
Основной принцип программирования гласит: не изобретать велосипед. Но иногда, чтобы понять, что происходит и как использовать инструмент неправильно, нам нужно это сделать. Сегодня изобретаем паттерн многопоточного выполнения задач.
Представим, что у вас которая вызывает большую загрузку процессора:
public class Counter < public Double count(double a) < for (int i = 0; i < 1000000; i++) < a = a + Math.tan(a); >return a; > >
Мы хотим как можно быстрее обработать ряд таких задач, попробуем*:
public class SingleThreadClient < public static void main(String[] args) < Counter counter = new Counter(); long start = System.nanoTime(); double value = 0; for (int i = 0; i < 400; i++) < value += counter.count(i); >System.out.println(format("Executed by %d s, value : %f", (System.nanoTime() - start) / (1000_000_000), value)); > >
На моей тачке с 4 физическими ядрами использование ресурсов процессора top -pid :
Как вы заметили, загрузка одного процессора на один java-процесс с одним выполняемым потоком составляет 100%, но общая загрузка процессора в пользовательском пространстве составляет всего 2,5%, и у нас есть много неиспользуемых системных ресурсов.
Давайте попробуем использовать больше, добавив больше рабочих потоков:
public class MultithreadClient < public static void main(String[] args) throws ExecutionException, InterruptedException < ExecutorService threadPool = Executors.newFixedThreadPool(8); Counter counter = new Counter(); long start = System.nanoTime(); List> futures = new ArrayList<>(); for (int i = 0; i < 400; i++) < final int j = i; futures.add( CompletableFuture.supplyAsync( () ->counter.count(j), threadPool )); > double value = 0; for (Future future : futures) < value += future.get(); >System.out.println(format("Executed by %d s, value : %f", (System.nanoTime() - start) / (1000_000_000), value)); threadPool.shutdown(); > >
ThreadPoolExecutor
Для ускорения мы использовали ThreadPool — в java его роль играет ThreadPoolExecutor, который может быть реализован непосредственно или из одного из методов в классе Utilities. Если мы заглянем внутрь ThreadPoolExecutor, мы можем найти очередь:
private final BlockingQueue workQueue;
в которой задачи собираются, если запущено больше потоков чем размер начального пула. Если запущено меньше потоков начального размера пула, пул попробует стартовать новый поток:
public void execute(Runnable command) < . if (workerCountOf(c) < corePoolSize) < if (addWorker(command, true)) return; . if (isRunning(c) && workQueue.offer(command)) < . addWorker(null, false); . >>
Каждый addWorker запускает новый поток с задачей Runnable, которая опрашивает workQueue на наличие новых задач и выполняет их.
final void runWorker(Worker w)
ThreadPoolExecutor имеет очень понятный javadoc, поэтому нет смысла его перефразировать. Вместо этого, давайте попробуем сделать наш собственный:
public class ThreadPool implements Executor < private final QueueworkQueue = new ConcurrentLinkedQueue<>(); private volatile boolean isRunning = true; public ThreadPool(int nThreads) < for (int i = 0; i < nThreads; i++) < new Thread(new TaskWorker()).start(); >> @Override public void execute(Runnable command) < if (isRunning) < workQueue.offer(command); >> public void shutdown() < isRunning = false; >private final class TaskWorker implements Runnable < @Override public void run() < while (isRunning) < Runnable nextTask = workQueue.poll(); if (nextTask != null) < nextTask.run(); >> > > >
Теперь давайте выполним ту же задачу, что и выше, с нашим пулом.
Меняем строку в MultithreadClient:
// ExecutorService threadPool = Executors.newFixedThreadPool (8); ThreadPool threadPool = new ThreadPool (8);
Время выполнения практически одинаковое — 15 секунд.
Размер пула потоков
Попробуем еще больше увеличить количество запущенных потоков в пуле — до 100.
ThreadPool threadPool = new ThreadPool(100);
Мы можем видеть, что время выполнения увеличилось до 28 секунд — почему это произошло?
Существует несколько независимых причин, по которым производительность могла упасть, например, из-за постоянных переключений контекста процессора, когда он приостанавливает работу над одной задачей и должен переключаться на другую, переключение включает сохранение состояния и восстановление состояния. Пока процессор занято переключением состояний, оно не делает никакой полезной работы над какой-либо задачей.
Количество переключений контекста процесса можно увидеть, посмотрев на csw параметр при выводе команды top.
На 8 потоках:
На 100 потоках:
Размер зависит от типа выполняемых задач. Разумеется, размер пула потоков редко должен быть захардокожен, скорее он должен быть настраиваемый а оптимальный размер выводится из мониторинга пропускной способности исполняемых задач.
Предполагая, что потоки не блокируют друг друга, нет циклов ожидания I/O, и время обработки задач одинаково, оптимальный пул потоков = Runtime.getRuntime().availableProcessors() + 1.
Если потоки в основном ожидают I/O, то оптимальный размер пула должен быть увеличен на отношение между временем ожидания процесса и временем вычисления. Например. У нас есть процесс, который тратит 50% времени в iowait, тогда размер пула может быть 2 * Runtime.getRuntime().availableProcessors() + 1.
Другие виды пулов
- Пул потоков с ограничением по памяти, который блокирует отправку задачи, когда в очереди слишком много задач MemoryAwareThreadPoolExecutor
- Пул потоков, который регистрирует JMX-компонент для контроля и настройки размера пула в runtime.
JMXEnabledThreadPoolExecutor
Thread Pools
Most of the executor implementations in java.util.concurrent use thread pools, which consist of worker threads. This kind of thread exists separately from the Runnable and Callable tasks it executes and is often used to execute multiple tasks.
Using worker threads minimizes the overhead due to thread creation. Thread objects use a significant amount of memory, and in a large-scale application, allocating and deallocating many thread objects creates a significant memory management overhead.
One common type of thread pool is the fixed thread pool. This type of pool always has a specified number of threads running; if a thread is somehow terminated while it is still in use, it is automatically replaced with a new thread. Tasks are submitted to the pool via an internal queue, which holds extra tasks whenever there are more active tasks than threads.
An important advantage of the fixed thread pool is that applications using it degrade gracefully. To understand this, consider a web server application where each HTTP request is handled by a separate thread. If the application simply creates a new thread for every new HTTP request, and the system receives more requests than it can handle immediately, the application will suddenly stop responding to all requests when the overhead of all those threads exceed the capacity of the system. With a limit on the number of the threads that can be created, the application will not be servicing HTTP requests as quickly as they come in, but it will be servicing them as quickly as the system can sustain.
A simple way to create an executor that uses a fixed thread pool is to invoke the newFixedThreadPool factory method in java.util.concurrent.Executors This class also provides the following factory methods:
- The newCachedThreadPool method creates an executor with an expandable thread pool. This executor is suitable for applications that launch many short-lived tasks.
- The newSingleThreadExecutor method creates an executor that executes a single task at a time.
- Several factory methods are ScheduledExecutorService versions of the above executors.
If none of the executors provided by the above factory methods meet your needs, constructing instances of java.util.concurrent.ThreadPoolExecutor or java.util.concurrent.ScheduledThreadPoolExecutor will give you additional options.