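min(): returns the minimum value of the specified field, but the emitted record is not necessarily the record that holds that minimum; only the specified field is guaranteed to be the minimum. The example later shows this clearly.
minBy(): returns the minimum value too, and the emitted record is the complete record that holds that minimum.
The same difference applies to max() and maxBy().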
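The difference can be sketched in plain Java without Flink. This is only an emulation of the behaviour described above, not Flink's actual implementation: the Student class here is a hypothetical stand-in with an extra name field (an assumption, used only to make the difference visible), and minLike / minByLike are made-up helper names. The sketch assumes min() keeps the non-aggregated fields of the first record it saw and only overwrites the aggregated field.

```java
import java.util.Arrays;
import java.util.List;

public class MinVsMinBySketch {

    // Hypothetical stand-in for the article's Student type; "name" is an
    // assumed extra field used only to tell the records apart.
    static class Student {
        final String sid;
        final String name;
        final int age;

        Student(String sid, String name, int age) {
            this.sid = sid;
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return sid + "," + name + "," + age;
        }
    }

    // Emulates min("age"): the running result keeps the other fields of the
    // first record and only replaces the age with the smallest age seen.
    static Student minLike(List<Student> records) {
        Student acc = records.get(0);
        for (Student s : records.subList(1, records.size())) {
            if (s.age < acc.age) {
                acc = new Student(acc.sid, acc.name, s.age);
            }
        }
        return acc;
    }

    // Emulates minBy("age"): the running result is replaced by the whole
    // record whenever a strictly smaller age is seen.
    static Student minByLike(List<Student> records) {
        Student acc = records.get(0);
        for (Student s : records.subList(1, records.size())) {
            if (s.age < acc.age) {
                acc = s;
            }
        }
        return acc;
    }

    public static void main(String[] args) {
        // Three records with the same key, as if they fell into one window.
        List<Student> window = Arrays.asList(
                new Student("1", "Tom", 20),
                new Student("1", "Jerry", 18),
                new Student("1", "Spike", 19));

        System.out.println("min:   " + minLike(window));
        System.out.println("minBy: " + minByLike(window));
    }
}
```

With these three records, minLike yields 1,Tom,18 (the minimum age attached to the first record's other fields), while minByLike yields 1,Jerry,18 (the record that actually has the minimum age).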
Now let's look at an example:
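import java.util.Properties;

import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

// Student is defined elsewhere; this code relies on it having sid and age fields.
public class MinOrMinByOperator {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        sEnv.setParallelism(1);

        // Read JSON strings from the "student" Kafka topic.
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "localhost:9092");
        DataStreamSource<String> source = sEnv.addSource(
                new FlinkKafkaConsumer010<String>("student", new SimpleStringSchema(), p));
        source.print();

        // Parse each message into a Student, key by sid, and use one-minute windows.
        WindowedStream<Student, String, TimeWindow> windowedStream = source
                .map(new MapFunction<String, Student>() {
                    @Override
                    public Student map(String value) throws Exception {
                        return new Gson().fromJson(value, Student.class);
                    }
                })
                .keyBy(new KeySelector<Student, String>() {
                    @Override
                    public String getKey(Student value) throws Exception {
                        return value.sid;
                    }
                })
                .timeWindow(Time.minutes(1));

        // min: only the "age" field is guaranteed to be the window minimum.
        SingleOutputStreamOperator<Student> min = windowedStream.min("age");
        min.print("min:");

        // minBy: emits the whole record that has the minimum "age".
        SingleOutputStreamOperator<Student> minBy = windowedStream.minBy("age");
        minBy.print("minBy:");

        sEnv.execute("MinOrMinByOperator");
    }
}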
Sending in a few simulated records produces the following output: