Commit 42bec0a7 authored by 宋朋's avatar 宋朋

songpeng update

parent 38bbe60f
......@@ -10,26 +10,29 @@ from sources import Sources, Sinks
def getColumns(x):
    """Build SELECT column lists from a JSON job spec.

    ``x`` is a JSON string containing:
      - ``col``: ordered list of column names
      - ``tm``: mapping of column name -> transform ("md5", "rtl", anything
        else means pass-through)

    Builds two SELECT statements over table ``kino``:
      - ``sql`` applies the transform and also keeps the raw value aliased
        ``t_<col>``, appending ``current_timestamp() as create_time``;
      - ``sql_file`` applies only the transform.

    NOTE(review): the tail of the original function is not visible in the
    diff; it is assumed to return ``(sql, sql_file)`` — confirm with callers.
    """
    sql_list = []
    sql_file_list = []
    try:
        # Parse once — the original re-parsed the JSON on every comparison.
        spec = json.loads(x)
        transforms = spec.get('tm')
        # Do not shadow the builtin ``list`` as the original did.
        for name in spec.get('col'):
            transform = transforms.get(name)
            if transform == "md5":
                # masked value under the original name, raw value as t_<name>
                sql_list.append(' md5(' + name + ') as ' + name + ", " + name + ' as t_' + name)
                sql_file_list.append(' md5(' + name + ') as ' + name)
            elif transform == "rtl":
                sql_list.append(' rtl(' + name + ') as ' + name + ", " + name + ' as t_' + name)
                sql_file_list.append(' rtl(' + name + ') as ' + name)
            else:
                sql_list.append(' ' + name)
                sql_file_list.append(' ' + name)
    except Exception as e:
        print("index error:{0}".format(e))
    # Join outside the try so ``col``/``col_file`` are always defined even
    # when parsing fails (the original raised NameError in that case).
    # Trailing commas reproduce the original concatenation loops.
    col = ''.join(part + ',' for part in sql_list)
    col_file = ''.join(part + ',' for part in sql_file_list)
    sql = "select " + col + " current_timestamp() as create_time from kino"
    sql_file = "select " + col_file[0:-1] + " from kino"
    return sql, sql_file
......
from common import Constants
import exception.MyErrorException as ex
import os
# 写出到 MySQL
from sources import Sources
def sinkMySql(sinkDF, driver, url, user, password, table, mode=Constants.SAVE_MODE_OVERWRITE):
sinkDF.write.mode(mode) \
.format("jdbc") \
......@@ -20,7 +17,7 @@ def sinkMySql(sinkDF, driver, url, user, password, table, mode=Constants.SAVE_MO
# 写出到FTP
'''
相关参数, 例如:
相关参数, 例如:
host: 192.168.31.45
username: raomin@9zdata.cn
password: 9zdata123.
......@@ -38,25 +35,30 @@ def sinkFTP(sinkDF, host, user, password, filetype, file, delimiter):
# sink HDFS
def sinkHDFS(sinkDF, path, delimiter, format="parquet", mode=Constants.SAVE_MODE_OVERWRITE):
    """Write a DataFrame to HDFS.

    :param sinkDF: DataFrame to write
    :param path: target HDFS path (checked for existence before writing)
    :param delimiter: field delimiter passed to the writer
    :param format: output format, default "parquet"
    :param mode: Spark save mode, default SAVE_MODE_OVERWRITE

    NOTE(review): the guard skips the write when ``path`` does NOT exist,
    which is unusual for overwrite mode — confirm the intent (it may be
    checking a parent directory contract).
    """
    # BUG FIX: the original ran ``hdfs dfs -test -e path`` with the literal
    # string "path" instead of the actual target, so the check always
    # referred to the same nonexistent file.
    if os.system("hdfs dfs -test -e {0}".format(path)) != 0:
        print("hdfsPath not exists")
    else:
        sinkDF\
            .write\
            .format(format) \
            .option("delimiter", delimiter) \
            .option("quote", '"') \
            .mode(mode)\
            .save(path)
# Update job status in MySQL
def updateMysqlStatus(i_status, i_readcount, i_sinkcount, i_id):
    """Update status/readcount/sinkcount of the ``uptable`` row with id ``i_id``.

    The connection comes from Sources.getMySqlConnect() (which enables
    autocommit, so no explicit commit is needed). Errors are logged and
    swallowed; cursor and connection are always closed.
    """
    conn = None
    cur = None
    try:
        conn = Sources.getMySqlConnect()
        # Cursor object used to send SQL statements to the database.
        cur = conn.cursor()
        # Parameterized query — the original interpolated the values with
        # str.format, which is SQL-injection-prone and breaks on non-numeric
        # values; let the driver do the quoting.
        update_sql = ("update uptable set status=%s, readcount=%s, "
                      "sinkcount=%s where id = %s")
        cur.execute(update_sql, (i_status, i_readcount, i_sinkcount, i_id))
    except ex.MyError as e:
        print("update error", e.errorinfo)
    except Exception as e:
        print("update error: {0}".format(e))
    finally:
        # Guard the close calls: the original raised UnboundLocalError here
        # (masking the real failure) whenever getMySqlConnect() itself failed.
        if cur is not None:
            cur.close()  # close cursor / database connection
        if conn is not None:
            conn.close()
......@@ -15,8 +15,8 @@ def getMySqlConnect():
charset="utf8", # 指定编码格式
autocommit=True, # 如果插入数据,, 是否自动提交? 和conn.commit()功能一致。
)
except ex.MyError as e:
print("connection error", e.errorinfo)
except Exception as e:
print("connection error:{0}".format(e))
Sinks.updateMysqlStatus(i_status=1, i_readcount=0, i_sinkcount=0, i_id=1)
return conn
......@@ -30,10 +30,9 @@ def getRecordByIdResultT1(id):
result = cur.execute(serch_data)
# 获取下一个查询结果集
record = cur.fetchone()
except ex.MyError as e:
except Exception as e:
Sinks.updateMysqlStatus(i_status=1, i_readcount=0, i_sinkcount=0, i_id=1)
print("executor failed", e.errorinfo)
conn.rollback()
print("executor failed:{0}".format(e))
finally:
cur.close() # 关闭数据库连接
conn.close()
......@@ -43,18 +42,18 @@ def getRecordByIdResultT1(id):
def readCsv(spark, path, col):
    """Read a CSV file at ``path`` into a DataFrame named with columns ``col``.

    :param spark: SparkSession
    :param path: CSV input path
    :param col: iterable of column names applied via ``toDF``
    :return: the loaded DataFrame, or None if reading failed (the failure is
        also flagged in MySQL via Sinks.updateMysqlStatus).
    """
    # Initialize so the function returns None on failure — the original left
    # ``df`` unbound and raised UnboundLocalError at the return.
    df = None
    try:
        df = spark.read.format('csv').option('inferSchema', 'true').load(path).toDF(*col)
    except Exception as e:
        Sinks.updateMysqlStatus(i_status=1, i_readcount=0, i_sinkcount=0, i_id=1)
        print("executor failed:{0}".format(e))
    return df
def readTxt(spark, path, delimiter, col):
    """Read a delimited text file at ``path`` into a DataFrame.

    Each line is split on ``delimiter``; ``col`` supplies the schema /
    column names for createDataFrame.

    :param spark: SparkSession
    :param path: text-file input path
    :param delimiter: separator the lines are split on
    :param col: schema or column-name list for the resulting DataFrame
    :return: the DataFrame, or None if reading failed (failure is also
        flagged in MySQL via Sinks.updateMysqlStatus).
    """
    df = None
    try:
        # BUG FIX: SparkSession exposes the context as ``sparkContext``
        # (lowercase s); ``spark.SparkContext`` raises AttributeError.
        rdd_lines = spark.sparkContext.textFile(path)
        rdd_rows = rdd_lines.map(lambda line: line.split(delimiter))
        df = spark.createDataFrame(rdd_rows, col)
    except Exception as e:
        Sinks.updateMysqlStatus(i_status=1, i_readcount=0, i_sinkcount=0, i_id=1)
        print("executor failed:{0}".format(e))
    return df
\ No newline at end of file
......@@ -4,8 +4,6 @@ import os
2.判断数据库状态是否是失败状态
①判断目标表是否有数据,如果有就将数据回滚
②判断写入的文件是否存在,如果存在直接删除
传入的id,数据库库名,表名,用户名,密码 后端传入的用户名和密码需加密传入
我们定义udf函数将拿到的加密字段进行解密
'''
def getMessage(file):
file = os.path.basename(file)
......
......@@ -2,7 +2,6 @@
import sys
import os
import shutil
import exception.MyErrorException as ex
import zipfile
......@@ -10,8 +9,6 @@ pyName = sys.argv[0] # sys.argv[0] 类似于shell中的$0, 但不是脚本名
zipPath = sys.argv[1] # sys.argv[1] 表示传入的第一个参数
inputPath = sys.argv[2] # sys.argv[2] 表示传入的第二个参数
outputPath = sys.argv[3]
#问题:解压的时候要考虑密码吗???
# os.system("hdfs dfs -put /srcPath /dstPath")
def get_file(srcPath, dstPath):
......@@ -37,8 +34,8 @@ def unzip_single(srcPath, dstPath):
zf = zipfile.ZipFile(srcPath)
try:
zf.extractall(path=dstPath)
except ex.MyError as e:
print(e)
except FileNotFoundError as e:
print("zipPath not exists:{0}".format(e))
zf.close()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment