activemq-publish-subscribe-1

Description

Requirements

Install ActiveMQ on Docker

docker-compose.yml (start the broker with `docker-compose up -d`):

# Compose file that runs a single ActiveMQ broker container for the
# publisher/subscriber examples in this document.
version: '3'
services:
  activemq:
    # NOTE(review): no image tag is given, so Docker pulls `latest`;
    # consider pinning a specific tag for reproducible runs.
    image: rmohr/activemq
    ports:
      # Port the Java clients connect to (the sample output shows tcp://localhost:61616).
      - "61616:61616"
      # Presumably the ActiveMQ web console port - TODO confirm.
      - "8161:8161"
    volumes:
      # NOTE(review): verify that /data and /conf are the directories the image
      # actually uses for persistence and configuration; the image may expect
      # paths under /opt/activemq instead - confirm against the image docs.
      - data:/data
      - conf:/conf
# Named volumes referenced by the service above.
volumes:
  data:
  conf:

Publisher

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<!-- Build descriptor for the publisher: compiles for Java 17 and packages an
	     executable jar-with-dependencies whose entry point is the Producer class. -->
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.jreact</groupId>
	<artifactId>activemq-publish</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>activemq-publish</name>
	<description>activemq-publish test project</description>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>
		<!-- ActiveMQ library providing ActiveMQConnectionFactory and the JMS API used by Producer. -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-core</artifactId>
			<version>5.7.0</version>
		</dependency>
		<!-- Logging API and console binding; a stable release is used instead of an alpha
		     pre-release. Keep both artifacts on the same version. -->
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>2.0.9</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-simple</artifactId>
			<version>2.0.9</version>
		</dependency>
	</dependencies>

	<build>
		<defaultGoal>install</defaultGoal>
		<!-- Produces target/activemq.jar and target/activemq-jar-with-dependencies.jar. -->
		<finalName>activemq</finalName>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.11.0</version>
				<inherited>true</inherited>
				<configuration>
					<!-- 'release' sets source and target level and also checks API usage
					     against the Java 17 platform. -->
					<release>17</release>
				</configuration>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-assembly-plugin</artifactId>
				<version>3.6.0</version>
				<configuration>
					<descriptorRefs>
						<descriptorRef>jar-with-dependencies</descriptorRef>
					</descriptorRefs>
					<archive>
						<manifest>
							<!-- Entry point used by 'java -jar'. -->
							<mainClass>com.jreact.jmsPublisher.Producer</mainClass>
						</manifest>
					</archive>
				</configuration>
				<executions>
					<!-- Build the fat jar automatically during the package phase. -->
					<execution>
						<id>make-assembly</id>
						<phase>package</phase>
						<goals>
							<goal>single</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>

Producer.java

package com.jreact.jmsPublisher;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;

import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import java.util.Scanner;
import static javax.jms.Session.AUTO_ACKNOWLEDGE;
import static org.apache.activemq.ActiveMQConnection.DEFAULT_BROKER_URL;


/**
 * Console publisher: reads lines from standard input and publishes each one as a
 * JMS {@link TextMessage} to the topic {@code MyTopic} on the default ActiveMQ broker.
 *
 * <p>The loop ends when the user enters {@code Quit} (case-insensitive) or when
 * standard input reaches end-of-file. The {@code Quit} line itself is still
 * published before the loop ends.
 */
public class Producer {

    /** Name of the topic the messages are published to. */
    private static final String TOPIC_NAME = "MyTopic";

    /** Input (compared case-insensitively) that ends the publishing loop. */
    private static final String QUIT_COMMAND = "Quit";

    /**
     * Connects to the broker, publishes console input line by line and shuts down.
     *
     * @param args ignored
     * @throws Exception if the connection, session or a send operation fails
     */
    public static void main(String[] args) throws Exception {

        // Create the connection; it is closed in the finally block on every path
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
        Connection connection = connectionFactory.createConnection("admin", "admin");

        try (Scanner input = new Scanner(System.in)) {
            connection.start();

            // Non-transacted session with automatic acknowledgement
            Session session = connection.createSession(false, AUTO_ACKNOWLEDGE);

            // Create topic and a producer bound to it
            Topic topic = session.createTopic(TOPIC_NAME);
            MessageProducer producer = session.createProducer(topic);

            System.out.println("Enter message: ");
            // hasNextLine() returns false on end-of-input, so a closed or piped
            // stdin ends the loop instead of throwing NoSuchElementException.
            while (input.hasNextLine()) {
                String line = input.nextLine();

                // Publish the line to the topic
                TextMessage msg = session.createTextMessage(line);
                producer.send(msg);

                if (line.equalsIgnoreCase(QUIT_COMMAND)) {
                    break;
                }
                System.out.println("Enter message: ");
            }
        } finally {
            // Closing the connection also closes its sessions and producers
            connection.close();
        }

        // Normal termination: exit status 0 (non-zero signals an error to the caller)
        System.exit(0);
    }
}

Subscriber

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<!-- Build descriptor for the subscriber: compiles for Java 17 and packages an
	     executable jar-with-dependencies whose entry point is the Consumer class. -->
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.jreact</groupId>
	<artifactId>activemq-subscribe</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>activemq-subscribe</name>
	<description>activemq-subscribe test project</description>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>
		<!-- ActiveMQ library providing ActiveMQConnectionFactory and the JMS API used by Consumer. -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-core</artifactId>
			<version>5.7.0</version>
		</dependency>
		<!-- Logging API and console binding; a stable release is used instead of an alpha
		     pre-release. Keep both artifacts on the same version. -->
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>2.0.9</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-simple</artifactId>
			<version>2.0.9</version>
		</dependency>
	</dependencies>

	<build>
		<defaultGoal>install</defaultGoal>
		<!-- Produces target/activemq.jar and target/activemq-jar-with-dependencies.jar. -->
		<finalName>activemq</finalName>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-assembly-plugin</artifactId>
				<version>3.6.0</version>
				<configuration>
					<descriptorRefs>
						<descriptorRef>jar-with-dependencies</descriptorRef>
					</descriptorRefs>
					<archive>
						<manifest>
							<!-- Entry point used by 'java -jar'. -->
							<mainClass>com.jreact.jmsConsumer.Consumer</mainClass>
						</manifest>
					</archive>
				</configuration>
				<executions>
					<!-- Build the fat jar automatically during the package phase. -->
					<execution>
						<id>make-assembly</id>
						<phase>package</phase>
						<goals>
							<goal>single</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.11.0</version>
				<inherited>true</inherited>
				<configuration>
					<!-- 'release' sets source and target level and also checks API usage
					     against the Java 17 platform. -->
					<release>17</release>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

Consumer.java

package com.jreact.jmsConsumer;

import org.apache.activemq.ActiveMQConnectionFactory;
import java.io.Console;
import javax.jms.Connection;
import javax.jms.Session;
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;
import javax.jms.*;

import static javax.jms.Session.AUTO_ACKNOWLEDGE;
import static org.apache.activemq.ActiveMQConnection.DEFAULT_BROKER_URL;

/**
 * Console subscriber: listens on the topic {@code MyTopic} of the default ActiveMQ
 * broker and prints the body of every text message it receives.
 *
 * <p>The loop ends when a message with the text {@code Quit} (case-insensitive)
 * arrives, or when {@code receive()} returns {@code null}.
 */
public class Consumer {

    /** Name of the topic the messages are read from. */
    private static final String TOPIC_NAME = "MyTopic";

    /** Message text (compared case-insensitively) that ends the receive loop. */
    private static final String QUIT_COMMAND = "Quit";

    /**
     * Connects to the broker, prints incoming text messages and shuts down.
     *
     * @param args ignored
     * @throws JMSException if the connection, session or a receive operation fails
     */
    public static void main(String[] args) throws JMSException {

        // Create the connection; it is closed in the finally block on every path
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
        Connection connection = connectionFactory.createConnection("admin", "admin");

        try {
            connection.start();

            // Non-transacted session with automatic acknowledgement
            Session session = connection.createSession(false, AUTO_ACKNOWLEDGE);

            // Create topic and a consumer subscribed to it
            Topic topic = session.createTopic(TOPIC_NAME);
            MessageConsumer consumer = session.createConsumer(topic);

            while (true) {
                // Blocks until a message arrives
                Message msg = consumer.receive();
                if (msg == null) {
                    // receive() returns null when the consumer is closed concurrently
                    break;
                }
                if (!(msg instanceof TextMessage)) {
                    // Another publisher may send non-text messages to the same topic;
                    // skip them instead of failing with a ClassCastException.
                    System.err.println("Ignoring non-text message: " + msg);
                    continue;
                }

                String response = ((TextMessage) msg).getText();
                System.out.println("Received = "+response);

                // Constant-first comparison stays safe when the message body is null
                if (QUIT_COMMAND.equalsIgnoreCase(response)) {
                    break;
                }
            }
        } finally {
            // Closing the connection also closes its sessions and consumers
            connection.close();
        }

        // Normal termination: exit status 0 (non-zero signals an error to the caller)
        System.exit(0);
    }
}

Running

Consumer

Output:

[ActiveMQ Task-1] INFO org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://localhost:61616

Producer

Output:

[ActiveMQ Task-1] INFO org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://localhost:61616
Enter message: 
111111
Enter message:

Consumer:

java -jar activemq-jar-with-dependencies.jar
[ActiveMQ Task-1] INFO org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://localhost:61616
Received = 111111

See also:

Mark