当前位置: 首页 > news >正文

邯郸网站建设的企业外贸网站建设视频

邯郸网站建设的企业,外贸网站建设视频,wordpress同步twitter,腾讯云装wordpress大纲 表值函数完整代码 在《0基础学习PyFlink——用户自定义函数之UDF》中#xff0c;我们讲解了UDF。本节我们将讲解表值函数——UDTF 表值函数 我们对比下UDF和UDTF def udf(f: Union[Callable, ScalarFunction, Type] None,input_types: Union[List[DataType], DataTy… 大纲 表值函数完整代码 在《0基础学习PyFlink——用户自定义函数之UDF》中我们讲解了UDF。本节我们将讲解表值函数——UDTF 表值函数 我们对比下UDF和UDTF def udf(f: Union[Callable, ScalarFunction, Type] None,input_types: Union[List[DataType], DataType, str, List[str]] None,result_type: Union[DataType, str] None,deterministic: bool None, name: str None, func_type: str general,udf_type: str None) - Union[UserDefinedScalarFunctionWrapper, Callable]:def udtf(f: Union[Callable, TableFunction, Type] None,input_types: Union[List[DataType], DataType, str, List[str]] None,result_types: Union[List[DataType], DataType, str, List[str]] None,deterministic: bool None,name: str None) - Union[UserDefinedTableFunctionWrapper, Callable]:可以发现 UDF比UDTF多了func_type和udf_type参数UDTF的返回类型比UDF的丰富多了两个List类型List[DataType]和List[str] 特别是最后一点可以认为是UDF和UDTF在应用上的主要区别。 换种更容易理解的说法是UDTF可以返回任意数量的行作为输出而不是像UDF那样返回单个值行。 举一个例子 word_count_data [A, B, C, a, C] 我们希望统计上面这些字符的个数以及小写后字符的个数。这样A的个数是1a的个数是2因为a算一个A小写后又算一个。C的个数是2g的个数是2。 这就要求统计算法在遇到大写字母时需要统计大小写两种字母而遇到小写字母时只需要统计小写字母。 udtf(result_types[DataTypes.STRING()], input_typesrow_type_tab_source)def rowFunc(row):if row[0].isupper():yield row[0]yield row[0].lower()else:yield row[0]yield关键字返回的是generator生成器。Table API对rowFunc的调用最终会生成[“A”,“a”,“B”,“b”,“C”,“c”,“a”,“C”,“c”]。 和调用UDF不同的是需要使用flat_map来调用UDTF。flat即为“打平”可以生动的理解为将多维降为一维。 tab_transtab_source.flat_map(rowFunc)tab_trans.execute().print()-------------------------------- | f0 | -------------------------------- | A | | a | | B | | b | | C | | c | | a | | C | | c | -------------------------------- 9 rows in set由于我们没有指定经过处理的值所属的字段名称于是会使用默认的f0作为字段名。我们可以使用alias来给它别名下。 tab_trans_aliastab_trans.alias(trans_word)tab_trans_alias.execute().print()-------------------------------- | trans_word | -------------------------------- | A | | a | | B | | b | | C | | c | | a | | C | | c | -------------------------------- 9 rows in set最后我们就可以用这个新的表做字数统计计算 tab_trans_alias.group_by(col(trans_word)) \.select(col(trans_word), lit(1).count) \.execute_insert(WordsCountTableSink) \.wait()I[A, 1] I[a, 2] I[B, 1] I[b, 1] I[C, 2] I[c, 2]完整代码 from pyflink.common import Configuration from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema) from pyflink.table.types import DataTypes from pyflink.table.table_descriptor import TableDescriptor from pyflink.table.expressions import lit, col from pyflink.common import Row from pyflink.table.udf import udf,udtf,udaf,udtaf import pandas as pd from pyflink.table.udf import UserDefinedFunctionword_count_data [A, B, C, a, C] def word_count():config Configuration()# write all the data to one fileconfig.set_string(parallelism.default, 1)env_settings EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env TableEnvironment.create(env_settings)row_type_tab_source DataTypes.ROW([DataTypes.FIELD(word, DataTypes.STRING())])tab_source t_env.from_elements(map(lambda i: Row(i), word_count_data), row_type_tab_source)# define the sink schemasink_schema Schema.new_builder() \.column(word, DataTypes.STRING().not_null()) \.column(count, DataTypes.BIGINT()) \.primary_key(word) \.build()# Create a sink descriptorsink_descriptor TableDescriptor.for_connector(print)\.schema(sink_schema) \.build()t_env.create_temporary_table(WordsCountTableSink, sink_descriptor)udtf(result_types[DataTypes.STRING()], input_typesrow_type_tab_source)def rowFunc(row):if row[0].isupper():yield row[0]yield row[0].lower()else:yield row[0]tab_transtab_source.flat_map(rowFunc)tab_trans.execute().print()tab_trans_aliastab_trans.alias(trans_word)tab_trans_alias.execute().print()tab_trans_alias.group_by(col(trans_word)) \.select(col(trans_word), lit(1).count) \.execute_insert(WordsCountTableSink) \.wait()if __name__ __main__:word_count()
http://www.ho-use.cn/article/10813892.html

相关文章:

  • 在门户网站建设上的讲话临平做网站
  • 大连个人网站建设wordpress文章导入公众号
  • 天津网站建设基本流程加强网站信息内容建设管理
  • 优秀的网页设计网站app大全视频app大全
  • 怎么制作网站设计wordpress 文章新窗口
  • 什么是企业营销型网站做防护信息的网站
  • jquery在网站开发实例运用建网站网站
  • 深圳市宝安区建设局网站搜索引擎优化是什么工作
  • 怎样把建好的网站上传到互联网营销型网站的付费推广渠道
  • 建设银行的官方网站电话小公司管理软件
  • 空气源热泵热水器网站建设网站建设多少钱一年
  • 陕西建设网网站集群专业网站搭建运营
  • 美食网站建设的时间进度表南京做网站公司地点
  • 郑州网站优化工资网站建设易网宣
  • 交流做病理切片的网站哪个网站可以接任务做兼职
  • 如何设计网站首页导航专业行业网站开发报价
  • 制作网站能挣钱jsp网站 值班
  • 怎么做优惠网站建筑模拟3正版下载
  • 城乡建设厅建筑特种作业证书查询seo外链专员工作要求
  • 湛江宇锋网站建设linux网站如何做ip解析
  • 湖南建设银行官网网站首页vue开发视频网站
  • 公司起名吉祥字大全seo怎么优化方案
  • 做网站需要加班吗300平方别墅装修大约多少钱
  • 怎么关闭网站安全检测wordpress外贸网站模板
  • 网站建设的投资预算怎么写广告制作开票大类是什么
  • 法治网站的建设整改措施wordpress 医疗
  • 学院网站建设北滘做网站
  • 邯郸wap网站建设费用网页设计的发展
  • 做任务打字赚钱的网站wordpress 分类目录 404
  • php网站开发套模板步骤个人公众号开发php