RabbitMQ 是时下流行的分布式消息系统,本实验手册演示如何通过 RabbitMQ 的 Java 客户端访问消息。
在第一次练习中,我们首先搭建开发环境及实现一个简单的消息生产者,其中使用的案例参考了 RabbitMQ 的官方文档。
搭建开发环境
当然,我们首先需要一个已经运行起来的 RabbitMQ 服务。 在开发环境,使用 Docker 进行安装最方便,最快捷的。如果你还没有安装,可以参考 在Docker中使用 RabbitMQ 服务器 进行安装。
Java 我们使用 Java 8, 这是当下主流 Java 版本。
IDE 推荐使用 IDEA Community 版本
以上资源都是免费的。
搭建项目
在 IDE 中新建一个 maven 项目,但项目建好后,打开 pom.xml 文件,加入以下依赖:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.9.0</version> </dependency>
<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.30</version> <scope>test</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.30</version> <scope>test</scope> </dependency>
|
因为需要项目中需要使用 Java 8 的特性,所以在 maven 中指定了编译版本:
1 2 3 4 5 6 7 8 9 10 11 12 13
| <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.6.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>
|
完整的 pom.xm 文件如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
| <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>cn.com.hohistar.training</groupId> <artifactId>Java8-Sender</artifactId> <version>1.0-SNAPSHOT</version>
<dependencies>
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.9.0</version> </dependency>
<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.30</version> <scope>test</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.30</version> <scope>test</scope> </dependency>
<dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-engine</artifactId> <version>5.2.0</version> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.platform</groupId> <artifactId>junit-platform-runner</artifactId> <version>1.2.0</version> <scope>test</scope> </dependency>
</dependencies>
<repositories> <repository> <id>alimaven</id> <name>aliyun maven</name> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>false</enabled> </snapshots> </repository> </repositories>
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.6.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>
</project>
|
开发消息发送者
在 RabbitMQ Java Cient 中,它将连接 RabbitMQ 服务及发送消息的逻辑封装在 Connection, Channel 这两个主要的类中。要发送消息,我们首先需要建立连接(Connection) 然后再通过通道(Channel)进行消息发送。代码如下:
1 2 3 4 5 6 7 8 9
| ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); }
|
在上面的代码中,我们使用了 Java 7 中新的 try 语法来保证资源(Connection, Channel)在使用以后能够关闭。
完整的类代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;
public class HelloWorldSender {
private final static String QUEUE_NAME = "hello";
public void send(String message) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } } }
|
测试消息发送
为测试上面的发送者代码,我们构建一个单元测试,测试类如下:
1 2 3 4 5 6 7 8 9 10
| @DisplayName("Message Sender") public class SenderTester {
@Test @DisplayName("Send Hello World Message") public void testHelloWorldSender() throws Exception { HelloWorldSender sender = new HelloWorldSender(); sender.send("Hello World"); } }
|
运行 testHelloWorldSender 方法以后,我们可以通过 RabbitMQ 的 WEB 管理界面进行查看,可以看到新添加了一个名为: hello 的 queue, 并且可以看到当前有一条消息在 queue 里面。
下一步
下一步,我们将编写一个简单的消息接收者