Maxwell: shipping the MySQL binlog to Hadoop (Kafka + Flume)

I came across a project that can ship the MySQL binlog to Hadoop: maxwell

Installing and deploying Maxwell

It ships as a binary; just follow the QuickStart in the official documentation.
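
Concretely, the QuickStart boils down to two things on the MySQL side: row-based binlog, and a user for Maxwell (which also keeps its own state in a maxwell schema). Roughly as follows; check the official docs for the exact statements:

[mysqld]
server_id=1
log-bin=master
binlog_format=row

> GRANT ALL ON maxwell.* TO 'aaaa'@'%' IDENTIFIED BY 'bbbb';
> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'aaaa'@'%';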

Starting Maxwell

bin/maxwell --user='aaaa' --password='bbbb' --host='xxx.xxx.xxx.xxx' --producer=kafka --kafka.bootstrap.servers=192.168.11.211:9092,192.168.11.212:9092,192.168.11.62:9092  

Where:

  1. aaaa and bbbb are the MySQL user and password set up in the QuickStart
  2. the kafka.bootstrap.servers parameter lists all of the Kafka brokers (these options can also live in a config file; see the sketch below)
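
These options don't have to go on the command line: Maxwell also reads a config.properties from the directory it is started in. An equivalent sketch, with the same placeholder credentials as above:

user=aaaa
password=bbbb
host=xxx.xxx.xxx.xxx
producer=kafka
kafka.bootstrap.servers=192.168.11.211:9092,192.168.11.212:9092,192.168.11.62:9092

With that file in place, bin/maxwell alone is enough.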

Checking with the Kafka command-line tools

First, use the Kafka command-line tools to verify that the binlog events are actually arriving in the Kafka topic.

# /opt/cloudera/parcels/KAFKA-2.0.2-1.2.0.2.p0.5/lib/kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic maxwell
Topic:maxwell    PartitionCount:1    ReplicationFactor:1 Configs:  
    Topic: maxwell  Partition: 0    Leader: 1002    Replicas: 1002  Isr: 1002

Using the console consumer

# /opt/cloudera/parcels/KAFKA-2.0.2-1.2.0.2.p0.5/lib/kafka/bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --from-beginning --topic maxwell
## output along the lines of the following
{"database":"test3","table":"person","type":"insert","ts":1482915438,"xid":2881,"commit":true,"data":{"number":13,"name":"test13","birthday":"1992-11-30"}}

So the Kafka side is ready.

Flume configuration

Refer to Cloudera's documentation (my environment is CDH 5.3.9 with Flume 1.5.0; note that newer versions are configured differently from what is shown here).

tier1.sources  = source1  
tier1.channels = channel1  
tier1.sinks = sink1

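# Kafka source: consumes the topic via ZooKeeper; the interceptor stamps each event with a timestamp header (used by %y-%m-%d in the sink path)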
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource  
tier1.sources.source1.zookeeperConnect = zk01.example.com:2181  
tier1.sources.source1.topic = weblogs  
tier1.sources.source1.groupId = flume  
tier1.sources.source1.channels = channel1  
tier1.sources.source1.interceptors = i1  
tier1.sources.source1.interceptors.i1.type = timestamp  
tier1.sources.source1.kafka.consumer.timeout.ms = 100

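# In-memory channel buffering events between source and sink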
tier1.channels.channel1.type = memory  
tier1.channels.channel1.capacity = 10000  
tier1.channels.channel1.transactionCapacity = 1000

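# HDFS sink: plain-text output, rolled every 5 seconds (size/count-based rolling disabled)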
tier1.sinks.sink1.type = hdfs  
tier1.sinks.sink1.hdfs.path = /tmp/kafka/%{topic}/%y-%m-%d  
tier1.sinks.sink1.hdfs.rollInterval = 5  
tier1.sinks.sink1.hdfs.rollSize = 0  
tier1.sinks.sink1.hdfs.rollCount = 0  
tier1.sinks.sink1.hdfs.fileType = DataStream  
tier1.sinks.sink1.channel = channel1  

What you need to change are the source's zookeeperConnect and topic; if necessary, you can also adjust the sink's hdfs.path and other settings. For this setup that means something like the following.
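
Point the source at the maxwell topic and at the real ZooKeeper quorum (the ZooKeeper address below is a placeholder for mine):

tier1.sources.source1.zookeeperConnect = 192.168.11.211:2181
tier1.sources.source1.topic = maxwell

If you run the agent by hand instead of through Cloudera Manager, something like this starts it (config file name assumed):

# flume-ng agent --conf conf --conf-file tier1.properties --name tier1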

Confirming the data reached Hadoop

Insert a row into MySQL, then look at Hadoop:

> use test3
> insert into person values(14, "test14", "2016-12-29");
# sudo -u hdfs hadoop fs -ls /tmp/kafka/maxwell/16-12-28/
Found 2 items  
-rw-r--r--   2 flume supergroup        156 2016-12-28 16:57 /tmp/kafka/maxwell/16-12-28/FlumeData.1482915441917
-rw-r--r--   2 flume supergroup        156 2016-12-28 18:47 /tmp/kafka/maxwell/16-12-28/FlumeData.1482922069309

# sudo -u hdfs hadoop fs -cat /tmp/kafka/maxwell/16-12-28/FlumeData.1482922069309
{"database":"test3","table":"person","type":"insert","ts":1482922063,"xid":3121,"commit":true,"data":{"number":14,"name":"test14","birthday":"2016-12-29"}}

Have FUN

Postscript

I ran into one problem: after setting Maxwell up again on another machine, every send to Kafka failed with a TIMEOUT. To track it down I started Maxwell with --log_level=debug and saw that it was connecting to Kafka by hostname (Kafka hands back broker addresses from its metadata, so the client may be given hostnames even when kafka.bootstrap.servers is specified as IPs). That hostname was an internal one this machine couldn't resolve. Adding it to /etc/hosts fixed the problem.
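
A quick way to check for this class of problem up front (the broker hostname here is a placeholder): if getent prints nothing, the name does not resolve and needs an /etc/hosts entry.

# getent hosts kafka01.internal
# echo '192.168.11.211 kafka01.internal' >> /etc/hosts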