海邊的 Kafka 與 Python Part 2 - Producer & Consumer

Posted on  Oct 13, 2018  in  Python 程式設計 - 高階  by  Amo Chen  ‐ 5 min read

海邊的 Kafka 與 Python - Part 1 中我們已初識 Kafka 大致樣貌,並且透過其內建指令體驗 Kafka 中 producer 與 consumer 的運作情況。本篇就會透過撰寫 Python 程式,更進一步深入了解 Kafka 的細節。

本文環境

  • Docker version 18.06.1-ce
  • Python 3.6.5
  • kafka-python 1.4.3
  • macOS High Sierra 10.13.6

安裝 kafka-python 指令:

$ pip install kafka-python==1.4.3

p.s. 如尚未建立 Kafka 執行環境,請參考 海邊的 Kafka 與 Python - Part 1

再試 Producer

以下是摘自 kafka-python 使用文件 的 producer Python 範例程式:

# -*- coding: utf-8 -*-
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# asynchronous by default
future = producer.send('my-test-topic', b'raw_bytes')
print(future)

# block for 'synchronous' sends
try:
    record_metadata = future.get(timeout=10)
    print(record_metadata)
except KafkaError:
    # decide what to do if produce request failed...
    pass

# successful result returns assigned partition and offset
print(record_metadata.topic)
print(record_metadata.partition)
print(record_metadata.offset)

# produce keyed messages to enable hashed partitioning
producer.send('my-test-topic', key=b'foo', value=b'bar')

上述範例程式透過 KafkaProducer(bootstrap_servers=['localhost:9092']) 建立一個 producer ,接著使用 producer.send('my-test-topic', b'raw_bytes')topic 送出 b'raw_bytes' binary 資料。

預設使用 producer.send 會使用非同步的方式先回傳 FutureRecordMetadata object 避免阻塞程式,所以此時我們還不知道送出的訊息在 topic 的情況為何,接著我們使用 future.get(timeout=10) 試圖取得該訊息的 metadata ,而 timeout=10 代表超過 10 秒就視為 timeout 。

接著可以看到 print(record_metadata) 印出 record_metadata 是個 RecordMetadata object ,裡面記錄著 topic , partition , offset , timestamp 等資訊。

上一篇文章中已經介紹 topic , partition , timestamp ,在此不多加贅述。

那麼 offset 代表什麼呢?

事實上,每則訊息(record)都還會紀錄其在 topic partition 中的位置,如果是最早的訊息,其 offset 就是 0 ,每新增一個訊息到 partition 其 offset 就會 +1 。

所以 Kafka 官網的圖中,如此介紹 producer, consumer 與 partition 之間如何互動:

Producer 會將訊息放到 partiton 中,每則訊息都會有 offset 代表其位置,而 consumer 讀取訊息時也同樣能夠取得 offset ,圖中畫有 2 個 consumer 分別讀取不同 offset 的訊息,也代表著每個 consumer 可以指定讀取不同 offset 的訊息。

程式的最後以 producer.send('my-test-topic', key=b'foo', value=b'bar') 再次發送訊息,這次參數多了 key=b'foo' ,也就是呼應上一篇文章所提到的每則訊息會有 key , valuetimestampkey 的作用在於 Kafka 在 topic 具有多個 partition 時,會將具有相同 key 的訊息送往相同的 partition ,如果不指定 key ,那麼 Kafka 就會以 round-robin 的方式將訊息送到不同的 partiton 之中。

上述程式執行之後,會輸出類似以下的結果:

<kafka.producer.future.FutureRecordMetadata object at 0x10ea5da58>
RecordMetadata(topic='my-test-topic', partition=0, topic_partition=TopicPartition(topic='my-test-topic', partition=0), offset=4, timestamp=1539447797151, checksum=None, serialized_key_size=-1, serialized_value_size=9)
my-test-topic
0
4

值得一提的是,訊息的 key 與 value 都需要以 binary 的形式發送。如果不是以 binary 的形式發送的話,就會出現以下的 AssertionError

Traceback (most recent call last):
  File "producer.py", line 25, in <module>
    producer.send('my-test-topic', key='foo', value='bar')
  File "/.../python3.6/site-packages/kafka/producer/kafka.py", line 551, in send
    assert type(key_bytes) in (bytes, bytearray, memoryview, type(None))
AssertionError

批次(batch)發佈訊息

預設的 producer 都會立即發送訊息至 Kafka 伺服器,然而當訊息量大的時候,可以考慮用批次的方式發送,降低 requests 數量,以增加效能。

Kafka producer 可使用 linger.msbatch.size 這 2 項設定控制批次發送。

當設定 batch.size 為 500 時,訊息數量累積到 500 則時,就會批次發送( kafka-python 預設為 16384 則)。

當設定 linger.ms 為 5 時, producer 會等待個 5 毫秒(ms) ,如果這段等待期間有其他訊息也需要發佈,那麼就會在 5 毫秒結束後一起被發佈到 Kafka 伺服器,以減少 requests 的數量。

當同時設定 batch.sizelinger.ms 時,只要 linger.ms 時間內先累積到 batch.size 就會先行批次發送,若時間內未滿 batch.size 就會等到 linger.ms 時間到後發送。

設定 linger.msbatch.size 的範例:

# -*- coding: utf-8 -*-
from kafka import KafkaProducer


# produce json messages
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    linger_ms=1000,
    batch_size=1000,
)


# produce asynchronously
for _ in range(1000):
    producer.send('my-test-topic', b'Hello Kafka')

# block until all async messages are sent
producer.flush()

上述範例同時設定 linger.msbatch.size ,並且在程式最後呼叫 producer.flush() 強迫 producer 把 buffer 內的訊息發送出去。

再試 Consumer

Consumer 的部分,同樣以 kafka-python 所提供的範例進行說明:

# -*- coding: utf-8 -*-
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-test-topic',
    group_id='group-a',
    bootstrap_servers=['localhost:9092'],
    enable_auto_commit=False,
)

for message in consumer:
    print(message)
    print('%s:%d:%d: key=%s value=%s' % (
        message.topic,
        message.partition,
        message.offset,
        message.key,
        message.value
    ))

上述可以看到 consumer 相對簡單許多,但仍有一些細節值得詳細說明。

首先是 group_id='group-a' 的部分。事實上, consumer 還可以為自己設定群組(group),而群組會影響 Kafka 的訊息派送方式,根據 Kafka 官網的說明:

Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.

If all the consumer instances have the same consumer group, then the records will effectively be load balanced over the consumer instances.

If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes.

1 個群組內可以有多個 consumers 的話,那麼在派送訊息時,群組內只會有 1 個 consumer 會收到(詳細派送方式可以在官網搜尋 partition.assignment.strategy )。如果每個 consumer 都分屬不同群組(也就是每個群組都只有 1 個 consumer),那麼 Kafka 就會採取廣播的方式派送訊息,每個群組都會收到訊息。

理解上述派送方式之後,再看 Kafka 官網所提供的圖片就會比較好理解:

上述圖片敘述每個 partition 內的訊息都會派送到各個群組內的 1 個 consumer 。可以設定 2 個不同的 group_id 並執行,接著試著發佈幾則訊息,就會發現每個群組內的 1 個 consumer 都會收到訊息。如果每個群組內的 consumer 數目都一樣,那麼將會發現同一群組內的 consumer 會輪流收到訊息(也就是 round-robin)。

再來必須談到一個重要概念 — Kafka 只確保 topic 中的 partition 內的訊息順序。

Kafka topic 設計上是有次序的,寫入的順序就是 offset 的次序。當有多個 partition 時, Kafka 只能確保同一個 partition 內的訊息順序,如果你的應用(application)對順序相當要求,那麼為求安全起見,你應該只能使用 1 個 partition ,甚至 1 個 topic 只能有 1 個 consumer 。

Kafka only provides a total order over records within a partition, not between different partitions in a topic. Per-partition ordering combined with the ability to partition data by key is sufficient for most applications. However, if you require a total order over records this can be achieved with a topic that has only one partition, though this will mean only one consumer process per consumer group.

接著談談 enable_auto_commit=False ,什麼是 auto commit 呢(可以在官網搜尋 auto.commit.interval.msauto.commit.enable )?

為了防止 consumer 意外停擺,所以 consumer 預設每 5 秒,就會對 Zookeeper commit 現在 consumer 所消耗到 offset 是幾號,如果從意外恢復之後,就可以直接從最後 commit offset 之後開始重新消耗這期間停擺的工作,而不必從頭開始。

所以 enable_auto_commit=False 的設定代表停止 auto commit ,此時就得由 consumer 自行決定何時該 commit 了,也因為 enable_auto_commit=False ,所以上述範例程式,每次重新啟動都會從最早的訊息開始接收。

p.s. 預設 enable_auto_commitTrue

不過 Kafka 並不會保存所有的訊息,每則訊息都會有過期的時候,一旦過期就會從 topic 中刪去,預設保存的時間為 168 小時,刪除後會再保留 24 小時(可以在官網搜尋 log.retention.hours delete.retention.ms ),所以當保存時間需要拉長時,就得更改設定,避免 consumer 停擺太久而漏掉訊息。

小結

本文透過實際撰寫 Kafka Python 程式進一步了解不少關於 topic , partitiongroup 等概念。下一篇將介紹更多關於 consumer 的進階使用技巧與 Log Compaction

References

https://kafka-python.readthedocs.io/en/master/

https://kafka.apache.org/

FOLLOW US

對抗久坐職業傷害

研究指出每天增加 2 小時坐著的時間,會增加大腸癌、心臟疾病、肺癌的風險,也造成肩頸、腰背疼痛等常見問題。

然而對抗這些問題,卻只需要工作時定期休息跟伸展身體即可!

你想輕鬆改變現狀嗎?試試看我們的 PomodoRoll 番茄鐘吧! PomodoRoll 番茄鐘會根據你所設定的專注時間,定期建議你 1 項辦公族適用的伸展運動,幫助你打敗久坐所帶來的傷害!

贊助我們的創作

看完這篇文章了嗎? 休息一下,喝杯咖啡吧!

如果你覺得 MyApollo 有讓你獲得實用的資訊,希望能看到更多的技術分享,邀請你贊助我們一杯咖啡,讓我們有更多的動力與精力繼續提供高品質的文章,感謝你的支持!