diff --git a/src/main/java/org/olat/commons/coordinate/cluster/jms/ClusterEventBus.java b/src/main/java/org/olat/commons/coordinate/cluster/jms/ClusterEventBus.java index 80026c1087a2804f10189501c9abedde6bd73681..916cea01bd8b7c42575fa4138140fd505fa3c558 100644 --- a/src/main/java/org/olat/commons/coordinate/cluster/jms/ClusterEventBus.java +++ b/src/main/java/org/olat/commons/coordinate/cluster/jms/ClusterEventBus.java @@ -33,6 +33,7 @@ import java.util.concurrent.Executors; import javax.jms.Connection; import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -70,8 +71,8 @@ public class ClusterEventBus extends AbstractEventBus implements MessageListener private ClusterConfig clusterConfig; // settings - private long sendInterval = 1000; // 1000 miliseconds between each "ping/alive/info" message, can be set using spring - private long jmsMsgDelayLimit = 5000; // max duration of ClusterInfoEvent send-receive time in ms + private long sendInterval = 5000; // 1000 miliseconds between each "ping/alive/info" message, can be set using spring + private long jmsMsgDelayLimit = 10000; // max duration of ClusterInfoEvent send-receive time in ms // counters private long latestSentMsgId = -1; @@ -228,7 +229,7 @@ public class ClusterEventBus extends AbstractEventBus implements MessageListener try { ObjectMessage message = sessionProducer.createObjectMessage(); message.setObject(new JMSWrapper(nodeId, msgId, ores, event)); - producer.send(message); + producer.send(message, DeliveryMode.NON_PERSISTENT, 3, 5000); } catch (Exception e) { log.error("Cannot send JMS message", e); // cluster:::: what shall we do here: the JMS bus is broken! and we thus cannot know if other nodes are alive. diff --git a/src/main/java/org/olat/search/service/indexer/JmsIndexer.java b/src/main/java/org/olat/search/service/indexer/JmsIndexer.java index 3902f8c372ad55a7923b30bbe286cdaba3147980..e726342123f2f0fa15a7af0b2a0188a5dffbc5d4 100644 --- a/src/main/java/org/olat/search/service/indexer/JmsIndexer.java +++ b/src/main/java/org/olat/search/service/indexer/JmsIndexer.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.List; import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -37,6 +38,7 @@ import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.Session; +import org.apache.logging.log4j.Logger; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.document.Document; @@ -54,7 +56,6 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import org.olat.core.commons.persistence.DBFactory; import org.olat.core.configuration.ConfigOnOff; -import org.apache.logging.log4j.Logger; import org.olat.core.logging.Tracing; import org.olat.core.util.coordinate.CoordinatorManager; import org.olat.search.SearchModule; @@ -166,7 +167,7 @@ public class JmsIndexer implements MessageListener, LifeFullIndexer, ConfigOnOff public void initQueue() throws JMSException { connection = (QueueConnection)connectionFactory.createConnection(); connection.start(); - log.info("springInit: JMS connection started with connectionFactory=" + connectionFactory); + log.info("springInit: JMS connection started with connectionFactory={}", connectionFactory); if(indexingNode) { //listen to the queue only if indexing node @@ -262,34 +263,24 @@ public class JmsIndexer implements MessageListener, LifeFullIndexer, ConfigOnOff @Override public void indexDocument(String type, Long key) { - QueueSender sender; - QueueSession session; - try { - JmsIndexWork workUnit = new JmsIndexWork(JmsIndexWork.INDEX, type, key); - session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE ); - ObjectMessage message = session.createObjectMessage(); - message.setObject(workUnit); - - sender = session.createSender(getJmsQueue()); - sender.send( message ); - session.close(); - } catch (JMSException e) { - log.error("", e ); - } + sendMessage(new JmsIndexWork(JmsIndexWork.INDEX, type, key)); } @Override public void indexDocument(String type, List<Long> keyList) { + sendMessage(new JmsIndexWork(JmsIndexWork.INDEX, type, keyList)); + } + + private void sendMessage(JmsIndexWork workUnit) { QueueSender sender; QueueSession session; try { - JmsIndexWork workUnit = new JmsIndexWork(JmsIndexWork.INDEX, type, keyList); - session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE ); + session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); ObjectMessage message = session.createObjectMessage(); message.setObject(workUnit); - + sender = session.createSender(getJmsQueue()); - sender.send( message ); + sender.send(message, DeliveryMode.NON_PERSISTENT, 3, 120000); session.close(); } catch (JMSException e) { log.error("", e );