SpringCloudBus使用Kafka实现消息总线

news/2024/11/9 20:14:59
Kafka是分布式发布-订阅消息系统,最初由LinkedIn公司开发,之后成为之后成为Apache基金会的一部分,由 Scala和 Java编写。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。

在开始本文前,需要搭建kafka的环境,如果是在CentOS环境下,可以看看我前面的文章:CentOS7下Kafka的安装介绍 。其他平台下可以自行百度或Google。

在之前的环境中,需要修改server.properties文件,开启9092端口的监听:

listeners=PLAINTEXT://your.host.name:9092

SpringBoot简单整合Kafka

因为SpringCloud是基于SpringBoot的,所以在使用SpringCloudBus整合之前先用SpringBoot整合并记录下来。

创建项目

这里创建一个名为kafka-hello的SpringBoot项目,并添加以下依赖:

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>

  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
  </dependency>

  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
  </dependency>

  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.1.RELEASE</version>
  </dependency>

  <dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.2</version>
  </dependency>
</dependencies>

消息实体类

@Data
public class Message {
    private Long id;//id
    private String msg; //消息
    private Date sendTime; //发送时间
}

消息产生者

在该类中创建一个消息发送的方法,使用KafkaTemplate.send()发送消息,wqh是Kafka里的Topic。

@Component
@Slf4j
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    private Gson gson = new GsonBuilder().create();

    public void send(Long i){
        Message message = new Message();
        message.setId(i);
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        log.info("========发送消息  "+i+" >>>>{}<<<<<==========",gson.toJson(message));
        kafkaTemplate.send("wqh",gson.toJson(message));
    }
}

消息接收类,

在这个类中,创建consumer方法,并使用@KafkaListener注解监听指定的topic,如这里是监听wanqh和wqh两个topic。

@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = {"wanqh","wqh"})
    public void consumer(ConsumerRecord<?,?> consumerRecord){
        //判断是否为null
        Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
        log.info(">>>>>>>>>> record =" + kafkaMessage);
        if(kafkaMessage.isPresent()){
            //得到Optional实例中的值
            Object message = kafkaMessage.get();
            log.info(">>>>>>>>接收消息message =" + message);
        }
    }
}

修改启动类

@SpringBootApplication
public class KafkaApplication {

    @Autowired
    private KafkaSender kafkaSender;

    @PostConstruct
    public void init(){
      for (int i = 0; i < 10; i++) {
        //调用消息发送类中的消息发送方法
        kafkaSender.send((long) i);
      }
    }
    public static void main(String[] args) {
       SpringApplication.run(KafkaApplication.class, args);
    }
}

配置文件

spring.application.name=kafka-hello
server.port=8080
#============== kafka ===================
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=192.168.18.136:9092

#=============== provider  =======================
spring.kafka.producer.retries=0
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432

# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  =======================
# 指定默认消费者group id
spring.kafka.consumer.group-id=test-consumer-group

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100

# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

测试

直接启动该项目:

SpringCloudBus整合Kafka

前面介绍使用RabbitMQ整合SpringCloudBus实现了消息总线,并且测试了动态刷新配置文件。RabbitMQ是通过引入spring-cloud-starter-bus-amqp模块来实现消息总线。若使用Kafka实现消息总线,我们可以直接将之前添加的spring-cloud-starter-bus-amqp替换成spring-cloud-starter-bus-kafka

这里我将前面的config-client复制一份,改名config-client-kafka。传送门:SpingCloudBus整合RabbitMQ

  • 所添加的依赖:
<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-config</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.retry</groupId>
            <artifactId>spring-retry</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-kafka</artifactId>
        </dependency>
    </dependencies>
  • 添加kafka的配置信息
#Kafka的服务端列表,默认localhost
spring.cloud.stream.kafka.binder.brokers=192.168.18.136:9092
#Kafka服务端的默认端口,当brokers属性中没有配置端口信息时,就会使用这个默认端口,默认9092
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
#Kafka服务端连接的ZooKeeper节点列表,默认localhost
spring.cloud.stream.kafka.binder.zkNodes=192.168.18.136:2181
#ZooKeeper节点的默认端口,当zkNodes属性中没有配置端口信息时,就会使用这个默认端口,默认2181
spring.cloud.stream.kafka.binder.defaultZkPort=2181

测试方法与前一篇一样,不介绍了。


参考:

  • 《SpringCloud微服务实战》
  • SpringBoot Kafka 整合使用
  • Spring Cloud构建微服务架构(七)消息总线(续:Kafka)

项目地址:

  • https://gitee.com/wqh3520/

原文[地址:

  • SpringCloudBus使用Kafka实现消息总线

http://www.niftyadmin.cn/n/3497448.html

相关文章

2021年9月记录

1.jpa在方法生成sql时find后面一定要加by如 WeatherHefengNow findFirstByOrderByObsTimeDesc(); 2.win10图片服务&#xff0c;打开以下&#xff0c;在打开IIS 3.postgre创建序列&#xff1a;https://www.freesion.com/article/65121299935/ postgre函数postgresql----seria…

11g oracle 客户端驱动_【Oracle 11g下载】Oracle 11g客户端 官方下载(32位、64位)-开心电玩...

软件介绍Oracle 11g是甲骨文公司推出的一款专业数据库处理的电脑软件&#xff0c;积攒了数十年的设计经验&#xff0c;成功地开发了这款高效性、稳定性、延展性、安全性的Oracle 11g&#xff0c;用户通过该软件能够更加深入地洞察业务状况、管理企业信息&#xff0c;沉着安稳地…

windows通过cmd重新启动网卡

ipconfig/release ipconfig/renew 转载于:https://www.cnblogs.com/blfshiye/p/5152248.html

好的文章mark

2019独角兽企业重金招聘Python工程师标准>>> 手Q性能专项实践2.0-从小数据到大数据 转载于:https://my.oschina.net/u/268088/blog/1614933

LaneCat网关——有效的流量控制工具

LaneCat网关综合了防火墙、路由、多线路负载均衡、、***预警、网络病毒防范、访问控制、带宽控制、流量管理、网络流量控制、网络流量监控等功能&#xff0c;是高效、多功能的路由系统&#xff0c;可为您提供有效的网络安全管理服务。  LaneCat网关系统基于嵌入式架构&#x…

多数据源事务(非分布式)

参考SpringBootMybatis配置多数据源并且实现事务一致性_周先生丶的博客-CSDN博客_多数据源事务一致性 springboot在多数据源时默认只能开启一个主数据库的事务&#xff0c;如果要同时开启多个数据源的事务&#xff0c;并回滚&#xff0c;需要在切面中手动开启所有数据源事务&a…

matlab符号函数绘图法_《MATLAB符号运算及其应用》(黄忠霖著).pdf

《MATLAB符号运算及其应用》(黄忠霖著).pdf[General Information]书名MATLAB符号运算及其应用作者黄忠霖&#xff0c;黄京编著页数425出版社北京市&#xff1a;国防工业出版社出版日期2004SS号DX号000003120011URL/bookDetail.jsp?dxNumber000003120011&dFF2A7DD4D65FE8CB…

LVS负载均衡群集1(NAT模式)

一、群集技术概述 1、群集的类型1&#xff09;负载均衡群集&#xff1a;主要的功能将来自客户机的访问请求分流给多台服务器&#xff0c;从而缓单台服务器的负载压力&#xff0c;例如京东淘宝的购物节的时候&#xff0c;当天的并发量是分常大的&#xff0c;单台服务器是无法承载…