Kafka需求

刘超 11天前 ⋅ 1167 阅读   编辑

一、获取kafka中一个特定topic下所有partition以及每个partition当前最小的offset(即该partition当前的earliest offset)

  1、代码方式

consumer.seekToBeginning() //可以接受分区也可以接受主题

consumer.position()

具体使用见这里

  2、命令行

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka01:9092--time -2 --topic topic_1

参数:

  --time:-1 获取最新offset,-2获取最老

二、如何将文件写入kafka producer

cat adx_1.topic_adx_bid_win.20200517.23.1589760003004 | /web/kafka/bin/kafka-console-producer.sh --broker-list 10.7.6.25:9092 --topic topic_adx_bid_win

/web/kafka/bin/kafka-console-producer.sh --broker-list 10.7.6.25:9092 --topic topic_adx_bid_win < adx_1.topic_adx_bid_win.20200517.23.1589760003004

三、怎么实现回溯

重置位移,以前是修改是修改元数据库中的offset

四、根据主题查看各个分区的offset

zhao@zhao:~/liujichao$ /home/zhao/software/kafka_2.11-0.10.2.1/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list server-1:9092 --time -1 --topic topic_adx_up_request
topic_adx_up_request:2:10364013
topic_adx_up_request:5:10364001
topic_adx_up_request:4:10364033
topic_adx_up_request:1:10364016
topic_adx_up_request:3:10364025
topic_adx_up_request:0:10364025

五、打印kafka-run-class.sh加载那些jar

zhao@zhao:~/liujichao$  /home/zhao/software/kafka_2.11-0.10.2.1/bin/kafka-run-class.sh -verbose:class kafka.tools.GetOffsetShell --broker-list server-1:9092 --time -1 --topic topic_adx_dm_dsp
[Opened /web/java/jre/lib/rt.jar]
[Loaded java.lang.Object from /web/java/jre/lib/rt.jar]
[Loaded java.io.Serializable from /web/java/jre/lib/rt.jar]
[Loaded java.lang.Comparable from /web/java/jre/lib/rt.jar]
[Loaded java.lang.CharSequence from /web/java/jre/lib/rt.jar]
[Loaded java.lang.String from /web/java/jre/lib/rt.jar]


注意:本文归作者所有,未经作者允许,不得转载

全部评论: 0

    我有话说: