springboot整合集成rabbitmq

原创 创建时间:2019-06-08 访问量:32 赞:0 踩:0
compile 'org.springframework.boot:spring-boot-starter-amqp'

常量:

package com.fight.strive.sys.modules.rabbmitmq.enums;

/**
 * @author ZHOUXIANG
 */
@SuppressWarnings("unused")
public class RabbitConstants {

    /**
     * Directory类型交换器
     */
    public static final String DIRECT_EXCHANGE = "direct_exchange";

    /**
     * 测试队列
     */
    public static final String QUEUE_TEST = "queue_test";

}

配置:

# rabbitmq 配置
  rabbitmq:
    host: 192.168.133.129
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 3
        max-concurrency: 10
package com.fight.strive.sys.modules.rabbmitmq.config;

import com.fight.strive.sys.modules.rabbmitmq.enums.RabbitConstants;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author ZHOUXIANG
 */
@EnableRabbit
@Configuration
public class RabbitQueueConfig {

    @Bean
    @Qualifier(RabbitConstants.DIRECT_EXCHANGE)
    public Exchange exchange() {
        return new DirectExchange(RabbitConstants.DIRECT_EXCHANGE, true, false);
    }

    @Bean
    @Qualifier(RabbitConstants.QUEUE_TEST)
    public Queue queue() {
        return new Queue(RabbitConstants.QUEUE_TEST, true, false, false);
    }

    @Bean
    @Qualifier(RabbitConstants.QUEUE_TEST)
    public Binding binding(@Qualifier(RabbitConstants.DIRECT_EXCHANGE) Exchange exchange,
                           @Qualifier(RabbitConstants.QUEUE_TEST) Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with(RabbitConstants.QUEUE_TEST).and(null);
    }
}

发送:

package com.fight.strive.sys.modules.rabbmitmq.sender;

import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * @author ZHOUXIANG
 */
@Component
public class RabbitSender {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送RabbitMQ消息
     */
    public void sendRabbitmqMessage(String exchange, String routingKey, Object obj) {
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.convertAndSend(exchange, routingKey, obj);
    }
}

接收:

package com.fight.strive.sys.modules.rabbmitmq.receiver;

import com.fight.strive.sys.modules.rabbmitmq.enums.RabbitConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * queue_test队列接收处理
 *
 * @author ZHOUXIANG
 */
@Slf4j
@Component
public class RabbitReceiver {

    @RabbitListener(queues = RabbitConstants.QUEUE_TEST)
    public void handle(Object content, Channel channel, Message message) {
        try {
            log.info("接收到的消息为:{}", content);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            log.error("接收RabbitMQ消息时报错", e);
        }
    }
}

测试:

package com.fight.strive.sys.modules.rabbitmq;

import com.fight.strive.StriveApplicationTests;
import com.fight.strive.sys.modules.rabbitmq.dto.Student;
import com.fight.strive.sys.modules.rabbmitmq.enums.RabbitConstants;
import com.fight.strive.sys.modules.rabbmitmq.utils.RabbitUtils;
import org.junit.Test;

public class RabbitmqTests extends StriveApplicationTests {

    @Test
    public void testSendMessage() {
        Student student = new Student();
        student.setName("zx");
        student.setAge(20);
        RabbitUtils.sendMessage(RabbitConstants.DIRECT_EXCHANGE, RabbitConstants.QUEUE_TEST, student);
        System.out.println();
    }
}


评论
 我想说:
==已经到底了==
关注: 粉丝: 积分:
Copyright ©2018 工联信息网——打造最实用且免费的互联网资源共享社区
如有疑问和需求请致邮箱:need@glxxw2018.com
不良信息反馈及建议请致邮箱:accusation@glxxw2018.com
沪ICP备18018158号-1