海邊的 Kafka 與 Python Part 3 - Partition
Posted on Oct 15, 2018 in Python 程式設計 - 高階 by Amo Chen ‐ 5 min read
海邊的 Kafka 與 Python - Part 2 中,已經介紹 producer 與 consumer 的運作,並且透過其運作進一步了解群組(Group)與 partition 相關的概念。
本文將介紹更多關於 Consumer 的使用方法,以及介紹 Log Compaction 。
本文環境
- Docker version 18.06.1-ce
- Python 3.6.5
- kafka-python 1.4.3
- macOS High Sierra 10.13.6
安裝 kafka-python 指令:
$ pip install kafka-python==1.4.3
p.s. 如尚未建立 Kafka 執行環境,請參考 海邊的 Kafka 與 Python - Part 1
如何切分 Partition ?
經過 2 篇文章的介紹,我們知道 Kafka 的 topic
能夠再細分成不同的 partition
。
那麼要如何將 topic
切成數個 partition
呢?
首先,一樣透過以下指令建立 1 container 並連到 Kafka 所在的 Docker Network 中:
$ docker run --network cf425740b888 -it kafka-docker_kafka bash
接著使用以下指令將 topic
切成 2 個 partition:
bash-4.4# $ kafka-topics.sh --alter --zookeeper zookeeper:2181 --partitions 2 --topic my-test-topic
此處必須做個提醒, partition 的數量只能向上增加,因此ㄧ旦向上增加,就不能再回頭了。所以如果再將 partition 改回 1 ,就會出現以下訊息:
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Error while executing topic command : The number of partitions for a topic can only be increased. Topic my-test-topic currently has 2 partitions, 1 would not be an increase.
[2018-10-20 03:33:02,868] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic my-test-topic currently has 2 partitions, 1 would not be an increase.
(kafka.admin.TopicCommand$)
上述訊息提示非常明確: The number of partitions for a topic can only be increased.
另外也必須注意,增加有指定 key 的 topic 的 partition 的數量時,也會影響訊息(message)次序或者分派訊息到 partition 的規則。
如果想知道某個 topic 的詳細資訊(例如 partition 的數量),可以利用以下指令:
bash-4.4# kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic my-test-topic
Topic:my-test-topic PartitionCount:2 ReplicationFactor:1 Configs:
Topic: my-test-topic Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: my-test-topic Partition: 1 Leader: 1001 Replicas: 1001 Isr: 1001
上述指令可以看到 my-test-topic 現在有 2 個 partitions ,也就是 PartitionCount:2
的部分,以及 Leader 是哪一號的 Kafka 伺服器,由於 ReplicationFactor:1
也只有 1 個 replica ,所以都是同一個 Leader ,同一個 replica 。
而 Isr
是 "in-sync" replicas
的縮寫,代表目前正在同步的 replicas ,由於 Leader 本身也在 Isr 之中,所以會出現在 Isr 也是正常的。
Consumer 如何指定 Partition ?
學會切分 partition 之後, Consumer 也就可以指定要消化特定的 partition 的訊息,其範例程式如下:
# -*- coding: utf-8 -*-
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer(enable_auto_commit=False)
consumer.assign([
TopicPartition('my-test-topic', 0),
])
for message in consumer:
print(message)
上述範例程式利用 TopicPartition('my-test-topic', 0)
放到 1 個 list 中,並透過 assign
方法,令該 consumer 訂閱 my-test-topic 的 0 號 partition 。之所以會是 list 是因為 1 個 consumer 可同時訂閱多個 topic 與 partition 。
如何指定 Offset ?
除了 partition 之外, consumer 也可以自由指定要從哪個 offset 開始取得訊息:
# -*- coding: utf-8 -*-
from kafka import KafkaConsumer
from kafka.structs import OffsetAndMetadata
from kafka.structs import TopicPartition
consumer = KafkaConsumer(enable_auto_commit=False)
partition_0 = TopicPartition('my-test-topic', 0)
offset = 3
consumer.assign([partition_0, ])
consumer.seek(partition_0, offset)
for message in consumer:
print(message)
上述範例即是利用 seek
方法指定從特定 partition 的特定 offset 開始讀取 message 。
如何 Commit ?
本文目前為止的範例程式都將 enable_auto_commit
設定為 False
,所以每次執行 consumer 都會從最早的訊息開始讀取,因為 Kafka 並不知道要從哪個 offset 開始將訊息派送給 consumer 。
為了讓 Kafka 知道 consumer 現在完成了幾則訊息,我們可以把 enable_auto_commit
改為 True
或者自行在程式中加上 commit 的部分,告訴 Kafka 現在 consumer 讀取訊息的情況:
# -*- coding: utf-8 -*-
from kafka import KafkaConsumer
from kafka.structs import OffsetAndMetadata
from kafka.structs import TopicPartition
consumer = KafkaConsumer(enable_auto_commit=False, group_id='group_a')
partition = TopicPartition('my-test-topic', 0)
consumer.assign([partition, ])
for message in consumer:
print(message)
options = {
partition: OffsetAndMetadata(message.offset + 1, b'it is my commit'),
}
consumer.commit(options)
print(consumer.committed(partition))
上述範例即是使用 OffsetAndMetadata
先產生一個 offset 指向下一則訊息,如此一來, consumer 下一次就會從下一則訊息開始讀取。
接著使用 commit
方法將 offset commit 到 Kakfa 中,最後列印 consumer.committed(partition)
查看該 partition 最後 commit 的 offset 是多少。
此處也有需要注意的地方,如果要使用 commit 功能,就得指定 consumer 的 group_id
,可以看到官方文件說明 group_id
如果不指定的情況下 offset commits 會被停用:
group_id
(str or None) – The name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If None, auto-partition assignment (via group coordinator) and offset commits are disabled. Default: None
所以沒指定 group_id
的情況下使用 commit 就會出現以下錯誤:
Traceback (most recent call last):
File "5-commit-offset-consumer.py", line 13, in <module>
consumer.commit(options)
File "/.../lib/python3.6/site-packages/kafka/consumer/group.py", line 504, in commit
assert self.config['group_id'] is not None, 'Requires group_id'
AssertionError: Requires group_id
方便的 value_serializer
與 value_deserializer
先前談到 Kafka producer 發佈與 consumer 訂閱的訊息 binary 資料,所以在發佈或者訂閱訊息時,都須將資料轉成 binary 。
不過 kafka-python 的 KafkaProducer 與 KafkaConsumer 都有分別提供方便的 value_serializer
與 value_deserializer
,可以代入自訂的 serializer
與 deserializer
幫忙進行資料轉換。
接著示範如何用 JSON 格式進行發佈與訂閱。
Producer 範例:
# -*- coding: utf-8 -*-
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
def value_serializer(m):
return json.dumps(m).encode()
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=value_serializer,
)
producer.send('my-test-topic', {'Hello': 'World'})
上述可以看到 value_serializer
將 dictionary 轉成 JSON 字串之後,用 encode()
轉成 binary 字串,所以 producer.send('my-test-topic', {'Hello': 'World'})
變成可以接受 dictionary 型態的資料。
Consumer 範例:
# -*- coding: utf-8 -*-
import json
from kafka import KafkaConsumer
from kafka.structs import OffsetAndMetadata
from kafka.structs import TopicPartition
def value_deserializer(m):
try:
return json.loads(m.decode())
except Exception:
return {}
consumer = KafkaConsumer(
'my-test-topic',
enable_auto_commit=False,
value_deserializer=value_deserializer,
group_id='group_a',
)
for message in consumer:
print(message.value)
上述可以看到 value_deserializer
將用 decode()
將 binary 訊息轉成字串之後,再用 json.loads
將字串轉回 dictionary ,因此最後 message.value
會是 dictionary 型態。
像 JSON 字串之類的訊息格式非常適合用 value_serializer
與 value_deserializer
進行資料轉換,可以多加利用。
Log Compaction
本文的最後來聊聊 Log Compaction 。
先前曾提過 Kafka 的訊息有保存期限,預設為 24 小時,訊息保存超過 24 小時就會被刪除(可以在 Kafka 官網搜尋 log.cleanup.policy
,設定預設為 delete
)。
而 Log Compaction 可以想像成另 1 種刪除資料的規則,其運作方式是具有相同 key 的訊息,在經過特定時間後,就會自動壓實(compact),只保留最後一定比例的資料。
譬如追蹤使用者最新的 GPS 位置,這種情況下,我們不需要每一筆 GPS 位置都紀錄,只需要保存最新一筆,就相當適合使用 Log Compaction 。
Kafka 預設的 cleanup 行為是刪除(delete),所以要預設 cleanup 行為為 Log Compaction 就得變更 Kafka 的設定,將設定變為log.cleanup.policy=compact
,可使用以下指令更改:
bash-4.4# kafka-configs.sh --alter --entity-type=brokers --entity-name=0 --add-config log.cleanup.policy=compact --zookeeper=zookeeper:2181
p.s. ---entity-name=0
的 0 其實代表 broker id ,會映對不起來是因為 Kafka 從版本 0.9.0.0 之後 broker id 會從 1000 + 1 開始(該設定為 reserved.broker.max.id
),所以 ---entity-name=0
是因為需要減掉 1001 的結果。
設定完 log.cleanup.policy=compact
之後,還有幾個相關設定:
log.cleaner.min.compaction.lag.ms
,也就是至少需要經過多久時間才能對訊息進行 compact ,如果是實驗需要,可以改成比較短的時間,例如 20 秒:bash-4.4# kafka-configs.sh --alter --entity-type=brokers --entity-name=0 --add-config log.cleaner.min.compaction.lag.ms=20000 --zookeeper=zookeeper:2181
segment.ms
則是多久時間後強制刪除或者 compact log ,預設為 7 天 Kafka 才會進行刪除或 compact log 。注意:如果 Poduction 環境設定太短,將會對 Kafka 效能造成影響。min.cleanable.dirty.ratio
則是保持多少比例的 log 不會被 compact ,預設為 0.5 ,代表 log compaction 至少會保留 50% 的 log 不會被 compact 。如果設定為 0.01 則代表 1% 的 log 不會被 compact 。
了解上述設定之後,就可以自己新增 1 個啟用 Log Compaction 功能的 topic
使用:
bash-4.4# kafka-topics.sh --zookeeper zookeeper:2181 --create --topic my-log-compaction-test-topic --config "cleanup.policy=compact" --config "delete.retention.ms=100" --config "segment.ms=100" --config "min.cleanable.dirty.ratio=0.01" --config "log.cleaner.min.compaction.lag.ms=20000"--partitions 1 --replication-factor 1
新增完之後,就可以用 producer 發送大量訊息到 Kafka ,接著用 consumer 每次從最舊的訊息開始讀取,就會發現訊息數量經過 20 秒後逐漸變少,這樣就體驗到 Log Compaction 囉!
總結
透過此 3 篇文章的介紹,相信都能夠對 Kafka 的運作有基礎的認識,相信要上手再也不是難事!
Happy Coding!
- 海邊的 Kafka 與 Python Part 1 - 發佈(publish)與訂閱(subscribe)
- 海邊的 Kafka 與 Python Part 2 - Producer & Consumer
- 海邊的 Kafka 與 Python Part 3 - 海邊的 Kafka 與 Python Part 3 - Partition
References
https://kafka.apache.org/documentation/
https://lombardo-chcg.github.io/tools/zh-tw/kafka-log-compaction.html