kafka 安装 使用

http://kafka.apache.org/downloads

(1) 安装

 使用kafka时需要zookeeper,所以需要安装 kafka 和 zookeeper

(1.1) zookeeper安装

 下载  apache-zookeeper-3.5.5-bin.tar.gz

 解压 tar -zxvf apache-zookeeper-3.5.5-bin.tar.gz

 配置 cp -rf conf/zoo_sample.cfg conf/zoo.cfg

 启动 ./bin/zkServer.sh start

1
2
3
4
5
ZBMAC-C02PGMT0F:apache-zookeeper-3.5.5-bin weikeqin1$ ./bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /Users/weikeqin1/SoftWare/apache-zookeeper-3.5.5-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
1
2
3
4
5
ZBMAC-C02PGMT0F:apache-zookeeper-3.5.5-bin weikeqin1$ ./bin/zkServer.sh stop
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /Users/weikeqin1/SoftWare/apache-zookeeper-3.5.5-bin/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED

(1.2) kafka安装

 下载 kafka_2.11-0.10.1.0.tgz

 解压 tar -zxvf kafka_2.11-0.10.1.0.tgz

 配置 修改 conf/server.properties

1
2
broker.id=1
log.dir=/tmp/kafka-logs/logs-1

 启动 ./bin/kafka-server-start.sh config/server.properties

 验证

(1.3) 使用

 (1.3.1) 创建topic

 ./bin/kafka-topics.sh --create --topic topic_test --replication-factor 1 --partitions 1 --zookeeper localhost:2181

1
2
3
ZBMAC-C02PGMT0F:kafka_2.11-0.10.1.0 weikeqin1$ ./bin/kafka-topics.sh --create --topic topic_test --replication-factor 1 --partitions 1 --zookeeper localhost:2181
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "topic_test".

 (1.3.2) 查看所有 topics

./bin/kafka-topics.sh --list --zookeeper localhost:2181

1
2
3
ZBMAC-C02PGMT0F:kafka_2.11-0.10.1.0 weikeqin1$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
test
topic_test

 (1.3.3) 启动 Producer 生产消息

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_test

 (1.3.4) 启动 Consumer 消费消息

./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic_test --from-beginning

 (1.3.5) 删除Topic

./bin/kafka-run-class.sh kafka.admin.TopicCommand --delete --topic topic_test --zookeeper localhost:2181

1
2
3
ZBMAC-C02PGMT0F:kafka_2.11-0.10.1.0 weikeqin1$ ./bin/kafka-run-class.sh kafka.admin.TopicCommand --delete --topic topic_test --zookeeper localhost:2181
Topic topic_test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

 (1.3.6) 查看 Topic 的offset

./bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic topic_test --group consumer

 (1.3.7) 查看描述 topics 信息

./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic_test

1
2
3
ZBMAC-C02PGMT0F:kafka_2.11-0.10.1.0 weikeqin1$ ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic_test
Topic:topic_test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: topic_test Partition: 0 Leader: 1 Replicas: 1 Isr: 1

第一行给出了所有分区的摘要,每个附加行给出了关于一个分区的信息。 由于我们只有一个分区,所以只有一行。

“Leader”: 是负责给定分区的所有读取和写入的节点。 每个节点将成为分区随机选择部分的领导者。

“Replicas”: 是复制此分区日志的节点列表,无论它们是否是领导者,或者即使他们当前处于活动状态。

“Isr”: 是一组“同步”副本。这是复制品列表的子集,当前活着并被引导到领导者。

(2) 原理介绍

(2.1) 消息队列(Message Queue)

Message Queue消息传送系统提供传送服务。消息传送依赖于大量支持组件,这些组件负责处理连接服务、消息的路由和传送、持久性、安全性以及日志记录。消息服务器可以使用一个或多个代理实例。

JMS(Java Messaging Service)是Java平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发,翻译为Java消息服务。

(2.2) 消息队列分类

消息队列分类:点对点和发布/订阅两种:

1、点对点:

消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。

消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

2、发布/订阅:

消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。

(2.3) 消息队列对比

1、RabbitMQ:支持的协议多,非常重量级消息队列,对路由(Routing),负载均衡(Loadbalance)或者数据持久化都有很好的支持。

2、ZeroMQ:号称最快的消息队列系统,尤其针对大吞吐量的需求场景,擅长的高级/复杂的队列,但是技术也复杂,并且只提供非持久性的队列。

3、ActiveMQ:Apache下的一个子项,类似ZeroMQ,能够以代理人和点对点的技术实现队列。

4、Redis:是一个key-Value的NOSql数据库,但也支持MQ功能,数据量较小,性能优于RabbitMQ,数据超过10K就慢的无法忍受。

(2.4) Kafka简介

Kafka是分布式发布-订阅消息系统,它最初由 LinkedIn 公司开发,使用 Scala语言编写,之后成为 Apache 项目的一部分。在Kafka集群中,没有“中心主节点”的概念,集群中所有的服务器都是对等的,因此,可以在不做任何配置的更改的情况下实现服务器的的添加与删除,同样的消息的生产者和消费者也能够做到随意重启和机器的上下线。

(2.5) Kafka术语介绍

1、消息生产者:即:Producer,是消息的产生的源头,负责生成消息并发送到Kafka服务器上。

2、消息消费者:即:Consumer,是消息的使用方,负责消费Kafka服务器上的消息。

3、主题:即:Topic,由用户定义并配置在Kafka服务器,用于建立生产者和消息者之间的订阅关系:生产者发送消息到指定的Topic下,消息者从这个Topic下消费消息。

4、消息分区:即:Partition,一个Topic下面会分为很多分区,例如:“kafka-test”这个Topic下可以分为6个分区,分别由两台服务器提供,那么通常可以配置为让每台服务器提供3个分区,假如服务器ID分别为0、1,则所有的分区为0-0、0-1、0-2和1-0、1-1、1-2。Topic物理上的分组,一个 topic可以分为多个 partition,每个 partition 是一个有序的队列。partition中的每条消息都会被分配一个有序的 id(offset)。

5、Broker:即Kafka的服务器,用户存储消息,Kafa集群中的一台或多台服务器统称为 broker。

6、消费者分组:Group,用于归组同类消费者,在Kafka中,多个消费者可以共同消息一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。

7、Offset:消息存储在Kafka的Broker上,消费者拉取消息数据的过程中需要知道消息在文件中的偏移量,这个偏移量就是所谓的Offset。



(3) 使用 (Java代码)

kafka 使用 0.10.1.0,由于java python 等都要用,选一个各个语言支持比较好的版本,所以选 0.10.1.0

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.0</version>
</dependency>

(3.1) 生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;


/**
* 生产者
*
* <pre>
* https://blog.csdn.net/m0_37739193/article/details/78396773
* </pre>
*
* @author: weikeqin.cn@gmail.com
* @date: 2019-03-26
**/
public class kafkaProducerExample {

/**
* 此配置是 Producer 在确认一个请求发送完成之前需要收到的反馈信息的数量。
* acks=0 如果设置为0,则 producer 不会等待服务器的反馈。该消息会被立刻添加到 socket buffer 中并认为已经发送完成。在这种情况下,服务器是否收到请求是没法保证的,并且参数retries也不会生效(因为客户端无法获得失败信息)。每个记录返回的 offset 总是被设置为-1。
* acks=1 如果设置为1,leader节点会将记录写入本地日志,并且在所有 follower 节点反馈之前就先确认成功。在这种情况下,如果 leader 节点在接收记录之后,并且在 follower 节点复制数据完成之前产生错误,则这条记录会丢失。
* acks=all 如果设置为all,这就意味着 leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成。只要至少有一个同步副本存在,记录就不会丢失。这种方式是对请求传递的最有效保证。acks=-1与acks=all是等效的。
*/
private final static String ACKS = "all";

public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "127.0.0.1:9092");
props.put("acks", "all");
props.put("retries", 3);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
String topic = "test";

Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<>(topic, Integer.toString(i), Integer.toString(i)));
// producer.send(new ProducerRecord<>("test", "Hello" + i));
}
producer.close();
}

}

(3.2)消费者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

//import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
* 消费者
*
* @author: weikeqin.cn@gmail.com
* @date: 2019-03-26
**/
//@Slf4j
public class KafkaConsumerExample {

public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "127.0.0.1:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
String topic = "test";

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
//log.info("offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
System.out.println(record.offset() + " " + record.key() + " " + record.value())
}
}
}

}

(4)管理 Kafka-manager

Kafka教程(三)Kafka-manager安装

https://github.com/yahoo/kafka-manager

References

[1] Kafka教程(一)Kafka入门教程
[2] Kafka安装及快速入门
[3] Kafka安装配置
[4] Kafka安装教程(详细过程)
[5] Kafka使用Java进行Producer和Consumer编程
[6] Kafka教程(二)Kafka集群环境安装
[7] Kafka教程(三)Kafka-manager安装
[8] kafka学习非常详细的经典教程