发布网友 发布时间:2022-04-22 14:18
共2个回答
懂视网 时间:2022-05-03 04:23
Michael,3000 Andy,4500 Justin,3500 Betral,4000
一、定义自定义无类型聚合函数
想要自定义无类型聚合函数,那必须得继承org.spark.sql.expressions.UserDefinedAggregateFunction,然后重写父类得抽象变量和成员方法。
package com.cjs import org.apache.spark.sql.Row import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types._ object UDFMyAverage extends UserDefinedAggregateFunction{ //定义输入参数的数据类型 override def inputSchema: StructType = StructType(StructField("inputColumn", LongType)::Nil) //定义缓冲器的数据结构类型,缓冲器用于计算,这里定义了两个数据变量:sum和count override def bufferSchema: StructType = StructType(StructField("sum",LongType)::StructField("count",LongType)::Nil) //聚合函数返回的数据类型 override def dataType: DataType = DoubleType override def deterministic: Boolean = true //初始化缓冲器 override def initialize(buffer: MutableAggregationBuffer): Unit = { //buffer本质上也是一个Row对象,所以也可以使用下标的方式获取它的元素 buffer(0) = 0L //这里第一个元素是上面定义的sum buffer(1) = 0L //这里第二个元素是上面定义的sount } //update方法用于将输入数据跟缓冲器数据进行计算,这里是一个累加的作用 override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { buffer(0) = buffer.getLong(0) + input.getLong(0) buffer(1) = buffer.getLong(1) + 1 } //buffer1是主缓冲器,储存的是目前各个节点的部分计算结果;buffer2是分布式中执行任务的各个节点的“主”缓冲器; // merge方法作用是将各个节点的计算结果做一个聚合,其实可以理解为分布式的update的方法,buffer2相当于input:Row override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) } //计算最终结果 override def evaluate(buffer: Row): Any = { buffer.getLong(0).toDouble/buffer.getLong(1) } }
二、使用自定义无类型聚合函数
package com.cjs import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types.{StringType, StructField, StructType} object TestMyAverage { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) val conf = new SparkConf() .set("spark.some.config.option","some-value") .set("spark.sql.warehouse.dir","file:///e:/tmp/spark-warehouse") val ss = SparkSession .builder() .config(conf) .appName("test-myAverage") .master("local[2]") .getOrCreate() import ss.implicits._ val sc = ss.sparkContext val schemaString = "name,salary" val fileds = schemaString.split(",").map(filedName => StructField(filedName,StringType, nullable = true)) val schemaStruct = StructType(fileds) val path = "E:\IntelliJ Idea\sparkSql_practice\src\main\scala\com\cjs\employee.txt" val empRDD = sc.textFile(path).map(_.split(",")).map(row=>Row(row(0),row(1))) val empDF = ss.createDataFrame(empRDD,schemaStruct) empDF.createOrReplaceTempView("emp") // ss.sql("select name, salary from emp limit 5").show() //想要在spark sql里使用无类型自定义聚合函数,那么就要先注册给自定义函数 ss.udf.register("myAverage",UDFMyAverage) // empDF.show() ss.sql("select myAverage(salary) as average_salary from emp").show() } }
输出结果:
SparkSQL自定义无类型聚合函数
标签:div master rac file object size 一个 rri gty
热心网友 时间:2022-05-03 01:31
摘要当前 Flink 有如下几种函数:标量函数 将标量值转换成一个新标量值;表值函数 将标量值转换成新的行数据;聚合函数 将多行数据里的标量值转换成一个新标量值;表值聚合函数 将多行数据里的标量值转换成新的行数据;异步表值函数 是异步查询外部数据系统的特殊函数。Flink SQL的自定义函数用于 SQL 查询前要先经过注册;而在用于 Table API 时,函数可以先注册后调用,也可以 内联 后直接使用。咨询记录 · 回答于2021-10-25flinksql自定义topN函数的代码当前 Flink 有如下几种函数:标量函数 将标量值转换成一个新标量值;表值函数 将标量值转换成新的行数据;聚合函数 将多行数据里的标量值转换成一个新标量值;表值聚合函数 将多行数据里的标量值转换成新的行数据;异步表值函数 是异步查询外部数据系统的特殊函数。Flink SQL的自定义函数用于 SQL 查询前要先经过注册;而在用于 Table API 时,函数可以先注册后调用,也可以 内联 后直接使用。二、标量函数(UDF)自定义标量函数可以把 0 到多个标量值映射成 1 个标量值,数据类型里列出的任何数据类型都可作为求值方法的参数和返回值类型。想要实现自定义标量函数,你需要扩展 org.apache.flink.table.functions 里面的 ScalarFunction 并且实现一个或者多个求值方法。标量函数的行为取决于你写的求值方法。求值方法必须是 public 的,而且名字必须是 eval。代码如下(示例):import org.apache.flink.table.annotation.InputGroupimport org.apache.flink.table.api._import org.apache.flink.table.functions.ScalarFunction class HashFunction extends ScalarFunction { // 接受任意类型输入,返回 INT 型输出 def eval(@DataTypeHint(inputGroup = InputGroup.ANY) o: AnyRef): Int { return o.hashCode(); }} val env = TableEnvironment.create(...) // 在 Table API 里不经注册直接“内联”调用函数env.from("MyTable").select(call(classOf[HashFunction], $"myField")) // 注册函数env.createTemporarySystemFunction("HashFunction", classOf[HashFunction]) // 在 Table API 里调用注册好的函数env.from("MyTable").select(call("HashFunction", $"myField")) // 在 SQL 里调用注册好的函数env.sqlQuery("SELECT HashFunction(myFi我想要用java编写的flinksql自定义topN的代码!谢谢!!还在吗?TopN 是统计报表和大屏非常常见的功能,主要用来实时计算排行榜。流式的TopN可以使业务方在内存中按照某个统计指标(如出现次数)计算排名并快速出发出更新后的排行榜。我们以统计词频为例展示一下如何快速开发一个计算TopN的flink程序。flink支持各种各样的流数据接口作为数据的数据源,本次demo我们采用内置的socketTextStream作为数据数据源。StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //以processtime作为时间语义DataStream text = env.socketTextStream(hostName, port); //监听指定socket端口作为输入与离线wordcount类似,程序首先需要把输入的整句文字按照分隔符split成一个一个单词,然后按照单词为key实现累加DataStream<Tuple2> ds = text .flatMap(new LineSplitter()); //将输入语句split成一个一个单词并初始化count值为1的Tuple2类型private static final class LineSplitter implements FlatMapFunction<String, Tuple2> { @Override public void flatMap(String value, Collector<Tuple2> out) { // normalize and split the line String[] tokens = value.toLowerCase().split("\\W+"); // emit the pairs for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2(token, 1)); } } } }DataStream<Tuple2> wcount = ds .keyBy(0) //按照Tuple2的第一个元素为key,也就是单词 .window(SlidingProcessingTimeWindows.of(Time.seconds(600),Time.seconds(20))) //key之后的元素进入一个总时间长度为600s,每20s向后滑动一次的滑动窗口 全局TopN数据流经过前面的处理后会每20s计算一次各个单词的count值并发送到下游窗口 DataStream<Tuple2> ret = wcount .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20))) //所有key元素进入一个20s长的窗口(选20秒是因为上游窗口每20s计算一轮数据,topN窗口一次计算只统计一个窗口时间内的变化) .process(new TopNAllFunction(5));//计算该窗口TopNwindowAll是一个全局并发为1的特殊操作,也就是所有元素都会进入到一个窗口内进行计算。 // TODO Auto-generated met 解决思路就是使用嵌套 TopN,或者说两层 TopN。在原先的 TopN 前面,再加一层 TopN,用于分散热点。例如可以先加一层分组 TopN,第一层会计算出每一组的 TopN,而后在第二层中进行合并汇总,得到最终的全网TopN。第二层虽然仍是单点,但是大量的计算量由第一层分担了,而第一层是可以水平扩展的。代码没有发全叭,您在继续发,我参考一下亲,十分抱歉,刚刚又去搜索了一下资料,这已经是全部代码,并没有漏发,十分抱歉,资料只显示了这么多,未能完全帮助到您,深感抱歉,祝您生活愉快PriorityQueue queue;public TopNHotItems(int i) { this.topSize = i; this.queue= new PriorityQueue>(topSize);}@Overridepublic void processElement(ObjectEntity objectEntity, Context context, Collector collector) throws Exception { if (queue.size() < topSize) { queue.add(objectEntity); } else { if (queue.peek().getSessions()<objectEntity.getSessions()) { queue.poll(); queue.add(objectEntity); } }}核心代码是这个,您再帮我看一下稍等一下我这就重新给您发一遍,看下有没有遗漏与离线wordcount类似,程序首先需要把输入的整句文字按照分隔符split成一个一个单词,然后按照单词为key实现累加 } .sum(1);// 将相同的key的元素第二个count值相加windowAll是一个全局并发为1的特殊操作,也就是所有元素都会进入到一个窗口内进行计算。 new Comparator() { .process(new TopNFunction(5)) //分组TopN统计 ProcessWindowFunction<Tuple2, Tuple2, String, TimeWindow> { new Comparator() {上面的代码实现了按照首字母分组,取每组元素count最高的TopN方法。解决思路就是使用嵌套 TopN,或者说两层 TopN。在原先的 TopN 前面,再加一层 TopN,用于分散热点。例如可以先加一层分组 TopN,第一层会计算出每一组的 TopN,而后在第二层中进行合并汇总,得到最终的全网TopN。第二层虽然仍是单点,但是大量的计算量由第一层分担了,而第一层是可以水平扩展的。亲,这是找到topN代码资料,希望对您有所帮助,祝您生活愉快