文章目录
一、需求集C有什么?
二、模拟生成用户购买商品的信息
三、需求集C实现

一、需求集C有什么?
所有需求见:基于Flink的个人装扮商城群体用户画像与数据实时统计系统(二)-项目介绍与需求介绍
需求集C是针对模拟生成的用户购买商品的信息提出的,包括:
1. 群体用户画像之当日支付类型偏好
2. 各类产品每日销售额实时Top5
3. 群体用户画像之每月实时消费水平 & 用户每月实时消费标签
附:模拟生成的用户购买商品的信息字段
二、模拟生成用户购买商品的信息
用户购买商品的信息实体类编写:ConsumeInfo
package cn.edu.neu.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * A single (simulated) purchase record: who bought what, how many, at what price,
 * how it was paid and which coupon was applied.
 *
 * <p>Getters, setters, equals/hashCode/toString and both constructors are generated
 * by Lombok. The all-args constructor takes the fields in declaration order.
 *
 * @author 32098
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ConsumeInfo {

    /** Order identifier. */
    private String orderId;

    /** Identifier of the purchasing user. */
    private String userId;

    /** Identifier of the purchased product. */
    private String productId;

    /** Product category name. */
    private String productType;

    /** Number of units purchased. */
    private int count;

    /** Price of a single unit. */
    private double unitPrice;

    /** Payment channel name. */
    private String payType;

    /** Payment time (epoch milliseconds as filled in by ConsumeInfoSource). */
    private long payTime;

    /** Coupon amount applied to the order. */
    private double couponAmount;
}
用户购买商品的信息模拟生成:ConsumeInfoSource
package cn.edu.neu.source;import cn.edu.neu.bean.ConsumeInfo;import jdk.nashorn.internal.ir.annotations.Ignore;import mons.lang.RandomStringUtils;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;import java.util.Arrays;import java.util.List;import java.util.Random;/*** @author 32098*/public class ConsumeInfoSource extends RichParallelSourceFunction<ConsumeInfo> {private boolean keepMockData;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);keepMockData = true;}@Overridepublic void run(SourceContext<ConsumeInfo> sourceContext) throws Exception {// 订单IDString orderId;// 用户IDString userId;// 商品IDString productId;// 商品类型List<String> productTypeList = Arrays.asList("休闲服类", "运动服类", "西服类", "运动鞋类", "休闲鞋类", "包类", "皮鞋类", "拖鞋类", "靴类");String productType;// 商品数量int count;// 商品单价double unitPrice;// 支付类型List<String> payTypeList = Arrays.asList("微信", "微信", "微信", "支付宝", "支付宝", "掌上银行");String payType;// 支付时间long payTime;// 优惠券金额double couponAmount;Random r = new Random();productType = productTypeList.get(r.nextInt(productTypeList.size()));payType = payTypeList.get(r.nextInt(payTypeList.size()));while (keepMockData) {for(int i=0; i<r.nextInt(20); i++){orderId = RandomStringUtils.randomAlphanumeric(12);userId = RandomStringUtils.randomNumeric(3);productId = RandomStringUtils.randomAlphabetic(8);if(r.nextDouble()>0.5){productType = productTypeList.get(r.nextInt(productTypeList.size()));}count = r.nextInt(10)+1;unitPrice = Math.abs(r.nextGaussian()*30+300);if(r.nextDouble()>0.5){payType = payTypeList.get(r.nextInt(payTypeList.size()));}payTime = System.currentTimeMillis() + 2000;couponAmount = r.nextInt(10);sourceContext.collect(new ConsumeInfo(orderId, userId, productId, productType, count, unitPrice, payType, payTime, couponAmount));}Thread.sleep(2000);}}@Overridepublic void cancel() {keepMockData = false;}}
三、需求集C实现
群体用户画像之当日支付类型偏好
package cn.edu.neu.task.windowTask;

import cn.edu.neu.bean.ConsumeInfo;
import cn.edu.neu.bean.Statics;
import cn.edu.neu.sink.StaticsSink;
import cn.edu.neu.source.ConsumeInfoSource;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;

/**
 * Group user portrait: payment-type preference for the current day.
 *
 * <p>Counts orders per payment type inside a one-day tumbling processing-time window.
 * Every 10 seconds, the running count is emitted as a {@link Statics} record to a
 * {@link StaticsSink}.
 *
 * @author 32098
 */
public class PayTypeTask {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        DataStreamSource<ConsumeInfo> consumeInfoDs = env.addSource(new ConsumeInfoSource());

        SingleOutputStreamOperator<Statics> resultDs = consumeInfoDs
                // One (payType, 1) pair per order.
                .map(new MapFunction<ConsumeInfo, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(ConsumeInfo consumeInfo) throws Exception {
                        String payType = consumeInfo.getPayType();
                        return Tuple2.of(payType, 1L);
                    }
                })
                .keyBy(e -> e.f0)
                // NOTE(review): a 1-day tumbling window is aligned to UTC midnight. If "current day"
                // should follow UTC+8, use TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)).
                .window(TumblingProcessingTimeWindows.of(Time.days(1)))
                // Fire an intermediate result every 10 s instead of only at window end.
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> tA, Tuple2<String, Long> tB) throws Exception {
                        return Tuple2.of(tA.f0, tA.f1 + tB.f1);
                    }
                })
                .map(new MapFunction<Tuple2<String, Long>, Statics>() {
                    @Override
                    public Statics map(Tuple2<String, Long> tupleIn) throws Exception {
                        return new Statics("payType", tupleIn.f0, tupleIn.f1);
                    }
                });

        resultDs.addSink(new StaticsSink());

        try {
            env.execute("payType analysis");
        } catch (Exception e) {
            // Propagate instead of printing, so a failed job makes the launcher exit with an error.
            throw new IllegalStateException("Flink job 'payType analysis' failed", e);
        }
    }
}
各类产品当日实时销售额Top5(前台页面展示Top5)
package cn.edu.neu.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Accumulated sales amount of one product category, as produced by ProductTypeSaleTask.
 *
 * <p>Accessors, equals/hashCode/toString and constructors are generated by Lombok.
 *
 * @author 32098
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ProductTypeSaleStatic {

    /** Product category name. */
    private String productType;

    /** Running sales total of the category. */
    private Double totalSale;
}
package cn.edu.neu.task.windowTask;import cn.edu.neu.bean.ConsumeInfo;import cn.edu.neu.bean.ProductTypeSaleStatic;import cn.edu.neu.sink.ProductTypeSaleSink;import cn.edu.neu.source.ConsumeInfoSource;import org.apache.mon.RuntimeExecutionMode;import org.apache.mon.functions.MapFunction;import org.apache.mon.functions.ReduceFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;/*** @author 32098** 各类产品每天销售额实时统计:10触发一次计算*/public class ProductTypeSaleTask {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.STREAMING);DataStreamSource<ConsumeInfo> consumeInfoDs = env.addSource(new ConsumeInfoSource());SingleOutputStreamOperator<ProductTypeSaleStatic> productTypeTotalSaleDs = consumeInfoDs.map(new MapFunction<ConsumeInfo, Tuple2<String, Double>>() {@Overridepublic Tuple2<String, Double> map(ConsumeInfo consumeInfo) throws Exception {String productType = consumeInfo.getProductType();double totalAmount = consumeInfo.getUnitPrice() * consumeInfo.getCount() - consumeInfo.getCouponAmount();return Tuple2.of(productType, totalAmount);}}).keyBy(e -> e.f0).window(TumblingProcessingTimeWindows.of(Time.days(1))).trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10))).reduce(new ReduceFunction<Tuple2<String, Double>>() {@Overridepublic Tuple2<String, Double> reduce(Tuple2<String, Double> inA, Tuple2<String, Double> inB) throws Exception {return Tuple2.of(inA.f0, inA.f1 + inB.f1);}}).map(new 
MapFunction<Tuple2<String, Double>, ProductTypeSaleStatic>() {@Overridepublic ProductTypeSaleStatic map(Tuple2<String, Double> in) throws Exception {return new ProductTypeSaleStatic(in.f0, in.f1);}});productTypeTotalSaleDs.addSink(new ProductTypeSaleSink());env.execute();}}
群体用户画像之每月实时消费水平&用户每月实时消费标签
package cn.edu.neu.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Per-user consumption-level result produced by ConsumptionLevelTask.
 *
 * <p>Accessors, equals/hashCode/toString and constructors are generated by Lombok.
 *
 * @author 32098
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserConsumptionLevelStatics {

    /** User identifier. */
    private String uid;

    /** Average amount per order for the user (rounded to 2 decimals by the producer). */
    private double avgAmount;

    /** Consumption-level label derived from {@code avgAmount}. */
    private String level;
}
package cn.edu.neu.task.windowTask;import cn.edu.neu.bean.ConsumeInfo;import cn.edu.neu.bean.Statics;import cn.edu.neu.bean.UserConsumptionLevelStatics;import cn.edu.neu.sink.ConsumptionLevelStaticsSink;import cn.edu.neu.sink.StaticsSink;import cn.edu.neu.source.ConsumeInfoSource;import org.apache.mon.RuntimeExecutionMode;import org.apache.mon.functions.MapFunction;import org.apache.mon.functions.ReduceFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;import scala.tools.nsc.Global;import java.math.BigDecimal;import java.text.NumberFormat;/**** @author 32098** 1.群体用户画像之每月实时消费水平* 2.用户每月实时消费标签*/public class ConsumptionLevelTask {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.STREAMING);DataStreamSource<ConsumeInfo> consumeInfoDs = env.addSource(new ConsumeInfoSource());SingleOutputStreamOperator<Tuple2<String, Double>> userAvgConsumeDs = consumeInfoDs.map(new MapFunction<ConsumeInfo, Tuple3<String, Double, Integer>>() {@Overridepublic Tuple3<String, Double, Integer> map(ConsumeInfo consumeInfo) throws Exception {String userId = consumeInfo.getUserId();double totalAmount = consumeInfo.getUnitPrice() * consumeInfo.getCount() - consumeInfo.getCouponAmount();return Tuple3.of(userId, totalAmount, 1);}}).keyBy(e -> e.f0).window(TumblingProcessingTimeWindows.of(Time.days(30))).trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10))).reduce(new 
ReduceFunction<Tuple3<String, Double, Integer>>() {@Overridepublic Tuple3<String, Double, Integer> reduce(Tuple3<String, Double, Integer> inA, Tuple3<String, Double, Integer> inB) throws Exception {return Tuple3.of(inA.f0, inA.f1 + inB.f1, inA.f2 + inA.f2);}}).map(new MapFunction<Tuple3<String, Double, Integer>, Tuple2<String, Double>>() {@Overridepublic Tuple2<String, Double> map(Tuple3<String, Double, Integer> in) throws Exception {return Tuple2.of(in.f0, in.f1 / in.f2);}});NumberFormat nbf= NumberFormat.getInstance();SingleOutputStreamOperator<UserConsumptionLevelStatics> resultDs = userAvgConsumeDs.map(new MapFunction<Tuple2<String, Double>, UserConsumptionLevelStatics>() {@Overridepublic UserConsumptionLevelStatics map(Tuple2<String, Double> tupleIn) throws Exception {String uid = tupleIn.f0;double avgAmount = tupleIn.f1;String level = "低消费水平";if (avgAmount >= 800 && avgAmount < 1800) {level = "中消费水平";} else if (avgAmount >= 1800) {level = "高消费水平";}BigDecimal bigDecimal = new BigDecimal(avgAmount);return new UserConsumptionLevelStatics(uid, bigDecimal.setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue(), level);}});resultDs.addSink(new ConsumptionLevelStaticsSink());try {env.execute();} catch (Exception e) {e.printStackTrace();}}}
附:
上述需求实现涉及的Sink类:ProductTypeSaleSink
package cn.edu.neu.sink;import cn.edu.neu.bean.ProductTypeSaleStatic;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;/*** @author 32098*/public class ProductTypeSaleSink extends RichSinkFunction<ProductTypeSaleStatic> {private Connection conn = null;private PreparedStatement ps = null;@Overridepublic void open(Configuration parameters) throws Exception {conn = DriverManager.getConnection("jdbc:mysql://master:3306/user_portrait", "root", "Hive@");String sql = "";sql = "insert into product_type_total_sale(product_type, total_sale) values (?,?) on duplicate key update total_sale=?";ps = conn.prepareStatement(sql);}@Overridepublic void invoke(ProductTypeSaleStatic value, Context context) throws Exception {ps.setString(1, value.getProductType());ps.setDouble(2, value.getTotalSale());ps.setDouble(3, value.getTotalSale());ps.executeUpdate();}@Overridepublic void close() throws Exception {if (conn != null) {conn.close();}if (ps != null) {ps.close();}}}
上述需求实现涉及的Sink类:ConsumptionLevelStaticsSink
package cn.edu.neu.sink;import cn.edu.neu.bean.Statics;import cn.edu.neu.bean.UserConsumptionLevelStatics;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;/*** @author 32098*/public class ConsumptionLevelStaticsSink extends RichSinkFunction<UserConsumptionLevelStatics> {private Connection conn = null;private PreparedStatement ps = null;@Overridepublic void open(Configuration parameters) throws Exception {conn = DriverManager.getConnection("jdbc:mysql://master:3306/user_portrait", "root", "Hive@");String sql = "";sql = "insert into user_consumption_level(userId, avg_amount, level) values (?,?,?) on duplicate key update avg_amount=?, level=?";ps = conn.prepareStatement(sql);}@Overridepublic void invoke(UserConsumptionLevelStatics statics, Context context) throws Exception {ps.setString(1, statics.getUid());ps.setDouble(2, statics.getAvgAmount());ps.setString(3, statics.getLevel());ps.setDouble(4, statics.getAvgAmount());ps.setString(5, statics.getLevel());ps.executeUpdate();}@Overridepublic void close() throws Exception {if (conn != null) {conn.close();}if (ps != null) {ps.close();}}}
MySQL数据库表: