100字范文,内容丰富有趣,生活中的好帮手!
100字范文 > kafka(一)基本概念 集群搭建及API使用及框架整合

kafka(一)基本概念 集群搭建及API使用及框架整合

时间:2023-12-16 16:12:54

相关推荐

kafka(一)基本概念 集群搭建及API使用及框架整合

官网:/

1、kafka概念及原理

Apache Kafka® isa distributed streaming platform——分布式的流数据平台

1.1 kafak简介

kafka具备三项关键能力:

①发布、订阅记录流,类似于消息队列或者企业级消息系统。

②以一种容错持久化的方式存储记录流(默认可以保存7天)。

③实时处理加工流数据(kakfa streaming)。

kafka的应用场景:

①构建实时的流数据管道,可靠的在系统和应用之间获取数据。

②构建实时的流数据应用,传输或者加工数据。

kakfa的四大核心API:

①Producer API:允许应用发布记录流给一个或多个topic。

②Consumer API:允许应用订阅一个或者多个topic,并且对topic中新产生的记录进行处理。

③Streams API:允许应用扮演流处理器,消费来自于一个过着多个topic中数据流,并且将处理的结果输出到一个或多个topic中,Streams API可以高效的处理传输数据。

④Connector API:连接外部的存储系统。

1.2 kakfa架构原理

核心概念

Broker:一个kafka服务实例。

Topic:某一个类别的记录。

Partition:对主题进行数据分区,一个主题有多个数据分区构成,分区规则:key.hashcode%partitonNum

Replication:topic中主分区的数据备份,故障恢复。

Lrader: topic的主分区,读写操作默认使用主分区。

Follower:同步主分区中的数据,当主分区不可用时,某个follower会自动升级为主分区。

Offset:标识读写操作的位置,并且读的offset <=写offset。

工作原理

首先Producer产生record发送到指定的topic(topic实质是有多个分区构成,每个分区都会有相应的复制分区),在真正存放到kafka集群时,会根据record的key.hashCode%partitionNum计算,根据计算的结果存放到对应的分区。Leader分区中的数据会自动同步到Follower分区中,Zookeeper会实时监控服务健康信息,一旦发生故障,会立即进行故障转移操作(将一个Follower分区自动升级为Leader分区)。kakfa的一个分区实际上是一个有序的record队列,队列在标识读写操作位置时,会使用Offset作为标识。最后Consumer会订阅一个Kafka topic,一旦topic中有数据产生,consumer会立即拉取最新记录,进行相应的业务处理。

1.3 集群搭建(完全分布式集群)

规模:三个节点的集群

版本:kafka_2.11-2.2.0

准备工作:三个节点、zookeeper正常运行(zookeeper集群在笔者其他文章进行分享)

①将三个节点的tgz包解压缩,解压缩的位置可以自己定义,笔者放在了usr下。

[root@nodex ~]# tar -zxf kafka_2.11-2.2.0.tgz -C /usr

②进入解压的文件夹,编辑config下的server.properties文件

broker.id=0 #唯一数字,其他机器上可以可以设置2、3.....

listenners=PLAINTEXT://node1:9092 #本节点的域名:9092

log.dirs=/usr/..... # 日志存储路径

log.retention.hours=168 #topic数据保留7天

zookeeper.connect==node1:2181,node2:2181,node3:2181 # zookeeper集群地址

至此kafka集群搭建完毕。

启动服务

-daemon #后台运行[root@node2 kafka_2.11-2.2.0]# bin/kafka-server-start.sh -daemon config/server.properties [root@node2 kafka_2.11-2.2.0]# jps1779 Kafka #出现进程,代表启动成功1796 Jps1462 QuorumPeerMain

1.4 kafka指令基本操作

创建topic

[root@node1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --create --topic topic --partitions 3 --replication-factor 3 --bootstrap-server node1:9092,node2:9092,node3:9092

展示topic列表

[root@node1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092

查看topic信息

[root@node1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --describe --topic topic --bootstrap-server node1:9092,node2:9092,node3:9092

删除topic

[root@node1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --delete --topic topic --bootstrap-server node1:9092,node2:9092,node3:9092

修改topic

[root@node1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --alter --topic topic --partitions 2 --bootstrap-server node1:9092,node2:9092,node3:9092

发布消息

[root@node1 kafka_2.11-2.2.0]# bin/kafka-console-producer.sh --topic topic --broker-list node1:9092,node2:9092,node3:9092

订阅消息

[root@node2 kafka_2.11-2.2.0]# bin/kafka-console-consumer.sh --topic topic --bootstrap-server node1:9092,node2:9092,node3:9092 --from-beginning

1.5 概念详解

Kafka的一个Topic(主题)在集群内部实际是有1到N个Partition(分区)构成,每一个Partition都是一个有序的,持序追加的Record队列,一个Partition是一个structured commit log(结构化的提交日志)。

Record存放在Kafka集群中的,Record会有一个保留周期(默认是168小时),如果超过保留期,不论Record是否消费,Kafka都会丢弃。理论上来说,Kafka容量为维持在一个合理的范围区间之内(不断的产生新数据,不断丢失过期的数据)

kafka中的每一个Record在分区中都一个唯一的offset,offset会随着分区不断写入数据有序递增。并且Consumer会保留一个元数据(offset | position,记录了Consumer消费的位置),Kafka这种机制为我们提供Record重复处理或者跳过不感兴趣Record处理的功能

生产者(Producer)

Kafka Producer(生产者)是用来产生持续的Record,并发布到kafka某一个或者多个Topic中,一个Record是由key,value,timestamp构成,发布Record时的策略:

key = null: 轮询Partitionkey != null: key.hashCode % partitionNum = 存放Record分区的序号手动指定Partition序号: Producer一方在发布Record时手动指定要存放的分区序号

消费者(Consumer)

Kafka Comsumer(消费者)订阅一个或者多个感兴趣的Topic,一旦这些Topic中有新的数据产生,会立即拉取到本地(Consumer)一方,进行相应的业务处理。

Kafka使用Consumer Group组织管理Conumser,Consumer Group特点: 组外广播,组内负载均衡

2、Java API

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.2.0</version></dependency>

2.1 生产者API

public class KafkaProducerDemo {public static void main(String[] args) {//1. 创建配置对象 指定Producer的信息Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//2. 创建Producer对象KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(properties);//3. 发布消息ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("t1",1,"Hello World");producer.send(record);//4. 提交producer.flush();producer.close();}}

2.2 消费者API

public class KafkaConsumerDemo {public static void main(String[] args) {//1. 创建配置对象 指定Consumer信息Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.GROUP_ID_CONFIG, "g1"); // 消费组的ID 同组负载均衡 不同组广播//2. 创建消费者KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(properties);//3. 订阅topicconsumer.subscribe(Arrays.asList("t1"));//4. 拉取topic内新增的数据while (true) {ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(5));// 拉取的超时时间for (ConsumerRecord<Integer, String> record : records) {// 1 HelloWorld xxxx 0System.out.println(record.key() + " | " + record.value() + " | " + record.timestamp() + " | " + record.offset() +" | "+ record.partition());}}}}

2.3 topic操作管理

/*** topic: 创建 删除 展示列表 查看详情 修改(不支持)*/public class TopicManagerDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {//1. 创建配置对象 指定Topic管理信息Properties properties = new Properties();properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092,node3:9092");AdminClient client = AdminClient.create(properties);//2. 创建Topicclient.createTopics(Arrays.asList(new NewTopic("t2",3,(short)3)));//3. 展示Topic列表ListTopicsResult result = client.listTopics();Set<String> topics = result.names().get();// topics.forEach(topic -> System.out.println(topic));for (String topic : topics) {System.out.println(topic);}//4. 查看topic详情DescribeTopicsResult result = client.describeTopics(Arrays.asList("t1"));Map<String, KafkaFuture<TopicDescription>> values = result.values();// values.forEach((k,v) -> System.out.println(k +" | " +v));for (Map.Entry<String,KafkaFuture<TopicDescription>> entry: values.entrySet()) {System.out.println(entry.getKey() + " | "+ entry.getValue().get());}//5. 删除topicclient.deleteTopics(Arrays.asList("t2"));// 关闭连接client.close();}}

3、偏移量控制

Kafka的消费者在消费Record时会通过offset记录消费位置,Kafka会使用一个特殊的topic__consumer_offsets记录消费组的读的offset,__consumer_offsets由50个分区构成,每一个分区包含它本身有3个冗余备份。消费者在消费消息时,会根据所属的消费组的ID,去__consumer_offsets的特殊Topic中查找上一个提交的offset,然后从offset +1的位置继续消费。

Kafka偏移量的提交策略有两种

自动提交(默认):默认情况下kafka consumer在拉取完分区内的数据后,会自动提交读的offset。手动提交:大多数情况自动提交offset可以满足我们的需求,但是在一些特殊情况,我们需要手动提交offset。保证分区内数据都能够得到正确的处理。

public class KafkaConsumerDemo {public static void main(String[] args) {//1. 创建配置对象 指定Consumer信息Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.GROUP_ID_CONFIG, "g1"); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); // 关闭kafka consumer的offset自动提交功能//2. 创建消费者KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(properties);//3. 订阅主题consumer.subscribe(Arrays.asList("t3"));//4. 拉取主题内新增的数据while (true) {ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(5));// 拉取的超时时间for (ConsumerRecord<Integer, String> record : records) {// 业务处理// ....}// 手动提交offset偏移量mitAsync();}}}

4、指定消费分区和消费位置

4.1 订阅Topic

subscribe订阅一个或者多个主题,一旦主题中有新的数据产生,consumer会收到topic内所有分区内的消息

consumer.subscribe(Arrays.asList("t1"));

4.2 指定消费Topic特定的分区

指定消费的topic的分区序号,一旦分区序号内有新的数据产生,consumer会收到特定分区的新产生的记录

// 分配方法: 只消费t1主题0号分区内的消息consumer.assign(Arrays.asList(new TopicPartition("t1",0)));

4.3 手动控制消费offset

手动控制消费的offset,可以重置offset重复处理已经处理过的消息,设定offset跳过不感兴趣的消息

// 手动指定消费的offset:从t1主题的0号分区offset=0的位置开始消费消息consumer.seek(new TopicPartition("t1",0),0);

NOTE:

​ 如果手动控制消费位置,offset提交策略就没有作用了,因为每次都是从指定的offfset位置开始向后消费消息

5、自定义传输对象发送与接收

<dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.6</version></dependency>

自定义对象

// 一定要实现序列化 iopublic class User implements Serializable {private String name;private Boolean sex;private Double salary;//...}

创建序列化器

/*** User ---> byte[]*/public class UserSerializer implements Serializer<User> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {}@Overridepublic byte[] serialize(String topic, User user) {byte[] bytes = SerializationUtils.serialize((Serializable) user);return bytes;}@Overridepublic void close() {}}

创建反序列化器

public class UserDeserializer implements Deserializer<User> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {}@Overridepublic User deserialize(String topic, byte[] bytes) {User user = (User) SerializationUtils.deserialize(bytes);return user;}@Overridepublic void close() {}}

生产者

public class UserProducerDemo {public static void main(String[] args) {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class);KafkaProducer<Integer, User> producer = new KafkaProducer<Integer, User>(properties);producer.send(new ProducerRecord<Integer, User>("t4", 1, new User("zs", true, 100D)));producer.flush();producer.close();}}

消费者

public class UserConsumerDemo {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092,node3:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,UserDeserializer.class);properties.put(ConsumerConfig.GROUP_ID_CONFIG,"g2");properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); // 关闭offset自动提交KafkaConsumer<Integer, User> consumer = new KafkaConsumer<>(properties);consumer.subscribe(Arrays.asList("t4"));while(true){ConsumerRecords<Integer, User> records = consumer.poll(Duration.ofSeconds(5));for (ConsumerRecord<Integer, User> record : records) {System.out.println(record.key() +" " + record.value().getName() + " "+ record.offset());}}}}

6、kafka与springBoot整合

# =====================生产者配置信息==========================spring.kafka.producer.bootstrap-servers=node1:9092,node2:9092,node3:9092spring.kafka.producer.key-serializer=org.mon.serialization.IntegerSerializerspring.kafka.producer.value-serializer=org.mon.serialization.StringSerializer# =====================消费者配置信息==========================spring.kafka.consumer.bootstrap-servers=node1:9092,node2:9092,node3:9092spring.kafka.consumer.key-deserializer=org.mon.serialization.IntegerDeserializerspring.kafka.consumer.value-deserializer=org.mon.serialization.StringDeserializerspring.kafka.consumer.group-id=g1

生产者

@Component // 当前类自动注册为spring容器中的beanpublic class ProducerDemo {@Autowiredpublic KafkaTemplate<Integer, String> kafkaTemplate;private AtomicInteger atomicInteger = new AtomicInteger();// 每隔5秒发送一条数据@Scheduled(cron = "0/5 * * * * ?")public void send() {int num = atomicInteger.getAndIncrement();ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send("t4", num, "Hello World: " + num);// 函数式编程future.addCallback((t) -> {System.out.println("发送成功:" + t);}, (e) -> {System.out.println("发送失败:" + e.getMessage());});}}

消费者

@Componentpublic class ConsumerDemo {@KafkaListener(topics = "t4") //订阅指定主题public void receive(ConsumerRecord<Integer,String> record){System.out.println(record.key() +" | " + record.value() + " | " +record.offset());}}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。