海邊的 Kafka 與 Python Part 3 - Partition

Posted on  Oct 15, 2018  in  Python 程式設計 - 高階  by  Amo Chen  ‐ 5 min read

海邊的 Kafka 與 Python - Part 2 中,已經介紹 producer 與 consumer 的運作,並且透過其運作進一步了解群組(Group)與 partition 相關的概念。

本文將介紹更多關於 Consumer 的使用方法,以及介紹 Log Compaction

本文環境

  • Docker version 18.06.1-ce
  • Python 3.6.5
  • kafka-python 1.4.3
  • macOS High Sierra 10.13.6

安裝 kafka-python 指令:

$ pip install kafka-python==1.4.3

p.s. 如尚未建立 Kafka 執行環境,請參考 海邊的 Kafka 與 Python - Part 1

如何切分 Partition ?

經過 2 篇文章的介紹,我們知道 Kafka 的 topic 能夠再細分成不同的 partition

那麼要如何將 topic 切成數個 partition 呢?

首先,一樣透過以下指令建立 1 container 並連到 Kafka 所在的 Docker Network 中:

$ docker run --network cf425740b888 -it kafka-docker_kafka bash

接著使用以下指令將 topic 切成 2 個 partition:

bash-4.4# $ kafka-topics.sh --alter --zookeeper zookeeper:2181 --partitions 2 --topic my-test-topic

此處必須做個提醒, partition 的數量只能向上增加,因此ㄧ旦向上增加,就不能再回頭了。所以如果再將 partition 改回 1 ,就會出現以下訊息:

WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Error while executing topic command : The number of partitions for a topic can only be increased. Topic my-test-topic currently has 2 partitions, 1 would not be an increase.
[2018-10-20 03:33:02,868] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic my-test-topic currently has 2 partitions, 1 would not be an increase.
 (kafka.admin.TopicCommand$)

上述訊息提示非常明確: The number of partitions for a topic can only be increased.

另外也必須注意,增加有指定 key 的 topic 的 partition 的數量時,也會影響訊息(message)次序或者分派訊息到 partition 的規則。

如果想知道某個 topic 的詳細資訊(例如 partition 的數量),可以利用以下指令:

bash-4.4# kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic my-test-topic
Topic:my-test-topic	PartitionCount:2	ReplicationFactor:1	Configs:
	Topic: my-test-topic	Partition: 0	Leader: 1001	Replicas: 1001	Isr: 1001
	Topic: my-test-topic	Partition: 1	Leader: 1001	Replicas: 1001	Isr: 1001

上述指令可以看到 my-test-topic 現在有 2 個 partitions ,也就是 PartitionCount:2 的部分,以及 Leader 是哪一號的 Kafka 伺服器,由於 ReplicationFactor:1 也只有 1 個 replica ,所以都是同一個 Leader ,同一個 replica 。

Isr"in-sync" replicas 的縮寫,代表目前正在同步的 replicas ,由於 Leader 本身也在 Isr 之中,所以會出現在 Isr 也是正常的。

Consumer 如何指定 Partition ?

學會切分 partition 之後, Consumer 也就可以指定要消化特定的 partition 的訊息,其範例程式如下:

# -*- coding: utf-8 -*-
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer(enable_auto_commit=False)
consumer.assign([
    TopicPartition('my-test-topic', 0),
])

for message in consumer:
    print(message)

上述範例程式利用 TopicPartition('my-test-topic', 0) 放到 1 個 list 中,並透過 assign 方法,令該 consumer 訂閱 my-test-topic 的 0 號 partition 。之所以會是 list 是因為 1 個 consumer 可同時訂閱多個 topic 與 partition 。

如何指定 Offset ?

除了 partition 之外, consumer 也可以自由指定要從哪個 offset 開始取得訊息:

# -*- coding: utf-8 -*-
from kafka import KafkaConsumer
from kafka.structs import OffsetAndMetadata
from kafka.structs import TopicPartition


consumer = KafkaConsumer(enable_auto_commit=False)
partition_0 = TopicPartition('my-test-topic', 0)
offset = 3
consumer.assign([partition_0, ])
consumer.seek(partition_0, offset)

for message in consumer:
    print(message)

上述範例即是利用 seek 方法指定從特定 partition 的特定 offset 開始讀取 message 。

如何 Commit ?

本文目前為止的範例程式都將 enable_auto_commit 設定為 False ,所以每次執行 consumer 都會從最早的訊息開始讀取,因為 Kafka 並不知道要從哪個 offset 開始將訊息派送給 consumer 。

為了讓 Kafka 知道 consumer 現在完成了幾則訊息,我們可以把 enable_auto_commit 改為 True 或者自行在程式中加上 commit 的部分,告訴 Kafka 現在 consumer 讀取訊息的情況:

# -*- coding: utf-8 -*-
from kafka import KafkaConsumer
from kafka.structs import OffsetAndMetadata
from kafka.structs import TopicPartition

consumer = KafkaConsumer(enable_auto_commit=False, group_id='group_a')
partition = TopicPartition('my-test-topic', 0)
consumer.assign([partition, ])

for message in consumer:
    print(message)
    options = {
        partition: OffsetAndMetadata(message.offset + 1, b'it is my commit'),
    }
    consumer.commit(options)
    print(consumer.committed(partition))

上述範例即是使用 OffsetAndMetadata 先產生一個 offset 指向下一則訊息,如此一來, consumer 下一次就會從下一則訊息開始讀取。

接著使用 commit 方法將 offset commit 到 Kakfa 中,最後列印 consumer.committed(partition) 查看該 partition 最後 commit 的 offset 是多少。

此處也有需要注意的地方,如果要使用 commit 功能,就得指定 consumer 的 group_id ,可以看到官方文件說明 group_id 如果不指定的情況下 offset commits 會被停用:

group_id (str or None) – The name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If None, auto-partition assignment (via group coordinator) and offset commits are disabled. Default: None

所以沒指定 group_id 的情況下使用 commit 就會出現以下錯誤:

Traceback (most recent call last):
  File "5-commit-offset-consumer.py", line 13, in <module>
    consumer.commit(options)
  File "/.../lib/python3.6/site-packages/kafka/consumer/group.py", line 504, in commit
    assert self.config['group_id'] is not None, 'Requires group_id'
AssertionError: Requires group_id

方便的 value_serializervalue_deserializer

先前談到 Kafka producer 發佈與 consumer 訂閱的訊息 binary 資料,所以在發佈或者訂閱訊息時,都須將資料轉成 binary 。

不過 kafka-python 的 KafkaProducer 與 KafkaConsumer 都有分別提供方便的 value_serializervalue_deserializer ,可以代入自訂的 serializerdeserializer 幫忙進行資料轉換。

接著示範如何用 JSON 格式進行發佈與訂閱。

Producer 範例:

# -*- coding: utf-8 -*-
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError


def value_serializer(m):
    return json.dumps(m).encode()


producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=value_serializer,
)
producer.send('my-test-topic', {'Hello': 'World'})

上述可以看到 value_serializer 將 dictionary 轉成 JSON 字串之後,用 encode() 轉成 binary 字串,所以 producer.send('my-test-topic', {'Hello': 'World'}) 變成可以接受 dictionary 型態的資料。

Consumer 範例:

# -*- coding: utf-8 -*-
import json
from kafka import KafkaConsumer
from kafka.structs import OffsetAndMetadata
from kafka.structs import TopicPartition


def value_deserializer(m):
    try:
        return json.loads(m.decode())
    except Exception:
        return {}


consumer = KafkaConsumer(
    'my-test-topic',
    enable_auto_commit=False,
    value_deserializer=value_deserializer,
    group_id='group_a',
)

for message in consumer:
    print(message.value)

上述可以看到 value_deserializer 將用 decode() 將 binary 訊息轉成字串之後,再用 json.loads 將字串轉回 dictionary ,因此最後 message.value 會是 dictionary 型態。

像 JSON 字串之類的訊息格式非常適合用 value_serializervalue_deserializer 進行資料轉換,可以多加利用。

Log Compaction

本文的最後來聊聊 Log Compaction 。

先前曾提過 Kafka 的訊息有保存期限,預設為 24 小時,訊息保存超過 24 小時就會被刪除(可以在 Kafka 官網搜尋 log.cleanup.policy ,設定預設為 delete )。

而 Log Compaction 可以想像成另 1 種刪除資料的規則,其運作方式是具有相同 key 的訊息,在經過特定時間後,就會自動壓實(compact),只保留最後一定比例的資料。

譬如追蹤使用者最新的 GPS 位置,這種情況下,我們不需要每一筆 GPS 位置都紀錄,只需要保存最新一筆,就相當適合使用 Log Compaction 。

Kafka 預設的 cleanup 行為是刪除(delete),所以要預設 cleanup 行為為 Log Compaction 就得變更 Kafka 的設定,將設定變為log.cleanup.policy=compact ,可使用以下指令更改:

bash-4.4# kafka-configs.sh --alter --entity-type=brokers --entity-name=0 --add-config log.cleanup.policy=compact --zookeeper=zookeeper:2181

p.s. ---entity-name=0 的 0 其實代表 broker id ,會映對不起來是因為 Kafka 從版本 0.9.0.0 之後 broker id 會從 1000 + 1 開始(該設定為 reserved.broker.max.id ),所以 ---entity-name=0 是因為需要減掉 1001 的結果。

設定完 log.cleanup.policy=compact 之後,還有幾個相關設定:

  • log.cleaner.min.compaction.lag.ms ,也就是至少需要經過多久時間才能對訊息進行 compact ,如果是實驗需要,可以改成比較短的時間,例如 20 秒:

    bash-4.4# kafka-configs.sh --alter --entity-type=brokers --entity-name=0 --add-config log.cleaner.min.compaction.lag.ms=20000 --zookeeper=zookeeper:2181
    
  • segment.ms 則是多久時間後強制刪除或者 compact log ,預設為 7 天 Kafka 才會進行刪除或 compact log 。注意:如果 Poduction 環境設定太短,將會對 Kafka 效能造成影響。

  • min.cleanable.dirty.ratio 則是保持多少比例的 log 不會被 compact ,預設為 0.5 ,代表 log compaction 至少會保留 50% 的 log 不會被 compact 。如果設定為 0.01 則代表 1% 的 log 不會被 compact 。

了解上述設定之後,就可以自己新增 1 個啟用 Log Compaction 功能的 topic 使用:

bash-4.4# kafka-topics.sh --zookeeper zookeeper:2181 --create --topic my-log-compaction-test-topic --config "cleanup.policy=compact" --config "delete.retention.ms=100"  --config "segment.ms=100" --config "min.cleanable.dirty.ratio=0.01" --config "log.cleaner.min.compaction.lag.ms=20000"--partitions 1 --replication-factor 1

新增完之後,就可以用 producer 發送大量訊息到 Kafka ,接著用 consumer 每次從最舊的訊息開始讀取,就會發現訊息數量經過 20 秒後逐漸變少,這樣就體驗到 Log Compaction 囉!

總結

透過此 3 篇文章的介紹,相信都能夠對 Kafka 的運作有基礎的認識,相信要上手再也不是難事!

Happy Coding!

References

https://kafka.apache.org/documentation/

https://lombardo-chcg.github.io/tools/zh-tw/kafka-log-compaction.html

對抗久坐職業傷害

研究指出每天增加 2 小時坐著的時間,會增加大腸癌、心臟疾病、肺癌的風險,也造成肩頸、腰背疼痛等常見問題。

然而對抗這些問題,卻只需要工作時定期休息跟伸展身體即可!

你想輕鬆改變現狀嗎?試試看我們的 PomodoRoll 番茄鐘吧! PomodoRoll 番茄鐘會根據你所設定的專注時間,定期建議你 1 項辦公族適用的伸展運動,幫助你打敗久坐所帶來的傷害!

贊助我們的創作

看完這篇文章了嗎? 休息一下,喝杯咖啡吧!

如果你覺得 MyApollo 有讓你獲得實用的資訊,希望能看到更多的技術分享,邀請你贊助我們一杯咖啡,讓我們有更多的動力與精力繼續提供高品質的文章,感謝你的支持!