javaee forum

Posted on 2019-11-03 06:34:21

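Flink provides the Table API (together with SQL) as a unified entry point for batch and stream processing. Processing a stream directly with the Flink Table API is especially concise and easy to use. Below we look at how to implement a TumbleWindow (tumbling window), a SlideWindow (sliding window, called `HOP` in Flink SQL) and a SessionWindow (session window) with the Flink Table API, first based on processing time and then based on event time.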
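To make the three window types concrete, here is a plain-Java sketch that computes which windows a timestamp falls into. `WindowAssignmentSketch` and its methods are hypothetical helpers, not Flink API. The sketch assumes epoch-millisecond timestamps and windows aligned to the Unix epoch with no offset, and it uses the same parameter order as the SQL functions in the examples below: `TUMBLE(t, size)`, `HOP(t, slide, size)` and `SESSION(t, gap)`. The session part is a simplified model in which each event opens a `[ts, ts + gap)` range and overlapping ranges merge.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of Flink: models window membership for
// TUMBLE, HOP and SESSION on epoch-millisecond timestamps, assuming windows
// are aligned to the Unix epoch with no offset.
final class WindowAssignmentSketch {

    private WindowAssignmentSketch() {}

    // TUMBLE(t, size): each timestamp lies in exactly one window [start, start + size).
    static long tumbleStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // HOP(t, slide, size): start of every window [start, start + size) that
    // contains ts, where window starts are multiples of slide (latest first).
    static List<Long> hopStarts(long ts, long slide, long size) {
        List<Long> starts = new ArrayList<>();
        long latest = ts - Math.floorMod(ts, slide); // latest window start <= ts
        for (long start = latest; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    // SESSION(t, gap): simplified model; each event opens [ts, ts + gap) and
    // overlapping ranges merge. Returns {start, end} pairs in time order.
    // Flink's handling of an event arriving exactly `gap` later may differ.
    static List<long[]> sessions(List<Long> timestamps, long gap) {
        List<Long> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts < last[1]) {
                last[1] = ts + gap; // event inside the open session: extend it
            } else {
                result.add(new long[] {ts, ts + gap}); // no event within gap: new session
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long ts = 7 * minute + 30_000L; // 7.5 minutes after the epoch
        System.out.println("TUMBLE 1 min start: " + tumbleStart(ts, minute));
        System.out.println("HOP slide 1 min, size 5 min starts: " + hopStarts(ts, minute, 5 * minute));
        List<Long> events = Arrays.asList(0L, 30_000L, 250_000L, 100_000L, 200_000L);
        for (long[] s : sessions(events, minute)) {
            System.out.println("SESSION [" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

Running `main` prints a tumble start of 420000, five hop window starts from 420000 down to 180000, and three sessions: [0, 90000), [100000, 160000) and [200000, 310000). Because size / slide = 5, every event is counted in five `HOP` windows, which is why the `hop:` output repeats each event's contribution.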

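1. Example code: windows based on ProcessingTime

```java
import java.util.Properties;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

// Imports assume the Flink 1.9-era package layout (Kafka 0.10 connector and flink-json on the classpath).
public class SqlTumbleWindowProcessTimeStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Use processing time (the machine's wall clock)
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Kafka source: topic "user" on a local broker
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "localhost:9092");
        p.setProperty("group.id", "test");
        Kafka kafka = new Kafka().properties(p).topic("user").version("0.10");

        Schema schema = new Schema()
                .field("userId", Types.STRING)
                .field("name", Types.STRING)
                .field("age", Types.STRING)
                .field("sex", Types.STRING)
                .field("createTime", Types.BIG_DEC)
                .field("updateTime", Types.BIG_DEC)
                // Declare procTime as the processing-time attribute
                .field("procTime", Types.SQL_TIMESTAMP).proctime();

        // Register the JSON-encoded Kafka topic as the table "Users"
        tableEnv.connect(kafka)
                .withSchema(schema)
                .withFormat(new Json().deriveSchema())
                .inAppendMode()
                .registerTableSource("Users");

        Table table = tableEnv.sqlQuery("SELECT * FROM Users");
        tableEnv.toAppendStream(table, TypeInformation.of(Row.class)).print("row:");

        // Tumbling window: non-overlapping 1-minute windows per userId
        Table table1 = tableEnv.sqlQuery(
                "SELECT userId, TUMBLE_START(procTime, INTERVAL '1' MINUTE) AS rStart, COUNT(1) AS countNum "
                + "FROM Users GROUP BY TUMBLE(procTime, INTERVAL '1' MINUTE), userId");
        tableEnv.toAppendStream(table1, TypeInformation.of(Row.class)).print("tumble:");

        // Tumbling window again, this time also trying TUMBLE_END and TUMBLE_PROCTIME
        Table table2 = tableEnv.sqlQuery(
                "SELECT userId, TUMBLE_START(procTime, INTERVAL '1' MINUTE) AS rStart, "
                + "TUMBLE_END(procTime, INTERVAL '1' MINUTE) AS rEnd, "
                + "TUMBLE_PROCTIME(procTime, INTERVAL '1' MINUTE) AS pTime, COUNT(1) AS countNum "
                + "FROM Users GROUP BY TUMBLE(procTime, INTERVAL '1' MINUTE), userId");
        tableEnv.toAppendStream(table2, TypeInformation.of(Row.class)).print("tumble2:");

        // HOP() is the sliding window: first interval = slide (1 min), second = size (5 min)
        Table table3 = tableEnv.sqlQuery(
                "SELECT userId, HOP_START(procTime, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS hStart, COUNT(1) AS countNum "
                + "FROM Users GROUP BY HOP(procTime, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE), userId");
        tableEnv.toAppendStream(table3, TypeInformation.of(Row.class)).print("hop:");

        // Session window: a userId's session closes after a 1-minute gap without events
        Table table4 = tableEnv.sqlQuery(
                "SELECT userId, SESSION_START(procTime, INTERVAL '1' MINUTE) AS sStart, COUNT(1) AS countNum "
                + "FROM Users GROUP BY SESSION(procTime, INTERVAL '1' MINUTE), userId");
        tableEnv.toAppendStream(table4, TypeInformation.of(Row.class)).print("session:");

        env.execute("SqlTumbleWindowProcessTimeStream");
    }
}
```

2. Example code: windows based on EventTime

```java
import java.util.Properties;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

// Imports assume the Flink 1.9-era package layout (Kafka 0.10 connector and flink-json on the classpath).
public class SqlTumbleWindowEventTimeStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Use event time (timestamps carried by the records)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "localhost:9092");
        p.setProperty("group.id", "test");
        Kafka kafka = new Kafka().properties(p).topic("user").version("0.10");

        Schema schema = new Schema()
                .field("userId", Types.STRING)
                .field("name", Types.STRING)
                .field("age", Types.STRING)
                .field("sex", Types.STRING)
                // createTime is not declared as a regular field: it is read below as the rowtime source
                //.field("createTime", Types.BIG_DEC)
                .field("updateTime", Types.BIG_DEC)
                // Declare rowTime as the event-time attribute: timestamps come from the createTime
                // field, and watermarks lag the largest timestamp seen by 60 s (60000 ms)
                .field("rowTime", Types.SQL_TIMESTAMP)
                .rowtime(new Rowtime()
                        .timestampsFromField("createTime")
                        .watermarksPeriodicBounded(60000));

        tableEnv.connect(kafka)
                .withSchema(schema)
                .withFormat(new Json().deriveSchema())
                .inAppendMode()
                .registerTableSource("Users");

        Table table = tableEnv.sqlQuery("SELECT * FROM Users");
        tableEnv.toAppendStream(table, TypeInformation.of(Row.class)).print("row:");

        // Tumbling window on event time
        Table table1 = tableEnv.sqlQuery(
                "SELECT userId, TUMBLE_START(rowTime, INTERVAL '1' MINUTE) AS rStart, COUNT(1) AS countNum "
                + "FROM Users GROUP BY TUMBLE(rowTime, INTERVAL '1' MINUTE), userId");
        tableEnv.toAppendStream(table1, TypeInformation.of(Row.class)).print("tumble:");

        // TUMBLE_END and TUMBLE_ROWTIME on event time
        Table table2 = tableEnv.sqlQuery(
                "SELECT userId, TUMBLE_START(rowTime, INTERVAL '1' MINUTE) AS rStart, "
                + "TUMBLE_END(rowTime, INTERVAL '1' MINUTE) AS rEnd, "
                + "TUMBLE_ROWTIME(rowTime, INTERVAL '1' MINUTE) AS pTime, COUNT(1) AS countNum "
                + "FROM Users GROUP BY TUMBLE(rowTime, INTERVAL '1' MINUTE), userId");
        tableEnv.toAppendStream(table2, TypeInformation.of(Row.class)).print("tumble2:");

        // Sliding (HOP) window: slide 1 min, size 5 min
        Table table3 = tableEnv.sqlQuery(
                "SELECT userId, HOP_START(rowTime, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS hStart, COUNT(1) AS countNum "
                + "FROM Users GROUP BY HOP(rowTime, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE), userId");
        tableEnv.toAppendStream(table3, TypeInformation.of(Row.class)).print("hop:");

        // Session window with a 1-minute gap
        Table table4 = tableEnv.sqlQuery(
                "SELECT userId, SESSION_START(rowTime, INTERVAL '1' MINUTE) AS sStart, COUNT(1) AS countNum "
                + "FROM Users GROUP BY SESSION(rowTime, INTERVAL '1' MINUTE), userId");
        tableEnv.toAppendStream(table4, TypeInformation.of(Row.class)).print("session:");

        env.execute("SqlTumbleWindowEventTimeStream");
    }
}
```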
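In the EventTime example, `watermarksPeriodicBounded(60000)` controls when a window result appears: an event-time window is emitted only once the watermark passes its last millisecond. The following plain-Java simulation models that behavior for 1-minute tumbling windows. It is not Flink code; `EventTimeTumbleSketch` and its methods are hypothetical, and it assumes the watermark is roughly "largest timestamp seen minus the delay", advances after every event (Flink emits it periodically), uses a single key, and drops late events (no allowed lateness).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation, not Flink code: event-time tumbling windows with a
// bounded out-of-orderness watermark (watermark = max timestamp seen - delay).
// Simplifications: watermark advances per event, one key, late events dropped.
final class EventTimeTumbleSketch {
    private final long size;
    private final long delay;
    private long maxTs = Long.MIN_VALUE;
    private final TreeMap<Long, Long> openWindows = new TreeMap<>(); // window start -> count
    private final List<String> emitted = new ArrayList<>();
    private long dropped = 0;

    EventTimeTumbleSketch(long size, long delay) {
        this.size = size;
        this.delay = delay;
    }

    long watermark() {
        return maxTs == Long.MIN_VALUE ? Long.MIN_VALUE : maxTs - delay;
    }

    void onEvent(long ts) {
        long start = ts - Math.floorMod(ts, size);
        // Window [start, start + size) is complete once watermark >= its last millisecond.
        if (start + size - 1 <= watermark()) {
            dropped++; // the event's window is already complete: the event is late
        } else {
            openWindows.merge(start, 1L, Long::sum);
        }
        maxTs = Math.max(maxTs, ts);
        fireCompleteWindows();
    }

    // Emit every open window whose end has been passed by the watermark, oldest first.
    private void fireCompleteWindows() {
        long wm = watermark();
        while (!openWindows.isEmpty() && openWindows.firstKey() + size - 1 <= wm) {
            Map.Entry<Long, Long> e = openWindows.pollFirstEntry();
            emitted.add("[" + e.getKey() + ", " + (e.getKey() + size) + ") count=" + e.getValue());
        }
    }

    List<String> emitted() { return emitted; }

    long dropped() { return dropped; }

    public static void main(String[] args) {
        EventTimeTumbleSketch sim = new EventTimeTumbleSketch(60_000L, 60_000L);
        long[] events = {10_000L, 70_000L, 20_000L, 130_000L, 15_000L, 200_000L};
        for (long ts : events) {
            sim.onEvent(ts);
            System.out.println("event " + ts + " -> watermark " + sim.watermark());
        }
        System.out.println("emitted: " + sim.emitted());
        System.out.println("dropped late events: " + sim.dropped());
    }
}
```

With the 60-second delay, the out-of-order event at 20000 is still counted, the [0, 60000) window is emitted only when the event at 130000 pushes the watermark to 70000, and the event at 15000 that arrives afterwards is dropped as late. This is also why the windowed outputs of the EventTime example can stay silent for a while: each 1-minute window needs an event roughly one watermark delay past its end before it is emitted.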
