Flume_飛翔的大雁的博客-CSDN博客_flume 飞翔的大雁


本站和网页 https://blog.csdn.net/qq_45710900/article/details/102294408 的作者无关,不对其内容负责。快照谨为网络故障时之索引,不代表被搜索网站的即时页面。

Flume_飛翔的大雁的博客-CSDN博客_flume 飞翔的大雁
Flume
飛翔的大雁
于 2019-10-07 12:26:50 发布
7632
收藏
60
分类专栏:
BigData
# flume
文章标签:
flume
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_45710900/article/details/102294408
版权
BigData
同时被 2 个专栏收录
13 篇文章
5 订阅
订阅专栏
flume
1 篇文章
0 订阅
订阅专栏
flume 总结
参数配置详解概述Flume数据的传输过程
SourcesNetCat SourceAvro SourceExec SourceSpooling Directory SourceTaildir Source
ChannelsFile Channel
SinksLogger SinkHDFS SinkAvro SinkKafka Sink
案例汇聚者 (collect.conf)收集者(product1.conf)
spark 与 flume 整合pushpull
kafka与 flume 整合
参数配置详解
概述
本篇文章是根据Flume官网对Flume组件(Source,Channel,Sink)的常用配置参数做一个主要介绍,如有表达意思错误希望不吝指出。
source 采集数据 采集日志数据,将采集到的日志数据传输给channel channel 一个队列,存储source传递过来的数据 sink 从channel中获取数据,将数据输出到目标位置(HDFS、HBase、Source) Event 传输数据的单元,Flume中采集数据并传输的最小单位
Flume数据的传输过程
Sources
Flume中常用的Source有NetCat,Avro,Exec,Spooling Directory,Taildir,也可以根据业务场景的需要自定义Source,具体介绍如下。
NetCat Source
NetCat Source可以使用TCP和UDP两种协议方式,使用方法基本相同,通过监听指定的IP和端口来传输数据,它会将监听到的每一行数据转化成一个Event写入到Channel中。(必须参数以@标示,下类同) channels@ – type@ – 类型指定为:netcat bind@ – 绑定机器名或IP地址 port@ – 端口号 max-line-length
Property NameDefaultDescriptionchannels@–type@–类型指定为:netcatbind@–绑定机器名或IP地址port@–端口号max-line-length512一行的最大字节数ack-every-eventtrue对成功接受的Event返回OKselector.typereplicating选择器类型replicating or multiplexingselector.*选择器相关参数interceptors–拦截器列表,多个以空格分隔interceptors.*拦截器相关参数
Avro Source
不同主机上的Agent通过网络传输数据可使用的Source,一般是接受Avro client的数据,或和是上一级Agent的Avro Sink成对存在。
Property NameDefaultDescriptionchannels@–type@–类型指定为:avrobind@–监听的主机名或IP地址port@–端口号threads–传输可使用的最大线程数selector.typeselector.*interceptors–拦截器列表interceptors.*compression-typenone可设置为“none” 或 “deflate”. 压缩类型需要和AvroSource匹配
Exec Source
Exec source通过执行给定的Unix命令的传输结果数据,如cat,tail -F等,实时性比较高,但是一旦Agent进程出现问题,可能会导致数据的丢失。
Property NameDefaultDescriptionchannels@–type@–类型指定为:execcommand@–需要去执行的命令shell–运行命令的shell脚本文件restartThrottle10000尝试重启的超时时间restartfalse如果命令执行失败,是否重启logStdErrfalse是否记录错误日志batchSize20批次写入channel的最大日志数量batchTimeout3000批次写入数据的最大等待时间(毫秒)selector.typereplicating选择器类型replicating or multiplexingselector.*选择器其他参数interceptors–拦截器列表,多个空格分隔interceptors.*
Spooling Directory Source
通过监控一个文件夹将新增文件内容转换成Event传输数据,特点是不会丢失数据,使用Spooling Directory Source需要注意的两点是,1)不能对被监控的文件夹下的新增的文件做出任何更改,2)新增到监控文件夹的文件名称必须是唯一的。由于是对整个新增文件的监控,Spooling Directory Source的实时性相对较低,不过可以采用对文件高粒度分割达到近似实时。
Property NameDefaultDescriptionchannels@–type@–类型指定:spooldir.spoolDir@–被监控的文件夹目录fileSuffix.COMPLETED完成数据传输的文件后缀标志deletePolicynever删除已经完成数据传输的文件时间:never or immediatefileHeaderfalse是否在header中添加文件的完整路径信息fileHeaderKeyfile如果header中添加文件的完整路径信息时key的名称basenameHeaderfalse是否在header中添加文件的基本名称信息basenameHeaderKeybasename如果header中添加文件的基本名称信息时key的名称includePattern^.*$使用正则来匹配新增文件需要被传输数据的文件ignorePattern^$使用正则来忽略新增的文件trackerDir.flumespool存储元数据信息目录consumeOrderoldest文件消费顺序:oldest, youngest and random.maxBackoff4000如果channel容量不足,尝试写入的超时时间,如果仍然不能写入,则会抛出ChannelExceptionbatchSize100批次处理粒度inputCharsetUTF-8输入码表格式decodeErrorPolicyFAIL遇到不可解码字符后的处理方式:FAIL,REPLACE,IGNOREselector.typereplicating选择器类型:replicating or multiplexingselector.*选择器其他参数interceptors–拦截器列表,空格分隔interceptors.*
Taildir Source
可以实时的监控指定一个或多个文件中的新增内容,由于该方式将数据的偏移量保存在一个指定的json文件中,即使在Agent挂掉或被kill也不会有数据的丢失,需要注意的是,该Source不能在Windows上使用。
Property NameDefaultDescriptionchannels@–type@–指定类型:TAILDIR.filegroups@–文件组的名称,多个空格分隔filegroups.@–被监控文件的绝对路径positionFile~/.flume/taildir_position.json存储数据偏移量路径headers..–Header key的名称byteOffsetHeaderfalse是否添加字节偏移量到key为‘byteoffset’值中skipToEndfalse当偏移量不能写入到文件时是否跳到文件结尾idleTimeout120000关闭没有新增内容的文件超时时间(毫秒)writePosInterval3000在positionfile 写入每一个文件lastposition的时间间隔batchSize100批次处理行数fileHeaderfalse是否添加header存储文件绝对路径fileHeaderKeyfilefileHeader启用时,使用的key
Channels
官网提供的Channel有多种类型可供选择,这里介绍Memory Channel和File Channel。 Memory Channel Memory Channel是使用内存来存储Event,使用内存的意味着数据传输速率会很快,但是当Agent挂掉后,存储在Channel中的数据将会丢失。
Property NameDefaultDescriptiontype@–类型指定为:memorycapacity100存储在channel中的最大容量transactionCapacity100从一个source中去或者给一个sink,每个事务中最大的事件数keep-alive3对于添加或者删除一个事件的超时的秒钟byteCapacityBufferPercentage20定义缓存百分比byteCapacitysee descriptionChannel中允许存储的最大字节总数
File Channel
File Channel使用磁盘来存储Event,速率相对于Memory Channel较慢,但数据不会丢失。
Property NameDefaultDescriptiontype@–类型指定:file.checkpointDir~/.flume/file-channel/checkpointcheckpoint目录useDualCheckpointsfalse备份checkpoint,为True,backupCheckpointDir必须设置backupCheckpointDir–备份checkpoint目录dataDirs~/.flume/file-channel/data数据存储所在的目录设置transactionCapacity10000Event存储最大值checkpointInterval30000checkpoint间隔时间maxFileSize2146435071单一日志最大设置字节数minimumRequiredSpace524288000最小的请求闲置空间(以字节为单位)capacity1000000Channel最大容量keep-alive3一个存放操作的等待时间值(秒)use-log-replay-v1falseExpert: 使用老的回复逻辑use-fast-replayfalseExpert: 回复不需要队列checkpointOnClosetrue
Sinks
Flume常用Sinks有Log Sink,HDFS Sink,Avro Sink,Kafka Sink,当然也可以自定义Sink。
Logger Sink
Logger Sink以INFO 级别的日志记录到log日志中,这种方式通常用于测试。
Property NameDefaultDescriptionchannel@–type@–类型指定:loggermaxBytesToLog16能够记录的最大Event Body字节数
HDFS Sink
Sink数据到HDFS,目前支持text 和 sequence files两种文件格式,支持压缩,并可以对数据进行分区,分桶存储。
NameDefaultDescriptionchannel@–type@–指定类型:hdfshdfs.path@–HDFS的路径 hdfs://namenode/flume/webdata/hdfs.filePrefixFlumeData保存数据文件的前缀名hdfs.fileSuffix–保存数据文件的后缀名hdfs.inUsePrefix–临时写入的文件前缀名hdfs.inUseSuffix.tmp临时写入的文件后缀名hdfs.rollInterval30间隔多长将临时文件滚动成最终目标文件,单位:秒, 如果设置成0,则表示不根据时间来滚动文件hdfs.rollSize1024当临时文件达到多少(单位:bytes)时,滚动成目标文件, 如果设置成0,则表示不根据临时文件大小来滚动文件hdfs.rollCount10当 events 数据达到该数量时候,将临时文件滚动成目标文件,如果设置成0,则表示不根据events数据来滚动文件hdfs.idleTimeout0当目前被打开的临时文件在该参数指定的时间(秒)内, 没有任何数据写入,则将该临时文件关闭并重命名成目标文件hdfs.batchSize100每个批次刷新到 HDFS 上的 events 数量hdfs.codeC–文件压缩格式,包括:gzip, bzip2, lzo, lzop, snappyhdfs.fileTypeSequenceFile文件格式,包括:SequenceFile, DataStream,CompressedStre, 当使用DataStream时候,文件不会被压缩,不需要设置hdfs.codeC; 当使用CompressedStream时候,必须设置一个正确的hdfs.codeC值;hdfs.maxOpenFiles5000最大允许打开的HDFS文件数,当打开的文件数达到该值,最早打开的文件将会被关闭hdfs.minBlockReplicas–HDFS副本数,写入 HDFS 文件块的最小副本数。 该参数会影响文件的滚动配置,一般将该参数配置成1,才可以按照配置正确滚动文件hdfs.writeFormatWritable写 sequence 文件的格式。包含:Text, Writable(默认)hdfs.callTimeout10000执行HDFS操作的超时时间(单位:毫秒)hdfs.threadsPoolSize10hdfs sink 启动的操作HDFS的线程数hdfs.rollTimerPoolSize1hdfs sink 启动的根据时间滚动文件的线程数hdfs.kerberosPrincipal–HDFS安全认证kerberos配置hdfs.kerberosKeytab–HDFS安全认证kerberos配置hdfs.proxyUser代理用户hdfs.roundfalse是否启用时间上的”舍弃”hdfs.roundValue1时间上进行“舍弃”的值hdfs.roundUnitsecond时间上进行”舍弃”的单位,包含:second,minute,hourhdfs.timeZoneLocal Time时区。hdfs.useLocalTimeStampfalse是否使用当地时间hdfs.closeTries 0Numberhdfs sink 关闭文件的尝试次数;如果设置为1,当一次关闭文件失败后,hdfs sink将不会再次尝试关闭文件, 这个未关闭的文件将会一直留在那,并且是打开状态; 设置为0,当一次关闭失败后,hdfs sink会继续尝试下一次关闭,直到成功hdfs.retryInterval180hdfs sink 尝试关闭文件的时间间隔, 如果设置为0,表示不尝试,相当于于将hdfs.closeTries设置成1serializerTEXT序列化类型serializer.*
Avro Sink
Property NameDefaultDescriptionchannel@–type@–指定类型:avro.hostname@–主机名或IPport@–端口号batch-size100批次处理Event数connect-timeout 20000连接超时时间request-timeout20000请求超时时间compression-typenone压缩类型,“none” or “deflate”.compression-level6压缩级别,0表示不压缩,1-9数字越大,压缩比越高sslfalse使用ssl加密
Kafka Sink
传输数据到Kafka中,需要注意的是Flume版本和Kafka版本的兼容性
Property NameDefaultDescriptiontype–指定类型:org.apache.flume.sink.kafka.KafkaSinkkafka.bootstrap.servers–kafka服务地址kafka.topicdefault-flume-topickafka TopicflumeBatchSize100批次写入kafka Event数
案例
汇聚者 (collect.conf)
collect.sources=cs1 //来源 collect.channels=cc1 //管道 collect.sinks=csi //传输
#sources
collect.sources.cs1.type=avro //类型 collect.sources.cs1.port=5140 //端口号 collect.sources.cs1.bind=master //节点名称 collect.sources.cs1.thread=15 //线程 collect.sources.cs1.channels=cc1
#channels collect.channels.cc1.type=memory //存储地方 collect.channels.cc1.capacity=100000 //容量 collect.channels.cc1.transactionCapacity=200 // Event存储最大值 collect.channels.cc1.keep-alive = 30 //存放等待时间
#sinks collect.sinks.csi.channel=cc1 collect.sinks.csi.type=hdfs collect.sinks.csi.hdfs.path=hdfs://master:9000/flume/school/getData/ collect.sinks.csi.hdfs.filePrefix=infomation collect.sinks.csi.hdfs.minBlockReplicas=1 collect.sinks.csi.hdfs.fileSuffix=.text collect.sinks.csi.hdfs.writeFormat=Text collect.sinks.csi.hdfs.rollInterval=300 collect.sinks.csi.hdfs.fileType=DataStream collect.sinks.csi.hdfs.rollSize = 0 collect.sinks.csi.hdfs.rollCount = 0 collect.sinks.csi.hdfs.batchSize = 10 collect.sinks.csi.txnEventMax = 1000 collect.sinks.csi.hdfs.callTimeout = 60000 collect.sinks.csi.hdfs.appendTimeout=60000
执行 vim collect.sh flume-ng agent -n collect (agent名称) -c conf -f 路径/collect.conf (执行文件路径) -Dflume.root.logger=DEBUG,console > /flume/logs/school_collect.log 2>&1 & (当有错误时日志存放路径 最后一个& :后台运行)
flume-ng agent -n collect -c conf -f /opt/programfile/flume/conf/conf/collect.conf -Dflume.root.logger=DEBUG,console > /flume/logs/school_collect.log 2>&1 &
bash collect.sh
收集者(product1.conf)
product1.sources=ps1 product1.channels=pc1 product1.sinks=pk1
#sources
product1.sources.ps1.type=syslogtcp //类型 product1.sources.ps1.port=5140 //绑定端口号 product1.sources.ps1.bind=slave1 //绑定节点名称 product1.sources.ps1.channels=pc1
#channels product1.channels.pc1.type=memory product1.channels.pc1.capacity=100000 product1.channels.pc1.transactionCapacity=100 product1.channels.pc1.keep-alive = 30
#sinks product1.sinks.pk1.channel=pc1 product1.sinks.pk1.type=avro product1.sinks.pk1.hostname=master product1.sinks.pk1.port=5140
执行 vim product1.sh
flume-ng agent -n product1 -c conf -f /opt/programfile/flume/conf/conf/product1.conf -Dflume.root.logger=DEBUG,console > /flume/logs/school_slave1.log 2>&1 &
bash product1.sh
注意: 启动节点的顺序:先启动数据的汇聚节点,在启动数据的采集节点
spark 与 flume 整合
push
#flume-to-spark-push.conf: A single-node Flume configuration
#Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#Describe/configure the source
#把Flume Source类别设置为netcat,绑定到node3的33333端口
#可以通过“telnet node3 33333”命令向Flume Source发送消息
a1.sources.r1.type = netcat
a1.sources.r1.bind = node3
a1.sources.r1.port = 33333
#Describe the sink
#Flume Sink类别设置为avro,绑定到node2的44444端口
#Flume Source把采集到的消息汇集到Flume Sink以后,Sink会把消息推送给node2的44444端口
#Spark Streaming程序一直在监听node2的44444端口,一旦有消息到达,就会被Spark Streaming应用程序取走进行处理
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = slave
a1.sinks.k1.port = 44444
#Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 1000000
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
scala代码
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
object FlumeEventCount {
def main(args: Array[String]) {
val host = “slave"
val port = 44444
// Create the context and set the batch size
val conf = new SparkConf().setAppName("FlumeEventCount").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))
// 减少终端的输出信息。设置为ERROR时,由于flume没有启动,仍有大量的输出信息
ssc.sparkContext.setLogLevel("ERROR")
// Create a flume stream
val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
// Print out the count of events received from this server in each batch
stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
ssc.start()
ssc.awaitTermination()
}// 备注:host (node1),必须是Spark集群中的一台节点,Spark会在这台机器上启动NettyServer
pull
注意 将spark-streaming-flume-sink_2.11-2.3.0.jar、scala-library-2.11.8.jar拷贝到$FLUME_HOME/lib中 备注 scala-library-2.10.5.jar 删除 启动flume: flume-ng agent --conf-file $FLUME_HOME/conf/flume-to-spark-pull.conf --name a1 -Dflume.root.logger=INFO,console
定义配置文件 flume-to-spark-pull.conf
# agent名称,source、channel、sink的名称
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# 定义具体的source
a1.sources.r1.type = netcat
a1.sources.r1.bind = node3
a1.sources.r1.port = 22222
a1.sources.r1.channels = c1
# 定义具体的channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 定义具体的sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = node3
a1.sinks.k1.port = 11111
a1.sinks.k1.channel = c1
# 备注:node3是安装了flume的节点
scala import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.streaming.flume._
object FlumePullingEventCount {
def main(args: Array[String]) {
val host = "node3"
val port = 11111
val conf = new SparkConf().setAppName("FlumePullingEventCount").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5))
ssc.sparkContext.setLogLevel("ERROR")
val stream = FlumeUtils.createPollingStream(ssc, host, port)
stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
ssc.start()
ssc.awaitTermination()
kafka与 flume 整合
将kafkaflume-plugin.jar导入到flume/lib下 编写配置文件conf vi /opt/programfile/flume/conf/kflume.conf # config component
producer.sources=s
producer.channels=c
producer.sinks=r
# config sources
producer.sources.s.type=exec
producer.sources.s.command=tail -F /home/zhangsan/userEventlogs.log
producer.sources.s.channels=c
# config channel
producer.channels.c.type=memory
producer.channels.c.capacity=15000
producer.channels.c.transactionCapacity=150
# config sink4 (kafka中的producer : broker-list,topic)
producer.sinks.r.type=org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=master:9092,slave1:9092,slave2:9092
//9092是broker server的服务器端口,允许只写一个主机地址
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks= -1
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.custom.topic.name=flumetokafka2
producer.sinks.r.channel = c
编写执行脚本 vi /opt/programfile/flume/kflume.bash #!/bin/bash bin/flume-ng agent -n producer -c conf -f conf/kflume.conf -Dflume.root.logger=DEBUG,console>./kflume.log 2>&1 & 启动脚本 bash kflume.sh 向/home/zhangsan/userEventlogs.log 追加数据 echo “hello ,flume to kafka” >> userEventlogs.log 启动kafka 创建主题:flumetokafka 启动kafka消费者: bin/kafka-console-consumer.sh --zookeeper master:2181 --topic flumetokafka --from-beginning 实时的测试 不断向userEventlogs.log中写入新的信息
飛翔的大雁
关注
关注
49
点赞
60
收藏
打赏
评论
Flume
Flume 总结参数配置详解概述Flume数据的传输过程SourcesNetCat SourceAvro SourceExec SourceSpooling Directory SourceTaildir SourceChannelsFile ChannelSinksLogger SinkHDFS SinkAvro SinkKafka Sink案例汇聚者 (collect.conf)收集者(pro...
复制链接
扫一扫
专栏目录
flume包,用于数据的采集
01-12
flume的包。flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。Client:Client生产数据,运行在一个独立的线程。
 Event: 一个数据单元,消息头和消息体组成。(Events可以是日志记录、 avro 对象等。)
  Flow: Event从源点到达目的点的迁移的抽象。
  Agent: 一个独立的Flume进程,包含组件Source、 Channel、 Sink。(Agent使用JVM 运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。)
  Source: 数据收集组件。(source从Client收集数据,传递给Channel)
  Channel: 中转Event的一个临时存储,保存由Source组件传递过来的Event。(Channel连接 sources 和 sinks ,这个有点像一个队列。)
  Sink: 从Channel中读取并移除Event, 将Event传递到FlowPipeline中的下一个Agent(如果有的话)(Sink从Channel收集数据,运行在一个独立线程。)
Flume详解(一)
登峰造极胡子球手
06-29
9365
1、Flume 定义
Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传 输的系统。Flume 基于流式架构,灵活简单。
2、为什么选用flume
flume的主要功能:就是实时读取服务器本地磁盘的数据,将数据写入到HDFS上
3、Flume 组成架构
简易架构图
详细架构图
...
评论 2
您还未登录,请先
登录
后发表或查看评论
Flume的简单介绍
热门推荐
会飞的猪的 博客
07-08
1万+
要想使用Flume,就需要运行Flume代理。Flume代理是由持续运行的sorce(数据来源)、sink(数据目标)以及channei(用干连接sours。和sink)构成的Java进程。Flume的source产生事件,并将其传送给channel , channel存储这些事件直至转发给sink。可以把source-channel-sink的组合视为基本Flume构件  Collector的作...
Flume的简介、原理与安装
彷徨的博客
10-13
2289
1、前言
flume是由cloudera软件公司产出的可分布式日志收集系统,后与2009年被捐赠了apache软件基金会,为hadoop相关组件之一。尤其近几年随着flume的不断被完善以及升级版本的逐一推出,特别是flume-ng;同时flume内部的各种组件不断丰富,用户在开发的过程中使用的便利性得到很大的改善,现已成为apache top项目之一.
官网:http://flume.apa...
Flume介绍与原理
A210810的博客
07-29
3685
1:什么是Flume?
apache Flume 是一个可以收集列如日志,事件等数据资源,将这些数量极大的数据从各项数据资源中集中起来的存储工具,服务,或者数字集中机制,flume具有高可用分布式,配置工具,设计的原理也是基于将数据流,如日志数据从各种网站服务器上汇集起来存储到HDFS,Hbase等集中存储器中,结构如下图。
2:应用的场景
比如我们在做一个电子商务网站,我们想从消费用户中访问点特定的节点区域来分析消费者的行为或购买意图,这样我们就可以更加快速的将客户想要的推送到界面上,
实现这些我们需要
Flume - FileChannel源码详解
qianshanding0708的博客
11-21
6173
FileChannel在Flume是一个非常重要的Channel,FileChannel可以很好的保证数据的完整性和一致性,提供了类似mysql binlog的机制,保证机器down机,JVM异常退出时数据不丢失,在采集数据量很大的情况下,建议FileChannel设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率。
FileChannel的简易类结构:
Fi...
大数据开发笔记(六):Flume基础学习
专注大数据与人工智能技术分享,有事扫码加V或加q交流群704932595,欢迎互相学习!
02-04
3158
Flume基础
1、Flume是什么?
Flume是数据采集,日志收集的框架,通过分布式形式进行采集,(高可用分布式)
本质:可以高效从各个网站服务器中收集日志数据,并且存储到HDFS、hbase
2、Flume可以对接的数据源?
Console、RPC、Text、Tail、Syslog、Exec等
3、Flume接受的数据源输出目标?
磁盘,hdfs,hbase, 经过网络传输kafka
data->flume->kafka->spark stre...
flume详解
qq_45048256的博客
12-29
552
flume详解
Flume常见Source、Channel、Sink配置
Why Do You Run
03-12
790
整合一下Flume常见的Source、Channel、Sink的配置
--------------------------常见的Source的配置--------------------
Avro Source配置:
参数
默认值
描述
channels
与之相连的Channel,可以配置多个。用空格隔开。在单个代理流程中,是通过channel连接sources和sinks。一...
flume详解 从原理到应用
weixin_41929239的博客
11-17
628
第一章 flume概述
1.1 flume定义
flume是cloudera提供的一个高可用,高可靠,分布式的海量日志采集、聚合和传输的系统。flume基于流式架构,灵活简单。
flume最主要的作用:实时读取服务器本地磁盘的数据,将数据写入HDFS或kafka消息队列中。
1.2 flume基础架构
flume的组成架构如图所示:
1.2.1 agent
agent是一个jvm进程,他以事件的形式将数据从源头送至目的地。
agent主要有3个部分组成:source、channel、sink
1.2
(转载)Flume简单介绍
moose_killer的博客
03-16
680
Flume概述:
Apache Flume是一个分布式的、可靠的、可用的系统,用于有效地收集、
聚合和将大量日志数据从许多不同的源移动到一个集中的数据存储。
Apache Flume的使用不仅仅局限于日志数据聚合。由于数据源是可定制的,
Flume可以用于传输大量事件数据,包括但不限于网络流量数据、
社交媒体生成的数据、电子邮件消息和几乎所有可能的数据源。
=================================================================
Flume概述
weixin_44847293的博客
11-07
1235
typora-copy-images-to: flume_typora
数据采集工具 – Flume
Flume概述
1、概述(什么是、体系结构、拓扑结构、内部原理)
2、安装配置
3、应用(基础、高级)
无论数据来自什么企业,或是多大量级,通过部署Flume,可以确保数据都安全、及时地到达大数据平台,用户可以将精力集中在如何洞悉数据上。
Flume的定义
Flume由Cloudera公司开发,是一个分布式、高可靠、高可用的海量日志采集、聚合、传输的系统。
Flume支持在日志系统中定制各类数据发.
Flume详解
hello,Mr张
12-06
2867
1、Flume简介
(1)、flume提供分布式,可靠的,对海量的日志进行搞笑收集,聚集,移动的服务,flume只能在unix环境下运行;
(2)、flume基于流式架构,容错性强,也很灵活简单;
(3)、flume,kafka用来进行实时数据收集,spark、storm用来实时处理数据,impala用来实时查询;
二、Flume架构
2.1、source
用于采集收据,source是产生数据流...
Flume简介、特点、核心概念及安装
qq_30612351的博客
11-09
4863
一、什么是flume?
flume是一个可分布式日志收集系统,为hadoop相关组件之一。
Flume 是可以收集例如日志,事件等数据资源,并将这些数量庞大的数据从各项数据源中集中起来存储的工具/服务。
Flume可以采集文件,socket数据包(网络端口)、文件夹、kafka、mysql数据库等各种形式源数据,又可以将采集到的数据(下沉sink)输出到HDFS、hbase、hive、kafka等众多外部存储系统中。
二、Flume特性
Flume是一个分布式、可靠、和高可用的海量日志采.
最全Flume常用配置文件详情解析
weixin_45417821的博客
08-19
2945
Flume的常用配置项1、Source配置项(常见配置项)1.1 Avro Source1.2 NetCat Source1.3 Exec Source1.4 Spooling Directory Source1.5 Taildir Source1.6 Thrift Source1.7 Kafka Source1.8 Sequence Generator Source1.9 HTTP Source2、Channels 配置项 (常见配置项)2.1 Memory Channel2.2 JDBC Channel
Flume组成架构
weixin_38023225的博客
10-09
617
1 概述
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streamin...
flume
leezsj的博客
07-17
505
flume
大数据处理流程
1、数据采集
2、数据存储
3、数据清洗
4、数据分析
5、数据展示
在数据采集和搜索工具中flume框架占有一定的市场分量
flume
flume是一种分布式,可靠的高可用的服务,用于有效的收集,聚合和移动大量日志数据,它具有基于数据流的简单灵活的体系结构,它具有调整可靠性机制以及许多故障转移和恢复机制,具有强大的功能和容错能力,它使用一个简单的可扩展数据模型语序在线分析应用程序
官网http://flume.apache.org/
flum.
大数据技术——Flume简介&安装配置&使用案例
最新发布
weixin_44606952的博客
11-29
630
1. Flume 概述
1.1 Flume简介
1.2 Flume的特点
1.3 Flume的基础架构
2. Flume安装配置
2.1 下载地址
2.2 安装部署
3. Flume 使用案例
3.1 实时监控单个追加文件
3.2 实时监控目录下多个新文件
3.3 实时监控目录下的多个追加文件
Flume技术原理
曹世宏的博客
11-16
7007
Flume简介
Flume概述:
Flume是开源日志系统。是一个分布式、可靠性和高可用的海量日志聚合系统,支持在系统中定制各类数据发送方,用于收集数据;同时,FLume提供对数据进行简单处理,并写到各种数据接收方(可定制)的能力。
Flume是什么?
Flume是流式日志采集工具,FLume提供对数据进行简单处理并且写到各种数据接收方(可定制)的能力,Flume提供从本地文件(s...
“相关推荐”对你有帮助么?
非常没帮助
没帮助
一般
有帮助
非常有帮助
提交
©️2022 CSDN
皮肤主题:深蓝海洋
设计师:CSDN官方博客
返回首页
飛翔的大雁
CSDN认证博客专家
CSDN认证企业博客
码龄3年
暂无认证
40
原创
5万+
周排名
102万+
总排名
9万+
访问
等级
1356
积分
127
粉丝
347
获赞
38
评论
962
收藏
私信
关注
热门文章
hive
26251
spark Sql
19838
RDD
12074
Flume
7632
hive on spark
6608
分类专栏
大数据 运维
付费
2篇
kafka 运维
付费
1篇
hadoop 运维
付费
1篇
笔记
BigData
13篇
hadoop
9篇
hive
2篇
flume
1篇
kafka
1篇
HBase
1篇
spark
7篇
JavaSE
15篇
Linux
1篇
最新评论
RDD
sinat_38866075:
太牛了,大佬!
spark Sql
weixin_68351087:
楼主这些东西是从事大数据必备了解的吗
spark Sql
班般:
这哪里是文章 这不就是教科书吗
hadoop 常用命令
tHmouse:
为数不多的,能看明白的点个赞
RDD
小狐狸ya:
棒 收获大大的
您愿意向朋友推荐“博客详情页”吗?
强烈不推荐
不推荐
一般般
推荐
强烈推荐
提交
最新文章
大数据运维 - Linux 之 软中断(softirq )
kafka 删除topic 不成功
第八天 hadoop 数据的清洗、串行MR、压缩算法
2022年2篇
2020年24篇
2019年14篇
目录
目录
分类专栏
大数据 运维
付费
2篇
kafka 运维
付费
1篇
hadoop 运维
付费
1篇
笔记
BigData
13篇
hadoop
9篇
hive
2篇
flume
1篇
kafka
1篇
HBase
1篇
spark
7篇
JavaSE
15篇
Linux
1篇
目录
评论 2
被折叠的 条评论
为什么被折叠?
到【灌水乐园】发言
查看更多评论
打赏作者
飛翔的大雁
你的鼓励将是我创作的最大动力
¥2
¥4
¥6
¥10
¥20
输入1-500的整数
余额支付
(余额:-- )
扫码支付
扫码支付:¥2
获取中
扫码支付
您的余额不足,请更换扫码支付或充值
打赏作者
实付元
使用余额支付
点击重新获取
扫码支付
钱包余额
抵扣说明:
1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。 2.余额无法直接购买下载,可以购买VIP、C币套餐、付费专栏及课程。
余额充值