首页  

flink 测试数据生成     所属分类 flink 浏览量 766
交易转账事件数据生成
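每行一条事件，字段以英文逗号分隔，依次为：
event_id,event_time,account_from,account_to,amount
event_id       事件ID，事件唯一标识（从 1 开始自增）
event_time     事件时间（随机分布在 2020-01-01 00:00:00 ~ 2021-05-01 00:00:00 之间，精度 100 毫秒）
account_from   转出账户（account_from_1000 ~ account_from_1099）
account_to     收款账户（account_to_1000 ~ account_to_1099）
amount         转账金额（1 ~ 10000 的整数）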



import java.io.BufferedWriter;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;

import dyyx.util.CommUtil;

/**
 * Generates synthetic money-transfer events and writes them to a text file, one
 * comma-separated record per line:
 * {@code event_id,event_time,account_from,account_to,amount}.
 *
 * <p>Configuration is read from system properties:
 * <ul>
 *   <li>{@code count} - number of lines to write; a value {@code <= 0} (presumably also what
 *       {@code CommUtil.getInt} returns for a missing property - TODO confirm) falls back to
 *       50,000,000.</li>
 *   <li>{@code file} - output path; a blank value falls back to {@code /tmp/events.txt}.</li>
 * </ul>
 */
public class EventDataGen {

    private static final Random RAND = new Random();

    /** Field separator used in every output line. */
    private static final String SEP = ",";

    /** Lower bound (inclusive) of generated event times. */
    private static final String MIN_TIME_STR = "2020-01-01 00:00:00";
    /** Upper bound of generated event times (exclusive, minus the safety margin below). */
    private static final String MAX_TIME_STR = "2021-05-01 00:00:00";

    // Parsed with CommUtil's default pattern (null argument); assumes that pattern accepts
    // "yyyy-MM-dd HH:mm:ss" - NOTE(review): confirm against CommUtil.parseDate.
    private static final long MIN_TIME_LONG = CommUtil.parseDate(MIN_TIME_STR, null).getTime();
    private static final long MAX_TIME_LONG = CommUtil.parseDate(MAX_TIME_STR, null).getTime();

    /** Granularity of generated event times, in milliseconds. */
    private static final int FACTOR = 100;

    /**
     * Exclusive bound on the number of {@link #FACTOR}-millisecond steps added to
     * {@link #MIN_TIME_LONG}. The extra {@code - 1000} steps keep every generated time at least
     * 100 seconds before {@link #MAX_TIME_STR}.
     *
     * <p>{@code Math.toIntExact} instead of an {@code (int)} cast: the value is passed to
     * {@link Random#nextInt(int)}, and a range longer than roughly 6.8 years
     * ({@code Integer.MAX_VALUE * FACTOR} ms) would otherwise wrap silently into a wrong bound.
     */
    private static final int MAX_INT_RANDOM =
            Math.toIntExact((MAX_TIME_LONG - MIN_TIME_LONG) / FACTOR - 1000);

    /** Next event id; ids start at 1 and increase by one per generated line. */
    private static final AtomicLong SEQ = new AtomicLong(1);

    /** A progress line is printed every this many records. */
    private static final int PROGRESS_INTERVAL = 1000000;

    /**
     * Writes {@code count} generated event lines to {@code file}, printing progress to stdout.
     *
     * @param args ignored; configuration comes from system properties (see class doc)
     * @throws Exception if the output file cannot be created, written or flushed
     */
    public static void main(String[] args) throws Exception {

        int count = CommUtil.getInt(System.getProperty("count"));
        if (count <= 0) {
            count = 50000000;
        }
        String file = System.getProperty("file");
        if (CommUtil.isBlank(file)) {
            file = "/tmp/events.txt";
        }

        System.out.println("count=" + count + ",file=" + file);

        // try-with-resources rather than finally + CommUtil.close(): close() flushes the last
        // buffered lines, and this guarantees a failure there propagates (whether CommUtil.close
        // rethrows is not visible here), so "write done" is never printed for a truncated file.
        try (BufferedWriter writer = CommUtil.createBufferedWriter(file, null, false)) {
            for (int i = 0; i < count; i++) {
                if (i > 0 && i % PROGRESS_INTERVAL == 0) {
                    System.out.println(i + "," + LocalDateTime.now());
                }
                writer.write(genDataLine());
                writer.newLine();
            }
        }

        System.out.println("write done," + LocalDateTime.now() + ",count=" + count + ",file=" + file);
    }

    /**
     * Builds one event line: {@code event_id,event_time,account_from,account_to,amount}.
     *
     * @return the line, without a trailing line separator
     */
    private static String genDataLine() {
        StringBuilder sb = new StringBuilder();
        sb.append(SEQ.getAndIncrement()).append(SEP);
        sb.append(genEventTime()).append(SEP);
        sb.append(genAccountFrom()).append(SEP);
        sb.append(genAccountTo()).append(SEP);
        sb.append(genAmount());

        return sb.toString();
    }

    /** @return a uniformly random amount in {@code [1, 10000]} */
    private static int genAmount() {
        return 1 + RAND.nextInt(10000);
    }

    /** @return {@code prefix} followed by a random account number in {@code [1000, 1099]} */
    private static String genAccount(String prefix) {
        return prefix + (1000 + RAND.nextInt(100));
    }

    private static String genAccountFrom() {
        return genAccount("account_from_");
    }

    private static String genAccountTo() {
        return genAccount("account_to_");
    }

    /**
     * @return a random event time in {@code [MIN_TIME_LONG, MIN_TIME_LONG + (MAX_INT_RANDOM - 1) * FACTOR]},
     *         formatted by {@code CommUtil.getDateString}
     */
    private static String genEventTime() {
        return formatStep(RAND.nextInt(MAX_INT_RANDOM));
    }

    /**
     * Returns the latest event time {@link #genEventTime()} can produce. It uses
     * {@code MAX_INT_RANDOM - 1} because {@code nextInt} excludes its bound. Not called by
     * the generator; kept as a sanity-check helper for the configured range.
     */
    @SuppressWarnings("unused")
    private static String genMaxEventTime() {
        return formatStep(MAX_INT_RANDOM - 1L);
    }

    /** Formats the instant {@code MIN_TIME_LONG + step * FACTOR} milliseconds. */
    private static String formatStep(long step) {
        long time = MIN_TIME_LONG + step * FACTOR;
        return CommUtil.getDateString(new Date(time));
    }

}


完整代码
https://gitee.com/dyyx/demos/blob/master/flinkdemo/src/main/java/dyyx/zb/EventDataGen.java



flink dataset 测试数据说明

上一篇     下一篇
flink dataset groupBy sortGroup 实例

flink dataset 输出注意点

搞金融必备的各类计算公式

flink dataset 大数据集测试

flink dataset groupBy sortBy 实例与说明

flink术语