Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ job配置

![image](https://user-images.githubusercontent.com/28300167/229501053-4e33b6fb-851a-4fb2-9b19-16308359f57e.png)

(3)数据接入

ods表管理:通过解析shell文件中的库表,进行抽数表的管理

配置项1:shell文件所在路径

配置项2:hdfs namenode所在节点url
![img_5.png](img_5.png)
![image-20230628143018827](https://image-1312585102.cos.ap-nanjing.myqcloud.com/typora/image-20230628143018827.png)
#### 系统运行
系统运行环境要求:

Expand Down
Binary file added img_5.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 4 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,10 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.vince.xq.project.system.dataAccess.controller;

import com.vince.xq.common.utils.StringUtils;
import com.vince.xq.framework.web.controller.BaseController;
import com.vince.xq.framework.web.page.TableDataInfo;
import com.vince.xq.project.system.dataAccess.domain.BigDataTableQuery;
import com.vince.xq.project.tool.gen.util.HdfsUtils;
import org.apache.shiro.authz.annotation.RequiresPermissions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

/**
* ods表管理
*/
@Controller
@RequestMapping("/system/dataAccess")
public class OdsTableManageController extends BaseController {
private String prefix = "system/dataAccess";
@Value("${hdfs.sqoop.file}")
private String sqoopFile;
@Value("${hdfs.url}")
private String HDFS_PATH;

@GetMapping("")
public String bigDataTablePage() { return prefix + "/odsTableManage"; }
@RequestMapping("/list")
@RequiresPermissions("system:odsTableManage:view")
@ResponseBody
public TableDataInfo bigDataTableQuery(@RequestParam String table) throws Exception{
startPage();
List<BigDataTableQuery> list=new ArrayList<>();
List<BigDataTableQuery> result=new ArrayList<>();
String[] split = sqoopFile.replaceAll("\n","").split(",");
HdfsUtils hdfsUtils=new HdfsUtils();
hdfsUtils.setHDFS_PATH(HDFS_PATH);
hdfsUtils.initialHdfsSession();
for(String str:split){
String file = hdfsUtils.text(str, "utf8").replaceAll("\n","");
String[] split1 = file.split(";");
for(String cmd:split1){
while (true){
if(cmd.contains(" ")){
cmd=cmd.replaceAll(" "," ");
}else{
break;
}
}
try {
BigDataTableQuery bigDataTableQuery=new BigDataTableQuery();
bigDataTableQuery.setThread(cmd.substring(cmd.length()-1,cmd.length()));
bigDataTableQuery.setDb(cmd.split("--hive-database ")[1].split(" --fields-terminated-by")[0].split(" ")[0]);
bigDataTableQuery.setTable(cmd.split("--hive-database ")[1].split(" --fields-terminated-by")[0].split(" ")[2]);
bigDataTableQuery.setFile(new File(str).getName());
list.add(bigDataTableQuery);
}catch (Exception e){
e.printStackTrace();
System.out.println(cmd);
}
}
}
//表名查找返回
for (BigDataTableQuery tableQuery:list) {
if (tableQuery.getTable().equals(table)){
result.add(tableQuery);
return getDataTable(result);
}
}
return getDataTable(list);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.vince.xq.project.system.dataAccess.domain;

import lombok.Data;

/**
* ods表查询bean
*/
@Data
public class BigDataTableQuery {

private String file;

private String db;

private String table;

private String thread;

}
209 changes: 209 additions & 0 deletions src/main/java/com/vince/xq/project/tool/gen/util/HdfsUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package com.vince.xq.project.tool.gen.util;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.springframework.stereotype.Component;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;

/**
* HDFS工具类
*/
public class HdfsUtils {
private String HDFS_PATH;
private static final String HDFS_USER = "hdfs";
private FileSystem fileSystem;

public FileSystem getFileSystem() {
return fileSystem;
}

public void setFileSystem(FileSystem fileSystem) {
this.fileSystem = fileSystem;
}

//初始化连接hdfs
public boolean initialHdfsSession() {
try {
Configuration configuration = new Configuration();
configuration.set("dfs.replication", "1");
fileSystem = FileSystem.get(new URI(HDFS_PATH), configuration, HDFS_USER);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (URISyntaxException e) {
e.printStackTrace();
}
return true;
}

public String getHDFS_PATH() {
return HDFS_PATH;
}

public void setHDFS_PATH(String HDFS_PATH) {
this.HDFS_PATH = HDFS_PATH;
}

/**
* 创建目录 支持递归创建
*
* @param path 路径地址
* @return 创建是否成功
*/
public static boolean mkdir(String path) throws Exception {
HdfsUtils hdfsUtils = new HdfsUtils();
hdfsUtils.initialHdfsSession();
return hdfsUtils.fileSystem.mkdirs(new Path(path));
}
/**
* 查看文件内容
*
* @param path 路径地址
* @return 返回文件内容字符串
*/
public String text(String path, String encode) throws Exception {
FSDataInputStream inputStream = fileSystem.open(new Path(path));
return inputStreamToString(inputStream, encode);
}
/**
* 创建文件并写入内容
*
* @param path 路径地址
* @param context 文件内容
*/
public void createAndWrite(String path, String context) throws Exception {
FSDataOutputStream out = fileSystem.create(new Path(path));
out.write(context.getBytes());
out.flush();
out.close();
}
/**
* 重命名文件名
*
* @param oldPath 旧文件路径
* @param newPath 新文件路径
* @return 重命名是否成功
*/
public boolean rename(String oldPath, String newPath) throws Exception {
return fileSystem.rename(new Path(oldPath), new Path(newPath));
}
/**
* 拷贝文件到HDFS
*
* @param localPath 本地文件路径
* @param hdfsPath 存储到hdfs上的路径
*/
public void copyFromLocalFile(String localPath, String hdfsPath) throws Exception {
fileSystem.copyFromLocalFile(new Path(localPath), new Path(hdfsPath));
}
/**
* 从HDFS下载文件
*
* @param hdfsPath 文件在hdfs上的路径
* @param localPath 存储到本地的路径
*/
public void copyToLocalFile(String hdfsPath, String localPath) throws Exception {
fileSystem.copyToLocalFile(new Path(hdfsPath), new Path(localPath));
}


/**
* 查询给定路径中文件/目录的状态
*
* @param path 目录路径
* @return 文件信息的数组
*/
public FileStatus[] listFiles(String path) throws Exception {
return fileSystem.listStatus(new Path(path));
}

/**
* 判断目录是否存在
*/
public boolean getDirectoryIsExist(String path){
try {
if (this.initialHdfsSession()){
return fileSystem.exists(new Path(path));
}
} catch (IOException e) {
e.printStackTrace();
} catch (IllegalArgumentException e) {
e.printStackTrace();
}
return false;
}
/**
* 查询给定路径中文件的状态和块位置
*
* @param path 路径可以是目录路径也可以是文件路径
* @return 文件信息的数组
*/
public RemoteIterator<LocatedFileStatus> listFilesRecursive(String path, boolean recursive) throws Exception {
return fileSystem.listFiles(new Path(path), recursive);
}


/**
* 查看文件块信息
*
* @param path 文件路径
* @return 块信息数组
*/
public BlockLocation[] getFileBlockLocations(String path) throws Exception {
FileStatus fileStatus = fileSystem.getFileStatus(new Path(path));
return fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
}

/**
* 删除文件
*
* @param path 文件路径
* @return 删除是否成功
*/
public boolean delete(String path){
boolean flag =false;
try {
if (initialHdfsSession()){
flag = fileSystem.delete(new Path(path), true);
}
} catch (IOException e) {
e.printStackTrace();
}
return flag;
}


/**
* 把输入流转换为指定字符
*
* @param inputStream 输入流
* @param encode 指定编码类型
*/
private static String inputStreamToString(InputStream inputStream, String encode) {
try {
if (encode == null || ("".equals(encode))) {
encode = "utf-8";
}
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, encode));
StringBuilder builder = new StringBuilder();
String str = "";
while ((str = reader.readLine()) != null) {
builder.append(str).append("\n");
}
return builder.toString();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}

}

15 changes: 13 additions & 2 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ spring:
druid:
# 主库数据源
master:
url: jdbc:mysql://ip:3306/dataCompare?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
url: jdbc:mysql://127.0.0.1:3306/dataCompare?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
username: root
password: 123456
# 从库数据源
Expand Down Expand Up @@ -215,4 +215,15 @@ gen:
# 自动去除表前缀,默认是true
autoRemovePre: true
# 表前缀(生成类名不会包含表前缀,多个用逗号分隔)
tablePrefix: sys_
tablePrefix: sys_

# sqoop shell文件在hdfs路径
# hdfs:8020端口所在地址
hdfs:
sqoop:
file: /user/hive/ods_shell/ods_test/test.sh
url: hdfs://test-namenode-server:8020




Loading