posted on Friday, August 20, 2004 8:59 PM
by
csteen
IKVM - Using ActiveMQ JMS from C#
Wishing to have a publish/subscribe message queue available in .NET, I have looking for ways to allow .NET to talk to a JMS implementation. After reading the article
An Introduction to IKVM by Avik Sengupta, I took the latest release of
ActiveMQ and
IKVM, complied the jar files (as described in the article), and modified one of the examples from the ActiveMQ distribution and created a C# version (see below). Everything seems to work. The example submits items to the queue, and them retrieves them.
using System;
using System.Threading;
using ikvm.lang;
using org.codehaus.activemq;
using org.codehaus.activemq.util;
using org.codehaus.activemq.message;
using javax.jms;
using java.util;
namespace DefaultNamespace
{
class MainClass : MessageListener
{
protected int messageCount = 100;
protected String[] data;
protected ActiveMQConnectionFactory connectionFactory;
protected Session session;
protected MessageConsumer consumer;
protected MessageProducer producer;
protected Destination destination;
protected Connection connection;
protected Connection receiveConnection;
protected Session receiveSession;
protected List messages = Collections.synchronizedList(new ArrayList());
protected bool topic = true;
protected ActiveMQMessage createMessage()
{
return new ActiveMQMessage();
}
protected Destination createDestination(String subject)
{
return new ActiveMQTopic(subject);
}
public void testSendReceive()
{
messages.clear();
for (int i = 0; i < data.Length; i++)
{
Message message = session.createTextMessage(data[i]);
Console.WriteLine("About to send a message: " + message + " with text: " + data[i]);
producer.send(destination, message);
}
// lets wait a little while
Thread.Sleep(4000);
Console.WriteLine("should have received a message: {0} {1} {2}", messages, data.Length, messages.size());
for (int i = 0; i < data.Length; i++)
{
TextMessage received = (TextMessage) messages.get(i);
String text = received.getText();
Console.WriteLine("Received Text: " + text);
}
}
public MainClass()
{
topic = true;
data = new String[messageCount];
for (int i = 0; i < messageCount; i++)
{
data[i] = "Text for message: " + i + " at " + DateTime.Now.ToLongTimeString();
}
connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("vm://localhost");
connectionFactory.setUseEmbeddedBroker(true);
connection = connectionFactory.createConnection();
receiveConnection = connectionFactory.createConnection();
Console.WriteLine("Created connection: " + connection);
session = connection.createSession(false, 1);
receiveSession = receiveConnection.createSession(false, 1);
Console.WriteLine("Created session: " + session);
producer = session.createProducer(null);
Console.WriteLine("Created producer: " + producer);
if (topic)
{
destination = receiveSession.createTopic("FOO.BAR");
}
else
{
destination = session.createQueue("FOO.QUEUE");
}
consumer = receiveSession.createConsumer(destination);
consumer.setMessageListener(this);
receiveConnection.start();
connection.start();
Console.WriteLine("Created connection: " + connection);
testSendReceive();
Console.WriteLine("Dumping stats...");
connectionFactory.getFactoryStats().dump(new IndentPrinter());
Console.WriteLine("Closing down connection");
session.close();
receiveSession.close();
connection.close();
receiveConnection.close();
connectionFactory.stop();
connection.close();
}
public void onMessage(Message message)
{
Console.WriteLine("Received message: " + message);
messages.add(message);
}
public static void Main(string[] args)
{
MainClass mc = new MainClass();
}
}
}