问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

flinksql自定义topN函数的代码

发布网友 发布时间:2022-04-22 14:18

我来回答

2个回答

懂视网 时间:2022-05-03 04:23

Michael,3000
Andy,4500
Justin,3500
Betral,4000

一、定义自定义无类型聚合函数

        想要自定义无类型聚合函数,那必须得继承org.spark.sql.expressions.UserDefinedAggregateFunction,然后重写父类得抽象变量和成员方法。

package com.cjs
 
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
 
object UDFMyAverage extends UserDefinedAggregateFunction{
 //定义输入参数的数据类型
 override def inputSchema: StructType = StructType(StructField("inputColumn", LongType)::Nil)
 //定义缓冲器的数据结构类型,缓冲器用于计算,这里定义了两个数据变量:sum和count
 override def bufferSchema: StructType = StructType(StructField("sum",LongType)::StructField("count",LongType)::Nil)
 
 //聚合函数返回的数据类型
 override def dataType: DataType = DoubleType
 
 override def deterministic: Boolean = true
 //初始化缓冲器
 override def initialize(buffer: MutableAggregationBuffer): Unit = {
 //buffer本质上也是一个Row对象,所以也可以使用下标的方式获取它的元素
 buffer(0) = 0L //这里第一个元素是上面定义的sum
 buffer(1) = 0L //这里第二个元素是上面定义的sount
 }
 
 //update方法用于将输入数据跟缓冲器数据进行计算,这里是一个累加的作用
 override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
 buffer(0) = buffer.getLong(0) + input.getLong(0)
 buffer(1) = buffer.getLong(1) + 1
 }
 
 //buffer1是主缓冲器,储存的是目前各个节点的部分计算结果;buffer2是分布式中执行任务的各个节点的“主”缓冲器;
 // merge方法作用是将各个节点的计算结果做一个聚合,其实可以理解为分布式的update的方法,buffer2相当于input:Row
 override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
 buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
 buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
 }
 
 //计算最终结果
 override def evaluate(buffer: Row): Any = {
 buffer.getLong(0).toDouble/buffer.getLong(1)
 }
}

二、使用自定义无类型聚合函数

package com.cjs
 
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
 
object TestMyAverage {
 def main(args: Array[String]): Unit = {
 Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
 
 val conf = new SparkConf()
  .set("spark.some.config.option","some-value")
  .set("spark.sql.warehouse.dir","file:///e:/tmp/spark-warehouse")
 
 val ss = SparkSession
  .builder()
  .config(conf)
  .appName("test-myAverage")
  .master("local[2]")
  .getOrCreate()
 
 import ss.implicits._
 val sc = ss.sparkContext
 
 val schemaString = "name,salary"
 val fileds = schemaString.split(",").map(filedName => StructField(filedName,StringType, nullable = true))
 val schemaStruct = StructType(fileds)
 
 val path = "E:\IntelliJ Idea\sparkSql_practice\src\main\scala\com\cjs\employee.txt"
 val empRDD = sc.textFile(path).map(_.split(",")).map(row=>Row(row(0),row(1)))
 
 val empDF = ss.createDataFrame(empRDD,schemaStruct)
 empDF.createOrReplaceTempView("emp")
// ss.sql("select name, salary from emp limit 5").show()
 //想要在spark sql里使用无类型自定义聚合函数,那么就要先注册给自定义函数
 ss.udf.register("myAverage",UDFMyAverage)
 
// empDF.show()
 ss.sql("select myAverage(salary) as average_salary from emp").show()
 }
 
}

输出结果:

技术图片

SparkSQL自定义无类型聚合函数

标签:div   master   rac   file   object   size   一个   rri   gty   

热心网友 时间:2022-05-03 01:31

摘要当前 Flink 有如下几种函数:标量函数 将标量值转换成一个新标量值;表值函数 将标量值转换成新的行数据;聚合函数 将多行数据里的标量值转换成一个新标量值;表值聚合函数 将多行数据里的标量值转换成新的行数据;异步表值函数 是异步查询外部数据系统的特殊函数。Flink SQL的自定义函数用于 SQL 查询前要先经过注册;而在用于 Table API 时,函数可以先注册后调用,也可以 内联 后直接使用。咨询记录 · 回答于2021-10-25flinksql自定义topN函数的代码当前 Flink 有如下几种函数:标量函数 将标量值转换成一个新标量值;表值函数 将标量值转换成新的行数据;聚合函数 将多行数据里的标量值转换成一个新标量值;表值聚合函数 将多行数据里的标量值转换成新的行数据;异步表值函数 是异步查询外部数据系统的特殊函数。Flink SQL的自定义函数用于 SQL 查询前要先经过注册;而在用于 Table API 时,函数可以先注册后调用,也可以 内联 后直接使用。二、标量函数(UDF)自定义标量函数可以把 0 到多个标量值映射成 1 个标量值,数据类型里列出的任何数据类型都可作为求值方法的参数和返回值类型。想要实现自定义标量函数,你需要扩展 org.apache.flink.table.functions 里面的 ScalarFunction 并且实现一个或者多个求值方法。标量函数的行为取决于你写的求值方法。求值方法必须是 public 的,而且名字必须是 eval。代码如下(示例):import org.apache.flink.table.annotation.InputGroupimport org.apache.flink.table.api._import org.apache.flink.table.functions.ScalarFunction class HashFunction extends ScalarFunction { // 接受任意类型输入,返回 INT 型输出 def eval(@DataTypeHint(inputGroup = InputGroup.ANY) o: AnyRef): Int { return o.hashCode(); }} val env = TableEnvironment.create(...) // 在 Table API 里不经注册直接“内联”调用函数env.from("MyTable").select(call(classOf[HashFunction], $"myField")) // 注册函数env.createTemporarySystemFunction("HashFunction", classOf[HashFunction]) // 在 Table API 里调用注册好的函数env.from("MyTable").select(call("HashFunction", $"myField")) // 在 SQL 里调用注册好的函数env.sqlQuery("SELECT HashFunction(myFi我想要用java编写的flinksql自定义topN的代码!谢谢!!还在吗?TopN 是统计报表和大屏非常常见的功能,主要用来实时计算排行榜。流式的TopN可以使业务方在内存中按照某个统计指标(如出现次数)计算排名并快速出发出更新后的排行榜。我们以统计词频为例展示一下如何快速开发一个计算TopN的flink程序。flink支持各种各样的流数据接口作为数据的数据源,本次demo我们采用内置的socketTextStream作为数据数据源。StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //以processtime作为时间语义DataStream text = env.socketTextStream(hostName, port); //监听指定socket端口作为输入与离线wordcount类似,程序首先需要把输入的整句文字按照分隔符split成一个一个单词,然后按照单词为key实现累加DataStream<Tuple2> ds = text .flatMap(new LineSplitter()); //将输入语句split成一个一个单词并初始化count值为1的Tuple2类型private static final class LineSplitter implements FlatMapFunction<String, Tuple2> { @Override public void flatMap(String value, Collector<Tuple2> out) { // normalize and split the line String[] tokens = value.toLowerCase().split("\\W+"); // emit the pairs for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2(token, 1)); } } } }DataStream<Tuple2> wcount = ds .keyBy(0) //按照Tuple2的第一个元素为key,也就是单词 .window(SlidingProcessingTimeWindows.of(Time.seconds(600),Time.seconds(20))) //key之后的元素进入一个总时间长度为600s,每20s向后滑动一次的滑动窗口 全局TopN数据流经过前面的处理后会每20s计算一次各个单词的count值并发送到下游窗口 DataStream<Tuple2> ret = wcount .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20))) //所有key元素进入一个20s长的窗口(选20秒是因为上游窗口每20s计算一轮数据,topN窗口一次计算只统计一个窗口时间内的变化) .process(new TopNAllFunction(5));//计算该窗口TopNwindowAll是一个全局并发为1的特殊操作,也就是所有元素都会进入到一个窗口内进行计算。 // TODO Auto-generated met 解决思路就是使用嵌套 TopN,或者说两层 TopN。在原先的 TopN 前面,再加一层 TopN,用于分散热点。例如可以先加一层分组 TopN,第一层会计算出每一组的 TopN,而后在第二层中进行合并汇总,得到最终的全网TopN。第二层虽然仍是单点,但是大量的计算量由第一层分担了,而第一层是可以水平扩展的。代码没有发全叭,您在继续发,我参考一下亲,十分抱歉,刚刚又去搜索了一下资料,这已经是全部代码,并没有漏发,十分抱歉,资料只显示了这么多,未能完全帮助到您,深感抱歉,祝您生活愉快PriorityQueue queue;public TopNHotItems(int i) { this.topSize = i; this.queue= new PriorityQueue>(topSize);}@Overridepublic void processElement(ObjectEntity objectEntity, Context context, Collector collector) throws Exception { if (queue.size() < topSize) { queue.add(objectEntity); } else { if (queue.peek().getSessions()<objectEntity.getSessions()) { queue.poll(); queue.add(objectEntity); } }}核心代码是这个,您再帮我看一下稍等一下我这就重新给您发一遍,看下有没有遗漏与离线wordcount类似,程序首先需要把输入的整句文字按照分隔符split成一个一个单词,然后按照单词为key实现累加 } .sum(1);// 将相同的key的元素第二个count值相加windowAll是一个全局并发为1的特殊操作,也就是所有元素都会进入到一个窗口内进行计算。 new Comparator() { .process(new TopNFunction(5)) //分组TopN统计 ProcessWindowFunction<Tuple2, Tuple2, String, TimeWindow> { new Comparator() {上面的代码实现了按照首字母分组,取每组元素count最高的TopN方法。解决思路就是使用嵌套 TopN,或者说两层 TopN。在原先的 TopN 前面,再加一层 TopN,用于分散热点。例如可以先加一层分组 TopN,第一层会计算出每一组的 TopN,而后在第二层中进行合并汇总,得到最终的全网TopN。第二层虽然仍是单点,但是大量的计算量由第一层分担了,而第一层是可以水平扩展的。亲,这是找到topN代码资料,希望对您有所帮助,祝您生活愉快
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
ef英语哪个好 EF英孚英语培训怎么样? 英孚英语好不好 EF英孚教育到底好不好 大佬们,麦芒7和荣耀10那个值得入手?2500以下的机子还有啥好推荐的么... 介绍几款2500元以前的手机 像素一定要高 其他的不做要求 近期想入手一部安卓手机,价格2200到2500左右…买HTC desire Z还是 三星... 笔记本忘记开机密码怎么办急死了 笔记本电脑屏幕开机锁忘记密码 怎么办?急死了 华硕笔记本电脑开机密码忘记了怎样找回?系统是Windows 7旗舰版... SQL 不能执行 php防止sql注入示例分析和几种常见攻击正则 怎样用HQL语句查询出一个排行榜某个用户排行第几?并且查出上5条记录和下5条记录? as3.0怎么连接mysql数据库 Composer 安装东西遇到github需要token怎么办 PHP如何判断用户是否登陆,防止同账号多登陆, Old Tom the killer whale,的翻译 求各位大神帮忙翻译下面的sql语句呗?相当感谢了! 如何在DB2中执行存储过程 404 Not Found Base64编码的WAV从JSON对象提取问题,怎么解决 如何更好的使用Oracle全文索引 php 怎么读取mysql一条数据并输出某一字段 2019年江苏二级建造师考后审核 二级造价师各科目分值是多少 南的读音 二造考试什么时间出成绩? 2019年哪些省份明确不举行二级造价工程师考试? 在江苏省考的二建、二级造价师可以在安徽省使用吗? 报二建毕业证书找不到了怎么办 unexpected token: org.hibernate.hql.ast.QuerySyntaxException: unexpected token错误 sql中 in的用法 unexpected token 错误 sql order by 和rownum 查询问题 jsp 与SQL数据库查询问题 怎么和陌生客户打招呼? 怎么才可以跟陌生客户有话题聊? 接触陌生客人,如何谈话? 第一次见客户怎么沟通技巧 与客户沟通如何找话题? 与陌生人沟通的技巧?如何找共同话题? 如何与陌生的客户很好的聊天,找到合适的话题 面对陌生人,想要拉进关系,如何寻找话题?用这三个方法帮你搞定 怎样和顾客找话题 微信怎么看银行卡余额 第一次跟陌生客户打电话合作,该怎么说? 如何与陌生人谈话的时候找话题 怎样和陌生客户打电话? 汽车的商业险包括什么