diff --git a/src/main/java/org/olat/commons/coordinate/cluster/jms/ClusterEventBus.java b/src/main/java/org/olat/commons/coordinate/cluster/jms/ClusterEventBus.java index 80026c1087a2804f10189501c9abedde6bd73681..6d956090fbda1a34de486e9392f9dd6a0a0ecb4d 100644 --- a/src/main/java/org/olat/commons/coordinate/cluster/jms/ClusterEventBus.java +++ b/src/main/java/org/olat/commons/coordinate/cluster/jms/ClusterEventBus.java @@ -33,6 +33,7 @@ import java.util.concurrent.Executors; import javax.jms.Connection; import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -70,8 +71,8 @@ public class ClusterEventBus extends AbstractEventBus implements MessageListener private ClusterConfig clusterConfig; // settings - private long sendInterval = 1000; // 1000 miliseconds between each "ping/alive/info" message, can be set using spring - private long jmsMsgDelayLimit = 5000; // max duration of ClusterInfoEvent send-receive time in ms + private long sendInterval = 5000; // 1000 miliseconds between each "ping/alive/info" message, can be set using spring + private long jmsMsgDelayLimit = 10000; // max duration of ClusterInfoEvent send-receive time in ms // counters private long latestSentMsgId = -1; @@ -137,8 +138,8 @@ public class ClusterEventBus extends AbstractEventBus implements MessageListener while(isClusterInfoEventThreadRunning) { try { ClusterInfoEvent cie = new ClusterInfoEvent(clusterConfig, createBusListenerInfo()); - fireEventToListenersOf(cie, CLUSTER_CHANNEL); - if (log.isDebugEnabled()) log.debug("sent via jms clusterInfoEvent with timestamp:"+cie.getCreated()+" from node:"+nodeId); + fireEventToListenersOf(cie, CLUSTER_CHANNEL, false); + if (log.isDebugEnabled()) log.debug("sent via jms clusterInfoEvent with timestamp:{} from node: {}", cie.getCreated(),nodeId); } catch (Exception e) { // log error, but do not throw exception, but retry. try { @@ -217,6 +218,10 @@ public class ClusterEventBus extends AbstractEventBus implements MessageListener */ @Override public void fireEventToListenersOf(final MultiUserEvent event, final OLATResourceable ores) { + fireEventToListenersOf(event, ores, true); + } + + private void fireEventToListenersOf(final MultiUserEvent event, final OLATResourceable ores, boolean strict) { // send the event wrapped over jms to all nodes // (the receiver will detect whether messages are from itself and thus can be ignored, since they were already sent directly. final long msgId = ++latestSentMsgId; @@ -228,7 +233,11 @@ public class ClusterEventBus extends AbstractEventBus implements MessageListener try { ObjectMessage message = sessionProducer.createObjectMessage(); message.setObject(new JMSWrapper(nodeId, msgId, ores, event)); - producer.send(message); + if(strict) { + producer.send(message); + } else { + producer.send(message, DeliveryMode.NON_PERSISTENT, 3, 5000); + } } catch (Exception e) { log.error("Cannot send JMS message", e); // cluster:::: what shall we do here: the JMS bus is broken! and we thus cannot know if other nodes are alive. diff --git a/src/main/java/org/olat/repository/manager/RepositoryServiceImpl.java b/src/main/java/org/olat/repository/manager/RepositoryServiceImpl.java index 452c64d847325bc80bcb14443104c13e313f3305..ba61ec6971f13812773953a0fe7af27dd04688a6 100644 --- a/src/main/java/org/olat/repository/manager/RepositoryServiceImpl.java +++ b/src/main/java/org/olat/repository/manager/RepositoryServiceImpl.java @@ -308,6 +308,7 @@ public class RepositoryServiceImpl implements RepositoryService, OrganisationDat //copy the license licenseService.copy(sourceResource, copyResource); + dbInstance.commit(); //copy the image RepositoryManager.getInstance().copyImage(sourceEntry, copyEntry); @@ -322,7 +323,6 @@ public class RepositoryServiceImpl implements RepositoryService, OrganisationDat ThreadLocalUserActivityLogger.log(LearningResourceLoggingAction.LEARNING_RESOURCE_CREATE, getClass(), LoggingResourceable.wrap(copyEntry, OlatResourceableType.genRepoEntry)); - lifeIndexer.indexDocument(RepositoryEntryDocument.TYPE, copyEntry.getKey()); return copyEntry; } diff --git a/src/main/java/org/olat/repository/ui/author/CreateRepositoryEntryController.java b/src/main/java/org/olat/repository/ui/author/CreateRepositoryEntryController.java index 4cfed8b9d01c503da2818eca012f0eed30151a1c..8fda5b32eb19d2b3cfde6902699681a86dbba015 100644 --- a/src/main/java/org/olat/repository/ui/author/CreateRepositoryEntryController.java +++ b/src/main/java/org/olat/repository/ui/author/CreateRepositoryEntryController.java @@ -27,6 +27,7 @@ import org.olat.basesecurity.OrganisationModule; import org.olat.basesecurity.OrganisationRoles; import org.olat.basesecurity.OrganisationService; import org.olat.basesecurity.model.OrganisationRefImpl; +import org.olat.core.commons.persistence.DB; import org.olat.core.commons.services.license.LicenseModule; import org.olat.core.commons.services.license.LicenseService; import org.olat.core.commons.services.license.LicenseType; @@ -83,6 +84,8 @@ public class CreateRepositoryEntryController extends FormBasicController impleme private Object userObject; private LicenseType licenseType; + @Autowired + private DB dbInstance; @Autowired private RepositoryManager repositoryManager; @Autowired @@ -270,6 +273,7 @@ public class CreateRepositoryEntryController extends FormBasicController impleme } afterEntryCreated(); + dbInstance.commit(); repositoryManager.triggerIndexer(addedEntry); diff --git a/src/main/java/org/olat/search/service/indexer/JmsIndexer.java b/src/main/java/org/olat/search/service/indexer/JmsIndexer.java index 3902f8c372ad55a7923b30bbe286cdaba3147980..e726342123f2f0fa15a7af0b2a0188a5dffbc5d4 100644 --- a/src/main/java/org/olat/search/service/indexer/JmsIndexer.java +++ b/src/main/java/org/olat/search/service/indexer/JmsIndexer.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.List; import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -37,6 +38,7 @@ import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.Session; +import org.apache.logging.log4j.Logger; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.document.Document; @@ -54,7 +56,6 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import org.olat.core.commons.persistence.DBFactory; import org.olat.core.configuration.ConfigOnOff; -import org.apache.logging.log4j.Logger; import org.olat.core.logging.Tracing; import org.olat.core.util.coordinate.CoordinatorManager; import org.olat.search.SearchModule; @@ -166,7 +167,7 @@ public class JmsIndexer implements MessageListener, LifeFullIndexer, ConfigOnOff public void initQueue() throws JMSException { connection = (QueueConnection)connectionFactory.createConnection(); connection.start(); - log.info("springInit: JMS connection started with connectionFactory=" + connectionFactory); + log.info("springInit: JMS connection started with connectionFactory={}", connectionFactory); if(indexingNode) { //listen to the queue only if indexing node @@ -262,34 +263,24 @@ public class JmsIndexer implements MessageListener, LifeFullIndexer, ConfigOnOff @Override public void indexDocument(String type, Long key) { - QueueSender sender; - QueueSession session; - try { - JmsIndexWork workUnit = new JmsIndexWork(JmsIndexWork.INDEX, type, key); - session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE ); - ObjectMessage message = session.createObjectMessage(); - message.setObject(workUnit); - - sender = session.createSender(getJmsQueue()); - sender.send( message ); - session.close(); - } catch (JMSException e) { - log.error("", e ); - } + sendMessage(new JmsIndexWork(JmsIndexWork.INDEX, type, key)); } @Override public void indexDocument(String type, List<Long> keyList) { + sendMessage(new JmsIndexWork(JmsIndexWork.INDEX, type, keyList)); + } + + private void sendMessage(JmsIndexWork workUnit) { QueueSender sender; QueueSession session; try { - JmsIndexWork workUnit = new JmsIndexWork(JmsIndexWork.INDEX, type, keyList); - session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE ); + session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); ObjectMessage message = session.createObjectMessage(); message.setObject(workUnit); - + sender = session.createSender(getJmsQueue()); - sender.send( message ); + sender.send(message, DeliveryMode.NON_PERSISTENT, 3, 120000); session.close(); } catch (JMSException e) { log.error("", e );