posted on Friday, August 20, 2004 8:59 PM by csteen

IKVM - Using ActiveMQ JMS from C#

Wishing to have a publish/subscribe message queue available in .NET, I have been looking for ways to allow .NET to talk to a JMS implementation. After reading the article An Introduction to IKVM by Avik Sengupta, I took the latest releases of ActiveMQ and IKVM, compiled the jar files (as described in the article), and modified one of the examples from the ActiveMQ distribution to create a C# version (see below). Everything seems to work. The example submits items to the queue, and then retrieves them.
using System;
using System.Threading;
using ikvm.lang;
using org.codehaus.activemq;
using org.codehaus.activemq.util;
using org.codehaus.activemq.message;
using javax.jms;
using java.util;


namespace DefaultNamespace
{
    /// <summary>
    /// Console sample that sends a batch of JMS text messages through ActiveMQ
    /// (Java classes consumed from C# via IKVM) and prints the messages that the
    /// registered listener received.
    /// </summary>
    /// <remarks>
    /// Constructing an instance runs the whole sample: it creates the
    /// connections, sessions, producer and consumer, calls
    /// <see cref="testSendReceive"/>, dumps the factory statistics and then
    /// closes everything again.
    /// </remarks>
    class MainClass : MessageListener
    {
        // Acknowledge mode passed to createSession. 1 is the value the JMS API
        // documents for javax.jms.Session.AUTO_ACKNOWLEDGE.
        private const int AutoAcknowledge = 1;

        // Time testSendReceive waits for the listener to collect the messages.
        private const int ReceiveWaitMilliseconds = 4000;

        protected int messageCount = 100;
        protected String[] data;
        protected ActiveMQConnectionFactory connectionFactory;
        protected Session session;
        protected MessageConsumer consumer;
        protected MessageProducer producer;
        protected Destination destination;
        protected Connection connection;

        protected Connection receiveConnection;
        protected Session receiveSession;

        // Filled by onMessage (invoked by the JMS consumer) and read by
        // testSendReceive, hence the synchronized wrapper.
        protected List messages = Collections.synchronizedList(new ArrayList());

        // true: use the topic "FOO.BAR"; false: use the queue "FOO.QUEUE".
        protected bool topic = true;

        /// <summary>Creates a new, empty ActiveMQ message.</summary>
        protected ActiveMQMessage createMessage()
        {
            return new ActiveMQMessage();
        }

        /// <summary>Creates an ActiveMQ topic destination for the given subject.</summary>
        /// <param name="subject">Name passed to the <c>ActiveMQTopic</c> constructor.</param>
        protected Destination createDestination(String subject)
        {
            return new ActiveMQTopic(subject);
        }

        /// <summary>
        /// Sends one text message per entry of <c>data</c> to <c>destination</c>,
        /// waits a few seconds, then prints the text of every message collected
        /// by <see cref="onMessage"/>.
        /// </summary>
        /// <remarks>
        /// If fewer messages than were sent have arrived after the wait, a
        /// warning is printed and only the messages actually received are read,
        /// instead of indexing past the end of the list.
        /// </remarks>
        public void testSendReceive()
        {
            messages.clear();

            for (int i = 0; i < data.Length; i++)
            {
                Message message = session.createTextMessage(data[i]);

                Console.WriteLine("About to send a message: " + message + " with text: " + data[i]);

                producer.send(destination, message);
            }

            // Give the consumer time to deliver the messages to onMessage.
            Thread.Sleep(ReceiveWaitMilliseconds);

            // Snapshot the size once: the listener may still be appending.
            int receivedCount = messages.size();

            Console.WriteLine("should have received a message: {0} {1} {2}", messages, data.Length, receivedCount);

            if (receivedCount < data.Length)
            {
                Console.WriteLine("Warning: only {0} of {1} messages were received", receivedCount, data.Length);
            }

            // Never read more entries than were sent, nor more than have arrived.
            int available = Math.Min(receivedCount, data.Length);
            for (int i = 0; i < available; i++)
            {
                TextMessage received = (TextMessage) messages.get(i);
                String text = received.getText();

                Console.WriteLine("Received Text: " + text);
            }
        }

        /// <summary>
        /// Builds the message payloads, connects to the "vm://localhost" broker
        /// (with the embedded-broker flag set), runs
        /// <see cref="testSendReceive"/> and always releases the JMS resources
        /// afterwards, even when setup or the test throws.
        /// </summary>
        public MainClass()
        {
            data = new String[messageCount];
            for (int i = 0; i < messageCount; i++)
            {
                data[i] = "Text for message: " + i + " at " + DateTime.Now.ToLongTimeString();
            }

            try
            {
                connectionFactory = new ActiveMQConnectionFactory();
                connectionFactory.setBrokerURL("vm://localhost");
                connectionFactory.setUseEmbeddedBroker(true);

                connection = connectionFactory.createConnection();
                receiveConnection = connectionFactory.createConnection();

                Console.WriteLine("Created connection: " + connection);

                session = connection.createSession(false, AutoAcknowledge);
                receiveSession = receiveConnection.createSession(false, AutoAcknowledge);

                Console.WriteLine("Created session: " + session);

                // No default destination: the destination is passed on each send.
                producer = session.createProducer(null);

                Console.WriteLine("Created producer: " + producer);

                if (topic)
                {
                    destination = receiveSession.createTopic("FOO.BAR");
                }
                else
                {
                    destination = session.createQueue("FOO.QUEUE");
                }

                consumer = receiveSession.createConsumer(destination);
                consumer.setMessageListener(this);
                receiveConnection.start();
                connection.start();

                Console.WriteLine("Created connection: " + connection);

                testSendReceive();

                Console.WriteLine("Dumping stats...");
                connectionFactory.getFactoryStats().dump(new IndentPrinter());
            }
            finally
            {
                Console.WriteLine("Closing down connection");
                closeResources();
            }
        }

        /// <summary>
        /// Closes the sessions, then the connections, then stops the factory.
        /// Members that were never created are skipped, and each later step
        /// still runs when an earlier one throws.
        /// </summary>
        private void closeResources()
        {
            try
            {
                if (session != null)
                {
                    session.close();
                }
                if (receiveSession != null)
                {
                    receiveSession.close();
                }
            }
            finally
            {
                try
                {
                    if (connection != null)
                    {
                        connection.close();
                    }
                }
                finally
                {
                    try
                    {
                        if (receiveConnection != null)
                        {
                            receiveConnection.close();
                        }
                    }
                    finally
                    {
                        if (connectionFactory != null)
                        {
                            connectionFactory.stop();
                        }
                    }
                }
            }
        }

        /// <summary>
        /// JMS listener callback: logs the incoming message and stores it in
        /// <c>messages</c> for <see cref="testSendReceive"/> to read.
        /// </summary>
        /// <param name="message">The message delivered by the consumer.</param>
        public void onMessage(Message message)
        {
            Console.WriteLine("Received message: " + message);

            messages.add(message);
        }

        /// <summary>Program entry point; constructing MainClass runs the sample.</summary>
        public static void Main(string[] args)
        {
            new MainClass();
        }
    }
}

Comments