海邊的 Kafka 與 Python Part 1 - 發佈(publish)與訂閱(subscribe)

Posted on  Oct 9, 2018  in  Python 程式設計 - 高階  by  Amo Chen  ‐ 5 min read

Apache Kafka 是知名的分散式串流資料平台(distributed streaming platform),具備高效能、高吞吐量、可容錯(fault-tolerant)設計等特性,所以 Kafka 通常被應用在即時(real-time)串流資料的處理,例如應用程式產生的日誌(log)、使用者的活動紀錄(例如電子商務網站可能會紀錄使用者瀏覽哪些商品)等等。

而目前 MicroSoft, airbnb, NETFLIX, LinkedIn, LINE 等知名公司也是 Apache Kafka 的使用者,也因為有這些公司的背書,所以不太需要擔心 Apache Kafka 的成熟度及可靠度等問題。

不過單看 Apache Kafka 官網的介紹會有些難以理解其切確的功能及用途,所以本文將藉由撰寫 Python 應用程式實際認識 Kafka 的幾個主要面向與功能。

本文環境

  • Docker version 18.06.1-ce
  • Python 3.6.5
  • kafka-python 1.4.3
  • macOS High Sierra 10.13.6

安裝 kafka-python 指令:

$ pip install kafka-python==1.4.3

Apache Kafka

Apache Kafka 一開始是由 LinkedIn 所開發用來收集提交日誌(commit log)的訊息佇列系統(messaging queue system),讓應用程式(Application)可以訂閱(subscribe)與發佈(publish)資料,例如 App 可以透過伺服器所提供的 API 發佈使用者所在位置到 Kafka ,而所有訂閱的子系統或應用程式都能夠同時接收到這些資料並即時進行分析與處理。

而 LinkedIn 於 2011 年將 Kafka 開放原始碼之後,後人在其基礎之上開發,使得 Kafka 從原本的訊息佇列系統迅速演化成現在的分散式串流資料平台。

所以 Kafka 不僅適合原先的 Messaging 功能,另外在 Website Activity Tracking, Metrics, Log Aggregation, Stream Processing 等用途也相當合適,不外乎靠的是其高效能、高吞吐量與高可靠能力。

接著談談 Kafka 幾個核心觀念。

  1. Kafka 是以叢集(cluster)的形式運作 1 至多個 Kafka 伺服器,因此可彈性擴充叢集。
  2. Kafka 會將收到的串流資料稱為 record ,而 record 會儲存在 topic 之中,可以把 topic 想像成類別的概念,不同的串流資料可以發佈至不同的 topic 分門別類。
  3. 每 1 個 record 由 key, value, timestamp 3 個值所組成。

Kafka 有 4 個核心 APIs:

  1. Producer API

Producer API 讓應用程式(application)能夠發佈(publish)串流資料(record)到 1 至多個 Kafka topics 。

  1. Consumer API

有 Producer 就有相對應的 Consumer, Consumer API 讓應用程式訂閱(subscribe) 1 至多個 Kafka topics 。

  1. Streams API

Streams API 則同時具備 Producer API 與 Consumer API 兩者能力,可以訂閱 1 至多個 Kafka topics 並且處理之後輸出到 1 至多個 Kafka topics ,也就是具備雙向處理的能力。

  1. Connector API

Connector API 則讓人能夠打造介於 Kafka 與其他資料系統(例如 RDBMS, NoSQL)之間的 consumer 或 producer 。

上述 Kafka 官方文件 所提供的圖,就將各個 API 所扮演的角色與串流資料的流向陳述的相當清楚。

用 Docker 安裝 Kafka

除了官方文件所提供的 Kafka 安裝方法 之外,如果只是想玩看看 Kafka 的話,可以利用 Docker 執行 Kafka 會方便許多。

本文利用 wurstmeister/kafka-docker 建立 Kafka 環境。

因為 wurstmeister/kafka 利用 docker-compose 建立 Kafka 的執行環境,所以我們首先建立 1 個資料夾存放其 repository:

$ mkdr kafka; cd kafka
$ git clone https://github.com/wurstmeister/kafka-docker
$ cd kafka-docker

接著以編輯器打開 docker-compose.yml ,其內容為:

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    build: .
    ports:
      - "9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

由於 KAFKA_ADVERTISED_HOST_NAME 的值需要修改為 docker host 的 IP 位址(因為 Kafka 將會以 docker host 的 IP 對外提供服務),因為我們只在 localhost 進行,所以將 192.168.99.100 改為 1277.0.0.1 ,並且將 - "9092" 改為 - "9092:9092" 讓我們可以用 python 程式連到 Kafka 伺服器。

如果順利的話,應該可以在 docker ps 指令的結果中看到 Zookeeper 與 Kafka 正在執行:

CONTAINER ID        IMAGE                    COMMAND                  CREATED             STATUS              PORTS                                                NAMES
8c40a1241b7c        wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   5 hours ago         Up 5 hours          22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   kafka-docker_zookeeper_1
c5d0bddbcabf        kafka-docker_kafka       "start-kafka.sh"         5 hours ago         Up 5 hours          0.0.0.0:9092->9092/tcp                               kafka-docker_kafka_1

由於 Kafka 透過 Zookeeper 進行管理,因此才會有 2 個 containers 同時執行, 1 個執行 Zookeeper 另 1 個執行 Kafka 。

新增 Topic

由於所有的串流資料都會被發佈到 topic 之中,所以學習 Kafka 的第 1 步就是新增一個 topic ,後續就可以試著發佈與訂閱 topic

新增 topic 之前,得先查出執行 Kafka 與 Zookeeper 的容器網路,因為我們會透過被放在 docker image 內的 Kafka 指令新增 topic ,此時必須在同一容器網路內才能正常執行:

$ docker network ls
NETWORK ID          NAME                   DRIVER              SCOPE
39ee1f630a5e        bridge                 bridge              local
928fbe4c6768        host                   host                local
cf425740b888        kafka-docker_default   bridge              local
0802fe8d3f10        kafka_default          bridge              local
$
$ docker network inspect cf425740b888
[
    {
        "Name": "kafka-docker_default",
        "Id": "cf425740b8886444e472866f660732528dc0548726de4794fb5392cfb3f78a77",
        "Created": "2018-10-10T03:17:53.5458651Z",
        "Scope": "local",
        "Driver": "bridge",
        "EnableIPv6": false,
        "IPAM": {
            "Driver": "default",
            "Options": null,
            "Config": [
                {
                    "Subnet": "172.22.0.0/16",
                    "Gateway": "172.22.0.1"
                }
            ]
        },
        "Internal": false,
        "Attachable": false,
        "Ingress": false,
        "ConfigFrom": {
            "Network": ""
        },
        "ConfigOnly": false,
        "Containers": {
            "8c40a1241b7c00ed13508a4202662461be11b21a4fca3862ea6ef36ed44a345a": {
                "Name": "kafka-docker_zookeeper_1",
                "EndpointID": "550f91d9d5e60bf99dc82dc86c9ce0c185e270822336762bf0e8a3118b8e0361",
                "MacAddress": "02:42:ac:16:00:02",
                "IPv4Address": "172.22.0.2/16",
                "IPv6Address": ""
            },
            "c5d0bddbcabf1931daba09f3bc464483c20510299171694ae5bf5d28754a567c": {
                "Name": "kafka-docker_kafka_1",
                "EndpointID": "e2b1716713216d03982e33250d52aa60b2eb189dcb7e1ee566c528c95dbaa602",
                "MacAddress": "02:42:ac:16:00:03",
                "IPv4Address": "172.22.0.3/16",
                "IPv6Address": ""
            }
        },
        "Options": {},
        "Labels": {}
    }
]

上述指令查出 Zookeeper 與 Kafka 的網路 ID 為 cf425740b888 ,接著在 docker run 時指定 --network cf425740b888 參數,就能夠將新的 docker 容器加到同一網路之中:

$ docker run --network cf425740b888 -it kafka-docker_kafka bash
bash-4.4# cd /opt/kafka/bin/  # 所有 Kafka 指令在此

接著即可使用 container 內的 kafka-topics.sh 指令新增 1 個 topic - my-test-topic :

bash-4.4# kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic my-test-topic

在此解說幾個參數的意思:

--zookeeper zookeeper:2181 承前所述, Kafka 的管理需要透過 Zookeeper ,所以指定 zookeeper:2181 。不需要指明 IP 或 Domain 的原因是 docker 會自動將 docker-compose.yml 中的設定進行映對,所以 zookeeper 會自動映對到 docker-compose.yml 中的 zookeeper

--replication-factor 1 Kafka 具備 replication 的功能,因此可以將資料複製到多個 Kafka 伺服器,如果主要伺服器(也就是 Kafka 文件中的 Leader)無法提供服務,其他作為副本的 Kafka 伺服器會被選拔成為新的 Leader 繼續提供服務。在此最少需要 1 個副本,因此指定為 1 。

--partitions 1 Kafka 每個 topic 中還可以切分為數個 partition ,達到所謂分流的目的,每 1 個 consumer 都可以指派 1 個 partition , 而發布訊息時也可以指定發到特定的 partition 。在此我們只需要 1 個 partition ,因此指定為 1 。

成功的話,可以用 kafka-topics.sh --list 查看剛剛新增的 topic

$ kafka-topics.sh --list --zookeeper zookeeper:2181
my-test-topic

用指令體驗發佈(publish)與訂閱(subscribe)

新增 topic 之後,同樣可以利用內建的指令體驗發佈與訂閱的功能。

首先,先用指令訂閱 my-test-topic:

bash-4.4# kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --topic my-test-topic

--form-beginning 代表會從 topic 最早的 record 開始處理。

有 consumer 之後,我們在另 1 個 terminal 中試著啟動 1 個 container 扮演 producer 的角色,並試著發布訊息(同樣需要加到 Kafka 的網路之中):

$ docker run --network cf425740b888 -it kafka-docker_kafka bash  # 新的 container
bash-4.4# cd /opt/kafka/bin/  # 所有 Kafka 指令在此
bash-4.4# kafka-console-producer.sh --broker-list kafka:9092 --topic my-test-topic
>Hello, this is a message from producer

上述指令,會發佈一則訊息 Hello, this is a message from producer 到 my-test-topic 。如果成功的話,就會看到執行 consumer 的 container 也印出相同訊息。

此時如果把 consumer 停止執行(Ctrl + c) ,再重新執行一次,就會看到相同的訊息又被列印一次,這就是 --from-beginning 的功用。

小結

至此,我們已初步體驗 Kafka 的發佈與訂閱功能,也稍微暸解 Consumer 與 Producer 的角色。

下一篇文章,我們將實際撰寫 Python 應用程式,並深入了解 topicpartition 等其他概念。

References

https://docs.docker.com/docker-for-mac/networking/

https://github.com/wurstmeister/kafka-docker

對抗久坐職業傷害

研究指出每天增加 2 小時坐著的時間,會增加大腸癌、心臟疾病、肺癌的風險,也造成肩頸、腰背疼痛等常見問題。

然而對抗這些問題,卻只需要工作時定期休息跟伸展身體即可!

你想輕鬆改變現狀嗎?試試看我們的 PomodoRoll 番茄鐘吧! PomodoRoll 番茄鐘會根據你所設定的專注時間,定期建議你 1 項辦公族適用的伸展運動,幫助你打敗久坐所帶來的傷害!

贊助我們的創作

看完這篇文章了嗎? 休息一下,喝杯咖啡吧!

如果你覺得 MyApollo 有讓你獲得實用的資訊,希望能看到更多的技術分享,邀請你贊助我們一杯咖啡,讓我們有更多的動力與精力繼續提供高品質的文章,感謝你的支持!