Skip to content
Snippets Groups Projects
Commit fe2bf47e authored by srosse's avatar srosse
Browse files

Merge remote-tracking branch 'origin/OpenOLAT_15.2'

parents fe587cb5 e3d074c2
No related branches found
No related tags found
No related merge requests found
......@@ -33,6 +33,7 @@ import java.util.concurrent.Executors;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
......@@ -70,8 +71,8 @@ public class ClusterEventBus extends AbstractEventBus implements MessageListener
private ClusterConfig clusterConfig;
// settings
private long sendInterval = 1000; // 1000 miliseconds between each "ping/alive/info" message, can be set using spring
private long jmsMsgDelayLimit = 5000; // max duration of ClusterInfoEvent send-receive time in ms
private long sendInterval = 5000; // 1000 miliseconds between each "ping/alive/info" message, can be set using spring
private long jmsMsgDelayLimit = 10000; // max duration of ClusterInfoEvent send-receive time in ms
// counters
private long latestSentMsgId = -1;
......@@ -137,8 +138,8 @@ public class ClusterEventBus extends AbstractEventBus implements MessageListener
while(isClusterInfoEventThreadRunning) {
try {
ClusterInfoEvent cie = new ClusterInfoEvent(clusterConfig, createBusListenerInfo());
fireEventToListenersOf(cie, CLUSTER_CHANNEL);
if (log.isDebugEnabled()) log.debug("sent via jms clusterInfoEvent with timestamp:"+cie.getCreated()+" from node:"+nodeId);
fireEventToListenersOf(cie, CLUSTER_CHANNEL, false);
if (log.isDebugEnabled()) log.debug("sent via jms clusterInfoEvent with timestamp:{} from node: {}", cie.getCreated(),nodeId);
} catch (Exception e) {
// log error, but do not throw exception, but retry.
try {
......@@ -217,6 +218,10 @@ public class ClusterEventBus extends AbstractEventBus implements MessageListener
*/
@Override
public void fireEventToListenersOf(final MultiUserEvent event, final OLATResourceable ores) {
fireEventToListenersOf(event, ores, true);
}
private void fireEventToListenersOf(final MultiUserEvent event, final OLATResourceable ores, boolean strict) {
// send the event wrapped over jms to all nodes
// (the receiver will detect whether messages are from itself and thus can be ignored, since they were already sent directly.
final long msgId = ++latestSentMsgId;
......@@ -228,7 +233,11 @@ public class ClusterEventBus extends AbstractEventBus implements MessageListener
try {
ObjectMessage message = sessionProducer.createObjectMessage();
message.setObject(new JMSWrapper(nodeId, msgId, ores, event));
producer.send(message);
if(strict) {
producer.send(message);
} else {
producer.send(message, DeliveryMode.NON_PERSISTENT, 3, 5000);
}
} catch (Exception e) {
log.error("Cannot send JMS message", e);
// cluster:::: what shall we do here: the JMS bus is broken! and we thus cannot know if other nodes are alive.
......
......@@ -308,6 +308,7 @@ public class RepositoryServiceImpl implements RepositoryService, OrganisationDat
//copy the license
licenseService.copy(sourceResource, copyResource);
dbInstance.commit();
//copy the image
RepositoryManager.getInstance().copyImage(sourceEntry, copyEntry);
......@@ -322,7 +323,6 @@ public class RepositoryServiceImpl implements RepositoryService, OrganisationDat
ThreadLocalUserActivityLogger.log(LearningResourceLoggingAction.LEARNING_RESOURCE_CREATE, getClass(),
LoggingResourceable.wrap(copyEntry, OlatResourceableType.genRepoEntry));
lifeIndexer.indexDocument(RepositoryEntryDocument.TYPE, copyEntry.getKey());
return copyEntry;
}
......
......@@ -27,6 +27,7 @@ import org.olat.basesecurity.OrganisationModule;
import org.olat.basesecurity.OrganisationRoles;
import org.olat.basesecurity.OrganisationService;
import org.olat.basesecurity.model.OrganisationRefImpl;
import org.olat.core.commons.persistence.DB;
import org.olat.core.commons.services.license.LicenseModule;
import org.olat.core.commons.services.license.LicenseService;
import org.olat.core.commons.services.license.LicenseType;
......@@ -83,6 +84,8 @@ public class CreateRepositoryEntryController extends FormBasicController impleme
private Object userObject;
private LicenseType licenseType;
@Autowired
private DB dbInstance;
@Autowired
private RepositoryManager repositoryManager;
@Autowired
......@@ -270,6 +273,7 @@ public class CreateRepositoryEntryController extends FormBasicController impleme
}
afterEntryCreated();
dbInstance.commit();
repositoryManager.triggerIndexer(addedEntry);
......
......@@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.List;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
......@@ -37,6 +38,7 @@ import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
......@@ -54,7 +56,6 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.olat.core.commons.persistence.DBFactory;
import org.olat.core.configuration.ConfigOnOff;
import org.apache.logging.log4j.Logger;
import org.olat.core.logging.Tracing;
import org.olat.core.util.coordinate.CoordinatorManager;
import org.olat.search.SearchModule;
......@@ -166,7 +167,7 @@ public class JmsIndexer implements MessageListener, LifeFullIndexer, ConfigOnOff
public void initQueue() throws JMSException {
connection = (QueueConnection)connectionFactory.createConnection();
connection.start();
log.info("springInit: JMS connection started with connectionFactory=" + connectionFactory);
log.info("springInit: JMS connection started with connectionFactory={}", connectionFactory);
if(indexingNode) {
//listen to the queue only if indexing node
......@@ -262,34 +263,24 @@ public class JmsIndexer implements MessageListener, LifeFullIndexer, ConfigOnOff
@Override
public void indexDocument(String type, Long key) {
QueueSender sender;
QueueSession session;
try {
JmsIndexWork workUnit = new JmsIndexWork(JmsIndexWork.INDEX, type, key);
session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE );
ObjectMessage message = session.createObjectMessage();
message.setObject(workUnit);
sender = session.createSender(getJmsQueue());
sender.send( message );
session.close();
} catch (JMSException e) {
log.error("", e );
}
sendMessage(new JmsIndexWork(JmsIndexWork.INDEX, type, key));
}
@Override
public void indexDocument(String type, List<Long> keyList) {
sendMessage(new JmsIndexWork(JmsIndexWork.INDEX, type, keyList));
}
private void sendMessage(JmsIndexWork workUnit) {
QueueSender sender;
QueueSession session;
try {
JmsIndexWork workUnit = new JmsIndexWork(JmsIndexWork.INDEX, type, keyList);
session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE );
session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
ObjectMessage message = session.createObjectMessage();
message.setObject(workUnit);
sender = session.createSender(getJmsQueue());
sender.send( message );
sender.send(message, DeliveryMode.NON_PERSISTENT, 3, 120000);
session.close();
} catch (JMSException e) {
log.error("", e );
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment