Chủ Nhật, 21/06/2026, 17:00 (GMT+0)

Làm chủ Apache Kafka với CLI: Từ Producer đến Consumer

Quay lại Trang chủ Blog
Trên trang này

Khi làm việc với Apache Kafka, ngoài việc tích hợp thông qua các ứng dụng Java, Go hay Python, bộ công cụ Command Line Interface (CLI) là cách nhanh chóng để kiểm tra kết nối, tạo topic, gửi và đọc dữ liệu hoặc hỗ trợ xử lý sự cố trong quá trình vận hành.

Trong bài viết này, chúng ta sẽ thực hành với một ví dụ thống nhất sử dụng:

  • Kafka Broker: 192.168.0.81:9094
  • Topic: demo-topic
  • User: demo-user
  • File cấu hình: demo.properties

Lưu ý: Ví dụ trong bài giả định demo-user đã được cấp đầy đủ quyền cần thiết trên demo-topic, bao gồm quyền Publish (WRITE) và Subscribe (READ). Đối với các lệnh quản trị như liệt kê hoặc mô tả topic, tài khoản cũng cần có quyền tương ứng. Nếu chưa được cấp quyền, bạn có thể gặp các lỗi như Topic authorization failed hoặc Cluster authorization failed.

Bước 1: Chuẩn bị

Tạo file cấu hình để xác thực với Kafka:

cat > demo.properties <<EOF
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="demo-user" password="your-password";
EOF

Khai báo biến môi trường:

BOOTSTRAP_SERVER=192.168.0.81:9094

Bước 2: Liệt kê danh sách Topic

kafka-topics.sh \
  --bootstrap-server $BOOTSTRAP_SERVER \
  --command-config demo.properties \
  --list

Ví dụ kết quả:

demo-topic
application-log
audit-event

Lệnh này giúp kiểm tra nhanh kết nối tới Kafka và xem những topic mà tài khoản hiện tại có thể truy cập.

Bước 3: Xem thông tin chi tiết của Topic

kafka-topics.sh \
  --bootstrap-server $BOOTSTRAP_SERVER \
  --command-config demo.properties \
  --describe \
  --topic demo-topic

Ví dụ:

Topic: demo-topic
PartitionCount: 3
ReplicationFactor: 3

Partition:0 Leader:1 Replicas:1,2,3 ISR:1,2,3
Partition:1 Leader:2 Replicas:2,3,1 ISR:2,3,1
Partition:2 Leader:3 Replicas:3,1,2 ISR:3,1,2

Qua đó có thể kiểm tra số lượng partition, replication factor và trạng thái của các replica.

Bước 4: Gửi Message bằng Console Producer

kafka-console-producer.sh \
  --bootstrap-server $BOOTSTRAP_SERVER \
  --producer.config demo.properties \
  --topic demo-topic

Sau đó nhập:

Hello Kafka
User created
Order completed

Mỗi dòng sẽ được gửi thành một message vào demo-topic.

Bước 5: Đọc Message bằng Console Consumer

Đọc toàn bộ dữ liệu từ đầu topic:

kafka-console-consumer.sh \
  --bootstrap-server $BOOTSTRAP_SERVER \
  --consumer.config demo.properties \
  --topic demo-topic \
  --from-beginning

Ví dụ kết quả:

Hello Kafka
User created
Order completed

Nếu không sử dụng --from-beginning, consumer chỉ nhận các message mới được tạo sau khi lệnh bắt đầu chạy.

Bước 6: Hiển thị Key, Partition và Offset

kafka-console-consumer.sh \
  --bootstrap-server $BOOTSTRAP_SERVER \
  --consumer.config demo.properties \
  --topic demo-topic \
  --from-beginning \
  --property print.key=true \
  --property print.partition=true \
  --property print.offset=true

Ví dụ:

Partition:0 Offset:0 Key:null Value:Hello Kafka
Partition:0 Offset:1 Key:null Value:User created
Partition:1 Offset:5 Key:order-1001 Value:Order completed

Thông tin này rất hữu ích khi cần debug hoặc theo dõi quá trình phân phối dữ liệu giữa các partition.

Bước 7: Kiểm tra Consumer Group

Liệt kê Consumer Group:

kafka-consumer-groups.sh \
  --bootstrap-server $BOOTSTRAP_SERVER \
  --command-config demo.properties \
  --list

Xem chi tiết một Consumer Group:

kafka-consumer-groups.sh \
  --bootstrap-server $BOOTSTRAP_SERVER \
  --command-config demo.properties \
  --group demo-group \
  --describe

Kết quả thường bao gồm:

  • Topic
  • Partition
  • Current Offset
  • Log End Offset
  • Lag
  • Consumer ID

Theo dõi chỉ số Lag giúp đánh giá khả năng xử lý dữ liệu của consumer.

Bước 8: Reset Offset của Consumer Group

Đưa consumer về đầu topic:

kafka-consumer-groups.sh \
  --bootstrap-server $BOOTSTRAP_SERVER \
  --command-config demo.properties \
  --group demo-group \
  --topic demo-topic \
  --reset-offsets \
  --to-earliest \
  --execute

Nếu chỉ muốn xem trước thay đổi, sử dụng --dry-run thay cho --execute.

Bước 9: Thay đổi cấu hình Topic

Ví dụ thay đổi thời gian lưu dữ liệu thành 7 ngày:

kafka-configs.sh \
  --bootstrap-server $BOOTSTRAP_SERVER \
  --command-config demo.properties \
  --entity-type topics \
  --entity-name demo-topic \
  --alter \
  --add-config retention.ms=604800000

Kiểm tra lại:

kafka-configs.sh \
  --bootstrap-server $BOOTSTRAP_SERVER \
  --command-config demo.properties \
  --entity-type topics \
  --entity-name demo-topic \
  --describe

Một số mẹo khi sử dụng CLI

  • Với Kafka sử dụng SASL hoặc SSL, nên lưu cấu hình trong file .properties và sử dụng --command-config--producer.config hoặc --consumer.config.
  • Dùng --from-beginning khi cần đọc lại toàn bộ dữ liệu trong topic.
  • Kết hợp print.partitionprint.offset và print.key để hỗ trợ debug.

Kafka CLI là bộ công cụ hữu ích giúp người dùng thao tác nhanh với Kafka mà không cần viết thêm ứng dụng riêng. Chỉ với một file cấu hình xác thực và một số lệnh cơ bản, bạn có thể kết nối tới Kafka cluster có bật SASL/SCRAM, liệt kê và kiểm tra thông tin topic, gửi hoặc đọc dữ liệu trực tiếp từ terminal, theo dõi Consumer Group, kiểm tra độ trễ xử lý và quản lý một số cấu hình topic. Việc thành thạo Kafka CLI sẽ giúp quá trình phát triển, kiểm thử và vận hành Kafka trở nên nhanh chóng, thuận tiện và hiệu quả hơn.

#CloudWave Radar
#CloudWave Radar
Sovereign Cloud không chỉ là đặt máy chủ trong nước. Với bối cảnh pháp lý dữ liệu mới tại Việt Nam, đây đang trở thành bài toán hạ tầng quan trọng cho doanh nghiệp Việt và doanh nghiệp nước ngoài hoạt động tại Việt Nam
Sovereign Cloud - Đám mây chủ quyền là gì? Và vì sao doanh nghiệp hoạt động tại Việt Nam nên quan tâm từ bây giờ?
Tiếp tục đọc