×

apache kafka age

apache kafka(大数据Kafka中常用Message Queue有哪些区别呢)

admin admin 发表于2023-08-28 08:12:25 浏览39 评论0

抢沙发发表评论

本文目录

大数据Kafka中常用Message Queue有哪些区别呢

常用Message Queue对比3.1 RabbitMQRabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量级,更适合于企业级的开发。同时实现了Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。3.2 RedisRedis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。3.3 ZeroMQZeroMQ号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZeroMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。ZeroMQ具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演这个服务器角色。你只需要简单的引用ZeroMQ程序库,可以使用NuGet安装,然后你就可以愉快的在应用程序之间发送消息了。但是ZeroMQ仅提供非持久性的队列,也就是说如果宕机,数据将会丢失。其中,Twitter的Storm 0.9.0以前的版本中默认使用ZeroMQ作为数据流的传输(Storm从0.9版本开始同时支持ZeroMQ和Netty作为传输模块)。3.4 ActiveMQActiveMQ是Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。3.5 Kafka/JafkaKafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制统一了在线和离线的消息处理。Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。上图中一个topic配置了3个partition。Partition1有两个offset:0和1。Partition2有4个offset。Partition3有1个offset。副本的id和副本所在的机器的id恰好相同。如果一个topic的副本数为3,那么Kafka将在集群中为每个partition创建3个相同的副本。集群中的每个broker存储一个或多个partition。多个producer和consumer可同时生产和消费数据。

什么时候该用 RabbitMQ 什么时候该用 Apache Kafka

Kafka和RabbitMq一样是通用意图消息代理,他们都是以分布式部署为目的。但是他们对消息语义模型的定义的假设是非常不同的。我对"AMQP 更成熟"这个论点是持怀疑态度的。让我们用事实说话来看看用什么解决方案来解决你的问题。a) 以下场景你比较适合使用Kafka。你有大量的事件(10万以上/秒)、你需要以分区的,顺序的,至少传递成功一次到混杂了在线和打包消费的消费者、你希望能重读消息、你能接受目前是有限的节点级别高可用或则说你并不介意通过论坛/IRC工具得到还在幼儿阶段的软件的支持。b) 以下场景你比较适合使用RabbitMQ。你有较少的事件(2万以上/秒)并且需要通过复杂的路由逻辑去找到消费者、你希望消息传递是可靠的、你并不关心消息传递的顺序、你需要现在就支持集群-节点级别的高可用或则说你需要7*24小时的付费支持(当然也可以通过论坛/IRC工具)。

apache kafka源码怎么编译

Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.(Kafka是一个分布式的、可分区的(partitioned)、基于备份的(replicated)和commit-log存储的服务.。它提供了类似于messaging system的特性,但是在设计实现上完全不同)。kafka是一种高吞吐量的分布式发布订阅消息系统,它有如下特性:  (1)、通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。  (2)、高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。  (3)、支持通过kafka服务器和消费机集群来分区消息。  (4)、支持Hadoop并行数据加载。一、用Kafka里面自带的脚本进行编译  下载好了Kafka源码,里面自带了一个gradlew的脚本,我们可以利用这个编译Kafka源码:1 # wget 二、利用sbt进行编译  我们同样可以用sbt来编译Kafka,步骤如下:01 # git clone https://git-wip-us.apache.org/repos/asf/kafka.git 02 # cd kafka 03 # git checkout -b 0.8 remotes/origin/0.8 04 # ./sbt update 05 org.eclipse.jdt#core;3.1.1!core.jar (2243ms) 06 downloading http://repo1.maven.org/maven2/ant/ant/1.6.5/ant-1.6.5.jar ... 07 ant#ant;1.6.5!ant.jar (1150ms) 08 Done updating. 09 Resolving org.apache.hadoop#hadoop-core;0.20.2 ... 10 Done updating. 11 Resolving com.yammer.metrics#metrics-annotation;2.2.0 ... 12 Done updating. 13 Resolving com.yammer.metrics#metrics-annotation;2.2.0 ... 14 Done updating. 15 Total time: 168 s, completed Jun 18, 2014 6:51:38 PM 16 17 # ./sbt package 18 Set current project to Kafka (in build file:/export1/spark/kafka/) 19 Getting Scala 2.8.0 ... 20 :: retrieving :: org.scala-sbt#boot-scala 21 confs: 22 3 artifacts copied, 0 already retrieved (14544kB/27ms) 23 Total time: 1 s, completed Jun 18, 2014 6:52:37 PM 对于Kafka 0.8及以上版本还需要运行以下的命令:01 # ./sbt assembly-package-dependency 02 Loading project definition from /export1/spark/kafka/project 03 Multiple resolvers having different access mechanism configured with 04 same name ’sbt-plugin-releases’. To avoid conflict, Remove duplicate project 05 resolvers (`resolvers`) or rename publishing resolver (`publishTo`). 06 Set current project to Kafka (in build file:/export1/spark/kafka/) 07 Credentials file /home/wyp/.m2/.credentials does not exist 08 Including slf4j-api-1.7.2.jar 09 Including metrics-annotation-2.2.0.jar 10 Including scala-compiler.jar 11 Including scala-library.jar 12 Including slf4j-simple-1.6.4.jar 13 Including metrics-core-2.2.0.jar 14 Including snappy-java-1.0.4.1.jar 15 Including zookeeper-3.3.4.jar 16 Including log4j-1.2.15.jar 17 Including zkclient-0.3.jar 18 Including jopt-simple-3.2.jar 19 Merging ’META-INF/NOTICE’ with strategy ’rename’ 20 Merging ’org/xerial/snappy/native/README’ with strategy ’rename’ 21 Merging ’META-INF/maven/org.xerial.snappy/snappy-java/LICENSE’ 22 with strategy ’rename’ 23 Merging ’LICENSE.txt’ with strategy ’rename’ 24 Merging ’META-INF/LICENSE’ with strategy ’rename’ 25 Merging ’META-INF/MANIFEST.MF’ with strategy ’discard’ 26 Strategy ’discard’ was applied to a file 27 Strategy ’rename’ was applied to 5 files 28 Total time: 3 s, completed Jun 18, 2014 6:53:41 PM 当然,我们也可以在sbt里面指定scala的版本:01 《!-- 02 User: 过往记忆 03 Date: 14-6-18 04 Time: 20:20 05 bolg: http://www.iteblog.com 06 本文地址:http://www.iteblog.com/archives/1044 07 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货 08 过往记忆博客微信公共帐号:iteblog_hadoop 09 --》 10 sbt "++2.10.3 update" 11 sbt "++2.10.3 package" 12 sbt "++2.10.3 assembly-package-dependency"

大数据时代下Apache Kafka是数据库吗

首先明确说明它不是数据库,它没有schema,也没有表,更没有索引。它仅仅是生产消息流、消费消息流而已。从这个角度来说Kafka的确不像数据库,至少不像我们熟知的关系型数据库。那么到底什么是数据库呢?或者说什么特性使得一个系统可以被称为数据库?经典的教科书是这么说的:数据库是提供 ACID 特性的,即atomicity、consistency、isolation和durability。好了,现在问题演变成了Apache Kafka支持ACID吗?如果它支持,Kafka又是怎么支持的呢?要回答这些问题,我们依次讨论下ACID。1、持久性(durability)我们先从最容易的持久性开始说起,因为持久性最容易理解。在80年代持久性指的是把数据写入到磁带中,这是一种很古老的存储设备,现在应该已经绝迹了。目前实现持久性更常见的做法是将数据写入到物理磁盘上,而这也只能实现单机的持久性。当演进到分布式系统时代后,持久性指的是将数据通过备份机制拷贝到多台机器的磁盘上。很多数据库厂商都有自己的分布式系统解决方案,如GreenPlum和Oracle RAC。它们都提供了这种多机备份的持久性。和它们类似,Apache Kafka天然也是支持这种持久性的,它提供的副本机制在实现原理上几乎和数据库厂商的方案是一样的。2、原子性(atomicity)数据库中的原子性和多线程领域内的原子性不是一回事。我们知道在Java中有AtomicInteger这样的类能够提供线程安全的整数操作服务,这里的atomicity关心的是在多个线程并发的情况下如何保证正确性的问题。而在数据库领域,原子性关心的是如何应对错误或异常情况,特别是对于事务的处理。如果服务发生故障,之前提交的事务要保证已经持久化,而当前运行的事务要终止(abort),它执行的所有操作都要回滚,最终的状态就好像该事务从未运行过那样。举个实际的例子,第三个方法是采用基于日志结构的消息队列来实现,比如使用Kafka来做,如下图所示:在这个架构中app仅仅是向Kafka写入消息,而下面的数据库、cache和index作为独立的consumer消费这个日志——Kafka分区的顺序性保证了app端更新操作的顺序性。如果某个consumer消费速度慢于其他consumer也没关系,毕竟消息依然在Kafka中保存着。总而言之,有了Kafka所有的异质系统都能以相同的顺序应用app端的更新操作,从而实现了数据的最终一致性。这种方法有个专属的名字,叫capture data change,也称CDC。3、隔离性(isolation)在传统的关系型数据库中最强的隔离级别通常是指serializability,国内一般翻译成可串行化或串行化。表达的思想就是连接数据库的每个客户端在执行各自的事务时数据库会给它们一个假象:仿佛每个客户端的事务都顺序执行的,即执行完一个事务之后再开始执行下一个事务。其实数据库端同时会处理多个事务,但serializability保证了它们就像单独执行一样。举个例子,在一个论坛系统中,每个新用户都需要注册一个唯一的用户名。一个简单的app实现逻辑大概是这样的:4、一致性(consistency)最后说说一致性。按照Kelppmann大神的原话,这是一个很奇怪的属性:在所有ACID特性中,其他三项特性的确属于数据库层面需要实现或保证的,但只有一致性是由用户来保证的。严格来说,它不属于数据库的特性,而应该属于使用数据库的一种方式。坦率说第一次听到这句话时我本人还是有点震惊的,因为从没有往这个方面考虑过,但仔细想想还真是这么回事。比如刚才的注册用户名的例子中我们要求每个用户名是唯一的。这种一致性约束是由我们用户做出的,而不是数据库本身。数据库本身并不关心或并不知道用户名是否应该是唯一的。针对Kafka而言,这种一致性又意味着什么呢?Kelppmann没有具体展开,但我个人认为他应该指的是linearizability、消息顺序之间的一致性以及分布式事务。幸运的是,Kafka的备份机制实现了linearizability和total order broadcast,而且在Kafka 0.11开始也支持分布式事务了。

kafka是干嘛的

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。

这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

主要特性

Kafka是一种高吞吐量 的分布式发布订阅消息系统,有如下特性:

通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。

高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。

支持通过Kafka服务器和消费机集群来分区消息。

支持Hadoop并行数据加载。 

Kafka通过官网发布了最新版本3.0.0。

以上内容来自 百度百科-kafka

apache kafka有哪几个部分组成

Kafka是一个分布式发布-订阅publish-subscribe消息传递系统,设计目标是快速、可伸缩和耐用冗余。

它是一个在一个较高的抽象水平上描述的但是又非常简单的系统,虽然当你更深地挖掘时会令人难以置信的技术细节。卡夫卡文档出色的解释了系统中许多设计和实现的微妙之处,所以我们不会在这里试图解释他们了。

像许多发布-订阅消息传递系统,卡夫卡能保存源消息的数据。生产者将输入写入到主题topic,而消费者则从主题topic中读取写入数据。因为卡夫卡是一个分布式系统,所以topic主题会实现跨多个节点分区和复制。

消息是简单byte数组,开发者能够使用它们存在对象的任何格式,如String或JSON和Avro,每个消息能够有一个Key,因此,生产者保证拥有相同key的消息到达同一个分区,而对主题topic的消费,可以使用多个消费者来配置消费组,每个消费组中消费者能够从他订阅的那个主题Topic所在分区子集中读取消息,这样每个消息发送给消费组中一个消费者,使用相同key的所有消息能够到达相同的消费者。

Kafka的特点是它将每个主题topic分区都看成一个日志log(一个有顺序的消息集合),一个分区中的每个消息被分配一个唯一的偏移量offset. Kafka并不试图跟踪哪个消息被哪个消费者读取,而是只是保留未被读取的消息。 Kafka保留所有消息的时间,而消费者负责跟踪它们在每个日志(日志是一个消息序列集合代表一个topic分区)中的位置, 因此Kafka能够支持大量的消费者,使用最小的代价保留大量的数据。

Kafka如何工作?

假设我们正在开发一个大型多人在线游戏。在这些游戏中,玩家在一个虚拟的世界中互相合作和竞争。通常玩家互相发生贸易,比如交换物品和金钱,所以游戏开发者重要的是要确保球员不会欺骗,在下面两种情况下的交易将标记为特殊:如果贸易数量明显大于正常的球员;如果IP玩家登录是不同于过去的20场比赛使用的IP。除了对实时交易进行标记以外,我们也想加载数据到Apache Hadoop,我们的数据,科学家们可以用它来训练和测试新算法。

基于游戏服务器内存数据缓存进行实时事件标记是最好的,能够让我们达到迅速决定,特别是对那些最活跃玩家。如果我们分区游戏服务器,我们的系统有多个游戏服务器和数据集,,包括过去登录的20个玩家和近20在内存的交易,。

我们的游戏服务器必须执行两个截然不同的角色:第一个是接受和执行用户操作,第二是实时处理贸易信息并标记可疑事件。为了有效执行第二个角色功能,每个用户整个贸易的历史事件都驻留在一个单独的服务器内存中。因为接收用户操作(第一个角色功能)的服务器可能没有他的贸易历史,这意味着我们必须通过服务器之间的消息实现第二个角色功能。为了让两个角色功能保持松散耦合,我们使用卡夫卡在服务器之间传递消息,您将看到如下:

卡夫卡有几个特性:

可伸缩性、数据分区,低延迟,并且能够处理大量不同的消费者。我们为登录和交易配置了一个Topic主题。我们把它们配置成一个topic主题的原因是:只有我们获得已经登录信息(我们可以确保玩家从他平时IP登录)后,才能确保他的交易是有效的。卡夫卡可以在在一个主题topic中维护这个前后顺序,而不是在两个topic之间。

当用户登录后进行交易,接受服务器立即发送事件到Kafka. 消息是将user id作为key, 事件作为值. 这能确保同一个用户的所有交易和登录发送到Kafka同样的分区. 每个事件处理服务器都运行一个Kafka消费者, 其每个消费者都被配置为同样组的一部分,这样,每个服务器从少量Kafka分区读取消息,所有关于某个特定用户的数据都能发往相同事件处理服务器,当事件处理服务器从Kafka读取一个用户交易时,将这个事件加入到用户事件历史缓存本地内存缓存,这样就无需额外网络磁盘开销直接标记那些可以事件。

重要的是我们为每个事件处理服务器创建一个分区,或者每个事件处理服务器上每核对应一个多线程应用,(记住 Kafka大部分是用少于 10,000 个分区实现所有主题topic,这样我们不能为每个用户创建一个分区,因为用户数是不断增加的。)

这好像是一种迂回方式处理事件:从游戏服务器发送事件到Kafka, 另外一台服务器读取这个事件再处理它,这种设计解耦了两种角色功能,允许我们为每个角色功能安排其需要的容量与能力。.另外,这样做不会增加处理时间,因为Kafka是设计为高吞吐量和低延迟的,即使只有一个小的三台节点服务器的集群环境,也能每秒处理接近一百万个事件,平均延迟只有3ms.

当服务器标识出一个事件作为可疑,它会发送这个标记了的事件到新的Kafka topic,比如其主题名称为Alerts,这时报警服务器或仪表监控服务器会接受到这个事件,同时另外一个单独过程会从Alerts主题中读取这个事件,将它写入到Hadoop进行更进一步分析。

因为Kafka并不跟踪确认每个消费者的消息,它就能用很少的性能影响处理成千上万的消费者。Kafka甚至可以处理批量消费者:每一个小时处理过程唤醒激活处理一个队列中所有消息,根本就不会影响系统的吞吐量或延迟。

如何在Windows上脱离Cygwin运行Apache Kafka

1)列出当前所有的消息主题(topic):kafka-topics.bat–zookeeperlocalhost:2181–list2)查看某个消息主题的状态信息:kafka-topics.bat–zookeeperlocalhost:2181–describe–topic《topicname》3)从头读取某个topic的所有消息:kafka-console-consumer.bat–zookeeperlocalhost:2181–from-beginning–topictest4)删除某个消息主题:kafka-run-class.batkafka.admin.TopicCommand–zookeeperlocalhost:2181–delete–topic《topicname》