×

apache kafka实战

apache kafka实战(kafka怎么发布订阅 怎么在java中实现)

admin admin 发表于2023-11-18 12:57:30 浏览41 评论0

抢沙发发表评论

本文目录

kafka怎么发布订阅 怎么在java中实现

这是我们项目中用到的代码public class ProducerService {  private static Logger log = Logger.getLogger(ProducerService.class);  private static  Producer《String,String》 producer = null;  private static String serviceIp = PropertiesUtils.getValue(“/epoo.properties“,“bootstrap.servers“);  private static String serviceName = PropertiesUtils.getValue(“/epoo.properties“,“name“);       public boolean initProducer(){        Properties props = new Properties();               //dataPlace.getIp()        props.put(“bootstrap.servers“, serviceIp);        props.put(“key.serializer“, “org.apache.kafka.common.serialization.StringSerializer“);        props.put(“value.serializer“, “org.apache.kafka.common.serialization.StringSerializer“);        props.put(“acks“, “-1“);        producer = new KafkaProducer(props);           try{            List《PartitionInfo》 list = producer.partitionsFor(serviceName);                    }catch(Exception e){            JOptionPane.showMessageDialog(null, e.getMessage(), “错误“, JOptionPane.YES_OPTION);            log.error(e.getMessage());            return false;        }        return true;    }        public  void sendData(String mess){        if(producer == null){            initProducer();        }                producer.send(new ProducerRecord《String,String》(serviceName,mess),new Callback() {            @Override            public void onCompletion(RecordMetadata rm, Exception e) {                if(e != null){                    e.printStackTrace();                    log.error(e.getMessage());                }                System.out.println(“发送到服务器的Offset: “ + rm.offset() + “-----Topic:“ + rm.topic() + “-----partition:“ + rm.partition());            }        });    }        public  void close(){        if(producer != null){            producer.close();        }    }}

kafka apache 使用在什么场合

1、Messaging 对于一些常规的消息系统,kafka是个不错的选择;partitons/replication和容错,可以使kafka具有良好的扩展性和性能优势.不过到目前为止,我们应该很清楚认识到,kafka并没有提供JMS中的“事务性““消息传输担保(消息确认机制)““消息分组“等企业级特性;kafka只能使用作为“常规“的消息系统,在一定程度上,尚未确保消息的发送与接收绝对可靠(比如,消息重发,消息发送丢失等) 2、Websit activity tracking kafka可以作为“网站活性跟踪“的最佳工具;可以将网页/用户操作等信息发送到kafka中.并实时监控,或者离线统计分析等 3、Log Aggregation kafka的特性决定它非常适合作为“日志收集中心“;application可以将操作日志“批量““异步“的发送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/压缩消息等,这对producer端而言,几乎感觉不到性能的开支.此时consumer端可以使hadoop等其他系统化的存储和分析系统.

kafka利用log4j输出日志到哪里

引言前段时间写的《Spring+Log4j+ActiveMQ实现远程记录日志——实战+分析》得到了许多同学的认可,在认可的同时,也有同学提出可以使用Kafka来集中管理日志,于是今天就来学习一下。特别说明,由于网络上关于Kafka+Log4j的完整例子并不多,我也是一边学习一边使用,因此如果有解释得不好或者错误的地方,欢迎批评指正,如果你有好的想法,也欢迎留言探讨。第一部分 搭建Kafka环境安装Kafka下载:主要配置以下几项,内容就不说了,注释里都很详细:然后启动Kafka Server:?1 bin/kafka-server-start.sh config/server.properties创建Topic附上出处链接:http://my.oschina.net/itblog/blog/540918

大数据时代下Apache Kafka是数据库吗

首先明确说明它不是数据库,它没有schema,也没有表,更没有索引。它仅仅是生产消息流、消费消息流而已。从这个角度来说Kafka的确不像数据库,至少不像我们熟知的关系型数据库。那么到底什么是数据库呢?或者说什么特性使得一个系统可以被称为数据库?经典的教科书是这么说的:数据库是提供 ACID 特性的,即atomicity、consistency、isolation和durability。好了,现在问题演变成了Apache Kafka支持ACID吗?如果它支持,Kafka又是怎么支持的呢?要回答这些问题,我们依次讨论下ACID。1、持久性(durability)我们先从最容易的持久性开始说起,因为持久性最容易理解。在80年代持久性指的是把数据写入到磁带中,这是一种很古老的存储设备,现在应该已经绝迹了。目前实现持久性更常见的做法是将数据写入到物理磁盘上,而这也只能实现单机的持久性。当演进到分布式系统时代后,持久性指的是将数据通过备份机制拷贝到多台机器的磁盘上。很多数据库厂商都有自己的分布式系统解决方案,如GreenPlum和Oracle RAC。它们都提供了这种多机备份的持久性。和它们类似,Apache Kafka天然也是支持这种持久性的,它提供的副本机制在实现原理上几乎和数据库厂商的方案是一样的。2、原子性(atomicity)数据库中的原子性和多线程领域内的原子性不是一回事。我们知道在Java中有AtomicInteger这样的类能够提供线程安全的整数操作服务,这里的atomicity关心的是在多个线程并发的情况下如何保证正确性的问题。而在数据库领域,原子性关心的是如何应对错误或异常情况,特别是对于事务的处理。如果服务发生故障,之前提交的事务要保证已经持久化,而当前运行的事务要终止(abort),它执行的所有操作都要回滚,最终的状态就好像该事务从未运行过那样。举个实际的例子,第三个方法是采用基于日志结构的消息队列来实现,比如使用Kafka来做,如下图所示:在这个架构中app仅仅是向Kafka写入消息,而下面的数据库、cache和index作为独立的consumer消费这个日志——Kafka分区的顺序性保证了app端更新操作的顺序性。如果某个consumer消费速度慢于其他consumer也没关系,毕竟消息依然在Kafka中保存着。总而言之,有了Kafka所有的异质系统都能以相同的顺序应用app端的更新操作,从而实现了数据的最终一致性。这种方法有个专属的名字,叫capture data change,也称CDC。3、隔离性(isolation)在传统的关系型数据库中最强的隔离级别通常是指serializability,国内一般翻译成可串行化或串行化。表达的思想就是连接数据库的每个客户端在执行各自的事务时数据库会给它们一个假象:仿佛每个客户端的事务都顺序执行的,即执行完一个事务之后再开始执行下一个事务。其实数据库端同时会处理多个事务,但serializability保证了它们就像单独执行一样。举个例子,在一个论坛系统中,每个新用户都需要注册一个唯一的用户名。一个简单的app实现逻辑大概是这样的:4、一致性(consistency)最后说说一致性。按照Kelppmann大神的原话,这是一个很奇怪的属性:在所有ACID特性中,其他三项特性的确属于数据库层面需要实现或保证的,但只有一致性是由用户来保证的。严格来说,它不属于数据库的特性,而应该属于使用数据库的一种方式。坦率说第一次听到这句话时我本人还是有点震惊的,因为从没有往这个方面考虑过,但仔细想想还真是这么回事。比如刚才的注册用户名的例子中我们要求每个用户名是唯一的。这种一致性约束是由我们用户做出的,而不是数据库本身。数据库本身并不关心或并不知道用户名是否应该是唯一的。针对Kafka而言,这种一致性又意味着什么呢?Kelppmann没有具体展开,但我个人认为他应该指的是linearizability、消息顺序之间的一致性以及分布式事务。幸运的是,Kafka的备份机制实现了linearizability和total order broadcast,而且在Kafka 0.11开始也支持分布式事务了。