Skip to content
Snippets Groups Projects
Commit f9c8b1a8 authored by srosse's avatar srosse
Browse files

OO-1522: force writing the index in a single threaded executor which access is...

OO-1522: force writing the index in a single threaded executor which access is blocked to prevent a queue with a lot of big documents
parent 3071feec
No related branches found
No related tags found
No related merge requests found
...@@ -59,7 +59,6 @@ public class SearchModule extends AbstractOLATModule { ...@@ -59,7 +59,6 @@ public class SearchModule extends AbstractOLATModule {
private static final String CONF_INDEX_INTERVAL = "indexInterval"; private static final String CONF_INDEX_INTERVAL = "indexInterval";
private static final String CONF_MAX_HITS = "maxHits"; private static final String CONF_MAX_HITS = "maxHits";
private static final String CONF_MAX_RESULTS = "maxResults"; private static final String CONF_MAX_RESULTS = "maxResults";
private static final String CONF_NUMBER_INDEX_WRITER = "numberIndexWriter";
private static final String CONF_FOLDER_POOL_SIZE = "folderPoolSize"; private static final String CONF_FOLDER_POOL_SIZE = "folderPoolSize";
private static final String CONF_RESTART_WINDOW_START = "restartWindowStart"; private static final String CONF_RESTART_WINDOW_START = "restartWindowStart";
private static final String CONF_RESTART_WINDOW_END = "restartWindowEnd"; private static final String CONF_RESTART_WINDOW_END = "restartWindowEnd";
...@@ -83,7 +82,6 @@ public class SearchModule extends AbstractOLATModule { ...@@ -83,7 +82,6 @@ public class SearchModule extends AbstractOLATModule {
private static final int DEFAULT_INDEX_INTERVAL = 0; private static final int DEFAULT_INDEX_INTERVAL = 0;
private static final int DEFAULT_MAX_HITS = 1000; private static final int DEFAULT_MAX_HITS = 1000;
private static final int DEFAULT_MAX_RESULTS = 100; private static final int DEFAULT_MAX_RESULTS = 100;
private static final int DEFAULT_NUMBER_INDEX_WRITER = 0;
private static final int DEFAULT_FOLDER_POOL_SIZE = 0; private static final int DEFAULT_FOLDER_POOL_SIZE = 0;
private static final int DEFAULT_RESTART_WINDOW_START = 0; private static final int DEFAULT_RESTART_WINDOW_START = 0;
private static final int DEFAULT_RESTART_WINDOW_END = 24; private static final int DEFAULT_RESTART_WINDOW_END = 24;
...@@ -104,7 +102,6 @@ public class SearchModule extends AbstractOLATModule { ...@@ -104,7 +102,6 @@ public class SearchModule extends AbstractOLATModule {
private List<String> fileBlackList; private List<String> fileBlackList;
private List<String> customFileBlackList; private List<String> customFileBlackList;
private int numberIndexWriter;
private int folderPoolSize; private int folderPoolSize;
private int restartWindowStart; private int restartWindowStart;
private int restartWindowEnd; private int restartWindowEnd;
...@@ -185,7 +182,6 @@ public class SearchModule extends AbstractOLATModule { ...@@ -185,7 +182,6 @@ public class SearchModule extends AbstractOLATModule {
indexInterval = getIntConfigParameter(CONF_INDEX_INTERVAL, DEFAULT_INDEX_INTERVAL); indexInterval = getIntConfigParameter(CONF_INDEX_INTERVAL, DEFAULT_INDEX_INTERVAL);
maxHits = getIntConfigParameter(CONF_MAX_HITS, DEFAULT_MAX_HITS); maxHits = getIntConfigParameter(CONF_MAX_HITS, DEFAULT_MAX_HITS);
maxResults = getIntConfigParameter(CONF_MAX_RESULTS, DEFAULT_MAX_RESULTS); maxResults = getIntConfigParameter(CONF_MAX_RESULTS, DEFAULT_MAX_RESULTS);
numberIndexWriter = getIntConfigParameter(CONF_NUMBER_INDEX_WRITER, DEFAULT_NUMBER_INDEX_WRITER);
folderPoolSize = getIntConfigParameter(CONF_FOLDER_POOL_SIZE, DEFAULT_FOLDER_POOL_SIZE); folderPoolSize = getIntConfigParameter(CONF_FOLDER_POOL_SIZE, DEFAULT_FOLDER_POOL_SIZE);
restartWindowStart = getIntConfigParameter(CONF_RESTART_WINDOW_START, DEFAULT_RESTART_WINDOW_START); restartWindowStart = getIntConfigParameter(CONF_RESTART_WINDOW_START, DEFAULT_RESTART_WINDOW_START);
restartWindowEnd = getIntConfigParameter(CONF_RESTART_WINDOW_END, DEFAULT_RESTART_WINDOW_END); restartWindowEnd = getIntConfigParameter(CONF_RESTART_WINDOW_END, DEFAULT_RESTART_WINDOW_END);
...@@ -344,13 +340,6 @@ public class SearchModule extends AbstractOLATModule { ...@@ -344,13 +340,6 @@ public class SearchModule extends AbstractOLATModule {
return customFileBlackList; return customFileBlackList;
} }
/**
* @return Number of IndexWriterWorker in Multithreaded mode.
*/
public int getNumberIndexWriter() {
return numberIndexWriter;
}
/** /**
* @return Number of FolderIndexWorker in Multithreaded mode. * @return Number of FolderIndexWorker in Multithreaded mode.
*/ */
......
...@@ -129,8 +129,7 @@ ...@@ -129,8 +129,7 @@
<!-- Files bigger than maxFileSize (and on fileSizeSuffixes-list) will be excluded from index --> <!-- Files bigger than maxFileSize (and on fileSizeSuffixes-list) will be excluded from index -->
maxFileSize=10485760 maxFileSize=10485760
<!-- Control indexer prozess --> <!-- Control indexer prozess -->
numberIndexWriter=0
folderPoolSize=4 folderPoolSize=4
<!-- Define automatic restart time window e.g. 01:00-02:59 restartWindowStart=1 restartWindowEnd=3 --> <!-- Define automatic restart time window e.g. 01:00-02:59 restartWindowStart=1 restartWindowEnd=3 -->
restartWindowStart=${restart.window.start} restartWindowStart=${restart.window.start}
......
/**
* OLAT - Online Learning and Training<br>
* http://www.olat.org
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); <br>
* you may not use this file except in compliance with the License.<br>
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing,<br>
* software distributed under the License is distributed on an "AS IS" BASIS, <br>
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. <br>
* See the License for the specific language governing permissions and <br>
* limitations under the License.
* <p>
* Copyright (c) since 2004 at Multimedia- & E-Learning Services (MELS),<br>
* University of Zurich, Switzerland.
* <hr>
* <a href="http://www.openolat.org">
* OpenOLAT - Online Learning and Training</a><br>
* This file has been modified by the OpenOLAT community. Changes are licensed
* under the Apache 2.0 license as the original file.
*/
package org.olat.search.service.indexer;
import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.olat.core.logging.OLog;
import org.olat.core.logging.Tracing;
import org.olat.search.SearchService;
/**
* Used in multi-threaded mode.
* Reads lucene documents from document-queue in main-indexer and add this documents to
* index-writer. The index-writer works with his own 'part-[ID]' index-directory.
* Initial Date: 10.10.2006 <br>
* @author guretzki
*/
public class IndexWriterWorker implements Runnable {
private static OLog log = Tracing.createLoggerFor(IndexWriterWorker.class);
private Thread indexerWriterThread;
private int id;
private OlatFullIndexer fullIndexer;
private IndexWriter indexWriter;
private boolean running = true;
private boolean finish = false;
private boolean closed = false;
/**
*
* @param id Unique index ID. Is used to generate unique directory name.
* @param tempIndexPath Absolute directory-path where the temporary index can be generated.
* @param fullIndexer Reference to full-index
*/
public IndexWriterWorker(int id, File tempIndexDir, OlatFullIndexer fullIndexer) {
this.id = id;
this.fullIndexer = fullIndexer;
try {
File indexPartFile = new File(tempIndexDir, "part" + id);
Directory indexPartDirectory = FSDirectory.open(indexPartFile);
Analyzer analyzer = new StandardAnalyzer(SearchService.OO_LUCENE_VERSION);
IndexWriterConfig indexWriterConfig = new IndexWriterConfig(SearchService.OO_LUCENE_VERSION, analyzer);
indexWriter = new IndexWriter(indexPartDirectory, indexWriterConfig);
indexWriter.deleteAll();
} catch (IOException e) {
log.warn("Can not create IndexWriter");
}
}
/**
* Create and start a new index-writer thread.
*/
public void start() {
indexerWriterThread = new Thread(this, "indexWriter-" + id );
indexerWriterThread.setPriority(Thread.MIN_PRIORITY);
indexerWriterThread.setDaemon(true);
indexerWriterThread.start();
}
public boolean isClosed() {
return closed;
}
/**
* Set finish flag. The index-writer will be closed when the document-queue is empty.
*/
public void finishIndexing() {
finish = true;
}
/**
* Check if document-queue of main-indexer has elements.
* Get document from queue and add it to index-writer.
* @see java.lang.Runnable#run()
*/
public void run() {
List<Document> documentQueue = fullIndexer.getDocumentQueue();
while(running) {
try {
while (!finish && documentQueue.isEmpty()) {
// nothing to do => sleep
Thread.sleep(200);
}
Document document = null;
synchronized (documentQueue) { //o_clusterOK by:fj, when only one indexwriter runs in the whole cluster
if (!documentQueue.isEmpty()) {
document = documentQueue.remove(0);
}
}
if (document != null) {
indexWriter.addDocument(document);
if (log.isDebug()) {
log.debug("documentQueue.remove size=" + documentQueue.size());
log.debug("IndexWriter docCount=" + indexWriter.maxDoc());
}
}
} catch (InterruptedException ex) {
log.warn("InterruptedException in run");
} catch (Exception ex) {
log.warn("Exception in run",ex);
}
if (finish && documentQueue.isEmpty()) {
running = false;
}
if (fullIndexer.isInterupted() ) {
running = false;
}
}
try {
indexWriter.close();
indexWriter = null;
closed = true;
if (log.isDebug()) log.debug("IndexWriter " + id + " closed");
} catch (IOException e) {
log.warn("Can not close IndexWriter",e);
}
log.info("IndexWriter " + id + " end of run.");
}
/**
* @return Lucene Directory object of index-writer.
*/
public Directory getIndexDir() {
return indexWriter.getDirectory();
}
/**
* @return Return number of added documents.
*/
public int getDocCount() {
return indexWriter.maxDoc();
}
}
...@@ -34,9 +34,14 @@ import java.nio.file.SimpleFileVisitor; ...@@ -34,9 +34,14 @@ import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes; import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime; import java.nio.file.attribute.FileTime;
import java.util.Hashtable; import java.util.Hashtable;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Vector; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.LucenePackage; import org.apache.lucene.LucenePackage;
...@@ -69,9 +74,6 @@ public class OlatFullIndexer { ...@@ -69,9 +74,6 @@ public class OlatFullIndexer {
private static final OLog log = Tracing.createLoggerFor(OlatFullIndexer.class); private static final OLog log = Tracing.createLoggerFor(OlatFullIndexer.class);
private static final int INDEX_MERGE_FACTOR = 1000; private static final int INDEX_MERGE_FACTOR = 1000;
private static final int MAX_SIZE_QUEUE = 500;
private int numberIndexWriter = 5;
private String indexPath; private String indexPath;
private String tempIndexPath; private String tempIndexPath;
...@@ -93,12 +95,7 @@ public class OlatFullIndexer { ...@@ -93,12 +95,7 @@ public class OlatFullIndexer {
/** Used to build number of indexed documents per minute. */ /** Used to build number of indexed documents per minute. */
private long lastMinute; private long lastMinute;
private int currentMinuteCounter; private int currentMinuteCounter;
/** Queue to pass documents from indexer to index-writers. Only in multi-threaded mode. */
private Vector<Document> documentQueue;
private IndexWriterWorker[] indexWriterWorkers;
/* Define number of documents which will be added befor sleeping (indexInterval for CPU load). */ /* Define number of documents which will be added befor sleeping (indexInterval for CPU load). */
int documentsPerInterval; int documentsPerInterval;
...@@ -111,6 +108,9 @@ public class OlatFullIndexer { ...@@ -111,6 +108,9 @@ public class OlatFullIndexer {
private MainIndexer mainIndexer; private MainIndexer mainIndexer;
private CoordinatorManager coordinatorManager; private CoordinatorManager coordinatorManager;
private static final Object indexerWriterBlock = new Object();
private ThreadPoolExecutor indexerWriterExecutor;
/** /**
* *
...@@ -127,12 +127,10 @@ public class OlatFullIndexer { ...@@ -127,12 +127,10 @@ public class OlatFullIndexer {
indexPath = searchModuleConfig.getFullIndexPath(); indexPath = searchModuleConfig.getFullIndexPath();
tempIndexPath = searchModuleConfig.getFullTempIndexPath(); tempIndexPath = searchModuleConfig.getFullTempIndexPath();
indexInterval = searchModuleConfig.getIndexInterval(); indexInterval = searchModuleConfig.getIndexInterval();
numberIndexWriter = searchModuleConfig.getNumberIndexWriter();
documentsPerInterval = searchModuleConfig.getDocumentsPerInterval(); documentsPerInterval = searchModuleConfig.getDocumentsPerInterval();
ramBufferSizeMB = searchModuleConfig.getRAMBufferSizeMB(); ramBufferSizeMB = searchModuleConfig.getRAMBufferSizeMB();
fullIndexerStatus = new FullIndexerStatus(numberIndexWriter); fullIndexerStatus = new FullIndexerStatus(1);
stopIndexing = true; stopIndexing = true;
documentQueue = new Vector<Document>();
initStatus(); initStatus();
resetDocumentCounters(); resetDocumentCounters();
} }
...@@ -212,24 +210,17 @@ public class OlatFullIndexer { ...@@ -212,24 +210,17 @@ public class OlatFullIndexer {
private void doIndex() throws InterruptedException{ private void doIndex() throws InterruptedException{
try { try {
WorkThreadInformations.setLongRunningTask("indexer"); WorkThreadInformations.setLongRunningTask("indexer");
if(indexerWriterExecutor == null) {
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(2);
indexerWriterExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, queue);
}
File tempIndexDir = new File(tempIndexPath); File tempIndexDir = new File(tempIndexPath);
Directory indexPath = FSDirectory.open(new File(tempIndexDir, "main")); Directory tmpIndexPath = FSDirectory.open(new File(tempIndexDir, "main"));
indexWriter = new IndexWriter(tmpIndexPath, newIndexWriterConfig());// analyzer, true, IndexWriter.MAX_TERM_LENGTH.UNLIMITED);
indexWriter = new IndexWriter(indexPath, newIndexWriterConfig());// analyzer, true, IndexWriter.MAX_TERM_LENGTH.UNLIMITED);
indexWriter.deleteAll(); indexWriter.deleteAll();
// Create IndexWriterWorker
log.info("Running with " + numberIndexWriter + " IndexerWriterWorker");
indexWriterWorkers = new IndexWriterWorker[numberIndexWriter];
Directory[] partIndexDirs = new Directory[numberIndexWriter];
for (int i = 0; i < numberIndexWriter; i++) {
IndexWriterWorker indexWriterWorker = new IndexWriterWorker(i, tempIndexDir, this);
indexWriterWorkers[i] = indexWriterWorker;
indexWriterWorkers[i].start();
partIndexDirs[i] = indexWriterWorkers[i].getIndexDir();
}
SearchResourceContext searchResourceContext = new SearchResourceContext(); SearchResourceContext searchResourceContext = new SearchResourceContext();
log.info("doIndex start. OlatFullIndexer with Debug output"); log.info("doIndex start. OlatFullIndexer with Debug output");
mainIndexer.doIndex(searchResourceContext, null /*no parent*/, this); mainIndexer.doIndex(searchResourceContext, null /*no parent*/, this);
...@@ -244,57 +235,32 @@ public class OlatFullIndexer { ...@@ -244,57 +235,32 @@ public class OlatFullIndexer {
Thread.sleep(10000); Thread.sleep(10000);
} }
if (waitingCount >= MAX_WAITING_COUNT) log.info("Finished with max waiting time!"); if (waitingCount >= MAX_WAITING_COUNT) log.info("Finished with max waiting time!");
log.info("Set Finish-flag for each indexWriterWorkers");
// Set Finish-flag
for (int i = 0; i < numberIndexWriter; i++) {
indexWriterWorkers[i].finishIndexing();
}
log.info("Wait until every indexworker is finished"); log.info("Wait until index writer executor is finished");
// check if every indexworker is finished max waiting-time 10Min (=waitingCount-limit = 60) int waitWriter = 0;
waitingCount = 0; while (indexerWriterExecutor.getActiveCount() > 0 && (waitWriter++ < (10 * MAX_WAITING_COUNT))) {
while (!areIndexingDone() && (waitingCount++ < MAX_WAITING_COUNT) ) { Thread.sleep(1000);
Thread.sleep(10000);
} }
if (waitingCount >= MAX_WAITING_COUNT) log.info("Finished with max waiting time!");
// Merge all partIndex log.info("Close index writer executor");
DBFactory.getInstance().commitAndCloseSession();
if(partIndexDirs.length > 0) {
log.info("Start merging part Indexes");
indexWriter.addIndexes(partIndexDirs);
log.info("Added all part Indexes");
}
fullIndexerStatus.setIndexSize(indexWriter.maxDoc()); fullIndexerStatus.setIndexSize(indexWriter.maxDoc());
indexWriter.close(); //shutdown the index writer thread
indexWriter = null; indexerWriterExecutor.submit(new CloseIndexCallable());
indexWriterWorkers = null; indexerWriterExecutor.shutdown();
indexerWriterExecutor.awaitTermination(1, TimeUnit.MINUTES);
} catch (IOException e) { } catch (IOException e) {
log.warn("Can not create IndexWriter, indexname=" + tempIndexPath, e); log.warn("Can not create IndexWriter, indexname=" + tempIndexPath, e);
} finally { } finally {
WorkThreadInformations.unsetLongRunningTask("indexer"); WorkThreadInformations.unsetLongRunningTask("indexer");
DBFactory.getInstance().commitAndCloseSession(); DBFactory.getInstance().commitAndCloseSession();
log.debug("doIndex: commit & close session"); log.debug("doIndex: commit & close session");
}
}
/**
* Ensure that all indexworker is finished
* @return
*/
private boolean areIndexingDone() {
if(indexWriterWorkers != null && indexWriterWorkers.length > 0) {
if(!documentQueue.isEmpty()) {
return false;
}
for(IndexWriterWorker worker:indexWriterWorkers) { if(indexerWriterExecutor != null) {
if(!worker.isClosed()) { indexerWriterExecutor.shutdownNow();
return false; indexerWriterExecutor = null;
}
} }
} }
return documentQueue.isEmpty();
} }
/** /**
...@@ -303,9 +269,6 @@ public class OlatFullIndexer { ...@@ -303,9 +269,6 @@ public class OlatFullIndexer {
*/ */
public void run() { public void run() {
try { try {
//TODO: Workround : does not start immediately
Thread.sleep(10000);
log.info("full indexing starts... Lucene-version:" + LucenePackage.get().getImplementationVersion()); log.info("full indexing starts... Lucene-version:" + LucenePackage.get().getImplementationVersion());
fullIndexerStatus.indexingStarted(); fullIndexerStatus.indexingStarted();
doIndex(); doIndex();
...@@ -348,55 +311,31 @@ public class OlatFullIndexer { ...@@ -348,55 +311,31 @@ public class OlatFullIndexer {
} }
} }
/** /**
* Callback to addDocument to indexWriter. * Add a document to the index writer. The document is indexed by a single threaded executor,
* Lucene want that write operations happen within a single thread. The access is synchronized
* to block concurrent access to the executor. It blocks the text extractors and allow a
* ridiculously small queue but memory efficient.
*
* @param document * @param document
* @throws IOException * @throws IOException
*/ */
public void addDocument(Document document) throws IOException,InterruptedException { public void addDocument(Document document) throws IOException,InterruptedException {
if (numberIndexWriter == 0 ) { DBFactory.getInstance().commitAndCloseSession();
indexWriter.addDocument(document);
fullIndexerStatus.incrementDocumentCount(); if (!stopIndexing && indexerWriterExecutor != null && !indexerWriterExecutor.isShutdown()) {
if (indexInterval != 0 && sleepDocumentCounter++ >= documentsPerInterval) { synchronized(indexerWriterBlock) {//once at a time please, wait, you have enough time
sleepDocumentCounter = 0; Future<Boolean> future = indexerWriterExecutor.submit(new AddDocumentCallable(document));
Thread.sleep(indexInterval); try {
} else { future.get();
// do not sleep, check for stopping indexing } catch (ExecutionException e) {
if (stopIndexing) { log.error("", e);
throw new InterruptedException("Do stop indexing at element=" + indexWriter.maxDoc());
}
}
countIndexPerMinute();
} else {
// clusterOK by:cg synchronizes only access of index-writer, indexer runs only on one cluster-node
synchronized (documentQueue) {
while (documentQueue.size() > MAX_SIZE_QUEUE) {
log.warn("Document queue over " + MAX_SIZE_QUEUE);
Thread.sleep(60000);
}
documentQueue.add(document);
fullIndexerStatus.incrementDocumentCount();
fullIndexerStatus.setDocumentQueueSize(documentQueue.size());
countIndexPerMinute();
if (log.isDebug()) log.debug("documentQueue.add size=" + documentQueue.size());
// check for stopping indexing
if (stopIndexing) {
throw new InterruptedException("Do stop indexing at element=" + indexWriter.maxDoc());
} }
} }
} }
incrementDocumentTypeCounter(document); incrementDocumentTypeCounter(document);
incrementFileTypeCounter(document); incrementFileTypeCounter(document);
// TODO:cg/07.10.2010 try to fix Indexer ERROR 'Overdue resource check-out stack trace.' on OLATNG
// close and commit after each document
// if (fullIndexerStatus.getDocumentCount() % 20 == 0) {
// Do commit after certain number of documents because the transaction should not be too big
DBFactory.getInstance().commitAndCloseSession();
log.debug("DB: intermediateCommit");
// }
} }
private void incrementFileTypeCounter(Document document) { private void incrementFileTypeCounter(Document document) {
...@@ -443,15 +382,9 @@ public class OlatFullIndexer { ...@@ -443,15 +382,9 @@ public class OlatFullIndexer {
* @return Return current full-indexer status. * @return Return current full-indexer status.
*/ */
public FullIndexerStatus getStatus() { public FullIndexerStatus getStatus() {
if (indexWriterWorkers != null) {
// IndexWorker exist => set current document-counter
for (int i = 0; i < numberIndexWriter; i++) {
fullIndexerStatus.setPartDocumentCount(indexWriterWorkers[i].getDocCount(),i);
}
}
fullIndexerStatus.setDocumentCounters(documentCounters); fullIndexerStatus.setDocumentCounters(documentCounters);
fullIndexerStatus.setFileTypeCounters(fileTypeCounters); fullIndexerStatus.setFileTypeCounters(fileTypeCounters);
fullIndexerStatus.setDocumentQueueSize(documentQueue.size()); fullIndexerStatus.setDocumentQueueSize(0);
return fullIndexerStatus; return fullIndexerStatus;
} }
...@@ -465,13 +398,6 @@ public class OlatFullIndexer { ...@@ -465,13 +398,6 @@ public class OlatFullIndexer {
public void setIndexInterval(long indexInterval) { public void setIndexInterval(long indexInterval) {
this.indexInterval = indexInterval; this.indexInterval = indexInterval;
} }
/**
* @return Return document-queue which is used in multi-threaded mode.
*/
public List<Document> getDocumentQueue() {
return documentQueue;
}
/** /**
* Check if the indexing process is interrupted. * Check if the indexing process is interrupted.
...@@ -485,4 +411,37 @@ public class OlatFullIndexer { ...@@ -485,4 +411,37 @@ public class OlatFullIndexer {
documentCounters = new Hashtable<String,Integer>(); documentCounters = new Hashtable<String,Integer>();
fileTypeCounters = new Hashtable<String,Integer>(); fileTypeCounters = new Hashtable<String,Integer>();
} }
private class CloseIndexCallable implements Callable<Boolean> {
@Override
public Boolean call() throws Exception {
indexWriter.commit();
indexWriter.close();
indexWriter = null;
return Boolean.TRUE;
}
}
private class AddDocumentCallable implements Callable<Boolean> {
private final Document document;
public AddDocumentCallable(Document document) {
this.document = document;
}
@Override
public Boolean call() throws Exception {
indexWriter.addDocument(document);
fullIndexerStatus.incrementDocumentCount();
if (indexInterval != 0 && sleepDocumentCounter++ >= documentsPerInterval) {
sleepDocumentCounter = 0;
Thread.sleep(indexInterval);
} else if (stopIndexing) {
throw new InterruptedException("Do stop indexing at element=" + indexWriter.maxDoc());
}
countIndexPerMinute();
return Boolean.TRUE;
}
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment