RabbitMQ学习之HelloWord

amqp-client:http://www.rabbitmq.com/java-client.html

1.依赖jar包

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.4.1</version>
</dependency>

2.生产者代码Send.Java

package cn.slimsmart.rabbitmq.demo.helloword;  
  
import com.rabbitmq.client.AMQP;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
  
public class Send {  
    //消息队列名称  
    private final static String QUEUE_NAME = "helloword";    
  
    public static void main(String[] args) throws Exception {  
          /**  
         * 创建连接连接到MabbitMQ  
         */    
        ConnectionFactory factory = new ConnectionFactory();    
        //设置MabbitMQ所在主机ip或者主机名    
        factory.setHost("192.168.101.174");    
        //指定用户 密码  
        factory.setUsername("admin");  
        factory.setPassword("admin");  
        //指定端口  
        factory.setPort(AMQP.PROTOCOL.PORT);  
        //创建一个连接    
        Connection connection = factory.newConnection();    
        //创建一个频道    
        Channel channel = connection.createChannel();    
        //指定一个队列    
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);    
        //发送的消息    
        String message = "hello world!";    
        //往队列中发出一条消息    
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());    
        System.out.println("Sent Message:'" + message + "'");    
        //关闭频道和连接    
        channel.close();    
        connection.close();    
    }  
  
}


3.消费者代码Receive.java

package cn.slimsmart.rabbitmq.demo.helloword;  
  
import com.rabbitmq.client.AMQP;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import com.rabbitmq.client.QueueingConsumer;  
  
public class Receive {  
      
    //消息队列名称  
    private final static String QUEUE_NAME = "helloword";    
  
    public static void main(String[] args) throws Exception {  
         //打开连接和创建频道,与发送端一样    
        ConnectionFactory factory = new ConnectionFactory();    
        factory.setHost("192.168.101.174");    
        //指定用户 密码  
        factory.setUsername("admin");  
        factory.setPassword("admin");  
        //指定端口  
        factory.setPort(AMQP.PROTOCOL.PORT);  
        //创建一个连接    
        Connection connection = factory.newConnection();    
        //创建一个频道    
        Channel channel = connection.createChannel();    
        //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。    
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);    
            
        //创建队列消费者    
        QueueingConsumer consumer = new QueueingConsumer(channel);    
        //指定消费队列    
        channel.basicConsume(QUEUE_NAME, true, consumer);    
        while (true)    
        {    
            //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)    
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();    
            String message = new String(delivery.getBody());    
            System.out.println("Received Message:'" + message + "'");    
        }    
    }  
  
}


如果运行出现如下异常,可能创建的用户没有访问权限。

Exception in thread "main" java.io.IOException  
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)  
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)  
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)  
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:388)  
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516)  
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:533)  
    at cn.slimsmart.rabbitmq.demo.test.Test.main(Test.java:18)  
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.net.SocketException: Connection reset  
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)  
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)  
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)  
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)  
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)  
    ... 4 more  
Caused by: java.net.SocketException: Connection reset  
    at java.net.SocketInputStream.read(Unknown Source)  
    at java.net.SocketInputStream.read(Unknown Source)  
    at java.io.BufferedInputStream.fill(Unknown Source)  
    at java.io.BufferedInputStream.read(Unknown Source)  
    at java.io.DataInputStream.readUnsignedByte(Unknown Source)  
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)  
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:131)  
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:515)


需要对该用户进行授权,登录web控制台后,点击"admin",进入需要授权的用户,在Admin标签页下点击新增的用户"admin",进入授权页面,默认直接点击"set permission"即可

20141105224019463.png