From f9c8b1a8c6407ecd9d73a5f4b398c98fa8c9ce4e Mon Sep 17 00:00:00 2001 From: srosse <none@none> Date: Wed, 22 Apr 2015 16:33:20 +0200 Subject: [PATCH] OO-1522: force writing the index in a single threaded executor which access is blocked to prevent a queue with a lot of big documents --- .../java/org/olat/search/SearchModule.java | 11 - .../org/olat/search/_spring/searchContext.xml | 3 +- .../service/indexer/IndexWriterWorker.java | 167 --------------- .../service/indexer/OlatFullIndexer.java | 201 +++++++----------- 4 files changed, 81 insertions(+), 301 deletions(-) delete mode 100644 src/main/java/org/olat/search/service/indexer/IndexWriterWorker.java diff --git a/src/main/java/org/olat/search/SearchModule.java b/src/main/java/org/olat/search/SearchModule.java index fdc789f96e5..41ffc0bcbb8 100644 --- a/src/main/java/org/olat/search/SearchModule.java +++ b/src/main/java/org/olat/search/SearchModule.java @@ -59,7 +59,6 @@ public class SearchModule extends AbstractOLATModule { private static final String CONF_INDEX_INTERVAL = "indexInterval"; private static final String CONF_MAX_HITS = "maxHits"; private static final String CONF_MAX_RESULTS = "maxResults"; - private static final String CONF_NUMBER_INDEX_WRITER = "numberIndexWriter"; private static final String CONF_FOLDER_POOL_SIZE = "folderPoolSize"; private static final String CONF_RESTART_WINDOW_START = "restartWindowStart"; private static final String CONF_RESTART_WINDOW_END = "restartWindowEnd"; @@ -83,7 +82,6 @@ public class SearchModule extends AbstractOLATModule { private static final int DEFAULT_INDEX_INTERVAL = 0; private static final int DEFAULT_MAX_HITS = 1000; private static final int DEFAULT_MAX_RESULTS = 100; - private static final int DEFAULT_NUMBER_INDEX_WRITER = 0; private static final int DEFAULT_FOLDER_POOL_SIZE = 0; private static final int DEFAULT_RESTART_WINDOW_START = 0; private static final int DEFAULT_RESTART_WINDOW_END = 24; @@ -104,7 +102,6 @@ public class SearchModule extends AbstractOLATModule { private List<String> fileBlackList; private List<String> customFileBlackList; - private int numberIndexWriter; private int folderPoolSize; private int restartWindowStart; private int restartWindowEnd; @@ -185,7 +182,6 @@ public class SearchModule extends AbstractOLATModule { indexInterval = getIntConfigParameter(CONF_INDEX_INTERVAL, DEFAULT_INDEX_INTERVAL); maxHits = getIntConfigParameter(CONF_MAX_HITS, DEFAULT_MAX_HITS); maxResults = getIntConfigParameter(CONF_MAX_RESULTS, DEFAULT_MAX_RESULTS); - numberIndexWriter = getIntConfigParameter(CONF_NUMBER_INDEX_WRITER, DEFAULT_NUMBER_INDEX_WRITER); folderPoolSize = getIntConfigParameter(CONF_FOLDER_POOL_SIZE, DEFAULT_FOLDER_POOL_SIZE); restartWindowStart = getIntConfigParameter(CONF_RESTART_WINDOW_START, DEFAULT_RESTART_WINDOW_START); restartWindowEnd = getIntConfigParameter(CONF_RESTART_WINDOW_END, DEFAULT_RESTART_WINDOW_END); @@ -344,13 +340,6 @@ public class SearchModule extends AbstractOLATModule { return customFileBlackList; } - /** - * @return Number of IndexWriterWorker in Multithreaded mode. - */ - public int getNumberIndexWriter() { - return numberIndexWriter; - } - /** * @return Number of FolderIndexWorker in Multithreaded mode. */ diff --git a/src/main/java/org/olat/search/_spring/searchContext.xml b/src/main/java/org/olat/search/_spring/searchContext.xml index c0bb134a347..d847b8cece5 100644 --- a/src/main/java/org/olat/search/_spring/searchContext.xml +++ b/src/main/java/org/olat/search/_spring/searchContext.xml @@ -129,8 +129,7 @@ <!-- Files bigger than maxFileSize (and on fileSizeSuffixes-list) will be excluded from index --> maxFileSize=10485760 - <!-- Control indexer prozess --> - numberIndexWriter=0 + <!-- Control indexer prozess --> folderPoolSize=4 <!-- Define automatic restart time window e.g. 01:00-02:59 restartWindowStart=1 restartWindowEnd=3 --> restartWindowStart=${restart.window.start} diff --git a/src/main/java/org/olat/search/service/indexer/IndexWriterWorker.java b/src/main/java/org/olat/search/service/indexer/IndexWriterWorker.java deleted file mode 100644 index 15eda703c0b..00000000000 --- a/src/main/java/org/olat/search/service/indexer/IndexWriterWorker.java +++ /dev/null @@ -1,167 +0,0 @@ -/** -* OLAT - Online Learning and Training<br> -* http://www.olat.org -* <p> -* Licensed under the Apache License, Version 2.0 (the "License"); <br> -* you may not use this file except in compliance with the License.<br> -* You may obtain a copy of the License at -* <p> -* http://www.apache.org/licenses/LICENSE-2.0 -* <p> -* Unless required by applicable law or agreed to in writing,<br> -* software distributed under the License is distributed on an "AS IS" BASIS, <br> -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. <br> -* See the License for the specific language governing permissions and <br> -* limitations under the License. -* <p> -* Copyright (c) since 2004 at Multimedia- & E-Learning Services (MELS),<br> -* University of Zurich, Switzerland. -* <hr> -* <a href="http://www.openolat.org"> -* OpenOLAT - Online Learning and Training</a><br> -* This file has been modified by the OpenOLAT community. Changes are licensed -* under the Apache 2.0 license as the original file. -*/ - -package org.olat.search.service.indexer; - -import java.io.File; -import java.io.IOException; -import java.util.List; - -import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.analysis.standard.StandardAnalyzer; -import org.apache.lucene.document.Document; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FSDirectory; -import org.olat.core.logging.OLog; -import org.olat.core.logging.Tracing; -import org.olat.search.SearchService; - -/** - * Used in multi-threaded mode. - * Reads lucene documents from document-queue in main-indexer and add this documents to - * index-writer. The index-writer works with his own 'part-[ID]' index-directory. - * Initial Date: 10.10.2006 <br> - * @author guretzki - */ -public class IndexWriterWorker implements Runnable { - - private static OLog log = Tracing.createLoggerFor(IndexWriterWorker.class); - - private Thread indexerWriterThread; - private int id; - private OlatFullIndexer fullIndexer; - private IndexWriter indexWriter; - private boolean running = true; - private boolean finish = false; - private boolean closed = false; - - /** - * - * @param id Unique index ID. Is used to generate unique directory name. - * @param tempIndexPath Absolute directory-path where the temporary index can be generated. - * @param fullIndexer Reference to full-index - */ - public IndexWriterWorker(int id, File tempIndexDir, OlatFullIndexer fullIndexer) { - this.id = id; - this.fullIndexer = fullIndexer; - try { - File indexPartFile = new File(tempIndexDir, "part" + id); - Directory indexPartDirectory = FSDirectory.open(indexPartFile); - Analyzer analyzer = new StandardAnalyzer(SearchService.OO_LUCENE_VERSION); - IndexWriterConfig indexWriterConfig = new IndexWriterConfig(SearchService.OO_LUCENE_VERSION, analyzer); - indexWriter = new IndexWriter(indexPartDirectory, indexWriterConfig); - indexWriter.deleteAll(); - } catch (IOException e) { - log.warn("Can not create IndexWriter"); - } - } - - /** - * Create and start a new index-writer thread. - */ - public void start() { - indexerWriterThread = new Thread(this, "indexWriter-" + id ); - indexerWriterThread.setPriority(Thread.MIN_PRIORITY); - indexerWriterThread.setDaemon(true); - indexerWriterThread.start(); - } - - public boolean isClosed() { - return closed; - } - - /** - * Set finish flag. The index-writer will be closed when the document-queue is empty. - */ - public void finishIndexing() { - finish = true; - } - - /** - * Check if document-queue of main-indexer has elements. - * Get document from queue and add it to index-writer. - * @see java.lang.Runnable#run() - */ - public void run() { - List<Document> documentQueue = fullIndexer.getDocumentQueue(); - while(running) { - try { - while (!finish && documentQueue.isEmpty()) { - // nothing to do => sleep - Thread.sleep(200); - } - Document document = null; - synchronized (documentQueue) { //o_clusterOK by:fj, when only one indexwriter runs in the whole cluster - if (!documentQueue.isEmpty()) { - document = documentQueue.remove(0); - } - } - if (document != null) { - indexWriter.addDocument(document); - if (log.isDebug()) { - log.debug("documentQueue.remove size=" + documentQueue.size()); - log.debug("IndexWriter docCount=" + indexWriter.maxDoc()); - } - } - } catch (InterruptedException ex) { - log.warn("InterruptedException in run"); - } catch (Exception ex) { - log.warn("Exception in run",ex); - } - if (finish && documentQueue.isEmpty()) { - running = false; - } - if (fullIndexer.isInterupted() ) { - running = false; - } - } - try { - indexWriter.close(); - indexWriter = null; - closed = true; - if (log.isDebug()) log.debug("IndexWriter " + id + " closed"); - } catch (IOException e) { - log.warn("Can not close IndexWriter",e); - } - log.info("IndexWriter " + id + " end of run."); - } - - /** - * @return Lucene Directory object of index-writer. - */ - public Directory getIndexDir() { - return indexWriter.getDirectory(); - } - - /** - * @return Return number of added documents. - */ - public int getDocCount() { - return indexWriter.maxDoc(); - } - -} diff --git a/src/main/java/org/olat/search/service/indexer/OlatFullIndexer.java b/src/main/java/org/olat/search/service/indexer/OlatFullIndexer.java index 05b6e422d79..f0006595a8c 100644 --- a/src/main/java/org/olat/search/service/indexer/OlatFullIndexer.java +++ b/src/main/java/org/olat/search/service/indexer/OlatFullIndexer.java @@ -34,9 +34,14 @@ import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; import java.nio.file.attribute.FileTime; import java.util.Hashtable; -import java.util.List; import java.util.Map; -import java.util.Vector; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.lucene.LucenePackage; @@ -69,9 +74,6 @@ public class OlatFullIndexer { private static final OLog log = Tracing.createLoggerFor(OlatFullIndexer.class); private static final int INDEX_MERGE_FACTOR = 1000; - private static final int MAX_SIZE_QUEUE = 500; - private int numberIndexWriter = 5; - private String indexPath; private String tempIndexPath; @@ -93,12 +95,7 @@ public class OlatFullIndexer { /** Used to build number of indexed documents per minute. */ private long lastMinute; - private int currentMinuteCounter; - - /** Queue to pass documents from indexer to index-writers. Only in multi-threaded mode. */ - private Vector<Document> documentQueue; - private IndexWriterWorker[] indexWriterWorkers; /* Define number of documents which will be added befor sleeping (indexInterval for CPU load). */ int documentsPerInterval; @@ -111,6 +108,9 @@ public class OlatFullIndexer { private MainIndexer mainIndexer; private CoordinatorManager coordinatorManager; + private static final Object indexerWriterBlock = new Object(); + private ThreadPoolExecutor indexerWriterExecutor; + /** * @@ -127,12 +127,10 @@ public class OlatFullIndexer { indexPath = searchModuleConfig.getFullIndexPath(); tempIndexPath = searchModuleConfig.getFullTempIndexPath(); indexInterval = searchModuleConfig.getIndexInterval(); - numberIndexWriter = searchModuleConfig.getNumberIndexWriter(); documentsPerInterval = searchModuleConfig.getDocumentsPerInterval(); ramBufferSizeMB = searchModuleConfig.getRAMBufferSizeMB(); - fullIndexerStatus = new FullIndexerStatus(numberIndexWriter); + fullIndexerStatus = new FullIndexerStatus(1); stopIndexing = true; - documentQueue = new Vector<Document>(); initStatus(); resetDocumentCounters(); } @@ -212,24 +210,17 @@ public class OlatFullIndexer { private void doIndex() throws InterruptedException{ try { WorkThreadInformations.setLongRunningTask("indexer"); + + if(indexerWriterExecutor == null) { + BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(2); + indexerWriterExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, queue); + } File tempIndexDir = new File(tempIndexPath); - Directory indexPath = FSDirectory.open(new File(tempIndexDir, "main")); - - indexWriter = new IndexWriter(indexPath, newIndexWriterConfig());// analyzer, true, IndexWriter.MAX_TERM_LENGTH.UNLIMITED); + Directory tmpIndexPath = FSDirectory.open(new File(tempIndexDir, "main")); + indexWriter = new IndexWriter(tmpIndexPath, newIndexWriterConfig());// analyzer, true, IndexWriter.MAX_TERM_LENGTH.UNLIMITED); indexWriter.deleteAll(); - // Create IndexWriterWorker - log.info("Running with " + numberIndexWriter + " IndexerWriterWorker"); - indexWriterWorkers = new IndexWriterWorker[numberIndexWriter]; - Directory[] partIndexDirs = new Directory[numberIndexWriter]; - for (int i = 0; i < numberIndexWriter; i++) { - IndexWriterWorker indexWriterWorker = new IndexWriterWorker(i, tempIndexDir, this); - indexWriterWorkers[i] = indexWriterWorker; - indexWriterWorkers[i].start(); - partIndexDirs[i] = indexWriterWorkers[i].getIndexDir(); - } - SearchResourceContext searchResourceContext = new SearchResourceContext(); log.info("doIndex start. OlatFullIndexer with Debug output"); mainIndexer.doIndex(searchResourceContext, null /*no parent*/, this); @@ -244,57 +235,32 @@ public class OlatFullIndexer { Thread.sleep(10000); } if (waitingCount >= MAX_WAITING_COUNT) log.info("Finished with max waiting time!"); - log.info("Set Finish-flag for each indexWriterWorkers"); - // Set Finish-flag - for (int i = 0; i < numberIndexWriter; i++) { - indexWriterWorkers[i].finishIndexing(); - } + - log.info("Wait until every indexworker is finished"); - // check if every indexworker is finished max waiting-time 10Min (=waitingCount-limit = 60) - waitingCount = 0; - while (!areIndexingDone() && (waitingCount++ < MAX_WAITING_COUNT) ) { - Thread.sleep(10000); + log.info("Wait until index writer executor is finished"); + int waitWriter = 0; + while (indexerWriterExecutor.getActiveCount() > 0 && (waitWriter++ < (10 * MAX_WAITING_COUNT))) { + Thread.sleep(1000); } - if (waitingCount >= MAX_WAITING_COUNT) log.info("Finished with max waiting time!"); - // Merge all partIndex - DBFactory.getInstance().commitAndCloseSession(); - if(partIndexDirs.length > 0) { - log.info("Start merging part Indexes"); - indexWriter.addIndexes(partIndexDirs); - log.info("Added all part Indexes"); - } + log.info("Close index writer executor"); fullIndexerStatus.setIndexSize(indexWriter.maxDoc()); - indexWriter.close(); - indexWriter = null; - indexWriterWorkers = null; + //shutdown the index writer thread + indexerWriterExecutor.submit(new CloseIndexCallable()); + indexerWriterExecutor.shutdown(); + indexerWriterExecutor.awaitTermination(1, TimeUnit.MINUTES); } catch (IOException e) { log.warn("Can not create IndexWriter, indexname=" + tempIndexPath, e); } finally { WorkThreadInformations.unsetLongRunningTask("indexer"); DBFactory.getInstance().commitAndCloseSession(); log.debug("doIndex: commit & close session"); - } - } - - /** - * Ensure that all indexworker is finished - * @return - */ - private boolean areIndexingDone() { - if(indexWriterWorkers != null && indexWriterWorkers.length > 0) { - if(!documentQueue.isEmpty()) { - return false; - } - for(IndexWriterWorker worker:indexWriterWorkers) { - if(!worker.isClosed()) { - return false; - } + if(indexerWriterExecutor != null) { + indexerWriterExecutor.shutdownNow(); + indexerWriterExecutor = null; } } - return documentQueue.isEmpty(); } /** @@ -303,9 +269,6 @@ public class OlatFullIndexer { */ public void run() { try { - //TODO: Workround : does not start immediately - Thread.sleep(10000); - log.info("full indexing starts... Lucene-version:" + LucenePackage.get().getImplementationVersion()); fullIndexerStatus.indexingStarted(); doIndex(); @@ -348,55 +311,31 @@ public class OlatFullIndexer { } } - - - /** - * Callback to addDocument to indexWriter. + * Add a document to the index writer. The document is indexed by a single threaded executor, + * Lucene want that write operations happen within a single thread. The access is synchronized + * to block concurrent access to the executor. It blocks the text extractors and allow a + * ridiculously small queue but memory efficient. + * * @param document * @throws IOException */ public void addDocument(Document document) throws IOException,InterruptedException { - if (numberIndexWriter == 0 ) { - indexWriter.addDocument(document); - fullIndexerStatus.incrementDocumentCount(); - if (indexInterval != 0 && sleepDocumentCounter++ >= documentsPerInterval) { - sleepDocumentCounter = 0; - Thread.sleep(indexInterval); - } else { - // do not sleep, check for stopping indexing - if (stopIndexing) { - throw new InterruptedException("Do stop indexing at element=" + indexWriter.maxDoc()); - } - } - countIndexPerMinute(); - } else { - // clusterOK by:cg synchronizes only access of index-writer, indexer runs only on one cluster-node - synchronized (documentQueue) { - while (documentQueue.size() > MAX_SIZE_QUEUE) { - log.warn("Document queue over " + MAX_SIZE_QUEUE); - Thread.sleep(60000); - } - documentQueue.add(document); - fullIndexerStatus.incrementDocumentCount(); - fullIndexerStatus.setDocumentQueueSize(documentQueue.size()); - countIndexPerMinute(); - if (log.isDebug()) log.debug("documentQueue.add size=" + documentQueue.size()); - // check for stopping indexing - if (stopIndexing) { - throw new InterruptedException("Do stop indexing at element=" + indexWriter.maxDoc()); + DBFactory.getInstance().commitAndCloseSession(); + + if (!stopIndexing && indexerWriterExecutor != null && !indexerWriterExecutor.isShutdown()) { + synchronized(indexerWriterBlock) {//once at a time please, wait, you have enough time + Future<Boolean> future = indexerWriterExecutor.submit(new AddDocumentCallable(document)); + try { + future.get(); + } catch (ExecutionException e) { + log.error("", e); } } } + incrementDocumentTypeCounter(document); incrementFileTypeCounter(document); -// TODO:cg/07.10.2010 try to fix Indexer ERROR 'Overdue resource check-out stack trace.' on OLATNG -// close and commit after each document -// if (fullIndexerStatus.getDocumentCount() % 20 == 0) { - // Do commit after certain number of documents because the transaction should not be too big - DBFactory.getInstance().commitAndCloseSession(); - log.debug("DB: intermediateCommit"); -// } } private void incrementFileTypeCounter(Document document) { @@ -443,15 +382,9 @@ public class OlatFullIndexer { * @return Return current full-indexer status. */ public FullIndexerStatus getStatus() { - if (indexWriterWorkers != null) { - // IndexWorker exist => set current document-counter - for (int i = 0; i < numberIndexWriter; i++) { - fullIndexerStatus.setPartDocumentCount(indexWriterWorkers[i].getDocCount(),i); - } - } fullIndexerStatus.setDocumentCounters(documentCounters); fullIndexerStatus.setFileTypeCounters(fileTypeCounters); - fullIndexerStatus.setDocumentQueueSize(documentQueue.size()); + fullIndexerStatus.setDocumentQueueSize(0); return fullIndexerStatus; } @@ -465,13 +398,6 @@ public class OlatFullIndexer { public void setIndexInterval(long indexInterval) { this.indexInterval = indexInterval; } - - /** - * @return Return document-queue which is used in multi-threaded mode. - */ - public List<Document> getDocumentQueue() { - return documentQueue; - } /** * Check if the indexing process is interrupted. @@ -485,4 +411,37 @@ public class OlatFullIndexer { documentCounters = new Hashtable<String,Integer>(); fileTypeCounters = new Hashtable<String,Integer>(); } + + private class CloseIndexCallable implements Callable<Boolean> { + + @Override + public Boolean call() throws Exception { + indexWriter.commit(); + indexWriter.close(); + indexWriter = null; + return Boolean.TRUE; + } + } + + private class AddDocumentCallable implements Callable<Boolean> { + private final Document document; + + public AddDocumentCallable(Document document) { + this.document = document; + } + + @Override + public Boolean call() throws Exception { + indexWriter.addDocument(document); + fullIndexerStatus.incrementDocumentCount(); + if (indexInterval != 0 && sleepDocumentCounter++ >= documentsPerInterval) { + sleepDocumentCounter = 0; + Thread.sleep(indexInterval); + } else if (stopIndexing) { + throw new InterruptedException("Do stop indexing at element=" + indexWriter.maxDoc()); + } + countIndexPerMinute(); + return Boolean.TRUE; + } + } } -- GitLab