CDH5-flume+kafka

I recently spent a little time learning how to set up flume + kafka on CDH5; this post records the steps.

Adding the services

The CDH version I am currently using is 5.3.9. flume can be added as a service directly in CDH, which is very straightforward; see the official documentation for details. kafka is a bit more involved: you first have to download the kafka service descriptor (CSD) jar:

# run this on the cloudera-scm-server host
mkdir -pv /opt/cloudera/csd  
wget -c http://archive.cloudera.com/csds/kafka/KAFKA-1.2.0.jar -O /opt/cloudera/csd/KAFKA-1.2.0.jar  
chown -R cloudera-scm:cloudera-scm /opt/cloudera/csd  
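
For Cloudera Manager to pick up the new CSD jar, the cloudera-scm-server service itself usually needs a restart as well; a minimal sketch, assuming a standard package-based install where the init service is named cloudera-scm-server:

# restart Cloudera Manager so it loads the new service descriptor
service cloudera-scm-server restart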

After that, restart the Cloudera Management Service, which can be done from the CDH web UI. kafka then shows up on the parcel page; distribute and activate the kafka parcel, then add the service. See this article for a detailed walkthrough.

My test cluster has four machines in total: cdh1.mycluster.com is the namenode, and zookeeper is likewise installed only on the namenode. cdh[2-4].mycluster.com are the datanodes.

Configuring flume

Edit the flume configuration in the web UI as follows:

tier1.sources=src_http_41800  
tier1.channels=ch_kafka_hive_table  
tier1.sinks=sink_hdfs_hive_table  
tier1.sources.src_http_41800.type=http  
tier1.sources.src_http_41800.port=41800  
tier1.sources.src_http_41800.channels=ch_kafka_hive_table  
tier1.sources.src_http_41800.handler=org.apache.flume.source.http.JSONHandler  
tier1.channels.ch_kafka_hive_table.type=org.apache.flume.channel.kafka.KafkaChannel  
tier1.channels.ch_kafka_hive_table.capacity=10000  
tier1.channels.ch_kafka_hive_table.transactionCapacity=1000  
tier1.channels.ch_kafka_hive_table.brokerList=cdh2.mycluster.com:9092,cdh3.mycluster.com:9092,cdh4.mycluster.com:9092  
tier1.channels.ch_kafka_hive_table.topic=flume_hive_table  
tier1.channels.ch_kafka_hive_table.zookeeperConnect=cdh1.mycluster.com:2181  
tier1.channels.ch_kafka_hive_table.kafka.producer.type=async  
tier1.channels.ch_kafka_hive_table.kafka.acks=all  
tier1.channels.ch_kafka_hive_table.kafka.compression.type=snappy  
tier1.channels.ch_kafka_hive_table.kafka.batch.size=16384  
tier1.channels.ch_kafka_hive_table.kafka.linger.ms=100  
tier1.channels.ch_kafka_hive_table.kafka.max.request.size=1048576  
tier1.sinks.sink_hdfs_hive_table.type=hdfs  
tier1.sinks.sink_hdfs_hive_table.channel=ch_kafka_hive_table  
tier1.sinks.sink_hdfs_hive_table.hdfs.path=hdfs://cdh1.mycluster.com/user/hive/warehouse/%{database}.db/%{table}/%{partition}  
tier1.sinks.sink_hdfs_hive_table.hdfs.writeFormat=Text  
tier1.sinks.sink_hdfs_hive_table.hdfs.fileType=DataStream  
tier1.sinks.sink_hdfs_hive_table.hdfs.inUsePrefix=.  
tier1.sinks.sink_hdfs_hive_table.hdfs.rollInterval=3600  
tier1.sinks.sink_hdfs_hive_table.hdfs.rollSize=0  
tier1.sinks.sink_hdfs_hive_table.hdfs.rollCount=0  
tier1.sinks.sink_hdfs_hive_table.hdfs.batchSize=1000  
tier1.sinks.sink_hdfs_hive_table.hdfs.txnEventMax=1000  
tier1.sinks.sink_hdfs_hive_table.hdfs.callTimeout=60000  
tier1.sinks.sink_hdfs_hive_table.hdfs.appendTimeout=60000  
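
The KafkaChannel stores events in the flume_hive_table topic declared above. Brokers may auto-create it on first use, but it can also be created explicitly beforehand; a sketch using the kafka-topics tool from the kafka parcel (the partition and replication counts are example values, not from the original setup):

# pre-create the channel topic, replicated across the three brokers
kafka-topics --create --zookeeper cdh1.mycluster.com:2181 \
  --topic flume_hive_table --partitions 3 --replication-factor 3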

After restarting flume, you can see that it is listening on port 41800. POST some data to that port:

curl -X POST -H "Content-Type: application/json; charset=UTF-8" -d '[{"headers":{"database":"db_test","table":"t_log_test","partition":"dt=2016-08-09"},"body":"1\t2\t3"},{"headers":{"database":"db_test","table":"t_log_test2","partition":"dt=2016-08-09"},"body":"4\t5\t6"}]' http://cdh2.mycluster.com:41800/  
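
To confirm that events really flow through the Kafka channel, you can attach a console consumer to the topic; a sketch assuming the 0.8-style consumer shipped with the parcel, which locates brokers via zookeeper:

# tail the channel topic to watch raw events go by
kafka-console-consumer --zookeeper cdh1.mycluster.com:2181 --topic flume_hive_table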

If it returns 200, the submission succeeded. Now take a look at HDFS:

# sudo -u hdfs hadoop fs -ls /user/hive/warehouse/db_test.db                          
Found 3 items  
drwxrwxrwt   - flume hive          0 2016-09-11 20:35 /user/hive/warehouse/db_test.db/t_log_test  
drwxrwxrwt   - flume hive          0 2016-09-11 20:35 /user/hive/warehouse/db_test.db/t_log_test2  
# data first lands in a temporary file and only becomes a final FlumeData file after 1 hour (rollInterval=3600)
# sudo -u hdfs hadoop fs -ls /user/hive/warehouse/db_test.db/t_log_test/dt=2016-08-08/ 
Found 1 items  
-rw-r--r--   2 flume hive         36 2016-09-11 21:35 /user/hive/warehouse/db_test.db/t_log_test/dt=2016-08-08/FlumeData.1473597323541
# sudo -u hdfs hadoop fs -cat /user/hive/warehouse/db_test.db/t_log_test/dt=2016-08-08/FlumeData.1473597323541
1       2       3
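
The files land under the Hive warehouse directory, but Hive only sees them once the partition is registered in the metastore; a minimal sketch, assuming a partitioned table db_test.t_log_test with a dt partition column already exists:

# register the new partition so the data becomes queryable from Hive
hive -e "ALTER TABLE db_test.t_log_test ADD IF NOT EXISTS PARTITION (dt='2016-08-08');"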