Skip to content
Snippets Groups Projects
Commit dfd33a6a authored by srosse's avatar srosse
Browse files

OO-925: use a single thread executor to send the messages

parent 84647b03
No related branches found
No related tags found
No related merge requests found
......@@ -28,6 +28,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
......@@ -92,8 +94,9 @@ public class ClusterEventBus extends AbstractEventBus implements MessageListener
private ConnectionFactory connectionFactory;
private Topic destination;
private Connection connection;
private Session session;
private Session sessionConsumer;
private MessageConsumer consumer;
private Session sessionProducer;
private MessageProducer producer;
private long lastOnMessageFinishTime_ = -1;
......@@ -105,6 +108,8 @@ public class ClusterEventBus extends AbstractEventBus implements MessageListener
//final LinkedList<Object> incomingMessagesQueue_ = new LinkedList<Object>();
//private final static int LIMIT_ON_INCOMING_MESSAGE_QUEUE = 200;
private ExecutorService jmsExecutor;
/**
* [used by spring]
*
......@@ -115,11 +120,14 @@ public class ClusterEventBus extends AbstractEventBus implements MessageListener
}
public void springInit() throws JMSException {
jmsExecutor = Executors.newSingleThreadExecutor();
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(destination);
sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = sessionConsumer.createConsumer(destination);
consumer.setMessageListener(this);
producer = session.createProducer(destination);
sessionProducer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = sessionProducer.createProducer(destination);
connection.start();
log.info("ClusterEventBus JMS started");
......@@ -158,50 +166,6 @@ public class ClusterEventBus extends AbstractEventBus implements MessageListener
t.start();
// register to listen for other nodes' clusterinfoevents
registerFor(this, null, CLUSTER_CHANNEL);
/*
Thread serveThread = new Thread(new Runnable() {
public void run() {
ThreadLocalUserActivityLoggerInstaller.initEmptyUserActivityLogger();
while(true) {
try{
Message m = null;
long time = -1;
synchronized(incomingMessagesQueue_) {
while(incomingMessagesQueue_.size()<2) {
try {
incomingMessagesQueue_.wait();
} catch (InterruptedException e) {
// ignore
}
}
m = (Message) incomingMessagesQueue_.removeLast();
time = (Long) incomingMessagesQueue_.removeLast();
incomingMessagesQueue_.notifyAll();
}
serveMessage(m, time);
} catch(RuntimeException re) {
log.error("RuntimeException enountered by serve-thread:", re);
// continue
} catch(Error er) {
log.error("Error enountered by serve-thread:", er);
// continue
} finally {
try {
DBFactory.getInstance().commitAndCloseSession();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
});
serveThread.setDaemon(true);
serveThread.start();
*/
}
public SimpleProbe getMrtgProbeJMSDeliveryTime() {
......@@ -252,39 +216,36 @@ public class ClusterEventBus extends AbstractEventBus implements MessageListener
* @see org.olat.core.util.event.AbstractOLATSystemBus#fireEventToListenersOf(org.olat.core.util.event.MultiUserEvent,
* org.olat.core.id.OLATResourceable)
*/
public void fireEventToListenersOf(MultiUserEvent event, OLATResourceable ores) {
// 1. fire directly within vm, because it used to be so before, and in this way this olat node can run even if jms is down
//TODO jms doFire(event, ores);
// 2. send the event wrapped over jms to all nodes
public void fireEventToListenersOf(final MultiUserEvent event, final OLATResourceable ores) {
// send the event wrapped over jms to all nodes
// (the receiver will detect whether messages are from itself and thus can be ignored, since they were already sent directly.
long msgId;
Integer nodeId;
final long msgId = ++latestSentMsgId;
final Integer nodeId = clusterConfig.getNodeId();
nodeId = clusterConfig.getNodeId();
try {
//TODO jms synchronized (this) { //cluster_ok needed, not atomar read in one vm
msgId = ++latestSentMsgId;
ObjectMessage message = session.createObjectMessage();
message.setObject(new JMSWrapper(nodeId, msgId, ores, event));
producer.send(message);
//TODO jms }
} catch (Exception e) {
log.error("Cannot send JMS message", e);
// cluster:::: what shall we do here: the JMS bus is broken! and we thus cannot know if other nodes are alive.
// if we are the only node running, then we could continue.
// a) either throw an exception - meaning olat doesn't really run at all and produces redscreens all the time and logging in is not possible.
// b) or warn in the log/jmx - but surveillance is critical here!!
// -> do the more fail-fast option a) at the moment for correctness reasons.
System.err.println("###############################################################################################");
System.err.println("### ClusterEventBus: communication error with JMS - cannot send messages!!!" + e);
System.err.println("###############################################################################################");
throw new OLATRuntimeException("communication error with JMS - cannot send messages!!!", e);
}
numOfSentMessages++;
jmsExecutor.execute(new Runnable() {
public void run() {
try {
ObjectMessage message = sessionProducer.createObjectMessage();
message.setObject(new JMSWrapper(nodeId, msgId, ores, event));
producer.send(message);
} catch (Exception e) {
log.error("Cannot send JMS message", e);
// cluster:::: what shall we do here: the JMS bus is broken! and we thus cannot know if other nodes are alive.
// if we are the only node running, then we could continue.
// a) either throw an exception - meaning olat doesn't really run at all and produces redscreens all the time and logging in is not possible.
// b) or warn in the log/jmx - but surveillance is critical here!!
// -> do the more fail-fast option a) at the moment for correctness reasons.
System.err.println("###############################################################################################");
System.err.println("### ClusterEventBus: communication error with JMS - cannot send messages!!!" + e);
System.err.println("###############################################################################################");
throw new OLATRuntimeException("communication error with JMS - cannot send messages!!!", e);
}
numOfSentMessages++;
}
});
// store it for later access by the admin controller
// store it for later access by the admin controller
String sentMsg = "sent msg: from node:" + nodeId + ", olat-id:" + msgId + ", ores:" + ores.getResourceableTypeName() + ":" + ores.getResourceableId()+", event:"+event;
addToSentScreen(sentMsg);
if (log.isDebug()) log.debug(sentMsg);
......@@ -451,40 +412,6 @@ public class ClusterEventBus extends AbstractEventBus implements MessageListener
public long getLatestSentMsgId() {
return latestSentMsgId;
}
/**
* [used by jmx]
* cluster:::: to be improved: this is just a quick solution to output all data from all nodes
* @return jmx-readable data of all statistics of all foreign cluster nodes
*/
/*public CompositeDataSupport getForeignClusterNodeStatistics() {
Map<String, String> p = new HashMap<String, String>();
for (Integer key : nodeInfos.keySet()) {
NodeInfo fns = nodeInfos.get(key);
Integer nodeId = fns.getNodeId();
p.put(nodeId+".getLatestReceivedMsgId", ""+fns.getLatestReceivedMsgId());
p.put(nodeId+".getNumOfReceivedMessages", ""+fns.getNumOfReceivedMessages());
p.put(nodeId+".getNumOfMissedMsgs", ""+fns.getNumOfMissedMsgs());
}
return propertiesToCompositeData(p);
}
private CompositeDataSupport propertiesToCompositeData(Map<String, ?> properties) {
// try {
try {
String[] keys = properties.keySet().toArray(new String[0]);
OpenType<String>[] itemTypes = new OpenType[keys.length];
for (int i = 0; i < itemTypes.length; i++) {
itemTypes[i] = SimpleType.STRING;
}
CompositeType propsType;
propsType = new CompositeType("Properties type", "properties", keys, keys, itemTypes);
CompositeDataSupport propsData = new CompositeDataSupport(propsType, properties);
return propsData;
} catch (OpenDataException e) {
throw new AssertException("problem with jmx data generation", e);
}
}*/
Map<Integer, NodeInfo> getNodeInfos() {
return nodeInfos;
......@@ -551,7 +478,9 @@ public class ClusterEventBus extends AbstractEventBus implements MessageListener
log.info("ClusterEventBus: Set stop flag for ClusterInfoEvent-Thread.");
isClusterInfoEventThreadRunning = false;
try {
session.close();
jmsExecutor.shutdownNow();
sessionProducer.close();
sessionConsumer.close();
connection.close();
log.info("ClusterEventBus stopped");
} catch (JMSException e) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment