

Khi làm việc với Apache Kafka, ngoài việc tích hợp thông qua các ứng dụng Java, Go hay Python, bộ công cụ Command Line Interface (CLI) là cách nhanh chóng để kiểm tra kết nối, tạo topic, gửi và đọc dữ liệu hoặc hỗ trợ xử lý sự cố trong quá trình vận hành.
Trong bài viết này, chúng ta sẽ thực hành với một ví dụ thống nhất sử dụng:
Lưu ý: Ví dụ trong bài giả định demo-user đã được cấp đầy đủ quyền cần thiết trên demo-topic, bao gồm quyền Publish (WRITE) và Subscribe (READ). Đối với các lệnh quản trị như liệt kê hoặc mô tả topic, tài khoản cũng cần có quyền tương ứng. Nếu chưa được cấp quyền, bạn có thể gặp các lỗi như Topic authorization failed hoặc Cluster authorization failed.
Tạo file cấu hình để xác thực với Kafka:
cat > demo.properties <<EOF
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="demo-user" password="your-password";
EOFKhai báo biến môi trường:
BOOTSTRAP_SERVER=192.168.0.81:9094kafka-topics.sh \
--bootstrap-server $BOOTSTRAP_SERVER \
--command-config demo.properties \
--listVí dụ kết quả:
demo-topic
application-log
audit-eventLệnh này giúp kiểm tra nhanh kết nối tới Kafka và xem những topic mà tài khoản hiện tại có thể truy cập.
kafka-topics.sh \
--bootstrap-server $BOOTSTRAP_SERVER \
--command-config demo.properties \
--describe \
--topic demo-topicVí dụ:
Topic: demo-topic
PartitionCount: 3
ReplicationFactor: 3
Partition:0 Leader:1 Replicas:1,2,3 ISR:1,2,3
Partition:1 Leader:2 Replicas:2,3,1 ISR:2,3,1
Partition:2 Leader:3 Replicas:3,1,2 ISR:3,1,2Qua đó có thể kiểm tra số lượng partition, replication factor và trạng thái của các replica.
kafka-console-producer.sh \
--bootstrap-server $BOOTSTRAP_SERVER \
--producer.config demo.properties \
--topic demo-topicSau đó nhập:
Hello Kafka
User created
Order completedMỗi dòng sẽ được gửi thành một message vào demo-topic.
Đọc toàn bộ dữ liệu từ đầu topic:
kafka-console-consumer.sh \
--bootstrap-server $BOOTSTRAP_SERVER \
--consumer.config demo.properties \
--topic demo-topic \
--from-beginningVí dụ kết quả:
Hello Kafka
User created
Order completedNếu không sử dụng --from-beginning, consumer chỉ nhận các message mới được tạo sau khi lệnh bắt đầu chạy.
kafka-console-consumer.sh \
--bootstrap-server $BOOTSTRAP_SERVER \
--consumer.config demo.properties \
--topic demo-topic \
--from-beginning \
--property print.key=true \
--property print.partition=true \
--property print.offset=trueVí dụ:
Partition:0 Offset:0 Key:null Value:Hello Kafka
Partition:0 Offset:1 Key:null Value:User created
Partition:1 Offset:5 Key:order-1001 Value:Order completedThông tin này rất hữu ích khi cần debug hoặc theo dõi quá trình phân phối dữ liệu giữa các partition.
Liệt kê Consumer Group:
kafka-consumer-groups.sh \
--bootstrap-server $BOOTSTRAP_SERVER \
--command-config demo.properties \
--listXem chi tiết một Consumer Group:
kafka-consumer-groups.sh \
--bootstrap-server $BOOTSTRAP_SERVER \
--command-config demo.properties \
--group demo-group \
--describeKết quả thường bao gồm:
Theo dõi chỉ số Lag giúp đánh giá khả năng xử lý dữ liệu của consumer.
Đưa consumer về đầu topic:
kafka-consumer-groups.sh \
--bootstrap-server $BOOTSTRAP_SERVER \
--command-config demo.properties \
--group demo-group \
--topic demo-topic \
--reset-offsets \
--to-earliest \
--executeNếu chỉ muốn xem trước thay đổi, sử dụng --dry-run thay cho --execute.
Ví dụ thay đổi thời gian lưu dữ liệu thành 7 ngày:
kafka-configs.sh \
--bootstrap-server $BOOTSTRAP_SERVER \
--command-config demo.properties \
--entity-type topics \
--entity-name demo-topic \
--alter \
--add-config retention.ms=604800000Kiểm tra lại:
kafka-configs.sh \
--bootstrap-server $BOOTSTRAP_SERVER \
--command-config demo.properties \
--entity-type topics \
--entity-name demo-topic \
--describeKafka CLI là bộ công cụ hữu ích giúp người dùng thao tác nhanh với Kafka mà không cần viết thêm ứng dụng riêng. Chỉ với một file cấu hình xác thực và một số lệnh cơ bản, bạn có thể kết nối tới Kafka cluster có bật SASL/SCRAM, liệt kê và kiểm tra thông tin topic, gửi hoặc đọc dữ liệu trực tiếp từ terminal, theo dõi Consumer Group, kiểm tra độ trễ xử lý và quản lý một số cấu hình topic. Việc thành thạo Kafka CLI sẽ giúp quá trình phát triển, kiểm thử và vận hành Kafka trở nên nhanh chóng, thuận tiện và hiệu quả hơn.
