Description
- Publisher/Subscriber (Pub/Sub Messaging)
- A message can be delivered to all of its subscribers.
- We could also use a Topic as a message-oriented middleware that is responsible for holding and delivering messages.
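- There is a timing dependency between the publisher and the subscriber: a (non-durable) subscriber only receives messages that are published while it is connected, which is why the Consumer is started before the Producer in the Running section below.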
Requirements
- JVM 17+
- ActiveMQ on Docker
Install ActiveMQ on Docker
docker-compose:
version: '3'
services:
  activemq:
    image: rmohr/activemq
    ports:
      - "61616:61616"
      - "8161:8161"
    volumes:
      - data:/data
      - conf:/conf
volumes:
  data:
  conf:
Open the URL http://localhost:8161/admin
username: admin
password: admin

Publisher
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.jreact</groupId>
<artifactId>activemq-publish</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>activemq-publish</name>
<description>activemq-publish test project</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.0-alpha6</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.0-alpha6</version>
</dependency>
</dependencies>
<build>
<defaultGoal>install</defaultGoal>
<finalName>activemq</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.5.1</version>
<inherited>true</inherited>
<configuration>
<source>17</source>
<target>17</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.jreact.jmsPublisher.Producer</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Producer.java
package com.jreact.jmsPublisher;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.util.Scanner;
import static javax.jms.Session.AUTO_ACKNOWLEDGE;
import static org.apache.activemq.ActiveMQConnection.DEFAULT_BROKER_URL;
public class Producer {
/**
 * Reads lines from standard input and publishes each one as a JMS text message
 * to the topic "MyTopic" on the broker at {@code DEFAULT_BROKER_URL}.
 *
 * <p>The loop stops after a line equal to "Quit" (case-insensitive) has been
 * published, or when standard input reaches end-of-file. The connection is
 * closed on every exit path, including when a JMS call throws.
 *
 * @param args command-line arguments (not used)
 * @throws Exception if connecting to the broker or sending a message fails
 */
public static void main(String[] args) throws Exception {
    // Create the connection
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
    Connection connection = connectionFactory.createConnection("admin", "admin");

    // javax.jms.Connection (JMS 1.1) is not AutoCloseable, so it is released in
    // a finally block; the Scanner is handled by try-with-resources.
    try (Scanner input = new Scanner(System.in)) {
        connection.start();

        // Create Session (non-transacted, automatic acknowledgement)
        Session session = connection.createSession(false, AUTO_ACKNOWLEDGE);

        // Create topic and a producer bound to it
        Topic topic = session.createTopic("MyTopic");
        MessageProducer producer = session.createProducer(topic);

        // Read input from the console and send every line as a JMS message
        // to the ActiveMQ broker.
        while (true) {
            System.out.println("Enter message: ");
            if (!input.hasNextLine()) {
                // Standard input was closed (EOF): stop instead of letting
                // nextLine() throw NoSuchElementException.
                break;
            }
            String response = input.nextLine();

            // Create a message object and publish it to the topic
            TextMessage msg = session.createTextMessage(response);
            producer.send(msg);

            // "Quit" is published before leaving the loop so that it also
            // reaches the subscribers.
            if (response.equalsIgnoreCase("Quit")) {
                break;
            }
        }
    } finally {
        connection.close();
    }

    // Status 0: reaching this point is a normal termination.
    System.exit(0);
}
}
Subscriber
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.jreact</groupId>
<artifactId>activemq-subscribe</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>activemq-subscribe</name>
<description>activemq-subscribe test project</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.0-alpha6</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.0-alpha6</version>
</dependency>
</dependencies>
<build>
<defaultGoal>install</defaultGoal>
<finalName>activemq</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.jreact.jmsConsumer.Consumer</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.5.1</version>
<inherited>true</inherited>
<configuration>
<source>17</source>
<target>17</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Consumer.java
package com.jreact.jmsConsumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import java.io.Console;
import javax.jms.Connection;
import javax.jms.Session;
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;
import javax.jms.*;
import static javax.jms.Session.AUTO_ACKNOWLEDGE;
import static org.apache.activemq.ActiveMQConnection.DEFAULT_BROKER_URL;
public class Consumer {
/**
 * Subscribes to the topic "MyTopic" on the broker at {@code DEFAULT_BROKER_URL}
 * and prints the text of every message received.
 *
 * <p>The loop stops after a text message equal to "Quit" (case-insensitive)
 * has been received, or when {@code receive()} returns {@code null}. Messages
 * that are not text messages are reported on standard error and skipped. The
 * connection is closed on every exit path, including when a JMS call throws.
 *
 * @param args command-line arguments (not used)
 * @throws JMSException if connecting to the broker or receiving a message fails
 */
public static void main(String[] args) throws JMSException {
    // Create the connection
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
    Connection connection = connectionFactory.createConnection("admin", "admin");

    try {
        connection.start();

        // Create Session (non-transacted, automatic acknowledgement)
        Session session = connection.createSession(false, AUTO_ACKNOWLEDGE);

        // Create topic and a consumer subscribed to it
        Topic topic = session.createTopic("MyTopic");
        MessageConsumer consumer = session.createConsumer(topic);

        while (true) {
            // Block until the next message arrives
            Message msg = consumer.receive();
            if (msg == null) {
                // receive() returns null when the consumer has been closed
                // concurrently; nothing more will arrive.
                break;
            }
            if (!(msg instanceof TextMessage)) {
                // Other publishers may send non-text messages to the same
                // topic; skip them instead of failing on the cast.
                System.err.println("Ignoring non-text message: " + msg);
                continue;
            }

            String response = ((TextMessage) msg).getText();
            System.out.println("Received = " + response);

            // Constant-first comparison: getText() may return null.
            if ("Quit".equalsIgnoreCase(response)) {
                break;
            }
        }
    } finally {
        connection.close();
    }

    // Status 0: reaching this point is a normal termination.
    System.exit(0);
}
}
Running
Consumer
goto: activemq-subscribe
mvn package
goto: activemq-subscribe/target
java -jar activemq-jar-with-dependencies.jar
Output:
[ActiveMQ Task-1] INFO org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://localhost:61616
Producer
goto: activemq-publish
mvn package
goto: activemq-publish/target
java -jar activemq-jar-with-dependencies.jar
Output:
[ActiveMQ Task-1] INFO org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://localhost:61616
Enter message:
111111
Enter message:
Consumer:
java -jar activemq-jar-with-dependencies.jar
[ActiveMQ Task-1] INFO org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://localhost:61616
Received = 111111
See also:

–
Source Code: https://github.com/ZbCiok/zjc-examples/tree/main/traditional-messaging/activemq/activemq-publish-subscribe-1
