Setting Up Kafka with SASL Authentication Using docker-compose
Setup
docker-compose.yaml
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.1.2
    hostname: zookeeper
    restart: always
    ports:
      - 2182:2182
    environment:
      ZOOKEEPER_CLIENT_PORT: 2182
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_MAXCLIENTCNXNS: 0
      ZOOKEEPER_AUTHPROVIDER.1: org.apache.zookeeper.server.auth.SASLAuthenticationProvider
      ZOOKEEPER_REQUIRECLIENTAUTHSCHEME: sasl
      ZOOKEEPER_JAASLOGINRENEW: 3600000
      KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/server_jaas.conf
    volumes:
      - ./secrets:/etc/kafka/secrets
  kafka:
    image: confluentinc/cp-kafka:5.1.2
    hostname: broker
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2182'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://127.0.0.1:9092
      KAFKA_LISTENERS: SASL_PLAINTEXT://0.0.0.0:9092
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_PLAINTEXT
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
      KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/server_jaas.conf
      KAFKA_SUPER_USERS: User:admin
    volumes:
      - ./secrets:/etc/kafka/secrets
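With the secrets folder in place (the server_jaas.conf below goes there), the stack starts with docker-compose up -d. Note that KAFKA_AUTHORIZER_CLASS_NAME turns on ACL enforcement and KAFKA_SUPER_USERS exempts the admin user from ACL checks, so clients must authenticate as admin or be granted ACLs explicitly.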
server_jaas.conf
Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin"
    password="123456";
};
Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin"
    password="123456"
    user_super="123456"
    user_admin="123456";
};
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="123456"
    user_admin="123456";
};
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="123456";
};
The server_jaas.conf file lives in the secrets folder at the same level as docker-compose.yaml; if you move it, update the volume mount path in the YAML to match.
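Before wiring up an application, it can be worth smoke-testing the SASL setup with a bare Kafka AdminClient. The following is a minimal sketch assuming the broker and credentials defined above (127.0.0.1:9092, admin/123456); the class name SaslSmokeTest is just illustrative, and the kafka-clients dependency must be on the classpath.

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class SaslSmokeTest {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Broker address and SASL settings matching the docker-compose setup above
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"admin\" password=\"123456\";");
        try (AdminClient admin = AdminClient.create(props)) {
            // Listing topics forces a full authenticated round trip to the broker
            System.out.println("Topics: " + admin.listTopics().names().get());
        }
    }
}

If this prints a topic set instead of an authentication error or timeout, the broker side is working.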
Integrating with Spring Boot
The relevant Maven dependencies are omitted here; the configuration is given directly:
spring:
  # Kafka configuration
  kafka:
    bootstrap-servers: 127.0.0.1:9092 # Kafka server address; for a cluster, list multiple addresses separated by commas
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123456";
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
    consumer:
      # Consumer group ID; in Kafka, consumers in the same group never read the same message, and group.id sets the group name
      group-id: am
      auto-offset-reset: earliest
      enable-auto-commit: true
      # When 'enable.auto.commit' is true, how often (in milliseconds) consumer offsets are auto-committed to Kafka; the default is 5000
      auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123456";
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
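With this configuration, sending and receiving messages is standard spring-kafka usage. A minimal sketch follows, assuming a hypothetical topic named demo-topic and a placeholder class name (neither is part of the setup above):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class DemoMessaging {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Produce a message; the SASL settings come from the producer properties above
    public void send(String payload) {
        kafkaTemplate.send("demo-topic", payload);
    }

    // Consume from the same (hypothetical) topic; group id "am" comes from the consumer config above
    @KafkaListener(topics = "demo-topic")
    public void listen(String payload) {
        System.out.println("Received: " + payload);
    }
}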
Both the producer and the consumer must have the SASL settings configured; otherwise the project will still start up fine during testing, but sending messages will keep throwing exceptions.