Kafka的简单上手

1.使用Docker安装Kafka

首先,确保已经安装了Docker和Docker Compose。

1.1 获取Kafka Docker镜像:

使用wurstmeister/kafka这个流行的Docker镜像。这个镜像也包含了Zookeeper,因为Kafka依赖于Zookeeper。

docker pull wurstmeister/kafka

为什么说Kafka依赖于Zookeeper?

  • 集群协调:Kafka 使用 Zookeeper 来协调 broker,例如:确定哪个 broker 是分区的 leader,哪些是 followers。
  • 存储元数据:Zookeeper 保存了关于 Kafka 集群的元数据信息,例如:当前存在哪些主题,每个主题的分区和副本信息等。
  • 维护集群状态:例如 broker 的加入和退出、分区 leader 的选举等,都需要 Zookeeper 来帮助维护状态和通知相关的 broker。
  • 动态配置:Kafka 的某些配置可以在不重启 broker 的情况下动态更改,这些动态配置的信息也是存储在 Zookeeper 中的。
  • 消费者偏移量:早期版本的 Kafka 使用 Zookeeper 来保存消费者的偏移量。尽管在后续版本中,这个功能被移到 Kafka 自己的内部主题 (__consumer_offsets) 中,但在一些老的 Kafka 集群中,Zookeeper 仍然扮演这个角色。 因为 Zookeeper 在 Kafka 的运作中起到了如此关键的作用,所以当部署一个 Kafka 集群时,通常也需要部署一个 Zookeeper 集群来与之配合。

1.2 使用docker-compose启动Kafka:

创建一个docker-compose.yml文件,并输入以下内容:

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
     - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
     - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181  # 添加这一行
    volumes:
     - /var/run/docker.sock:/var/run/docker.sock
  • version: 这指定了docker-compose的版本。版本’2’是一个相对较早的版本,但它足够满足我们的需求。
  • services: 定义了要启动的所有服务容器。这里,我们有两个服务:zookeeper和kafka。
  • zookeeper:
    • image: 指定了我们要使用的Docker镜像。这里使用的是wurstmeister/zookeeper:3.4.6,它是一个流行的Zookeeper Docker镜像。
    • ports: 将容器内的2181端口映射到宿主机的2181端口。这意味着我们可以直接从宿主机上访问Zookeeper。
  • kafka:
    • image: 同样使用wurstmeister提供的Kafka Docker镜像。
    • ports: 将容器的9092端口映射到宿主机的9092端口。
    • environment: 定义了一系列环境变量,这些变量将被传递给Kafka进程,并影响其行为。
      • KAFKA_ADVERTISED_LISTENERS: 定义了两个监听器:一个用于容器内部通信(INSIDE),一个用于与外部宿主机通信(OUTSIDE)。
      • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 定义了每个监听器所使用的安全协议。这里,两个监听器都使用PLAINTEXT,意味着没有加密。
      • KAFKA_INTER_BROKER_LISTENER_NAME: 这是broker之间互相通信使用的监听器。这里,它们使用容器内部的监听器。
      • KAFKA_LISTENERS: 定义了两个监听器的地址和端口。
      • KAFKA_ZOOKEEPER_CONNECT: 这指定了Zookeeper的地址和端口,Kafka需要这个信息来与Zookeeper互动。
    • volumes: 这里,我们只是映射了宿主机的Docker socket。这通常用于使Kafka容器能够与Docker守护进程进行通信,以便它可以查询其他容器的IP地址。 从这个配置中,Kafka容器的broker的具体配置主要在environment部分。这里定义了它的监听器、安全协议以及如何连接到Zookeeper。这些配置都将在启动Kafka容器时传递给Kafka进程。

运行以下命令来启动Kafka和Zookeeper:

docker-compose up -d
WARN[0000] Found multiple config files with supported names: /Users/gongna/docker-compose.yml, /Users/gongna/docker-compose.yaml 
WARN[0000] Using /Users/gongna/docker-compose.yml       
WARN[0000] Found multiple config files with supported names: /Users/gongna/docker-compose.yml, /Users/gongna/docker-compose.yaml 
WARN[0000] Using /Users/gongna/docker-compose.yml       
[+] Running 2/2
 ✔ Container gongna-zookeeper-1  Started                                                                          0.2s 
 ✔ Container gongna-kafka-1      Started  

看到以上消息代表已经成功的启动了。

1.3 使用Kafka

1.3.1 创建主题:

查找容器ID

docker ps

进入Kafka容器:

docker exec -it [KAFKA_CONTAINER_ID] /bin/bash

使用Kafka的命令行工具创建一个名为test的主题:

kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test
  • kafka-topics.sh:这是Kafka提供的一个shell脚本工具,用于管理Kafka topics。
  • --create:这个标识表示我们要创建一个新的topic。
  • --zookeeper zookeeper:2181:指定Zookeeper的地址和端口。Kafka使用Zookeeper来存储集群元数据和topic的配置信息。在这里,zookeeper:2181表示Zookeeper服务运行在名为zookeeper的容器上,并监听2181端口。
  • --replication-factor 1:定义了这个topic的每个partition应该有多少个replica(副本)。在这里,我们设置为1,意味着每个partition只有一个副本。在生产环境中,可能会希望有更多的副本来增加数据的可靠性。
  • --partitions 1:定义了这个topic应该有多少个partitions(分区)。
  • --topic test:定义了新创建的topic的名称,这里是test。
  • 这条命令创建了一个名为test的新topic,这个topic有1个partition和1个replica,并存储在运行在zookeeper:2181上的Zookeeper中。
1.3.2 生产消息:

使用Kafka的命令行生产者工具发送消息:

kafka-console-producer.sh --broker-list kafka:9093 --topic test
  • kafka-console-producer.sh:这是Kafka提供的一个shell脚本工具,用于启动一个控制台生产者。这个生产者允许通过控制台手动输入并发送消息到Kafka。
  • --broker-list kafka:9093:这个参数指定了Kafka broker的地址和端口。在这里,我们指定的是运行在名为kafka的容器上的broker,监听9093端口。注意,这里的9093端口是在Docker容器内部使用的端口,与我们在docker-compose.yml文件中设置的外部端口9092不同。
  • --topic test:这个参数指定了消息应该发送到哪个Kafka topic。在这里,我们选择发送到名为test的topic。

当运行这个命令后,会进入一个控制台界面。在这个控制台,可以手动输入消息,每输入一条消息并按下回车,这条消息就会被发送到test topic。这是一个非常有用的工具,特别是当想要在没有编写生产者代码的情况下,手动测试Kafka消费者或整个系统的功能时。 然后,您可以键入消息并按Enter发送。

docker exec -it 1e35c5fa5306 /bin/bash
root@1e35c5fa5306:/# kafka-console-producer.sh --broker-list kafka:9093 --topic test
> hello world
1.3.3 消费消息:

在另一个终端或者容器内,使用Kafka的命令行消费者工具来接收消息:

kafka-console-consumer.sh --bootstrap-server kafka:9093 --topic test --from-beginning
  • kafka-console-consumer.sh: 这是Kafka的命令行消费者工具,它允许从指定的topic中读取数据。
  • –bootstrap-server kafka:9093:
  • –bootstrap-server是指定要连接的Kafka broker或bootstrap服务器的参数。
  • kafka:9093表示消费者应该连接到名为kafka的服务器上的9093端口。这里,kafka是Docker Compose文件中定义的Kafka服务的名称。在Docker网络中,可以使用服务名称作为其主机名。
  • –topic test:
  • –topic是指定要从中读取数据的topic的参数。
  • test是之前创建的topic的名称。
  • –from-beginning: 这个参数表示消费者从topic的开始位置读取数据,而不是从最新的位置。换句话说,使用这个参数,会看到topic中存储的所有消息,从最早的消息开始。 此时,您应该能在消费者终端看到在生产者终端输入的消息。
docker exec -it 1e35c5fa5306 /bin/bash
root@1e35c5fa5306:/# kafka-console-consumer.sh --bootstrap-server kafka:9093 --topic test --from-beginning
hello world

似不似很简单!!🎉🎉🎉

1.4 Kafka的实际使用场景

现在已经了解了Kafka的基础操作,这里是一些Kafka的典型使用场景:

  1. 日志聚合:将分布式系统中的各种日志汇总到一个集中的日志系统。
  2. 流处理:使用Kafka Streams或其他流处理框架实时处理数据。
  3. 事件源:记录系统中发生的每一个状态变化,以支持事务和系统状态的恢复。
  4. 集成与解耦:在微服务架构中,使用Kafka作为各个微服务之间的中间件,确保它们之间的解耦。

1.5 编码实现

安装库:

go get -u github.com/confluentinc/confluent-kafka-go/kafka

编写生产者代码:

package main

import (
    "fmt"
    "os"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    broker := "localhost:9092"
    topic := "test"

    producer, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": broker,
    })
    if err != nil {
        fmt.Printf("Failed to create producer: %s\n", err)
        os.Exit(1)
    }

    defer producer.Close()

    // Produce a message to the 'test' topic
    message := "Hello, Kafka from Go!"
    producer.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          []byte(message),
    }, nil)

    // Wait for message deliveries
    producer.Flush(15 * 1000)
}
  • 在Kafka的上下文中,一个broker是一个单独的Kafka服务器实例,负责存储数据并为生产者和消费者服务。一个Kafka集群通常由多个brokers组成,这样可以确保数据的可用性和容错性。
  • 为什么叫“broker”呢?因为在许多系统中,broker是一个中介或协调者,帮助生产者和消费者之间的交互。在Kafka中,brokers确保数据的持久化、冗余存储和分发给消费者。
  • 当在代码中指定”bootstrap.servers”: broker,实际上是在告诉Kafka生产者客户端在哪里可以找到集群的一个或多个broker以连接到整个Kafka集群。
  • bootstrap.servers可以是Kafka集群中的一个或多个broker的地址。不需要列出集群中的所有broker,因为一旦客户端连接到一个broker,它就会发现集群中的其他brokers。但是,通常建议列出多个brokers以增加初始连接的可靠性。 综上所述,可以将broker视为Kafka的单个服务器实例,它存储数据并处理客户端请求。当的生产者或消费者代码连接到localhost:9092时,它实际上是在连接到运行在该地址的Kafka broker。如果有一个包含多个brokers的Kafka集群,的bootstrap.servers配置可能会看起来像这样:broker1:9092,broker2:9092,broker3:9092。 编写消费者代码:
package main

import (
    "fmt"
    "os"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    broker := "localhost:9092"
    groupId := "myGroup"
    topic := "test"

    consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": broker,
        "group.id":          groupId,
        "auto.offset.reset": "earliest", // Use "latest" to only receive new messages
    })
    if err != nil {
        fmt.Printf("Failed to create consumer: %s\n", err)
        os.Exit(1)
    }

    defer consumer.Close()

    // Subscribe to the topic
    err = consumer.Subscribe(topic, nil)
    if err != nil {
        fmt.Printf("Failed to subscribe to topic: %s\n", err)
        os.Exit(1)
    }

    for {
        msg, err := consumer.ReadMessage(-1)
        if err == nil {
            fmt.Printf("Received message: %s\n", string(msg.Value))
        } else {
            fmt.Printf("Consumer error: %v (%v)\n", err, msg)
            break
        }
    }
}
  • Group ID: Kafka消费者使用group.id进行分组。这允许多个消费者实例共同协作并共享处理主题的分区。Kafka保证每条消息只会被每个消费者组中的一个消费者实例消费。group.id 是用来标识这些消费者属于哪个消费者组的。当多个消费者有相同的 group.id 时,他们属于同一个消费者组。
  • auto.offset.reset: 这告诉消费者从哪里开始读取消息。earliest表示从起始位置开始,latest表示只读取新消息。Kafka中的每条消息在其所属的分区内都有一个唯一的序号,称为offset。消费者在消费消息后会存储它已经消费到的位置信息(offset)。如果消费者是首次启动并且之前没有offset记录,auto.offset.reset 决定了它从哪里开始消费。设置为 earliest 会从最早的可用消息开始消费,而 latest 会从新的消息开始消费。
  • 为了运行这个代码,需要确保Kafka broker正在运行、可以从的Go应用程序访问,而且主题中有消息(可以使用上面的生产者代码来产生消息)。 Kafka的基本架构:
  • Producer:生产者,发送消息到Kafka主题。
  • Topic:消息的分类和来源,可以视为消息队列或日志的名称。

Topic(主题):

  • Kafka 中的 Topic 是一个消息流的分类或名称的标识。可以把它看作是一个消息的类别或者分类,比如”用户注册”、”订单支付”等。(同类数据单元)
  • 可以认为 Topic 就像是一个数据库中的表(但它的行为和特性与数据库表是不同的)。
  • 一个 Topic 可以有多个 Partition(分区)。 消息(Message):
  • Message 是发送或写入 Kafka 的数据单元。
  • 每条 Message 包含一个 key 和一个 value。key 通常用于决定消息应该写入哪个 Partition。
  • 可以认为 Message 就像数据库表中的一行记录。 Partition(分区):
  • Partition 是 Kafka 提供数据冗余和扩展性的方法。每个 Topic 可以被分为多个 Partition,每个 Partition 是一个有序的、不可变的消息序列。
  • Partition 允许 Kafka 在多个服务器上存储、处理和复制数据,从而提供了数据冗余和高可用性。每个 Partition 会在 Kafka 集群中的多台机器上进行复制。
  • 当生产者发送消息到 Kafka 时,它可以根据某种策略(通常是消息的 key)来决定该消息应写入哪个 Partition。

Topic 和 Partition 的关系:

  • Topic:可以视为一个抽象的数据集或数据流的名称,类似于数据库中的一个表。
  • Partition:实际上是Topic的物理实现。每个Partition都是一个有序的消息日志,存储了该Topic的一部分数据。当我们提及将数据发送到一个Topic时,实际上是在将数据发送到该Topic的一个或多个Partition。
  • 一个 Topic 被切分为多个 Partition 可以使 Kafka 有效地在多台机器上并行处理数据。
  • 当消费者消费数据时,它可以并行从多个 Partition 读取数据,从而实现高吞吐量。 总之,也就是说一个Topic 相当于是一个完整的表,而Partition 就相当于把这个表进行水平分片,每个Partition存储的是这个表的一部分数据而不是完整的数据,而每个Partition不仅存储在一个broker上,而且还会在其他几个broker上复制,分为LeaderReplicas,FollowerReplicas, Partition 提供了 Kafka 的核心能力,如数据冗余、高可用性、扩展性和并行处理能力。而 Topic 为消息分类,并可以由多个 Partition 支持以满足扩展和并行处理的需求。Topic 是 Kafka 中的分类或命名空间,用于组织和管理消息。而消息是 Kafka 中传输的数据单元。生产者发送消息到特定的 Topic,而消费者从 Topic 读取这些消息。
  • Partition:主题可以分成多个分区,每个分区是一个有序的、不可变的消息序列。分区允许Kafka在多个broker上水平扩展。

Partition与Message的关系:

  • 消息在被发送到 Kafka 主题时,会被分配到某一个特定的分区。如何分配通常基于消息的 key,或者轮询策略,或其他自定义的策略。
  • 一旦消息被写入分区,它会被分配一个唯一的序列号,称为 offset。这个 offset 在分区内是连续的,并且随着每个新消息的添加而递增。
  • 因此,可以说分区实际上是消息的容器,而 offset 是在这个容器内定位消息的方法。
  • Broker:Kafka服务实例,存储数据并与客户端交互。
  • Consumer:消费者,从Kafka主题读取消息。
  • Consumer Group:由一个或多个消费者组成的组,共同读取并处理一个主题。每个分区在任何时候都只分配给消费者组中的一个消费者实例。
  • Offset:每条消息在其所属的分区中的位置。
Tags: kafka
Share: X (Twitter) Facebook LinkedIn