How to implement an ActiveMQ JMS Producer in a Spring Boot application?

Let’s consider a scenario where your application is deployed in a clustered environment and has to handle thousands of email requests. One way to solve this is to expose an API endpoint and post each message to the process that sends the email. That works, but when the application receives hundreds of messages per second, you also have to cope with bursts of concurrent requests. You need a queue to buffer these messages, and that is where JMS and ActiveMQ come in. ActiveMQ is a message broker that implements the JMS API. Amazon MQ offers it as a managed service, and brokers such as RabbitMQ fill a similar role, although RabbitMQ is built on AMQP rather than JMS.

Keep in mind that Spring Boot controllers handle each request on a thread from the server's pool, but they do not queue, persist, or retry work for you.
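To make the buffering idea concrete, here is a minimal in-process sketch. It uses a `java.util.concurrent.BlockingQueue` as a stand-in for the broker queue. The `EmailQueueSketch` class and its method names are made up for illustration, and nothing here talks to ActiveMQ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory stand-in for a broker queue; not ActiveMQ code.
final class EmailQueueSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Callers (for example request handlers) only enqueue and return immediately.
    void submit(String emailRequest) {
        queue.add(emailRequest);
    }

    // A worker drains whatever has accumulated, in arrival order.
    List<String> drain() {
        List<String> processed = new ArrayList<>();
        queue.drainTo(processed);
        return processed;
    }

    public static void main(String[] args) {
        EmailQueueSketch sketch = new EmailQueueSketch();
        for (int i = 0; i < 3; i++) {
            sketch.submit("email-" + i);
        }
        System.out.println(sketch.drain()); // prints [email-0, email-1, email-2]
    }
}
```

A real broker adds what this sketch lacks: the queue lives outside your process, survives restarts when messages are persistent, and can be shared by every node in the cluster.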

In this post, I explain how to implement a producer job that sends JMS messages. If you would like to know how to implement a JMS listener, read my previous post. Follow each step below to implement an ActiveMQ producer job in your Spring Boot application.

NOTE: The code below configures several ActiveMQ policies. These settings vary with your requirements, so I recommend reading the official documentation to understand them before you change anything.

Sample Configurations: Session.CLIENT_ACKNOWLEDGE and DeliveryMode.PERSISTENT

Maven POM.XML Dependencies

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- Add these dependencies if the producer also runs in the same environment. -->
<dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-client</artifactId>
   <version>5.15.8</version>
</dependency>
<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-pool</artifactId>
	<version>5.15.8</version>
</dependency>

ActiveMQ Configuration: application.properties

activemq.packages.trusted=<Optional but for security you can configure package name where the listener is>
activemq.brokerUrl=<brokerURL>
activemq.connection.pool=5
activemq.prefetch.limit=50
#producer
producer.user=<userName>
producer.password=<Password>
producer.queues=<Queue Name>
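Because the values above are placeholders, a fail-fast check at startup can catch a key that was left unset. Below is a minimal sketch that uses only `java.util.Properties`. The `ProducerPropsCheck` class is hypothetical and not part of Spring or ActiveMQ, and the broker URL in `main` is just a sample value.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: reports required keys that are missing or still hold a <placeholder>.
final class ProducerPropsCheck {
    static final String[] REQUIRED = {
        "activemq.brokerUrl", "producer.user", "producer.password", "producer.queues"
    };

    static List<String> unsetKeys(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new IllegalArgumentException("Unreadable properties text", e);
        }
        List<String> unset = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key, "").trim();
            // Treat an empty value or an untouched <placeholder> as unset.
            if (value.isEmpty() || (value.startsWith("<") && value.endsWith(">"))) {
                unset.add(key);
            }
        }
        return unset;
    }

    public static void main(String[] args) {
        String text = "activemq.brokerUrl=tcp://localhost:61616\n"
                + "producer.user=<userName>\n"
                + "producer.password=secret\n";
        System.out.println(unsetKeys(text)); // prints [producer.user, producer.queues]
    }
}
```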

ActiveMQ Java configuration class mapped to the application.properties file.

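import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

// Binds the ActiveMQ settings from application.properties.
// @Configuration already registers the class as a bean, so @Service is not needed.
@Configuration
public class AMQConfig {

    @Value("${activemq.packages.trusted}")
    private String trustedPackages;

    @Value("${activemq.connection.pool}")
    private String noOfConnections;

    @Value("${activemq.brokerUrl}")
    private String brokerUrl;

    @Value("${producer.user}")
    private String producerUser;

    @Value("${producer.password}")
    private String producerPassword;

    @Value("${producer.queues}")
    private String queues;

    // Falls back to 50 when the property is not set.
    @Value("${activemq.prefetch.limit:50}")
    private int prefetchLimit;

    // Getters used by ProducerJob and AMQProducer.
    public String getTrustedPackages() { return trustedPackages; }
    public String getNoOfPooledConnections() { return noOfConnections; }
    public String getBrokerUrl() { return brokerUrl; }
    public String getProducerUser() { return producerUser; }
    public String getProducerPassword() { return producerPassword; }
    public String getQueues() { return queues; }
    public int getPrefetchLimit() { return prefetchLimit; }
}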

Spring Boot Java JMS Producer Job

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;

@Component
public class ProducerJob {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProducerJob.class);
    private static final int MESSAGE_COUNT = 10;

    @Autowired
    AMQConfig amqConfig;

    // Replace with your own producer cron. Spring cron expressions have six fields,
    // e.g. "0 */5 * * * *" runs every five minutes. Scheduling also requires
    // @EnableScheduling on one of your configuration classes.
    @Scheduled(cron = "${PRODUCER_JOB_CRON}")
    public void run() {
        Session producerSession = null;
        try {
            Connection producerConnection = AMQProducer.getActiveMQProducerConnection(amqConfig);
            if (producerConnection != null) {
                producerSession = producerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                LOGGER.info("Producer job started; sending sample messages");
                for (int index = 0; index < MESSAGE_COUNT; index++) {
                    // Build valid JSON so that the order number reflects the loop index.
                    String payload = String.format(
                            "{\"orderno\": %d, \"type\": \"order\", \"amount\": \"202\"}", index * 100);
                    AMQProducer.sendMessage(producerSession, amqConfig, payload);
                }
                LOGGER.info("Job ran and sent {} messages", MESSAGE_COUNT);
            }
        } catch (Exception e) {
            LOGGER.error("Producer job failed", e);
        } finally {
            if (producerSession != null) {
                try {
                    producerSession.close();
                } catch (JMSException e) {
                    LOGGER.error("Could not close the producer session", e);
                }
            }
        }
    }
}

AMQProducer.java: This file contains the code that sends messages to ActiveMQ.

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

public final class AMQProducer {
    private AMQProducer() { }

    private static final Logger LOGGER = LoggerFactory.getLogger(AMQProducer.class);

    // Shared across calls and created lazily on first use.
    private static PooledConnectionFactory pooledConnectionFactory = null;
    private static Connection producerConnection = null;

    private static PooledConnectionFactory getPooledConnectionFactory(final String brokerUrl, String userName, String password, String noOfConnections) {
        final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
        // Pass the username and password.
        connectionFactory.setUserName(userName);
        connectionFactory.setPassword(password);
        // Create a pooled connection factory.
        final PooledConnectionFactory factory = new PooledConnectionFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMaxConnections(Integer.parseInt(noOfConnections));
        return factory;
    }

    // synchronized so that concurrent callers cannot create two factories or connections.
    public static synchronized Connection getActiveMQProducerConnection(AMQConfig amqConfig) {
        if (pooledConnectionFactory == null) {
            LOGGER.info("pooledConnectionFactory is null; creating it");
            pooledConnectionFactory = getPooledConnectionFactory(amqConfig.getBrokerUrl(), amqConfig.getProducerUser(),
                    amqConfig.getProducerPassword(), amqConfig.getNoOfPooledConnections());
        }
        if (producerConnection == null) {
            try {
                LOGGER.info("producerConnection is null; creating it");
                producerConnection = pooledConnectionFactory.createConnection();
                producerConnection.start();
            } catch (JMSException e) {
                LOGGER.error("Could not create the producer connection", e);
            }
        }
        return producerConnection;
    }

    public static void sendMessage(Session producerSession, AMQConfig amqConfig, final String message) throws JMSException {
        if (producerSession == null) {
            return;
        }
        // The configuration defines queues (producer.queues), so create a queue rather than a topic.
        final Destination producerDestination = producerSession.createQueue(amqConfig.getQueues());
        final MessageProducer producer = producerSession.createProducer(producerDestination);
        try {
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            TextMessage producerMessage = producerSession.createTextMessage(message);
            producer.send(producerMessage);
            LOGGER.info("Message sent");
        } finally {
            // Release the producer; failures propagate to the caller instead of being swallowed.
            producer.close();
        }
    }
}
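AMQProducer creates its pooled factory and connection lazily, on first use. If the scheduled job can ever overlap with another caller, that first-use check has to be thread-safe. Here is a minimal sketch of synchronized lazy initialization. It uses a generic, hypothetical `LazyResource` holder rather than real JMS types, so it only illustrates the pattern.

```java
import java.util.function.Supplier;

// Hypothetical holder illustrating thread-safe lazy initialization; not part of ActiveMQ.
final class LazyResource<T> {
    private final Supplier<T> factory;
    private T instance; // guarded by "this"

    LazyResource(Supplier<T> factory) {
        this.factory = factory;
    }

    // synchronized: the factory runs at most once as long as it returns a non-null value,
    // even when several threads call get() at the same time.
    synchronized T get() {
        if (instance == null) {
            instance = factory.get();
        }
        return instance;
    }

    public static void main(String[] args) {
        LazyResource<StringBuilder> connection =
                new LazyResource<>(() -> new StringBuilder("connection"));
        System.out.println(connection.get() == connection.get()); // prints true
    }
}
```

The trade-off is that every call takes the lock. For a job that runs on a schedule, that cost is usually negligible.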

Note: This is a working solution; however, the code may contain typing mistakes that cause errors. Reach out to me and I will do my best to help you out.

References

https://activemq.apache.org/

https://activemq.apache.org/what-is-the-prefetch-limit-for

https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/getting-started-activemq.html

https://activemq.apache.org/using-apache-activemq

How to implement an ActiveMQ JMS Listener in a Spring Boot application?

There are thousands of use cases in which an application needs some form of Java messaging service API. I would like to use one of them to explain the need for a JMS listener.

Let’s consider a scenario where your application is deployed in a clustered environment and has to handle thousands of email requests. One way to solve this is to expose an API endpoint and post each message to the process that sends the email. That works, but when the application receives hundreds of messages per second, you also have to cope with bursts of concurrent requests. You need a queue to buffer these messages, and that is where JMS and ActiveMQ come in. ActiveMQ is a message broker that implements the JMS API. Amazon MQ offers it as a managed service, and brokers such as RabbitMQ fill a similar role, although RabbitMQ is built on AMQP rather than JMS.

Keep in mind that Spring Boot controllers handle each request on a thread from the server's pool, but they do not queue, persist, or retry work for you.

In this post, I assume that the application is receiving messages and that you need to implement an ActiveMQ listener to handle them. Follow each step below to implement the listener in your Spring Boot application.

Maven POM.XML Dependencies

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- Add these dependencies if the producer also runs in the same environment. -->
<dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-client</artifactId>
   <version>5.15.8</version>
</dependency>
<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-pool</artifactId>
	<version>5.15.8</version>
</dependency>

ActiveMQ Configuration: application.properties

activemq.packages.trusted=<Optional but for security you can configure package name where the listener is>
activemq.brokerUrl=<brokerURL>
activemq.connection.pool=5
activemq.maximumThreads=10
activemq.prefetch.limit=50
#consumer
consumer.user=<userName>
consumer.password=<Password>
consumer.queues=<Queue Name>
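The `activemq.prefetch.limit` setting caps how many messages the broker will dispatch to a consumer before that consumer has acknowledged them. Below is a toy model of that window, to show the arithmetic. `PrefetchWindow` is a hypothetical class for illustration only; it is not how ActiveMQ is implemented internally, and it models positive limits only.

```java
// Hypothetical toy model of a prefetch window; not ActiveMQ code.
final class PrefetchWindow {
    private final int limit;
    private int unacknowledged;

    PrefetchWindow(int limit) {
        if (limit < 1) {
            throw new IllegalArgumentException("limit must be >= 1");
        }
        this.limit = limit;
    }

    // Broker side: dispatch one more message only if the window has room.
    boolean tryDispatch() {
        if (unacknowledged >= limit) {
            return false;
        }
        unacknowledged++;
        return true;
    }

    // Consumer side: acknowledging a message frees one slot.
    void acknowledge() {
        if (unacknowledged == 0) {
            throw new IllegalStateException("nothing to acknowledge");
        }
        unacknowledged--;
    }

    int unacknowledged() {
        return unacknowledged;
    }

    public static void main(String[] args) {
        PrefetchWindow window = new PrefetchWindow(50);
        int dispatched = 0;
        for (int i = 0; i < 60; i++) {
            if (window.tryDispatch()) {
                dispatched++;
            }
        }
        System.out.println(dispatched); // prints 50
    }
}
```

With slow consumers, a smaller limit spreads messages more evenly across consumers. With fast consumers, a larger limit reduces the time spent waiting for the broker.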

ActiveMQ Java configuration class mapped to the application.properties file.

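import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

// Binds the ActiveMQ settings from application.properties.
// @Configuration already registers the class as a bean, so @Service is not needed.
@Configuration
public class AMQConfig {

    @Value("${activemq.packages.trusted}")
    private String trustedPackages;

    @Value("${activemq.connection.pool}")
    private String noOfConnections;

    @Value("${activemq.brokerUrl}")
    private String brokerUrl;

    @Value("${consumer.user}")
    private String consumerUser;

    @Value("${consumer.password}")
    private String consumerPassword;

    @Value("${consumer.queues}")
    private String queues;

    // The defaults after the colon apply when the property is not set.
    @Value("${activemq.maximumThreads:10}")
    private int maximumThreads;

    @Value("${activemq.prefetch.limit:50}")
    private int prefetchLimit;

    // Getters used by MessageConsumer.
    public String getTrustedPackages() { return trustedPackages; }
    public String getNoOfPooledConnections() { return noOfConnections; }
    public String getBrokerUrl() { return brokerUrl; }
    public String getConsumerUser() { return consumerUser; }
    public String getConsumerPassword() { return consumerPassword; }
    public String getQueues() { return queues; }
    public int getMaximumThreads() { return maximumThreads; }
    public int getPrefetchLimit() { return prefetchLimit; }
}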

Spring Boot Java JMS Listener Service

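import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.util.StringUtils;

import java.util.concurrent.CountDownLatch;

@Configuration
@EnableJms
public class MessageConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(MessageConsumer.class);

    @Autowired
    AMQConfig amqConfig;

    @Bean
    public ActiveMQConnectionFactory receiverActiveMQConnectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(amqConfig.getBrokerUrl());
        connectionFactory.setUserName(amqConfig.getConsumerUser());
        connectionFactory.setPassword(amqConfig.getConsumerPassword());
        // Limit how many unacknowledged messages the broker dispatches to this consumer.
        ActiveMQPrefetchPolicy activeMQPrefetchPolicy = new ActiveMQPrefetchPolicy();
        activeMQPrefetchPolicy.setQueuePrefetch(amqConfig.getPrefetchLimit());
        activeMQPrefetchPolicy.setMaximumPendingMessageLimit(amqConfig.getPrefetchLimit());

        connectionFactory.setPrefetchPolicy(activeMQPrefetchPolicy);
        return connectionFactory;
    }

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(receiverActiveMQConnectionFactory());
        return factory;
    }

    @Bean
    public MessageReceiver receiver() {
        return new MessageReceiver();
    }

    // Receives messages from the configured queue.
    public static class MessageReceiver {
        // Counted down once a message has been handled, so a caller (for example a test) can wait for it.
        private final CountDownLatch latch = new CountDownLatch(1);

        public CountDownLatch getLatch() {
            return latch;
        }

        @JmsListener(destination = "${consumer.queues}")
        public void receive(String message) {
            try {
                LOGGER.info("Message received");
                Thread.sleep(100); // placeholder delay standing in for real processing
                // Spring's StringUtils avoids an extra commons-lang dependency.
                if (StringUtils.hasText(message)) {
                    // Execute your code here.
                    LOGGER.info("Message received: {}", message);
                }
                latch.countDown();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                LOGGER.error("Interrupted while processing the received message", ex);
            } catch (Exception ex) {
                LOGGER.error("Error while processing the received message", ex);
            }
        }
    }
}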
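The `CountDownLatch` in `MessageReceiver` lets another thread, typically a test, wait until a message has arrived. Here is a minimal sketch of that handshake without any JMS involved. `LatchReceiverSketch` and its methods are hypothetical names, and `receive` is called directly instead of by a listener container.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the listener; shows only the latch handshake.
final class LatchReceiverSketch {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String lastMessage;

    // Plays the role of the @JmsListener method: record the message, then release waiters.
    void receive(String message) {
        lastMessage = message;
        latch.countDown();
    }

    // Blocks for up to timeoutMillis; returns true if a message arrived in time.
    boolean awaitMessage(long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    String lastMessage() {
        return lastMessage;
    }

    public static void main(String[] args) {
        LatchReceiverSketch receiver = new LatchReceiverSketch();
        // Deliver the message from another thread, as a listener container would.
        new Thread(() -> receiver.receive("hello")).start();
        if (receiver.awaitMessage(1000)) {
            System.out.println("Received: " + receiver.lastMessage());
        } else {
            System.out.println("No message within the timeout");
        }
    }
}
```

Note that a latch created with a count of 1 only signals the first message. Later messages are still processed, but they no longer change the latch.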

Note: This is a working solution; however, the code may contain typing mistakes that cause errors. Reach out to me and I will do my best to help you out.
