利用MySQL元数据信息批量转换指定库数据表生成Hive建表语句

求知探索 1年前 ⋅ 718 阅读

1.编写文件工具类

package ccc.utile;

import java.io.*;

/**
 * Small file helper: appends text lines to a file and truncates a file.
 *
 * <p>Both methods report I/O failures on standard error and then return
 * normally; they never propagate an {@link IOException} to the caller.
 *
 * @author ccc
 * @version 1.0.0
 * @ClassName WriteToFileExample.java
 * @Description File I/O helper
 * @ProjectName ccc
 * @createTime 2021-08-07 18:32:00
 */
public class WriteToFileExample {
    /**
     * Appends {@code str} followed by a line separator to the file at {@code path}.
     * The file is opened in append mode, so existing content is kept.
     *
     * @param str  text to append
     * @param path path of the target file
     */
    public void writeFileSQL(String str, String path) {
        // try-with-resources closes (and therefore flushes) the writer chain on every
        // path. If the file cannot be opened, the body is skipped instead of wrapping
        // a null writer, which previously caused a NullPointerException.
        try (PrintWriter pw = new PrintWriter(new FileWriter(new File(path), true))) {
            pw.println(str);
            // PrintWriter swallows write errors and only sets a flag; surface it.
            if (pw.checkError()) {
                System.err.println("Failed to write to file: " + path);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Empties the file at {@code fileName}, creating it first if it does not exist.
     *
     * @param fileName path of the file to truncate
     */
    public void clearInfoForFile(String fileName) {
        // Opening a FileWriter without the append flag creates the file when it is
        // missing and truncates it when it exists.
        try (FileWriter fileWriter = new FileWriter(new File(fileName))) {
            fileWriter.write("");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

2.jdbc工具类:

package ccc.utile;

import java.sql.*;
import java.util.Map;

/**
 * JDBC helper that opens MySQL connections and closes JDBC resources.
 *
 * @author ccc
 * @version 1.0.0
 * @ClassName JDBCJAVAMySQL.java
 * @Description MySQL JDBC connection helper
 * @ProjectName ccc
 * @createTime 2021-08-06 14:19:00
 */
public class JDBCJAVAMySQL {
    /** JDBC driver class that is loaded before connecting. */
    private static final String DRIVER_CLASS = "com.mysql.jdbc.Driver";
    /** Connection settings; adjust these for the target MySQL server. */
    private static final String URL = "jdbc:mysql://localhost:3306/";
    private static final String USER = "root";
    private static final String PASSWORD = "123456";

    /**
     * Opens a new connection to the configured MySQL server.
     *
     * @return an open connection, or {@code null} when the driver class cannot be
     *         loaded or the connection attempt fails (the cause is printed to
     *         standard error)
     */
    public static Connection getConnection() {
        Connection conn = null;
        try {
            Class.forName(DRIVER_CLASS);
            // This call had been swallowed by a trailing line comment, which made
            // the method return null unconditionally.
            conn = DriverManager.getConnection(URL, USER, PASSWORD);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return conn;
    }

    /**
     * Closes one JDBC resource, ignoring {@code null} and printing (not throwing)
     * any failure so that the remaining resources still get closed.
     */
    private static void closeQuietly(AutoCloseable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception e) {
            // AutoCloseable.close() is declared to throw Exception.
            e.printStackTrace();
        }
    }

    /**
     * Closes the given JDBC resources. Any argument may be {@code null}.
     *
     * <p>Resources are closed in reverse order of acquisition: result set first,
     * then statement, then connection.
     *
     * @param connection        connection to close, may be {@code null}
     * @param resultSet         result set to close, may be {@code null}
     * @param preparedStatement statement to close, may be {@code null}
     */
    public static void close(Connection connection, ResultSet resultSet, PreparedStatement preparedStatement) {
        closeQuietly(resultSet);
        closeQuietly(preparedStatement);
        closeQuietly(connection);
    }
}

3.表属性实体类:

package ccc.enty;

/**
 * Mutable bean holding the name and the comment of one database table.
 *
 * @author ccc
 * @version 1.0.0
 * @ClassName TableSchema.java
 * @Description Table-level metadata bean
 * @ProjectName ccc
 * @createTime 2021-08-06 14:58:00
 */
public class TableSchema {
    /** Name of the table. */
    private String table_name;
    /** Comment attached to the table. */
    private String table_comment;

    /** @return the table name, or {@code null} if it has not been set */
    public String getTable_name() {
        return this.table_name;
    }

    /** @return the table comment, or {@code null} if it has not been set */
    public String getTable_comment() {
        return this.table_comment;
    }

    /** @param table_name the table name to store */
    public void setTable_name(String table_name) {
        this.table_name = table_name;
    }

    /** @param table_comment the table comment to store */
    public void setTable_comment(String table_comment) {
        this.table_comment = table_comment;
    }

    /** Renders both fields in the form {@code TableSchema{table_name='..', table_comment='..'}}. */
    @Override
    public String toString() {
        return String.format("TableSchema{table_name='%s', table_comment='%s'}", table_name, table_comment);
    }
}

4.表结构实体类:

package ccc.enty;

/**
 * Mutable bean holding the name, comment and type of one table column.
 *
 * @author ccc
 * @version 1.0.0
 * @ClassName ColumnSchema.java
 * @Description Column-level metadata bean
 * @ProjectName ccc
 * @createTime 2021-08-06 14:59:00
 */
public class ColumnSchema {
    /** Name of the column. */
    private String column_name;
    /** Comment attached to the column. */
    private String column_comment;
    /** Type of the column. */
    private String column_type;

    /** @return the column name, or {@code null} if it has not been set */
    public String getColumn_name() {
        return this.column_name;
    }

    /** @return the column comment, or {@code null} if it has not been set */
    public String getColumn_comment() {
        return this.column_comment;
    }

    /** @return the column type, or {@code null} if it has not been set */
    public String getColumn_type() {
        return this.column_type;
    }

    /** @param column_name the column name to store */
    public void setColumn_name(String column_name) {
        this.column_name = column_name;
    }

    /** @param column_comment the column comment to store */
    public void setColumn_comment(String column_comment) {
        this.column_comment = column_comment;
    }

    /** @param column_type the column type to store */
    public void setColumn_type(String column_type) {
        this.column_type = column_type;
    }

    /**
     * Renders all fields in the form
     * {@code ColumnSchema{column_name='..', column_comment='..', column_type='..'}}.
     */
    @Override
    public String toString() {
        return String.format(
                "ColumnSchema{column_name='%s', column_comment='%s', column_type='%s'}",
                column_name, column_comment, column_type);
    }
}

5.启动类:

package ccc.contorller;

import ccc.enty.ColumnSchema;
import ccc.enty.TableSchema;
import ccc.utile.JDBCJAVAMySQL;
import ccc.utile.WriteToFileExample;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/**
 * Generates Hive {@code CREATE TABLE} statements for every table of a MySQL
 * database by reading {@code information_schema}, and writes them to a file.
 *
 * <p>Every JDBC connection opened here is closed again before the method that
 * opened it returns.
 *
 * @author ccc
 * @version 1.0.0
 * @ClassName BatchMySQL2HIVE.java
 * @Description Builds Hive DDL from MySQL metadata
 * @ProjectName ccc
 * @createTime 2021-08-06 14:52:00
 */
public class BatchMySQL2HIVE {

    /**
     * Reads name and comment of every table in the given schema.
     *
     * @param databases name of the MySQL schema to inspect
     * @return the tables found; empty when the query fails (the error is printed)
     * @throws IllegalStateException if no database connection can be obtained
     */
    public static List<TableSchema> getTable_schema(String databases) {
        List<TableSchema> list = new ArrayList<TableSchema>();
        // The schema name is bound as a parameter instead of being concatenated.
        String sql = "SELECT a.table_name,a.table_comment FROM information_schema.`TABLES` a where a.table_schema=?";
        PreparedStatement ps = null;
        ResultSet resultSet = null;
        Connection connection = requireConnection();
        try {
            ps = connection.prepareStatement(sql);
            ps.setString(1, databases);
            resultSet = ps.executeQuery();
            while (resultSet.next()) {
                TableSchema a = new TableSchema();
                a.setTable_name(resultSet.getString("table_name"));
                a.setTable_comment(resultSet.getString("table_comment"));
                list.add(a);
            }
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        } finally {
            JDBCJAVAMySQL.close(connection, resultSet, ps);
        }
        return list;
    }

    /**
     * Reads name, comment and data type of every column of one table, in the
     * order in which the columns are defined.
     *
     * @param database   name of the MySQL schema
     * @param table_name name of the table inside that schema
     * @return the columns found; empty when the query fails (the error is printed)
     * @throws IllegalStateException if no database connection can be obtained
     */
    public static List<ColumnSchema> getColumn_schema(String database, String table_name) {
        List<ColumnSchema> list = new ArrayList<ColumnSchema>();
        // ORDER BY keeps the generated DDL in the table's own column order, which
        // is not guaranteed without it.
        String sql = "SELECT a.column_name,a.column_comment,a.data_type FROM information_schema.`COLUMNS` a"
                + " where a.table_schema=? and a.table_name=? ORDER BY a.ordinal_position";
        System.out.println(sql + " [" + database + ", " + table_name + "]");
        Connection connection = requireConnection();
        PreparedStatement ps = null;
        ResultSet resultSet = null;
        try {
            ps = connection.prepareStatement(sql);
            ps.setString(1, database);
            ps.setString(2, table_name);
            resultSet = ps.executeQuery();
            while (resultSet.next()) {
                ColumnSchema a = new ColumnSchema();
                a.setColumn_comment(resultSet.getString("column_comment"));
                a.setColumn_name(resultSet.getString("column_name"));
                a.setColumn_type(resultSet.getString("data_type"));
                list.add(a);
            }
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        } finally {
            // This method used to leak its connection, statement and result set.
            JDBCJAVAMySQL.close(connection, resultSet, ps);
        }
        return list;
    }

    /**
     * Builds the Hive DDL for the {@code j}-th table of the schema.
     *
     * <p>This re-reads the table list on every call; {@link #start(String, String)}
     * avoids that by reading the list once.
     *
     * @param database name of the MySQL schema
     * @param j        zero-based index into the list returned by {@link #getTable_schema(String)}
     * @return a comment header line followed by the {@code CREATE TABLE} statement
     * @throws IndexOutOfBoundsException if {@code j} is not a valid table index
     */
    public static String createTable(String database, int j) {
        List<TableSchema> table_schema = getTable_schema(database);
        return buildCreateStatement(database, table_schema.get(j));
    }

    /** Builds the header comment line and the {@code CREATE TABLE} statement for one table. */
    private static String buildCreateStatement(String database, TableSchema table) {
        String tableName = table.getTable_name();
        String tableComment = getTable_comment(table.getTable_comment(), tableName);
        List<ColumnSchema> columns = getColumn_schema(database, tableName);

        StringBuilder sb = new StringBuilder();
        // A "--" comment ends at the first line break, so flatten multi-line comments.
        sb.append("--").append(tableComment.replaceAll("[\\r\\n]+", " "))
                .append(":").append(tableName).append("\n");
        sb.append("CREATE TABLE IF NOT EXISTS ").append(tableName).append("(").append("\n");
        int last = columns.size() - 1;
        for (int i = 0; i < columns.size(); i++) {
            ColumnSchema column = columns.get(i);
            sb.append("  ").append(tranColumn2xx(column.getColumn_name()))
                    .append(" ").append(getColumn_type(column.getColumn_type()))
                    .append(" COMMENT ").append(getColumn_Comment(column.getColumn_comment()));
            // Every column except the last is followed by a comma.
            if (i != last) {
                sb.append(",");
            }
            sb.append("\n");
        }
        sb.append(") COMMENT \"").append(escapeHiveString(tableComment)).append("\";\n");
        return sb.toString();
    }

    /** Returns a connection or fails with a descriptive error instead of a later NullPointerException. */
    private static Connection requireConnection() {
        Connection connection = JDBCJAVAMySQL.getConnection();
        if (connection == null) {
            throw new IllegalStateException("JDBCJAVAMySQL.getConnection() returned null; cannot read MySQL metadata");
        }
        return connection;
    }

    /** Escapes backslashes and double quotes so the text is safe inside a double-quoted Hive string literal. */
    private static String escapeHiveString(String text) {
        return text.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /**
     * Renders a column comment as a double-quoted string literal.
     *
     * @param comment the raw comment, may be {@code null}
     * @return {@code ""} (two quote characters) for a {@code null} or empty comment,
     *         otherwise the comment wrapped in double quotes with embedded
     *         backslashes and double quotes escaped
     */
    public static String getColumn_Comment(String comment) {
        if (comment == null || comment.isEmpty()) {
            return "\"\"";
        }
        return "\"" + escapeHiveString(comment) + "\"";
    }

    /**
     * Chooses the text used as table comment.
     *
     * @param comment    the raw table comment, may be {@code null}
     * @param table_name fallback used when there is no comment
     * @return {@code comment}, or {@code table_name} when the comment is {@code null} or empty
     */
    public static String getTable_comment(String comment, String table_name) {
        if (comment == null || comment.isEmpty()) {
            return table_name;
        }
        return comment;
    }

    /**
     * Maps a MySQL data type name to the Hive type used in the generated DDL.
     *
     * <p>All integer types are widened to {@code BIGINT}, {@code float} and
     * {@code double} become {@code DOUBLE}, and every other type (including
     * decimal, date/time, text and binary types, unknown names and {@code null})
     * becomes {@code STRING}. Matching is case-sensitive.
     *
     * @param column_type MySQL type name such as {@code int} or {@code varchar}
     * @return {@code BIGINT}, {@code DOUBLE} or {@code STRING}
     */
    public static String getColumn_type(String column_type) {
        if (column_type == null) {
            return "STRING";
        }
        switch (column_type) {
            case "int":
            case "tinyint":
            case "bigint":
            case "smallint":
            case "mediumint":
                return "BIGINT";
            case "float":
            case "double":
                return "DOUBLE";
            default:
                return "STRING";
        }
    }

    /**
     * Converts a column name to lower case.
     *
     * @param column_name the original column name
     * @return the lower-cased name
     */
    public static String tranColumn2xx(String column_name) {
        // Locale.ROOT keeps the result independent of the JVM's default locale
        // (for example the Turkish dotless i).
        return column_name.toLowerCase(java.util.Locale.ROOT);
    }

    /**
     * Generates the DDL for every table of {@code database}, prints each
     * statement and writes all of them to {@code path}, replacing any previous
     * content of that file.
     *
     * @param database name of the MySQL schema
     * @param path     path of the output file
     */
    public static void start(String database, String path) {
        List<TableSchema> table_schema = getTable_schema(database);
        WriteToFileExample writeToFileExample = new WriteToFileExample();
        writeToFileExample.clearInfoForFile(path);
        // Reuse the table list read above instead of querying it again per table.
        for (TableSchema table : table_schema) {
            String ddl = buildCreateStatement(database, table);
            System.out.println(ddl);
            writeToFileExample.writeFileSQL(ddl, path);
        }
        System.out.println("共记录:" + table_schema.size() + "条数据!");
    }

    public static void main(String[] args) {
        start("CCC", "mysql2HIVE.sql");
    }
}

至此就实现了MySQL到Hive表的转换功能。


全部评论: 0

    我有话说: