Flink Table 可以很好地将 Stream 数据直接写入到文件系统。示例如下:
代码示例一publicclassSqlSinkFileSystemStream{publicstaticvoidmain(String[]args)throwsException{StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironmenttableEnv=StreamTableEnvironment.create(env);Schemaschema=newSchema().field("userId",Types.STRING).field("name",Types.STRING).field("age",Types.STRING).field("sex",Types.STRING).field("createTime",Types.BIG_DEC).field("updateTime",Types.BIG_DEC);TableSchematableSchema=newTableSchema.Builder().field("userId",Types.STRING).field("name",Types.STRING).field("age",Types.STRING).field("sex",Types.STRING).field("createTime",Types.BIG_DEC).field("updateTime",Types.BIG_DEC).build();Propertiesp=newProperties();p.setProperty("bootstrap.servers","localhost:9092");p.setProperty("group.id","test");Kafkakafka=newKafka().properties(p).topic("user").version("0.10");tableEnv.connect(kafka).withSchema(schema).withFormat(newJson().deriveSchema()).inAppendMode().registerTableSource("Users");Tabletable=tableEnv.sqlQuery("select*fromUsers");//输出到本地tableEnv.toAppendStream(table,TypeInformation.of(Row.class)).print("row:");FileSystemfileSystem=newFileSystem().path("data/user.csv");tableEnv.connect(fileSystem).withSchema(schema)//使用newCsv()不是很好用,schema的参数处理不好.withFormat(newOldCsv().schema(tableSchema).fieldDelimiter(",")).inAppendMode().registerTableSink("Users2");//插入到fsQueryConfigconf=newStreamQueryConfig();tableEnv.insertInto(table,"Users2",conf);env.execute("SqlSinkFileSystemStream");}}示例代码二
Flink 自己实现了 CsvTableSink 类,可以直接使用,代码如下:
publicclassSqlSinkCsvFileStream{publicstaticvoidmain(String[]args)throwsException{StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironmenttableEnv=StreamTableEnvironment.create(env);Schemaschema=newSchema().field("userId",Types.STRING).field("name",Types.STRING).field("age",Types.STRING).field("sex",Types.STRING).field("createTime",Types.BIG_DEC).field("updateTime",Types.BIG_DEC);tableEnv.connect(newKafka().version("0.10").topic("user").property("bootstrap.servers","localhost:9092")).withSchema(schema).withFormat(newJson().deriveSchema()).inAppendMode().registerTableSource("Users");Tabletable=tableEnv.sqlQuery("selectuserId,name,age,sex,createTimefromUsers");tableEnv.toAppendStream(table,TypeInformation.of(Row.class)).print();CsvTableSinksink=newCsvTableSink("data/users.csv",",",1,FileSystem.WriteMode.NO_OVERWRITE);tableEnv.registerTableSink("Result",newString[]{"userId","name","age","sex","createTime"},newTypeInformation[]{Types.STRING,Types.STRING,Types.STRING,Types.STRING,Types.BIG_DEC},sink);tableEnv.insertInto(table,"Result",newStreamQueryConfig());env.execute("SqlSinkCsvFileStream");}}