博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ与java、Spring结合实例详细讲解
阅读量:6455 次
发布时间:2019-06-23

本文共 15443 字,大约阅读时间需要 51 分钟。

hot3.png

林炳文Evankaka原创作品。转载请注明出处

         摘要:本文介绍了rabbitMq,提供了如何在Ubuntu下安装RabbitMQ 服务的方法。最好以RabbitMQ与java、Spring结合的两个实例来演示如何使用RabbitMQ。

一、rabbitMQ简介

1.1、rabbitMQ的优点(适用范围)

1. 基于erlang语言开发具有高可用高并发的优点,适合集群服务器。
2. 健壮、稳定、易用、跨平台、支持多种语言、文档齐全。
3. 有消息确认机制和持久化机制,可靠性高。
4. 开源
其他MQ的优势:
1. Apache ActiveMQ曝光率最高,但是可能会丢消息。
2. ZeroMQ延迟很低、支持灵活拓扑,但是不支持消息持久化和崩溃恢复。

1.2、几个概念说明

producer&Consumer
producer指的是消息生产者,consumer消息的消费者。
Queue
消息队列,提供了FIFO的处理机制,具有缓存消息的能力。rabbitmq中,队列消息可以设置为持久化,临时或者自动删除。
设置为持久化的队列,queue中的消息会在server本地硬盘存储一份,防止系统crash,数据丢失
设置为临时队列,queue中的数据在系统重启之后就会丢失
设置为自动删除的队列,当不存在用户连接到server,队列中的数据会被自动删除Exchange

Exchange类似于数据通信网络中的交换机,提供消息路由策略。rabbitmq中,producer不是通过信道直接将消息发送给queue,而是先发送给Exchange。一个Exchange可以和多个Queue进行绑定,producer在传递消息的时候,会传递一个ROUTING_KEY,Exchange会根据这个ROUTING_KEY按照特定的路由算法,将消息路由给指定的queue。和Queue一样,Exchange也可设置为持久化,临时或者自动删除。

Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别:
Direct
直接交换器,工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的Queue
fanout
广播是式交换器,不管消息的ROUTING_KEY设置为什么,Exchange都会将消息转发给所有绑定的Queue。
topic
主题交换器,工作方式类似于组播,Exchange会将消息转发和ROUTING_KEY匹配模式相同的所有队列,比如,ROUTING_KEY为user.stock的Message会转发给绑定匹配模式为 * .stock,user.stock, * . * 和#.user.stock.#的队列。( * 表是匹配一个任意词组,#表示匹配0个或多个词组)
headers
消息体的header匹配(ignore)
Binding
所谓绑定就是将一个特定的 Exchange 和一个特定的 Queue 绑定起来。Exchange 和Queue的绑定可以是多对多的关系。
virtual host
在rabbitmq server上可以创建多个虚拟的message broker,又叫做virtual hosts (vhosts)。每一个vhost本质上是一个mini-rabbitmq server,分别管理各自的exchange,和bindings。vhost相当于物理的server,可以为不同app提供边界隔离,使得应用安全的运行在不同的vhost实例上,相互之间不会干扰。producer和consumer连接rabbit server需要指定一个vhost。

1.3、消息队列的使用过程

1. 客户端连接到消息队列服务器,打开一个channel。
2. 客户端声明一个exchange,并设置相关属性。
3. 客户端声明一个queue,并设置相关属性。
4. 客户端使用routing key,在exchange和queue之间建立好绑定关系。
5. 客户端投递消息到exchange。
6. exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里

 

二、环境配置与安装

1、Erlang环境安装

RabbitMQ是基于Erlang的,所以首先必须配置Erlang环境。
从Erlang的官网 http://www.erlang.org/download.html 下载最新的erlang安装包,我下载的版本是 otp_src_R14B03.tar.gz 。然后:

$ tar xvzf otp_src_R14B03.tar.gz$ cd otp_src_R14B03$ ./configure

编译后的输出 

如下图: 

注:

可能会报错 configure: error: No curses library functions found
configure: error: /bin/sh '/home/liyixiang/erlang/configure' failed for erts

 

 

原因是缺少ncurses包

解决:在ubuntu系统下 

apt-cache search ncursesapt-get install libncurses5-dev

 

然后重新执行

 

./configure

 

提示没有wxWidgets和fop、ssh、odbc、ssl,但是问题不大。继续:

make

然后:

sudo make install

 

配置erlang环境变量 

修改/etc/profile文件,增加下面的环境变量:(vim profile i插入 编辑完毕ESC退出 wq!强制修改)

#set erlang environmentexport PATH=$PATH:/usr/erlang/bin:$PATHsource profile使得文件生效

 

下面是我的

2、RabbitMQ-Server安装

安装完Erlang,开始安装RabbitMQ-Server。安装方法有三种,这里笔者三者都试过了,就只有以下这个方法成功了。

直接使用:

 

apt-get  install rabbitmq-server

 

安装完成后会自动打开:

使用命令查看rabbitmq运行状态:

 

rabbitmqctl status

 

停止

 

rabbitmqctl stop

 

开启

 

rabbitmq-server start

 

3、rabbitmq web管理页面插件安装

输入以下命令

 

cd /usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_management

 

 

这里笔者一直安装不成功。

 

 

如果安装成功打开浏览器,输入 http://[server-name]:15672/ 如 http://localhost:15672/ ,会要求输入用户名和密码,用默认的guest/guest即可(guest/guest用户只能从localhost地址登录,如果要配置远程登录,必须另创建用户)。

如果要从远程登录怎么做呢?处于安全考虑,guest这个默认的用户只能通过http://localhost:15672来登录,其他的IP无法直接用这个guest帐号。这里我们可以通过配置文件来实现从远程登录管理界面,只要编辑/etc/rabbitmq/rabbitmq.config文件(没有就新增),添加以下配置就可以了。

4、添加用户

 

vim /etc/rabbitmq/rabbitmq.config

然后添加 

[ {rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["asdf"]}]} ].

注意上面有个点号 

现在添加了一个新授权用户asdf,可以远程使用这个用户名。记得要先用命令添加这个命令才行: 

cd /usr/lib/rabbitmq/bin/

#用户名与密码 

sudo rabbitmqctl add_user asdf 123456

用户设置为administrator才能远程访问 

sudo rabbitmqctl set_user_tags asdf administrator sudo rabbitmqctl set_permissions -p / asdf ".*" ".*" ".*"

其实也可以通过管理平台页面直接添加用户和密码等信息。如果还不能远程访问或远程登录检查是不是5672, 15672端口没有开放!!!!!! 

 

5、开放端口

 

ufw allow 5672

 

 

三、简单Java实例

下面来演示一个使用java的简单实例:

1、首先是消息生产者和提供者的基类

package com.lin;import java.io.IOException;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory; /** *  * 功能概要: EndPoint类型的队列 *  * @author linbingwen * @since  2016年1月11日 */public abstract class EndPoint{         protected Channel channel;    protected Connection connection;    protected String endPointName;         public EndPoint(String endpointName) throws IOException{         this.endPointName = endpointName;                  //Create a connection factory         ConnectionFactory factory = new ConnectionFactory();                  //hostname of your rabbitmq server         factory.setHost("10.75.4.25");         factory.setPort(5672);         factory.setUsername("asdf");         factory.setPassword("123456");                  //getting a connection         connection = factory.newConnection();                  //creating a channel         channel = connection.createChannel();                  //declaring a queue for this channel. If queue does not exist,         //it will be created on the server.         channel.queueDeclare(endpointName, false, false, false, null);    }              /**     * 关闭channel和connection。并非必须,因为隐含是自动调用的。     * @throws IOException     */     public void close() throws IOException{         this.channel.close();         this.connection.close();     }}

2、消息提供者

package com.lin.producer;import java.io.IOException;import java.io.Serializable;import org.apache.commons.lang.SerializationUtils;import com.lin.EndPoint;  /** *  * 功能概要:消息生产者 *  * @author linbingwen * @since  2016年1月11日 */public class Producer extends EndPoint{         public Producer(String endPointName) throws IOException{        super(endPointName);    }     public void sendMessage(Serializable object) throws IOException {        channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object));    }  }

3、消息消费者

package com.lin.consumer;import java.io.IOException;import java.util.HashMap;import java.util.Map;import org.apache.commons.lang.SerializationUtils;import com.lin.EndPoint;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.Envelope;import com.rabbitmq.client.ShutdownSignalException;  /** *  * 功能概要:读取队列的程序端,实现了Runnable接口 *  * @author linbingwen * @since  2016年1月11日 */public class QueueConsumer extends EndPoint implements Runnable, Consumer{         public QueueConsumer(String endPointName) throws IOException{        super(endPointName);           }         public void run() {        try {            //start consuming messages. Auto acknowledge messages.            channel.basicConsume(endPointName, true,this);        } catch (IOException e) {            e.printStackTrace();        }    }     /**     * Called when consumer is registered.     */    public void handleConsumeOk(String consumerTag) {        System.out.println("Consumer "+consumerTag +" registered");        }     /**     * Called when new message is available.     */    public void handleDelivery(String consumerTag, Envelope env,            BasicProperties props, byte[] body) throws IOException {        Map map = (HashMap)SerializationUtils.deserialize(body);        System.out.println("Message Number "+ map.get("message number") + " received.");             }     public void handleCancel(String consumerTag) {}    public void handleCancelOk(String consumerTag) {}    public void handleRecoverOk(String consumerTag) {}    public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {}}

4、测试

package com.lin.test;import java.io.IOException;import java.sql.SQLException;import java.util.HashMap;import com.lin.consumer.QueueConsumer;import com.lin.producer.Producer; public class Test {    public Test() throws Exception{                 QueueConsumer consumer = new QueueConsumer("queue");        Thread consumerThread = new Thread(consumer);        consumerThread.start();                 Producer producer = new Producer("queue");                 for (int i = 0; i < 1000000; i++) {            HashMap message = new HashMap();            message.put("message number", i);            producer.sendMessage(message);            System.out.println("Message Number "+ i +" sent.");        }    }         /**     * @param args     * @throws SQLException     * @throws IOException     */    public static void main(String[] args) throws Exception{      new Test();    }}

其中引入的jar包:

com.rabbitmq
amqp-client
3.0.4
commons-lang
commons-lang
2.6
org.apache.commons
commons-lang3
3.1

测试结果:

在提供消息

在消费消息 

然后同时打开rabbitmq的服务端,输入如下:

rabbitmqctl list_queues

这个命令是用来查看服务端中有多处个消息队列的。

可以看到有个名为queue的消息队列(更好的方法是安装好web监控插件,笔者一直安装失败,所以这里就不展示了)

 

 

四、Rbbitmq与Spring结合使用

首先建立一个maven工程,整个项目的结构如下:

 

下面将具体来讲讲整个过程

1、jar包的引入

pom.xml配置即可,如下:

4.0.0
com.lin
rabbit_c2
0.0.1-SNAPSHOT
3.2.8.RELEASE
1.6.6
1.2.12
4.10
org.springframework
spring-core
${spring.version}
org.springframework
spring-webmvc
${spring.version}
org.springframework
spring-context
${spring.version}
org.springframework
spring-context-support
${spring.version}
org.springframework
spring-aop
${spring.version}
org.springframework
spring-aspects
${spring.version}
org.springframework
spring-tx
${spring.version}
org.springframework
spring-jdbc
${spring.version}
org.springframework
spring-web
${spring.version}
junit
junit
${junit.version}
test
log4j
log4j
${log4j.version}
org.slf4j
slf4j-api
${slf4j.version}
org.slf4j
slf4j-log4j12
${slf4j.version}
org.springframework
spring-test
${spring.version}
test
org.springframework.amqp
spring-rabbit
1.3.5.RELEASE
javax.validation
validation-api
1.1.0.Final
org.hibernate
hibernate-validator
5.0.1.Final
src/main/resources
${basedir}/target/classes
**/*.properties
**/*.xml
true
src/main/resources
${basedir}/target/resources
**/*.properties
**/*.xml
true
org.apache.maven.plugins
maven-compiler-plugin
1.6
1.6
UTF-8
org.apache.maven.plugins
maven-war-plugin
2.1.1
${warExcludes}
org.apache.maven.plugins
maven-surefire-plugin
2.4.3
true
true
org.apache.maven.plugins
maven-source-plugin
attach-sources
jar
org.apache.maven.plugins
maven-resources-plugin
UTF-8

2、消息生产者

package com.lin.producer;import javax.annotation.Resource;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.stereotype.Service;/** * 功能概要:消息产生,提交到队列中去 *  * @author linbingwen * @since  2016年1月15日  */@Servicepublic class MessageProducer {		private Logger logger = LoggerFactory.getLogger(MessageProducer.class);	@Resource	private AmqpTemplate amqpTemplate;	public void sendMessage(Object message){	  logger.info("to send message:{}",message);	  amqpTemplate.convertAndSend("queueTestKey",message);	}}

3、消息消费者

package com.lin.consumer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageListener;/** * 功能概要:消费接收 *  * @author linbingwen * @since  2016年1月15日  */public class MessageConsumer implements MessageListener {		private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);	@Override	public void onMessage(Message message) {		logger.info("receive message:{}",message);	}}

4、rabbitMq.xml配置信息

5、spring集成rabbiqMq。application.xml内容如下:

6、最后,为了方便,打印了日志,log4j.properties配置如下

log4j.rootLogger=DEBUG,Console,Stdout#Consolelog4j.appender.Console=org.apache.log4j.ConsoleAppenderlog4j.appender.Console.layout=org.apache.log4j.PatternLayoutlog4j.appender.Console.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%nlog4j.logger.java.sql.ResultSet=INFOlog4j.logger.org.apache=INFOlog4j.logger.java.sql.Connection=DEBUGlog4j.logger.java.sql.Statement=DEBUGlog4j.logger.java.sql.PreparedStatement=DEBUG log4j.appender.Stdout = org.apache.log4j.DailyRollingFileAppender  log4j.appender.Stdout.File = E://logs/log.log  log4j.appender.Stdout.Append = true  log4j.appender.Stdout.Threshold = DEBUG   log4j.appender.Stdout.layout = org.apache.log4j.PatternLayout  log4j.appender.Stdout.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n

 

接着运行整个工程即可:

下面是运行的结果:

 

一会发一会收:因为在同一工程,所以发消息和接消息是交替出现的

 

我们出可以去rabbitMq 服务器上看:

可以看到,我们配置的队列已存在了:

到此,整个工程结束。

转载于:https://my.oschina.net/keke412/blog/761618

你可能感兴趣的文章
考研随笔2
查看>>
ubuntu Linux 操作系统安装与配置
查看>>
操作系统os常识
查看>>
乱码的情况
查看>>
虚拟机centos 同一个tomcat、不同端口访问不同的项目
查看>>
在不花一分钱的情况下,如何验证你的创业想法是否可行?《转》
查看>>
Linux/Android 性能优化工具 perf
查看>>
GitHub使用教程、注册与安装
查看>>
论以结果为导向
查看>>
CODE[VS] 1294 全排列
查看>>
<<The C Programming Language>>讀書筆記
查看>>
如何在目录中查找具有指定字符串的文件(shell)
查看>>
JS详细入门教程(上)
查看>>
Android学习笔记21-ImageView获取网络图片
查看>>
线段树分治
查看>>
git代码冲突
查看>>
lnmp1.3 配置pathinfo---thinkphp3.2 亲测有效
查看>>
利用android studio 生成 JNI需要的动态库so文件
查看>>
poll
查看>>
衡量优秀的卓越的前端工程师
查看>>