flink 测试数据生成
所属分类 flink
浏览量 935
交易转账事件数据生成
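每行一条事件记录，字段以英文逗号分隔，顺序为：event_id,event_time,account_from,account_to,amount
event_id 事件ID，事件唯一标识（从 1 开始递增）
event_time 事件时间（在 2020-01-01 00:00:00 与 2021-05-01 00:00:00 之间随机生成）
account_from 转出账户（account_from_1000 ~ account_from_1099）
account_to 收款账户（account_to_1000 ~ account_to_1099）
amount 转账金额（1 ~ 10000 的整数）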
import java.io.BufferedWriter;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import dyyx.util.CommUtil;
/**
 * Generates synthetic money-transfer events as comma-separated text lines (one event per line),
 * intended as Flink test input.
 *
 * <p>Each line has the columns {@code event_id,event_time,account_from,account_to,amount}:
 *
 * <ul>
 *   <li>{@code event_id} - sequential id starting at 1;
 *   <li>{@code event_time} - random time in [{@link #MIN_TIME_STR}, {@link #MAX_TIME_STR}) on a
 *       100 ms grid, formatted by {@code CommUtil.getDateString};
 *   <li>{@code account_from} / {@code account_to} - {@code account_from_1000..1099} /
 *       {@code account_to_1000..1099};
 *   <li>{@code amount} - integer in 1..10000.
 * </ul>
 *
 * <p>Configured through JVM system properties: {@code -Dcount=N} (lines to write) and
 * {@code -Dfile=PATH} (output file).
 */
public class EventDataGen {

    private static final Random RAND = new Random();

    /** Column separator used in the generated lines. */
    private static final String SEP = ",";

    /** Start of the event_time window (inclusive). */
    private static final String MIN_TIME_STR = "2020-01-01 00:00:00";
    /** End of the event_time window; generated times always stay before it. */
    private static final String MAX_TIME_STR = "2021-05-01 00:00:00";

    // Parsed with CommUtil.parseDate's default pattern (presumably "yyyy-MM-dd HH:mm:ss" in the
    // JVM default time zone - confirm against CommUtil).
    private static final long MIN_TIME_LONG = CommUtil.parseDate(MIN_TIME_STR, null).getTime();
    private static final long MAX_TIME_LONG = CommUtil.parseDate(MAX_TIME_STR, null).getTime();

    /** Event times are generated on a grid of this many milliseconds. */
    private static final int FACTOR = 100;

    /**
     * Exclusive bound for the random grid offset used by {@link #genEventTime()}. Subtracting 1000
     * grid steps keeps every generated time more than {@code FACTOR * 1000} ms before
     * {@link #MAX_TIME_STR}. {@link Math#toIntExact} makes class initialisation fail loudly instead
     * of silently truncating if the window is ever widened beyond the {@code int} range that
     * {@link Random#nextInt(int)} accepts.
     */
    private static final int MAX_INT_RANDOM =
            Math.toIntExact((MAX_TIME_LONG - MIN_TIME_LONG) / FACTOR - 1000);

    /** Source of event ids; the first id handed out is 1. */
    private static final AtomicLong SEQ = new AtomicLong(1);

    /** A progress line is printed every this many records. */
    private static final int PROGRESS_INTERVAL = 1_000_000;

    /** Record count used when {@code -Dcount} does not yield a positive number. */
    private static final int DEFAULT_COUNT = 50_000_000;

    /** Output path used when {@code -Dfile} is missing or blank. */
    private static final String DEFAULT_FILE = "/tmp/events.txt";

    /**
     * Writes the configured number of event lines to the configured file.
     *
     * <p>Reads {@code count} via {@code CommUtil.getInt}; any value {@code <= 0} falls back to
     * {@link #DEFAULT_COUNT}. Reads {@code file}; a blank value falls back to {@link #DEFAULT_FILE}.
     *
     * @param args ignored
     * @throws Exception if the output file cannot be created, written, flushed or closed
     */
    public static void main(String[] args) throws Exception {
        int count = CommUtil.getInt(System.getProperty("count"));
        if (count <= 0) {
            count = DEFAULT_COUNT;
        }
        String file = System.getProperty("file");
        if (CommUtil.isBlank(file)) {
            file = DEFAULT_FILE;
        }
        System.out.println("count=" + count + ",file=" + file);

        // try-with-resources: an IOException from the final flush/close now propagates instead of
        // being handed to CommUtil.close (which may swallow it - NOTE(review): confirm), so
        // "write done" is only printed when the file was completely written.
        try (BufferedWriter writer = CommUtil.createBufferedWriter(file, null, false)) {
            for (int i = 0; i < count; i++) {
                if (i > 0 && i % PROGRESS_INTERVAL == 0) {
                    System.out.println(i + "," + LocalDateTime.now());
                }
                writer.write(genDataLine());
                writer.newLine();
            }
        }
        System.out.println("write done," + LocalDateTime.now() + ",count=" + count + ",file=" + file);
    }

    /**
     * Builds one event line (without line terminator).
     *
     * @return {@code event_id,event_time,account_from,account_to,amount}
     */
    private static String genDataLine() {
        StringBuilder sb = new StringBuilder();
        sb.append(SEQ.getAndIncrement()).append(SEP);
        sb.append(genEventTime()).append(SEP);
        sb.append(genAccountFrom()).append(SEP);
        sb.append(genAccountTo()).append(SEP);
        sb.append(genAmount());
        return sb.toString();
    }

    /** Returns a random transfer amount in the range 1..10000 (inclusive). */
    private static int genAmount() {
        return 1 + RAND.nextInt(10000);
    }

    /** Returns {@code prefix} followed by a random account number in 1000..1099 (inclusive). */
    private static String genAccount(String prefix) {
        return prefix + (1000 + RAND.nextInt(100));
    }

    /** Returns a random source account, e.g. {@code account_from_1042}. */
    private static String genAccountFrom() {
        return genAccount("account_from_");
    }

    /** Returns a random destination account, e.g. {@code account_to_1007}. */
    private static String genAccountTo() {
        return genAccount("account_to_");
    }

    /**
     * Returns a random event time from {@code MIN_TIME_LONG} upward in {@code FACTOR}-ms steps,
     * formatted by {@code CommUtil.getDateString}.
     */
    private static String genEventTime() {
        long offsetSteps = RAND.nextInt(MAX_INT_RANDOM);
        long time = MIN_TIME_LONG + offsetSteps * FACTOR;
        return CommUtil.getDateString(new Date(time));
    }
}
完整代码
https://gitee.com/dyyx/demos/blob/master/flinkdemo/src/main/java/dyyx/zb/EventDataGen.java
flink dataset 测试数据说明
上一篇
下一篇
flink dataset groupBy sortGroup 实例
flink dataset 输出注意点
搞金融必备的各类计算公式
flink dataset 大数据集测试
flink dataset groupBy sortBy 实例与说明
flink术语