100字范文,内容丰富有趣,生活中的好帮手!
100字范文 > 基于Flink的个人装扮商城群体用户画像与数据实时统计系统(六)-需求集C实现

基于Flink的个人装扮商城群体用户画像与数据实时统计系统(六)-需求集C实现

时间:2019-12-19 13:16:33

相关推荐

基于Flink的个人装扮商城群体用户画像与数据实时统计系统(六)-需求集C实现

文章目录

一、需求集C有什么? 二、模拟生成用户购买商品的信息 三、需求集C实现

一、需求集C有什么?

所有需求link:基于Flink的个人装扮商城群体用户画像与数据实时统计系统(二)-项目介绍与需求介绍

需求集C是针对模拟生成的用户购买商品的信息提出的,包括:

(1)群体用户画像之当日支付类型偏好;(2)各类产品每日销售额实时Top5;(3)群体用户画像之每月实时消费水平&用户每月实时消费标签。

附:模拟生成的用户购买商品的信息字段

二、模拟生成用户购买商品的信息

用户购买商品的信息实体类编写:ConsumeInfo

package cn.edu.neu.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * A single purchase record: which user bought which product, how many,
 * at what unit price, and how the order was paid.
 *
 * <p>Accessors and constructors are generated by Lombok. The all-args
 * constructor takes the fields in declaration order, so the field order
 * below must not be changed.
 *
 * @author 32098
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ConsumeInfo {

    /** Order ID. */
    private String orderId;

    /** User ID. */
    private String userId;

    /** Product ID. */
    private String productId;

    /** Product type (category). */
    private String productType;

    /** Number of items purchased. */
    private int count;

    /** Price of a single item. */
    private double unitPrice;

    /** Payment type. */
    private String payType;

    /** Payment time. */
    private long payTime;

    /** Coupon amount. */
    private double couponAmount;
}

用户购买商品的信息模拟生成:ConsumeInfoSource

package cn.edu.neu.source;

import cn.edu.neu.bean.ConsumeInfo;
import jdk.nashorn.internal.ir.annotations.Ignore;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Arrays;
import java.util.List;
import java.util.Random;

/**
 * Parallel Flink source that emits randomly generated {@link ConsumeInfo}
 * purchase records until it is cancelled.
 *
 * <p>Each round emits a batch of 0 to 19 records and then sleeps for two seconds.
 *
 * @author 32098
 */
public class ConsumeInfoSource extends RichParallelSourceFunction<ConsumeInfo> {

    /**
     * Loop flag. {@code volatile} because {@link #cancel()} is invoked from a
     * different thread than the one executing {@link #run}.
     */
    private volatile boolean keepMockData;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        keepMockData = true;
    }

    /**
     * Generates mock purchase records in batches until {@link #cancel()} is called.
     *
     * @param sourceContext context used to emit the generated records
     * @throws Exception if the thread is interrupted while sleeping between batches
     */
    @Override
    public void run(SourceContext<ConsumeInfo> sourceContext) throws Exception {
        // Candidate product types, drawn uniformly.
        List<String> productTypeList = Arrays.asList(
                "休闲服类", "运动服类", "西服类", "运动鞋类", "休闲鞋类", "包类", "皮鞋类", "拖鞋类", "靴类");
        // Candidate payment types; repeated entries weight the draw (3 : 2 : 1).
        List<String> payTypeList = Arrays.asList("微信", "微信", "微信", "支付宝", "支付宝", "掌上银行");

        Random r = new Random();
        String productType = productTypeList.get(r.nextInt(productTypeList.size()));
        String payType = payTypeList.get(r.nextInt(payTypeList.size()));

        while (keepMockData) {
            // Draw the batch size once per round. Re-drawing the bound on every
            // iteration (as a loop condition) would skew batches towards very small sizes.
            int batchSize = r.nextInt(20);
            for (int i = 0; i < batchSize && keepMockData; i++) {
                String orderId = RandomStringUtils.randomAlphanumeric(12);
                String userId = RandomStringUtils.randomNumeric(3);
                String productId = RandomStringUtils.randomAlphabetic(8);

                // With probability ~0.5 keep the previous product type, otherwise re-draw.
                if (r.nextDouble() > 0.5) {
                    productType = productTypeList.get(r.nextInt(productTypeList.size()));
                }
                // Quantity in [1, 10].
                int count = r.nextInt(10) + 1;
                // Unit price: absolute value of a Gaussian with mean 300 and std-dev 30.
                double unitPrice = Math.abs(r.nextGaussian() * 30 + 300);
                // With probability ~0.5 keep the previous payment type, otherwise re-draw.
                if (r.nextDouble() > 0.5) {
                    payType = payTypeList.get(r.nextInt(payTypeList.size()));
                }
                // NOTE(review): the payment time is stamped 2 s into the future; kept as in
                // the original generator - confirm whether the offset is intentional.
                long payTime = System.currentTimeMillis() + 2000;
                // Coupon amount in [0, 9].
                double couponAmount = r.nextInt(10);

                sourceContext.collect(new ConsumeInfo(
                        orderId, userId, productId, productType, count, unitPrice,
                        payType, payTime, couponAmount));
            }
            Thread.sleep(2000);
        }
    }

    /** Requests the generation loop in {@link #run} to stop. */
    @Override
    public void cancel() {
        keepMockData = false;
    }
}

三、需求集C实现

群体用户画像之当日支付类型偏好

package cn.edu.neu.task.windowTask;import cn.edu.neu.bean.ConsumeInfo;import cn.edu.neu.bean.Statics;import cn.edu.neu.sink.StaticsSink;import cn.edu.neu.source.ConsumeInfoSource;import org.apache.mon.RuntimeExecutionMode;import org.apache.mon.functions.MapFunction;import org.apache.mon.functions.ReduceFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;/*** @author 32098** 群体用户画像之支付类型偏好*/public class PayTypeTask {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.STREAMING);DataStreamSource<ConsumeInfo> consumeInfoDs = env.addSource(new ConsumeInfoSource());SingleOutputStreamOperator<Statics> resultDs = consumeInfoDs.map(new MapFunction<ConsumeInfo, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(ConsumeInfo consumeInfo) throws Exception {String payType = consumeInfo.getPayType();return Tuple2.of(payType, 1L);}}).keyBy(e -> e.f0).window(TumblingProcessingTimeWindows.of(Time.days(1))).trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10))).reduce(new ReduceFunction<Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> reduce(Tuple2<String, Long> tA, Tuple2<String, Long> tB) throws Exception {return Tuple2.of(tA.f0, tA.f1+tB.f1);}}).map(new MapFunction<Tuple2<String, Long>, Statics>() {@Overridepublic Statics map(Tuple2<String, Long> tupleIn) throws Exception {return new Statics("payType", tupleIn.f0, tupleIn.f1);}});resultDs.addSink(new StaticsSink());try 
{env.execute("payType analysis");} catch (Exception e) {e.printStackTrace();}}}

各类产品当日实时销售额Top5(前台页面展示Top5)

package cn.edu.neu.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Aggregated sales amount for one product type.
 *
 * <p>Accessors and constructors are generated by Lombok; the all-args
 * constructor takes {@code (productType, totalSale)}.
 *
 * @author 32098
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ProductTypeSaleStatic {

    /** Product type (category) the total refers to. */
    private String productType;

    /** Accumulated sales amount for the product type. */
    private Double totalSale;
}

package cn.edu.neu.task.windowTask;import cn.edu.neu.bean.ConsumeInfo;import cn.edu.neu.bean.ProductTypeSaleStatic;import cn.edu.neu.sink.ProductTypeSaleSink;import cn.edu.neu.source.ConsumeInfoSource;import org.apache.mon.RuntimeExecutionMode;import org.apache.mon.functions.MapFunction;import org.apache.mon.functions.ReduceFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;/*** @author 32098** 各类产品每天销售额实时统计:10触发一次计算*/public class ProductTypeSaleTask {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.STREAMING);DataStreamSource<ConsumeInfo> consumeInfoDs = env.addSource(new ConsumeInfoSource());SingleOutputStreamOperator<ProductTypeSaleStatic> productTypeTotalSaleDs = consumeInfoDs.map(new MapFunction<ConsumeInfo, Tuple2<String, Double>>() {@Overridepublic Tuple2<String, Double> map(ConsumeInfo consumeInfo) throws Exception {String productType = consumeInfo.getProductType();double totalAmount = consumeInfo.getUnitPrice() * consumeInfo.getCount() - consumeInfo.getCouponAmount();return Tuple2.of(productType, totalAmount);}}).keyBy(e -> e.f0).window(TumblingProcessingTimeWindows.of(Time.days(1))).trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10))).reduce(new ReduceFunction<Tuple2<String, Double>>() {@Overridepublic Tuple2<String, Double> reduce(Tuple2<String, Double> inA, Tuple2<String, Double> inB) throws Exception {return Tuple2.of(inA.f0, inA.f1 + inB.f1);}}).map(new 
MapFunction<Tuple2<String, Double>, ProductTypeSaleStatic>() {@Overridepublic ProductTypeSaleStatic map(Tuple2<String, Double> in) throws Exception {return new ProductTypeSaleStatic(in.f0, in.f1);}});productTypeTotalSaleDs.addSink(new ProductTypeSaleSink());env.execute();}}

群体用户画像之每月实时消费水平&用户每月实时消费标签

package cn.edu.neu.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Per-user consumption statistics: the user's average order amount and the
 * consumption-level label derived from it.
 *
 * <p>Accessors and constructors are generated by Lombok; the all-args
 * constructor takes {@code (uid, avgAmount, level)}.
 *
 * @author 32098
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserConsumptionLevelStatics {

    /** User ID. */
    private String uid;

    /** Average amount per order. */
    private double avgAmount;

    /** Consumption-level label. */
    private String level;
}

package cn.edu.neu.task.windowTask;

import cn.edu.neu.bean.ConsumeInfo;
import cn.edu.neu.bean.Statics;
import cn.edu.neu.bean.UserConsumptionLevelStatics;
import cn.edu.neu.sink.ConsumptionLevelStaticsSink;
import cn.edu.neu.sink.StaticsSink;
import cn.edu.neu.source.ConsumeInfoSource;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import scala.tools.nsc.Global;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.NumberFormat;

/**
 * 1. Group user portrait: real-time monthly consumption level.
 * 2. Per-user real-time monthly consumption label.
 *
 * <p>For every user the job keeps the sum and the number of orders inside a
 * 30-day tumbling processing-time window, emits the running average order
 * amount every 10 seconds, labels it with a consumption level and writes the
 * result to {@link ConsumptionLevelStaticsSink}.
 *
 * @author 32098
 */
public class ConsumptionLevelTask {

    /** Averages at or above this value are labelled as medium consumption. */
    private static final double MEDIUM_LEVEL_THRESHOLD = 800;

    /** Averages at or above this value are labelled as high consumption. */
    private static final double HIGH_LEVEL_THRESHOLD = 1800;

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        DataStreamSource<ConsumeInfo> consumeInfoDs = env.addSource(new ConsumeInfoSource());

        SingleOutputStreamOperator<Tuple2<String, Double>> userAvgConsumeDs = consumeInfoDs
                // (userId, amount paid for this order, 1 order).
                .map(new MapFunction<ConsumeInfo, Tuple3<String, Double, Integer>>() {
                    @Override
                    public Tuple3<String, Double, Integer> map(ConsumeInfo consumeInfo) throws Exception {
                        String userId = consumeInfo.getUserId();
                        double totalAmount = consumeInfo.getUnitPrice() * consumeInfo.getCount()
                                - consumeInfo.getCouponAmount();
                        return Tuple3.of(userId, totalAmount, 1);
                    }
                })
                .keyBy(e -> e.f0)
                // NOTE(review): a fixed 30-day window approximates a month; it does not
                // follow calendar-month boundaries.
                .window(TumblingProcessingTimeWindows.of(Time.days(30)))
                // Emit the intermediate result every 10 seconds.
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
                .reduce(new ReduceFunction<Tuple3<String, Double, Integer>>() {
                    @Override
                    public Tuple3<String, Double, Integer> reduce(
                            Tuple3<String, Double, Integer> inA,
                            Tuple3<String, Double, Integer> inB) throws Exception {
                        // Both the amount and the order count must combine the two inputs;
                        // adding inA.f2 to itself would double the count instead of counting
                        // orders and make the average wrong.
                        return Tuple3.of(inA.f0, inA.f1 + inB.f1, inA.f2 + inB.f2);
                    }
                })
                // (userId, average amount per order); the count is at least 1 here.
                .map(new MapFunction<Tuple3<String, Double, Integer>, Tuple2<String, Double>>() {
                    @Override
                    public Tuple2<String, Double> map(Tuple3<String, Double, Integer> in) throws Exception {
                        return Tuple2.of(in.f0, in.f1 / in.f2);
                    }
                });

        SingleOutputStreamOperator<UserConsumptionLevelStatics> resultDs = userAvgConsumeDs
                .map(new MapFunction<Tuple2<String, Double>, UserConsumptionLevelStatics>() {
                    @Override
                    public UserConsumptionLevelStatics map(Tuple2<String, Double> tupleIn) throws Exception {
                        String uid = tupleIn.f0;
                        double avgAmount = tupleIn.f1;
                        // The label is derived from the unrounded average.
                        String level = levelOf(avgAmount);
                        // BigDecimal.valueOf uses the canonical decimal text of the double,
                        // so half-up rounding acts on the value as printed rather than on
                        // its binary expansion.
                        double rounded = BigDecimal.valueOf(avgAmount)
                                .setScale(2, RoundingMode.HALF_UP)
                                .doubleValue();
                        return new UserConsumptionLevelStatics(uid, rounded, level);
                    }
                });

        resultDs.addSink(new ConsumptionLevelStaticsSink());

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Maps an average order amount to its consumption-level label.
     *
     * @param avgAmount average amount per order
     * @return the high label for amounts of at least 1800, the medium label for
     *         amounts in [800, 1800), and the low label otherwise
     */
    private static String levelOf(double avgAmount) {
        if (avgAmount >= HIGH_LEVEL_THRESHOLD) {
            return "高消费水平";
        }
        if (avgAmount >= MEDIUM_LEVEL_THRESHOLD) {
            return "中消费水平";
        }
        return "低消费水平";
    }
}

附:

上述需求实现涉及的Sink类:ProductTypeSaleSink

package cn.edu.neu.sink;

import cn.edu.neu.bean.ProductTypeSaleStatic;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * JDBC sink that writes the per-product-type sales total into the MySQL table
 * {@code product_type_total_sale} with an insert-or-update statement.
 *
 * @author 32098
 */
public class ProductTypeSaleSink extends RichSinkFunction<ProductTypeSaleStatic> {

    private Connection conn = null;
    private PreparedStatement ps = null;

    /**
     * Opens the database connection and prepares the upsert statement.
     *
     * @param parameters Flink configuration (unused)
     * @throws Exception if the connection or the statement cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // NOTE(review): connection URL and credentials are hard-coded; consider
        // moving them to job configuration.
        conn = DriverManager.getConnection("jdbc:mysql://master:3306/user_portrait", "root", "Hive@");
        // Presumably product_type carries a unique key so that repeated results
        // update the existing row - verify against the table definition.
        String sql = "insert into product_type_total_sale(product_type, total_sale) values (?,?) on duplicate key update total_sale=?";
        ps = conn.prepareStatement(sql);
    }

    /**
     * Writes one aggregated result.
     *
     * @param value   the product type and its running sales total
     * @param context sink context (unused)
     * @throws Exception if the statement fails
     */
    @Override
    public void invoke(ProductTypeSaleStatic value, Context context) throws Exception {
        ps.setString(1, value.getProductType());
        ps.setDouble(2, value.getTotalSale());
        // Third placeholder is the value used by the "on duplicate key update" branch.
        ps.setDouble(3, value.getTotalSale());
        ps.executeUpdate();
    }

    /**
     * Releases the JDBC resources: the statement first, then the connection.
     * The connection is closed even when closing the statement fails.
     *
     * @throws Exception if closing either resource fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (ps != null) {
                ps.close();
            }
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
    }
}

上述需求实现涉及的Sink类:ConsumptionLevelStaticsSink

package cn.edu.neu.sink;

import cn.edu.neu.bean.Statics;
import cn.edu.neu.bean.UserConsumptionLevelStatics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * JDBC sink that writes each user's average order amount and consumption-level
 * label into the MySQL table {@code user_consumption_level} with an
 * insert-or-update statement.
 *
 * @author 32098
 */
public class ConsumptionLevelStaticsSink extends RichSinkFunction<UserConsumptionLevelStatics> {

    private Connection conn = null;
    private PreparedStatement ps = null;

    /**
     * Opens the database connection and prepares the upsert statement.
     *
     * @param parameters Flink configuration (unused)
     * @throws Exception if the connection or the statement cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // NOTE(review): connection URL and credentials are hard-coded; consider
        // moving them to job configuration.
        conn = DriverManager.getConnection("jdbc:mysql://master:3306/user_portrait", "root", "Hive@");
        // Presumably userId carries a unique key so that repeated results update
        // the existing row - verify against the table definition.
        String sql = "insert into user_consumption_level(userId, avg_amount, level) values (?,?,?) on duplicate key update avg_amount=?, level=?";
        ps = conn.prepareStatement(sql);
    }

    /**
     * Writes one per-user result.
     *
     * @param statics the user ID, average amount and level label to store
     * @param context sink context (unused)
     * @throws Exception if the statement fails
     */
    @Override
    public void invoke(UserConsumptionLevelStatics statics, Context context) throws Exception {
        ps.setString(1, statics.getUid());
        ps.setDouble(2, statics.getAvgAmount());
        ps.setString(3, statics.getLevel());
        // Placeholders 4 and 5 feed the "on duplicate key update" branch.
        ps.setDouble(4, statics.getAvgAmount());
        ps.setString(5, statics.getLevel());
        ps.executeUpdate();
    }

    /**
     * Releases the JDBC resources: the statement first, then the connection.
     * The connection is closed even when closing the statement fails.
     *
     * @throws Exception if closing either resource fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (ps != null) {
                ps.close();
            }
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
    }
}

Mysql数据库表:

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。