數據源管理 | Kafka集群環境搭建,消息存儲機制詳解

本文源碼:GitHub·點這裏 || GitEE·點這裏

一、Kafka集群環境

1、環境版本

版本:kafka2.11,zookeeper3.4

注意:這裏zookeeper3.4也是基於集群模式部署。

2、解壓重命名

tar -zxvf kafka_2.11-0.11.0.0.tgz
mv kafka_2.11-0.11.0.0 kafka2.11

創建日誌目錄

[root@en-master kafka2.11]# mkdir logs

注意:以上操作需要同步到集群下其他服務上。

3、添加環境變量

vim /etc/profile
export KAFKA_HOME=/opt/kafka2.11
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile

4、修改核心配置

[root@en-master /opt/kafka2.11/config]# vim server.properties
-- 核心修改如下
# 唯一編號
broker.id=0
# 開啟topic刪除
delete.topic.enable=true
# 日誌地址
log.dirs=/opt/kafka2.11/logs
# zk集群
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181

注意:broker.id安裝集群服務個數編排即可,集群下不能重複。

5、啟動kafka集群

# 啟動命令
[root@node02 kafka2.11]# bin/kafka-server-start.sh -daemon config/server.properties
# 停止命令
[root@node02 kafka2.11]# bin/kafka-server-stop.sh
# 進程查看
[root@node02 kafka2.11]# jps

注意:這裏默認啟動了zookeeper集群服務,並且集群下的kafka分別啟動。

6、基礎管理命令

創建topic

bin/kafka-topics.sh --zookeeper zk01:2181 \
--create --replication-factor 3 --partitions 1 --topic one-topic

參數說明:

  • replication-factor 定義副本個數
  • partitions 定義分區個數
  • topic:定義topic名稱

查看topic列表

bin/kafka-topics.sh --zookeeper zk01:2181 --list

修改topic分區

bin/kafka-topics.sh --zookeeper zk01:2181 --alter --topic one-topic --partitions 5

查看topic

bin/kafka-topics.sh --zookeeper zk01:2181 \
--describe --topic one-topic

發送消息

bin/kafka-console-producer.sh \
--broker-list 192.168.72.133:9092 --topic one-topic

消費消息

bin/kafka-console-consumer.sh \
--bootstrap-server 192.168.72.133:9092 --from-beginning --topic one-topic

刪除topic

bin/kafka-topics.sh --zookeeper zk01:2181 \
--delete --topic first

7、Zk集群用處

Kafka集群中有一個broker會被選舉為Controller,Controller依賴Zookeeper環境,管理集群broker的上下線,所有topic的分區副本分配和leader選舉等工作。

二、消息攔截案例

1、攔截器簡介

Kafka中間件的Producer攔截器主要用於實現消息發送的自定義控制邏輯。用戶可以在消息發送前以及回調邏輯執行前有機會對消息做一些自定義,比如消息修改等,發送狀態監控等,用戶可以指定多個攔截器按順序執行攔截。

核心方法

  • configure:獲取配置信息和初始化數據時調用;
  • onSend:消息被序列化以及和計算分區前調用該方法,可以對消息做操作;
  • onAcknowledgement:消息發送到Broker之後,或發送過程失敗時調用;
  • close:關閉攔截器調用,執行一些資源清理工作;

注意:這裏說的攔截器是針對消息發送流程。

2、自定義攔截

定義方式:實現ProducerInterceptor接口即可。

攔截器一:在onSend方法中,對攔截的消息進行修改。

@Component
public class SendStartInterceptor implements ProducerInterceptor<String, String> {

    private final Logger LOGGER = LoggerFactory.getLogger("SendStartInterceptor");
    @Override
    public void configure(Map<String, ?> configs) {
        LOGGER.info("configs...");
    }
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // 修改消息內容
        return new ProducerRecord<>(record.topic(), record.partition(),
                                    record.timestamp(), record.key(),
                              "onSend:{" + record.value()+"}");
    }
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        LOGGER.info("onAcknowledgement...");
    }
    @Override
    public void close() {
        LOGGER.info("SendStart close...");
    }
}

攔截器二:在onAcknowledgement方法中,判斷消息是否發送成功。

@Component
public class SendOverInterceptor implements ProducerInterceptor<String, String> {

    private final Logger LOGGER = LoggerFactory.getLogger("SendOverInterceptor");
    @Override
    public void configure(Map<String, ?> configs) {
        LOGGER.info("configs...");
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        LOGGER.info("record...{}", record.value());
        return record ;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception != null){
            LOGGER.info("Send Fail...exe-msg",exception.getMessage());
        }
        LOGGER.info("Send success...");
    }

    @Override
    public void close() {
        LOGGER.info("SendOver close...");
    }
}

加載攔截器:基於一個KafkaProducer配置Bean,加入攔截器。

@Configuration
public class KafkaConfig {

    @Bean
    public Producer producer (){
        Properties props = new Properties();
        // 省略其他配置...
        // 添加攔截器
        List<String> interceptors = new ArrayList<>();
        interceptors.add("com.kafka.cluster.interceptor.SendStartInterceptor");
        interceptors.add("com.kafka.cluster.interceptor.SendOverInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
        return new KafkaProducer<>(props) ;
    }
}

3、代碼案例

@RestController
public class SendMsgWeb {
    @Resource
    private KafkaProducer<String,String> producer ;
    @GetMapping("/sendMsg")
    public String sendMsg (){
        producer.send(new ProducerRecord<>("one-topic", "msgKey", "msgValue"));
        return "success" ;
    }
}

基於上述自定義Bean類型,進行消息發送,關注攔截器中打印日誌信息。

三、Kafka存儲分析

說明:該過程基於上述案例producer.send方法追蹤的源碼執行流程,源碼中的過程相對清楚,涉及的核心流程如下。

1、消息生成過程

Producer發送消息採用的是異步發送的方式,消息發送過程如下:

  • Producer發送消息之後,經過攔截器,序列化,事務判斷;
  • 流程執行后,消息內容放入容器中;
  • 容器在指定時間內如果裝滿(size),會喚醒Sender線程;
  • 容器如果在指定時間內沒有裝滿,也會執行一次Sender線程喚醒;
  • 喚醒Sender線程之後,把容器數據拉取到topic中;

絮叨一句:讀這些中間件的源碼,不僅能開闊思維,也會讓自己意識到平時寫的代碼可能真的叫搬磚。

2、存儲機制

Kafka中消息是以topic進行標識分類,生產者面向topic生產消息,topic分區(partition)是物理上的存儲,基於消息日誌文件的方式。

  • 每個partition對應於一個log文件,發送的消息不斷追加到該log文件末端;
  • log文件中存儲的就是producer生產的消息數據,採用分片和索引機制;
  • partition分為多個segment。每個segment對應兩個(.index)和(.log)文件;
  • index文件類型存儲的索引信息;
  • log文件存儲消息的數據;
  • 索引文件中的元數據指向對應數據文件中message的物理偏移地址;
  • 消費者組中的每個消費者,都會實時記錄消費的消息offset位置;
  • 當然消息消費出錯時,恢復是從上次的記錄位置繼續消費;

3、事務控制機制

Kafka支持消息的事務控制

Producer事務

跨分區跨會話的事務原理,引入全局唯一的TransactionID,並將Producer獲得的PID和TransactionID綁定。Producer重啟后可以通過正在進行的TransactionID獲得原來的PID。
Kafka基於TransactionCoordinator組件管理Transaction,Producer通過和TransactionCoordinator交互獲得TransactionID對應的任務狀態。TransactionCoordinator將事務狀態寫入Kafka的內部Topic,即使整個服務重啟,進行中的事務狀態可以得到恢復。

Consumer事務

Consumer消息消費,事務的保證強度很低,無法保證消息被精確消費,因為同一事務的消息可能會出現重啟后已經被刪除的情況。

四、源代碼地址

GitHub·地址
https://github.com/cicadasmile/data-manage-parent
GitEE·地址
https://gitee.com/cicadasmile/data-manage-parent

推薦關聯閱讀:數據源管理系列

序號 標題
01 數據源管理:主從庫動態路由,AOP模式讀寫分離
02 數據源管理:基於JDBC模式,適配和管理動態數據源
03 數據源管理:動態權限校驗,表結構和數據遷移流程
04 數據源管理:關係型分庫分表,列式庫分佈式計算
05 數據源管理:PostGreSQL環境整合,JSON類型應用
06 數據源管理:基於DataX組件,同步數據和源碼分析
07 數據源管理:OLAP查詢引擎,ClickHouse集群化管理

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

※教你寫出一流的銷售文案?

※超省錢租車方案