diff --git a/README-CN.md b/README-CN.md index e2bf2a6..c830187 100644 --- a/README-CN.md +++ b/README-CN.md @@ -106,6 +106,15 @@ job配置 ![image](https://user-images.githubusercontent.com/28300167/229501053-4e33b6fb-851a-4fb2-9b19-16308359f57e.png) +(3)数据接入 + +ods表管理:通过解析shell文件中的库表,进行抽数表的管理 + +配置项1:shell文件所在路径 + +配置项2:hdfs namenode所在节点url +![img_5.png](img_5.png) +![image-20230628143018827](https://image-1312585102.cos.ap-nanjing.myqcloud.com/typora/image-20230628143018827.png) #### 系统运行 系统运行环境要求: diff --git a/img_5.png b/img_5.png new file mode 100644 index 0000000..2f4b158 Binary files /dev/null and b/img_5.png differ diff --git a/pom.xml b/pom.xml index 9650392..5544073 100644 --- a/pom.xml +++ b/pom.xml @@ -274,6 +274,10 @@ org.springframework.boot spring-boot-starter-mail + + org.projectlombok + lombok + diff --git a/src/main/java/com/vince/xq/project/system/dataAccess/controller/OdsTableManageController.java b/src/main/java/com/vince/xq/project/system/dataAccess/controller/OdsTableManageController.java new file mode 100644 index 0000000..3fddaaa --- /dev/null +++ b/src/main/java/com/vince/xq/project/system/dataAccess/controller/OdsTableManageController.java @@ -0,0 +1,78 @@ +package com.vince.xq.project.system.dataAccess.controller; + +import com.vince.xq.common.utils.StringUtils; +import com.vince.xq.framework.web.controller.BaseController; +import com.vince.xq.framework.web.page.TableDataInfo; +import com.vince.xq.project.system.dataAccess.domain.BigDataTableQuery; +import com.vince.xq.project.tool.gen.util.HdfsUtils; +import org.apache.shiro.authz.annotation.RequiresPermissions; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.ResponseBody; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +/** + * ods表管理 + */ +@Controller +@RequestMapping("/system/dataAccess") +public class OdsTableManageController extends BaseController { + private String prefix = "system/dataAccess"; + @Value("${hdfs.sqoop.file}") + private String sqoopFile; + @Value("${hdfs.url}") + private String HDFS_PATH; + + @GetMapping("") + public String bigDataTablePage() { return prefix + "/odsTableManage"; } + @RequestMapping("/list") + @RequiresPermissions("system:odsTableManage:view") + @ResponseBody + public TableDataInfo bigDataTableQuery(@RequestParam String table) throws Exception{ + startPage(); + List list=new ArrayList<>(); + List result=new ArrayList<>(); + String[] split = sqoopFile.replaceAll("\n","").split(","); + HdfsUtils hdfsUtils=new HdfsUtils(); + hdfsUtils.setHDFS_PATH(HDFS_PATH); + hdfsUtils.initialHdfsSession(); + for(String str:split){ + String file = hdfsUtils.text(str, "utf8").replaceAll("\n",""); + String[] split1 = file.split(";"); + for(String cmd:split1){ + while (true){ + if(cmd.contains(" ")){ + cmd=cmd.replaceAll(" "," "); + }else{ + break; + } + } + try { + BigDataTableQuery bigDataTableQuery=new BigDataTableQuery(); + bigDataTableQuery.setThread(cmd.substring(cmd.length()-1,cmd.length())); + bigDataTableQuery.setDb(cmd.split("--hive-database ")[1].split(" --fields-terminated-by")[0].split(" ")[0]); + bigDataTableQuery.setTable(cmd.split("--hive-database ")[1].split(" --fields-terminated-by")[0].split(" ")[2]); + bigDataTableQuery.setFile(new File(str).getName()); + list.add(bigDataTableQuery); + }catch (Exception e){ + e.printStackTrace(); + System.out.println(cmd); + } + } + } + //表名查找返回 + for (BigDataTableQuery tableQuery:list) { + if (tableQuery.getTable().equals(table)){ + result.add(tableQuery); + return getDataTable(result); + } + } + return getDataTable(list); + } +} diff --git a/src/main/java/com/vince/xq/project/system/dataAccess/domain/BigDataTableQuery.java b/src/main/java/com/vince/xq/project/system/dataAccess/domain/BigDataTableQuery.java new file mode 100644 index 0000000..285ea60 --- /dev/null +++ b/src/main/java/com/vince/xq/project/system/dataAccess/domain/BigDataTableQuery.java @@ -0,0 +1,19 @@ +package com.vince.xq.project.system.dataAccess.domain; + +import lombok.Data; + +/** + * ods表查询bean + */ +@Data +public class BigDataTableQuery { + + private String file; + + private String db; + + private String table; + + private String thread; + +} diff --git a/src/main/java/com/vince/xq/project/tool/gen/util/HdfsUtils.java b/src/main/java/com/vince/xq/project/tool/gen/util/HdfsUtils.java new file mode 100644 index 0000000..48be5c8 --- /dev/null +++ b/src/main/java/com/vince/xq/project/tool/gen/util/HdfsUtils.java @@ -0,0 +1,209 @@ +package com.vince.xq.project.tool.gen.util; + + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.springframework.stereotype.Component; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URI; +import java.net.URISyntaxException; + +/** + * HDFS工具类 + */ +public class HdfsUtils { + private String HDFS_PATH; + private static final String HDFS_USER = "hdfs"; + private FileSystem fileSystem; + + public FileSystem getFileSystem() { + return fileSystem; + } + + public void setFileSystem(FileSystem fileSystem) { + this.fileSystem = fileSystem; + } + + //初始化连接hdfs + public boolean initialHdfsSession() { + try { + Configuration configuration = new Configuration(); + configuration.set("dfs.replication", "1"); + fileSystem = FileSystem.get(new URI(HDFS_PATH), configuration, HDFS_USER); + } catch (IOException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (URISyntaxException e) { + e.printStackTrace(); + } + return true; + } + + public String getHDFS_PATH() { + return HDFS_PATH; + } + + public void setHDFS_PATH(String HDFS_PATH) { + this.HDFS_PATH = HDFS_PATH; + } + + /** + * 创建目录 支持递归创建 + * + * @param path 路径地址 + * @return 创建是否成功 + */ + public static boolean mkdir(String path) throws Exception { + HdfsUtils hdfsUtils = new HdfsUtils(); + hdfsUtils.initialHdfsSession(); + return hdfsUtils.fileSystem.mkdirs(new Path(path)); + } + /** + * 查看文件内容 + * + * @param path 路径地址 + * @return 返回文件内容字符串 + */ + public String text(String path, String encode) throws Exception { + FSDataInputStream inputStream = fileSystem.open(new Path(path)); + return inputStreamToString(inputStream, encode); + } + /** + * 创建文件并写入内容 + * + * @param path 路径地址 + * @param context 文件内容 + */ + public void createAndWrite(String path, String context) throws Exception { + FSDataOutputStream out = fileSystem.create(new Path(path)); + out.write(context.getBytes()); + out.flush(); + out.close(); + } + /** + * 重命名文件名 + * + * @param oldPath 旧文件路径 + * @param newPath 新文件路径 + * @return 重命名是否成功 + */ + public boolean rename(String oldPath, String newPath) throws Exception { + return fileSystem.rename(new Path(oldPath), new Path(newPath)); + } + /** + * 拷贝文件到HDFS + * + * @param localPath 本地文件路径 + * @param hdfsPath 存储到hdfs上的路径 + */ + public void copyFromLocalFile(String localPath, String hdfsPath) throws Exception { + fileSystem.copyFromLocalFile(new Path(localPath), new Path(hdfsPath)); + } + /** + * 从HDFS下载文件 + * + * @param hdfsPath 文件在hdfs上的路径 + * @param localPath 存储到本地的路径 + */ + public void copyToLocalFile(String hdfsPath, String localPath) throws Exception { + fileSystem.copyToLocalFile(new Path(hdfsPath), new Path(localPath)); + } + + + /** + * 查询给定路径中文件/目录的状态 + * + * @param path 目录路径 + * @return 文件信息的数组 + */ + public FileStatus[] listFiles(String path) throws Exception { + return fileSystem.listStatus(new Path(path)); + } + + /** + * 判断目录是否存在 + */ + public boolean getDirectoryIsExist(String path){ + try { + if (this.initialHdfsSession()){ + return fileSystem.exists(new Path(path)); + } + } catch (IOException e) { + e.printStackTrace(); + } catch (IllegalArgumentException e) { + e.printStackTrace(); + } + return false; + } + /** + * 查询给定路径中文件的状态和块位置 + * + * @param path 路径可以是目录路径也可以是文件路径 + * @return 文件信息的数组 + */ + public RemoteIterator listFilesRecursive(String path, boolean recursive) throws Exception { + return fileSystem.listFiles(new Path(path), recursive); + } + + + /** + * 查看文件块信息 + * + * @param path 文件路径 + * @return 块信息数组 + */ + public BlockLocation[] getFileBlockLocations(String path) throws Exception { + FileStatus fileStatus = fileSystem.getFileStatus(new Path(path)); + return fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); + } + + /** + * 删除文件 + * + * @param path 文件路径 + * @return 删除是否成功 + */ + public boolean delete(String path){ + boolean flag =false; + try { + if (initialHdfsSession()){ + flag = fileSystem.delete(new Path(path), true); + } + } catch (IOException e) { + e.printStackTrace(); + } + return flag; + } + + + /** + * 把输入流转换为指定字符 + * + * @param inputStream 输入流 + * @param encode 指定编码类型 + */ + private static String inputStreamToString(InputStream inputStream, String encode) { + try { + if (encode == null || ("".equals(encode))) { + encode = "utf-8"; + } + BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, encode)); + StringBuilder builder = new StringBuilder(); + String str = ""; + while ((str = reader.readLine()) != null) { + builder.append(str).append("\n"); + } + return builder.toString(); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + +} + diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index b34f7ae..271264a 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -76,7 +76,7 @@ spring: druid: # 主库数据源 master: - url: jdbc:mysql://ip:3306/dataCompare?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8 + url: jdbc:mysql://127.0.0.1:3306/dataCompare?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8 username: root password: 123456 # 从库数据源 @@ -215,4 +215,15 @@ gen: # 自动去除表前缀,默认是true autoRemovePre: true # 表前缀(生成类名不会包含表前缀,多个用逗号分隔) - tablePrefix: sys_ \ No newline at end of file + tablePrefix: sys_ + +# sqoop shell文件在hdfs路径 +# hdfs:8020端口所在地址 +hdfs: + sqoop: + file: /user/hive/ods_shell/ods_test/test.sh + url: hdfs://test-namenode-server:8020 + + + + diff --git a/src/main/resources/templates/system/dataAccess/odsTableManage.html b/src/main/resources/templates/system/dataAccess/odsTableManage.html new file mode 100644 index 0000000..5d0170c --- /dev/null +++ b/src/main/resources/templates/system/dataAccess/odsTableManage.html @@ -0,0 +1,65 @@ + + + + + + +
+
+
+
+
+ +
+
+
+ +
+
+
+
+
+ + + + \ No newline at end of file