文章目录
一 标签管理1 标签列表2 添加标签(1)添加一级标签(2)添加二级标签(3)添加三级标签(4)添加四级标签(5)数据库标签字段说明(6)数据库中存储 3 标签任务(1)填写字段说明(2)填写示例(3)数据库表结构task_infofile_infotask_tag_rule 二 统计型和规则型标签的SQL处理0 概述1 总体处理流程2 搭建工程(1)新建父工程(2)pom.xml(3)增加通用模块(4)增加task-sql模块一 标签管理
标签管理模块,是用户画像是开发的起点,所有的用户画像都起始于对标签的定义和规则的指定。
1 标签列表
根据之前对标签的规划,我们要在这里把标签创建为四个级别,其中一二级为标签的类目,第三级为主要的标签,第四级是具体的标签数值。
2 添加标签
(1)添加一级标签
在页面的左上角点击【添加一级标签】
共有四个一级标签
人口属性 标签编码:TG_BASE标签名称:人口属性标签类型:类目 用户行为 标签编码:TG_BEHAVIOR标签名称:用户行为标签类型:类目 用户偏好 标签编码:TG_FAVOR标签名称:用户偏好标签类型:类目 营销特征 标签编码:TG_BUSI标签名称:营销特征标签类型:类目
最终结果
(2)添加二级标签
在一级标签【人口属性】下点击【添加子标签】 – 自然属性
上级标签:自然属性上级标签编码:TG_BASE标签编码:TG_BASE_PERSONA标签名称:自然属性标签类型:类目
在一级标签【人口属性】下点击【添加子标签】 – 社会属性
上级标签:人口属性上级标签编码:TG_BASE标签编码:TG_BASE_SOCAL标签名称:社会属性标签类型:类目
(3)添加三级标签
【人口属性】的【自然属性】下点击【添加子标签】
上级标签:自然属性上级标签编码:TG_BASE_PERSONA标签编码:TG_BASE_PERSONA_GENDER标签名称:性别标签类型:统计型(客观事实为统计型,人为统计为规则型)标签值类型:文本
(4)添加四级标签
在三级标签【性别】下点击【添加子标签】
上级标签:性别上级标签编码:TG_BASE_PERSONA_GENDER标签编码:TG_BASE_PERSONA_GENDER_001(女为002,未知为003)标签名称:男标签类型:统计型标签值类型:文本
结果展示:
(5)数据库标签字段说明
(6)数据库中存储
在数据库tag_info表中可以查看到刚刚添加的数据
3 标签任务
标签任务要求定义在三级标签中,因为一个三级标签下的四级标签其实是由这个三级标签计算出来的不同结果,如三级标签性别下的男、女、未知,所以任务逻辑要定义在三级标签上。
点击【性别】后面的【添加任务】。
(1)填写字段说明
(2)填写示例
添加内容:
任务状态:
启动:此任务到了晚上某一个时间点,进行计算,得出结果停用:不会进行计算
任务名称:有默认值,可以修改
执行方式:
SQL:大多数统计型和规则性的数据用SQLJAR:多数挖掘型使用JAR,部分规则型可能也会用JAR,如一些规则型的一部分数据不来自于数仓,而是来自于MySQL、Redis、或是其他容器中,或者是sql十分复杂,使得sql的表意已经不足够了
任务执行时点:要求任务执行方式不能早于数仓更新时间
任务sql,`select id as uid,if(gender is null, ‘U’,gender) as query_value from dim_user_info where dt=‘9999-99-99’
起query_value别名原因:不希望对每一个任务都执行不同的sql,希望不管查询什么,只要查询出的结构相同,都使用同一个sql结构处理null值:if(gender is null, ‘U’,gender)
标签规则匹配:将查询出的结果值和四级标签进行映射
F – 男M – 女U – 未知
任务级别:
在很长的处理流程中(抽取–计算–重组–导出),优先级别数字越小,越优先进行处理在有依赖关系的标签中,被依赖的标签应该优先计算
任务参数:遵循spark-submit XXX.jar提交任务的参数,不添加conf是yarn的参数,加conf的是spark参数
--driver-memory=1G --num-executors=3 --executor-memory=2G --executor-cores=2--conf spark.default.parallelism=12
到此为止,一个标签的简单定义完成了。
结果展示:
也可以在数据库中查找到对应的数据
(3)数据库表结构
task_info
其中task_type有两种
TAG:计算标签A、B、CFLOW:流程任务,不针对某一个标签进行计算,如将整个表从hive导入到clickhouse,是整体性的任务
main class为jar包的类
file_id为文件编码
file_info
对应file_info表中id,file_name为jar包名称,file_path为jar包存放位置
task_tag_rule
存放对应规则计算出的结果,如男,女,未知
task_info – file_info – task_tag_rule 对应关系为 1:1:n
二 统计型和规则型标签的SQL处理
0 概述
接下来的任务:利用在网页端填写的规则计算出想要得到的标签结果。
已有数据,存放在MySQL中:1 整数 2 浮点 3 文本 4 日期
tag_info:标签定义(id,标签编码,标签名称,标签等级,上级标签id,标签类型【99类目,1统计型,2规则性,3挖掘型】,标签值类型【1 整数 2 浮点 3 文本 4 日期】,标签对应的计算任务id,标签说明,更新时间,创建时间)
id tag_code tag_name tag_level parent_tag_id tag_type tag_value_type tag_task_id tag_comment update_time create_time
task_info:任务定义(ID,任务名称,任务状态【0未启用,1启用】,任务说明,任务运行时间,任务类型【TAG计算标签,FLOW流程任务,不是针对某一个标签进行计算,如将整个表从hive移动到clickhouse】,执行类型【SQL,JAR】,主类【执行类型为JAR会有主类】,文件ID【对应file_info表中的ID,对应JAR包的名字,位置,文件系统,文件状态,创建时间】(file_info表字段:id file_name file_ex_name file_path file_system file_status create_time ),任务参数,SQL语句,任务执行级别,创建时间)
id task_name task_status task_comment task_time task_type exec_type main_class file_id task_args task_sql task_exec_level create_time
上表称为主表,此外还需要一个子表,存放四级标签的值,一个主表对应多个子表,子表名称为task_tag_rule
task_tag_rule:SQL的查询值与四级标签的映射(ID,标签ID【对应哪个标签,对应tag_info】,任务ID【对应哪个任务,对应task_info】,查询值、子标签ID【对应tag_info中的ID】)
id tag_id task_id query_value sub_tag_id
tag_info、task_info、task_tag_rule对应关系为1:1:n
以上三者完成了对标签任务的定义,最终想要得到标签的计算结果(UID,TAG,TAG_VALUE),存放在hive中。
还需要数据,数据在数据仓库中(ads,dwt,dws,dwd),存放在hive中。
两种数据需要通过Spark程序结合起来,结合的目的是:读取标签的定义和规则,读取数仓中的数据,计算出标签,其中spark读取MySQL中数据的核心是JDBC。
现在假如需要读取很多标签,如年龄、性别、金额等,它们之间的不同在于sql,但结构是相同的,可以将sql中的不同定义为变量,然后Spark通过相同的结构去取不同的变量,可以用一个程序计算出不同的标签,即希望Spark程序是一个通用程序,适用于所有的SQL型任务。
1 总体处理流程
目前虽然填写了一个标签任务,但是并不是光靠一个SQL就能生成标签的,还要结合任务定义中的其他规则来计算标签数据。
那么 “定义+规则+SQL=标签” 这个步骤需要一个spark程序Jar包来完成,并且存储在HDFS。
系统在每日的凌晨0点或者手动会把对应的标签任务生成为待执行的任务进程。
达到任务进程的要求时(1任务执行时点、2任务执行层级),会推送JSON格式的作业提交信息传给远端的任务提交器。
任务提交器会根据JSON组合生成spark-submit及相关参数,提交YARN。
YARN会根据提交时的JAR包路径生成Spark作业。
Spark作业执行阶段会读取MySQL库中标签定义、规则和SQL,去查询Hive中的数据通过计算生成标签最后写入画像库的某张标签表。
在Spark作业提交后,远程提交器会一直监控该作业的运行状态并通过回调,传递给用户画像管理平台,用于监控。
总体处理流程如下图:
一些说明:
上传程序jar包:对应于概述中的Spark程序,将程序打成jar包。任务进程:此时的任务状态为TODO,想要变为START需要考虑两个条件。 是否到达运行任务时点优先级 远程任务提交器:程序需要运行在Yarn和Spark上,命令为spark-submit,想要提交任务前提就是需要在有Spark和Yarn的环境上运行,但是用户画像管理平台可能是单独部署的,在不同的服务器上,这时可以将远程任务提交提部署在有yarn和spark的环境上,实现任务的运行。远程任务提交器会将任务翻译成spark-submit。画像库:一般一个标签一个表。
当前任务分析:完成Spark作业执行的JAR包,提交到管理平台,上传到HDFS存储起来。任务满足调度条件时,读取标签定义及规则,组合成SQL,在数仓中执行,使程序能自动找到需要的jar包,计算,产生结果并写入画像标签库。
2 搭建工程
(1)新建父工程
在idea中新建项目,父工程如下图
工程架构
父工程中不需要源码,所以可以将src文件夹delete掉
(2)pom.xml
<properties><spark.version>3.0.0</spark.version><scala.version>2.12.11</scala.version></properties><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>${spark.version}</version><!-- provided如果存在,那么运行时该Jar包不存在,也不会打包到最终的发布版本中,只是编译器有效 --><scope>provided</scope></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>${spark.version}</version><scope>provided</scope></dependency><dependency><groupId>org.lz4</groupId><artifactId>lz4-java</artifactId><version>1.7.1</version></dependency><dependency><groupId>org.anarres.lzo</groupId><artifactId>lzo-hadoop</artifactId><version>1.0.5</version><exclusions><exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-core</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.12</artifactId><version>${spark.version}</version><scope>provided</scope></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.10.0</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.10.0</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.68</version></dependency><dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId><version>0.1.55</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.18</version></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.11</artifactId><version>${spark.version}</version><scope>provided</scope></dependency></dependencies></dependencyManagement>
子工程继承父工程,所以父工程下的所有dependency,子工程都会都会引用,而dependencyManagement,子工程不会引用。可以理解为dependencyManagement只声明,不引用,dependency既声明又引用。
mllib机器学习包比较特殊,其余子工程不会使用。而且如果有两个子工程都使用机器学习包,使用包的版本不一样,但在父工程中声明了,子工程中不用声明,统一使用父工程中的版本号,dependencyManagement的目的就是统一所使用jar包的版本号。
(3)增加通用模块
在这个工程下未来要实现多个task子模块,这些task子模块需要一些通用的工具类和实体类,点击new – module – maven,创建通用模块。
(4)增加task-sql模块
创建子模块:task-sql,步骤同上
如果代码只用task-sql使用,可以不用提交,否则需要提交到common中
增加scala framework
在src – main目录下创建目录scala,指定为源码目录,创建com.hzy.useprofile.TaskSQLApp
总体目录结构如下图:
继承关系体现在task-sql下的poml文件中,如下
<parent><artifactId>user-profile-task1009</artifactId><groupId>com.hzy.userprofile</groupId><version>1.0-SNAPSHOT</version></parent>
task-common和task-sql的依赖关系体现在:在task-sql下的poml.xml文件中添加父工程依赖
<dependencies><dependency><groupId>com.hzy.userprofile</groupId><artifactId>task-common</artifactId><version>1.0-SNAPSHOT</version></dependency></dependencies>
聚合关系体现在父工程poml文件中,父工程执行一个声明周期,modules下的所有模块都会执行
<modules><module>task-common</module><module>task-sql</module></modules>