javaee论坛

普通会员

225648

帖子

345

回复

359

积分

楼主
发表于 2019-11-03 09:03:43 | 查看: 429 | 回复: 0

StructuredStreaming是Spark2.4版本推出的新的实时流处理计算。相比SparkStreaming具有更低的延迟。

具体信息请看:StructuredStreaming官方介绍

示例一:wordscountScala代码packagecom.testimportorg.apache.log4j.{Level,Logger}importorg.apache.spark.sql.SparkSessionobjectWordCount{defmain(args:Array[String]):Unit={Logger.getLogger("org.apache.spark").setLevel(Level.WARN)valspark=SparkSession.builder().master("local").appName("WordCount").getOrCreate()//读取socket流数据,监听端口9998vallines=spark.readStream.format("socket").option("host","localhost").option("port","9998").load()//隐式转换importspark.implicits._//将一行数据进行空格分割后打平valwords=lines.as[String].flatMap(_.split(""))//根据value进行groupby,计算出countvalwordCounts=words.groupBy("value").count()//将流数据写入console控制台valquery=wordCounts.writeStream.format("console").outputMode("complete").start()//将进程阻塞query.awaitTermination()}}nc-lk9998进行传输数据

WordCount代码结果输出

这里解释下:outputMode

outputMode输出模式分为3种:append,complete,update。

append:只有流数据中的新行将写入sinkcomplete:每次有更新时,流数据中的所有行都将写入sinkupdate:每次有更新时,只有流数据中更新的行要写入sink。如果查询没有包含聚合,相当于“append”模式。示例二:WordCountAppend模式objectWordCountAppend{defmain(args:Array[String]):Unit={Logger.getLogger("org.apache.spark").setLevel(Level.WARN)valspark=SparkSession.builder().master("local").appName("WordCountAppend").getOrCreate()vallines=spark.readStream.format("socket").option("host","localhost").option("port","9998").load()valquery=lines.writeStream.format("console").outputMode("append").start()query.awaitTermination()}}

自己体验一把就行

示例三:读取csv/json文件packagecom.testimportorg.apache.log4j.{Level,Logger}importorg.apache.spark.sql.SparkSessionimportorg.apache.spark.sql.types.{StringType,StructField,StructType}objectCSVFormat{defmain(args:Array[String]):Unit={Logger.getLogger("org.apache.spark").setLevel(Level.WARN)valspark=SparkSession.builder().master("local").appName("CSVFormat").getOrCreate()//定义schemavalschema=StructType(List(StructField("name",StringType),StructField("age",StringType),StructField("sex",StringType)))vallines=spark.readStream.format("csv").schema(schema).load("/Users/zhangzhiqiang/Documents/test_project/comtest/data")valquery=lines.writeStream.format("console").outputMode("append").start()query.awaitTermination()}}

在/Users/zhangzhiqiang/Documents/test_project/comtest/data目录下存放一些csv文件,或者逐步放入csv文件,可以看到界面实时的输出csv文件的内容。

上图是,我将data目录下的文件复制了一份产生的。

示例四:kafka流数据读取packagecom.testimportorg.apache.log4j.{Level,Logger}importorg.apache.spark.sql.SparkSessionobjectKafkaFormat{defmain(args:Array[String]):Unit={Logger.getLogger("org.apache.spark").setLevel(Level.WARN)Logger.getLogger("org.apache.kafka").setLevel(Level.WARN)valspark=SparkSession.builder().master("local").appName("KafkaFormat").getOrCreate()//读取kafka的数据valdf=spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","test").load()//隐式转换importspark.implicits._//截取key,value2个字段vallines=df.selectExpr("CAST(keyASSTRING)","CAST(valueASSTRING)").as[(String,String)]valres=lines.map{line=>//解析value的值valcolumns=line._2.split(",")(columns(0),columns(1),columns(2))}.toDF()res.createOrReplaceTempView("tmp")valresult=spark.sql("select_1asname,_2asage,_3assexfromtmp")valquery=result.writeStream.format("console").outputMode("append").start()query.awaitTermination()}}

启动kafka生成器命令行,shkafka-console-producer.sh--broker-listlocalhost:9092--topictest,向命令行写入caocao,32,male。

可以看到console的信息,如下图:

示例五:解析kafka的json数据packagecom.testimportcom.google.gson.Gsonimportorg.apache.log4j.{Level,Logger}importorg.apache.spark.sql.SparkSessionobjectKafkaFormatJson{defmain(args:Array[String]):Unit={Logger.getLogger("org.apache.spark").setLevel(Level.WARN)Logger.getLogger("org.apache.kafka").setLevel(Level.WARN)valspark=SparkSession.builder().master("local").appName("KafkaFormatJson").getOrCreate()//读取kafka流数据vallines=spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","test").load()//隐式转换importspark.implicits._valvalues=lines.selectExpr("CAST(valueASSTRING)").as[String]valres=values.map{value=>//解析json逻辑valgson=newGsonvalpeople=gson.fromJson(value,classOf[People])(people.name,people.age,people.sex)}res.createOrReplaceTempView("tmp")//sparksqlvalresult=spark.sql("select_1asname,_2asage,_3assexfromtmp")//写入valquery=result.writeStream.format("console").outputMode("append").start()query.awaitTermination()}}

People类

packagecom.testcaseclassPeople(name:String,age:String,sex:String)extendsSerializable

启动程序,在kafka生成器命令行,输入{"name":"caocao","age":"32","sex":"male"}数据。可以看到console的信息,如下:

github代码:structuredstreamngdemo项目


您需要登录后才可以回帖 登录 | 立即注册

触屏版| 电脑版

技术支持 历史网 V2.0 © 2016-2017