搭建

​ docker-compose.yaml

# Single-node ZooKeeper + Kafka stack with SASL authentication.
# Both containers mount ./secrets at /etc/kafka/secrets and load the same
# JAAS file (server_jaas.conf) through the KAFKA_OPTS JVM option.
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.1.2
    hostname: zookeeper
    restart: always
    ports:
      # HOST:CONTAINER pairs are quoted so they are always parsed as strings.
      - '2182:2182'
    environment:
      # Environment values are quoted so every value reaches the container
      # as the exact string written here (no implicit YAML typing).
      ZOOKEEPER_CLIENT_PORT: '2182'
      ZOOKEEPER_TICK_TIME: '2000'
      ZOOKEEPER_MAXCLIENTCNXNS: '0'
      # NOTE(review): the four variables below are meant to require SASL for
      # ZooKeeper clients; confirm how this image version maps them onto
      # ZooKeeper properties (the name containing ".1" in particular).
      ZOOKEEPER_AUTHPROVIDER.1: org.apache.zookeeper.server.auth.SASLAuthenticationProvider
      ZOOKEEPER_REQUIRECLIENTAUTHSCHEME: sasl
      ZOOKEEPER_JAASLOGINRENEW: '3600000'
      # Points the JVM at the mounted JAAS login configuration.
      KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/server_jaas.conf
    volumes:
      - ./secrets:/etc/kafka/secrets
  kafka:
    image: confluentinc/cp-kafka:5.1.2
    hostname: broker
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - '9092:9092'
    environment:
      KAFKA_BROKER_ID: '1'
      # Must use the same port as ZOOKEEPER_CLIENT_PORT above.
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2182'
      # Single broker, so the offsets topic cannot be replicated further.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0'
      # The advertised address is 127.0.0.1, so clients are expected to
      # connect from the Docker host through the published port 9092.
      KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://127.0.0.1:9092
      KAFKA_LISTENERS: SASL_PLAINTEXT://0.0.0.0:9092
      # SASL/PLAIN is used for both client and inter-broker traffic.
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_PLAINTEXT
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
      # Points the JVM at the mounted JAAS login configuration.
      KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/server_jaas.conf
      # "admin" matches the user name declared in server_jaas.conf.
      KAFKA_SUPER_USERS: User:admin
    volumes:
      - ./secrets:/etc/kafka/secrets

server_jaas.conf

// Login context "Client": digest credentials (admin / 123456).
// Presumably used by the Kafka broker when it connects to ZooKeeper, since
// the broker JVM loads this file via KAFKA_OPTS - confirm against the Kafka docs.
// The user name/password pair matches user_admin in the "Server" section.
Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin"
    password="123456";
};


// Login context "Server": digest login module for the ZooKeeper side.
// Each user_<name>="<password>" option declares one account; here the
// accounts are "super" and "admin", both with password 123456.
// Presumably read by the ZooKeeper JVM, which loads this file via KAFKA_OPTS.
Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin"
    password="123456"
    user_super="123456"
    user_admin="123456";
};

// Login context "KafkaServer": SASL/PLAIN settings for the Kafka broker.
// username/password are the broker's own credentials; user_admin declares
// the account "admin" (password 123456), the same account that the Spring
// Boot producer and consumer settings log in with.
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="123456"  
    user_admin="123456";
};

// Login context "KafkaClient": SASL/PLAIN credentials (admin / 123456).
// Presumably picked up by Kafka client tools started with this JAAS file
// when they do not set sasl.jaas.config themselves - confirm before relying on it.
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="123456";
};

server_jaas.conf 文件位于与 docker-compose.yaml 同级目录下的 secrets 文件夹中;如果要修改存放位置,需要同步修改 yaml 中的挂载目录。

集成到spring boot

相关的maven依赖就不做演示了,直接给配置

spring:
  # Kafka settings
  kafka:
    # Address of the Kafka server; for a cluster, list several addresses
    # separated by commas.
    bootstrap-servers: '127.0.0.1:9092'
    producer:
      # SASL/PLAIN login for the producer.
      properties:
        security.protocol: SASL_PLAINTEXT
        sasl.mechanism: PLAIN
        # Folded scalar: the two lines are joined with one space into a
        # single-line JAAS entry.
        sasl.jaas.config: >-
          org.apache.kafka.common.security.plain.PlainLoginModule required
          username="admin" password="123456";
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # Default consumer group id. Consumers in the same group never read
      # the same message; group.id is what names the group.
      group-id: am
      auto-offset-reset: earliest
      enable-auto-commit: true
      # When enable.auto.commit is true, how often (in milliseconds) consumer
      # offsets are committed to Kafka automatically. The default is 5000.
      auto-commit-interval: 1000
      # SASL/PLAIN login for the consumer.
      properties:
        security.protocol: SASL_PLAINTEXT
        sasl.mechanism: PLAIN
        # Folded scalar: the two lines are joined with one space into a
        # single-line JAAS entry.
        sasl.jaas.config: >-
          org.apache.kafka.common.security.plain.PlainLoginModule required
          username="admin" password="123456";
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

生产者跟消费者都要配置好 sasl 的配置,不然测试的时候项目虽然能跑起来,但测试发消息时就会一直抛异常。