当前位置: 首页 > news >正文

网站服务器基本要素百度企业推广怎么收费

网站服务器基本要素,百度企业推广怎么收费,装修平台网络推广公司,wordpress网站建小程序前言 今天开始 DWS 层的搭建#xff0c;不知不觉又是周一#xff0c;都忘了昨天是周末#xff0c;近两年对我来说#xff0c;周六日晚上八九点能打一小会篮球就算一周的休息了。不得不说自己真的是天生打工体质#xff0c;每天不管多累#xff0c;晚上十二点睡#xff0…前言 今天开始 DWS 层的搭建不知不觉又是周一都忘了昨天是周末近两年对我来说周六日晚上八九点能打一小会篮球就算一周的休息了。不得不说自己真的是天生打工体质每天不管多累晚上十二点睡第二天六点多七点准时自然醒依然精神焕发中午都不带困的那既然老天给我这个特质让我像牛一样可以不知疲倦的工作那我也希望是让我在热爱的领域发光发热那既然这样总得先让我找到个满意的工作吧哈哈哈 ... 1、DWS 层搭建 设计要点 DWS层的设计参考指标体系需求驱动前面的 DIM 和 DWD 的设计都是参考建模理论是业务驱动DWS层表名的命名规范为dws_数据域_统计粒度_业务过程_统计周期window 离线数仓中的 DWS 层的统计周期我们当时做的是 1/7/30 那实时数仓的统计周期当然不能这么大离线数仓中每一天就相当于一个窗口而在实时数仓当中窗口都是秒级别的我们这里开窗的大小选择 10 s因为我们的可视化平台只能 10s 刷新一次开得太小没有意义生产环境中可以更小比如 1s 甚至可以不开窗。开窗还是不开窗是性能和时效性的取舍 1.1、流量域来源关键词粒度页面浏览各窗口汇总表 主要任务 从 Kafka 页面浏览明细dwd_traffic_page_log主题读取数据过滤搜索行为使用自定义 UDTF一进多出函数对搜索内容分词。统计各窗口各关键词出现频次写入 ClickHouse。 1.1.1、思路分析 在 DWD 层我们对日志根据日志类型进行了分流写入到了 5 个不同的主题当中现在我们需要统计搜索内容中的关键词所以需要消费页面浏览日志使用分词器将搜索内容分为多个关键词划分窗口词频统计后存储进 clickhouse 思考既然用到分词为啥不直接用 ES 存呢 答确实是要分词但是我们这里是要做词频统计ES 是对关键词做索引相当于用 key关键词去获得 value文档而我们这里是要对 key 进行统计所以不合适 1.1.2、代码实现 1IK 分词器工具类 public class KeywordUtil {public static ListString analyze(String text){// 创建集合用于存放切分或的数据ListString keywordList new ArrayList();// 封装待分词内容StringReader reader new StringReader(text);// 创建 IK 分词器(ik_smart 智能分词,ik_max_word: 尽可能分最多的词)IKSegmenter ikSegmenter new IKSegmenter(reader,true);try {// 取出切分好的词Lexeme lexeme null;while((lexeme ikSegmenter.next())!null){String keyword lexeme.getLexemeText();keywordList.add(keyword);}} catch (IOException e) {e.printStackTrace();}return keywordList;}public static void main(String[] args) {ListString list analyze(Apple iPhoneXSMax (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待);System.out.println(list);} } 2自定义 UDTF FunctionHint(output DataTypeHint(ROWword STRING)) public class IkUDTF extends TableFunctionRow {public void eval(String str){for (String word : KeywordUtil.analyze(str)) {collect(Row.of(word));}} } 3 消费页面浏览日志主题 我们相当于一个消费者去消费页面浏览主题那么就需要先创建该表也就需要先确定我们要的字段。在事件时间语义下使用窗口函数的时候我们需要指定事件时间的字段 前面我们为了 join lookup 表的时候那样要想 join lookup 表必须要有一个处理时间字段 只不过我们现在需要指定一个事件时间我们同样可以通过 DDL 中来指定 对于这里的关键词需求而言我们不需要保留 common 字段所以建表如下  ​ // TODO 3. 消费 Kafka dwd_traffic_page_log 主题String groupId dws_traffic_source_keyword_page_view_window;tableEnv.executeSql(CREATE TABLE dwd_traffic_page_log page mapstring,string, ts bigint , time_ltz AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000)), WATERMARK FOR time_ltz AS time_ltz - INTERVAL 2 SECOND MyKafkaUtil.getKafkaDDL(dwd_traffic_page_log, groupId));​ 这里我们指定了 time_ltz 为事件时间字段以及乱序延迟时间最大为 2s这里为什么不直接使用 ts 字段呢这是因为 json 默认把数值类型都当做 bigint 来处理而 Flink SQL 中表的事件时间必须为 timestamp 类型所以我们需要进行转换 注意建表语句中尽量加 AS 尤其字段涉及函数 4过滤出搜索数据 // TODO 4. 过滤出搜索数据 Table searchLog tableEnv.sqlQuery(SELECT page[item] item, time_ltz FROM dwd_traffic_page_log WHERE page[last_page_id search] AND page[item_type] keyword AND page[item] is not null ); tableEnv.createTemporaryView(search_log_table,searchLog); 5注册 udtf 函数并进行分词 // TODO 5. 注册 udtf 分词 tableEnv.createTemporaryFunction(ik, IkUDTF.class); Table splitTable tableEnv.sqlQuery(SELECT word, time_ltz FROM search_log_table LATERAL TABLE(ik(item)) ); tableEnv.createTemporaryView(split_table,splitTable); 6分组、开窗、聚合 之前离线数仓写过窗口函数但是都是没有边界的窗口。这里我们学习一下 Flink 中的三种窗口怎么用 Flink SQL 去写 上面三种窗口分别对应滚动滑动和会话下面是使用案例 现在我们需要考虑将来写入到 ck 时ck 应该采用什么引擎 选择 SummingMergeTree 优点自动预聚合存储的内容少了查询效率高缺点只能做求和指标比如峰值指标就做不了。再有假如数据消费后挂了Flink 读取后数据写入到 ck 了但是这时候挂了Flink 恢复后会重新消费ck 就会重复处理。如果是别的引擎还好因为数据不是聚合的状态而是一条一条存储的我们可以对数据根据 uuid 进行区分是否已经处理过选择ReplacingMergeTree 它有去重的功能但是是在任务挂掉的时候我们才用得到保证一致性可以做更多的指标缺点就是会存储更多的数据 那么我们当然选择 ReplacingMergeTree 现在我们需要考虑去重字段在 ck 中去重字段比主键都重要 去重order by 字段 根据 窗口时间起始终止关键词 进行去重这里会添加一个 source 字段区分日志的来源比如 search、cart、order 窗口的起始和终止时间同样有特定的函数来获取 最终我们的代码 // TODO 6. 分组、开窗、聚合Table resultTable tableEnv.sqlQuery(SELECT date_format(tumble_start(time_ltz,interval 10 second),yyyy-MM-dd HH:mm:ss) stt, date_format(tumble_end(time_ltz,interval 10 second),yyyy-MM-dd HH:mm:ss) edt, search source, word keyword, count(*) keyword_count, unix_timestamp() ts FROM split_table GROUP BY word,tumble(time_ltz,interval 10 second)); 7创建 ck 表格 create table if not exists dws_traffic_source_keyword_page_view_window (stt DateTime,edt DateTime,source String,keyword String,keyword_count UInt64,ts UInt64 ) engine ReplacingMergeTree(ts)partition by toYYYYMMDD(stt)order by (stt, edt, source, keyword); 8ck 工具类 上面第 6 步之后我们得到了开窗聚合后的一个结果要写入 ck 我们需要先将动态表转为流 // TODO 7. 将动态表转换为流DataStreamKeywordBean dataStream tableEnv.toAppendStream(resultTable, KeywordBean.class);接着我们需要通过 JdbcSink 写出到 ck 集群中因为之后每个聚合结果都是存在 DWS 层的所以都会用到该 JdbcSink所以我们统一封装成一个工具类 public class ClickHouseUtil {// 泛型方法需要再返回值类型前面放一个泛型public static T SinkFunctionT getSinkFunction(String sql) {return JdbcSink.sink(sql,new JdbcStatementBuilderT() {SneakyThrowsOverridepublic void accept(PreparedStatement preparedStatement, T t) throws SQLException {// 利用反射获得 t 对象的属性Class? tClz t.getClass();int index 1;for (Field field : tClz.getDeclaredFields()) {field.setAccessible(true); // 防止访问呢 private 属性失败// 尝试获得字段上的注解TransientSink transientSink field.getAnnotation(TransientSink.class);if (transientSink ! null){continue;}// 获得字段值Object value field.get(t);// 给占位符赋值preparedStatement.setObject(index,value);}}}, new JdbcExecutionOptions.Builder().withBatchSize(5).withBatchIntervalMs(1000L).build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName(GmallConfig.CLICKHOUSE_DRIVER).withUrl(GmallConfig.CLICKHOUSE_URL).build());} }在代码中我们有一个获取注解的操作是为了防止 JavaBean 中的字段可能是辅助字段在 ck 表中并没有能对应上的所以我们通过注解来甄别 Retention(RetentionPolicy.RUNTIME) // 生效时机: 运行时 Target(ElementType.FIELD) // 该注解的作用域: 属性上 public interface TransientSink {} 补全主程序  // TODO 8. 写入 clickhouse// 插入字段顺序尽量和ck库的表保持一致dataStream.addSink(ClickHouseUtil.getSinkFunction(insert into dws_traffic_source_keyword_page_view_window values(?,?,?,?,?,?)));// TODO 9. 启动任务env.execute(DwsTrafficSourceKeywordPageViewWindow); 注意因为我们是通过反射获取 Bean 对象字段来向 ck 表插入数据的所以一定要保证 Bean 对象的顺序要和 ck 表对应上 1.2、流量域版本-渠道-地区-访客类别粒度页面浏览各窗口汇总表 上面 DWS 的第一个需求我们是用 Flink SQL 来实现的从这个需求开始我们将使用 DataStream API 来实现 1.2.1、需求分析 维度有 4 个版本渠道地区和访客类别度量值有 5 个会话数、页面浏览数、浏览总时长、独立访客数、跳出会话数等 关于独立访客数和跳出会话数我们之前在 DWD 层已经实现并分别写入到了 dwd_traffic_unique_visitor_detail状态编程保存 lastVisitDate 实现 和 dwd_traffic_user_jump_detailFlink CEP 实现 主题了所以这里只需要分析前 2 个度量值怎么计算 会话数 我们的数据中没有 session_id但是要求也很简单last_page_id 为 null 即代表一个新会话的开始页面浏览数PV 页面浏览记录中每一行数据就是一个浏览记录count(1) 浏览总时长 还是从页面浏览记录中获取浏览时间during_time 思考这三个度量值都可以从 dwd_traffic_page_log 中一次计算出来但是怎么和另外两个来自不同主题的度量值聚合呢 答使用 join根据 dws 表的粒度进行 join但是在 SQL 中使用 join 的话也许好一点可是我们使用 API 就比较复杂所以其实我们还可以使用另一种方式实现——使用 union 分组聚合也可以实现 那么最终写入到 ck 中的字段其实一共有 12 个4个维度字段 5 个度量值 ts 窗口起始、终止时间字段 1.2.2、代码实现 1建表语句 create table if not exists dws_traffic_vc_ch_ar_is_new_page_view_window (stt DateTime,edt DateTime,vc String,ch String,ar String,is_new String,uv_ct UInt64,sv_ct UInt64,pv_ct UInt64,dur_sum UInt64,uj_ct UInt64,ts UInt64 ) engine ReplacingMergeTree(ts)partition by toYYYYMMDD(stt)order by (stt, edt, vc, ch, ar, is_new); 2创建 ck 表对应的 Bean import lombok.AllArgsConstructor; import lombok.Data;Data AllArgsConstructor public class TrafficPageViewBean {// 窗口起始时间String stt;// 窗口结束时间String edt;// app 版本号String vc;// 渠道String ch;// 地区String ar;// 新老访客状态标记String isNew ;// 独立访客数Long uvCt;// 会话数Long svCt;// 页面浏览数Long pvCt;// 累计访问时长Long durSum;// 跳出会话数Long ujCt;// 时间戳Long ts; } 3读取三个主题的数据 // TODO 3. 读取三个主题的数据String uvTopic dwd_traffic_unique_visitor_detail;String ujdTopic dwd_traffic_user_jump_detail;String topic dwd_traffic_page_log;String groupId dws_traffic_channel_page_view_window;DataStreamSourceString uvDS env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(uvTopic, groupId));DataStreamSourceString ujdDS env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(ujdTopic, groupId));DataStreamSourceString pageDS env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));4统一数据格式 将 3 个主题的数据进行格式统一方便后面 union 聚合 // TODO 4. 统一数据格式SingleOutputStreamOperatorTrafficPageViewBean trafficWithUvDS uvDS.map(line - {JSONObject jsonObject JSONObject.parseObject(line);JSONObject common jsonObject.getJSONObject(common);return new TrafficPageViewBean(, ,common.getString(vc),common.getString(ch),common.getString(ar),common.getString(is_new),1L, 0L, 0L, 0L, 0L,common.getLong(ts));});SingleOutputStreamOperatorTrafficPageViewBean trafficWithUJ ujdDS.map(line - {JSONObject jsonObject JSONObject.parseObject(line);JSONObject common jsonObject.getJSONObject(common);return new TrafficPageViewBean(, ,common.getString(vc),common.getString(ch),common.getString(ar),common.getString(is_new),0L, 0L, 0L, 0L, 1L,common.getLong(ts));});SingleOutputStreamOperatorTrafficPageViewBean trafficWithSvPvDurSumDS pageDS.map(line - {JSONObject jsonObject JSONObject.parseObject(line);JSONObject common jsonObject.getJSONObject(common);JSONObject page jsonObject.getJSONObject(page);return new TrafficPageViewBean(, ,common.getString(vc),common.getString(ch),common.getString(ar),common.getString(is_new),0L,page.getString(last_page_id) null ? 1L : 0L,1L,page.getLong(during_time),0L,common.getLong(ts));}); 注意 trafficWithUJ 这条流本就存在延时所以很可能下面在 union 的时候窗口都关闭了它还没来所以我们只能给水位线的最大乱序等待时间 判定为用户跳出的最大时间也就是超时时间 5三流 union 对三条流进行 union 然后提取出事件时间生成水位线之后就需要开窗聚合了而开窗聚合我们一般都会指定 keyby 再开窗全窗口几乎不用而 keyby 的字段我们选择 4 个维度可以用 String  拼接也可以用一个四元组 Tuple4 窗口分类 OpWindowwindowAll()KeyedWindowwindow() 时间滚动、滑动、会话计数滚动、滑动 // TODO 5. 三条流进行 unionDataStreamTrafficPageViewBean unionDS trafficWithUvDS.union(trafficWithUJDS, trafficWithSvPvDurSumDS);// TODO 6. 提取事件时间(去 ts 字段生成水位线)SingleOutputStreamOperatorTrafficPageViewBean trafficPageViewWithWaterMarkDS unionDS.assignTimestampsAndWatermarks(WatermarkStrategy.TrafficPageViewBeanforBoundedOutOfOrderness(Duration.ofSeconds(14)).withTimestampAssigner(new SerializableTimestampAssignerTrafficPageViewBean() {Overridepublic long extractTimestamp(TrafficPageViewBean element, long recordTimestamp) {return element.getTs();}}));// TODO 7. 分组开窗聚合(按照维度做keyby)WindowedStreamTrafficPageViewBean, Tuple4String, String, String, String, TimeWindow windowedStream trafficPageViewWithWaterMarkDS.keyBy(new KeySelectorTrafficPageViewBean, Tuple4String, String, String, String() {Overridepublic Tuple4String, String, String, String getKey(TrafficPageViewBean value) throws Exception {return Tuple4.of(value.getAr(),value.getCh(),value.getIsNew(),value.getVc());}}).window(TumblingEventTimeWindows.of(Time.seconds(10))); 注意这里在设置水位线延迟时间时我们设置为 14因为需求中包含用户跳出会话数而跳出这个需求本就存在延迟我们在 DWD 层设置了两种判断跳出策略前提是按照 mid 分区1. last_page_id null 下一条数据的 last_page_id 也为 null  2. last_page_id null 超时时间达到 10s 视作跳出 6 聚合 回顾一下窗口聚合函数 增量聚合函数来一条计算一条效率高存储数据量小全量聚合函数可以求平均值和百分比可以获取窗口信息 与增量聚合函数不同全窗口函数需要先收集窗口中的数据并在内部缓存起来等到窗口要输出结果的时候再取出数据进行计算。很明显这就是典型的批处理思路了——先攒数据等一批都到齐了再正式启动处理流程。         但是把计算放到窗口关闭才去计算无疑是低效的毕竟如果数据量比较大的时候这种方式肯定没有增量聚合函数计算的快。那为什么还要使用这种方式呢这是因为有些场景下我们要做的计算必须基于全部的数据才有效比如求平均值这时做增量聚合就没什么意义了 那么现在我们应该对 keyby 后的数据流进行聚合把相同 key 的度量值进行累加那么我们应该选用哪种聚合函数呢 选用增量聚合函数其实可以实现度量值的累加但是由于我们的 ck 表中还有两个窗口字段需要补充窗口起始和终止时间所以我们需要获取窗口信息那这就只能使用全量聚合函数了毕竟全量窗口函数才能获得窗口信息但是全窗口函数的计算往往是放到最后才执行的这就很难受那能不能结合二者的优点呢 其实是可以的我们在之前学习Flink 窗口的时候是讲过的 增量聚合函数处理计算会更高效。举一个最简单的例子对一组数据求和。大量的数据连续不断到来全窗口函数只是把它们收集缓存起来并没有处理到了窗口要关闭、输出结果的时候再遍历所有数据依次叠加得到最终结果。而如果我们采用增量聚合的方式那么只需要保存一个当前和的状态每个数据到来时就会做一次加法更新状态到了要输出结果的时候只要将当前状态直接拿出来就可以了。增量聚合相当于把计算量“均摊”到了窗口收集数据的过程中自然就会比全窗口聚合更加高效、输出更加实时。         而全窗口函数的优势在于提供了更多的信息可以认为是更加“通用”的窗口操作。它只负责收集数据、提供上下文相关信息把所有的原材料都准备好至于拿来做什么我们完全可以任意发挥。这就使得窗口计算更加灵活功能更加强大。 所以在实际应用中我们往往希望兼具这两者的优点把它们结合在一起使用。Flink 的Window API 就给我们实现了这样的用法。 SingleOutputStreamOperatorTrafficPageViewBean resultDS windowedStream.reduce(new ReduceFunctionTrafficPageViewBean() {Overridepublic TrafficPageViewBean reduce(TrafficPageViewBean value1, TrafficPageViewBean value2) throws Exception {value1.setSvCt(value1.getSvCt() value2.getSvCt());value1.setUvCt(value1.getUvCt() value2.getUvCt());value1.setUvCt(value1.getUjCt() value2.getUjCt());value1.setPvCt(value1.getPvCt() value2.getPvCt());value1.setDurSum(value1.getDurSum() value2.getDurSum());return value1;}}, new WindowFunctionTrafficPageViewBean, TrafficPageViewBean, Tuple4String, String, String, String, TimeWindow() {Overridepublic void apply(Tuple4String, String, String, String stringStringStringStringTuple4, TimeWindow window, IterableTrafficPageViewBean input, CollectorTrafficPageViewBean out) throws Exception {// 获取数据TrafficPageViewBean next input.iterator().next();// 补充信息next.setStt(DateFormatUtil.toYmdHms(window.getStart()));next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));// 修改 tsnext.setTs(System.currentTimeMillis());// 输出数据out.collect(next);}}); 这样我们既高效地完成了窗口聚合增量聚合也拿到了窗口信息全量聚合获得起止时间  7写出到 clickhouse // TODO 8. 写入 clickhouseresultDS.addSink(ClickHouseUtil.getSinkFunction(insert into dws_traffic_channel_page_view_window values(?,?,?,?,?,?,?,?,?,?,?,?)));// TODO 9. 启动任务env.execute(DwsTrafficVcChArIsNewPageViewWindow); 总结 至此流量域两张汇总表创建完毕关于流量域就剩一张表明天完成先去吃饭
http://www.ho-use.cn/article/10811951.html

相关文章:

  • 汕头网站开发武山县建设局网站
  • wordpress建两个网站响应式布局网站模板
  • 给网站做游戏视频怎么赚钱兼职招聘信息最新招聘
  • 给别人做设计的网站wordpress参数
  • 域名注册1元怎么建设seo自己网站
  • 中国建设行业峰会网站新赣州房产网
  • 山东中恒建设集团网站做电影网站都需要什么手续
  • 编辑网站设计培训班学费一般多少
  • 网站建设行业的趋势郑州网站权重
  • 广东网站备案电话号码哪里可以学做网站
  • 用wordpress建一个网站怎么出售友情链接
  • 片网站无法显示app开发的基本步骤
  • 湖南城乡建设厅网站如何本地搭建自己的网站
  • 网站开发技术文档格式网站不备案怎么回事
  • 互联网网站运营推广网页特效代码
  • 商业网站策划书范文营销策划案ppt优秀案例
  • 广东一站式网站建设推荐企业展厅效果图大全
  • 自助建站系统凡科小程序开发教程资料
  • 有源码就可以自己做H5网站吗专做投放广告网站
  • 江门恒阳网站建设新手怎么学网络运营
  • 无锡网络建站建站手机网站
  • 网站报错500不关闭网站 备案
  • 在上海做兼职在哪个网站旅游网站开题报告
  • 中国人做的比较好的shopify网站横向拖动的网站
  • 网站咋建立苏州信息造价网
  • 写入网站文件北京定制网络营销收费
  • 巩义企业网站建设怎么下载1688上的视频
  • 西安网站建设q.479185700強游戏平台十大排名
  • 网站用什么东西做网站开发企业排名
  • 江苏省住房城乡建设厅网站网上推广服务