Commit a6f92fb6 authored by mcb's avatar mcb

任务 立即运行

parent a81458f4
......@@ -242,6 +242,11 @@
<artifactId>kudu-client</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>commons-fileupload</groupId>
<artifactId>commons-fileupload</artifactId>
<version>1.3.1</version>
</dependency>
</dependencies>
<build>
......
package com.jz.agent.client;
import com.jz.dmp.agent.DmpAgentResult;
import com.jz.dmp.modules.model.DmpAgentDatasourceInfo;
/**
 * Contract for datasource agent clients: one implementation per datasource
 * type (MySQL, DB2, HDFS, FTP, ...), built by {@code DmpDsAgentClientBuilder}.
 */
public interface DmpDsAgentClient {

    /** Tests connectivity to the datasource. */
    DmpAgentResult testConnect(DmpAgentDatasourceInfo ds);

    /** Lists the table names (or equivalent entries) of the datasource. */
    DmpAgentResult getTableNameList(DmpAgentDatasourceInfo ds);

    /** Lists column metadata for the given table. */
    DmpAgentResult getTableColumnList(DmpAgentDatasourceInfo ds, String tableName);

    /** Returns a small data preview of the given table. */
    DmpAgentResult previewData(DmpAgentDatasourceInfo ds, String tableName);
}
package com.jz.agent.client;
import com.jz.agent.client.service.*;
import com.jz.dmp.modules.model.DmpAgentDatasourceInfo;
/**
 * Factory for {@link DmpDsAgentClient} implementations keyed on the
 * datasource type string.
 */
public class DmpDsAgentClientBuilder {

    /**
     * Returns the agent client matching {@code ds.getDatasourceType()}, or
     * {@code null} when the type is recognized but unimplemented (DM, IMPALA,
     * MONGODB, MEMCACHE, REDIS, HBASE, LOGHUB, KAFKA, ROCKETMQ) or unknown.
     * Throws NullPointerException when the datasource type itself is null
     * (switch on a null String).
     */
    public static DmpDsAgentClient build(DmpAgentDatasourceInfo ds) {
        switch (ds.getDatasourceType()) {
            case "MYSQL":
                return new DmpMysqlAgentClientService();
            case "SQLSERVER":
                return new DmpSqlServerAgentClientService();
            case "POSTGRESQL":
                return new DmpPostgreSqlAgentClientService();
            case "ORACLE":
                return new DmpOracleAgentClientService();
            case "DB2":
                return new DmpDB2AgentClientService();
            case "HIVE":
            case "DEFAULT":
                // "DEFAULT" datasources are served by the Hive client as well.
                return new DmpHiveAgentClientService();
            case "KUDU":
                return new DmpKuduAgentClientService();
            case "HDFS":
                return new DmpHdfsAgentClientService();
            case "FTP":
                return new DmpFtpAgentClientService();
            case "ELASTICSEARCH":
                return new DmpElasticsearchAgentClientService();
            default:
                // No implementation available for this type.
                return null;
        }
    }
}
package com.jz.agent.client.service;
import com.google.gson.Gson;
import com.jz.agent.client.DmpDsAgentClient;
import com.jz.agent.utils.JDBCUtils;
import com.jz.dmp.agent.DmpAgentResult;
import com.jz.dmp.agent.ResultCode;
import com.jz.dmp.modules.model.DmpAgentDatasourceInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * DB2 datasource agent: connectivity test, table/column listing and data
 * preview over plain JDBC.
 *
 * NOTE(review): the catalog queries use MySQL-style {@code information_schema}
 * and {@code limit 5}; native DB2 exposes SYSCAT views and uses
 * {@code FETCH FIRST n ROWS ONLY} — confirm which dialect the target actually
 * speaks before changing the SQL text.
 */
public class DmpDB2AgentClientService implements DmpDsAgentClient {

    private final Gson gson = new Gson();
    private static final Logger logger = LoggerFactory.getLogger(DmpDB2AgentClientService.class);

    /**
     * Tests connectivity by opening (and immediately closing) a JDBC connection.
     *
     * @param ds datasource descriptor carrying driver class, JDBC URL and credentials
     * @return SUCCESS with json "true" on success, otherwise EXCEPTION with the message
     */
    public DmpAgentResult testConnect(DmpAgentDatasourceInfo ds) {
        try {
            Class.forName(ds.getDriverClassName());
            // try-with-resources closes the connection on every path; the original
            // closed it manually twice and could NPE when getConnection failed.
            try (Connection conn = DriverManager.getConnection(
                    ds.getJdbcUrl(), ds.getUserName(), ds.getPassword())) {
                return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(true));
            }
        } catch (Exception e) {
            logger.error("DB2 connectivity test failed", e);
            return new DmpAgentResult(ResultCode.EXCEPTION, "DB2连通性异常:" + e.getMessage());
        }
    }

    /**
     * Lists the table names of the schema derived from the JDBC URL,
     * lower-cased.
     *
     * @return SUCCESS with a json string array, or EXCEPTION on failure
     */
    public DmpAgentResult getTableNameList(DmpAgentDatasourceInfo ds) {
        List<String> result = new ArrayList<>();
        // Parameterized: the schema name used to be concatenated into the SQL text.
        String sql = "select * from information_schema.tables where table_schema = ?";
        try (Connection conn = JDBCUtils.getConnections(ds.getDriverClassName(), ds.getJdbcUrl(), ds.getUserName(), ds.getPassword());
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, this.getTargetDBName(ds));
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    result.add(rs.getString("TABLE_NAME").toLowerCase());
                }
            }
        } catch (Exception e) {
            logger.error("DB2 table list query failed", e);
            return new DmpAgentResult(ResultCode.EXCEPTION, "DB2获取表列表异常:" + e.getMessage());
        }
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(result));
    }

    /**
     * Extracts the database/schema name from the JDBC URL: the segment after
     * the last '/', with any '?...' query suffix stripped. Returns "" when the
     * URL is blank.
     */
    private String getTargetDBName(DmpAgentDatasourceInfo ds) {
        String url = ds.getJdbcUrl();
        if (url == null || url.trim().isEmpty()) {
            return "";
        }
        String[] segments = url.split("/");
        String last = segments[segments.length - 1];
        return last.split("\\?")[0];
    }

    /**
     * Lists column metadata (name, type, size, scale, comment) for a table.
     * Size resolution: CHARACTER_MAXIMUM_LENGTH when present (character
     * types), otherwise NUMERIC_PRECISION.
     *
     * @param tableName table to describe (bound as a JDBC parameter)
     * @return SUCCESS with a json array of column maps, or EXCEPTION on failure
     */
    public DmpAgentResult getTableColumnList(DmpAgentDatasourceInfo ds, String tableName) {
        List<Map<String, Object>> list = new ArrayList<>();
        // Parameterized: the table name used to be concatenated into the SQL text.
        String sql = "select * from information_schema.columns where table_name = ?";
        try (Connection conn = JDBCUtils.getConnections(ds.getDriverClassName(), ds.getJdbcUrl(), ds.getUserName(), ds.getPassword());
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, tableName);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    Map<String, Object> map = new HashMap<>();
                    String colName = rs.getString("COLUMN_NAME");
                    String dbType = rs.getString("DATA_TYPE");
                    String remarks = rs.getString("COLUMN_COMMENT");
                    // getDouble() returns 0.0 for SQL NULL; wasNull() tells them apart.
                    // The original boxed the primitive and null-checked it, which was
                    // always true, so numeric columns ended up with size 0.
                    int characterLength = (int) rs.getDouble("CHARACTER_MAXIMUM_LENGTH");
                    boolean charLenIsNull = rs.wasNull();
                    int columnSize = (int) rs.getDouble("NUMERIC_PRECISION");
                    int decimalDigits = (int) rs.getDouble("NUMERIC_SCALE");
                    int size = charLenIsNull ? columnSize : characterLength;
                    map.put("name", colName.toLowerCase());
                    map.put("type", dbType.toUpperCase());
                    map.put("size", size);
                    map.put("scale", decimalDigits);
                    map.put("comment", remarks);
                    list.add(map);
                }
            }
        } catch (Exception e) {
            logger.error("DB2 column list query failed", e);
            return new DmpAgentResult(ResultCode.EXCEPTION, "DB2获取表字段信息异常:" + e.getMessage());
        }
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(list));
    }

    /**
     * Previews up to 5 rows of a table: {@code "header"} is the column-name
     * list, {@code "result"} the row values.
     *
     * @return SUCCESS with a json map, or EXCEPTION on failure
     */
    public DmpAgentResult previewData(DmpAgentDatasourceInfo ds, String tableName) {
        Map<String, Object> map = new HashMap<>();
        List<String> headerList = new ArrayList<>();
        // NOTE(review): a table name is an identifier and cannot be bound as a JDBC
        // parameter — callers must supply a trusted, validated table name here.
        String dataSql = "select * from " + tableName + " limit 5";
        String columnSql = "select * from information_schema.columns where table_name = ?";
        try (Connection conn = JDBCUtils.getConnections(ds.getDriverClassName(), ds.getJdbcUrl(), ds.getUserName(), ds.getPassword())) {
            try (PreparedStatement ps = conn.prepareStatement(columnSql)) {
                ps.setString(1, tableName);
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        headerList.add(rs.getString("COLUMN_NAME"));
                    }
                }
            }
            map.put("header", headerList);
            try (PreparedStatement ps = conn.prepareStatement(dataSql);
                 ResultSet rs = ps.executeQuery()) {
                List<List<Object>> resultList = new ArrayList<>();
                while (rs.next()) {
                    List<Object> row = new ArrayList<>();
                    // Columns are read positionally, one per discovered header entry.
                    for (int i = 1; i <= headerList.size(); i++) {
                        row.add(rs.getObject(i));
                    }
                    resultList.add(row);
                }
                map.put("result", resultList);
            }
        } catch (Exception e) {
            logger.error("DB2 data preview failed", e);
            return new DmpAgentResult(ResultCode.EXCEPTION, "DB2数据预览异常:" + e.getMessage());
        }
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(map));
    }
}
package com.jz.agent.client.service;
import com.google.gson.Gson;
import com.jz.agent.client.DmpDsAgentClient;
import com.jz.dmp.agent.DmpAgentResult;
import com.jz.dmp.agent.ResultCode;
import com.jz.dmp.modules.model.DmpAgentDatasourceInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Elasticsearch datasource agent. Currently a placeholder: every operation
 * returns an empty/negative SUCCESS payload and performs no network I/O.
 */
public class DmpElasticsearchAgentClientService implements DmpDsAgentClient {

    private final Gson gson = new Gson();

    @SuppressWarnings("unused")
    private static final Logger logger =
            LoggerFactory.getLogger(DmpElasticsearchAgentClientService.class);

    /** Placeholder: always reports the connection test result as json "false". */
    public DmpAgentResult testConnect(DmpAgentDatasourceInfo ds) {
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(false));
    }

    /** Placeholder: always returns an empty json array. */
    public DmpAgentResult getTableNameList(DmpAgentDatasourceInfo ds) {
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(new ArrayList<String>()));
    }

    /** Placeholder: always returns an empty json array. */
    public DmpAgentResult getTableColumnList(DmpAgentDatasourceInfo ds, String tableName) {
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(new ArrayList<Map<String, Object>>()));
    }

    /** Placeholder: always returns an empty json object. */
    public DmpAgentResult previewData(DmpAgentDatasourceInfo ds, String tableName) {
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(new HashMap<String, Object>()));
    }
}
package com.jz.agent.client.service;
import com.google.gson.Gson;
import com.jcraft.jsch.*;
import com.jz.agent.client.DmpDsAgentClient;
import com.jz.dmp.agent.DmpAgentResult;
import com.jz.dmp.agent.ResultCode;
import com.jz.dmp.modules.model.DmpAgentDatasourceInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
/**
 * FTP/SFTP datasource agent. "Tables" are remote files: column metadata and
 * previews are produced by downloading the head of a file and parsing it as
 * JSON-lines or delimiter-separated text.
 */
public class DmpFtpAgentClientService implements DmpDsAgentClient {

    private Gson gson = new Gson();
    private static Logger logger = LoggerFactory.getLogger(DmpFtpAgentClientService.class);

    /**
     * Connectivity test; dispatches on protocol: "ftp" uses plain FTP,
     * anything else is treated as SFTP.
     */
    @SuppressWarnings("unused")
    public DmpAgentResult testConnect(DmpAgentDatasourceInfo ds) {
        if ("ftp".equals(ds.getProtocol())) {
            return testFtpConnect(ds);
        } else {
            return testSftpConnect(ds);
        }
    }

    /** File datasources expose no table catalog; always returns an empty list. */
    public DmpAgentResult getTableNameList(DmpAgentDatasourceInfo ds) {
        List<String> list = new ArrayList<String>();
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(list));
    }

    /**
     * Derives column metadata by sampling the remote file; returns only the
     * "column" part of {@link #getColumnAndResult}.
     *
     * NOTE(review): only the SFTP branch of getColumnAndResult populates
     * "column" — for plain-FTP datasources this serializes null. Confirm
     * whether FTP column listing is actually expected to work.
     */
    @SuppressWarnings("unchecked")
    public DmpAgentResult getTableColumnList(DmpAgentDatasourceInfo ds, String filePath) {
        Object o = this.getColumnAndResult(ds, filePath);
        if(o instanceof Map) {
            Map<String, Object> m = (Map<String, Object>) o;
            List<Map<String, Object>> l = (List<Map<String, Object>>) m.get("column");
            return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(l));
        } else {
            // o is a DmpAgentResult describing the failure.
            return new DmpAgentResult(ResultCode.EXCEPTION, gson.toJson(o));
        }
    }

    /**
     * Previews the head of the remote file; strips the "column" entry and
     * returns the remaining header/result map.
     */
    @SuppressWarnings({ "unused"})
    public DmpAgentResult previewData(DmpAgentDatasourceInfo ds, String filePath) {
        Object o = this.getColumnAndResult(ds, filePath);
        if(o instanceof Map) {
            Map<String, Object> m = (Map<String, Object>) o;
            m.remove("column");
            return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(m));
        } else {
            return new DmpAgentResult(ResultCode.EXCEPTION, gson.toJson(o));
        }
    }

    /**
     * Downloads the head of {@code filePath} over FTP or SFTP and parses it.
     *
     * Returns a Map with keys:
     *   "result" — up to rowNum parsed rows (Object[] per row),
     *   "header" / "column" — column names and metadata (SFTP branch only),
     * or a DmpAgentResult on parameter/transfer errors.
     *
     * Parsing: "json" fileType treats each line as a JSON object (keys from the
     * first row become the header); otherwise lines are split on the configured
     * delimiter, auto-detected among ",", "\t", " " when blank.
     * NOTE(review): String.split takes a regex — delimiters such as "|" would
     * need quoting; this assumes simple one-character delimiters.
     */
    @SuppressWarnings({ "unused", "unchecked" })
    private Object getColumnAndResult(DmpAgentDatasourceInfo ds, String filePath) {
        Map<String, Object> map = new HashMap<String, Object>();
        String remotePath = "";
        String fileName = "";
        // Split filePath into parent directory and file name.
        if(StringUtils.isNotBlank(filePath)) {
            try {
                remotePath = filePath.substring(0, filePath.lastIndexOf("/"));
                fileName = filePath.substring(filePath.lastIndexOf("/")+1, filePath.length());
            } catch(Exception e) {
                return new DmpAgentResult(ResultCode.PARAMS_ERROR, "参数错误:filePath="+filePath);
            }
        } else {
            return new DmpAgentResult(ResultCode.PARAMS_ERROR, "参数错误:filePath="+filePath);
        }
        String splitSymbol = ds.getDelimiter();
        String fileType = ds.getFileType();
        int columnNum = 0;
        InputStream is = null;
        if ("ftp".equals(ds.getProtocol())) {
            // ---- plain FTP branch ----
            boolean result = false;
            FTPClient ftp = new FTPClient();
            try {
                int reply;
                ftp.connect(ds.getHost(), Integer.valueOf(ds.getPort()).intValue());
                ftp.login(ds.getUserName(), ds.getPassword());
                reply = ftp.getReplyCode();
                if (!FTPReply.isPositiveCompletion(reply)) {
                    ftp.disconnect();
                    return new DmpAgentResult(ResultCode.INTERNAL_SERVER_ERROR, "FTP不能连接");
                }
                ftp.changeWorkingDirectory(remotePath);
                FTPFile[] fs = ftp.listFiles();
                for (FTPFile ff : fs) {
                    if (ff.getName().equals(fileName)) {
                        is = ftp.retrieveFileStream(filePath);
                        StringBuilder sb = new StringBuilder();
                        int rowCnt = 0;
                        int rowNum = 6;           // sample size: up to 6 lines
                        byte[] bs = new byte[1024*50];
                        int len = -1;
                        List<String> header = new ArrayList<String>();
                        List<String> type = new ArrayList<String>();
                        // Read chunks until rowNum complete lines have been parsed.
                        while((len = is.read(bs)) != -1){
                            sb.append(new String(bs,0,len));
                            String s = sb.toString();
                            List<Object[]> list = new ArrayList<Object[]>();
                            boolean flag = true;
                            // Detect the line terminator used by the file.
                            String enterLine = "\n";
                            if(s.indexOf("\r\n") > -1) {
                                enterLine = "\r\n";
                            } else if(s.indexOf("\r") > -1) {
                                enterLine = "\r";
                            } else if(s.indexOf("\n") > -1) {
                                enterLine = "\n";
                            } else {
                                flag = false; // no full line in the buffer yet
                            }
                            while(rowCnt < rowNum && flag) {
                                int i1 = s.indexOf(enterLine);
                                String tmp = s.substring(0, i1);
                                if("json".equalsIgnoreCase(fileType)) {
                                    // JSON-lines: first row defines header + inferred types.
                                    Map<String, Object> m = gson.fromJson(tmp, Map.class);
                                    if(rowCnt == 0) {
                                        for(Map.Entry<String, Object> me : m.entrySet()) {
                                            header.add(me.getKey());
                                            type.add(this.getObjectType(me.getValue()));
                                            columnNum++;
                                        }
                                    }
                                    Object[] row = new Object[columnNum];
                                    int i = 0;
                                    for(String rn : header) {
                                        row[i] = m.get(rn);
                                        i++;
                                    }
                                    list.add(row);
                                } else {
                                    // Delimited text: auto-detect the delimiter if not configured.
                                    if(StringUtils.isBlank(splitSymbol)) {
                                        if(tmp.indexOf(",") > -1) {
                                            splitSymbol = ",";
                                        } else if(tmp.indexOf("\t") > -1) {
                                            splitSymbol = "\t";
                                        } else if(tmp.indexOf(" ") > -1) {
                                            splitSymbol = " ";
                                        }
                                    }
                                    String[] tArr = tmp.split(splitSymbol);
                                    list.add(tArr);
                                    columnNum = tArr.length;
                                }
                                // NOTE(review): always skips 2 chars past the line start — only
                                // correct for "\r\n"; for "\n" or "\r" files this drops the
                                // first character of every following line.
                                s = s.substring(i1+2, s.length());
                                rowCnt++;
                            }
                            if(rowCnt == rowNum) {
                                map.put("result", list);
                                break;
                            } else {
                                // Not enough lines yet: discard partial rows and re-read.
                                rowCnt = 0;
                                rowNum = 6;
                            }
                        }
                        // NOTE(review): unlike the SFTP branch below, this branch never
                        // populates "header"/"column" in the returned map.
                        break;
                    }
                }
                ftp.logout();
            } catch (IOException e) {
                e.printStackTrace();
                return new DmpAgentResult(ResultCode.EXCEPTION, "FTP读取信息异常:"+e.getMessage());
            } finally {
                if (ftp.isConnected()) {
                    try {
                        ftp.disconnect();
                    } catch (IOException ioe) {
                        // best-effort disconnect; nothing useful to do here
                    }
                }
                if(is != null)
                    try {
                        is.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
            }
        } else {
            // ---- SFTP branch ----
            String host = ds.getHost();
            int port = Integer.valueOf(ds.getPort()).intValue();
            String username = ds.getUserName();
            String password = ds.getPassword();
            ChannelSftp sftp = null;
            Session session = null;
            try {
                JSch jsch = new JSch();
                session = jsch.getSession(username, host, port);
                if (password != null) {
                    session.setPassword(password);
                }
                Properties config = new Properties();
                // Skip host-key verification (trusts any server).
                config.put("StrictHostKeyChecking", "no");
                session.setConfig(config);
                session.connect();
                Channel channel = session.openChannel("sftp");
                channel.connect();
                sftp = (ChannelSftp) channel;
                if (remotePath != null && !"".equals(remotePath)) {
                    sftp.cd(remotePath);
                }
                is = sftp.get(fileName);
                StringBuilder sb = new StringBuilder();
                int rowCnt = 0;
                int rowNum = 6;
                byte[] bs = new byte[1024*50];
                int len = -1;
                List<String> header = new ArrayList<String>();
                List<String> type = new ArrayList<String>();
                while((len = is.read(bs)) != -1){
                    // NOTE(review): decodes the entire 50KB buffer and ignores `len`,
                    // so stale bytes from earlier reads can be appended; the FTP
                    // branch uses new String(bs, 0, len) instead — confirm intent.
                    sb.append(new String(bs,"UTF-8"));
                    String s = sb.toString();
                    // Strip a UTF-8 BOM if present.
                    s = s.replaceAll("\\ufeff", "");
                    List<Object[]> list = new ArrayList<Object[]>();
                    boolean flag = true;
                    String enterLine = "\n";
                    if(s.indexOf("\r\n") > -1) {
                        enterLine = "\r\n";
                    } else if(s.indexOf("\r") > -1) {
                        enterLine = "\r";
                    } else if(s.indexOf("\n") > -1) {
                        enterLine = "\n";
                    } else {
                        flag = false;
                    }
                    while(rowCnt < rowNum && flag) {
                        int i1 = s.indexOf(enterLine);
                        String tmp = s.substring(0, i1);
                        if("json".equalsIgnoreCase(fileType)) {
                            Map<String, Object> m = gson.fromJson(tmp, Map.class);
                            if(rowCnt == 0) {
                                for(Map.Entry<String, Object> me : m.entrySet()) {
                                    header.add(me.getKey());
                                    type.add(this.getObjectType(me.getValue()));
                                    columnNum++;
                                }
                            }
                            Object[] row = new Object[columnNum];
                            int i = 0;
                            for(String rn : header) {
                                row[i] = m.get(rn);
                                i++;
                            }
                            list.add(row);
                        } else {
                            if(StringUtils.isBlank(splitSymbol)) {
                                if(tmp.indexOf(",") > -1) {
                                    splitSymbol = ",";
                                } else if(tmp.indexOf("\t") > -1) {
                                    splitSymbol = "\t";
                                } else if(tmp.indexOf(" ") > -1) {
                                    splitSymbol = " ";
                                }
                            }
                            String[] tArr = tmp.split(splitSymbol);
                            list.add(tArr);
                            columnNum = tArr.length;
                        }
                        // NOTE(review): same fixed 2-character line-terminator skip as
                        // the FTP branch — wrong for single-character line endings.
                        s = s.substring(i1+2, s.length());
                        rowCnt++;
                    }
                    if(rowCnt == rowNum) {
                        map.put("result", list);
                        break;
                    } else {
                        rowCnt = 0;
                        // NOTE(review): retry sample size drops to 3 here, while the FTP
                        // branch resets to 6 — confirm which is intended.
                        rowNum = 3;
                    }
                }
                // Build "column" (metadata maps) and "header" (upper-cased names).
                List<Map<String, Object>> l2 = new ArrayList<Map<String, Object>>();
                List<String> l3 = new ArrayList<String>();
                if("json".equalsIgnoreCase(fileType)) {
                    for(int i = 0; i < header.size(); i++) {
                        Map<String, Object> m = new HashMap<String, Object>();
                        m.put("key", i);
                        m.put("name", header.get(i));
                        m.put("type", type.get(i));
                        l2.add(m);
                        l3.add(header.get(i).toUpperCase());
                    }
                } else {
                    if("1".equals(ds.getIsHaveHeader())){
                        // First sampled row is the header; drop it from the preview rows.
                        List<String[]> l = (List<String[]>) map.get("result");
                        String[] arr = l.get(0);
                        for(int i = 0; i < columnNum; i++) {
                            Map<String, Object> m = new HashMap<String, Object>();
                            m.put("key", i);
                            m.put("name", arr[i]);
                            m.put("type", "STRING");
                            l2.add(m);
                            l3.add(arr[i].toUpperCase());
                        }
                        map.put("result", l.subList(1, l.size()));
                    } else {
                        // Headerless file: synthesize positional column names.
                        for(int i = 0; i < columnNum; i++) {
                            Map<String, Object> m = new HashMap<String, Object>();
                            m.put("key", i);
                            m.put("name", "第"+i+"列");
                            m.put("type", "STRING");
                            l2.add(m);
                            l3.add("第"+i+"列");
                        }
                    }
                }
                map.put("header", l3);
                map.put("column", l2);
            } catch (JSchException e) {
                e.printStackTrace();
                return new DmpAgentResult(ResultCode.EXCEPTION, "SFTP读取信息异常:"+e.getMessage());
            } catch (SftpException e) {
                e.printStackTrace();
                return new DmpAgentResult(ResultCode.EXCEPTION, "SFTP读取信息异常:"+e.getMessage());
            } catch (IOException e) {
                e.printStackTrace();
                return new DmpAgentResult(ResultCode.EXCEPTION, "SFTP读取信息异常:"+e.getMessage());
            } finally {
                if (sftp != null) {
                    if (sftp.isConnected()) {
                        sftp.disconnect();
                    }
                }
                if (session != null) {
                    if (session.isConnected()) {
                        session.disconnect();
                    }
                }
            }
        }
        return map;
    }

    /**
     * Maps a parsed JSON value to a coarse column type name
     * (LONG/DOUBLE/BOOLEAN/DATE, defaulting to STRING).
     */
    private String getObjectType(Object obj) {
        String t = null;
        if(obj instanceof Long) {
            t = "LONG";
        } else if(obj instanceof Double) {
            t = "DOUBLE";
        } else if(obj instanceof Boolean) {
            t = "BOOLEAN";
        } else if(obj instanceof Date) {
            t = "DATE";
        } else {
            t = "STRING";
        }
        return t;
    }

    /**
     * Plain-FTP connectivity test: connect, login, and check the reply code.
     * NOTE(review): the catch branch returns ResultCode.SUCCESS with an error
     * message — should presumably be EXCEPTION; confirm before relying on the
     * result code.
     */
    private DmpAgentResult testFtpConnect(DmpAgentDatasourceInfo ds) {
        String host = ds.getHost();
        int port = Integer.valueOf(ds.getPort()).intValue();
        String username = ds.getUserName();
        String password = ds.getPassword();
        boolean result = false;
        FTPClient ftp = new FTPClient();
        try {
            int reply;
            ftp.connect(host, port);
            ftp.login(username, password);
            reply = ftp.getReplyCode();
            if (!FTPReply.isPositiveCompletion(reply)) {
                result = false;
            } else {
                result = true;
            }
            ftp.disconnect();
            return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(result));
        } catch (IOException e) {
            e.printStackTrace();
            return new DmpAgentResult(ResultCode.SUCCESS, "FTP连通性测试异常:"+e.getMessage());
        } finally {
            if (ftp.isConnected()) {
                try {
                    ftp.disconnect();
                } catch (IOException ioe) {
                    // best-effort disconnect
                }
            }
        }
    }

    /**
     * SFTP connectivity test: open a session and an sftp channel.
     * NOTE(review): like testFtpConnect, the failure path returns
     * ResultCode.SUCCESS — confirm intended.
     */
    private DmpAgentResult testSftpConnect(DmpAgentDatasourceInfo ds) {
        ChannelSftp sftp = null;
        Session session = null;
        String host = ds.getHost();
        int port = Integer.valueOf(ds.getPort()).intValue();
        String username = ds.getUserName();
        String password = ds.getPassword();
        try {
            JSch jsch = new JSch();
            session = jsch.getSession(username, host, port);
            if (password != null) {
                session.setPassword(password);
            }
            Properties config = new Properties();
            // Skip host-key verification (trusts any server).
            config.put("StrictHostKeyChecking", "no");
            session.setConfig(config);
            session.connect();
            Channel channel = session.openChannel("sftp");
            channel.connect();
            sftp = (ChannelSftp) channel;
            return new DmpAgentResult(ResultCode.SUCCESS, "true");
        } catch (JSchException e) {
            e.printStackTrace();
            return new DmpAgentResult(ResultCode.SUCCESS, "SFTP连通性测试异常:"+e.getMessage());
        } finally {
            if (sftp != null) {
                if (sftp.isConnected()) {
                    sftp.disconnect();
                }
            }
            if (session != null) {
                if (session.isConnected()) {
                    session.disconnect();
                }
            }
        }
    }

    // Placeholder; never implemented.
    @SuppressWarnings("unused")
    private void unzip() {
    }
}
package com.jz.agent.client.service;
import com.google.gson.Gson;
import com.jz.agent.client.DmpDsAgentClient;
import com.jz.agent.config.DmpDsAgentConstants;
import com.jz.dmp.agent.DmpAgentResult;
import com.jz.dmp.agent.ResultCode;
import com.jz.dmp.modules.model.DmpAgentDatasourceInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
// HDFS没有提供界面操作接口
public class DmpHdfsAgentClientService implements DmpDsAgentClient {
private Gson gson = new Gson();
private static Logger logger = LoggerFactory.getLogger(DmpHdfsAgentClientService.class);
@SuppressWarnings("unused")
public DmpAgentResult testConnect(DmpAgentDatasourceInfo ds) {
FileSystem fileSystem = null;
try {
/*String defaultFs = ds.getDefaultFs();
if(StringUtils.isBlank(defaultFs) && StringUtils.isNotBlank(ds.getJdbcUrl())) {
defaultFs = ds.getJdbcUrl();
}*/
//Configuration configuration = initConfiguration(this.getHadoopConfFilePath(defaultFs));
//if(DmpDsAgentConstants.IS_ENABLED_KERBEROS) JDBCUtils.initKerberosENV4HDFS(configuration);
Configuration configuration = setHdfsConfiguration(ds);
if(configuration == null) {
return new DmpAgentResult(ResultCode.PARAMS_ERROR, "HDFS参数有误");
}
fileSystem = FileSystem.get(configuration);
fileSystem.getUsed();
return new DmpAgentResult(ResultCode.SUCCESS, "true");
} catch (IOException e) {
e.printStackTrace();
return new DmpAgentResult(ResultCode.EXCEPTION, "测试连通性出错:"+e.getMessage());
} finally {
if(fileSystem != null)
try {
fileSystem.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private Configuration setHdfsConfiguration(DmpAgentDatasourceInfo ds) {
logger.info("DmpAgentDatasourceInfo: "+gson.toJson(ds));
Configuration conf = new Configuration();
//conf.set("hadoop.security.authentication",ds.getHdfsUserName());
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
String defaultFs = ds.getDefaultFs();
if(StringUtils.isBlank(defaultFs) && StringUtils.isNotBlank(ds.getJdbcUrl())) {
defaultFs = ds.getJdbcUrl();
}
conf.set("fs.defaultFS", defaultFs);
logger.info("fs.defaultFS: "+defaultFs);
if(ds.getKerberosIsenable() != null && "true".equals(ds.getKerberosIsenable())) {
conf.set("hadoop.security.authentication","kerberos");
System.setProperty("java.security.krb5.conf",ds.getKerberosKrb5Conf());
//System.setProperty("java.security.auth.login.config",ds.getKerberosJaasConf());
logger.info("hadoop.security.authentication: "+"kerberos");
logger.info("java.security.krb5.conf: "+ds.getKerberosKrb5Conf());
}
UserGroupInformation.setConfiguration(conf);
logger.info("kerberos.is.enabled: "+ds.getKerberosIsenable());
if(ds.getKerberosIsenable() != null && "true".equals(ds.getKerberosIsenable())) {
try {
logger.info("hdfs.user.name: "+ds.getHdfsUserName());
logger.info("hdfs.auth.path: "+ds.getHdfsAuthPath());
UserGroupInformation.loginUserFromKeytab(ds.getHdfsUserName(), ds.getHdfsAuthPath());
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
return conf;
}
public DmpAgentResult getTableNameList(DmpAgentDatasourceInfo ds) {
List<String> list = new ArrayList<String>();
FileSystem fileSystem = null;
try {
String defaultFs = ds.getDefaultFs();
if(StringUtils.isBlank(defaultFs) && StringUtils.isNotBlank(ds.getJdbcUrl())) {
defaultFs = ds.getJdbcUrl();
}
//Configuration configuration = initConfiguration(this.getHadoopConfFilePath(defaultFs));
//if(DmpDsAgentConstants.IS_ENABLED_KERBEROS) JDBCUtils.initKerberosENV4HDFS(configuration);
Configuration configuration = setHdfsConfiguration(ds);
if(configuration == null) {
return new DmpAgentResult(ResultCode.PARAMS_ERROR, "HDFS参数有误");
}
fileSystem = FileSystem.get(configuration);
String filePath = ds.getTargetFileName();
Path srcPath = new Path(filePath);
if(fileSystem.exists(srcPath) && fileSystem.isDirectory(srcPath)) {
RemoteIterator<LocatedFileStatus> ri = fileSystem.listFiles(srcPath, true);
while(ri.hasNext()) {
LocatedFileStatus lfs = ri.next();
System.out.println(lfs.getPath().toString());
list.add(lfs.getPath().toString());
}
}
} catch (IOException e) {
e.printStackTrace();
return new DmpAgentResult(ResultCode.EXCEPTION, "HDFS获取列表出错:"+e.getMessage());
} finally {
if(fileSystem != null) {
try {
fileSystem.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(list));
}
@SuppressWarnings("unchecked")
public DmpAgentResult getTableColumnList(DmpAgentDatasourceInfo ds, String targetName) {
Object o = this.getColumnAndResult(ds, targetName);
//List<Map<String, Object>> retMap = (List<Map<String, Object>>) map.get("column");
//return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(retMap));
if(o instanceof DmpAgentResult) {
return new DmpAgentResult(ResultCode.EXCEPTION, gson.toJson(o));
} else {
Map<String, Object> map = (Map<String, Object>) o;
List<Map<String, Object>> retMap = (List<Map<String, Object>>) map.get("column");
return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(retMap));
}
}
public DmpAgentResult previewData(DmpAgentDatasourceInfo ds, String targetName) {
//Map<String, Object> m = this.getColumnAndResult(ds, targetName);
//m.remove("column");
//return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(m));
Object o = this.getColumnAndResult(ds, targetName);
if(o instanceof DmpAgentResult) {
return new DmpAgentResult(ResultCode.EXCEPTION, gson.toJson(o));
} else {
Map<String, Object> map = (Map<String, Object>) o;
map.remove("column");
return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(map));
}
}
@SuppressWarnings("unchecked")
private Map<String, Object> getColumnAndResult(DmpAgentDatasourceInfo ds, String filePath) {
Map<String, Object> map = new HashMap<String, Object>();
InputStream is = null;
FileSystem fileSystem = null;
Path srcPath = new Path(filePath);
String splitSymbol = ds.getDelimiter();
String fileType = ds.getFileType();
int columnNum = 0;
try {
String defaultFs = ds.getDefaultFs();
if(StringUtils.isBlank(defaultFs) && StringUtils.isNotBlank(ds.getJdbcUrl())) {
defaultFs = ds.getJdbcUrl();
}
//Configuration configuration = initConfiguration(this.getHadoopConfFilePath(defaultFs));
//if(DmpDsAgentConstants.IS_ENABLED_KERBEROS) JDBCUtils.initKerberosENV4HDFS(configuration);
Configuration configuration = setHdfsConfiguration(ds);
if(configuration == null) {
return map;
}
fileSystem = FileSystem.get(configuration);
is = fileSystem.open(srcPath);
//IOUtils.copyBytes(is, System.out, 4096, false);
StringBuilder sb = new StringBuilder();
int rowCnt = 0;
int rowNum = 6;
byte[] bs = new byte[1024*50];
int len = -1;
List<String> header = new ArrayList<String>();
List<String> type = new ArrayList<String>();
while((len = is.read(bs)) != -1){
sb.append(new String(bs,0,len));
String s = sb.toString();
List<Object[]> list = new ArrayList<Object[]>();
boolean flag = true;
String enterLine = "\n";
if(s.indexOf("\r\n") > -1) {
enterLine = "\r\n";
} else if(s.indexOf("\r") > -1) {
enterLine = "\r";
} else if(s.indexOf("\n") > -1) {
enterLine = "\n";
} else {
flag = false;
}
while(rowCnt < rowNum && flag) {
int i1 = s.indexOf(enterLine);
String tmp = s.substring(0, i1);
if("json".equalsIgnoreCase(fileType)) {
Map<String, Object> m = gson.fromJson(tmp, Map.class);
if(rowCnt == 0) {
for(Map.Entry<String, Object> me : m.entrySet()) {
header.add(me.getKey());
type.add(this.getObjectType(me.getValue()));
columnNum++;
}
}
Object[] row = new Object[columnNum];
int i = 0;
for(String rn : header) {
row[i] = m.get(rn);
i++;
}
list.add(row);
} else {
if(StringUtils.isBlank(splitSymbol)) {
if(tmp.indexOf(",") > -1) {
splitSymbol = ",";
} else if(tmp.indexOf("\t") > -1) {
splitSymbol = "\t";
} else if(tmp.indexOf(" ") > -1) {
splitSymbol = " ";
}
}
String[] tArr = tmp.split(splitSymbol);
list.add(tArr);
columnNum = tArr.length;
}
s = s.substring(i1+2, s.length());
rowCnt++;
}
if(rowCnt == rowNum) {
map.put("result", list);
break;
} else {
rowCnt = 0;
rowNum = 6;
}
}
List<Map<String, Object>> l2 = new ArrayList<Map<String, Object>>();
List<String> l3 = new ArrayList<String>();
if("json".equalsIgnoreCase(fileType)) {
for(int i = 0; i < header.size(); i++) {
Map<String, Object> m = new HashMap<String, Object>();
m.put("key", i);
m.put("name", header.get(i));
m.put("type", type.get(i));
l2.add(m);
l3.add(header.get(i).toUpperCase());
}
} else {
if("1".equals(ds.getIsHaveHeader())){
List<String[]> l = (List<String[]>) map.get("result");
String[] arr = l.get(0);
for(int i = 0; i < columnNum; i++) {
Map<String, Object> m = new HashMap<String, Object>();
m.put("key", i);
m.put("name", arr[i]);
m.put("type", "STRING");
l2.add(m);
l3.add(arr[i].toUpperCase());
}
map.put("result", l.subList(1, l.size()));
} else {
for(int i = 0; i < columnNum; i++) {
Map<String, Object> m = new HashMap<String, Object>();
m.put("key", i);
m.put("name", "第"+i+"列");
m.put("type", "STRING");
l2.add(m);
l3.add("第"+i+"列");
}
}
}
map.put("header", l3);
map.put("column", l2);
} catch (IOException e) {
e.printStackTrace();
} finally {
if (is != null) {
try {
is.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(fileSystem != null) {
try {
fileSystem.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return map;
}
private String getObjectType(Object obj) {
String t = null;
if(obj instanceof Long) {
t = "LONG";
} else if(obj instanceof Double) {
t = "DOUBLE";
} else if(obj instanceof Boolean) {
t = "BOOLEAN";
} else if(obj instanceof Date) {
t = "DATE";
} else {
t = "STRING";
}
return t;
}
public String getHadoopConfFilePath(String nameNodeAddr) {
String defaultFs = nameNodeAddr;
defaultFs = defaultFs.replace("hdfs://", "");
int idx = defaultFs.indexOf(":");
String hadoopConfIp = defaultFs.substring(0, idx);
return DmpDsAgentConstants.HADOOP_CONF_FILE_PATH + hadoopConfIp + "/hadoop-conf";
}
/**
* 将该任务对应的表数据转换为xml文件 并上传到hdfs指定路径(/jz/etlprocesses/guide_xml/目标库名/)
* Xml名称:目标库名_目标表名(等同于xml文件中name标签的值)
*/
public DmpAgentResult submitSycingConfig(DmpAgentDatasourceInfo ds, String xml) {
FileSystem fileSystem = null;
try {
//Configuration configuration = initConfiguration(DmpDsAgentConstants.HADOOP_DEFAULT_CONF_FILE_PATH);
//if(DmpDsAgentConstants.IS_ENABLED_KERBEROS) JDBCUtils.initKerberosENV4HDFS(configuration);
Configuration configuration = setHdfsConfiguration(ds);
if(configuration == null) {
return new DmpAgentResult(ResultCode.PARAMS_ERROR, "HDFS参数有误");
}
fileSystem = FileSystem.get(configuration);
/*
mkdir(fileSystem, DmpDsAgentConstants.SUBMIT_SYNCING_CONFIG_HDFS_PATH
+ ds.getTargetDbName() + "/");
createFile(fileSystem, DmpDsAgentConstants.SUBMIT_SYNCING_CONFIG_HDFS_PATH
+ ds.getTargetDbName() + "/" + ds.getTargetFileName(), xml);
*/
String hasSeparate = ds.getHdfsSyncingPath().substring(ds.getHdfsSyncingPath().length() - 1);
String pathSup = hasSeparate.equals("/") ? "" : "/";
String mkdirPath = ds.getHdfsSyncingPath() + pathSup + ds.getProjectId() + "/" + ds.getTargetDbName() + "/";
logger.info("mkdir: " + mkdirPath);
mkdir(fileSystem, mkdirPath);
logger.info("createFile : " + mkdirPath + ds.getTargetFileName());
logger.info("file content : " + xml);
createFile(fileSystem, mkdirPath + ds.getTargetFileName(), xml);
return new DmpAgentResult(ResultCode.SUCCESS, "true");
} catch (IOException e) {
e.printStackTrace();
return new DmpAgentResult(ResultCode.EXCEPTION, "上传出错:"+e.getMessage());
} finally {
if(fileSystem != null)
try {
fileSystem.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
 * Builds a Hadoop Configuration from core-site.xml / hdfs-site.xml located
 * under the given directory and forces the DistributedFileSystem implementation.
 * Also sets HADOOP_USER_NAME for subsequent FileSystem calls.
 *
 * @param confPath directory containing the Hadoop site files
 * @return the populated Configuration
 */
public static Configuration initConfiguration(String confPath) {
    String coreSite = confPath + File.separator + "core-site.xml";
    String hdfsSite = confPath + File.separator + "hdfs-site.xml";
    System.out.println(coreSite);
    Configuration conf = new Configuration();
    conf.addResource(new Path(coreSite));
    conf.addResource(new Path(hdfsSite));
    conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
    System.setProperty("HADOOP_USER_NAME", DmpDsAgentConstants.AGENT_HADOOP_USER_NAME);
    return conf;
}
/**
 * Creates the given directory on HDFS; a no-op when it already exists.
 * I/O failures are only printed, mirroring the other static helpers here.
 *
 * @param fs   open HDFS file system handle
 * @param path directory to create
 */
public static void mkdir(FileSystem fs, String path) {
    try {
        Path target = new Path(path);
        if (fs.exists(target)) {
            System.out.println("目录已存在");
            return;
        }
        if (fs.mkdirs(target)) {
            System.out.println("create dir ok!");
        } else {
            System.out.println("create dir failure");
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
/**
 * Copies a local file to HDFS (keeping the local copy) and prints the
 * contents of the destination directory afterwards.
 *
 * @param fs  open HDFS file system handle
 * @param src local source path
 * @param dst HDFS destination path
 */
public static void uploadFile(FileSystem fs, String src, String dst) {
    try {
        Path from = new Path(src);
        Path to = new Path(dst);
        // "false" -> do not delete the local source after copying.
        fs.copyFromLocalFile(false, from, to);
        for (FileStatus file : fs.listStatus(to)) {
            System.out.println(file.getPath());
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
/**
 * Creates (or overwrites) an HDFS file with the given textual content.
 *
 * @param fs       open HDFS file system handle
 * @param dst      destination file path on HDFS
 * @param contents text to write
 */
public static void createFile(FileSystem fs, String dst, String contents) {
    Path path = new Path(dst);
    // try-with-resources guarantees the stream is closed even when write() fails.
    try (FSDataOutputStream out = fs.create(path)) {
        // Encode explicitly as UTF-8: the no-arg getBytes() used the platform
        // default charset and could corrupt the XML on non-UTF-8 hosts.
        out.write(contents.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    } catch (IOException e) {
        e.printStackTrace();
    }
}
/**
 * Streams the content of an HDFS file to standard output.
 *
 * @param fs       open HDFS file system handle
 * @param filePath file to read
 */
public static void readFile(FileSystem fs, String filePath) {
    Path srcPath = new Path(filePath);
    // try-with-resources closes the stream; the original leaked it on every call.
    try (InputStream in = fs.open(srcPath)) {
        IOUtils.copyBytes(in, System.out, 4096, false); // copy to stdout
    } catch (IOException e) {
        e.printStackTrace();
    }
}
}
package com.jz.agent.client.service;
import com.google.gson.Gson;
import com.jz.agent.client.DmpDsAgentClient;
import com.jz.agent.utils.JDBCUtils;
import com.jz.dmp.agent.DmpAgentResult;
import com.jz.dmp.agent.ResultCode;
import com.jz.dmp.modules.model.DmpAgentDatasourceInfo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Agent client for HIVE data sources: connectivity test, table listing and
 * column metadata over HiveServer2 JDBC, with optional Kerberos
 * authentication configured per datasource. Data preview is stubbed.
 */
@Service
public class DmpHiveAgentClientService implements DmpDsAgentClient {

    private Gson gson = new Gson();
    private static Logger logger = LoggerFactory.getLogger(DmpHiveAgentClientService.class);

    /**
     * Opens and immediately closes a JDBC connection to verify connectivity.
     *
     * @param ds datasource descriptor (JDBC url, driver class, Kerberos settings)
     * @return SUCCESS with "true", or EXCEPTION carrying the error message
     */
    public DmpAgentResult testConnect(DmpAgentDatasourceInfo ds) {
        Connection connection = null;
        String jdbcUrl = ds.getJdbcUrl();
        try {
            jdbcUrl += securityConnect(ds);
            Class.forName(ds.getDriverClassName());
            connection = DriverManager.getConnection(jdbcUrl);
            connection.close();
            return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(true));
        } catch (Exception e) {
            logger.error("HIVE连通性异常", e);
            return new DmpAgentResult(ResultCode.EXCEPTION, "HIVE连通性异常:" + e.getMessage());
        } finally {
            JDBCUtils.disconnect(connection, null, null);
        }
    }

    /**
     * Lists all tables of the database addressed by the JDBC url via "show tables".
     */
    public DmpAgentResult getTableNameList(DmpAgentDatasourceInfo ds) {
        // Typed list instead of the original raw ArrayList (unchecked warning).
        List<String> list = new ArrayList<>();
        Connection connection = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        String jdbcUrl = ds.getJdbcUrl();
        String sql = "show tables";
        try {
            jdbcUrl += securityConnect(ds);
            Class.forName(ds.getDriverClassName());
            connection = DriverManager.getConnection(jdbcUrl);
            ps = connection.prepareStatement(sql);
            rs = ps.executeQuery();
            while (rs.next()) {
                list.add(rs.getString("tab_name"));
            }
        } catch (Exception e) {
            logger.error("HIVE获取表列表异常", e);
            return new DmpAgentResult(ResultCode.EXCEPTION, "HIVE获取表列表异常:" + e.getMessage());
        } finally {
            JDBCUtils.disconnect(connection, rs, ps);
        }
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(list));
    }

    /**
     * Returns column metadata (name/type/comment) for one table via DESCRIBE.
     */
    public DmpAgentResult getTableColumnList(DmpAgentDatasourceInfo ds, String tableName) {
        List<Map<String, Object>> list = new ArrayList<>();
        Connection connection = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        String jdbcUrl = ds.getJdbcUrl();
        String sql = "DESCRIBE " + tableName;
        try {
            jdbcUrl += securityConnect(ds);
            Class.forName(ds.getDriverClassName());
            connection = DriverManager.getConnection(jdbcUrl);
            ps = connection.prepareStatement(sql);
            rs = ps.executeQuery();
            while (rs.next()) {
                Map<String, Object> m = new HashMap<String, Object>();
                String col_name = rs.getString("col_name");
                String data_type = rs.getString("data_type");
                // DESCRIBE output contains separator/section rows with an empty
                // (or null) col_name or a null data_type; skip those. The null
                // check on col_name also prevents the NPE the original hit when
                // calling toLowerCase() on a SQL NULL.
                if (col_name != null && !"".equals(col_name) && data_type != null) {
                    m.put("name", col_name.toLowerCase());
                    m.put("type", data_type.toUpperCase());
                    m.put("comment", rs.getString("comment"));
                    list.add(m);
                }
            }
        } catch (Exception e) {
            logger.error("HIVE获取表属性异常", e);
            return new DmpAgentResult(ResultCode.EXCEPTION, "HIVE获取表属性异常:" + e.getMessage());
        } finally {
            JDBCUtils.disconnect(connection, rs, ps);
        }
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(list));
    }

    /** Data preview is not implemented for HIVE; returns an empty result. */
    public DmpAgentResult previewData(DmpAgentDatasourceInfo ds, String tableName) {
        Map<String, Object> map = new HashMap<>();
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(map));
    }

    /**
     * Performs a Kerberos keytab login when enabled on the datasource and
     * returns the security suffix to append to the JDBC url.
     *
     * @param ds datasource descriptor
     * @return ";principal=..." suffix, or "" when Kerberos is disabled
     */
    private String securityConnect(DmpAgentDatasourceInfo ds) throws IOException {
        String jdbcUrlSuffix = "";
        // Largely the same as the KUDU client; only the security handshake differs.
        if ("true".equalsIgnoreCase(ds.getKerberosIsenable())) {
            String principal = ds.getAccessId();
            String hiveKeytabPath = ds.getAccessKey(); // e.g. D:/keytab/hive.keytab
            String hiveUser = ds.getEndpoint(); // e.g. hive@CRNETTEST.COM
            jdbcUrlSuffix = ";principal=" + principal;
            System.setProperty("java.security.krb5.conf", ds.getKerberosKrb5Conf());
            System.setProperty("java.security.auth.login.config", ds.getKerberosJaasConf());
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(hiveUser, hiveKeytabPath);
        }
        return jdbcUrlSuffix;
    }

    /** Manual smoke test against a fixed Kerberized HiveServer2 instance. */
    public static void main(String[] args) {
        Connection connection = null;
        ResultSet rs = null;
        PreparedStatement ps = null;
        String JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver";
        String CONNECT_URL = "jdbc:hive2://10.0.53.177:10000/tmp;principal=hive/jtjzvdra116@CRNETTEST.COM";
        String sql = "DESCRIBE rzf_report_rzfyinliu_yp";
        try {
            System.setProperty("java.security.krb5.conf", "D:/keytab/krb5.conf");
            System.setProperty("java.security.auth.login.config", "D:/keytab/jaas.conf");
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "Kerberos");
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab("hive@CRNETTEST.COM", "D:/keytab/hive.keytab");
            Class.forName(JDBC_DRIVER);
            connection = DriverManager.getConnection(CONNECT_URL);
            ps = connection.prepareStatement(sql);
            rs = ps.executeQuery();
            while (rs.next()) {
                System.out.print(rs.getString("col_name").toLowerCase() + " = ");
                System.out.print(rs.getString("data_type").toUpperCase() + " = ");
                System.out.println(rs.getString("comment") + " = ");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            JDBCUtils.disconnect(connection, rs, ps);
        }
    }
}
package com.jz.agent.client.service;
import com.google.gson.Gson;
import com.jz.agent.client.DmpDsAgentClient;
import com.jz.agent.config.DmpDsAgentConstants;
import com.jz.agent.utils.JDBCUtils;
import com.jz.dmp.agent.DmpAgentResult;
import com.jz.dmp.agent.ResultCode;
import com.jz.dmp.modules.model.DmpAgentDatasourceInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.security.PrivilegedAction;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Agent client for IMPALA data sources using the Cloudera Impala JDBC41
 * driver, with optional Kerberos login through UserGroupInformation.
 */
@SuppressWarnings("unused")
public class DmpImpalaAgentClientService implements DmpDsAgentClient {

    private static String JDBC_DRIVER = "com.cloudera.impala.jdbc41.Driver";
    private Gson gson = new Gson();
    private static Logger logger = LoggerFactory.getLogger(DmpImpalaAgentClientService.class);

    static {
        // Load the driver eagerly; a missing driver is reported once at class init.
        try {
            Class.forName(JDBC_DRIVER);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    // Effective JDBC url of the last connection attempt (Kerberos suffix included).
    protected String CONNECTION_URL;

    /**
     * Connectivity test. Historically stubbed to always report success (the
     * original probing code was commented out); the stub is preserved.
     */
    public DmpAgentResult testConnect(DmpAgentDatasourceInfo ds) {
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(true));
    }

    /** Closes JDBC resources quietly, tolerating nulls and close failures. */
    private static void closeQuietly(Connection conn, ResultSet rs, PreparedStatement ps) {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (ps != null) {
            try {
                ps.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Opens a connection, performing a Kerberos doAs login when enabled.
     *
     * BUGFIX: the original closed the freshly opened connection in its finally
     * blocks (both inside the PrivilegedAction and again before returning), so
     * every caller received an already-closed connection. The premature closes
     * are removed; callers remain responsible for closing the connection.
     *
     * @param ds datasource descriptor
     * @return an open connection, or null when the attempt failed
     */
    private Connection getConnection(DmpAgentDatasourceInfo ds) {
        Connection connection = null;
        CONNECTION_URL = ds.getJdbcUrl();
        try {
            if (DmpDsAgentConstants.IS_ENABLED_KERBEROS) {
                UserGroupInformation loginUser = JDBCUtils.initKerberosENV4IMPALA();
                CONNECTION_URL = CONNECTION_URL + ";AuthMech=1;KrbRealm="
                        + DmpDsAgentConstants.KERBEROS_KEYTAB_ACCOUNT_IMPALA
                        + ";KrbHostFQDN=" + ds.getAccessId() + ";KrbServiceName=impala";
                connection = (Connection) loginUser.doAs(new PrivilegedAction<Object>() {
                    public Object run() {
                        try {
                            Class.forName(JDBC_DRIVER);
                            return DriverManager.getConnection(CONNECTION_URL);
                        } catch (Exception e) {
                            e.printStackTrace();
                            return null;
                        }
                    }
                });
            } else {
                connection = DriverManager.getConnection(CONNECTION_URL);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connection;
    }

    /**
     * Lists table names (lower-cased) via "show tables".
     */
    public DmpAgentResult getTableNameList(DmpAgentDatasourceInfo ds) {
        String sql = "show tables";
        Connection conn = null;
        ResultSet rs = null;
        PreparedStatement ps = null;
        List<String> list = new ArrayList<String>();
        try {
            conn = getConnection(ds);
            ps = conn.prepareStatement(sql);
            rs = ps.executeQuery();
            while (rs.next()) {
                list.add(rs.getString(1).toLowerCase());
            }
        } catch (Exception e) {
            e.printStackTrace();
            return new DmpAgentResult(ResultCode.EXCEPTION, "Impala获取表列表异常:" + e.getMessage());
        } finally {
            closeQuietly(conn, rs, ps);
        }
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(list));
    }

    /**
     * Returns column metadata (name/type/comment) via "desc &lt;table&gt;".
     */
    public DmpAgentResult getTableColumnList(DmpAgentDatasourceInfo ds, String tableName) {
        String sql = "desc " + tableName;
        Connection conn = null;
        ResultSet rs = null;
        PreparedStatement ps = null;
        List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
        try {
            conn = getConnection(ds);
            ps = conn.prepareStatement(sql);
            rs = ps.executeQuery();
            while (rs.next()) {
                Map<String, Object> row = new HashMap<String, Object>();
                row.put("name", rs.getString(1).toLowerCase());
                row.put("type", rs.getString(2).toUpperCase());
                row.put("comment", rs.getString(3));
                list.add(row);
            }
        } catch (Exception e) {
            e.printStackTrace();
            return new DmpAgentResult(ResultCode.EXCEPTION, "Impala获取表字段信息异常:" + e.getMessage());
        } finally {
            closeQuietly(conn, rs, ps);
        }
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(list));
    }

    /**
     * Previews up to five rows: column headers from "desc", rows from
     * "select * ... limit 5".
     */
    public DmpAgentResult previewData(DmpAgentDatasourceInfo ds, String tableName) {
        Map<String, Object> map = new HashMap<String, Object>();
        String dataSql = "select * from " + tableName + " limit 5";
        String descSql = "desc " + tableName;
        Connection conn = null;
        ResultSet rs = null;
        PreparedStatement ps = null;
        try {
            conn = getConnection(ds);
            ps = conn.prepareStatement(descSql);
            rs = ps.executeQuery();
            List<String> headerList = new ArrayList<String>();
            while (rs.next()) {
                headerList.add(rs.getString(1).toLowerCase());
            }
            map.put("header", headerList);
            // Close the metadata statement before reusing the variables; the
            // original overwrote ps/rs here and leaked the first pair.
            rs.close();
            ps.close();
            ps = conn.prepareStatement(dataSql);
            rs = ps.executeQuery();
            List<List<Object>> resultList = new ArrayList<List<Object>>();
            while (rs.next()) {
                List<Object> row = new ArrayList<Object>();
                for (int i = 1; i <= headerList.size(); i++) {
                    row.add(rs.getObject(i));
                }
                resultList.add(row);
            }
            map.put("result", resultList);
        } catch (Exception e) {
            e.printStackTrace();
            return new DmpAgentResult(ResultCode.EXCEPTION, "IMPALA数据预览异常:" + e.getMessage());
        } finally {
            closeQuietly(conn, rs, ps);
        }
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(map));
    }
}
package com.jz.agent.client.service;
import com.google.gson.Gson;
import com.jz.agent.client.DmpDsAgentClient;
import com.jz.agent.utils.JDBCUtils;
import com.jz.dmp.agent.DmpAgentResult;
import com.jz.dmp.agent.ResultCode;
import com.jz.dmp.modules.model.DmpAgentDatasourceInfo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Agent client for KUDU data sources. Metadata queries go through the Impala
 * JDBC driver; the backup preview implementation reads Kudu directly through
 * the native client.
 */
public class DmpKuduAgentClientService implements DmpDsAgentClient {

    private Gson gson = new Gson();
    private static Logger logger = LoggerFactory.getLogger(DmpKuduAgentClientService.class);

    /**
     * Opens and immediately closes a JDBC connection to verify connectivity.
     *
     * @param ds datasource descriptor (JDBC url, driver class, Kerberos settings)
     * @return SUCCESS with "true", or EXCEPTION carrying the error message
     */
    public DmpAgentResult testConnect(DmpAgentDatasourceInfo ds) {
        Connection connection = null;
        String jdbcUrl = ds.getJdbcUrl();
        try {
            jdbcUrl += securityConnect(ds);
            Class.forName(ds.getDriverClassName());
            connection = DriverManager.getConnection(jdbcUrl);
            connection.close();
            return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(true));
        } catch (Exception e) {
            logger.error("KUDU连通性异常", e);
            return new DmpAgentResult(ResultCode.EXCEPTION, "KUDU连通性异常:" + e.getMessage());
        } finally {
            JDBCUtils.disconnect(connection, null, null);
        }
    }

    /**
     * Lists table names via "show tables".
     */
    public DmpAgentResult getTableNameList(DmpAgentDatasourceInfo ds) {
        // Typed list instead of the original raw ArrayList (unchecked warning).
        List<String> list = new ArrayList<>();
        Connection connection = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        String jdbcUrl = ds.getJdbcUrl();
        String sql = "show tables";
        try {
            jdbcUrl += securityConnect(ds);
            Class.forName(ds.getDriverClassName());
            connection = DriverManager.getConnection(jdbcUrl);
            ps = connection.prepareStatement(sql);
            rs = ps.executeQuery();
            while (rs.next()) {
                list.add(rs.getString("name"));
            }
        } catch (Exception e) {
            logger.error("KUDU获取表列表异常", e);
            return new DmpAgentResult(ResultCode.EXCEPTION, "KUDU获取表列表异常:" + e.getMessage());
        } finally {
            JDBCUtils.disconnect(connection, rs, ps);
        }
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(list));
    }

    /**
     * Returns column metadata (name/type/comment) via DESCRIBE, with Kudu
     * physical types mapped to the supported logical types.
     */
    public DmpAgentResult getTableColumnList(DmpAgentDatasourceInfo ds, String tableName) {
        List<Map<String, Object>> list = new ArrayList<>();
        Connection connection = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        String jdbcUrl = ds.getJdbcUrl();
        String sql = "DESCRIBE " + tableName;
        try {
            jdbcUrl += securityConnect(ds);
            Class.forName(ds.getDriverClassName());
            connection = DriverManager.getConnection(jdbcUrl);
            ps = connection.prepareStatement(sql);
            rs = ps.executeQuery();
            while (rs.next()) {
                Map<String, Object> m = new HashMap<String, Object>();
                m.put("name", rs.getString("name").toLowerCase());
                m.put("type", convertColumnType(rs.getString("type").toUpperCase()));
                m.put("comment", rs.getString("comment"));
                list.add(m);
            }
        } catch (Exception e) {
            logger.error("KUDU获取表属性异常", e);
            return new DmpAgentResult(ResultCode.EXCEPTION, "KUDU获取表属性异常:" + e.getMessage());
        } finally {
            JDBCUtils.disconnect(connection, rs, ps);
        }
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(list));
    }

    /** Data preview is not implemented for KUDU; returns an empty result. */
    public DmpAgentResult previewData(DmpAgentDatasourceInfo ds, String tableName) {
        Map<String, Object> map = new HashMap<>();
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(map));
    }

    /**
     * Configures Kerberos when enabled on the datasource and returns the
     * security suffix to append to the JDBC url. Unlike the HIVE client, no
     * keytab login is performed here.
     *
     * @param ds datasource descriptor
     * @return security suffix, or "" when Kerberos is disabled
     */
    private String securityConnect(DmpAgentDatasourceInfo ds) {
        String jdbcUrlSuffix = "";
        if ("true".equalsIgnoreCase(ds.getKerberosIsenable())) {
            // The Kerberos host FQDN comes from the datasource, not from system config.
            jdbcUrlSuffix = ";AuthMech=1;KrbHostFQDN=" + ds.getAccessId() + ";KrbServiceName=impala";
            System.setProperty("java.security.krb5.conf", ds.getKerberosKrb5Conf());
            System.setProperty("java.security.auth.login.config", ds.getKerberosJaasConf());
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(conf);
        }
        return jdbcUrlSuffix;
    }

    /**
     * Maps a Kudu/Impala physical type name onto one of the supported logical
     * types: INT, BIGINT, DOUBLE, SMALLINT or STRING (the fallback).
     */
    private String convertColumnType(String type) {
        if ("INT32".equals(type) || "INT8".equals(type) || "INT".equals(type)) {
            type = "INT";
        } else if ("INT64".equals(type) || "BIGINT".equals(type)) {
            type = "BIGINT";
        } else if ("FLOAT".equals(type) || "DOUBLE".equals(type)) {
            type = "DOUBLE";
        } else if ("SMALLINT".equals(type)) {
            // kept as-is
        } else {
            type = "STRING";
        }
        return type;
    }

    /**
     * Backup preview implementation reading directly from Kudu: returns the
     * column headers plus at most the first six rows of the first scan batch.
     */
    public DmpAgentResult previewData_bak(DmpAgentDatasourceInfo ds, String tableName) {
        Map<String, Object> map = new HashMap<String, Object>();
        String kuduClients = ds.getEndpoint();
        KuduClient client = null;
        KuduSession session = null;
        KuduScanner scanner = null;
        List<String> headerList = new ArrayList<String>();
        List<List<Object>> resultList = new ArrayList<List<Object>>();
        try {
            client = new KuduClient.KuduClientBuilder(kuduClients).build();
            session = client.newSession();
            KuduTable table = client.openTable(tableName);
            List<ColumnSchema> schema = table.getSchema().getColumns();
            List<String> projectColumns = new ArrayList<String>(schema.size());
            for (ColumnSchema cs : schema) {
                headerList.add(cs.getName().toLowerCase());
                projectColumns.add(cs.getName());
            }
            map.put("header", headerList);
            scanner = client.newScannerBuilder(table).setProjectedColumnNames(projectColumns).build();
            // Only the first batch is consumed, and at most six rows of it.
            if (scanner.hasMoreRows()) {
                RowResultIterator results = scanner.nextRows();
                int i = 0;
                while (results.hasNext()) {
                    RowResult result = results.next();
                    List<Object> l = new ArrayList<Object>();
                    for (int j = 0; j < projectColumns.size(); j++) {
                        Type t = result.getColumnType(j);
                        if (t == Type.STRING) {
                            l.add(result.getString(j));
                        } else if (t == Type.BINARY) {
                            l.add(result.getBinary(j));
                        } else if (t == Type.BOOL) {
                            l.add(result.getBoolean(j));
                        } else if (t == Type.DOUBLE) {
                            l.add(result.getDouble(j));
                        } else if (t == Type.FLOAT) {
                            l.add(result.getFloat(j));
                        } else if (t == Type.INT32) {
                            l.add(result.getInt(j));
                        } else if (t == Type.INT64) {
                            l.add(result.getLong(j));
                        } else if (t == Type.INT8) {
                            l.add(result.getShort(j));
                        }
                    }
                    // BUGFIX: the original added the row list once per COLUMN
                    // (the add sat inside the for loop), duplicating every row.
                    resultList.add(l);
                    if (i == 5) break;
                    i++;
                }
            }
            map.put("result", resultList);
        } catch (Exception e) {
            e.printStackTrace();
            return new DmpAgentResult(ResultCode.EXCEPTION, "KUDU数据预览异常:" + e.getMessage());
        } finally {
            // Close the scanner too; the original leaked it.
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (KuduException e) {
                    e.printStackTrace();
                }
            }
            try {
                if (session != null && !session.isClosed())
                    session.close();
            } catch (KuduException e) {
                e.printStackTrace();
            }
            if (client != null) {
                try {
                    client.close();
                } catch (KuduException e) {
                    e.printStackTrace();
                }
            }
        }
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(map));
    }

    /** Manual smoke test that creates a Kudu-backed table through Impala JDBC. */
    public static void main(String[] args) {
        Connection connection = null;
        ResultSet rs = null;
        PreparedStatement ps = null;
        String JDBC_DRIVER = "com.cloudera.impala.jdbc41.Driver";
        String CONNECT_URL = "jdbc:impala://10.0.53.177:21050/tmp";
        String sql = "CREATE TABLE tmp.test (" +
                "id INT NOT NULL," +
                "member_id BIGINT NULL," +
                "small_id SMALLINT NULL," +
                "double_c DOUBLE NULL," +
                "etl_create_time STRING NULL," +
                "etl_update_time STRING NULL," +
                "PRIMARY KEY (id)" +
                ") PARTITION BY HASH (id) PARTITIONS 2 STORED AS KUDU";
        try {
            Class.forName(JDBC_DRIVER);
            connection = DriverManager.getConnection(CONNECT_URL);
            ps = connection.prepareStatement(sql);
            ps.execute();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            JDBCUtils.disconnect(connection, rs, ps);
        }
    }
}
package com.jz.agent.client.service;
import com.google.gson.Gson;
import com.jz.agent.client.DmpDsAgentClient;
import com.jz.agent.utils.JDBCUtils;
import com.jz.common.utils.StringUtils;
import com.jz.dmp.agent.DmpAgentResult;
import com.jz.dmp.agent.ResultCode;
import com.jz.dmp.modules.model.DmpAgentDatasourceInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Agent client for MySQL data sources: connectivity test, table listing,
 * column metadata (via information_schema) and a five-row data preview.
 */
public class DmpMysqlAgentClientService implements DmpDsAgentClient {

    private Gson gson = new Gson();
    @SuppressWarnings("unused")
    private static Logger logger = LoggerFactory.getLogger(DmpMysqlAgentClientService.class);

    /** Closes JDBC resources quietly, tolerating nulls and close failures. */
    private static void closeQuietly(Connection conn, ResultSet rs, PreparedStatement ps) {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (ps != null) {
            try {
                ps.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Opens and immediately closes a connection to verify the datasource is
     * reachable.
     */
    public DmpAgentResult testConnect(DmpAgentDatasourceInfo ds) {
        Connection conn = null;
        try {
            Class.forName(ds.getDriverClassName());
            conn = DriverManager.getConnection(ds.getJdbcUrl(), ds.getUserName(), ds.getPassword());
            conn.close();
            return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(true));
        } catch (Exception e) {
            e.printStackTrace();
            return new DmpAgentResult(ResultCode.EXCEPTION, "MYSQL连通性异常:" + e.getMessage());
        } finally {
            closeQuietly(conn, null, null);
        }
    }

    /**
     * Lists table names (lower-cased) of the schema encoded in the JDBC url.
     */
    public DmpAgentResult getTableNameList(DmpAgentDatasourceInfo ds) {
        List<String> result = new ArrayList<>();
        // Parameterized query: the schema name was previously concatenated
        // into the SQL string (injection-prone).
        String sql = "select * from information_schema.tables where table_schema = ?";
        Connection conn = null;
        ResultSet rs = null;
        PreparedStatement ps = null;
        try {
            conn = JDBCUtils.getConnections(ds.getDriverClassName(), ds.getJdbcUrl(), ds.getUserName(), ds.getPassword());
            ps = conn.prepareStatement(sql);
            ps.setString(1, getTargetDBname(ds));
            rs = ps.executeQuery();
            while (rs.next()) {
                result.add(rs.getString("TABLE_NAME").toLowerCase());
            }
        } catch (Exception e) {
            e.printStackTrace();
            return new DmpAgentResult(ResultCode.EXCEPTION, "MYSQL获取表列表异常:" + e.getMessage());
        } finally {
            // Null-safe and also closes rs/ps: the original closed only conn,
            // and threw NPE from finally when the connection failed to open.
            closeQuietly(conn, rs, ps);
        }
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(result));
    }

    /**
     * Extracts the database name from a MySQL JDBC url
     * ("jdbc:mysql://host:port/db?params" -&gt; "db").
     */
    private String getTargetDBname(DmpAgentDatasourceInfo ds) {
        String url = ds.getJdbcUrl();
        if (url != null && !"".equals(url.trim())) {
            String[] arr1 = url.split("/");
            String ls = arr1[arr1.length - 1];
            String[] arr2 = ls.split("\\?");
            return arr2[0];
        } else {
            return "";
        }
    }

    /**
     * Returns column metadata (name/type/size/scale/comment) for one table
     * from information_schema.columns, optionally filtered by ds.getDbName().
     */
    public DmpAgentResult getTableColumnList(DmpAgentDatasourceInfo ds, String tableName) {
        List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
        boolean filterSchema = StringUtils.hasLength(ds.getDbName());
        // Parameterized: schema and table names were previously concatenated.
        String sql = filterSchema
                ? "select * from information_schema.columns where table_schema = ? and table_name = ?"
                : "select * from information_schema.columns where table_name = ?";
        Connection conn = null;
        ResultSet rs = null;
        PreparedStatement ps = null;
        try {
            conn = JDBCUtils.getConnections(ds.getDriverClassName(), ds.getJdbcUrl(), ds.getUserName(), ds.getPassword());
            ps = conn.prepareStatement(sql);
            if (filterSchema) {
                ps.setString(1, ds.getDbName());
                ps.setString(2, tableName);
            } else {
                ps.setString(1, tableName);
            }
            rs = ps.executeQuery();
            while (rs.next()) {
                Map<String, Object> map = new HashMap<String, Object>();
                String colName = rs.getString("COLUMN_NAME");
                String dbType = rs.getString("DATA_TYPE");
                String remarks = rs.getString("COLUMN_COMMENT");
                // Read as objects so SQL NULL is observable: the original cast
                // rs.getDouble(...) results to (never-null) Integers, which
                // made its null check dead code.
                Number characterLength = (Number) rs.getObject("CHARACTER_MAXIMUM_LENGTH");
                Number columnSize = (Number) rs.getObject("NUMERIC_PRECISION");
                Number decimalDigits = (Number) rs.getObject("NUMERIC_SCALE");
                int size;
                if (characterLength != null) {
                    size = characterLength.intValue();
                } else {
                    size = columnSize == null ? 0 : columnSize.intValue();
                }
                map.put("name", colName.toLowerCase());
                map.put("type", dbType.toUpperCase());
                map.put("size", size);
                map.put("scale", decimalDigits == null ? 0 : decimalDigits.intValue());
                map.put("comment", remarks);
                list.add(map);
            }
        } catch (Exception e) {
            e.printStackTrace();
            return new DmpAgentResult(ResultCode.EXCEPTION, "MYSQL获取表字段信息异常:" + e.getMessage());
        } finally {
            closeQuietly(conn, rs, ps);
        }
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(list));
    }

    /**
     * Previews up to five rows: headers from information_schema.columns, rows
     * from "select * ... limit 5".
     */
    public DmpAgentResult previewData(DmpAgentDatasourceInfo ds, String tableName) {
        Map<String, Object> map = new HashMap<String, Object>();
        Connection conn = null;
        ResultSet rs = null;
        PreparedStatement ps = null;
        List<String> headerList = new ArrayList<String>();
        try {
            conn = JDBCUtils.getConnections(ds.getDriverClassName(), ds.getJdbcUrl(), ds.getUserName(), ds.getPassword());
            // The table name is an identifier and cannot be bound as a JDBC
            // parameter; it is interpolated for the data query as before.
            String sql = "select * from " + tableName + " limit 5";
            String sql2 = "select * from information_schema.columns where table_name = ?";
            ps = conn.prepareStatement(sql2);
            ps.setString(1, tableName);
            rs = ps.executeQuery();
            while (rs.next()) {
                headerList.add(rs.getString("COLUMN_NAME"));
            }
            map.put("header", headerList);
            // Close the metadata statement before reusing the variables.
            rs.close();
            ps.close();
            ps = conn.prepareStatement(sql);
            rs = ps.executeQuery();
            List<List<Object>> resultList = new ArrayList<List<Object>>();
            while (rs.next()) {
                List<Object> row = new ArrayList<Object>();
                for (int i = 1; i <= headerList.size(); i++) {
                    row.add(rs.getObject(i));
                }
                resultList.add(row);
            }
            map.put("result", resultList);
        } catch (Exception e) {
            e.printStackTrace();
            return new DmpAgentResult(ResultCode.EXCEPTION, "MYSQL数据预览异常:" + e.getMessage());
        } finally {
            closeQuietly(conn, rs, ps);
        }
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(map));
    }
}
package com.jz.agent.client.service;
import com.google.gson.Gson;
import com.jz.agent.client.DmpDsAgentClient;
import com.jz.agent.utils.DbInfoUtil;
import com.jz.dmp.agent.DmpAgentResult;
import com.jz.dmp.agent.ResultCode;
import com.jz.dmp.modules.model.DmpAgentDatasourceInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
/**
 * Agent client for Oracle data sources. Connectivity is checked directly via
 * JDBC; table, column and preview queries are delegated to {@link DbInfoUtil}.
 */
public class DmpOracleAgentClientService implements DmpDsAgentClient {

    private Gson gson = new Gson();
    private static Logger logger = LoggerFactory.getLogger(DmpOracleAgentClientService.class);

    /** Opens (and immediately closes) a connection to verify reachability. */
    @SuppressWarnings("unused")
    public DmpAgentResult testConnect(DmpAgentDatasourceInfo ds) {
        Connection conn = null;
        try {
            Class.forName(ds.getDriverClassName());
            conn = DriverManager.getConnection(ds.getJdbcUrl(), ds.getUserName(), ds.getPassword());
            conn.close();
            return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(true));
        } catch (Exception e) {
            e.printStackTrace();
            return new DmpAgentResult(ResultCode.EXCEPTION, "oracle连通性异常:" + e.getMessage());
        } finally {
            try {
                if (conn != null) {
                    conn.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    /** Delegates table-name discovery to DbInfoUtil. */
    public DmpAgentResult getTableNameList(DmpAgentDatasourceInfo ds) {
        List<String> tables = DbInfoUtil.getTableNameList(ds.getDriverClassName(), ds.getJdbcUrl(), ds.getUserName(), ds.getPassword());
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(tables));
    }

    /** Delegates column metadata lookup to DbInfoUtil. */
    @SuppressWarnings({ "rawtypes" })
    public DmpAgentResult getTableColumnList(DmpAgentDatasourceInfo ds, String tableName) {
        List columns = DbInfoUtil.getTableInfo(ds.getDriverClassName(), ds.getJdbcUrl(), ds.getUserName(), ds.getPassword(), tableName);
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(columns));
    }

    /** Delegates the limited data preview to DbInfoUtil. */
    @SuppressWarnings("unchecked")
    public DmpAgentResult previewData(DmpAgentDatasourceInfo ds, String tableName) {
        Map<String, Object> preview = DbInfoUtil.getTableLimitData(ds.getDriverClassName(), ds.getJdbcUrl(), ds.getUserName(), ds.getPassword(), tableName);
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(preview));
    }
}
package com.jz.agent.client.service;
import com.google.gson.Gson;
import com.jz.agent.client.DmpDsAgentClient;
import com.jz.dmp.agent.DmpAgentResult;
import com.jz.dmp.agent.ResultCode;
import com.jz.dmp.modules.model.DmpAgentDatasourceInfo;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class DmpPostgreSqlAgentClientService implements DmpDsAgentClient {
private Gson gson = new Gson();
private static Logger logger = LoggerFactory.getLogger(DmpPostgreSqlAgentClientService.class);
/**
 * Verifies that a JDBC connection to the PostgreSQL datasource can be opened.
 *
 * @param ds datasource descriptor (driver class, url, credentials)
 * @return SUCCESS with "true", or EXCEPTION carrying the error message
 */
@SuppressWarnings("unused")
public DmpAgentResult testConnect(DmpAgentDatasourceInfo ds) {
    Connection connection = null;
    try {
        Class.forName(ds.getDriverClassName());
        connection = DriverManager.getConnection(ds.getJdbcUrl(), ds.getUserName(), ds.getPassword());
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(true));
    } catch (Exception e) {
        e.printStackTrace();
        return new DmpAgentResult(ResultCode.EXCEPTION, "POSTGRESQL连通性异常:" + e.getMessage());
    } finally {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
/**
 * Lists user tables (lower-cased) from pg_class, excluding system relations
 * (pg_% and sql_%).
 *
 * @param ds datasource descriptor
 * @return SUCCESS with the JSON-encoded table-name list, or EXCEPTION
 */
public DmpAgentResult getTableNameList(DmpAgentDatasourceInfo ds) {
    String sql = "select relname as tabname from pg_class where relkind='r' and relname not like 'pg_%' and relname not like 'sql_%' order by relname";
    Connection conn = null;
    PreparedStatement ps = null;
    ResultSet rs = null;
    List<String> list = new ArrayList<String>();
    try {
        // Use the configured driver class: the original hard-coded
        // "org.postgresql.Driver" and ignored the datasource setting,
        // unlike getTableColumnList in this same class.
        Class.forName(ds.getDriverClassName());
        conn = DriverManager.getConnection(ds.getJdbcUrl(), ds.getUserName(), ds.getPassword());
        ps = conn.prepareStatement(sql);
        rs = ps.executeQuery();
        while (rs.next()) {
            list.add(rs.getString(1).toLowerCase());
        }
    } catch (Exception e) {
        e.printStackTrace();
        return new DmpAgentResult(ResultCode.EXCEPTION, "POSTGRESQL获取表列表异常:" + e.getMessage());
    } finally {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException e1) {
                e1.printStackTrace();
            }
        }
        if (ps != null) {
            try {
                ps.close();
            } catch (SQLException e1) {
                e1.printStackTrace();
            }
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
    return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(list));
}
public DmpAgentResult getTableColumnList(DmpAgentDatasourceInfo ds, String tableName) {
String sql = "SELECT col_description(a.attrelid,a.attnum) as comment,format_type(a.atttypid,a.atttypmod) as type,a.attname as name, a.attnotnull as notnull FROM pg_class as c,pg_attribute as a where c.relname = '"
+ tableName + "' and a.attrelid = c.oid and a.attnum>0";
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
try {
Class.forName(ds.getDriverClassName());// "org.postgresql.Driver"
conn = DriverManager.getConnection(ds.getJdbcUrl(), ds.getUserName(), ds.getPassword());
// "jdbc:postgresql://192.168.204.102:5432/test", "postgres", "postgres"
ps = conn.prepareStatement(sql);
rs = ps.executeQuery();
while (rs.next()) {
Map<String, Object> row = new HashMap<String, Object>();
String type = rs.getString(2).toUpperCase();
if(StringUtils.isNotBlank(type) && type.indexOf("(") > -1) {
int idx1 = type.indexOf("(");
int idx2 = type.indexOf(")");
String t = type.substring(0, idx1);
String ts = type.substring(idx1+1, idx2);
if(ts.indexOf(",") > -1) {
String[] strs = ts.split(",");
row.put("size", new Integer(strs[0]));
row.put("scale", new Integer(strs[1]));
} else {
row.put("size", new Integer(ts));
row.put("scale", 0);
}
type = t;
}
row.put("name", rs.getString(3).toLowerCase());
row.put("type", type);
row.put("comment", rs.getString(1));
list.add(row);
}
} catch (Exception e) {
e.printStackTrace();
return new DmpAgentResult(ResultCode.EXCEPTION, "POSTGRESQL获取表字段信息异常:"+e.getMessage());
} finally {
if(rs != null)
try {
rs.close();
} catch (SQLException e1) {
e1.printStackTrace();
}
if(ps != null)
try {
ps.close();
} catch (SQLException e1) {
e1.printStackTrace();
}
try {
if(conn != null)
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(list));
}
@SuppressWarnings("resource")
public DmpAgentResult previewData(DmpAgentDatasourceInfo ds, String tableName) {
Map<String, Object> map = new HashMap<String, Object>();
String sql = "select * from " + tableName + " limit 5";
String sql1 = "SELECT col_description(a.attrelid,a.attnum) as comment,format_type(a.atttypid,a.atttypmod) as type,a.attname as name, a.attnotnull as notnull FROM pg_class as c,pg_attribute as a where c.relname = '"
+ tableName + "' and a.attrelid = c.oid and a.attnum>0";
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
try {
Class.forName(ds.getDriverClassName());// "org.postgresql.Driver"
conn = DriverManager.getConnection(ds.getJdbcUrl(), ds.getUserName(), ds.getPassword());
// "jdbc:postgresql://192.168.204.102:5432/test", "postgres", "postgres"
ps = conn.prepareStatement(sql1);
rs = ps.executeQuery();
List<String> headerList = new ArrayList<String>();
while (rs.next()) {
headerList.add(rs.getString(3).toLowerCase());
}
map.put("header", headerList);
ps = conn.prepareStatement(sql);
rs = ps.executeQuery();
List<List<Object>> resultList = new ArrayList<List<Object>>();
while (rs.next()) {
List<Object> row = new ArrayList<Object>();
for (int i = 1; i <= headerList.size(); i++) {
row.add(rs.getObject(i));
}
resultList.add(row);
}
map.put("result", resultList);
} catch (SQLException e) {
e.printStackTrace();
return new DmpAgentResult(ResultCode.EXCEPTION, "POSTGRESQL数据预览异常:"+e.getMessage());
} catch (Exception e) {
e.printStackTrace();
return new DmpAgentResult(ResultCode.EXCEPTION, "POSTGRESQL数据预览异常:"+e.getMessage());
} finally {
if(rs != null)
try {
rs.close();
} catch (SQLException e1) {
e1.printStackTrace();
}
if(ps != null)
try {
ps.close();
} catch (SQLException e1) {
e1.printStackTrace();
}
try {
if(conn != null)
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(map));
}
}
package com.jz.agent.client.service;
import com.google.gson.Gson;
import com.jz.agent.client.DmpDsAgentClient;
import com.jz.dmp.agent.DmpAgentResult;
import com.jz.dmp.agent.ResultCode;
import com.jz.dmp.modules.model.DmpAgentDatasourceInfo;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Datasource agent client for SQL Server: connectivity check, table/column
 * metadata discovery and a small row preview.
 */
public class DmpSqlServerAgentClientService implements DmpDsAgentClient {

    private Gson gson = new Gson();

    /**
     * Verifies that a JDBC connection can be opened with the configured URL
     * and credentials.
     *
     * @param ds datasource connection settings
     * @return SUCCESS with "true" when the connection opens, EXCEPTION otherwise
     */
    @Override
    public DmpAgentResult testConnect(DmpAgentDatasourceInfo ds) {
        // try-with-resources guarantees the probe connection is closed.
        try (Connection conn = DriverManager.getConnection(
                ds.getJdbcUrl(), ds.getUserName(), ds.getPassword())) {
            return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(true));
        } catch (SQLException e) {
            e.printStackTrace();
            return new DmpAgentResult(ResultCode.EXCEPTION, "SQLSERVER连通性异常:" + e.getMessage());
        }
    }

    /**
     * Lists all tables visible to the connection as "schema.tablename"
     * (table part lower-cased).
     *
     * @param ds datasource connection settings
     * @return SUCCESS with a JSON array of table names, or EXCEPTION
     */
    @Override
    public DmpAgentResult getTableNameList(DmpAgentDatasourceInfo ds) {
        List<String> tableNameList = new ArrayList<>();
        try (Connection conn = DriverManager.getConnection(
                ds.getJdbcUrl(), ds.getUserName(), ds.getPassword())) {
            DatabaseMetaData metaData = conn.getMetaData();
            // Close the metadata ResultSet too (the original leaked it).
            try (ResultSet res = metaData.getTables(conn.getCatalog(), null, null, new String[]{"TABLE"})) {
                while (res.next()) {
                    String schema = res.getString("TABLE_SCHEM");
                    tableNameList.add(schema + "." + res.getString("TABLE_NAME").toLowerCase());
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
            // Message now names the actual operation (was "连通性异常").
            return new DmpAgentResult(ResultCode.EXCEPTION, "SQLSERVER获取表列表异常:" + e.getMessage());
        }
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(tableNameList));
    }

    /**
     * Lists the columns of one table with name (lower case) and type (upper
     * case).
     *
     * @param ds        datasource connection settings
     * @param tableName table name, optionally qualified as "schema.table"
     * @return SUCCESS with a JSON array of column maps, or EXCEPTION
     */
    @Override
    public DmpAgentResult getTableColumnList(DmpAgentDatasourceInfo ds, String tableName) {
        List<Map<String, Object>> columnMapList = new ArrayList<>();
        // The metadata lookup needs the bare table name, so strip any
        // "schema." qualifier produced by getTableNameList.
        String[] parts = tableName.split("\\.");
        String bareName = parts[parts.length - 1];
        try (Connection conn = DriverManager.getConnection(
                ds.getJdbcUrl(), ds.getUserName(), ds.getPassword())) {
            DatabaseMetaData metaData = conn.getMetaData();
            try (ResultSet columns = metaData.getColumns(null, "%", bareName, "%")) {
                while (columns.next()) {
                    Map<String, Object> columnMap = new HashMap<>();
                    columnMap.put("name", columns.getString("COLUMN_NAME").toLowerCase());
                    columnMap.put("type", columns.getString("TYPE_NAME").toUpperCase());
                    columnMapList.add(columnMap);
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
            return new DmpAgentResult(ResultCode.EXCEPTION, "SQLSERVER获取表字段信息异常:" + e.getMessage());
        }
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(columnMapList));
    }

    /**
     * Fetches the column headers and the first 5 rows of a table.
     *
     * @param ds        datasource connection settings
     * @param tableName table name, optionally qualified as "schema.table"
     * @return SUCCESS with {"header": [...], "result": [[...], ...]}, or EXCEPTION
     */
    @Override
    public DmpAgentResult previewData(DmpAgentDatasourceInfo ds, String tableName) {
        Map<String, Object> map = new HashMap<String, Object>();
        String[] parts = tableName.split("\\.");
        tableName = parts[parts.length - 1];
        try (Connection conn = DriverManager.getConnection(
                ds.getJdbcUrl(), ds.getUserName(), ds.getPassword())) {
            List<String> headerList = new ArrayList<String>();
            try (ResultSet columns = conn.getMetaData().getColumns(null, "%", tableName, "%")) {
                while (columns.next()) {
                    headerList.add(columns.getString("COLUMN_NAME"));
                }
            }
            // Bug fix: the header list was built but never returned; the
            // other agent clients (e.g. PostgreSQL) include it.
            map.put("header", headerList);
            // NOTE(review): tableName is interpolated into the SQL; it is
            // assumed to come from getTableNameList, not free-form input.
            String queryDataSql = "select top 5 * from " + tableName;
            try (Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery(queryDataSql)) {
                List<List<Object>> resultList = new ArrayList<List<Object>>();
                while (rs.next()) {
                    List<Object> row = new ArrayList<Object>();
                    for (int i = 1; i <= headerList.size(); i++) {
                        row.add(rs.getObject(i));
                    }
                    resultList.add(row);
                }
                map.put("result", resultList);
            }
        } catch (SQLException e) {
            e.printStackTrace();
            return new DmpAgentResult(ResultCode.EXCEPTION, "SQLSERVER数据预览异常:" + e.getMessage());
        }
        return new DmpAgentResult(ResultCode.SUCCESS, gson.toJson(map));
    }
}
package com.jz.agent.config;
/**
 * Static configuration for the DMP datasource agent (Hadoop/Kerberos paths
 * and principals).
 *
 * NOTE(review): several values are hard-coded Windows development paths and
 * should be externalized to configuration before deployment.
 */
public class DmpDsAgentConstants {
    public static String HADOOP_CONF_FILE_PATH = "G:\\ws_workstation\\dmp-new";
    public static String HADOOP_DEFAULT_CONF_FILE_PATH = "";
    public static String AGENT_HADOOP_USER_NAME = "hdfs";
    public static Boolean IS_ENABLED_KERBEROS = false;
    public static String SUBMIT_SYNCING_CONFIG_HDFS_PATH = "/jz/etlprocesses/guide_xml/";
    // Directory that holds krb5.conf and the *.keytab files below.
    public static String KERBEROS_CONF_FILE_PATH = "G:\\ws_workstation\\dmp-new\\jz-dmp-new\\jz-dmp-web-agent\\src\\main\\resources\\keytab";
    public static String KERBEROS_CONFIGURATION_NAME = "krb5.conf";
    public static String KERBEROS_KEYTAB_NAME_HDFS = "hdfs.keytab";
    public static String KERBEROS_KEYTAB_NAME_HIVE = "hive.keytab";
    public static String KERBEROS_KEYTAB_NAME_KUDU = "kudu.keytab";
    public static String KERBEROS_KEYTAB_NAME_IMPALA = "impala.keytab";
    public static String DMP_AGENT_PUBLIC_KEY = "/gExN7y/C++iJUXj2RZtaFLfsNrTmAcl";
    public static String KERBEROS_KEYTAB_ACCOUNT_HDFS = "hdfs/master@HADOOP.COM";
    public static String KERBEROS_KEYTAB_ACCOUNT_HIVE = "hive/master@HADOOP.COM";
    public static String KERBEROS_KEYTAB_ACCOUNT_KUDU = "kudu/master@HADOOP.COM";
    public static String KERBEROS_KEYTAB_ACCOUNT_IMPALA = "HADOOP.COM";
    // Bug fix: the directory and file name were concatenated without a path
    // separator (yielding paths like "...keytabkrb5.conf" that never exist).
    public static String KERBEROS_CONFIGURATION_PATH = KERBEROS_CONF_FILE_PATH + java.io.File.separator + KERBEROS_CONFIGURATION_NAME;
    public static String KERBEROS_KEYTAB_PATH_HDFS = KERBEROS_CONF_FILE_PATH + java.io.File.separator + KERBEROS_KEYTAB_NAME_HDFS;
    public static String KERBEROS_KEYTAB_PATH_HIVE = KERBEROS_CONF_FILE_PATH + java.io.File.separator + KERBEROS_KEYTAB_NAME_HIVE;
    public static String KERBEROS_KEYTAB_PATH_KUDU = KERBEROS_CONF_FILE_PATH + java.io.File.separator + KERBEROS_KEYTAB_NAME_KUDU;
    public static String KERBEROS_KEYTAB_PATH_IMPALA = KERBEROS_CONF_FILE_PATH + java.io.File.separator + KERBEROS_KEYTAB_NAME_IMPALA;
}
package com.jz.agent.service;
import com.jz.dmp.agent.DmpAgentResult;
import com.jz.dmp.modules.model.DmpAgentDatasourceInfo;
/**
 * Datasource-agent operations exposed to the DMP web layer. Each call wraps
 * its outcome in a {@link DmpAgentResult}.
 */
public interface DmpDsAgentService {

    /** Checks connectivity for the given datasource. */
    DmpAgentResult testConnect(DmpAgentDatasourceInfo ds);

    /** Lists the table names of the given datasource. */
    DmpAgentResult getTableNameList(DmpAgentDatasourceInfo ds);

    /** Lists column metadata for one table of the datasource. */
    DmpAgentResult getTableColumnList(DmpAgentDatasourceInfo ds, String tableName);

    /** Fetches a small sample of rows from one table. */
    DmpAgentResult previewData(DmpAgentDatasourceInfo ds, String tableName);

    /** Uploads a syncing XML configuration for the datasource. */
    DmpAgentResult submitSycingXmlConfig(DmpAgentDatasourceInfo ds, String xml);
}
\ No newline at end of file
package com.jz.agent.service;
import com.google.gson.Gson;
import com.jz.agent.client.DmpDsAgentClient;
import com.jz.agent.client.DmpDsAgentClientBuilder;
import com.jz.agent.client.service.DmpHdfsAgentClientService;
import com.jz.dmp.agent.DmpAgentResult;
import com.jz.dmp.modules.model.DmpAgentDatasourceInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@Service
/**
 * Dispatches datasource-agent requests to the type-specific client built by
 * {@link DmpDsAgentClientBuilder}, with request/response logging.
 */
public class DmpDsAgentServiceImp implements DmpDsAgentService {

    private Gson gson = new Gson();
    private static Logger logger = LoggerFactory.getLogger(DmpDsAgentServiceImp.class);

    /**
     * Builds the type-specific agent client, failing fast with a clear error
     * when the datasource type has no implementation (the builder returns
     * null for unsupported types, which previously surfaced as a bare NPE).
     */
    private DmpDsAgentClient buildClient(DmpAgentDatasourceInfo ds) {
        DmpDsAgentClient client = DmpDsAgentClientBuilder.build(ds);
        if (client == null) {
            throw new IllegalArgumentException("Unsupported datasource type: " + ds.getDatasourceType());
        }
        return client;
    }

    public DmpAgentResult testConnect(DmpAgentDatasourceInfo ds) {
        // NOTE(review): gson.toJson(ds) may serialize credentials into the
        // log — confirm DmpAgentDatasourceInfo masks its password field.
        logger.info("testConnect << REQUEST[DmpAgentDatasourceInfo={}]", gson.toJson(ds));
        DmpAgentResult dar = buildClient(ds).testConnect(ds);
        logger.info("testConnect >> RESPONSE[{}]", gson.toJson(dar));
        return dar;
    }

    public DmpAgentResult getTableNameList(DmpAgentDatasourceInfo ds) {
        logger.info("getTableNameList << REQUEST[DmpAgentDatasourceInfo={}]", gson.toJson(ds));
        DmpAgentResult dar = buildClient(ds).getTableNameList(ds);
        logger.info("getTableNameList >> RESPONSE[{}]", gson.toJson(dar));
        return dar;
    }

    public DmpAgentResult getTableColumnList(DmpAgentDatasourceInfo ds, String table) {
        logger.info("getTableColumnList << REQUEST[DmpAgentDatasourceInfo={}, table={}]", gson.toJson(ds), table);
        DmpAgentResult dar = buildClient(ds).getTableColumnList(ds, table);
        logger.info("getTableColumnList >> RESPONSE[{}]", gson.toJson(dar));
        return dar;
    }

    public DmpAgentResult previewData(DmpAgentDatasourceInfo ds, String table) {
        logger.info("previewData << REQUEST[DmpAgentDatasourceInfo={}, table={}]", gson.toJson(ds), table);
        DmpAgentResult dar = buildClient(ds).previewData(ds, table);
        logger.info("previewData >> RESPONSE[{}]", gson.toJson(dar));
        return dar;
    }

    public DmpAgentResult submitSycingXmlConfig(DmpAgentDatasourceInfo ds, String xml) {
        logger.info("submitSycingXmlConfig << REQUEST[DmpAgentDatasourceInfo={}, xml={}]", gson.toJson(ds), xml);
        // Syncing XML always goes through the HDFS client, regardless of type.
        DmpHdfsAgentClientService client = new DmpHdfsAgentClientService();
        DmpAgentResult dar = client.submitSycingConfig(ds, xml);
        logger.info("submitSycingXmlConfig >> RESPONSE[{}]", gson.toJson(dar));
        return dar;
    }
}
package com.jz.agent.utils;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Generic JDBC helpers for reading table/column metadata and sampling rows.
 * All public methods are best-effort: on failure they log the stack trace and
 * return whatever was collected so far (preserved from the original design).
 */
public class DbInfoUtil {

    @SuppressWarnings({ "unused", "rawtypes" })
    public static void main(String[] args) {
    }

    /**
     * Returns column metadata (name, type, size, scale, comment) for the
     * given table, as a list of maps.
     *
     * Raw types are kept for compatibility with existing callers.
     *
     * @param driver JDBC driver class name
     * @param url    JDBC URL
     * @param user   user name
     * @param pwd    password
     * @param table  exact table name to match
     * @return list of column maps; empty on failure
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static List getTableInfo(String driver, String url, String user, String pwd, String table) {
        List result = new ArrayList();
        // try-with-resources: the original closed conn in a finally block
        // that threw NPE whenever getConnections itself failed.
        try (Connection conn = JDBCUtils.getConnections(driver, url, user, pwd)) {
            DatabaseMetaData dbmd = conn.getMetaData();
            try (ResultSet resultSet = dbmd.getTables(null, "%", table, new String[] { "TABLE" })) {
                while (resultSet.next()) {
                    String tableName = resultSet.getString("TABLE_NAME");
                    if (tableName.equals(table)) {
                        try (ResultSet rs = conn.getMetaData().getColumns(null, getSchema(conn), tableName.toUpperCase(), "%")) {
                            while (rs.next()) {
                                Map map = new HashMap();
                                map.put("name", rs.getString("COLUMN_NAME").toLowerCase());
                                map.put("type", rs.getString("TYPE_NAME"));
                                map.put("size", rs.getInt("COLUMN_SIZE"));
                                map.put("scale", rs.getInt("DECIMAL_DIGITS"));
                                map.put("comment", rs.getString("REMARKS"));
                                result.add(map);
                            }
                        }
                    }
                }
            }
        } catch (Exception e) {
            // Best-effort: callers get the (possibly empty) partial result.
            e.printStackTrace();
        }
        return result;
    }

    /**
     * Maps a database type name to an internal value-type code
     * (1=string, 2=int, 4=decimal, 6=long, 7=date/time).
     */
    @SuppressWarnings("unused")
    private static String changeDbType(String dbType) {
        dbType = dbType.toUpperCase();
        switch (dbType) {
            case "VARCHAR":
            case "VARCHAR2":
            case "CHAR":
                return "1";
            case "NUMBER":
            case "DECIMAL":
                return "4";
            case "INT":
            case "SMALLINT":
            case "INTEGER":
                return "2";
            case "BIGINT":
                return "6";
            case "DATETIME":
            case "TIMESTAMP":
            case "DATE":
                return "7";
            default:
                return "1";
        }
    }

    /**
     * Derives the schema from the connection's user name (Oracle convention:
     * schema == user, upper case).
     *
     * @throws Exception when the user name is empty
     */
    private static String getSchema(Connection conn) throws Exception {
        String schema = conn.getMetaData().getUserName();
        if ((schema == null) || (schema.length() == 0)) {
            throw new Exception("ORACLE数据库模式不允许为空");
        }
        return schema.toUpperCase().toString();
    }

    /**
     * Lists all table names (lower case) visible in the current catalog.
     *
     * @return list of table names; empty on failure
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static List<String> getTableNameList(String driver, String url, String user, String pwd) {
        List result = new ArrayList();
        try (Connection conn = JDBCUtils.getConnections(driver, url, user, pwd)) {
            String catalog = conn.getCatalog();
            DatabaseMetaData dbmd = conn.getMetaData();
            try (ResultSet tablesResultSet = dbmd.getTables(catalog, null, null, new String[] { "TABLE" })) {
                while (tablesResultSet.next()) {
                    result.add(tablesResultSet.getString("TABLE_NAME").toLowerCase());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }

    /**
     * Returns the column headers and the first 5 rows of a table as
     * {"header": [...], "result": [[...], ...]}.
     *
     * NOTE(review): the "limit" clause is MySQL/PostgreSQL syntax; this
     * method will fail on databases without it (e.g. SQL Server) — those use
     * their own agent clients.
     *
     * @return map with header/result entries; partial on failure
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static Map getTableLimitData(String driver, String url, String user, String pwd, String table) {
        Map map = new HashMap<String, Object>();
        try (Connection conn = JDBCUtils.getConnections(driver, url, user, pwd)) {
            DatabaseMetaData dbmd = conn.getMetaData();
            List<String> headerList = new ArrayList<String>();
            List<String> typeList = new ArrayList<String>();
            try (ResultSet resultSet = dbmd.getTables(null, "%", table, new String[] { "TABLE" })) {
                while (resultSet.next()) {
                    String tableName = resultSet.getString("TABLE_NAME");
                    if (tableName.equals(table)) {
                        try (ResultSet rs = conn.getMetaData().getColumns(null, getSchema(conn), tableName.toUpperCase(), "%")) {
                            while (rs.next()) {
                                headerList.add(rs.getString("COLUMN_NAME"));
                                typeList.add(rs.getString("TYPE_NAME"));
                            }
                        }
                        break;
                    }
                }
            }
            map.put("header", headerList);
            // NOTE(review): table is interpolated into the SQL; it is assumed
            // to come from getTableNameList, not free-form user input.
            String sql = "select * from " + table + " limit 5";
            try (PreparedStatement ps = conn.prepareStatement(sql);
                 ResultSet rs = ps.executeQuery()) {
                List<List<Object>> resultList = new ArrayList<List<Object>>();
                while (rs.next()) {
                    List<Object> row = new ArrayList<Object>();
                    for (int i = 1; i <= headerList.size(); i++) {
                        row.add(rs.getObject(i));
                    }
                    resultList.add(row);
                }
                map.put("result", resultList);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return map;
    }
}
package com.jz.agent.utils;
import com.jz.agent.config.DmpDsAgentConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
import java.sql.*;
import java.util.*;
/**
 * JDBC connection helpers plus Kerberos environment bootstrap for the
 * Hadoop-backed datasources (HDFS/Hive/Impala/Kudu).
 */
public class JDBCUtils {

    /**
     * Opens a JDBC connection with remarks reporting enabled (so Oracle
     * exposes column comments through metadata).
     *
     * @param driver JDBC driver class name
     * @param url    JDBC URL
     * @param user   user name
     * @param pwd    password
     * @return an open connection
     * @throws Exception when the driver cannot be loaded or the connect fails
     */
    public static Connection getConnections(String driver, String url, String user, String pwd) throws Exception {
        Properties props = new Properties();
        // Ask the (Oracle) driver to expose REMARKS in metadata queries.
        props.put("remarksReporting", "true");
        props.put("user", user);
        props.put("password", pwd);
        Class.forName(driver);
        return DriverManager.getConnection(url, props);
    }

    /**
     * Quietly closes a result set, statement and connection, in that order.
     * Each close failure is logged independently and does not prevent the
     * remaining resources from being closed.
     */
    public static void disconnect(Connection connection, ResultSet res, PreparedStatement ps) {
        try {
            if (res != null) res.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        try {
            if (ps != null) ps.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        try {
            if (connection != null) connection.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /** Sets the JVM system properties required before any Kerberos login. */
    private static void setKrb5SystemProperties() {
        System.setProperty("java.security.krb5.conf", DmpDsAgentConstants.KERBEROS_CONFIGURATION_PATH);
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
        System.setProperty("sun.security.krb5.debug", "true");
    }

    /** Builds a Hadoop Configuration with Kerberos authentication enabled. */
    private static Configuration kerberosConfiguration() {
        Configuration conf = new Configuration();
        conf.set("hadoop.security.authentication", "Kerberos");
        return conf;
    }

    /**
     * Logs in from the HDFS keytab using the caller-supplied Configuration.
     *
     * @param conf Hadoop configuration to authenticate against
     * @throws IOException on keytab login failure
     */
    public static void initKerberosENV4HDFS(Configuration conf) throws IOException {
        setKrb5SystemProperties();
        System.setProperty("HADOOP_USER_NAME", DmpDsAgentConstants.AGENT_HADOOP_USER_NAME);
        UserGroupInformation.setConfiguration(conf);
        UserGroupInformation.loginUserFromKeytab(
                DmpDsAgentConstants.KERBEROS_KEYTAB_ACCOUNT_HDFS, DmpDsAgentConstants.KERBEROS_KEYTAB_PATH_HDFS
        );
        System.out.println(UserGroupInformation.getCurrentUser());
    }

    /** Logs in from the Hive keytab with a Kerberos-enabled Configuration. */
    public static void initKerberosENV4HIVE() throws IOException {
        setKrb5SystemProperties();
        UserGroupInformation.setConfiguration(kerberosConfiguration());
        UserGroupInformation.loginUserFromKeytab(
                DmpDsAgentConstants.KERBEROS_KEYTAB_ACCOUNT_HIVE, DmpDsAgentConstants.KERBEROS_KEYTAB_PATH_HIVE
        );
    }

    /**
     * Logs in from the Impala keytab.
     *
     * @return the logged-in UGI, for use with doAs()
     */
    public static UserGroupInformation initKerberosENV4IMPALA() throws IOException {
        setKrb5SystemProperties();
        UserGroupInformation.setConfiguration(kerberosConfiguration());
        UserGroupInformation.loginUserFromKeytab(
                DmpDsAgentConstants.KERBEROS_KEYTAB_ACCOUNT_IMPALA, DmpDsAgentConstants.KERBEROS_KEYTAB_PATH_IMPALA
        );
        return UserGroupInformation.getLoginUser();
    }

    /** Logs in from the Kudu keytab with a Kerberos-enabled Configuration. */
    public static void initKerberosENV4KUDU() throws IOException {
        setKrb5SystemProperties();
        UserGroupInformation.setConfiguration(kerberosConfiguration());
        UserGroupInformation.loginUserFromKeytab(
                DmpDsAgentConstants.KERBEROS_KEYTAB_ACCOUNT_KUDU, DmpDsAgentConstants.KERBEROS_KEYTAB_PATH_KUDU
        );
    }

    /**
     * Converts a result set into a map with keys "columns" (column names),
     * "rows" (row values) and "totalCount" (number of rows returned). When
     * {@code rowSize} is greater than 0, at most that many leading rows are
     * read; otherwise the whole result set is consumed.
     *
     * @param resultSet source result set (not closed by this method)
     * @param rowSize   maximum number of rows to read, or <= 0 for all
     * @return the map described above, or null when resultSet is null
     * @throws SQLException on read failure
     */
    public static Map<String, Object> convertResultSet2Map(ResultSet resultSet, int rowSize) throws SQLException {
        if (null == resultSet)
            return null;
        // Column names from metadata.
        ResultSetMetaData metaData = resultSet.getMetaData();
        int columnCount = metaData.getColumnCount();
        List<String> columnNameList = new ArrayList<>(columnCount);
        for (int i = 1; i <= columnCount; i++) {
            columnNameList.add(metaData.getColumnName(i));
        }
        // Row values, capped at rowSize when positive.
        List<List<Object>> allColumnValueList = new ArrayList<>();
        int count = 0;
        while (resultSet.next()) {
            List<Object> rowColumnValueList = new ArrayList<>();
            for (int i = 1; i <= columnCount; i++) {
                rowColumnValueList.add(resultSet.getObject(i));
            }
            allColumnValueList.add(rowColumnValueList);
            count++;
            if (rowSize > 0 && count == rowSize) {
                break;
            }
        }
        Map<String, Object> returnDataMap = new HashMap<>();
        returnDataMap.put("columns", columnNameList);
        returnDataMap.put("rows", allColumnValueList);
        returnDataMap.put("totalCount", allColumnValueList.size());
        return returnDataMap;
    }
}
package com.jz.common.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import com.jz.common.utils.web.SessionUtils;
import com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowExecution;
import com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowPro;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.FileSystemResource;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* azkaban ajax api 工具类
*/
public class AzkabanApiUtils2 {
private static final Logger LOGGER = LoggerFactory.getLogger(AzkabanApiUtils2.class);
private String azkabanServerUrl;
private String userName;
private String password;
/**
 * Creates a client for the given Azkaban server with explicit credentials.
 *
 * @param azkabanServerUrl base URL of the Azkaban web server
 * @param userName         login user
 * @param password         login password
 */
public AzkabanApiUtils2(String azkabanServerUrl, String userName, String password) {
    this.azkabanServerUrl = azkabanServerUrl;
    this.userName = userName;
    this.password = password;
}
/**
 * Creates a client for the given Azkaban server using the default
 * admin/admin credentials.
 *
 * @param azkabanServerUrl base URL of the Azkaban web server
 */
public AzkabanApiUtils2(String azkabanServerUrl) {
    this.userName = "admin";
    this.password = "admin";
    this.azkabanServerUrl = azkabanServerUrl;
}
/**
 * Builds a RestTemplate for the Azkaban AJAX API with generous (15 minute)
 * connect and read timeouts, since uploads of large flow zips can be slow.
 *
 * @return a configured RestTemplate instance
 */
public RestTemplate bulidRestTemplate() {
    final int timeoutMillis = 900000; // 15 minutes
    SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
    factory.setConnectTimeout(timeoutMillis);
    factory.setReadTimeout(timeoutMillis);
    return new RestTemplate(factory);
}
/**
 * Returns the session id used to authenticate Azkaban AJAX calls.
 *
 * The real credential-based login call is currently bypassed: the id of the
 * current HTTP session is reused as the Azkaban session token. (The dead
 * commented-out implementation of the real login was removed; restore it
 * from version control if credential login is re-enabled.)
 *
 * @return session id token for subsequent API calls
 */
public String login() {
    String sessionId = SessionUtils.getSession().getId();
    // Replaced System.err debug output with parameterized logging.
    LOGGER.debug("azkaban session id: {}", sessionId);
    return sessionId;
}
/**
 * Creates an Azkaban project. A project that already exists is treated as
 * success, so re-deploying a workflow is idempotent.
 *
 * @param sessionId   authenticated session id
 * @param projectName Azkaban project name (maps to a DMP workflow task name)
 * @param projectDesc project description
 * @return the project name on success
 * @throws RuntimeException when validation or the remote call fails
 */
@SuppressWarnings("unchecked")
public String createAzkabanProject(String sessionId, String projectName, String projectDesc) {
    if (StringUtils.isEmpty(sessionId)) {
        throw new RuntimeException("还未登录");
    }
    if (StringUtils.isEmpty(projectName)) {
        throw new RuntimeException("项目名不能为空");
    }
    if (StringUtils.isEmpty(projectDesc)) {
        throw new RuntimeException("项目描述不能为空");
    }
    String projectManageUrl = azkabanServerUrl + "/manager?token=" + sessionId;
    HttpHeaders hs = new HttpHeaders();
    hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
    hs.add("X-Requested-With", "XMLHttpRequest");
    LinkedMultiValueMap<String, String> form = new LinkedMultiValueMap<String, String>();
    form.add("session.id", sessionId);
    form.add("action", "create");
    form.add("name", projectName);
    form.add("description", projectDesc);
    HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(form, hs);
    String result;
    try {
        result = bulidRestTemplate().postForObject(projectManageUrl, httpEntity, String.class);
    } catch (Exception e) {
        LOGGER.error("{}----{}---调用创建项目失败", projectManageUrl, form, e);
        // Preserve the cause (the original dropped it).
        throw new RuntimeException("创建项目失败", e);
    }
    LOGGER.debug("createAzkabanProject result--{}", result);
    Map<String, String> postForObject = JSON.parseObject(result, Map.class);
    String status = postForObject.get("status");
    if (StringUtils.isEmpty(status)) {
        throw new RuntimeException("创建项目失败");
    }
    if ("error".equals(status)) {
        String message = postForObject.get("message");
        // "already exists" is tolerated: re-deploying reuses the project.
        // Null-guard the contains() call (the original could NPE here).
        boolean alreadyExists = "Project already exists.".equals(message)
                || (message != null && message.contains("already exists in db"));
        if (!alreadyExists) {
            throw new RuntimeException("创建项目失败");
        }
    }
    LOGGER.info("创建项目 {}成功", projectName);
    return projectName;
}
/**
 * Uploads a project zip to an existing Azkaban project.
 *
 * @param sessionId     authenticated session id
 * @param projectName   target Azkaban project
 * @param uploadZipPath absolute path of the zip file to upload
 * @return the parsed JSON response from Azkaban
 * @throws RuntimeException when the zip file does not exist
 */
@SuppressWarnings("unchecked")
public Map<String, Object> uploadZip(String sessionId, String projectName, String uploadZipPath) {
    FileSystemResource zipResource = new FileSystemResource(new File(uploadZipPath));
    if (!zipResource.exists()) {
        throw new RuntimeException("上传失败," + uploadZipPath + "不存在");
    }
    String projectManageUrl = azkabanServerUrl + "/manager?token=" + sessionId;
    LinkedMultiValueMap<String, Object> form = new LinkedMultiValueMap<String, Object>();
    form.add("session.id", sessionId);
    form.add("ajax", "upload");
    form.add("project", projectName);
    form.add("file", zipResource);
    return bulidRestTemplate().postForObject(projectManageUrl, form, Map.class);
}
/**
 * Logs in and then uploads a project zip to the named Azkaban project.
 *
 * @param projectName   target Azkaban project
 * @param uploadZipPath absolute path of the zip file to upload
 * @return the parsed JSON response from Azkaban
 */
public Map<String, Object> uploadZip(String projectName, String uploadZipPath) {
    return uploadZip(login(), projectName, uploadZipPath);
}
/**
 * Schedules a flow with a cron expression via the scheduleCronFlow AJAX API.
 *
 * @param sessionId   authenticated session id
 * @param projectName Azkaban project name
 * @param flowName    Azkaban flow name
 * @param flowPro     carries the cron expression (scheduleSetting)
 * @return the schedule id returned by Azkaban
 * @throws RuntimeException when validation or the remote call fails
 */
@SuppressWarnings("unchecked")
public Integer scheduleFlowCron(String sessionId, String projectName, String flowName, FlowPro flowPro) {
    if (StringUtils.isEmpty(sessionId)) {
        throw new RuntimeException("还未登陆");
    }
    if (StringUtils.isEmpty(projectName)) {
        throw new RuntimeException("调度流程项目名不能为空");
    }
    if (StringUtils.isEmpty(flowName)) {
        throw new RuntimeException("调度流程 流程名不能为空");
    }
    if (StringUtils.isEmpty(flowPro.getScheduleSetting())) {
        throw new RuntimeException("调度流程 调度cron表达式不能为空");
    }
    String scheduleUrl = azkabanServerUrl + "/schedule?token=" + sessionId;
    LinkedMultiValueMap<String, Object> form = new LinkedMultiValueMap<String, Object>();
    form.add("session.id", sessionId);
    form.add("ajax", "scheduleCronFlow");
    form.add("projectName", projectName);
    form.add("flow", flowName);
    form.add("cronExpression", flowPro.getScheduleSetting());
    Map<String, Object> response;
    try {
        response = bulidRestTemplate().postForObject(scheduleUrl, form, Map.class);
    } catch (Exception e) {
        LOGGER.error("{}----{}----调用任务调度接口异常", scheduleUrl, form, e);
        // Preserve the cause (the original dropped it).
        throw new RuntimeException("定时调度接口异常", e);
    }
    if (response.containsKey("error")) {
        throw new RuntimeException(response.get("error").toString());
    }
    String status = (String) response.get("status");
    if ("error".equals(status)) {
        throw new RuntimeException((String) response.get("message"));
    }
    LOGGER.info("调度projectName={},flowName={},cronExpression={} 成功",
            projectName, flowName, flowPro.getScheduleSetting());
    return (Integer) response.get("scheduleId");
}
/**
 * End-to-end task creation: logs in, creates the project, uploads the zip
 * archive and, when a cron expression is configured, registers the schedule.
 *
 * @param projectName   project name
 * @param projectDesc   project description
 * @param uploadZipPath full path of the zip archive to upload
 * @param flowPro       flow settings (flow name plus optional cron expression)
 * @return false when the upload response carries an "error" key, true otherwise
 */
public boolean loginCreateProjectuploadZipAndSchedule(String projectName, String projectDesc, String uploadZipPath, FlowPro flowPro) {
    String sessionId = login();
    createAzkabanProject(sessionId, projectName, projectDesc);
    Map<String, Object> uploadResult = uploadZip(sessionId, projectName, uploadZipPath);
    if (uploadResult.containsKey("error")) {
        return false;
    }
    // A non-empty cron expression means the flow should be scheduled right away.
    if (!StringUtils.isEmpty(flowPro.getScheduleSetting())) {
        scheduleFlowCron(sessionId, projectName, flowPro.getFlowName(), flowPro);
    }
    return true;
}
/**
 * Deletes a work flow: removes its schedule, cancels running executions,
 * then asks the server to clean related Azkaban tables and caches.
 *
 * @param project     Azkaban project name
 * @param projectName flow name to clean up
 * @return the response map of the final clean-up call
 * @throws RuntimeException when the project does not exist or any step fails
 */
public Map<String, Object> deleteAzkabanFlow(String project, String projectName) {
    String sessionId = login();
    if (StringUtils.isEmpty(sessionId)) {
        throw new RuntimeException("还未登录");
    }
    if (StringUtils.isEmpty(project)) {
        throw new RuntimeException("项目名不能为空");
    }
    if (StringUtils.isEmpty(projectName)) {
        throw new RuntimeException("任务名不能为空");
    }
    LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<>();
    String accessToken = "?token=" + sessionId;
    Map<String, Object> resp;
    String cleanUrl = "";
    String projectId;
    int i = 0;
    try {
        // 1. Look up the project to obtain its projectId.
        cleanUrl = azkabanServerUrl + "/manager" + accessToken;
        cleanUrl += "&ajax=fetchprojectflows&session.id=" + sessionId + "&project=" + project;
        LOGGER.info("{}.获取一个project的任务流-查询对应项目的信息及其任务流", ++i);
        resp = bulidRestTemplate().getForObject(cleanUrl, Map.class);
        LOGGER.info("url:{}, response:{}", cleanUrl, resp);
        if (null == resp || !resp.containsKey("projectId")) {
            throw new RuntimeException(projectName + "项目不存在!");
        }
        projectId = resp.get("projectId").toString();
        // 2. Fetch the flow's schedule, if any.
        cleanUrl = azkabanServerUrl + "/schedule" + accessToken;
        cleanUrl += "&ajax=fetchSchedule&session.id=" + sessionId + "&projectId=" + projectId + "&flowId=" + projectName;
        LOGGER.info("{}.获取指定Project,flow下的数据流的调度-查询对应工作流的调度信息", ++i);
        resp = bulidRestTemplate().getForObject(cleanUrl, Map.class);
        LOGGER.info("url:{}, response:{}", cleanUrl, resp);
        String scheduleId = String.valueOf(EasyJsonUtils.getKeyJsonQuery(resp, "schedule.scheduleId"));
        if (!"null".equals(scheduleId)) {
            // 3. The flow is scheduled: remove the schedule first.
            cleanUrl = azkabanServerUrl + "/schedule" + accessToken;
            linkedMultiValueMap.clear();
            linkedMultiValueMap.add("action", "removeSched");
            linkedMultiValueMap.add("session.id", sessionId);
            linkedMultiValueMap.add("scheduleId", scheduleId);
            LOGGER.info("{}.取消工作流的调度", ++i);
            resp = bulidRestTemplate().postForObject(cleanUrl, linkedMultiValueMap, Map.class);
            LOGGER.info("url:{}, param:{}, response:{}", cleanUrl, new Gson().toJson(linkedMultiValueMap), resp);
            // Fixed: the original `null == resp && !resp.containsKey("status") || ...`
            // dereferenced resp when it was null and NPE'd when "status" was absent.
            if (null == resp || !resp.containsKey("status") || "error".equals(String.valueOf(resp.get("status")))) {
                throw new RuntimeException(projectName + "取消工作流调度出错!");
            }
        }
        // 4. Find executions of this flow that are still running.
        cleanUrl = azkabanServerUrl + "/executor" + accessToken;
        cleanUrl += "&ajax=getRunning&session.id=" + sessionId + "&project=" + project + "&flow=" + projectName;
        LOGGER.info("{}.获取任务流正在执行的Execution-查询对应工作流的信息", ++i);
        resp = bulidRestTemplate().getForObject(cleanUrl, Map.class);
        LOGGER.info("url:{}, response:{}", cleanUrl, resp);
        if (null != resp && resp.containsKey("execIds")) {
            List ids = JSON.parseArray(resp.get("execIds").toString()).toJavaList(String.class);
            for (int j = 0; j < ids.size(); j++) {
                // 5. Cancel each running execution.
                cleanUrl = azkabanServerUrl + "/executor" + accessToken;
                cleanUrl += "&ajax=cancelFlow&session.id=" + sessionId + "&execid=" + ids.get(j).toString();
                LOGGER.info("{}.{}.取消流程执行-停止对应调度工作流", i, j);
                resp = bulidRestTemplate().getForObject(cleanUrl, Map.class);
                LOGGER.info("url:{}, response:{}", cleanUrl, resp);
            }
        }
        // 6. Ask the server to clean Azkaban tables and caches for this flow.
        cleanUrl = azkabanServerUrl + "/workflow" + accessToken;
        linkedMultiValueMap.clear();
        linkedMultiValueMap.add("projectName", projectName);
        linkedMultiValueMap.add("type", "cleanFlow");
        linkedMultiValueMap.add("project", project);
        LOGGER.info("{}.清理azkaban相关表数据及缓存", ++i);
        resp = bulidRestTemplate().postForObject(cleanUrl, linkedMultiValueMap, Map.class);
        LOGGER.info("url:{}, param:{}, response:{}", cleanUrl, new Gson().toJson(linkedMultiValueMap), resp);
    } catch (Exception e) {
        LOGGER.error("工作流清理失败,cur_url-{}-param-{}", cleanUrl, new Gson().toJson(linkedMultiValueMap), e);
        // Preserve the cause so callers can see the real failure.
        throw new RuntimeException("工作流清理失败:" + e.getMessage(), e);
    }
    if (resp.containsKey("error")) {
        throw new RuntimeException(resp.get("error").toString());
    }
    LOGGER.info("清理 {} 项目下的 {} 工作流成功", project, projectName);
    return resp;
}
/**
 * Checks whether a flow exists in the given project by fetching its graph.
 *
 * @param projectName Azkaban project name
 * @param flowName    flow to look for
 * @return true when the fetchflowgraph call returns no "error" key
 */
public boolean checkFlowExists(String projectName, String flowName) {
    String token = "?token=" + login();
    String url = azkabanServerUrl + "/manager" + token
            + "&ajax=fetchflowgraph&project=" + projectName + "&flow=" + flowName;
    Map result = bulidRestTemplate().getForObject(url, Map.class);
    return !result.containsKey("error");
}
/**
 * End-to-end task creation with immediate execution: logs in, creates the
 * project, uploads the zip archive, then triggers the flow once.
 *
 * @param projectName   project name
 * @param projectDesc   project description
 * @param uploadZipPath full path of the zip archive to upload
 * @param flowName      flow to execute after the upload
 * @return false when the upload response carries an "error" key, true otherwise
 */
public boolean loginCreateProjectuploadZipAndExecute(String projectName, String projectDesc, String uploadZipPath, String flowName) {
    String sessionId = login();
    createAzkabanProject(sessionId, projectName, projectDesc);
    Map<String, Object> uploadResult = uploadZip(sessionId, projectName, uploadZipPath);
    if (uploadResult.containsKey("error")) {
        return false;
    }
    // Fire a one-off execution right after a successful upload.
    executeFlow(sessionId, projectName, flowName);
    return true;
}
/**
 * Triggers a one-off execution of a flow via Azkaban's ajax API
 * ({@code ajax=executeFlow}).
 *
 * @param sessionId   Azkaban session id
 * @param projectName project name
 * @param flowName    flow name
 * @return the execution id returned by Azkaban on success
 * @throws RuntimeException when any argument is missing, the HTTP call fails,
 *                          or the server reports an error
 */
@SuppressWarnings("unchecked")
public Integer executeFlow(String sessionId, String projectName, String flowName) {
    if (StringUtils.isEmpty(sessionId)) {
        throw new RuntimeException("还未登陆");
    }
    if (StringUtils.isEmpty(projectName)) {
        throw new RuntimeException("调度流程项目名不能为空");
    }
    if (StringUtils.isEmpty(flowName)) {
        throw new RuntimeException("调度流程 流程名不能为空");
    }
    String executeFlowUrl = azkabanServerUrl + "/executor?token=" + sessionId;
    LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<>();
    linkedMultiValueMap.add("session.id", sessionId);
    linkedMultiValueMap.add("ajax", "executeFlow");
    linkedMultiValueMap.add("project", projectName);
    linkedMultiValueMap.add("flow", flowName);
    Map<String, Object> postForObject;
    try {
        postForObject = bulidRestTemplate().postForObject(executeFlowUrl, linkedMultiValueMap, Map.class);
    } catch (Exception e) {
        // Log with the stack trace and preserve the cause instead of swallowing it.
        LOGGER.error(executeFlowUrl + "----" + linkedMultiValueMap + "----调用任务调度接口异常", e);
        throw new RuntimeException("调度接口异常", e);
    }
    if (postForObject.containsKey("error")) {
        throw new RuntimeException(postForObject.get("error").toString());
    }
    String status = (String) postForObject.get("status");
    if ("error".equals(status)) {
        throw new RuntimeException((String) postForObject.get("message"));
    }
    LOGGER.info("调度projectName=" + projectName + ",flowName=" + flowName + "one time 成功");
    return (Integer) postForObject.get("execid");
}
/**
 * Stops every running execution of the given flow.
 *
 * @param projectName project name
 * @param flowName    Azkaban flow name
 * @return the cancelled execution ids joined with commas
 *         (keeps the original leading-comma format, e.g. ",1,2")
 */
@SuppressWarnings("unchecked")
public String stopFlow(String projectName, String flowName) {
    if (StringUtils.isEmpty(projectName)) {
        throw new RuntimeException("调度流程项目名不能为空");
    }
    if (StringUtils.isEmpty(flowName)) {
        throw new RuntimeException("调度流程 流程名不能为空");
    }
    // Log in, look up all running executions, then cancel each one.
    String sessionId = login();
    Integer[] execIds = getFlowRuning(sessionId, projectName, flowName);
    StringBuilder cancelled = new StringBuilder();
    for (Integer execId : execIds) {
        LOGGER.info("execid:" + execId);
        cancleFlow(sessionId, execId);
        cancelled.append(",").append(execId);
    }
    return cancelled.toString();
}
/**
 * Returns the execution ids of all currently-running executions of a flow
 * ({@code ajax=getRunning}).
 *
 * @param sessionId   Azkaban session id
 * @param projectName project name
 * @param flowName    flow name
 * @return the running execution ids; empty array when none are running
 * @throws RuntimeException when any argument is missing, the HTTP call fails,
 *                          or the server reports an error
 */
@SuppressWarnings("unchecked")
public Integer[] getFlowRuning(String sessionId, String projectName, String flowName) {
    if (StringUtils.isEmpty(sessionId)) {
        throw new RuntimeException("还未登陆");
    }
    if (StringUtils.isEmpty(projectName)) {
        throw new RuntimeException("调度流程项目名不能为空");
    }
    if (StringUtils.isEmpty(flowName)) {
        throw new RuntimeException("调度流程 流程名不能为空");
    }
    String executeFlowUrl = azkabanServerUrl + "/executor?token=" + sessionId;
    LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<>();
    linkedMultiValueMap.add("session.id", sessionId);
    linkedMultiValueMap.add("ajax", "getRunning");
    linkedMultiValueMap.add("project", projectName);
    linkedMultiValueMap.add("flow", flowName);
    Map<String, Object> postForObject;
    try {
        postForObject = bulidRestTemplate().postForObject(executeFlowUrl, linkedMultiValueMap, Map.class);
    } catch (Exception e) {
        LOGGER.error(executeFlowUrl + "----" + linkedMultiValueMap + "----获取任务正在调度实例接口异常", e);
        throw new RuntimeException("获取任务调度实例接口异常", e);
    }
    if (postForObject.containsKey("error")) {
        throw new RuntimeException(postForObject.get("error").toString());
    }
    String status = (String) postForObject.get("status");
    if ("error".equals(status)) {
        throw new RuntimeException((String) postForObject.get("message"));
    }
    Integer[] execIds = new Integer[]{};
    Object rawIds = postForObject.get("execIds");
    if (rawIds != null) {
        if (rawIds instanceof List) {
            // The JSON "execIds" array is deserialized into a List, not an
            // Integer[] — the original direct cast always threw ClassCastException.
            List<?> idList = (List<?>) rawIds;
            execIds = new Integer[idList.size()];
            for (int k = 0; k < idList.size(); k++) {
                execIds[k] = Integer.valueOf(String.valueOf(idList.get(k)));
            }
        } else {
            execIds = (Integer[]) rawIds;
        }
    }
    // Arrays.toString gives readable content (the old execIds.toString() logged the array identity).
    LOGGER.info("获取所有任务(" + projectName + ")正在调度execids" + java.util.Arrays.toString(execIds));
    return execIds;
}
/**
 * Cancels one running execution by execution id ({@code ajax=cancelFlow}).
 *
 * @param sessionId Azkaban session id
 * @param execid    execution id to cancel
 * @return true when the server accepted the cancellation
 * @throws RuntimeException when the session is missing, the HTTP call fails,
 *                          or the server reports an error
 */
@SuppressWarnings("unchecked")
public boolean cancleFlow(String sessionId, Integer execid) {
    if (StringUtils.isEmpty(sessionId)) {
        throw new RuntimeException("还未登陆");
    }
    String executeFlowUrl = azkabanServerUrl + "/executor?token=" + sessionId;
    LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<>();
    linkedMultiValueMap.add("session.id", sessionId);
    linkedMultiValueMap.add("ajax", "cancelFlow");
    linkedMultiValueMap.add("execid", execid);
    Map<String, Object> postForObject;
    try {
        postForObject = bulidRestTemplate().postForObject(executeFlowUrl, linkedMultiValueMap, Map.class);
    } catch (Exception e) {
        LOGGER.error(executeFlowUrl + "----" + linkedMultiValueMap + "----停止任务实例(" + execid + ")调度接口异常", e);
        // Fixed message typo ("亭子" -> "停止") and preserved the cause.
        throw new RuntimeException("停止任务调度实例(" + execid + ")接口异常", e);
    }
    if (postForObject.containsKey("error")) {
        throw new RuntimeException(postForObject.get("error").toString());
    }
    String status = (String) postForObject.get("status");
    if ("error".equals(status)) {
        throw new RuntimeException((String) postForObject.get("message"));
    }
    LOGGER.info("停止调度(" + execid + ")成功");
    return true;
}
public static void main(String[] args) throws Exception {
    // Ad-hoc smoke test against a development Azkaban server.
    AzkabanApiUtils2 apiUtils = new AzkabanApiUtils2("http://119.23.32.151:8083");
    boolean exists = apiUtils.checkFlowExists("dw_test", "123");
    System.err.println(exists);
}
/**
* @Title: getSyncingFlowExecution
* @Description: TODO(获取同步状态结果)
* @param @param projectId
* @param @param flow
* @param @param start
* @param @param length
* @param @return 参数
* @return List<FlowExecution> 返回类型
* @throws
*/
// public List<FlowExecution> getSyncingFlowExecution(Long projectId, String flow, Integer start, Integer length) {
//
// List<FlowExecution> list = new ArrayList<FlowExecution>();
//
// //登录
// String sessionId = login();
//
// String executeFlowUrl = azkabanServerUrl + "/manager?token=" + sessionId;
// Map<String, Object> paramMap = new HashMap<String, Object>();
// paramMap.put("session.id", sessionId);
// paramMap.put("ajax", "fetchFlowExecutions");
// paramMap.put("project", "jz_localflow_"+projectId);
// paramMap.put("flow", flow);
// paramMap.put("start", start);
// paramMap.put("length", length);
//
// JSONObject getForObject = null;
// try {
// //postForObject = bulidRestTemplate().postForObject(executeFlowUrl, linkedMultiValueMap, Map.class);
// //String getForObjectStr = bulidRestTemplate().getForObject(executeFlowUrl, String.class, paramMap);
// String getForObjectStr = HttpClientUtils.getJsonForParam(executeFlowUrl, paramMap);
// getForObject = JSON.parseObject(getForObjectStr);
// } catch (Exception e) {
// LOGGER.error(executeFlowUrl + "----" + paramMap + "----获取同步状态结果接口异常");
// e.printStackTrace();
// throw new RuntimeException("获取同步状态结果接口异常");
// }
//
// if (getForObject==null) {
// return list;
// }
//
// if (getForObject.containsKey("error")){
// throw new RuntimeException(getForObject.get("error").toString());
// }
// String status = (String) getForObject.get("status");
// if ("error".equals(status)) {
// String message = (String) getForObject.get("message");
// throw new RuntimeException(message);
// }
//
// String executions = getForObject.getString("executions");
//
// LOGGER.info("获取同步状态结果成功:"+executions);
//
// list = JSONObject.parseArray(executions, FlowExecution.class);
//
// return list;
// }
}
package com.jz.common.utils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
/**
 * Tiny helper for querying nested Map/List structures (typically maps parsed
 * from JSON responses) with a dot-separated key path.
 */
public class EasyJsonUtils {

    /**
     * Walks a nested Map/List structure along a dot-separated key path
     * (e.g. {@code "schedule.scheduleId"}) and returns the value found.
     *
     * @param map     root map to query
     * @param jsonKey dot-separated key path
     * @return the resolved value — a plain object, a List of matches when an
     *         intermediate value is a list, or null when the path misses
     */
    @SuppressWarnings("unchecked")
    public static Object getKeyJsonQuery(Map<?, ?> map, String jsonKey) {
        String[] keys = jsonKey.split("\\.");
        Object current = null;
        for (int i = 0; i < keys.length; i++) {
            if (i == 0) {
                current = handle(map, keys[i]);
            } else if (current instanceof Map) {
                current = handle((Map<?, ?>) current, keys[i]);
            } else if (current instanceof List) {
                current = handle((List<Map<?, ?>>) current, keys[i]);
            }
            // Any other type: keep the current value unchanged (same as before).
        }
        return current;
    }

    /** Resolves one path segment against a map. */
    public static Object handle(Map<?, ?> map, String key) {
        return map.get(key);
    }

    /**
     * Resolves one path segment against a list of maps: scalar hits are
     * collected into a list, container hits are flattened for the next segment.
     * NOTE(review): a Map value reaches the addAll branch and would throw a
     * ClassCastException (a Map is not a Collection) — confirm only List values
     * occur in practice, as in the Azkaban responses this is used against.
     */
    @SuppressWarnings("unchecked")
    public static Object handle(List<Map<?, ?>> listMap, String key) {
        List<Map<String, Object>> listMaps = new ArrayList<>();
        List<Object> lists = new ArrayList<>();
        listMap.stream().filter(e -> null != e.get(key)).forEach(e -> {
            if (!(e.get(key) instanceof Map<?, ?>) && !(e.get(key) instanceof List<?>)) {
                // Plain value: collect it directly.
                lists.add(e.get(key));
            } else {
                listMaps.addAll((Collection<? extends Map<String, Object>>) e.get(key));
            }
        });
        // `lists.size() > 0 && !lists.isEmpty()` was redundant — the two tests are equivalent.
        return !lists.isEmpty() ? lists : listMaps;
    }

    public static void main(String[] args) {
        // Example payload this helper is queried against:
        // {"schedule":{"scheduleId":"3","cronExpression":"0 * 9 ? * *", ...}}
    }
}
package com.jz.common.utils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.output.FileWriterWithEncoding;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import java.io.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 删除文件和目录
*
*/
/**
 * File-system helpers: writing UTF-8 text files, deleting files/directories
 * recursively, copying directories/files, and listing file names by suffix.
 */
public class FileUtils {

    private static final Logger logger = LoggerFactory.getLogger(FileUtils.class);

    /**
     * Writes content to a file as UTF-8 text.
     *
     * @param fileAbsolutePath  absolute path of the target file
     * @param fileContentObject either a String (written as-is) or a
     *                          {@code List<String>} (each element written
     *                          followed by a newline); other types produce an
     *                          empty file
     * @throws RuntimeException wrapping any I/O failure
     */
    @SuppressWarnings("unchecked")
    public static void write(String fileAbsolutePath, Object fileContentObject) {
        // try-with-resources replaces the original hand-rolled finally/close blocks;
        // OutputStreamWriter with "UTF-8" matches FileWriterWithEncoding's behavior.
        try (BufferedWriter bufferedWriter = new BufferedWriter(
                new OutputStreamWriter(new FileOutputStream(fileAbsolutePath), "UTF-8"))) {
            if (fileContentObject instanceof String) {
                bufferedWriter.write((String) fileContentObject);
            } else if (fileContentObject instanceof List) {
                for (String fileContent : (List<String>) fileContentObject) {
                    bufferedWriter.write(fileContent + "\n");
                    bufferedWriter.flush();
                }
            }
        } catch (IOException e) {
            // Keep the original message but preserve the cause.
            throw new RuntimeException(fileAbsolutePath + " 写文件异常", e);
        }
    }

    /**
     * Deletes a file or a directory (recursively).
     *
     * @param fileName path to delete
     * @return true on success, false when the path does not exist or deletion fails
     */
    public static boolean delete(String fileName) {
        File file = new File(fileName);
        if (!file.exists()) {
            logger.info("删除文件失败:" + fileName + "不存在!");
            return false;
        }
        return file.isFile() ? deleteFile(fileName) : deleteDirectory(fileName);
    }

    /**
     * Deletes a single regular file.
     *
     * @param fileName file path
     * @return true when the file existed and was deleted
     */
    public static boolean deleteFile(String fileName) {
        File file = new File(fileName);
        if (file.exists() && file.isFile()) {
            if (file.delete()) {
                logger.info("删除单个文件" + fileName + "成功!");
                return true;
            }
            logger.info("删除单个文件" + fileName + "失败!");
            return false;
        }
        logger.info("删除单个文件失败:" + fileName + "不存在!");
        return false;
    }

    /**
     * Recursively deletes a directory and everything under it.
     *
     * @param dir directory path
     * @return true when the whole tree was deleted
     */
    public static boolean deleteDirectory(String dir) {
        // Normalize: make sure the path ends with the platform separator.
        if (!dir.endsWith(File.separator)) {
            dir = dir + File.separator;
        }
        File dirFile = new File(dir);
        if ((!dirFile.exists()) || (!dirFile.isDirectory())) {
            logger.info("删除目录失败:" + dir + "不存在!");
            return false;
        }
        boolean flag = true;
        File[] files = dirFile.listFiles();
        if (files != null) { // listFiles() can return null on I/O error
            for (File child : files) {
                if (child.isFile()) {
                    flag = FileUtils.deleteFile(child.getAbsolutePath());
                } else if (child.isDirectory()) {
                    flag = FileUtils.deleteDirectory(child.getAbsolutePath());
                }
                if (!flag) {
                    break;
                }
            }
        }
        if (!flag) {
            logger.info("删除目录失败!");
            return false;
        }
        return true;
    }

    /**
     * Recursively copies a directory.
     *
     * @param sourceDir      source directory
     * @param destinationDir target directory (created if missing)
     */
    public static void copyDir(String sourceDir, String destinationDir) {
        File file = new File(sourceDir);
        String[] filePath = file.list();
        File destination = new File(destinationDir);
        if (!destination.exists()) {
            // mkdirs() (not mkdir()) so nested missing parents are created too.
            destination.mkdirs();
        }
        if (filePath == null) { // list() returns null when sourceDir is not a readable directory
            return;
        }
        for (String name : filePath) {
            File child = new File(sourceDir + "/" + name);
            if (child.isDirectory()) {
                copyDir(sourceDir + "/" + name, destinationDir + "/" + name);
            }
            if (child.isFile()) {
                copyFile(sourceDir + "/" + name, destinationDir + "/" + name);
            }
        }
    }

    /**
     * Copies a single file. Failures are logged and swallowed (best effort),
     * matching the original behavior.
     *
     * @param sourceFilePath      source file path
     * @param destinationFilePath target file path
     */
    public static void copyFile(String sourceFilePath, String destinationFilePath) {
        try (FileInputStream in = new FileInputStream(sourceFilePath);
             FileOutputStream out = new FileOutputStream(destinationFilePath)) {
            byte[] buffer = new byte[8192];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
        } catch (IOException e) {
            logger.error("复制文件失败:" + sourceFilePath + " -> " + destinationFilePath, e);
        }
    }

    /**
     * Lists file names in a directory that end with the given suffix.
     *
     * @param dir    directory to scan
     * @param suffix file-name suffix to match (stripped from the returned keys)
     * @return map of name-without-suffix -> 1, or null when dir is empty/missing
     */
    public static Map<String, Integer> listDirFiles(String dir, String suffix) {
        if (dir == null || dir.isEmpty()) {
            return null;
        }
        File file = new File(dir);
        if (!file.exists()) {
            return null;
        }
        Map<String, Integer> map = new HashMap<>();
        File[] listFiles = file.listFiles();
        if (listFiles != null) {
            for (File entry : listFiles) {
                String fileName = entry.getName();
                if (fileName.endsWith(suffix)) {
                    map.put(fileName.substring(0, fileName.lastIndexOf(suffix)), 1);
                }
            }
        }
        return map;
    }

    public static void main(String[] args) throws NoSuchFieldException, SecurityException {
        // Ad-hoc manual checks (left from development).
        Map<String, Integer> listDirFiles = listDirFiles("D:\\10\\temp\\111\\55", ".job");
        System.err.println(listDirFiles);
    }
}
\ No newline at end of file
package com.jz.common.utils;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser.Feature;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.*;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.util.JSONPObject;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
/**
 * ObjectMapper subclass preconfigured for lenient JSON handling:
 * unknown properties are ignored, single quotes and unquoted field names are
 * accepted, null values serialize as "", and serialized Strings have HTML4
 * entities unescaped.
 */
public class JsonMapper extends ObjectMapper {

    private static final long serialVersionUID = 1L;

    private static Logger logger = LoggerFactory.getLogger(JsonMapper.class);

    // Lazily-created shared instance.
    // NOTE(review): the lazy init is not thread-safe, and getInstance() /
    // nonDefaultMapper() share this one field — whichever runs first fixes the
    // Include policy for both callers. Confirm this is intended.
    private static JsonMapper mapper;

    // Default construction uses Include.NON_EMPTY (empty/null values omitted).
    public JsonMapper() {
        this(Include.NON_EMPTY);
    }

    /**
     * Builds a mapper with the given inclusion policy (null keeps Jackson's default)
     * plus the lenient-parsing and custom-serializer setup described on the class.
     */
    public JsonMapper(Include include) {
        if (include != null) {
            this.setSerializationInclusion(include);
        }
        this.enableSimple();
        this.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
        // Serialize null values as the empty string instead of JSON null.
        this.getSerializerProvider().setNullValueSerializer(new JsonSerializer<Object>(){
            @Override
            public void serialize(Object value, JsonGenerator jgen,
                    SerializerProvider provider) throws IOException,
                    JsonProcessingException {
                jgen.writeString("");
            }
        });
        // Unescape HTML4 entities (e.g. &amp;) in every serialized String value.
        this.registerModule(new SimpleModule().addSerializer(String.class, new JsonSerializer<String>(){
            @Override
            public void serialize(String value, JsonGenerator jgen,
                    SerializerProvider provider) throws IOException,
                    JsonProcessingException {
                jgen.writeString(StringEscapeUtils.unescapeHtml4(value));
            }
        }));
        this.setTimeZone(TimeZone.getDefault());//getTimeZone("GMT+8:00")
    }

    /** Returns the shared instance, creating it with NON_EMPTY inclusion on first use. */
    public static JsonMapper getInstance() {
        if (mapper == null){
            mapper = new JsonMapper().enableSimple();
        }
        return mapper;
    }

    /**
     * Returns the shared instance, creating it with NON_DEFAULT inclusion on first use.
     * NOTE(review): if getInstance() ran first, this returns that NON_EMPTY instance.
     */
    public static JsonMapper nonDefaultMapper() {
        if (mapper == null){
            mapper = new JsonMapper(Include.NON_DEFAULT);
        }
        return mapper;
    }

    /** Serializes an object to JSON; returns null (and logs) on failure. */
    public String toJson(Object object) {
        try {
            return this.writeValueAsString(object);
        } catch (IOException e) {
            logger.warn("write to json string error:" + object, e);
            return null;
        }
    }

    /** Deserializes JSON into the given class; returns null on blank input or parse failure. */
    public <T> T fromJson(String jsonString, Class<T> clazz) {
        if (StringUtils.isEmpty(jsonString)) {
            return null;
        }
        try {
            return this.readValue(jsonString, clazz);
        } catch (IOException e) {
            logger.warn("parse json string error:" + jsonString, e);
            return null;
        }
    }

    /** Deserializes JSON into the given JavaType (for generic collections); null on failure. */
    @SuppressWarnings("unchecked")
    public <T> T fromJson(String jsonString, JavaType javaType) {
        if (StringUtils.isEmpty(jsonString)) {
            return null;
        }
        try {
            return (T) this.readValue(jsonString, javaType);
        } catch (IOException e) {
            logger.warn("parse json string error:" + jsonString, e);
            return null;
        }
    }

    /** Builds a parameterized collection JavaType, e.g. List&lt;User&gt;. */
    public JavaType createCollectionType(Class<?> collectionClass, Class<?>... elementClasses) {
        return this.getTypeFactory().constructParametricType(collectionClass, elementClasses);
    }

    /** Merges JSON fields into an existing object in place; returns null (and logs) on failure. */
    @SuppressWarnings("unchecked")
    public <T> T update(String jsonString, T object) {
        try {
            return (T) this.readerForUpdating(object).readValue(jsonString);
        } catch (JsonProcessingException e) {
            logger.warn("update json string:" + jsonString + " to object:" + object + " error.", e);
        } catch (IOException e) {
            logger.warn("update json string:" + jsonString + " to object:" + object + " error.", e);
        }
        return null;
    }

    /** Serializes the object wrapped in a JSONP callback invocation. */
    public String toJsonP(String functionName, Object object) {
        return toJson(new JSONPObject(functionName, object));
    }

    /** Makes enums round-trip via toString() instead of name(). */
    public JsonMapper enableEnumUseToString() {
        this.enable(SerializationFeature.WRITE_ENUMS_USING_TO_STRING);
        this.enable(DeserializationFeature.READ_ENUMS_USING_TO_STRING);
        return this;
    }

    /** Accepts single-quoted strings and unquoted field names when parsing. */
    public JsonMapper enableSimple() {
        this.configure(Feature.ALLOW_SINGLE_QUOTES, true);
        this.configure(Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
        return this;
    }

    /** Exposes this instance as a plain ObjectMapper. */
    public ObjectMapper getMapper() {
        return this;
    }

    /** Static shortcut for {@link #toJson(Object)} on the shared instance. */
    public static String toJsonString(Object object){
        return JsonMapper.getInstance().toJson(object);
    }

    /** Static shortcut for {@link #fromJson(String, Class)} on the shared instance. */
    public static Object fromJsonString(String jsonString, Class<?> clazz){
        return JsonMapper.getInstance().fromJson(jsonString, clazz);
    }

    // Ad-hoc demo: serializes a small tree structure.
    public static void main(String[] args) {
        List<Map<String, Object>> list = Lists.newArrayList();
        Map<String, Object> map = Maps.newHashMap();
        map.put("id", 1);
        map.put("pId", -1);
        map.put("name", "根节点");
        list.add(map);
        map = Maps.newHashMap();
        map.put("id", 2);
        map.put("pId", 1);
        map.put("name", "你好");
        map.put("open", true);
        list.add(map);
        String json = JsonMapper.getInstance().toJson(list);
        System.out.println(json);
    }
}
package com.jz.common.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.io.*;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.*;
/**
 * Zip helpers: listing entries of a zip file by suffix, and compressing a
 * directory tree into a zip archive (used for Azkaban project uploads).
 */
public class ZipUtils {

    private static final Logger logger = LoggerFactory.getLogger(ZipUtils.class);

    /**
     * Lists entry names (suffix stripped) inside a zip file.
     *
     * @param toRemoveFileNameMap entries whose names appear here are excluded
     * @param zipPathName         path of the zip file
     * @param suffix              only entries ending with this suffix are considered
     * @return map of entry-name-without-suffix -> 1; empty when the zip is missing
     */
    public static Map<String, Integer> listZipFile(Map<String, Integer> toRemoveFileNameMap, String zipPathName, String suffix) {
        Map<String, Integer> fileNameMap = new HashMap<>();
        File file = new File(zipPathName);
        if (!file.exists()) {
            return fileNameMap;
        }
        // try-with-resources guarantees the ZipFile is closed.
        try (ZipFile zipFile = new ZipFile(file)) {
            Enumeration<? extends ZipEntry> zipEntrys = zipFile.entries();
            while (zipEntrys.hasMoreElements()) {
                String fileName = zipEntrys.nextElement().getName();
                if (!StringUtils.isEmpty(fileName) && fileName.endsWith(suffix)) {
                    fileName = fileName.substring(0, fileName.lastIndexOf(suffix));
                    if (CollectionUtils.isEmpty(toRemoveFileNameMap) || !toRemoveFileNameMap.containsKey(fileName)) {
                        fileNameMap.put(fileName, 1);
                    }
                }
            }
        } catch (IOException e) {
            logger.error("读取zip文件失败:" + zipPathName, e);
        }
        return fileNameMap;
    }

    /**
     * Recursively writes {@code file} (or every file under it) into the zip stream.
     * NOTE(review): entries are named with file.getName() only, so the archive
     * is flat and files from nested directories could collide — presumably
     * intended for flat Azkaban .job uploads; confirm. (A dead sub-path
     * computation and a no-op indexOf statement were removed.)
     *
     * @param srcRootDir root directory being compressed (kept for signature compatibility)
     * @param file       file or directory to add
     * @param zos        target zip stream
     */
    private static void zip(String srcRootDir, File file, ZipOutputStream zos) throws IOException {
        if (file == null) {
            return;
        }
        if (file.isFile()) {
            int count;
            int bufferLen = 1024;
            byte[] data = new byte[bufferLen];
            try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file))) {
                zos.putNextEntry(new ZipEntry(file.getName()));
                while ((count = bis.read(data, 0, bufferLen)) != -1) {
                    zos.write(data, 0, count);
                }
            } finally {
                try {
                    zos.closeEntry();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        } else {
            // Directory: recurse into children.
            File[] childFileList = file.listFiles();
            if (childFileList != null) {
                for (File child : childFileList) {
                    zip(srcRootDir, child, zos);
                }
            }
        }
    }

    /**
     * Compresses a file or directory into a zip archive. A pre-existing target
     * zip is moved aside as a ".backup" and restored if compression fails.
     *
     * @param projectSourcePath path of the file/directory to compress
     * @param zipTargetDirPath  directory where the zip is written (must not be
     *                          inside projectSourcePath)
     * @param zipTargetFileName zip file name
     * @throws RuntimeException wrapping any failure
     */
    public static void zip(String projectSourcePath, String zipTargetDirPath, String zipTargetFileName) {
        String zipTargetFilePath = zipTargetDirPath + zipTargetFileName;
        File zipTargetFile = new File(zipTargetFilePath);
        File backupFile = new File(zipTargetFilePath + ".backup");
        try {
            File zipTargetDir = new File(zipTargetDirPath);
            if (!zipTargetDir.exists()) {
                zipTargetDir.mkdirs();
            }
            logger.info("zipTargetDirPath----" + zipTargetDirPath + ",zipTargetFileName=" + zipTargetFileName);
            if (zipTargetFile.exists()) {
                // Move the existing zip aside: restored on failure, removed on success.
                zipTargetFile.renameTo(backupFile);
            }
            try (FileOutputStream fos = new FileOutputStream(zipTargetFile);
                 CheckedOutputStream cos = new CheckedOutputStream(fos, new CRC32());
                 ZipOutputStream zos = new ZipOutputStream(cos)) {
                zip(projectSourcePath, new File(projectSourcePath), zos);
                zos.flush();
            }
            // Success: drop the backup (the original comment documented this
            // intent but the deletion was never implemented).
            if (backupFile.exists()) {
                backupFile.delete();
            }
        } catch (Exception e) {
            if (backupFile.exists()) {
                backupFile.renameTo(zipTargetFile);
            }
            // Preserve the cause instead of only printing the stack trace.
            throw new RuntimeException("压缩" + projectSourcePath + "错误", e);
        }
    }

    public static void main(String[] args) throws ZipException, IOException {
        // Ad-hoc manual check: print every entry name of a local zip.
        File file = new File("D:\\开发环境软件\\nginx-1.12.2.zip");
        System.err.println(file.exists());
        try (ZipFile zipFile = new ZipFile(file)) {
            Enumeration<? extends ZipEntry> zipEntrys = zipFile.entries();
            while (zipEntrys.hasMoreElements()) {
                ZipEntry zipEntry = zipEntrys.nextElement();
                System.out.println("文件名" + zipEntry.getName());
            }
        }
    }
}
\ No newline at end of file
package com.jz.common.utils.web;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
/**
* @ClassName: SessionUtils
* @Description: TODO(session相关工具类)
* @author ybz
* @date 2017年4月21日 下午5:11:16
*
*/
/**
 * Session helper utilities built on Spring's {@link RequestContextHolder}.
 * All methods are null-safe: outside of a web request context (e.g. in a
 * background thread) they return {@code null} instead of throwing.
 */
public class SessionUtils {

    /**
     * Returns the current {@link HttpServletRequest}, or {@code null} when
     * called outside of a Spring-managed request context.
     */
    public static HttpServletRequest getRequest() {
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        return requestAttributes == null ? null : requestAttributes.getRequest();
    }

    /**
     * Returns the current session without creating one, or {@code null} when
     * there is no current request or no existing session.
     */
    public static HttpSession getSession() {
        // FIX: getRequest() may return null (it is null-checked everywhere
        // else in this class); the original called getRequest().getSession(false)
        // directly and threw a NullPointerException outside a request context.
        HttpServletRequest request = getRequest();
        return request == null ? null : request.getSession(false);
    }

    /**
     * Reads an attribute from the current session.
     *
     * @param name attribute name
     * @return the attribute value, or {@code null} when there is no request
     *         context or the attribute is absent
     */
    public static Object getSessionAttribute(String name) {
        HttpServletRequest request = getRequest();
        // NOTE: request.getSession() (no arg) creates a session if absent —
        // preserved from the original implementation.
        return request == null ? null : request.getSession().getAttribute(name);
    }

    /**
     * Stores an attribute on the current session (creating the session if
     * needed). Silently does nothing outside a request context.
     *
     * @param name  attribute name
     * @param value attribute value
     */
    public static void setSessionAttribute(String name, Object value) {
        HttpServletRequest request = getRequest();
        if (request != null) {
            request.getSession().setAttribute(name, value);
        }
    }

    // The methods below predate the current auth setup and are kept for
    // reference; they depend on DmpSystemUser / Spring Security which are not
    // wired up here.
    /* public static Integer getSessionUserId() throws Exception {
        DmpSystemUser user = (DmpSystemUser) SessionUtils.getSessionAttribute("LOGIN_USER");
        if (user != null && user.getId() != null && user.getId() != 0l) {
            return user.getId();
        } else {
            return null;
        }
    }*/

    /* public static boolean hasLoginStatus() {
        try {
            return getSessionUserId() != null;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }*/

    /* public static DmpSystemUser getSecurityUser(){
        Object principal = SecurityContextHolder.getContext().getAuthentication().getPrincipal();
        if (principal==null || "anonymousUser".equals(principal)) {
            return null;
        }
        return (DmpSystemUser)principal;
    }

    public static DmpSystemUser getSecurityDspUser(){
        Object principal = SecurityContextHolder.getContext().getAuthentication().getPrincipal();
        if (principal==null || "anonymousUser".equals(principal)) {
            return null;
        }
        return (DmpSystemUser)principal;
    }*/
}
......@@ -10,6 +10,7 @@ import com.jz.dmp.modules.service.OfflineSynchService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
......@@ -74,6 +75,22 @@ public class OfflineSynchController {
@ApiImplicitParam(name = "sourceDbId", value = "源数据库id")
public JsonResult getSourceTableList(@RequestParam Integer sourceDbId,@RequestParam(value = "targetName", required = false) String targetName) throws Exception {
JsonResult list = offlineSynchService.querygSourceTableList(sourceDbId,targetName);
return new JsonResult(ResultCode.SUCCESS, list);
return list;
}
/**
 * Run a task immediately ("任务立即运行").
 *
 * @param taskId id of the task to run
 * @return the service-layer result, or a PARAMS_ERROR result when taskId is missing
 * @throws Exception propagated from the service layer
 */
@ApiOperation(value = "任务立即运行", notes = "任务立即运行")
@GetMapping(value = "/taskRunNowByTaskId")
@ApiImplicitParam(name = "taskId", value = "任务id")
public JsonResult getTaskRunNowByTaskId(@RequestParam(value = "taskId") String taskId) throws Exception {
    // Guard clause: taskId is required.
    if (StringUtils.isNotEmpty(taskId)) {
        return offlineSynchService.taskRunNowByTaskId(taskId);
    }
    return new JsonResult(ResultCode.PARAMS_ERROR);
}
}
package com.jz.dmp.modules.controller.DataIntegration.bean.flow;
/**
* @ClassName: FlowExecution
* @Description: TODO(封装同步状态结果)
* @author ybz
* @date 2020年12月11日
*
*/
/**
 * Snapshot of an Azkaban flow execution's status (wraps the sync-status
 * query result).
 *
 * Timestamps are plain {@code long} values as returned by Azkaban
 * (presumably epoch milliseconds — TODO confirm against the Azkaban API).
 */
public class FlowExecution {

    private long submitTime;    // when the flow was submitted
    private String submitUser;  // user who submitted the flow
    private long startTime;     // execution start time
    private long endTime;       // execution end time
    private String flowId;      // Azkaban flow id
    private long projectId;     // Azkaban project id
    private long execId;        // Azkaban execution id
    private String status;      // execution status string

    // --- getters ---------------------------------------------------------

    public long getSubmitTime() { return submitTime; }

    public String getSubmitUser() { return submitUser; }

    public long getStartTime() { return startTime; }

    public long getEndTime() { return endTime; }

    public String getFlowId() { return flowId; }

    public long getProjectId() { return projectId; }

    public long getExecId() { return execId; }

    public String getStatus() { return status; }

    // --- setters ---------------------------------------------------------

    public void setSubmitTime(long submitTime) { this.submitTime = submitTime; }

    public void setSubmitUser(String submitUser) { this.submitUser = submitUser; }

    public void setStartTime(long startTime) { this.startTime = startTime; }

    public void setEndTime(long endTime) { this.endTime = endTime; }

    public void setFlowId(String flowId) { this.flowId = flowId; }

    public void setProjectId(long projectId) { this.projectId = projectId; }

    public void setExecId(long execId) { this.execId = execId; }

    public void setStatus(String status) { this.status = status; }
}
package com.jz.dmp.modules.controller.DataIntegration.bean.flow;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* 流程每个节点
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class FlowNode implements Serializable {

    /**
     * Node id
     */
    private String nodeId;

    /**
     * Node name
     */
    private String nodeName;

    /**
     * Node type
     */
    private String nodeType;

    /**
     * Node data (payload attached to the node)
     */
    private String nodeData;

    /**
     * Name of the node this node depends on
     */
    private String dependedNodeName;

    /**
     * Node position on the flow canvas
     */
    private String nodeLocation;

    /**
     * Node content (script body)
     */
    private String script;

    /**
     * Retry count
     */
    private String retryTimes;

    /**
     * Retry interval
     */
    private String retryInterval;
}
package com.jz.dmp.modules.controller.DataIntegration.bean.flow;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class FlowNodeChangeInfo implements Serializable {

    /**
     * Name of the changed node
     */
    private String nodeName;

    /**
     * Node data after the change
     */
    private String nodeData;

    /**
     * Change type: add / update / delete
     */
    private String changedType;

    /**
     * Node type
     */
    private String nodeType;

    /**
     * Who made the change
     */
    private String createBy;

    /**
     * When the change was made (formatted string)
     */
    private String updateDateStr;

    /**
     * Version number
     */
    private int version;
}
package com.jz.dmp.modules.controller.DataIntegration.bean.flow;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class FlowPro implements Serializable {

    /**
     * Workflow name
     */
    private String flowName;

    /**
     * Schedule (cron-like) settings
     */
    private String scheduleSetting;

    /**
     * Behavior on failure
     */
    private String afterFailSelect;

    /// ----- fields below are used for receiving request parameters

    /**
     * Flow project id
     */
    private Long projectId;

    /**
     * Id of the project the flow is published to
     */
    private Long publishedToProjectId;

    /**
     * Task id generated inside DMP
     */
    private Long taskId;

    /**
     * Navigation-tree id inside DMP
     */
    private Long treeId;

    /**
     * Full flow-diagram JSON
     */
    private String flowJson;

    /**
     * Whether node-change queries include the version number.
     * NOTE(review): "Verion" is a typo for "Version"; kept as-is because
     * Lombok derives the accessor names from the field name.
     */
    private boolean isCheckVerion;

    /**
     * Node name parameter used by the node-name check
     */
    private String nodeName;

    /**
     * Submit flag; defaults to "0" — presumably "not submitted", TODO confirm
     */
    private String isSubmit = "0";
}
......@@ -23,4 +23,6 @@ public interface OfflineSynchDao {
DmpSyncingDatasource queryDmpSyncingDatasource(@Param("sourceDbId") Integer sourceDbId) throws Exception;
DmpAgentDatasourceInfo querySourceDbInfoBySourceId(@Param("sourceDbId") Integer sourceDbId) throws Exception;
Map<String,Object> selectNavigationTreeByTaskId(String taskId) throws Exception;
}
\ No newline at end of file
......@@ -21,5 +21,7 @@ public interface OfflineSynchService {
JsonResult querygSourceDbList(Integer projectId) throws Exception;
JsonResult querygSourceTableList(Integer sourceDbId,String targetName) throws Exception;
JsonResult querygSourceTableList(Integer sourceDbId, String targetName) throws Exception;
JsonResult taskRunNowByTaskId(String taskId) throws Exception;
}
......@@ -7,7 +7,10 @@ import com.jz.common.constant.JsonResult;
import com.jz.common.constant.ResultCode;
import com.jz.common.page.PageInfoResponse;
import com.jz.common.persistence.BaseService;
import com.jz.common.utils.AzkabanApiUtils2;
import com.jz.common.utils.FileUtils;
import com.jz.common.utils.JsonMapper;
import com.jz.common.utils.ZipUtils;
import com.jz.dmp.agent.DmpAgentResult;
import com.jz.dmp.modules.controller.DataIntegration.bean.SourceDbNameListDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.TaskListPageDto;
......@@ -15,6 +18,8 @@ import com.jz.dmp.modules.controller.DataIntegration.bean.TaskListPageReq;
import com.jz.dmp.modules.dao.DmpProjectDao;
import com.jz.dmp.modules.dao.OfflineSynchDao;
import com.jz.dmp.modules.model.DmpAgentDatasourceInfo;
import com.jz.dmp.modules.model.DmpProjectSystemInfo;
import com.jz.dmp.modules.model.DmpSyncingDatasource;
import com.jz.dmp.modules.service.OfflineSynchService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -22,6 +27,8 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -80,20 +87,99 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
/**
 * Lists the tables of a source database, optionally filtered by a target
 * (table/file) name.
 *
 * @param sourceDbId id of the source datasource configuration
 * @param targetName optional filter; when present it is passed to the agent
 *                   as the target file name
 * @return SUCCESS with the table-name list, or the agent's error code/message
 * @throws Exception if the datasource lookup or agent call fails
 */
public JsonResult querygSourceTableList(Integer sourceDbId, String targetName) throws Exception {
    // Load the datasource configuration by source database id.
    DmpAgentDatasourceInfo dsInfo = offlineSynchDao.querySourceDbInfoBySourceId(sourceDbId);
    // Decode the stored (encrypted) database password.
    if (StringUtils.isNotBlank(dsInfo.getPassword())) {
        dsInfo.setPassword(new BaseService().decode(dsInfo.getPassword(), publicKey));
    }
    if (StringUtils.isNotBlank(targetName)) {
        dsInfo.setTargetFileName(targetName);
    }
    // Open a JDBC connection via the agent and fetch the source table names.
    DmpAgentResult rst = dmpDsAgentServiceImp.getTableNameList(dsInfo);
    // NOTE(review): compares the code enum's val() against the string "200" —
    // presumably val() returns a String status code; TODO confirm.
    if (!rst.getCode().val().equals("200")) {
        return new JsonResult(rst.getCode(), rst.getMessage());
    } else {
        // Success: the message field carries the JSON-encoded table list.
        rst.setResult(JsonMapper.fromJsonString(rst.getMessage(), List.class));
        return new JsonResult(ResultCode.SUCCESS, rst);
    }
}
/**
 * Runs a task immediately: looks up the task's DMP navigation tree and DW
 * system configuration by task id, then publishes the generated flow to
 * Azkaban for execution.
 *
 * @param taskId id of the task to run now
 * @return SUCCESS (payload = whether the Azkaban publish succeeded), or
 *         PARAMS_ERROR when no configuration is found for the task id
 * @throws Exception if the lookup or publish fails
 */
@Override
public JsonResult taskRunNowByTaskId(String taskId) throws Exception {
    // Query the DMP navigation tree + DW system info for this task.
    Map<String, Object> map = offlineSynchDao.selectNavigationTreeByTaskId(taskId);
    // FIX: the original condition was "map.size() > 0 && map != null", which
    // dereferences map before the null check and NPEs on a null result.
    // Also return a proper JsonResult instead of always returning null.
    if (map != null && !map.isEmpty()) {
        boolean published = this.publish(map);
        return new JsonResult(ResultCode.SUCCESS, published);
    }
    return new JsonResult(ResultCode.PARAMS_ERROR);
}
/**
 * Builds the Azkaban job artifacts for a single task on the local filesystem
 * and publishes them: creates a .job command file, zips it, then logs in to
 * Azkaban, creates/uses the project and uploads + executes the flow.
 *
 * @param map row from selectNavigationTreeByTaskId with keys: taskId,
 *            projectId, treeName, azkabanExectorXmlExec,
 *            azkabanLocalTaskFilePath, azkabanMonitorUrl (all non-null
 *            assumed — TODO confirm upstream query guarantees this)
 * @return true when Azkaban accepted the upload and started execution
 */
private boolean publish(Map<String,Object> map) {
    String taskId = map.get("taskId").toString(); // task id
    String projectId = map.get("projectId").toString(); // project id
    String treeName = map.get("treeName").toString(); // flow name
    String azkabanExectorXmlExec = map.get("azkabanExectorXmlExec").toString(); // command that executes the data-sync task
    String azkabanLocalTaskFilePath = map.get("azkabanLocalTaskFilePath").toString(); // local file path root
    String azkabanMonitorUrl = map.get("azkabanMonitorUrl").toString(); // Azkaban web server URL

    // Job command line; ${azkaban.*} placeholders are resolved by Azkaban at run time.
    String azkabanJobCommand = "command=" + azkabanExectorXmlExec + " " + projectId + " ${azkaban.flow.flowid} ${azkaban.job.id} ${azkaban.flow.execid} " + treeName;

    /**
     * Root path where this task's generated files are stored.
     */
    String localTaskPath = azkabanLocalTaskFilePath + "/" + projectId + "/local/" + taskId;
    File localTaskFile = new File(localTaskPath);
    if (!localTaskFile.exists()) {
        localTaskFile.mkdirs();
    }
    String localTaskSourcePath = localTaskPath + "/source/";
    File localTaskSourceFile = new File(localTaskSourcePath);
    if (!localTaskSourceFile.exists()) {
        localTaskSourceFile.mkdirs();
    }
    String localTaskExecArgsPath = localTaskPath + "/execArgs/";
    File localTaskExecArgsFile = new File(localTaskExecArgsPath);
    if (!localTaskExecArgsFile.exists()) {
        localTaskExecArgsFile.mkdirs();
    }
    String localTaskZipPath = localTaskPath + "/target/";
    File localTaskZipFile = new File(localTaskZipPath);
    if (!localTaskZipFile.exists()) {
        localTaskZipFile.mkdirs();
    }

    // Delete files from the previous run — everything is regenerated each time.
    // NOTE(review): this deletes localTaskPath AFTER the directories above were
    // just created, removing them again; presumably FileUtils.write/ZipUtils.zip
    // recreate the parent directories — TODO confirm.
    FileUtils.deleteDirectory(localTaskPath);

    List<String> contents = new ArrayList<>();
    // Sub-flow job type.
    contents.add("type=command");
    contents.add(azkabanJobCommand);

    // Generate the .job file.
    String jobFileAbsolutePath = localTaskSourcePath + treeName + ".job";
    FileUtils.write(jobFileAbsolutePath, contents);

    String localZipTargetFileName = treeName + ".zip";
    ZipUtils.zip(localTaskSourcePath, localTaskZipPath, localZipTargetFileName);

    // Upload the zip package to Azkaban and execute it.
    String localTaskZipAbsolutePath = localTaskZipPath + "/" + localZipTargetFileName;
    AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanMonitorUrl);
    return azkabanApiUtils.loginCreateProjectuploadZipAndExecute("jz_localflow_" + projectId, "local_sync_project", localTaskZipAbsolutePath, treeName);
}
}
......@@ -130,4 +130,19 @@
LEFT JOIN dmp_project_system_info t3 ON t1.PROJECT_ID = t3.PROJECT_ID AND t3.data_status = '1'
WHERE 1 = 1 AND t1.id = #{sourceDbId}
</select>
<!-- Looks up everything needed to run a task immediately: the task/tree ids,
     the owning project id, the flow (tree) name, and the project's Azkaban
     configuration (exec command, local task file path, web URL).
     t3.data_status = '1' restricts to active system-info rows. -->
<select id="selectNavigationTreeByTaskId" parameterType="string" resultType="map">
	select
	t1.ID as taskId,
	t1.TREE_ID as treeId,
	t2.PROJECT_ID as projectId,
	t2.NAME as treeName,
	t3.AZKABAN_EXECTOR_XML_EXEC as azkabanExectorXmlExec,
	t3.AZKABAN_LOCAL_TASK_FILE_PATH as azkabanLocalTaskFilePath,
	t3.AZKABAN_MONITOR_URL as azkabanMonitorUrl
	from dmp_develop_task t1
	left join dmp_navigation_tree t2 on t1.TREE_ID=t2.ID
	left join dmp_project_system_info t3 on t2.PROJECT_ID=t3.PROJECT_ID and t3.data_status = '1'
	where 1=1 and t1.id = #{taskId}
</select>
</mapper>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment