Kafka
| 组件名称 | 相关说明 |
|---|---|
| Topic(主题) | 特定类型的消息类型为主题,数据存储在主题中,主题被分拆成分区 |
| Partition(分区) | 主题数据分割为一个或多个分区,每个分区数据使用多个 segment 文件存储,分区中的数据是有序的 |
| Offset(偏移量) | 每个分区消息具有唯一序列标识 |
| Replica(副本) | 副本只是一个分区副本,用于防止数据丢失 |
| Producer(生成者) | 数据的发布者,该角色将消息发布到集群的主题中 |
| Consumer(消费者) | 消费者可以从 Broker 中读取数据,消费者可以消费多个主题 数据 |
| Broker(消息代理) | Kafka 集群包含一个或多个服务器,每个 Kafka 服务节点成为 Broker,Broker 接受到消息后,将消息追加到 segment 文件中。 |
| Leader(领导者) | 负责分区所有读写操作,每个分区都有一个服务器充当 Leader |
| Follower(追随者) | 跟随 Leader 指令信息,如果 Leader 发生故障则会从 Follower 选举一个 Leader。 |
| Consumer Group(消费组) | 实现一个主题的广播和单播 |
Kafka 集群部署
- 解压
1 | tar -zxvf kafka_2.12-2.8.0.tgz -C /export/servers/ |
修改配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44## roker 全局唯一编号 0 以上整数 不能重复
=0
# 用来监听链接端口,producer 或 consumer 将在此端口建立连接
port=9092
# 处理网络请求的线程数
=3
# 用来处理磁盘 I/O 的线程数
=8
# 发送套接字的缓冲区大小
=102400
# 接受套接字的缓冲区大小
=104857600
# 请求套接字的缓冲区大小
=102400
## 运行日志存放路径,可以指定多个目录,以逗号分割
=/export/data/kafka/
# topic 在当前 broker 上的分片个数
=2
# 用来恢复和清理 data 下数据的线程数
=2
# 用来恢复和清理 data 下数据的线程数量
=1
# segment 文件保留的最长时间
=1
# 滚动生成新的 segment 文件的最大时间
=1
# 日志文件中每个 segment 的大小 默认 1GB
=1073741824
# 周期性检查文件大小的时间
=300000
# 日志清理是否打开
=true
## broker 需要使用 zookeeper 保存 meta数据
=hadoop01:2181,hadoop02:2181,hadoop03:2181
# 连接超时时间
=18000
# partion buffer 中,消息的条数达到阀值,将触发 flush 到磁盘
=10000
# 消息缓存的时间,达到阀值将触发 flush 操作
=1000
## 删除 Topic,是否允许删除 Topic
=true
## 设置本机 IP,若设置错误,会抛出错误:Producer connection to xxx:9092 unsuccessful
=hadoop01添加环境变量
1
2export KAFKA_HOME=/export/servers/kafka_2.12-2.8.0
export PATH=$PATH:$KAFKA_HOME/bin分发文件
1 | scp -r kafka_2.12-2.8.0/ hadoop02:/export/servers/ |
启动 zookeeper 的前提下 启动 Kafka
1
2
3
4kafka-server-start.sh kafka_2.12-2.8.0/config/server.properties
# 注意:关闭终端 Kafka 服务就会停止
# 打开一个新终端,使用 jps 查看 Kafka 是否允许成功
Kafka 消费者生产者实例
- 控制台实例
1 | # 创建一个名为 msgatopic 主题 |
Java API
API 类型 说明 Producer API 构建应用程序发送数据流到Kafka API 集群中的主题 Consumer API 从 kafka 集群中主题读取数据流 Strems API 流处理程序库,能够处理流式数据 Connect API 实现连接器,用于 Kafka 和其他系统扩展、可靠的流式传输工具 AdminClient API 构建集群管理工具,查看 Kafka 集群组件信息
生产者客户端 Producer API 提供了 KafkaProducer 类,该类实例化对象用来代表一个生产者进程,生产者发送消息时会先在客户端中把消息存入队列,然后由发送线程从队列中消费消息,并以批量的方式发送消息给服务端。
KafkaProducer API
| 方法名称 | 相关说明 |
|---|---|
| abortTransaction() | 终止正在进行的事物 |
| close() | 关闭生产者 |
| 方法名称 | 相关说明 |
|---|---|
| abortTransaction() | 终止正在进行的事物 |
| close() | 关闭生产者 |
| flush() | 立即发送缓冲记录 |
| partitionsFor(String topic) | 获取给定主题分区元数据 |
| send(ProducerRecord<K,V> record) | 异步发送记录到主题 |
消费者客户端 Consumer API 提供了 KafkaConsumer, Kafka 支持多个生产者和多个消费者,生产者可以将消息发布到集群中不同的几点的不同分区上,消费者也可以消费集群中多个节点的多个分区上的消息。
KafkaConsumer API
| 方法名称 | 相关说明 |
|---|---|
| subscribe(Collection |
订阅指定主题列表以获取动态分区 |
| close() | 关闭消费者 |
| wakeup() | 唤醒消费者 |
| metrics() | 获取消费者保留的指标 |
| listTopics | 获取有关用户有权查看的所有主题的分区元数据 |
创建工程,添加依赖
1
2
3
4
5
6<!-- kafka 依赖 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>Kafka Java API
生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36package com.msga.spark.demo.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaProducerTest {
public static void main(String[] args) {
Properties prop = new Properties();
// 指定 Kafka 集群IP地址和端口号
prop.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
// acks,消息确认机制 等待所有副本节点的应答
prop.put("acks", "all");
// 消息发送最大尝试次数, 设置为 0 意味着请求失败不会出现发送,可以避免消息重复发送
prop.put("retries", 0);
// 指定一批消息处理大小,生产者维护着每个分区的未发送数据内存缓冲区,越大吞吐量也就越大,内存消耗越大
prop.put("batch.size", 16384);
// 指定请求延时,缓冲区未填满,会增加 1ms 延迟,等待更多数据进入缓冲区增加内存利用率,
// 默认缓冲区由未使用空间也可以立即发送该缓冲区
prop.put("linger.ms", 1);
// 指定缓冲区内存大小
prop.put("buffer.memory", 33554432);
// 设置 key 序列化, 网络传输需要序列化
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 生产数据
KafkaProducer producer = new KafkaProducer<String,String>(prop);
for (int i = 0; i < 50; i++) {
producer.send(new ProducerRecord<String,String>("msgatopic", Integer.toString(i), "hello world-"+i));
}
producer.close();
}
}消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42package com.msga.spark.demo.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerTest {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
// 指定消费组id 同一时刻同一个消费组只有一个线程
// 可以去消费一个分区消息,不同消费组可以去消费同一个分区的消息
props.put("group.id", "msgatopic");
// 自动提交偏移量
props.put("enable.auto.commit", "true");
// 1s 提交时间间隔
props.put("auto.commit.interval.ms", "1000");
// 序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
// 订阅信息,可以订阅多个
consumer.subscribe(Collections.singletonList("msgatopic"));
// 获取消息
while(true){
// 每隔 100ms 就拉取一次
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic =%s, offset =%d, key =%s, value =%s%n",
record.topic(), record.offset(), record.key(), record.value());
}
}
}
}Kafka Streams
使用 Kafka 集群中的数据进行流式运算,Kafka 内置了一个流式处理框架 Kafka Streams,开发者可以以 Kafka 为核心构建流失计算系统。流式处理,具有低延迟性、高扩展性、高弹性、高容错性的特点,易于集成到现成的应用中。Kafka 流处理框架同样是将“ 输入主题 -> 自定义 -> 处理器 -> 输出主题”。抽象成一个 DAG 拓扑图。
### Kafka Streams 开发单词计数应用
1. 添加依赖
1
2
3
4
5
6
<!-- Kafka Streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.8.0</version>
</dependency>
2. 编写代码
LogProcessor 类,并且继承 Streams API 中的 Processor 接口。接口有以下三个方法
1. init(ProcessorContext processorContext): 初始化上下文对象
2. process(Key, Value): 每接收到一条消息是,都会调用该方法处理并更新状态进行存储。
3. close(): 关闭处理器,这里可以做一些资源清理工作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.msga.spark.demo.kafka.streams;
public class LogProcessor implements Processor<byte[], byte[], byte[], byte[]> {
private ProcessorContext processorContext;
private KeyValueStore<String, Integer> kvStore;
public void init(ProcessorContext<byte[], byte[]> processorContext) {
this.processorContext = processorContext;
this.processorContext.schedule(
Duration.ofSeconds(1),
PunctuationType.STREAM_TIME,
timestamp -> {
try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
System.out.println("--------" + timestamp + "--------");
while (iter.hasNext()) {
final KeyValue<String, Integer> entry = iter.next();
System.out.println("[" + entry.key + ", " + entry.value + "]");
processorContext.forward(new Record<>(entry.key.getBytes(), entry.key.getBytes(), timestamp));
}
}
});
kvStore = processorContext.getStateStore("Counts");
}
public void process(Record<byte[], byte[]> record) {
final String[] words = record.value().toString().toLowerCase(Locale.getDefault()).split(" ");
for (final String word : words){
final Integer oldValue = kvStore.get(word);
if (oldValue == null){
kvStore.put(word, 1);
} else {
kvStore.put(word, oldValue + 1);
}
}
}
public void close() { }
}
App 类,启动程序,创建 KafkaSreams 对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.msga.spark.demo.kafka.streams;
/**
* 参考案例 https://github.com/apache/kafka/blob/2.8/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
*/
public class WordCountProcessorDemo {
static class WordCountProcessorSupplier implements ProcessorSupplier<byte[], byte[], byte[], byte[]> {
public Processor<byte[], byte[], byte[], byte[]> get() { return new LogProcessor(); }
}
public static void main(String[] args) {
// 声明来源主题
final String fromTopic = "testStreams1";
// 声明目标主题
final String toTopic = "testStreams2";
// 设置 KafkaSteams 参数信息
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop01:9092,hadoop02:9092,hadoop03:9092");
final Topology builder = new Topology()
.addSource("Source", fromTopic)
.addProcessor("Process", new WordCountProcessorSupplier(), "Source")
.addStateStore(Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("Counts"),
Serdes.String(),
Serdes.Integer()),
"Process")
.addSink("Sink", toTopic, "Process");
// 实例化 KafkaStreams 对象
final KafkaStreams streams = new KafkaStreams(builder, props);
final CountDownLatch latch = new CountDownLatch(1);
// 监听 control-c 关闭流
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook"){
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (InterruptedException e) {
System.exit(1);
}
System.exit(0);
}
}
3. 执行测试
在 hadoop01 节点创建 testStreams1 和 testStreams2 主题
1
2
3
4
5
6
7
8
9
10
11
12
13
# 创建来源主题
kafka-topics.sh --create \
--topic testStreams1 \
--partitions 1 \
--replication-factor 1 \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
# 创建目标主题
kafka-topics.sh --create \
--topic testStreams2 \
--partitions 1 \
--replication-factor 1 \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
分布在 hadoop01 和 hadoop02 节点启动生产者服务 和消费者服务
1
2
3
kafka-console-producer.sh \
--broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--topic testStreams1
启动消费者
1
2
3
4
kafka-console-consumer.sh \
--from-beginning \
--topic testStreams2 \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092
运行 App 程序类。在生产者服务节点 hadoop01 输入 hello word hello spark hello kafka 语句,返回消费者服务节点 hadoop02 查看执行效果
此处效果可能稍有不同,可以自行修改代码。
控制台输出单词计数结果 {Spark=1, word=1,kafka=1,hello=3}.
- 本文作者: MISAKIGA
- 本文链接: https://misakiga.github.io/2021/06/15/big-data/Kafka/
- 版权声明: 本博客所有文章除特别声明外,均采用 MIT 许可协议。转载请注明出处!
