Commit 267ec90c authored by zhangminghui

update

parent 1e55b544
if __name__ == '__main__':
print("api_assemble.py")
...@@ -51,30 +51,33 @@ def getColumns(column,hdfs_csv_path): ...@@ -51,30 +51,33 @@ def getColumns(column,hdfs_csv_path):
col_file += list_cols[i][0] + ', ' col_file += list_cols[i][0] + ', '
col_list.append(list_cols[i][0]) col_list.append(list_cols[i][0])
col_create += list_cols[i][0] + " string, " col_create += list_cols[i][0] + " string, "
create_tmp_sql = 'create table impala_tmp as select %s from %s' % (col[0:-2], 'a') create_tmp_sql = 'create table dmp_demo.impala_tmp as select %s from %s' % (col[0:-2], 'dmp_demo.a')
md5_sql = 'select %s from %s' % (col_file[0:-2], 'impala_tmp') md5_sql = 'select %s from %s' % (col_file[0:-2], 'dmp_demo.impala_tmp')
impala_sql = "" impala_sql = ""
for i in range(0, len(col_list)): for i in range(0, len(col_list)):
if i < 10: if i < 10:
impala_sql += col_list[i] + " as c0" + str(i + 1) + ", " impala_sql += col_list[i] + " as c0" + str(i + 1) + ", "
else: else:
impala_sql += col_list[i] + " as c" + str(i + 1) + ", " impala_sql += col_list[i] + " as c" + str(i + 1) + ", "
impala_sql = 'insert into %s as select %s from %s' % ("impala_table", impala_sql[0:-2], 'impala_tmp') impala_sql = 'insert into %s as select %s from %s' % ("dmp_demo.impala_table", impala_sql[0:-2], 'dmp_demo.impala_tmp')
create_sql = 'create external table a (%s) stored as textfile location \\"%s\\" ' % (col_create[0:-2], hdfs_csv_path) create_sql = 'create external table dmp_demo.a (%s) row format delimited fields terminated by \',\' ' \
logger.info("load csv sql:", create_sql) 'location \\"%s\\" tblproperties(\\"skip.header.line.count\\"=\\"1\\")' % (col_create[0:-2], hdfs_csv_path)
logger.info("create impala external table sql:", create_tmp_sql) print("=======> load csv sql:", create_sql)
logger.info("tuomin sql:", md5_sql) print("=======> create impala external table sql:", create_tmp_sql)
logger.info("output impala sql:", impala_sql) print("=======> tuomin sql:", md5_sql)
print("=======> output impala sql:", impala_sql)
return [create_sql, create_tmp_sql, md5_sql, impala_sql] return [create_sql, create_tmp_sql, md5_sql, impala_sql]
except Exception as e: except Exception as e:
logger.error("json parse exception:{0}".format(e)) logger.error("json parse exception:{0}".format(e))
# 更新文件处理状态:正在处理中...,列名解析异常 # 更新文件处理状态:正在处理中...,列名解析异常
modifyStatus.updateStatusById(constants.FILE_PROCESSING, constants.PARSE_COLS, e, localTime, modifyStatus.updateStatusById(constants.FILE_PROCESSING, constants.PARSE_COLS, e, localTime,
constants.SP_UPDATE_PERSON, file_id,constants.FILE_TABLE_NAME) constants.SP_UPDATE_PERSON, file_id,constants.FILE_TABLE_NAME)
sys.exit(1)
if __name__ == '__main__': if __name__ == '__main__':
if len(sys.argv)<3: if len(sys.argv)<3:
logger.info("python arguments is 3 but arguments pass %s" %len(sys.argv)) print("=======> python arguments is 3 but arguments pass %s" %len(sys.argv))
else: else:
file_id = sys.argv[1] file_id = sys.argv[1]
hdfs_path = sys.argv[2] hdfs_path = sys.argv[2]
...@@ -95,6 +98,7 @@ if __name__ == '__main__': ...@@ -95,6 +98,7 @@ if __name__ == '__main__':
# 更新文件处理状态:正在处理中...,连接数据库获取文件字段、类型异常 # 更新文件处理状态:正在处理中...,连接数据库获取文件字段、类型异常
modifyStatus.updateStatusById(constants.FILE_PROCESSING, constants.GET_COL_FAILE, e, localTime, modifyStatus.updateStatusById(constants.FILE_PROCESSING, constants.GET_COL_FAILE, e, localTime,
constants.SP_UPDATE_PERSON, file_id, constants.FILE_TABLE_NAME) constants.SP_UPDATE_PERSON, file_id, constants.FILE_TABLE_NAME)
sys.exit(1)
list_sqls = getColumns(cols,hdfs_path) list_sqls = getColumns(cols,hdfs_path)
...@@ -117,6 +121,7 @@ if __name__ == '__main__': ...@@ -117,6 +121,7 @@ if __name__ == '__main__':
# 更新文件处理状态:正在处理中...,csv落表异常 # 更新文件处理状态:正在处理中...,csv落表异常
modifyStatus.updateStatusById(constants.FILE_PROCESSING, constants.CSV_TABLE_FILE, e, localTime, modifyStatus.updateStatusById(constants.FILE_PROCESSING, constants.CSV_TABLE_FILE, e, localTime,
constants.SP_UPDATE_PERSON, file_id, constants.FILE_TABLE_NAME) constants.SP_UPDATE_PERSON, file_id, constants.FILE_TABLE_NAME)
sys.exit(1)
try: try:
# upload file to ftp # upload file to ftp
...@@ -133,6 +138,7 @@ if __name__ == '__main__': ...@@ -133,6 +138,7 @@ if __name__ == '__main__':
# 更新文件处理状态:正在处理中...,csv落ftp异常 # 更新文件处理状态:正在处理中...,csv落ftp异常
modifyStatus.updateStatusById(constants.FILE_PROCESSING, constants.CSV_TO_FTP_FILE, e, localTime, modifyStatus.updateStatusById(constants.FILE_PROCESSING, constants.CSV_TO_FTP_FILE, e, localTime,
constants.SP_UPDATE_PERSON, file_id, constants.FILE_TABLE_NAME) constants.SP_UPDATE_PERSON, file_id, constants.FILE_TABLE_NAME)
sys.exit(1)
# 更新文件处理状态:文件处理完成 # 更新文件处理状态:文件处理完成
modifyStatus.updateStatusById(constants.FILE_PROCESSING_FINISH, constants.FINAL_RESULT_SUCCESS, "成功", localTime, modifyStatus.updateStatusById(constants.FILE_PROCESSING_FINISH, constants.FINAL_RESULT_SUCCESS, "成功", localTime,
constants.ZMH_UPDATE_PERSON, file_id,constants.FILE_TABLE_NAME) constants.ZMH_UPDATE_PERSON, file_id,constants.FILE_TABLE_NAME)
#!/bin/bash
# Upload the locally desensitized output files to the remote server using
# lftp over SFTP, one lftp session per file.
# NOTE(review): the original header describes the payload as CSV, but the
# search below only matches '*.txt' -- confirm which extension is intended.
#
# Exit status: 0 when every upload succeeded (or nothing matched),
# non-zero when the source directory is missing or any upload failed.

# --- SFTP connection settings ---
# User name
USER=sp
# Password
# NOTE(review): plaintext credential committed to the repository and passed
# on the lftp command line (visible in the process list); consider moving it
# to a protected config file or key-based authentication.
PASSWORD=sp940219sp
# Local root directory that holds the files to upload
SRCDIR=/root
# Destination directory on the remote server
DESDIR=/home/songpeng/testfile
# Server IP
IP=192.168.153.100
# Server port
PORT=2121

# Enter the source directory; stop immediately if it does not exist instead
# of continuing from whatever directory the caller happened to be in.
cd "${SRCDIR}" || exit 1

# Every '*.txt' file found recursively under SRCDIR is uploaded.
# Reading find's output line by line (instead of word-splitting a variable)
# keeps file names containing spaces or glob characters intact.
# File names containing a newline are not supported.
find "${SRCDIR}" -name '*.txt' | {
    STATUS=0
    while IFS= read -r FILE
    do
        echo "${FILE}"
        # Send the file (the key part). 'cmd:fail-exit' makes lftp stop at
        # the first failing command, so a failed remote 'cd' can no longer
        # be followed by a 'put' into the wrong directory.
        lftp -u "${USER},${PASSWORD}" "sftp://${IP}:${PORT}" <<EOF || STATUS=1
set cmd:fail-exit yes
cd "${DESDIR}/"
lcd "${SRCDIR}"
put "${FILE}"
bye
EOF
    done
    # The group is the last pipeline element, so this becomes the script's
    # exit status and lets the caller detect a failed upload.
    exit "${STATUS}"
}
\ No newline at end of file
import pymysql import sys
import constants, DbUtils import constants, DbUtils
""" """
...@@ -24,11 +23,11 @@ def updateStatusById(treatment_status, treatment_status_result, error_info, upt_ ...@@ -24,11 +23,11 @@ def updateStatusById(treatment_status, treatment_status_result, error_info, upt_
# serch_data = """select t1 from test where id = {table_id}""".format(table_id=id) # serch_data = """select t1 from test where id = {table_id}""".format(table_id=id)
# update_data = 'update test set status=%s where id =%s' %(status, id) # update_data = 'update test set status=%s where id =%s' %(status, id)
if tableName==constants.PRE_FILE_TABLE_NAME: if tableName==constants.PRE_FILE_TABLE_NAME:
update_data = 'update %s set pretreatment_status=%s,pretreatment_status_result=%s,error_info=\'%s\', ' \ update_data = 'update %s set pretreatment_status=%s,pretreatment_status_result=%s,error_info=\"%s\", ' \
'upt_time=\'%s\',upt_person=\'%s\' where file_deposit=%s' % ( 'upt_time=\'%s\',upt_person=\'%s\' where file_deposit=%s' % (
tableName, treatment_status, treatment_status_result, error_info, upt_time, upt_person,file_deposit) tableName, treatment_status, treatment_status_result, error_info, upt_time, upt_person,file_deposit)
elif tableName==constants.FILE_TABLE_NAME: elif tableName==constants.FILE_TABLE_NAME:
update_data = 'update %s set deposit_status=%s,deposit_status_result=%s,error_info=\'%s\',' \ update_data = 'update %s set deposit_status=%s,deposit_status_result=%s,error_info=\"%s\",' \
'upt_time=\'%s\',upt_person=\'%s\' where file_deposit=%s' % ( 'upt_time=\'%s\',upt_person=\'%s\' where file_deposit=%s' % (
tableName, treatment_status, treatment_status_result, error_info, upt_time, upt_person, file_deposit) tableName, treatment_status, treatment_status_result, error_info, upt_time, upt_person, file_deposit)
...@@ -37,3 +36,4 @@ def updateStatusById(treatment_status, treatment_status_result, error_info, upt_ ...@@ -37,3 +36,4 @@ def updateStatusById(treatment_status, treatment_status_result, error_info, upt_
DbUtils.close_conn(db, cursor) DbUtils.close_conn(db, cursor)
except Exception as e: except Exception as e:
print(" error_info: %s ,update status executor failed:%s" %(error_info,e)) print(" error_info: %s ,update status executor failed:%s" %(error_info,e))
sys.exit(1)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment