Kafka基本原理,Kakfa消息投递语义

来源:http://www.smjxgs.com 作者:王中王鉄算盘 人气:68 发布时间:2019-08-08
摘要:Kakfa音信投递语义,kakfa投递语义 简介 一.新闻投递保证分类 音讯的投递保险珍爱是三种样式: 1.At most once—Messages may be lost but are never redelivered. 最多一次 --- 音信只怕吐弃,但绝不会重

Kakfa音信投递语义,kakfa投递语义

简介

一.新闻投递保证分类

音讯的投递保险珍爱是三种样式:

1.At most once—Messages may be lost but are never redelivered.

最多一次 --- 音信只怕吐弃,但绝不会重发。

2.At least once—Messages are never lost but may be redelivered.

起码贰回 --- 音讯绝不会错失,但有极大大概再次发送。

3.Exactly once—this is what people actually want, each message is delivered once and only once.

正好三次 --- 那是大家的确想要的,各样音信传递一回且仅二回。

那三种音讯投递保证包蕴两上边的开始和结果:生产者发送消息和顾客开支音讯。

Message Delivery Semantics

  • At most once —— Messages may be lost but are never redelivered(音信大概有失但不会再一次投递)
  • At least once —— Messages are never lost but may be redelivered(音信不会屏弃但只怕再一次投递)
  • Exactly once —— this is what people actually want, each message is delivered once and only once(新闻只投递三回)

众多种类都宣示提供"exactly once"投递,可是留神翻阅很关键,大多数这种阐明都是误导(他们尚无设想生产者和买主只怕破产的动静,以及四个顾客进程同一时间管理的气象,还应该有写到磁盘上的多寡可能有失的图景)。

卡夫卡的音信投递语义是直接的。公布新闻的时候大家有一个定义叫新闻被提交到日志。一旦被宣布的新闻被交付到日志,只要有三个那一个音讯所在分区的broker活着音信就不会抛弃。从0.11.0.0版本之后,卡夫卡生产者支持幂等投递选项,以担保纵然音信被再一次发送,日志中也不会有再一次的条规。为了贯彻这点,broker给没用个生成者内定八个ID而且每条消息内定三个连串号。

实际不是具有的情况都亟需那样强的担保。

于今,让我们站在花费者的角度来看那么些语义。花费者用日志调控它的岗位。假如买主绝非崩溃它仅仅只是在内部存款和储蓄器中积存这些职位,但借使买主战败了,大家想要另三个经过来接管这么些分区,那么那一个新的进程供给接纳三个适合的岗位上马拍卖。让我们来看一下顾客读取音信后甩卖音信和翻新地点的两种选项。

 

The consumer's position is stored as a message in a topic, so we can write the offset to Kafka in the same transaction as the output topics receiving the processed data. If the transaction is aborted, the consumer's position will revert to its old value and the produced data on the output topics will not be visible to other consumers, depending on their "isolation level." In the default "read_uncommitted" isolation level, all messages are visible to consumers even if they were part of an aborted transaction, but in "read_committed," the consumer will only return messages from transactions which were committed (and any messages which were not part of a transaction).

 

当从核心消费并生育到另一个核心的时候,大家得以用职业生产者。花费者的职位作为消息被寄存在topic中,以至于大家能够在接受管理数量的分外事情元帅offset写到kafka。借使事情被搁浅,花费者的岗位会东山复起成旧值况且生产的数目对任何成本者不可知,这取决隔开分离品级。暗许的隔离等第是"read_uncommitted"表示新闻对负有顾客可知,尽管稍微新闻来源于被中断的政工。

 

Apache Kafka是布满式公布-订阅音信系统。它最初由LinkedIn公司花费,之后成为Apache项目标一有个别。卡夫卡是一种高效、可扩展的、设计内在正是分布式的,分区的和可复制的提交日志服务。

二.劳动者新闻投递保障分类

1.最多三次

客户端只发送叁回,不关心发送成功与否

2.最少一次

平时会安装多个最大重试次数,在发送失利且未抵达重试次数前,会频仍重试。

当消息中间件保存信息成功但向客户端响应败北的景色下,生产者会将同一条新闻发送数次,导致新闻再度。

3.正好一回

1)须要生产者能够依据音信中的内容生产贰个独一且不重复的ID,借使ID一样则认为信息体同样

2)如果音讯中间件扶助依照某贰天性质进行幂等操作,则一贯运用该天性完成“正好二遍”。纵然新闻中间件具有该功能,也是一时光限定的,在利用时必要切实明白

3)若是新闻中间件不扶助该性子,则无从落到实处该中投递保险。因为对此新闻中间件保存音信成功但向客户端响应失败的图景在劳动者一端是没办法化解的

总结

1、音讯投递语义

  最多二次:大概屏弃新闻但不会重复投递

  最少一回:不会甩掉音讯但可能再次投递

  精确一回:只会投递二回

2、kafka给种种生产者内定三个ID,每一个公布的音信二个队列号,那样的话就算生产者重复发送音讯,在提交日志中也不会有重复记录

3、站在顾客的角度,先保存位置后甩卖音讯就是“最多三次”;先拍卖音信后保存地方正是“最少一遍”;至于“正确一回”,能够运用职业生产者来完结,即在同贰个事情中收到并拍卖音信,将地方(offset)保存到另一个topic中。只要工作成功了,拍手叫好,若事务败北,则地点苏醒。

 

参考

 

Message Delivery Semantics At most once ——Messages may be lost but are never redelivered(新闻恐怕舍弃但不会另行投递)...

Kafka架构

三.顾客音讯投递保证分类

1.最多三遍

先更新花费的offset,而后进行音讯花费,那样信息管理进程中崩溃了(进度或然机器),也不会另行实行开销。

2.起码三遍

先拍卖新闻,管理到位后再立异花费的offset。这样当音信管理进程中崩溃了(进度大概机器),当苏醒后会再一次费用新闻。

3.正好贰遍

平日供给在开销者地点的积累和买主输出的囤积之间引进布满式事务来促成,独有当音讯管理成功后再立异新闻的offset,通过这种措施来担保“正好二遍”。 四个简练的法子是把记录音信花费的offset和信息管理结果使用同样条sql保存在一张表中。

也得以行使“二级开支"的点子来贯彻”正好三次“,本身搭建三个系统和db来储存消息ID,对再一次的音信ID举办过滤,来担保再也的消息唯有一条(

它的架构包罗以下组件:

四.一般性采取

对于非常多系统来说,是不会经受新闻遗失,而恰巧一回的落到实处资金又相比较高,因此在新闻中间件实际的应用中,使用最普及的是“至少贰回”这种投递保证,也是音信中间件的暗中同意选项。

话题(Topic):是一定项指标消息流。音信是字节的立见成效载荷(Payload),话题是消息的分类名或种子(Feed)名。

五.参照他事他说加以考察资料

1.

生产者(Producer):是能够发表音信到话题的其它对象。

劳务代办(Broker):已公布的音讯保存在一组服务器中,它们被称之为代理(Broker)或卡夫卡集群。

消费者(Consumer):能够订阅一个或七个话题,并从Broker拉数码,进而开销那个已发布的音讯。

4887王中王鉄算盘奖结果 1

卡夫卡存款和储蓄计谋

1)kafka以topic来实行新闻管理,每一个topic包蕴四个partition,各样partition对应三个逻辑log,有七个segment组成。

2)各个segment中存款和储蓄多条消息(见下图),音信id由其逻辑地方决定,即从信息id可一直固定到音讯的囤积地点,防止id到岗位的额外映射。

3)每种part在内存中对应二个index,记录各类segment中的第一条新闻偏移。

4)公布者发到有个别topic的音信会被均匀的布满到多少个partition上(或依照用户钦点的路由准则进行分布),broker收到公布新闻往对应partition的最后一个segment上增多该音信,当某些segment上的消息条数达到配置值或消息发布时间抢先阈值时,segment上的音信会被flush到磁盘,唯有flush到磁盘上的新闻订阅者手艺订阅到,segment抵达自然的轻重缓急后将不会再往该segment写多少,broker会创制新的segment。

4887王中王鉄算盘奖结果 2

卡夫卡数据保存策略

1)N天前的删减。

2)保留近年来的有个别Size数据。

Kafka broker

与别的新闻系统分化,卡夫卡broker是无状态的。那意味成本者必须体贴已开销的情景新闻。那么些音信由花费者自身维护,broker完全不管(有offset managerbroker管理)。

  • 从代理删除新闻变得很吃力,因为代理并不知道开销者是还是不是早已应用了该音讯。卡夫卡立异性地减轻了那个主题材料,它将叁个简便的依赖时间的SLA应用于保留攻略。当音信在代理中中国足球球协会超级联赛过一定时期后,将会被机关删除。
  • 这种翻新设计有相当的大的低价,成本者能够故意倒回来老的偏移量再次花费数量。那违背了队列的分布约定,但被注明是累累主顾的基本特征。

以下摘抄自kafka官方文书档案:

Kafka Design

目标

1) 高吞吐量来协理高体积的事件流管理

2) 辅助从离线系统加载数据

3) 低延迟的新闻系统

持久化

1) 注重文件系统,长久化到本地

2) 数据长久化到log

效率

1) 解决”small IO problem“:

    使用”message set“组合消息。

    server使用”chunks of messages“写到log。

    consumer二遍获得大的消息块。

2)解决”byte copying“:

    在producer、broker和consumer之间使用统一的binary message format。

    使用系统的page cache。

    使用sendfile传输log,制止拷贝。

端到端的批量回退(End-to-end Batch Compression)

卡夫卡支持GZIP和Snappy压缩协议。

复制(Replication)

1)七个partition的复制个数(replication factor)富含这几个partition的leader本身。

2)全部对partition的读和写都由此leader。

3)Followers通过pull获取leader上log(message和offset)

4)假诺一个follower挂掉、卡住也许联合太慢,leader会把那几个follower从”in sync replicas“(ISEnclave)列表中删除。

5)当有着的”in sync replicas“的follower把一个音讯写入到温馨的log中时,这些音信才被以为是”committed“的。

6)要是针对某些partition的享有复制节点都挂了,卡夫卡采纳发轫复活的特别节点作为leader(那些节点不自然在ISCRUISER里)。

The Producer

发送确认

通过request.required.acks来设置,采取是或不是等待音讯commit(是不是等待全部的”in sync replicas“都成功复制了数据)

4887王中王鉄算盘奖结果,Producer能够通过acks参数钦点最少要求多少个Replica确认收到该新闻才视为该消息发送成功。acks的暗许值是1,即Leader收到该新闻后立刻告知Producer收到该音讯,此时一经在ISPAJERO中的新闻复制完该新闻前Leader宕机,那该条新闻会吐弃。

推荐介绍的做法是,将acks设置为all或者-1,此时独有ISEnclave中的全数Replica都收到该数量(也即该信息被Commit),Leader才会告诉Producer该新闻发送成功,进而确定保证不会有不敢问津的数目遗失。

负载均衡

1)producer能够自定义发送到哪个partition的路由法规。暗中认可路由准绳:hash(key)%numPartitions,假设key为null则随机挑选叁个partition。

2)自定义路由:假若key是二个user id,可以把同八个user的新闻发送到同多少个partition,那时consumer就足以从同几个partition读取同一个user的音信。

异步批量出殡和埋葬

批量出殡和埋葬:配置非常少于一定音信数目一同发送並且等待时间低于二个定点延迟的数额。

The Consumer

consumer调控音讯的读取。

Push vs Pull

1) producer push data to broker,consumer pull data from broker

2) consumer pull的帮助和益处:consumer本人支配音信的读取速度和数目。

3) consumer pull的弱项:若是broker未有数据,则可能要pull多次忙等待,卡夫卡能够安插consumer long pull一贯等到有数量。

Consumer Position

1) 超越八分之四消息系统由broker记录哪些音信被花费了,但卡夫卡不是。

2) 卡夫卡由consumer调控音讯的开销,consumer以至可以回去四个old offset的岗位再度开支音信。

Consumer group

每二个consumer实例都属于贰个consumer group。

每一条新闻只会被同四个consumer group里的贰个consumer实例花费。

差异consumer group能够相同的时候费用同样条音讯。

Consumer Rebalance

Kafka consumer high level API:

若是某consumer group中consumer数量少于partition数量,则至少有三个consumer会消费两个partition的数据。

一经consumer的多少与partition数量同样,则刚好两个consumer花费三个partition的数量。

假若consumer的数据多于partition的数额时,会有一点consumer不能成本该topic下任何一条新闻。

Message Delivery Semantics

三种:

At most once—Messages may be lost but are never redelivered.

At least once—Messages are never lost but may be redelivered.

Exactly once—this is what people actually want, each message is delivered once and only once.

Producer:有个”acks“配置能够垄断接收的leader的在如何动静下就答复producer音讯写入成功。

Consumer:

* 读取音讯,写log,管理音信。假设管理音讯战败,log已经写入,则无从再一次拍卖战败的音讯,对应”At most once“。

* 读裁撤息,管理新闻,写log。假如音信管理成功,写log退步,则消息会被拍卖五次,对应”At least once“。

* 读取音讯,同不平时间管理音信并把result和log同期写入。那样保障result和log同一时间革新或同一时间战败,对应”Exactly once“。

卡夫卡暗中同意保险at-least-once delivery,容许用户达成at-most-once语义,exactly-once的贯彻取决于目标存款和储蓄系统,kafka提供了读取offset,完毕也不曾难点。

Distribution

Consumer Offset Tracking

1)High-level consumer记录各样partition所费用的maximum offset,并定期commit到offset manager(broker)。

2)Simple consumer需求手动管理offset。以后的Simple consumer Java API只帮忙commit offset到zookeeper。

Consumers and Consumer Groups

1)consumer注册到zookeeper

2)属于同一个group的consumer(group id同样)平均分配partition,每种partition只会被贰个consumer成本。

3)当broker或同叁个group的别的consumer的动静发生变化的时候,consumer rebalance就能产生。

Zookeeper和谐节制

1)管理broker与consumer的动态插足与离开。

2)触发负载均衡,当broker或consumer加入或离开时会触发负载均衡算法,使得二个consumer group内的多少个consumer的订阅负载平衡。

3)维护开销关系及各类partition的花费音讯。

日记压缩(Log Compaction)

1)针对二个topic的partition,压缩使得卡夫卡至少知道各种key对应的最终三个值。

2)压缩不会重排序音信。

3)新闻的offset是不会变的。

4)音讯的offset是逐个的。

5)压缩发送和接到能下跌互联网负载。

6)以减小后的款型悠久化到磁盘。

 

 生产者代码示例:

import java.util.*;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {
    public static void main(String[] args) {
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();

        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "example.producer.SimplePartitioner");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);

        for (long nEvents = 0; nEvents < events; nEvents  ) { 
               long runtime = new Date().getTime();  
               String ip = “192.168.2.”   rnd.nextInt(255); 
               String msg = runtime   “,www.example.com,”   ip; 
               KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
               producer.send(data);
        }
        producer.close();
    }
}

Partitioning Code:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {
    public SimplePartitioner (VerifiableProperties props) {

    }

    public int partition(Object key, int a_numPartitions) {
        int partition = 0;
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
           partition = Integer.parseInt( stringKey.substring(offset 1)) % a_numPartitions;
        }
       return partition;
  }

}

消费者代码示例:

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerGroupExample {
    private final ConsumerConnector consumer;
    private final String topic;
    private  ExecutorService executor;

    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
        }
   }

    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // now launch all the threads
        //
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        //
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber  ;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");

        return new ConsumerConfig(props);
    }

    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);

        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {

        }
        example.shutdown();
    }
}

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerTest implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext())
            System.out.println("Thread "   m_threadNumber   ": "   new String(it.next().message()));
        System.out.println("Shutting down Thread: "   m_threadNumber);
    }
}

 

开辟条件搭建:

一些example:

本人管理offset:

 

参考:

卡夫卡深度剖判:

 

 

本文由4887王中王鉄算盘奖结果发布于王中王鉄算盘,转载请注明出处:Kafka基本原理,Kakfa消息投递语义

关键词:

上一篇:概念和基本用法,自己动手实践

下一篇:没有了

最火资讯