海邊的 Kafka 與 Python Part 1 - 發佈(publish)與訂閱(subscribe)
Posted on Oct 9, 2018 in Python 程式設計 - 高階 by Amo Chen ‐ 5 min read
Apache Kafka 是知名的分散式串流資料平台(distributed streaming platform),具備高效能、高吞吐量、可容錯(fault-tolerant)設計等特性,所以 Kafka 通常被應用在即時(real-time)串流資料的處理,例如應用程式產生的日誌(log)、使用者的活動紀錄(例如電子商務網站可能會紀錄使用者瀏覽哪些商品)等等。
而目前 MicroSoft, airbnb, NETFLIX, LinkedIn, LINE 等知名公司也是 Apache Kafka 的使用者,也因為有這些公司的背書,所以不太需要擔心 Apache Kafka 的成熟度及可靠度等問題。
不過單看 Apache Kafka 官網的介紹會有些難以理解其切確的功能及用途,所以本文將藉由撰寫 Python 應用程式實際認識 Kafka 的幾個主要面向與功能。
本文環境
- Docker version 18.06.1-ce
- Python 3.6.5
- kafka-python 1.4.3
- macOS High Sierra 10.13.6
安裝 kafka-python 指令:
$ pip install kafka-python==1.4.3
Apache Kafka
Apache Kafka 一開始是由 LinkedIn 所開發用來收集提交日誌(commit log)的訊息佇列系統(messaging queue system),讓應用程式(Application)可以訂閱(subscribe)與發佈(publish)資料,例如 App 可以透過伺服器所提供的 API 發佈使用者所在位置到 Kafka ,而所有訂閱的子系統或應用程式都能夠同時接收到這些資料並即時進行分析與處理。
而 LinkedIn 於 2011 年將 Kafka 開放原始碼之後,後人在其基礎之上開發,使得 Kafka 從原本的訊息佇列系統迅速演化成現在的分散式串流資料平台。
所以 Kafka 不僅適合原先的 Messaging 功能,另外在 Website Activity Tracking, Metrics, Log Aggregation, Stream Processing 等用途也相當合適,不外乎靠的是其高效能、高吞吐量與高可靠能力。
接著談談 Kafka 幾個核心觀念。
- Kafka 是以叢集(cluster)的形式運作 1 至多個 Kafka 伺服器,因此可彈性擴充叢集。
- Kafka 會將收到的串流資料稱為
record
,而record
會儲存在topic
之中,可以把topic
想像成類別的概念,不同的串流資料可以發佈至不同的topic
分門別類。 - 每 1 個
record
由 key, value, timestamp 3 個值所組成。
Kafka 有 4 個核心 APIs:
- Producer API
Producer API 讓應用程式(application)能夠發佈(publish)串流資料(record)到 1 至多個 Kafka topics 。
- Consumer API
有 Producer 就有相對應的 Consumer, Consumer API 讓應用程式訂閱(subscribe) 1 至多個 Kafka topics 。
- Streams API
Streams API 則同時具備 Producer API 與 Consumer API 兩者能力,可以訂閱 1 至多個 Kafka topics 並且處理之後輸出到 1 至多個 Kafka topics ,也就是具備雙向處理的能力。
- Connector API
Connector API 則讓人能夠打造介於 Kafka 與其他資料系統(例如 RDBMS, NoSQL)之間的 consumer 或 producer 。
上述 Kafka 官方文件 所提供的圖,就將各個 API 所扮演的角色與串流資料的流向陳述的相當清楚。
用 Docker 安裝 Kafka
除了官方文件所提供的 Kafka 安裝方法 之外,如果只是想玩看看 Kafka 的話,可以利用 Docker 執行 Kafka 會方便許多。
本文利用 wurstmeister/kafka-docker 建立 Kafka 環境。
因為 wurstmeister/kafka
利用 docker-compose
建立 Kafka 的執行環境,所以我們首先建立 1 個資料夾存放其 repository:
$ mkdr kafka; cd kafka
$ git clone https://github.com/wurstmeister/kafka-docker
$ cd kafka-docker
接著以編輯器打開 docker-compose.yml
,其內容為:
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
build: .
ports:
- "9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
由於 KAFKA_ADVERTISED_HOST_NAME
的值需要修改為 docker host 的 IP 位址(因為 Kafka 將會以 docker host 的 IP 對外提供服務),因為我們只在 localhost 進行,所以將 192.168.99.100
改為 1277.0.0.1
,並且將 - "9092"
改為 - "9092:9092"
讓我們可以用 python 程式連到 Kafka 伺服器。
如果順利的話,應該可以在 docker ps
指令的結果中看到 Zookeeper 與 Kafka 正在執行:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
8c40a1241b7c wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 5 hours ago Up 5 hours 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp kafka-docker_zookeeper_1
c5d0bddbcabf kafka-docker_kafka "start-kafka.sh" 5 hours ago Up 5 hours 0.0.0.0:9092->9092/tcp kafka-docker_kafka_1
由於 Kafka 透過 Zookeeper 進行管理,因此才會有 2 個 containers 同時執行, 1 個執行 Zookeeper 另 1 個執行 Kafka 。
新增 Topic
由於所有的串流資料都會被發佈到 topic
之中,所以學習 Kafka 的第 1 步就是新增一個 topic
,後續就可以試著發佈與訂閱 topic
。
新增 topic
之前,得先查出執行 Kafka 與 Zookeeper 的容器網路,因為我們會透過被放在 docker image 內的 Kafka 指令新增 topic
,此時必須在同一容器網路內才能正常執行:
$ docker network ls
NETWORK ID NAME DRIVER SCOPE
39ee1f630a5e bridge bridge local
928fbe4c6768 host host local
cf425740b888 kafka-docker_default bridge local
0802fe8d3f10 kafka_default bridge local
$
$ docker network inspect cf425740b888
[
{
"Name": "kafka-docker_default",
"Id": "cf425740b8886444e472866f660732528dc0548726de4794fb5392cfb3f78a77",
"Created": "2018-10-10T03:17:53.5458651Z",
"Scope": "local",
"Driver": "bridge",
"EnableIPv6": false,
"IPAM": {
"Driver": "default",
"Options": null,
"Config": [
{
"Subnet": "172.22.0.0/16",
"Gateway": "172.22.0.1"
}
]
},
"Internal": false,
"Attachable": false,
"Ingress": false,
"ConfigFrom": {
"Network": ""
},
"ConfigOnly": false,
"Containers": {
"8c40a1241b7c00ed13508a4202662461be11b21a4fca3862ea6ef36ed44a345a": {
"Name": "kafka-docker_zookeeper_1",
"EndpointID": "550f91d9d5e60bf99dc82dc86c9ce0c185e270822336762bf0e8a3118b8e0361",
"MacAddress": "02:42:ac:16:00:02",
"IPv4Address": "172.22.0.2/16",
"IPv6Address": ""
},
"c5d0bddbcabf1931daba09f3bc464483c20510299171694ae5bf5d28754a567c": {
"Name": "kafka-docker_kafka_1",
"EndpointID": "e2b1716713216d03982e33250d52aa60b2eb189dcb7e1ee566c528c95dbaa602",
"MacAddress": "02:42:ac:16:00:03",
"IPv4Address": "172.22.0.3/16",
"IPv6Address": ""
}
},
"Options": {},
"Labels": {}
}
]
上述指令查出 Zookeeper 與 Kafka 的網路 ID 為 cf425740b888
,接著在 docker run 時指定 --network cf425740b888
參數,就能夠將新的 docker 容器加到同一網路之中:
$ docker run --network cf425740b888 -it kafka-docker_kafka bash
bash-4.4# cd /opt/kafka/bin/ # 所有 Kafka 指令在此
接著即可使用 container 內的 kafka-topics.sh
指令新增 1 個 topic
- my-test-topic
:
bash-4.4# kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic my-test-topic
在此解說幾個參數的意思:
--zookeeper zookeeper:2181
承前所述, Kafka 的管理需要透過 Zookeeper ,所以指定 zookeeper:2181
。不需要指明 IP 或 Domain 的原因是 docker 會自動將 docker-compose.yml
中的設定進行映對,所以 zookeeper
會自動映對到 docker-compose.yml
中的 zookeeper
。
--replication-factor 1
Kafka 具備 replication 的功能,因此可以將資料複製到多個 Kafka 伺服器,如果主要伺服器(也就是 Kafka 文件中的 Leader)無法提供服務,其他作為副本的 Kafka 伺服器會被選拔成為新的 Leader 繼續提供服務。在此最少需要 1 個副本,因此指定為 1 。
--partitions 1
Kafka 每個 topic
中還可以切分為數個 partition
,達到所謂分流的目的,每 1 個 consumer 都可以指派 1 個 partition
, 而發布訊息時也可以指定發到特定的 partition
。在此我們只需要 1 個 partition
,因此指定為 1 。
成功的話,可以用 kafka-topics.sh --list
查看剛剛新增的 topic
:
$ kafka-topics.sh --list --zookeeper zookeeper:2181
my-test-topic
用指令體驗發佈(publish)與訂閱(subscribe)
新增 topic
之後,同樣可以利用內建的指令體驗發佈與訂閱的功能。
首先,先用指令訂閱 my-test-topic:
bash-4.4# kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --topic my-test-topic
--form-beginning
代表會從 topic
最早的 record 開始處理。
有 consumer 之後,我們在另 1 個 terminal 中試著啟動 1 個 container 扮演 producer 的角色,並試著發布訊息(同樣需要加到 Kafka 的網路之中):
$ docker run --network cf425740b888 -it kafka-docker_kafka bash # 新的 container
bash-4.4# cd /opt/kafka/bin/ # 所有 Kafka 指令在此
bash-4.4# kafka-console-producer.sh --broker-list kafka:9092 --topic my-test-topic
>Hello, this is a message from producer
上述指令,會發佈一則訊息 Hello, this is a message from producer
到 my-test-topic 。如果成功的話,就會看到執行 consumer 的 container 也印出相同訊息。
此時如果把 consumer 停止執行(Ctrl + c) ,再重新執行一次,就會看到相同的訊息又被列印一次,這就是 --from-beginning
的功用。
小結
至此,我們已初步體驗 Kafka 的發佈與訂閱功能,也稍微暸解 Consumer 與 Producer 的角色。
下一篇文章,我們將實際撰寫 Python 應用程式,並深入了解 topic
與 partition
等其他概念。
- 海邊的 Kafka 與 Python Part 1 - 發佈(publish)與訂閱(subscribe)
- 海邊的 Kafka 與 Python Part 2 - Producer & Consumer
- 海邊的 Kafka 與 Python Part 3 - 海邊的 Kafka 與 Python Part 3 - Partition
References
https://docs.docker.com/docker-for-mac/networking/
https://github.com/wurstmeister/kafka-docker