javaee Forum

OP · Posted 2019-11-03 06:34:19 | Views: 371 | Replies: 1

The Flink Table API can write stream data directly to a file system. Examples follow:

Code example 1

```java
// Imports assume the legacy (Flink 1.x) descriptor-based Table API.
import java.util.Properties;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.QueryConfig;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class SqlSinkFileSystemStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        Schema schema = new Schema()
                .field("userId", Types.STRING)
                .field("name", Types.STRING)
                .field("age", Types.STRING)
                .field("sex", Types.STRING)
                .field("createTime", Types.BIG_DEC)
                .field("updateTime", Types.BIG_DEC);

        TableSchema tableSchema = new TableSchema.Builder()
                .field("userId", Types.STRING)
                .field("name", Types.STRING)
                .field("age", Types.STRING)
                .field("sex", Types.STRING)
                .field("createTime", Types.BIG_DEC)
                .field("updateTime", Types.BIG_DEC)
                .build();

        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "localhost:9092");
        p.setProperty("group.id", "test");
        Kafka kafka = new Kafka().properties(p).topic("user").version("0.10");

        tableEnv.connect(kafka)
                .withSchema(schema)
                .withFormat(new Json().deriveSchema())
                .inAppendMode()
                .registerTableSource("Users");

        Table table = tableEnv.sqlQuery("select * from Users");
        // Print locally
        tableEnv.toAppendStream(table, TypeInformation.of(Row.class)).print("row:");

        FileSystem fileSystem = new FileSystem().path("data/user.csv");
        tableEnv.connect(fileSystem)
                .withSchema(schema)
                // new Csv() does not work well here (it mishandles the schema
                // parameters), so the deprecated OldCsv descriptor is used instead
                .withFormat(new OldCsv().schema(tableSchema).fieldDelimiter(","))
                .inAppendMode()
                .registerTableSink("Users2");

        // Insert into the file system sink
        QueryConfig conf = new StreamQueryConfig();
        tableEnv.insertInto(table, "Users2", conf);
        env.execute("SqlSinkFileSystemStream");
    }
}
```
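Since the source uses `new Json().deriveSchema()`, the expected JSON layout is derived from the registered schema: each Kafka message on the `user` topic should be a flat JSON object whose keys match the schema's field names. A hypothetical sample message (all values invented for illustration):

```java
public class SampleUserMessage {
    // Hypothetical sample payload; the keys match the six schema fields
    // (userId, name, age, sex as strings; createTime, updateTime as numbers).
    static String sampleMessage() {
        return "{\"userId\":\"u001\",\"name\":\"Alice\",\"age\":\"30\","
                + "\"sex\":\"F\",\"createTime\":1572739200000,\"updateTime\":1572739200000}";
    }

    public static void main(String[] args) {
        System.out.println(sampleMessage());
    }
}
```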

Code example 2

Flink also ships a ready-made CsvTableSink class that can be used directly:

```java
// Imports assume the legacy (Flink 1.x) descriptor-based Table API.
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.types.Row;

public class SqlSinkCsvFileStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        Schema schema = new Schema()
                .field("userId", Types.STRING)
                .field("name", Types.STRING)
                .field("age", Types.STRING)
                .field("sex", Types.STRING)
                .field("createTime", Types.BIG_DEC)
                .field("updateTime", Types.BIG_DEC);

        tableEnv.connect(new Kafka()
                        .version("0.10")
                        .topic("user")
                        .property("bootstrap.servers", "localhost:9092"))
                .withSchema(schema)
                .withFormat(new Json().deriveSchema())
                .inAppendMode()
                .registerTableSource("Users");

        Table table = tableEnv.sqlQuery("select userId, name, age, sex, createTime from Users");
        tableEnv.toAppendStream(table, TypeInformation.of(Row.class)).print();

        // One output file, comma-delimited, fail if the file already exists
        CsvTableSink sink = new CsvTableSink("data/users.csv", ",", 1,
                FileSystem.WriteMode.NO_OVERWRITE);
        tableEnv.registerTableSink("Result",
                new String[]{"userId", "name", "age", "sex", "createTime"},
                new TypeInformation[]{Types.STRING, Types.STRING, Types.STRING,
                        Types.STRING, Types.BIG_DEC},
                sink);
        tableEnv.insertInto(table, "Result", new StreamQueryConfig());
        env.execute("SqlSinkCsvFileStream");
    }
}
```
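For intuition: CsvTableSink essentially renders each appended row as one delimited text line. A minimal stand-alone sketch of that formatting idea (plain Java with a hypothetical helper, not Flink's actual implementation):

```java
import java.util.StringJoiner;

public class CsvRowFormat {
    // Hypothetical stand-in for a CSV sink's row formatting:
    // join the string form of each field with the configured delimiter.
    static String formatRow(String delimiter, Object... fields) {
        StringJoiner joiner = new StringJoiner(delimiter);
        for (Object field : fields) {
            joiner.add(field == null ? "" : field.toString());
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // Five fields, matching the query's projection (sample values invented)
        String line = formatRow(",", "u001", "Alice", "30", "F", 1572739200000L);
        System.out.println(line); // u001,Alice,30,F,1572739200000
    }
}
```

This is why the sink registration above also needs the field names and types: the sink itself only sees rows to serialize, not the query's schema.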

Reply #1 · Posted 2023-10-11 16:46:33

OP, you dropped your integrity there; hurry up and pick it back up.
