Многопоточное чтение файлов java

Как читать файлы в многопоточном режиме?

В настоящее время у меня есть программа, которая читает файл (очень большой) в однопоточном режиме и создает индекс поиска, но он занимает слишком много времени для индексации в однопоточной среде.

Теперь я пытаюсь заставить его работать в многопоточном режиме, но не уверен, что это лучший способ.

Моя основная программа создает буферизованный читатель и передает экземпляр в поток, и поток использует буферный экземпляр считывателя для чтения файлов.

Я не думаю, что это работает так, как ожидалось, и каждый поток читает одну и ту же строку снова и снова.

Есть ли способ заставить потоки читать только строки, которые не читаются другим потоком? Нужно ли разбить файл? Есть ли способ реализовать это без разделения файла?

Пример Основной программы:

import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.FileReader; import java.util.ArrayList; public class TestMTFile < public static void main(String args[]) < BufferedReader reader = null; ArrayListthreads = new ArrayList(); try < reader = new BufferedReader(new FileReader( "test.tsv")); >catch (FileNotFoundException e1) < e1.printStackTrace(); >for (int i = 0; i int running = 0; int runner1 = 0; int runner2 = 0; do < running = 0; for (Thread thread : threads) < if (thread.isAlive()) < runner1 = running++; >> if (runner2 != runner1) < runner2 = runner1; System.out.println("We have " + runner2 + " running threads. "); >> while (running > 0); if (running == 0) < System.out.println("Ended"); >> > 
import java.io.BufferedReader; import java.io.IOException; public class ReadFileMT implements Runnable < BufferedReader bReader = null; ReadFileMT(BufferedReader reader) < this.bReader = reader; >public synchronized void run() < String line; try < while ((line = bReader.readLine()) != null) < try < System.out.println(line); >catch (Exception e) < >> > catch (IOException e) < // TODO Auto-generated catch block e.printStackTrace(); >> > 

Ваше узкое место скорее всего является индексированием, а не чтением файла. предполагая, что ваша система индексирования поддерживает несколько потоков, вы, вероятно, хотите настроить производителя/потребителя с одним потоком, читающим файл, и нажатием каждой строки в BlockingQueue (продюсер) и несколькими потоками, вытягивающими линии из BlockingQueue и нажатиями их в индекс ( потребителей).

Читайте также:  What is html hypertext markup language является

См. этот поток – если ваши файлы находятся на одном диске, вы не можете сделать лучше, чем чтение их одним потоком, хотя это возможно для обработки файлов с несколькими потоками после их чтения в основной памяти.

Если вы можете использовать Java 8, вы можете сделать это быстро и легко, используя API Streams. Прочитайте файл в MappedByteBuffer, который может быстро открыть файл до 2 ГБ, а затем прочитать строки из буфера (вам нужно убедиться, что ваша JVM имеет достаточно дополнительной памяти для хранения файла):

package com.objective.stream; import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Path; import java.nio.file.Paths; import java.util.stream.Stream; public class StreamsFileProcessor < private MappedByteBuffer buffer; public static void main(String[] args)< if (args[0] != null)< Path myFile = Paths.get(args[0]); StreamsFileProcessor proc = new StreamsFileProcessor(); try < proc.process(myFile); >catch (IOException e) < e.printStackTrace(); >> > public void process(Path file) throws IOException < readFileIntoBuffer(file); getBufferStream().parallel() .forEach(this::doIndex); >private Stream getBufferStream() throws IOException < try (BufferedReader reader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(buffer.array()))))< return reader.lines(); >> private void readFileIntoBuffer(Path file) throws IOException < try(FileInputStream fis = new FileInputStream(file.toFile()))< FileChannel channel = fis.getChannel(); buffer = channel.map(FileChannel.MapMode.PRIVATE, 0, channel.size()); >> private void doIndex(String s) < // Do whatever I need to do to index the line here >> 

Во-первых, я согласен с @Zim-Zam в том, что это файл IO, а не индексирование, вероятно, это шаг определения скорости. (Поэтому я не согласен с @jtahlborn). Зависит от того, насколько сложна индексация.

Во-вторых, в вашем коде каждый поток имеет свой собственный, независимый BufferedReader . Поэтому они все прочитают весь файл. Одним из возможных исправлений является использование одного BufferedReader , который они разделяют. И тогда вам нужно синхронизировать метод BufferedReader.readLine() (я думаю), так как javadocs молчат о том, является ли BufferedReader потокобезопасным. И, поскольку я думаю, что я – ботленок, это станет узким местом, и я сомневаюсь, что многопоточность принесет вам много. Но попробуй, я иногда ошибался.: -)

p.s. Я согласен с @jtahlmorn в том, что образец производителя/потребителя лучше, чем моя доля, идея BufferedReader, но для вас это будет намного больше.

Источник

Русские Блоги

Прежде чем писать, объявите, что эта статья основана на примере многопоточного файла JAVA для чтения и записи, полученного на веб-сайте блогового сада. Я был в положении, когда писал свою собственную программу. Автор улучшил на основе написания, но не могу вспомнить исходный адрес. Если есть инсайдеры, напишите адрес, я процитирую или перепечатаю эту статью.

Эта программа основана на том, что на заднем плане системы существует файл журнала размером почти 2 ГБ. Вам будет трудно открыть его в любом редакторе. Для такой обработки анализа больших файлов решение состоит в том, чтобы использовать несколько потоков для разделения и чтения указанного большого файла. Получите информацию, которая нам нужна. Не так много, чтобы сказать, код включен, есть комментарии, чтобы помочь понять.

package com.thread.multipl.mysolution;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.CountDownLatch;

/**
* Этот поток используется для чтения файла. Когда получено указанное ключевое слово, добавьте 1 к указанному объекту.
* @author 2
*
*/
public class ReadThread extends Thread
// Определяем длину массива байтов (бамбуковая трубка для забора воды)
private final int BUFF_LEN = 256;
// Определяем начальную точку чтения
private long start;
// Определяем конечную точку чтения
private long end;
// прочитанные байты выводятся в raf. RandomAccessFile можно понимать как файловый поток, то есть указанная часть объекта упаковки извлекается из файла
private RandomAccessFile raf;
// Ключевые слова, которые необходимо указать в теме
private String keywords;
// Сколько раз эта тема читала ключевое слово
private int curCount = 0;
/**
* Класс, к которому начал присоединяться jdk1.5, является многопоточным вспомогательным классом.
* Используется для равномерного выполнения операций перед началом многопоточности или для вызова основного потока для выполнения соответствующих операций после завершения многопоточности.
*/
private CountDownLatch doneSignal;
public ReadThread(long start, long end, RandomAccessFile raf,String keywords,CountDownLatch doneSignal) this.start = start;
this.end = end;
this.raf = raf;
this.keywords = keywords;
this.doneSignal = doneSignal;
>

public void run() try raf.seek(start);
// Этот поток отвечает за чтение размера файла
long contentLen = end - start;
// Определите, что вам нужно прочитать не более нескольких раз, чтобы завершить чтение этой темы
long times = contentLen / BUFF_LEN+1;
System.out.println (this.toString () + "Количество необходимых чтений:" + раз);
byte[] buff = new byte[BUFF_LEN];
int hasRead = 0;
String result = null;
for (int i = 0; i < times; i++) <
// Прежде чем SEEK определит начальную позицию, прочитайте содержимое указанной длины группы байтов здесь, метод read возвращает следующую позицию, чтобы начать чтение
hasRead = raf.read(buff);
// Если количество прочитанных байтов меньше 0, выйдите из цикла! (До конца байтового массива)
if (hasRead < 0) <
break;
>
result = new String(buff,"gb2312");
/// System.out.println(result);
int count = this.getCountByKeywords(result, keywords);
if(count > 0) this.curCount += count;
>
>

KeyWordsCount kc = KeyWordsCount.getCountObject();

kc.addCount(this.curCount);

doneSignal.countDown();//current thread finished! noted by latch object!
> catch (IOException e) // TODO Auto-generated catch block
e.printStackTrace();
>
>

public long getStart() return start;
>

public void setStart(long start) this.start = start;
>

public long getEnd() return end;
>

public void setEnd(long end) this.end = end;
>

public RandomAccessFile getRaf() return raf;
>

public void setRaf(RandomAccessFile raf) this.raf = raf;
>

public int getCountByKeywords(String statement,String key) return statement.split(key).length-1;
>

public int getCurCount() return curCount;
>

public void setCurCount(int curCount) this.curCount = curCount;
>

public CountDownLatch getDoneSignal() return doneSignal;
>

public void setDoneSignal(CountDownLatch doneSignal) this.doneSignal = doneSignal;
>
>
package com.thread.multipl.mysolution;

import java.io.File;
import java.io.RandomAccessFile;
import java.util.concurrent.CountDownLatch;

public class MultiReadTest
/**
* Тест многопоточного чтения файла
* @param args
*/
public static void main(String[] args) // TODO Auto-generated method stub
final int DOWN_THREAD_NUM = 10; // Запуск 10 потоков для чтения указанного файла
final String OUT_FILE_NAME = "d: \\ Yi Tian Tu Long Ji.txt";
окончательные строковые ключевые слова = "Wuji";
//jdk1.5 вспомогательный класс потока, класс, используемый основным потоком для ожидания завершения всех дочерних потоков,
// Другое решение: напишите свой таймер, лично предложите использовать этот класс
CountDownLatch doneSignal = new CountDownLatch(DOWN_THREAD_NUM);
RandomAccessFile[] outArr = new RandomAccessFile[DOWN_THREAD_NUM];
try long length = new File(OUT_FILE_NAME).length();
System.out.println ("Общая длина файла:" + длина + "байт");
// Количество байтов, которые должны быть прочитаны на поток
long numPerThred = length / DOWN_THREAD_NUM;
System.out.println ("Количество байтов, прочитанных каждым потоком:" + numPerThred + "bytes");
// остаток после деления всего файла
long left = length % DOWN_THREAD_NUM;
for (int i = 0; i < DOWN_THREAD_NUM; i++) <
// Открываем входной поток и объект RandomAccessFile для каждого потока,

// Пусть каждый поток отвечает за чтение разных частей файла
outArr[i] = new RandomAccessFile(OUT_FILE_NAME, "rw");
if (i != 0) <
//
// isArr [i] = new FileInputStream ("d: / brave heart.rmvb");
// Создать несколько объектов RandomAccessFile с указанным выходным файлом

>
if (i == DOWN_THREAD_NUM - 1) <
// // последний поток читает указанный numPerThred + левые байты
// System.out.println («первый» поток + i + »считывает позицию из" + i * numPerThred + "в" + ((i + 1) * numPerThred + left) + "");
new ReadThread(i * numPerThred, (i + 1) * numPerThred
+ left, outArr[i],keywords,doneSignal).start();
> else <
// Каждый поток отвечает за чтение определенного количества байтов numPerThred
// System.out.println («первый» поток + i + »считывает позицию из" + i * numPerThred + "в" + ((i + 1) * numPerThred) + "");
new ReadThread(i * numPerThred, (i + 1) * numPerThred,
outArr[i],keywords,doneSignal).start();
>
>
>catch(Exception e) e.printStackTrace();
>
// finally//
// >
// Подтверждаем, что все задачи потока выполнены, и начинаем выполнение операции основного потока
try doneSignal.await();
> catch (InterruptedException e) // TODO Auto-generated catch block
e.printStackTrace();
>
// Здесь необходимо принять решение, все прочитанные рабочие потоки выполнены.
KeyWordsCount k = KeyWordsCount.getCountObject();
// Map resultMap = k.getMap();
System.out.println ("Количество вхождений указанного ключевого слова:" + k.getCount ());
>

>
package com.thread.multipl.mysolution;


/**
* Статистические ключевые объекты
* @author 2
*
*/

public class KeyWordsCount
private static KeyWordsCount kc;

private int count = 0;
private KeyWordsCount()
>

public static synchronized KeyWordsCount getCountObject() if(kc == null) kc = new KeyWordsCount();
>
return kc;
>

public synchronized void addCount(int count) System.out.println («Увеличьте количество раз:» + количество);
this.count += count;
>

public int getCount() return count;
>

public void setCount(int count) this.count = count;
>

>

Результаты приведены ниже:
[quote] Общая длина файла: 2012606 байт
Количество байтов, прочитанных каждым потоком: 201260 байтов
Тема [Тема-0,5, главная] Необходимо прочитать количество раз: 787
Тема [Тема-1,5, главная] Необходимо прочитать количество раз: 787
Тема [Тема-2,5, главная] Нужно прочитать количество раз: 787
Тема [Тема-3,5, главная] Нужно прочитать количество раз: 787
Тема [Тема-4,5, главная] Необходимо прочитать количество раз: 787
Тема [Тема 5,5, главная] Необходимо прочитать количество раз: 787
Тема [Тема 6,5, главная] Необходимо прочитать количество раз: 787
Тема [Тема-7,5, главная] Необходимо прочитать количество раз: 787
Тема [Тема 8,5, главная] Необходимо прочитать количество раз: 787
Тема [Тема-9,5, главная] Необходимо прочитать количество раз: 787
Увеличивает: 0
Увеличивает: 146
Увеличивает: 432
Увеличивает: 539
Увеличивает: 587
Увеличивает: 717
Увеличивает: 631
Увеличивает: 467
Увеличивает: 665
Увеличивает: 538
Количество вхождений указанного ключевого слова: 4722 [/ quote]

Я использовал 10 потоков, чтобы разобрать историю «И Тянь Ту Лонг Джи», написанную Мастером Цзинь Юном. Слово «Уцзи» появилось в этом романе 4472 раза. Я не мог найти файл большего размера. Размер Yi Tian Tu Long Ji.txt составляет 4 метра.

[b] О роли CountDownLatch: [/ b]
В документации API это объясняется как вспомогательный класс. Класс инструмента для управления переключением между основным потоком и дочерним потоком. Как искать в Интернете. Некоторые люди обсуждали это в ITEYE. Я использую его здесь для решения таких проблем: [b] Убедившись, что 10 потоков завершили работу по разбору файлов, система вызывает основной поток, чтобы выполнить оставшиеся действия, а именно: вывести «число вхождений». [/ b] Если это не обеспечено, это приведет к выполнению четвертого потока, последующий поток еще не запущен, и система сделала последний шаг для вывода статистических результатов, которые не достигнут желаемого эффекта. Здесь есть еще одно простое решение, напишите счетчик самостоятельно, от 10 до 1, 10 потоков. Это зависит от личных предпочтений.

Источник

Оцените статью