javaee论坛

普通会员

225648

帖子

335

回复

349

积分

楼主
发表于 2019-11-03 06:34:24 | 查看: 360 | 回复: 0

因业务要求,我们需要从Kafka中读取数据,变换后最终Sink到业务的消息队列中,为保证数据的可靠性,我们同时对Sink的结果数据,进行保存。最终选择将流数据Sink到HDFS上,在Flink中,同时也提供了HDFSConnector。下面就介绍如何将流式数据写入HDFS,同时将数据load到Hive表中。

一、pom.xml文件配置<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-filesystem_2.11</artifactId><version>1.8.0</version></dependency>二、FlinkDataStream代码

代码是将最后的结果数据,拼接成CSV格式,最后写入HDFS中。下面的逻辑在真实地业务中删除许多。仅保留有用对大家的代码。

publicclassRMQAndBucketFileConnectSink{publicstaticvoidmain(String[]args)throwsException{StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);Propertiesp=newProperties();p.setProperty("bootstrap.servers","localhost:9092");SingleOutputStreamOperator<String>ds=env.addSource(newFlinkKafkaConsumer010<String>("user",newSimpleStringSchema(),p)).map(newMapFunction<String,User>(){@OverridepublicUsermap(Stringvalue)throwsException{returnnewGson().fromJson(value,User.class);}}).assignTimestampsAndWatermarks(newAscendingTimestampExtractor<User>(){@OverridepubliclongextractAscendingTimestamp(Userelement){returnelement.createTime;}}).map(newMapFunction<User,String>(){@OverridepublicStringmap(Uservalue)throwsException{returnvalue.userId+","+value.name+","+value.age+","+value.sex+","+value.createTime+","+value.updateTime;}});//写入RabbitMQRMQConnectionConfigrmqConnectionConfig=newRMQConnectionConfig.Builder().setHost("localhost").setVirtualHost("/").setPort(5672).setUserName("admin").setPassword("admin").build();//写入RabbitMQ,如果队列是持久化的,需要重写RMQSink的setupQueue方法RMQSink<String>rmqSink=newRMQSink<>(rmqConnectionConfig,"queue_name",newSimpleStringSchema());ds.addSink(rmqSink);//写入HDFSBucketingSink<String>bucketingSink=newBucketingSink<>("/apps/hive/warehouse/users");//设置以yyyyMMdd的格式进行切分目录,类似hive的日期分区bucketingSink.setBucketer(newDateTimeBucketer<>("yyyyMMdd",ZoneId.of("Asia/Shanghai")));//设置文件块大小128M,超过128M会关闭当前文件,开启下一个文件bucketingSink.setBatchSize(1024*1024*128L);//设置一小时翻滚一次bucketingSink.setBatchRolloverInterval(60*60*1000L);//设置等待写入的文件前缀,默认是_bucketingSink.setPendingPrefix("");//设置等待写入的文件后缀,默认是.pendingbucketingSink.setPendingSuffix("");//设置正在处理的文件前缀,默认为_bucketingSink.setInProgressPrefix(".");ds.addSink(bucketingSink);env.execute("RMQAndBucketFileConnectSink");}}

写入的HDFS文件目录如下:

/apps/hive/warehouse/users/20190708/apps/hive/warehouse/users/20190709/apps/hive/warehouse/users/20190710...三、Hive表的创建以及导入

创建hive表

createexternaltabledefault.users(`userId`string,`name`string,`age`int,`sex`int,`ctime`string,`utime`string,)partitionedby(dtstring)ROWFORMATDELIMITEDFIELDSTERMINATEDBY',';

创建定时任务,每天凌晨导入HDFS文件到Hive,导入Hive脚本。

load_hive.sh如下:

#!/usr/bin/envbashd=`date-d"-1day"+%Y%m%d`#每天HDFS的数据导入hive分区中/usr/hdp/2.6.3.0-235/hive/bin/hive-e"altertabledefault.usersaddpartition(dt='${d}')location'/apps/hive/warehouse/users/${d}'"

使用crontab每天凌晨调度就行。


上一篇:数学建模三剑客MSN 下一篇:线程八锁
您需要登录后才可以回帖 登录 | 立即注册

触屏版| 电脑版

技术支持 历史网 V2.0 © 2016-2017