Commit 1e55b544 authored by zhangminghui

update dmp_scripts

parent 88301be3
import pymysql
from dbutils.pooled_db import PooledDB
import constants
"""
数据库连接池
creator=pymysql, # 使用链接数据库的模块,这里使用的是pymysq模块
maxconnections=10, # 连接池允许的最大连接数,这里设置的10,具体自己定义
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=1, # 链接池中最多共享的链接数量
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。
host=constants.ETL_HOST, #数据库ip
port=3306, #数据库端口
user=constants.ETL_USER, #数据库用户名
password=constants.ETL_PASSWORD, #数据库密码
database=constants.ETL_DBNAME, #数据库库名
charset='utf8' #数据库编码
"""
POOL = PooledDB(
creator=pymysql,
maxconnections=10,
mincached=2,
maxcached=5,
maxshared=1,
blocking=True,
maxusage=None,
setsession=[],
host=constants.ETL_HOST,
port=3306,
user=constants.ETL_USER,
password=constants.ETL_PASSWORD,
database=constants.ETL_DBNAME,
charset='utf8'
)
def create_conn():
    """
    :return: a pooled connection and a DictCursor for it
    """
    conn = POOL.connection()
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    return conn, cursor
def close_conn(conn, cursor):
    """
    :param conn: connection to release back to the pool
    :param cursor: cursor to close
    :return:
    """
    # Close the cursor first, then release the connection back to the pool.
    cursor.close()
    conn.close()
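# Example usage (a minimal sketch; the query is illustrative only):
#   conn, cursor = create_conn()
#   cursor.execute("select 1")
#   print(cursor.fetchall())  # DictCursor returns each row as a dict
#   close_conn(conn, cursor)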
#!/usr/bin/python
# -*- coding: UTF-8 -*-
from ftplib import FTP
import os
import socket
# sys and time are needed by deal_error() and write_log() below.
import sys
import time
"""
将文件上传到ftp
"""
class MyFTP:
def __init__(self, host, port=21):
""" 初始化 FTP 客户端
参数:
host:ip地址
port:端口号
"""
self.host = host
self.port = port
self.ftp = FTP()
# 重新设置下编码方式
self.ftp.encoding = 'gbk'
self.file_list = []
def login(self, username, password):
""" 初始化 FTP 客户端
参数:
username: 用户名
password: 密码
"""
try:
# 超时时间
timeout = 60
socket.setdefaulttimeout(timeout)
# 0主动模式 1 #被动模式
self.ftp.set_pasv(True)
self.debug_print('开始尝试连接到 %s' % self.host)
self.ftp.connect(self.host, self.port)
self.debug_print('成功连接到 %s' % self.host)
self.debug_print('开始尝试登录到 %s' % self.host)
self.ftp.login(username, password)
self.debug_print('成功登录到 %s' % self.host)
self.debug_print(self.ftp.welcome)
except Exception as err:
self.deal_error("FTP 连接或登录失败 ,错误信息为:%s" % err)
pass
    def is_same_size(self, local_file, remote_file):
        """Check whether the remote file and the local file have the same size.
        Args:
            local_file: local file path
            remote_file: remote file path
        """
        try:
            remote_file_size = self.ftp.size(remote_file)
        except Exception:
            remote_file_size = -1
        try:
            local_file_size = os.path.getsize(local_file)
        except Exception:
            local_file_size = -1
        self.debug_print('local_file_size:%d , remote_file_size:%d' % (local_file_size, remote_file_size))
        return remote_file_size == local_file_size
    def download_file(self, local_file, remote_file):
        """Download a file from the FTP server.
        Args:
            local_file: local file path
            remote_file: remote file path
        """
        self.debug_print("download_file()---> local_path = %s ,remote_path = %s" % (local_file, remote_file))
        if self.is_same_size(local_file, remote_file):
            self.debug_print('%s has the same size, skipping download' % local_file)
            return
        try:
            self.debug_print('>>>>>>>>>>>> downloading %s ... ...' % local_file)
            buf_size = 1024
            with open(local_file, 'wb') as file_handler:
                self.ftp.retrbinary('RETR %s' % remote_file, file_handler.write, buf_size)
        except Exception as err:
            self.debug_print('download failed, exception: %s ' % err)
            return
    def download_file_tree(self, local_path, remote_path):
        """Download every file under a remote directory into a local directory.
        Args:
            local_path: local directory
            remote_path: remote directory
        """
        print("download_file_tree()---> local_path = %s ,remote_path = %s" % (local_path, remote_path))
        try:
            self.ftp.cwd(remote_path)
        except Exception as err:
            self.debug_print('remote directory %s does not exist, skipping' % remote_path + ", error: %s" % err)
            return
        if not os.path.isdir(local_path):
            self.debug_print('local directory %s does not exist, creating it' % local_path)
            os.makedirs(local_path)
        self.debug_print('changed to directory: %s' % self.ftp.pwd())
        self.file_list = []
        # get_file_list() is invoked as a callback for each listing line.
        self.ftp.dir(self.get_file_list)
        remote_names = self.file_list
        self.debug_print('remote directory listing: %s' % remote_names)
        for item in remote_names:
            file_type = item[0]
            file_name = item[1]
            local = os.path.join(local_path, file_name)
            if file_type == 'd':
                print("download_file_tree()---> downloading directory: %s" % file_name)
                self.download_file_tree(local, file_name)
            elif file_type == '-':
                print("download_file()---> downloading file: %s" % file_name)
                self.download_file(local, file_name)
        self.ftp.cwd("..")
        self.debug_print('returned to parent directory %s' % self.ftp.pwd())
        return True
    def upload_file(self, local_file, remote_file):
        """Upload a local file to the FTP server.
        Args:
            local_file: local file path
            remote_file: remote file path
        """
        if not os.path.isfile(local_file):
            self.debug_print('%s does not exist' % local_file)
            return
        if self.is_same_size(local_file, remote_file):
            self.debug_print('skipping identical file: %s' % local_file)
            return
        buf_size = 1024
        with open(local_file, 'rb') as file_handler:
            self.ftp.storbinary('STOR %s' % remote_file, file_handler, buf_size)
        self.debug_print('uploaded %s successfully' % local_file)
    # def upload_file_tree(self, local_path, remote_path):
    #     """Upload every file under a local directory to the FTP server.
    #     Args:
    #         local_path: local directory
    #         remote_path: remote directory
    #     """
    #     if not os.path.isdir(local_path):
    #         self.debug_print('local directory %s does not exist' % local_path)
    #         return
    #
    #     self.ftp.cwd(remote_path)
    #     self.debug_print('changed to remote directory: %s' % self.ftp.pwd())
    #
    #     local_name_list = os.listdir(local_path)
    #     for local_name in local_name_list:
    #         src = os.path.join(local_path, local_name)
    #         if os.path.isdir(src):
    #             try:
    #                 self.ftp.mkd(local_name)
    #             except Exception as err:
    #                 self.debug_print("directory %s already exists, error: %s" % (local_name, err))
    #             self.debug_print("upload_file_tree()---> uploading directory: %s" % local_name)
    #             self.upload_file_tree(src, local_name)
    #         else:
    #             self.debug_print("upload_file_tree()---> uploading file: %s" % local_name)
    #             self.upload_file(src, local_name)
    #     self.ftp.cwd("..")
    def close(self):
        """Quit the FTP session and close the log file."""
        self.debug_print("close()---> quitting FTP")
        self.ftp.quit()
        self.log_file.close()
    # The helpers below were commented out in the original commit even though
    # login(), download_file_tree() and close() call them; they are restored
    # here so the class can actually run.
    def debug_print(self, s):
        """Print a log message."""
        self.write_log(s)

    def deal_error(self, e):
        """Handle a fatal error.
        Args:
            e: the exception or message
        """
        log_str = 'error occurred: %s' % e
        self.write_log(log_str)
        sys.exit()

    def write_log(self, log_str):
        """Write a log entry.
        Args:
            log_str: the message to log
        """
        time_now = time.localtime()
        date_now = time.strftime('%Y-%m-%d', time_now)
        format_log_str = "%s ---> %s \n " % (date_now, log_str)
        print(format_log_str)
        self.log_file.write(format_log_str)

    def get_file_list(self, line):
        """Collect one entry from the directory listing.
        Args:
            line: one raw line from ftp.dir()
        """
        file_arr = self.get_file_name(line)
        # skip . and ..
        if file_arr[1] not in ['.', '..']:
            self.file_list.append(file_arr)

    def get_file_name(self, line):
        """Extract [file_type, file_name] from one listing line.
        Args:
            line: one raw line from ftp.dir()
        """
        pos = line.rfind(':')
        while line[pos] != ' ':
            pos += 1
        while line[pos] == ' ':
            pos += 1
        file_arr = [line[0], line[pos:]]
        return file_arr
# if __name__ == "__main__":
#
#     host_address = "192.168.153.1"
#     username = "sp"
#     password = "sp940219sp"
#     my_ftp = MyFTP(host_address)
#     my_ftp.login(username, password)
#
#     # upload a csv file
#     my_ftp.upload_file("C:/Users/songpeng/Desktop/test1/1.csv", "/22.csv")
#     my_ftp.close()
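# A download sketch under the same assumptions (the remote and local paths
# below are illustrative, not taken from this commit):
# my_ftp = MyFTP("192.168.153.1")
# my_ftp.login("sp", "sp940219sp")
# my_ftp.download_file_tree("/tmp/ftp_mirror", "/csv_ftp")
# my_ftp.close()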
"""
各种类型错误定义:
"""
"""
==================公用数据常量池定义==================
"""
"""
DEFAULT_file_STATUS 文件预处理/处理 默认的状态
FINAL_RESULT_SUCCESS 文件预处理/处理 最终处理成功结果
"""
FINAL_RESULT_SUCCESS = 200
DEFAULT_FILE_STATUS = 0
"""
更新人:
UPDATE_PERSON=1 sp
UPDATE_PERSON=2 zmh
UPDATE_PERSON=3 rm
"""
SP_UPDATE_PERSON = 1
ZMH_UPDATE_PERSON = 2
RM_UPDATE_PERSON = 3
"""
DEFAULT_ERROR_INFO='' 文件默认的错误信息
"""
DEFAULT_ERROR_INFO = ''
"""
PRE_FILE_TABLE_NAME 文件预处理,需更新的表名
FILE_TABLE_NAME 文件处理,需更新的表名
"""
PRE_FILE_TABLE_NAME='t_file_pretreatment_status'
FILE_TABLE_NAME='t_file_handle_status'
"""
==================文件预处理,数据常量池定义==================
"""
"""
PRE_FILE_WAIT_PROCESS = 1 上传
PRE_FILE_PROCESSING = 2 正在处理中
PRE_FILE_PROCESS_FINISH = 3 处理完成
"""
PRE_FILE_WAIT_PROCESS = 1
PRE_FILE_PROCESSING = 2
PRE_FILE_PROCESS_FINISH = 3
"""
文件预处理阶段失败的code定义:
UNZIP_CODE=1 文件解压失败
FTP_CODE=2 ftp信息错误
UNCODE_CODE=3 编码错误
PARSE_CSV_FILE=4 解析csv文件异常
CSV_TYPES_PARSE csv字段类型解析异常
"""
UNZIP_CODE = 1
FTP_CODE = 2
UNCODE_CODE = 3
PARSE_CSV_FILE = 4
CSV_TYPES_PARSE = 5
"""
==================文件处理,数据常量池定义==================
"""
"""
FILE_WAIT_PROCESS = 1 待处理
FILE_PROCESSING = 2 正在处理中
FILE_PROCESSING_FINISH = 3 处理完成
"""
FILE_WAIT_PROCESS = 1
FILE_PROCESSING = 2
FILE_PROCESSING_FINISH = 3
"""
文件处理阶段失败的code定义:
CLEAN_CODE=1 清洗错误
PARSE_COLS=2 列名解析异常
GET_COL_FAILE=3 连接数据库获取文件字段、类型异常
CSV_TABLE_FILE=4 csv落表异常
CSV_TO_FTP_FILE=5 csv落ftp异常
"""
CLEAN_CODE = 1
PARSE_COLS = 2
GET_COL_FAILE=3
CSV_TABLE_FILE=4
CSV_TO_FTP_FILE=5
"""
==================api,数据常量池定义==================
"""
"""
==================数据库连接,常量池定义==================
"""
"""
数据库连接:
HOST='' 数据库ip地址
USER='' 数据库用户名
PASS_WORD='' 数据库密码
DB_NAME='' 数据库库名
"""
HOST = ''
USER = ''
PASS_WORD = ''
DB_NAME = ''
"""
ETL处理表:
ETL_HOST="47.113.20.243" 数据库ip地址
ETL_USER=root 数据库用户名
ETL_PASSWORD="I%ou$buy!ok" 数据库密码
ETL_DBNAME="wj-mkt-project" 数据库库名
"""
ETL_HOST = "rm-wz9n399q2avsy3k6m4o.mysql.rds.aliyuncs.com"
ETL_USER = "root"
ETL_PASSWORD = "I%ou$buy!ok"
ETL_DBNAME = "wj-mkt-project"
import sys
sys.path.append('/root/dmp/etl_process_data/etl_job')
import modifyStatus, DbUtils, constants, MyFTP
import time
import os
import logging as logger
localTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print(localTime)
"""
通过传入的记录获取需要的字段
"""
def getColumns(column, hdfs_csv_path):
    """
    :param hdfs_csv_path: path of the csv uploaded to hdfs
    :param column: records fetched from the database
    :return: the SQL statements built from those records
    """
    try:
        # For each record, use the user-modified field name/type when it is
        # not None, otherwise fall back to the original field name/type.
        # The results are collected in list_cols.
        list_cols = [[] for i in range(len(column))]
        for i in range(0, len(column)):
            if column[i][2] is not None:
                list_cols[i].append(column[i][2])
            else:
                list_cols[i].append(column[i][0])
            if column[i][3] is not None:
                list_cols[i].append(column[i][3])
            else:
                list_cols[i].append(column[i][1])
            list_cols[i].append(column[i][4])
        # all columns
        col = ""
        # columns written to the output file
        col_file = ""
        col_create = ""
        col_list = []
        # apply the clean-up rule
        for i in range(0, len(list_cols)):
            if list_cols[i][2] == "md5" or list_cols[i][2] == "base64":
                col += list_cols[i][2] + '(' + list_cols[i][0] + ') as ' + list_cols[i][0] + ", " + list_cols[i][
                    0] + ' as t_' + list_cols[i][0] + ', '
                col_file += list_cols[i][0] + ', '
                col_list.append(list_cols[i][0])
                col_list.append('t_' + list_cols[i][0])
            else:
                col += list_cols[i][0] + ', '
                col_file += list_cols[i][0] + ', '
                col_list.append(list_cols[i][0])
            col_create += list_cols[i][0] + " string, "
        create_tmp_sql = 'create table impala_tmp as select %s from %s' % (col[0:-2], 'a')
        md5_sql = 'select %s from %s' % (col_file[0:-2], 'impala_tmp')
        impala_sql = ""
        # Alias the columns as c01, c02, ..., c09, c10, c11, ...
        # (the original used `i < 10`, which produced c010 for the tenth column).
        for i in range(0, len(col_list)):
            if i < 9:
                impala_sql += col_list[i] + " as c0" + str(i + 1) + ", "
            else:
                impala_sql += col_list[i] + " as c" + str(i + 1) + ", "
        impala_sql = 'insert into %s select %s from %s' % ("impala_table", impala_sql[0:-2], 'impala_tmp')
        create_sql = 'create external table a (%s) stored as textfile location \\"%s\\" ' % (col_create[0:-2], hdfs_csv_path)
        logger.info("load csv sql: %s", create_sql)
        logger.info("create impala external table sql: %s", create_tmp_sql)
        logger.info("masking sql: %s", md5_sql)
        logger.info("output impala sql: %s", impala_sql)
        return [create_sql, create_tmp_sql, md5_sql, impala_sql]
    except Exception as e:
        logger.error("json parse exception:{0}".format(e))
        # Update file status: processing, column-name parse failure.
        modifyStatus.updateStatusById(constants.FILE_PROCESSING, constants.PARSE_COLS, e, localTime,
                                      constants.SP_UPDATE_PERSON, file_id, constants.FILE_TABLE_NAME)
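# A sketch of what getColumns() builds for one hypothetical record shape
# (field_name, field_type, modify_field_name, modify_field_type, clean_rule):
#   cols = (('phone', 'string', None, None, 'md5'), ('name', 'string', None, None, None))
#   getColumns(cols, '/tmp/tuomin_data/2020-01-01/demo/dt')
# would produce, among others:
#   create table impala_tmp as select md5(phone) as phone, phone as t_phone, name from a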
if __name__ == '__main__':
    if len(sys.argv) < 3:
        logger.info("expected 3 arguments but got %s" % len(sys.argv))
    else:
        file_id = sys.argv[1]
        hdfs_path = sys.argv[2]
        # Update file status: processing...
        modifyStatus.updateStatusById(constants.FILE_PROCESSING, constants.DEFAULT_FILE_STATUS,
                                      constants.DEFAULT_ERROR_INFO, localTime, constants.ZMH_UPDATE_PERSON,
                                      file_id, constants.FILE_TABLE_NAME)
        try:
            db, cursor = DbUtils.create_conn()
            # Replace the pooled DictCursor with a plain tuple cursor, since
            # getColumns() indexes each row by position.
            cursor = db.cursor()
            get_col_info_sql = '''select field_name,field_type,modify_field_name,modify_field_type,clean_rule
            from t_file_save_field_info where file_deposit = ''' + str(file_id)
            cursor.execute(get_col_info_sql)
            cols = cursor.fetchall()
            db.commit()
        except Exception as e:
            logger.error("file treatment get columns and types exception:{0}".format(e))
            # Update file status: processing, failed to fetch file fields/types from the database.
            modifyStatus.updateStatusById(constants.FILE_PROCESSING, constants.GET_COL_FAILE, e, localTime,
                                          constants.SP_UPDATE_PERSON, file_id, constants.FILE_TABLE_NAME)
        list_sqls = getColumns(cols, hdfs_path)
        try:
            # Create the external table over the raw csv.
            os.system('impala-shell -q "drop table if exists dmp_demo.a"')
            os.system('impala-shell -q "%s"' % (list_sqls[0]))
            # Create the temporary table (column-reordered and masked data).
            os.system('impala-shell -q "drop table if exists dmp_demo.impala_tmp"')
            os.system('impala-shell -q "%s"' % (list_sqls[1]))
            query = list_sqls[2]
            # Insert the temporary table into impala_table.
            # os.system('impala-shell -q "%s"' % (list_sqls[3]))
            # Dump the temporary table to csv (masked data only).
            csv_file = '/root/dmp/etl_process_data/csv_path/user_info.csv'
            os.system("rm -f %s" % csv_file)
            os.system('impala-shell -B -q "%s" -o %s \'--output_delimiter=,\'' % (query, csv_file))
        except Exception as e:
            logger.error("csv load and impala create exception:{0}".format(e))
            # Update file status: processing, failed to load the csv into a table.
            modifyStatus.updateStatusById(constants.FILE_PROCESSING, constants.CSV_TABLE_FILE, e, localTime,
                                          constants.SP_UPDATE_PERSON, file_id, constants.FILE_TABLE_NAME)
        try:
            # Upload the csv to ftp. MyFTP is a module, so instantiate the
            # class inside it (the original called the module directly).
            host_address = "192.168.153.1"
            username = "sp"
            password = "sp940219sp"
            my_ftp = MyFTP.MyFTP(host_address)
            my_ftp.login(username, password)
            my_ftp.upload_file("/root/dmp/etl_process_data/csv_path/user_info.csv",
                               "/opt/dmp/dmp_etlprocesses/ftp_data/csv_ftp/user_info_upload.csv")
            my_ftp.close()
        except Exception as e:
            logger.error("csv output to ftp exception:{0}".format(e))
            # Update file status: processing, failed to deliver the csv to FTP.
            modifyStatus.updateStatusById(constants.FILE_PROCESSING, constants.CSV_TO_FTP_FILE, e, localTime,
                                          constants.SP_UPDATE_PERSON, file_id, constants.FILE_TABLE_NAME)
        # Update file status: file handling finished.
        modifyStatus.updateStatusById(constants.FILE_PROCESSING_FINISH, constants.FINAL_RESULT_SUCCESS, "success", localTime,
                                      constants.ZMH_UPDATE_PERSON, file_id, constants.FILE_TABLE_NAME)
#!/bin/bash
set -e
# database host
hostname="rm-wz9n399q2avsy3k6m4o.mysql.rds.aliyuncs.com"
# database port
port="3306"
# database user
username="root"
# database password
password="I%ou\$buy!ok"
# database name
dbname="wj-mkt-project"
# look up file ids by handling status
select_db_sql="select file_deposit from t_file_handle_status where deposit_status=1"
file_deposit=$(mysql -h${hostname} -P${port} -u${username} -p${password} ${dbname} -s -e "${select_db_sql}")
echo "file_deposit query returned: $file_deposit"
# check the previous result
if [ ${#file_deposit} -gt 0 ];
then
    echo "polled a new file, file id: $file_deposit"
    # fetch the fields we need for that file id
    get_file_deposit_info_sql="select file_name from t_file_deposit where file_deposit_id=$file_deposit"
    get_file_deposit_info=$(mysql -h${hostname} -P${port} -u${username} -p${password} ${dbname} -s -e "${get_file_deposit_info_sql}")
    # name of the zip file
    zip_file_name=`echo $get_file_deposit_info | awk -F' ' '{print $1}'`
    file_name=`echo $zip_file_name | cut -d \. -f 1`
    hdfs_path="/tmp/tuomin_data/`date +%Y-%m-%d`/$file_name/dt"
    python3 /root/dmp/etl_process_data/etl_job/file_handle/file_handle.py $file_deposit $hdfs_path
else
    echo "no new file polled..."
fi
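# Scheduling sketch (an assumption, not part of this commit): the script only
# polls once per run, so it could be driven by a crontab entry such as:
#   * * * * * /bin/bash /root/dmp/etl_process_data/etl_job/file_handle/file_handle.sh >> /var/log/dmp_file_handle.log 2>&1
# where the script and log paths are hypothetical.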
import glob
import os
import shutil
import zipfile
import sys
import pandas as pd
import numpy as np
import re
sys.path.append('/root/dmp/etl_process_data/etl_job')
import modifyStatus, constants, DbUtils
import time
localTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print(localTime)
def unzip_all(source_dir, dest_dir):
    """
    :param source_dir: source directory containing the zip files
    :param dest_dir: destination directory for the extracted files
    :return:
    """
    try:
        it = os.scandir(source_dir)
        for entry in it:
            if entry.is_file() and os.path.splitext(entry.name)[1] == '.zip':
                if not os.path.exists(dest_dir):
                    # os.mkdir(dest_dir)
                    print("file not exists")
                    # zipfile.ZipFile(entry.path).extractall(path=dest_dir)
                else:
                    zipfile.ZipFile(entry.path).extractall(path=dest_dir)
    except Exception as e:
        # update file_status to PRE_FILE_PROCESSING, file_code to UNZIP_CODE
        modifyStatus.updateStatusById(constants.PRE_FILE_PROCESSING, constants.UNZIP_CODE, e, localTime,
                                      constants.SP_UPDATE_PERSON, file_id, constants.PRE_FILE_TABLE_NAME)
        print("unzip_all error:{0}".format(e))
        return
def copy_file(srcPath, dtPath):
    """Copy every .csv/.txt file under srcPath into dtPath."""
    try:
        for root, dirs, files in os.walk(srcPath, True):
            for fileName in files:
                if fileName.endswith('.csv') or fileName.endswith('.txt'):
                    shutil.copy(os.path.join(root, fileName), dtPath)
    except Exception as e:
        # update file_status to PRE_FILE_PROCESSING, file_code to FTP_CODE
        modifyStatus.updateStatusById(constants.PRE_FILE_PROCESSING, constants.FTP_CODE, e, localTime,
                                      constants.ZMH_UPDATE_PERSON, file_id, constants.PRE_FILE_TABLE_NAME)
        print("copy_file error:{0}".format(e))
    return 1
def analysis_csv(file_id, csv_path):
    """
    :param file_id: file id
    :param csv_path: directory containing the csv files
    :return:
    """
    csvFiles = glob.glob(csv_path + "/*")
    list_ = []
    list_error_file = []
    for files in csvFiles:
        try:
            df = pd.read_csv(files, index_col=None, header=0)
            list_.append(df)
        except Exception as e:
            # update file_status to PRE_FILE_PROCESSING, file_code to PARSE_CSV_FILE
            modifyStatus.updateStatusById(constants.PRE_FILE_PROCESSING, constants.PARSE_CSV_FILE, e, localTime,
                                          constants.ZMH_UPDATE_PERSON, file_id, constants.PRE_FILE_TABLE_NAME)
            list_error_file.append(files)
            continue
    data = pd.concat(list_, ignore_index=True)
    print(data)
    # a small sample of rows serialized as JSON for preview
    ten_json = data.head(2).to_json(orient='index')
    update_sql = "update t_file_pretreatment_status set explore_json='" + ten_json + "' where file_deposit=" + str(file_id)
    # trim whitespace, then detect date/datetime columns
    try:
        stripstr = lambda x: x.strip() if isinstance(x, str) else x
        data = data.applymap(stripstr)
        for i in data.keys():
            col = str(data[i][0])
            compile1 = re.compile(
                r'\d{4}[-/.]\d{1,2}[-/.]\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}|\d{4}[-/.]\d{1,2}[-/.]\d{1,2}')
            match_all = compile1.findall(col)
            if len(match_all) != 0:
                # 19 chars looks like 'YYYY-MM-DD HH:MM:SS', 10 chars like 'YYYY-MM-DD'
                if len(col) == 19:
                    data[i] = pd.to_datetime(data[i])
                elif len(col) == 10:
                    data[i] = pd.to_datetime(data[i])
        str_col = ""
        for i in range(0, len(data.dtypes)):
            # example: (id, 'name', 'int', '2012-12-02 00:00:00', 'sp'), (id, 'name', 'int', '2012-12-02 00:00:00', 'sp'),
            str_col += "(" + str(file_id) + ", '" + str(data.columns.values[i]) + "', '" + str(data.dtypes[i]) + \
                       "', '" + str(localTime) + "', '" + str(constants.ZMH_UPDATE_PERSON) + "'),"
        # example: insert into table (id,name,age) values (...), (...)
        insert_sql = "insert into t_file_save_field_info (file_deposit,field_name,field_type,cre_time,cre_person) values " + str_col[0:-1]
        return list_error_file, update_sql, insert_sql
    except Exception as e:
        # update file_status to PRE_FILE_PROCESSING, file_code to CSV_TYPES_PARSE
        modifyStatus.updateStatusById(constants.PRE_FILE_PROCESSING, constants.CSV_TYPES_PARSE, e, localTime,
                                      constants.ZMH_UPDATE_PERSON, file_id, constants.PRE_FILE_TABLE_NAME)
        print("time format error{}".format(e))
if __name__ == '__main__':
    get_file_deposit_info = sys.argv
    print(get_file_deposit_info)
    # argv[1..4] are the expanded file record (id, name, address, format);
    # argv[5..7] are the hdfs, unzip and copy output paths.
    file_id = get_file_deposit_info[1]
    get_ftp_path = get_file_deposit_info[3]
    get_output_hdfs_path = get_file_deposit_info[5]
    get_unzip_output_path = get_file_deposit_info[6]
    get_copy_output_path = get_file_deposit_info[7]
    # update file_status to PRE_FILE_PROCESSING, file_code to DEFAULT_FILE_STATUS
    modifyStatus.updateStatusById(constants.PRE_FILE_PROCESSING, constants.DEFAULT_FILE_STATUS,
                                  constants.DEFAULT_ERROR_INFO, localTime, constants.SP_UPDATE_PERSON,
                                  file_id, constants.PRE_FILE_TABLE_NAME)
    unzip_all(get_ftp_path, get_unzip_output_path)
    su_code = copy_file(get_unzip_output_path, get_copy_output_path)
    if su_code == 1:
        os.system("hdfs dfs -rm -r /user/tuomin_test")
        os.system("hdfs dfs -mkdir /user/tuomin_test")
        pre_list = analysis_csv(file_id, get_copy_output_path)
        print("csv files that could not be read: " + str(pre_list[0]))
        print("sample-JSON update sql: " + str(pre_list[1]))
        print("field insert sql: " + str(pre_list[2]))
        db, cursor = DbUtils.create_conn()
        cursor = db.cursor()
        cursor.execute(pre_list[1])
        cursor.execute(pre_list[2])
        db.commit()
        DbUtils.close_conn(db, cursor)
        # update file_status to FILE_PROCESSING_FINISH, file_code to FINAL_RESULT_SUCCESS
        modifyStatus.updateStatusById(constants.FILE_PROCESSING_FINISH, constants.FINAL_RESULT_SUCCESS, "success", localTime,
                                      constants.SP_UPDATE_PERSON, file_id, constants.PRE_FILE_TABLE_NAME)
    # except Exception as e:
    #     print("main update failed: {}".format(e))
    #     modifyStatus.updateStatusById(constants.PRE_FILE_PROCESSING, constants.MAIN_UPDATE_FILE, e, localTime,
    #                                   constants.ZMH_UPDATE_PERSON, file_id, constants.PRE_FILE_TABLE_NAME)
#!/bin/bash
set -e
# database host
hostname="rm-wz9n399q2avsy3k6m4o.mysql.rds.aliyuncs.com"
# database port
port="3306"
# database user
username="root"
# database password
password="I%ou\$buy!ok"
# database name
dbname="wj-mkt-project"
# file pretreatment status labels
preStatus="pretreatment"
processingStatus="pretreatment in progress"
finishStatus="pretreatment finished"
# fetch file ids by pretreatment status
select_db_sql="select file_deposit from t_file_pretreatment_status where pretreatment_status=1"
file_deposit=$(mysql -h${hostname} -P${port} -u${username} -p${password} ${dbname} -s -e "${select_db_sql}")
echo "file_deposit query returned: $file_deposit"
# check whether the previous command found anything
if [ ${#file_deposit} -gt 0 ];
then
    echo "polled a new file, file id: $file_deposit"
    #update
    # fetch file name, address and format by file id
    get_file_deposit_info_sql="select file_deposit_id,file_name,file_address,file_format from t_file_deposit where file_deposit_id=$file_deposit"
    get_file_deposit_info=$(mysql -h${hostname} -P${port} -u${username} -p${password} ${dbname} -s -e "${get_file_deposit_info_sql}")
    # name of the zip file
    zip_file_name=`echo $get_file_deposit_info | awk -F' ' '{print $2}'`
    # file name without extension
    file_name=`echo $zip_file_name | cut -d \. -f 1`
    # hdfs path
    hdfs_path="/tmp/tuomin_data/`date +%Y-%m-%d`/$file_name"
    unzip_output_path="/opt/dmp/dmp_etlprocesses/ftp_data/dst"
    copy_output_path="/opt/dmp/dmp_etlprocesses/ftp_data/dt"
    python3 /root/dmp/etl_process_data/etl_job/file_pretreatment/file_pretreatment.py $get_file_deposit_info $hdfs_path $unzip_output_path $copy_output_path
    # Check whether the hdfs directory exists. Test the commands directly in
    # the `if`, because with `set -e` a bare nonzero `$?` would abort the script.
    if hdfs dfs -test -d /user/tuomin_test; then
        echo "==========1=========="
        if ! hdfs dfs -test -d $hdfs_path; then
            echo "==========2=========="
            hdfs dfs -mkdir -p $hdfs_path
            hdfs dfs -put $copy_output_path $hdfs_path
        else
            echo "==========3=========="
            hdfs dfs -rm -r $hdfs_path
            hdfs dfs -mkdir -p $hdfs_path
            hdfs dfs -put $copy_output_path $hdfs_path
        fi
        echo "==========4=========="
    fi
else
    echo "no new file polled......"
fi
import pymysql
import constants, DbUtils
"""
更改状态:
"""
def updateStatusById(treatment_status, treatment_status_result, error_info, upt_time, upt_person, file_deposit,tableName):
"""
:param pretreatment_status: 文件预处理状态
:param pretreatment_status_result: 预处理结果码 无 有默认值
:param error_info: 错误信息 无 有默认值
:param upt_time: 更新时间
:param upt_person: 更新人
:param file_deposit: 文件id
:return:
"""
    try:
        db, cursor = DbUtils.create_conn()
        # use the cursor to execute the sql
        cursor = db.cursor()
        if tableName == constants.PRE_FILE_TABLE_NAME:
            update_data = 'update %s set pretreatment_status=%s,pretreatment_status_result=%s,error_info=\'%s\', ' \
                          'upt_time=\'%s\',upt_person=\'%s\' where file_deposit=%s' % (
                              tableName, treatment_status, treatment_status_result, error_info, upt_time, upt_person, file_deposit)
        elif tableName == constants.FILE_TABLE_NAME:
            update_data = 'update %s set deposit_status=%s,deposit_status_result=%s,error_info=\'%s\',' \
                          'upt_time=\'%s\',upt_person=\'%s\' where file_deposit=%s' % (
                              tableName, treatment_status, treatment_status_result, error_info, upt_time, upt_person, file_deposit)
        cursor.execute(update_data)
        db.commit()
        DbUtils.close_conn(db, cursor)
    except Exception as e:
        print(" error_info: %s ,update status executor failed:%s" % (error_info, e))
import sys
sys.path.append('/root/dmp/etl_process_data/etl_job')
import constants
print(constants.UNZIP_CODE)