Commit 38bbe60f authored by 宋朋's avatar 宋朋

songpeng update

parent 651c9f26
import os
'''
1.自定义检查方法
2.判断数据库状态是否是失败状态
①判断目标表是否有数据,如果有就将数据回滚
②判断写入的文件是否存在,如果存在直接删除
传入的id,数据库库名,表名,用户名,密码 后端传入的用户名和密码需加密传入
我们定义udf函数将拿到的加密字段进行解密
'''
def getMessage(file):
file = os.path.basename(file)
print(file)
list = file.split("_")
print(list)
dbname = list[0]
table = list[1]
user = list[2]
password = list[3]
list_id = list[4]
str_list = list_id.split(".")
id = str_list[0]
return [dbname, table, user, password, id]
if __name__ == '__main__':
list = getMessage('D:\\dbname_table_root_123456_id.csv')
print(list[0])
print(list[1])
print(list[2])
print(list[3])
print(list[4])
\ No newline at end of file
import os
import sys
import spark.bootstarp.dependencies as dependencies
from spark.sources import Sinks
'''
文件:定义失败的状态与更新数据库的状态相等则直接删除落的文件
'''
FAILEID = -1
def transaction_file(id, hdfsPath, dstPath):
id_info = Sinks.updateMysqlStatus(i_status=1, i_readcount=df.count(), i_sinkcount=sinkDF.count(), i_id=1)
if id == id_info:
os.system("hdfs dfs -rmr hdfsPath")
os.removedirs(dstPath)
'''
表:定义失败的状态与更新数据库的状态相等则需要事务支持,有些数据库不支持事务,需再讨论
'''
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment