笔记
【Twitter Storm系列】flume-ng+Kafka+Storm+HDFS 实时系统搭建
252637867 22 2014-03-12 10:57

一直以来都想接触Storm实时计算这块的东西,最近在群里看到上海一哥们罗宝写的Flume+Kafka+Storm的实时日志流系统的搭建文档,自己也跟着整了一遍,之前罗宝的文章中有一些要注意点没提到的,以后一些写错的点,在这边我会做修正;内容应该说绝大部分引用罗宝的文章的,这里要谢谢罗宝兄弟,还有写这篇文章@晨色星空J2EE也给了我很大帮助,这里也谢谢@晨色星空J2EE

之前在弄这个的时候,跟群里的一些人讨论过,有的人说,直接用storm不就可以做实时处理了,用不着那么麻烦;其实不然,做软件开发的都知道模块化思想,这样设计的原因有两方面:

一方面是可以模块化,功能划分更加清晰,从“数据采集--数据接入--流失计算--数据输出/存储”

1.数据采集

负责从各节点上实时采集数据,选用clouderaflume来实现

2.数据接入

由于采集数据的速度和数据处理的速度不一定同步,因此添加一个消息中间件来作为缓冲,选用apachekafka

3.流式计算

对采集到的数据进行实时分析,选用apachestorm

4.数据输出

对分析后的结果持久化,暂定用mysql

另一方面是模块化之后,加入当Storm挂掉了之后,数据采集和数据接入还是继续在跑着,数据不会丢失,storm起来之后可以继续进行流式计算;


那么接下来我们来看下整体的架构图


详细介绍各个组件及安装配置:

操作系统:ubuntu

Flume

FlumeCloudera提供的一个分布式、可靠、和高可用的海量日志采集、聚合和传输的日志收集系统,支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

下图为flume典型的体系结构:

Flume数据源以及输出方式:

Flume提供了从console(控制台)RPC(Thrift-RPC)text(文件)tail(UNIX tail)syslog(syslog日志系统,支持TCPUDP2种模式)exec(命令执行)等数据源上收集数据的能力,在我们的系统中目前使用exec方式进行日志采集。

Flume的数据接受方,可以是console(控制台)text(文件)dfs(HDFS文件)RPC(Thrift-RPC)syslogTCP(TCP syslog日志系统)等。在我们系统中由kafka来接收。

Flume下载及文档:

http://flume.apache.org/

Flume安装:

  1. $tar zxvf apache-flume-1.4.0-bin.tar.gz/usr/local  

Flume启动命令:

  1. $bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer -Dflume.root.logger=INFO,console  


Kafka

kafka是一种高吞吐量的分布式发布订阅消息系统,她有如下特性:

  • 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
  • 高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
  • 支持通过kafka服务器和消费机集群来分区消息。
  • 支持Hadoop并行数据加载。

kafka的目的是提供一个发布订阅解决方案,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。

kafka分布式订阅架构如下图:--取自Kafka官网

罗宝兄弟文章上的架构图是这样的

其实两者没有太大区别,官网的架构图只是把Kafka简洁的表示成一个Kafka Cluster,而罗宝兄弟的架构图就相对详细一些;

Kafka版本:0.8.0

Kafka下载及文档:http://kafka.apache.org/

Kafka安装:

  1. > tar xzf kafka-<VERSION>.tgz  
  2. > cd kafka-<VERSION>  
  3. > ./sbt update  
  4. > ./sbt package  
  5. > ./sbt assembly-package-dependency  

启动及测试命令:

1 start server

  1. > bin/zookeeper-server-start.shconfig/zookeeper.properties  
  2. > bin/kafka-server-start.shconfig/server.properties  
这里是官网上的教程,kafka本身有内置zookeeper,但是我自己在实际部署中是使用单独的zookeeper集群,所以第一行命令我就没执行,这里只是些出来给大家看下。

配置独立的zookeeper集群需要配置server.properties文件,讲zookeeper.connect修改为独立集群的IP和端口

  1. zookeeper.connect=nutch1:2181  

2Create a topic

  1. > bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test  
  2. > bin/kafka-list-topic.sh --zookeeperlocalhost:2181  

3Send some messages

  1. > bin/kafka-console-producer.sh--broker-list localhost:9092 --topic test  

4Start a consumer

  1. > bin/kafka-console-consumer.sh--zookeeper localhost:2181 --topic test --from-beginning  

kafka-console-producer.shkafka-console-cousumer.sh只是系统提供的命令行工具。这里启动是为了测试是否能正常生产消费;验证流程正确性

在实际开发中还是要自行开发自己的生产者与消费者;

kafka的安装也可以参考我之前写的文章:http://blog.csdn.net/weijonathan/article/details/18075967

Storm

Twitter将Storm正式开源了,这是一个分布式的、容错的实时计算系统,它被托管在GitHub上,遵循 Eclipse Public License 1.0。Storm是由BackType开发的实时处理系统,BackType现在已在Twitter麾下。GitHub上的最新版本是Storm 0.5.2,基本是用Clojure写的。


Storm的主要特点如下:

  1. 简单的编程模型。类似于MapReduce降低了并行批处理复杂性,Storm降低了进行实时处理的复杂性。
  2. 可以使用各种编程语言。你可以在Storm之上使用各种编程语言。默认支持Clojure、Java、Ruby和Python。要增加对其他语言的支持,只需实现一个简单的Storm通信协议即可。
  3. 容错性。Storm会管理工作进程和节点的故障。
  4. 水平扩展。计算是在多个线程、进程和服务器之间并行进行的。
  5. 可靠的消息处理。Storm保证每个消息至少能得到一次完整处理。任务失败时,它会负责从消息源重试消息。
  6. 快速。系统的设计保证了消息能得到快速的处理,使用ØMQ作为其底层消息队列。(0.9.0.1版本支持ØMQ和netty两种模式)
  7. 本地模式。Storm有一个“本地模式”,可以在处理过程中完全模拟Storm集群。这让你可以快速进行开发和单元测试。
由于篇幅问题,具体的安装步骤可以参考我之前写的文章:http://blog.csdn.net/weijonathan/article/details/17762477

接下来重头戏开始拉!那就是框架之间的整合啦

flumekafka整合

1.下载flume-kafka-plus:https://github.com/beyondj2ee/flumeng-kafka-plugin

2.提取插件中的flume-conf.properties文件

修改该文件:#source section

producer.sources.s.type = exec
producer.sources.s.command = tail -f -n+1 /mnt/hgfs/vmshare/test.log
producer.sources.s.channels = c

修改所有topic的值改为test

将改后的配置文件放进flume/conf目录下

在该项目中提取以下jar包放入环境中flumelib下:


完成上面的步骤之后,我们来测试下flume+kafka这个流程有没有走通;

我们先启动flume,然后再启动kafka,启动步骤按之前的步骤执行;接下来我们使用kafka的kafka-console-consumer.sh脚本查看是否有flume有没有往Kafka传输数据;


以上这个是我的test.log文件通过flume抓取传到kafka的数据;说明我们的flume和kafka流程走通了;

大家还记得刚开始我们的流程图么,其中有一步是通过flume到kafka,还有一步是到hdfs的;而我们这边还没有提到如何存入kafka且同时存如hdfs;

flume是支持数据同步复制,同步复制流程图如下,取自于flume官网,官网用户指南地址:http://flume.apache.org/FlumeUserGuide.html


怎么设置同步复制呢,看下面的配置:

  1. #2个channel和2个sink的配置文件  这里我们可以设置两个sink,一个是kafka的,一个是hdfs的;  
  2. a1.sources = r1  
  3. a1.sinks = k1 k2  
  4. a1.channels = c1 c2  
具体配置大伙根据自己的需求去设置,这里就不具体举例了

kafka和storm的整合

1.下载kafka-storm0.8插件:https://github.com/wurstmeister/storm-kafka-0.8-plus

2.使用maven package进行编译,得到storm-kafka-0.8-plus-0.3.0-SNAPSHOT.jar包  --有转载的童鞋注意下,这里的包名之前写错了,现在改正确了!不好意思!

3.将该jar包及kafka_2.9.2-0.8.0-beta1.jar、metrics-core-2.2.0.jar、scala-library-2.9.2.jar (这三个jar包在kafka项目中能找到)

备注:如果开发的项目需要其他jar,记得也要放进stormLib中比如用到了mysql就要添加mysql-connector-java-5.1.22-bin.jarstormlib

那么接下来我们把storm也重启下;

完成以上步骤之后,我们还有一件事情要做,就是使用kafka-storm0.8插件,写一个自己的Storm程序;

这里我给大伙附上一个我弄的storm程序,百度网盘分享地址:http://pan.baidu.com/s/1bnEdgh5;

先稍微看下程序的创建Topology代码


数据操作主要在WordCounter类中,这里只是使用简单JDBC进行插入处理


这里只需要输入一个参数作为Topology名称就可以了!我们这里使用本地模式,所以不输入参数,直接看流程是否走通;

  1. storm-0.9.0.1/bin/storm jar storm-start-demo-0.0.1-SNAPSHOT.jar com.storm.topology.MyTopology  

先看下日志,这里打印出来了往数据库里面插入数据了


然后我们查看下数据库;插入成功了!


到这里我们的整个整合就完成了!

但是这里还有一个问题,不知道大伙有没有发现。这个也是@晨色星空J2EE跟我说的,其实我也应该想到的;

由于我们使用storm进行分布式流式计算,那么分布式最需要注意的是数据一致性以及避免脏数据的产生;所以我提供的测试项目只能用于测试,正式开发不能这样处理;

@晨色星空J2EE给的建议是建立一个zookeeper的分布式全局锁,保证数据一致性,避免脏数据录入!

zookeeper客户端框架大伙可以使用Netflix Curator来完成,由于这块我还没去看,所以只能写到这里了!

这里在一次谢谢罗宝和@晨色星空!


转载的话请注明来源地址:http://blog.csdn.net/weijonathan/article/details/18301321 和 http://www.51studyit.com/html/notes/20140312/14.html

作者

Jonathan.Wei

已学习课程数:7

已发表笔记数:3

Ta的笔记
01    【Apache Flume系列】Flume-ng failover 以及Load balance测试及注意事项

好久没写博客了。最近在研究storm、flume和kafka。今天给大伙写下我测试flume failover以及load balance的场景以及一些结论;测试环境包含5个配置文件,也就是5个agent。一个主的配置文件,也就是我们配置failover以及load balance关系的配置文件(flume-sink.properties),这个文件在下面的场景会变动,所以这里就不列举出来了,会在具体的场景中写明;其他4个配置文件类似:#Name the compents on this agent a

02    【Twitter Storm系列】Storm环境配置及吞吐量测试调优--个人理解

1、硬件配置信息6台服务器,2个CPU,96G,6核,24线程 2、集群信息Storm集群:1个nimbus,6个supervisornimbus:192.168.7.127supervisor:192.168.7.128192.168.7.129192.168.7.130192.168.7.131192.168.7.132192.168.7.133 Zookeeper集群:3个节点192.168.7.127:2181, 192.168.7.128:2181, 19

03    【Twitter Storm系列】flume-ng+Kafka+Storm+HDFS 实时系统搭建

一直以来都想接触Storm实时计算这块的东西,最近在群里看到上海一哥们罗宝写的Flume+Kafka+Storm的实时日志流系统的搭建文档,自己也跟着整了一遍,之前罗宝的文章中有一些要注意点没提到的,以后一些写错的点,在这边我会做修正;内容应该说绝大部分引用罗宝的文章的,这里要谢谢罗宝兄弟,还有写这篇文章@晨色星空J2EE也给了我很大帮助,这里也谢谢@晨色星空J2EE之前在弄这个的时候,跟群里的一些人讨论过,有的人说,直接用storm不就可以做实时处理了,用不着那么麻烦;其实不然,做软件开发的都知道模块化

相关笔记
01    • Storm优势

• Storm优势  1. 简单的编程模型。类似于MapReduce降低了并行批处 理复杂性,Storm降低了进行实时处理的复杂性。  2. 服务化,一个服务框架,支持热部署,即时上线或下线App.  3. 可以使用各种编程语言。你可以在Storm之上使用各种 编程语言。默认支持Clojure、Java、Ruby和Python。要 增加对其他语言的支持,只需实现一个简单的Storm通信 协议即可。  4. 容错性。Storm会管理工作进程和节点的故障。  5. 水平扩展。计算是在多个线程、进程和

最新笔记
01    Mysql DBA

基本知识1.mysql的编译安装2.mysql 第3方存储引擎安装配置方法3.mysql主流存储引擎(MyISAM/innodb/MEMORY)的特点4.字符串编码知识5.MySQL用户账户管理6.数据备份/数据入导出7.mysql 支持的基本数据类型8.库/表/字段/索引 的创建/修改/删除9.基本sql语法:select/insert/update/delete,掌握最基本的语法即可,什么inner join,left join的了解就行mysql的应用场景大多都是高并发访问/业务逻辑简单,join/

02    mysql 备份

备份:mysqldump -uroot-p'root' jpstudy > /opt/mysql_jpstudy_bak.sql恢复:mysql -uroot-p'root' jpstudy < /opt/mysql_jpstudy_bak.sql参数说明:1、备份sql文件名以 -B ,即:mysqldump -uroot-p'root' -B jpstudy >/opt/mysql_jpstudy_bak_B.sql表示:备份的sql语句中有创建数据库和字符集的语句2、参数--com

03    mysql 索引失效

1.全值匹配2.最佳左前缀法则:如果索引了多列,要遵守最佳左前缀法则。指的是查询从索引的最左前列开始并且不跳过索引中的列。3.不在索引列上做任何操作(计算、函数、类型转换),会导致索引失效而转向全表扫描4.存储引擎不能使用索引中范围条件右边的列5.尽量使用覆盖索引(只访问索引的查询(索引列和查询列一致)),减少select*6.mysql在使用不等于(!=或者<>)的时候无法使用索引会导致全表扫描7.is null ,is not null也无法使用索引8.like以通配符开头(‘%abc’)

04    tomcat安全问题4

7 脚本权限回收去除其他用户对bin目录下可执行权限,防止其他用户起停tomcat# chmod -R 744bin/*8 访问日志格式规范开启Referer和User-Agetn是为了一旦出现安全问题能够更好的根据日志进行排查       <Hostname="23.83.xx.xx" appBase="webapps"     

05    tomcat安全问题2

3 禁用管理端对于tomcat的web管理端属于高危安全隐患,一旦被攻破,黑客通过上传web shell方式取得服务器的控制权,那是非常可怕的。我们需要删除tomcat安装目录下conf/tomcat- user.xml或者删除webapps下默认的目录和文件。 # mv webapps/*/tmp 4 降权启动tomcattomcat 启动用户权限必须为非root,避免一旦tomcat服务被入侵,获取root权限,普通用户只能使用大于1024端口,如果要想使用80端

06    tomcat安全问题

1、telnet管理端口保护使用telnet连接进来可以输入SHUTDOWN可以直接关闭tomcat,极不安全,必须关闭。可以修改默认的管理端口8005改为其他端口,修改SHUTDOWN指令为其他字符串。# viconf/server.xml <Server port="8365" shutdown="IN0IT">2 AJP连接端口保护Tomcat 服务器通过Connector连接器组件与客户程序建立连接,Connector组件负责接收客户的

07    发个测试

发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试 发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试发个测试 发个测试发个测试发个测

08    Elasticsearch 优化之路

1、index 创建的时候一定要计算好shard,因为主分片一经确认是不能修改的,每一个分片上面独立运行着一个lucene程序;因此设置主分片的时候尽量考虑未来发展需求,如果当前有1G数据,使用默认分片5个,每一个主分片数据相当于200M数据(hash(ID)%max_shards进行数据分片存储的),但是随着时间推移如果3个月后数据变成100G了,但是主分片还是5个,每一个上面就是20G的数据,会大大降低处理性能;(2.3.1版本增加了对index的动态迁移能力,也许可以快速的处理这一问题)。2、字段的

09    SQL-触发器

1.触发器的定义触发器是一种特殊的存储过程 在表或视图上执行insert、update、delete操作自动被调用的存储过程 用途: 1.检测数据的有效性(check) 2.*记录操作的日志 3.拦截数据 rollback 4.*统计某表中的数据 分类: 1.after —— 在数据操作完成之后触发 2.inserted of —— 在数据操作完成之前触发  after可以创建在普通表上和视图上  inserted of可以创建在表 触发器的创建 表示:在指定表上执行upd

10    Java面试题—2016最新Java面试考题知识详解(1)

    动力节点Java培训  下面给出的Java开发中ClassLoader中的描述,哪些描述是正确的(C) AClassLoader没有层次关系 B所有类中的ClassLoader都是AppClassLoader C通过classforname(StringclassName)能够动态加载一个类 D不同的ClassLoader加载同一个Class文件,所得的类是相同的   拓展知识ClassLoader知识

热门笔记
01    Storm单机+zookeeper集群安装

Storm单机+zookeeper集群安装 1、安装zookeeper集群 2、准备机器 10.10.3.44 flumemaster1 zk 10.10.3.129 flumemaster2 zk 10.10.3.132 flumecollector1 zk 10.10.3.115 flumeNg1 storm 3、配置hosts文件(4台服务器上面都需要配置) vi /etc/hosts

02    sqoop安装使用手册

sqoop使用 需求:将mysql中的表b05_age的数据导入hive中 1、安装 yum install sqoop(sqoop必须安装在有hive client的服务器上面,如果没有执行yum install hive) 复制mysql的驱动jar到/usr/lib/sqoop/lib下面 2、异常处理 正确命令:(将关系型数据的表结构复制到hive中) sudo -u hive sqoop create-hive-table --connect jdbc:my

03    CDH hadoop集群安装-1

准备机器: 192.168.1.241 192.168.1.242 192.168.1.243 1、查看ip地址是否为静态ip,如果不是进行配置 vim /etc/sysconfig/network-scripts/ifcfg-eth0 DEVICE=eth0 TYPE=Ethernet ONBOOT=yes NM_CONTROLLED=yes BOOTPROTO=none IPADDR=192.168.1.241 NE

04    Oracle SQL使用心得

1. 我用的Oracle 客户端最好的工具是PL/SQL Developer, 当然,如果用免费的Toad也不错,感觉现在用Toad的人还是挺多的。 2. Oracle SQL如果想提高速度有几个方式 1)创建索引,尽量建立唯一索引 2)当要创建的索引列的值取值比较小,建议创建Bitmap的索引而不是默认的Btree的。(比如性别,学历等) 3)在where条件后尽量采用数字类型的字段,比varchar的速度快 4)尽量不用用IN,Not In,union这样的条件查

05    JAVA网站静态化方法

1. 通过freemarker静态化 2. 通过jsp filter静态化 主要思路:请求servlet->判断静态文件是否存在并且静态文件创建时间是否在阀值之内-->如果不是,则访问数据库生成静态文件->否则直接跳转静态文件 然后通过urlReWrite直接将访问servlet的请求改为html,完成seo 最后通过SQUID缓存前台数据 一、从数据库中取相应数据并替换掉模板中的对应标签,下面是一个简单的示例

06    使用JAVA 6 构建自己的HTTP服务器

import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.net.InetSocketAddress; import java.util.List; import java.util.Map; import c

07    linux下优化tomcat服务器性能

服务器优化说明 WEB服务器优化 1、 更换tomcat5为tomcat6 版本copy测试服务版本即可 2、 加大tomcat内存 修改bin下的catalina.sh文件,增加青绿色部分 JAVA_OPTS='-Xms768m -Xmx1648m -XX:MaxPermSize=512m' 3、 加大tomcat连接数 修改conf下的server.xml文件,修改青绿色部分参数值 maxThrea

08    Oracle 如何查询锁表的对象

select s.username, decode(l.type,'tm','table lock','tx','row lock',null) lock_level, o.owner, o.object_name, o.object_type, s.sid, s.serial#, s.terminal, s.machine, s.program, s.osuser from v$session s,v$lock l,dba_objects o where l.sid = s.s

09    【Twitter Storm系列】flume-ng+Kafka+Storm+HDFS 实时系统搭建

一直以来都想接触Storm实时计算这块的东西,最近在群里看到上海一哥们罗宝写的Flume+Kafka+Storm的实时日志流系统的搭建文档,自己也跟着整了一遍,之前罗宝的文章中有一些要注意点没提到的,以后一些写错的点,在这边我会做修正;内容应该说绝大部分引用罗宝的文章的,这里要谢谢罗宝兄弟,还有写这篇文章@晨色星空J2EE也给了我很大帮助,这里也谢谢@晨色星空J2EE之前在弄这个的时候,跟群里的一些人讨论过,有的人说,直接用storm不就可以做实时处理了,用不着那么麻烦;其实不然,做软件开发的都知道模块化

10    实时流处理框架——Storm(介绍篇)

1. Storm介绍2. Storm环境配置3. Storm程序流程4. Storm总结及问题1. Storm介绍 1.1 实时流计算背景 随着互联网的更进一步发展,信息浏览、搜索、关系交互传递型,以及电子商务、互联网旅游生活产品等将生活中的流通环节在线化。对于实时性的要求进一步提升,而信息的交互和沟通正在从点对点往信息链甚至信息网的方向发展,这样必然带来数据在各个维度的交叉关联,数据爆炸已不可避免。因此流式处理和NoSQL产品应运而生,分别解决实时框架和数据大 规模存储计算的问题。 流式处理可

友情链接