Commit c6b61ee2 authored by zhangminghui

update

parent 47682d05
type=command
dependencies=start
command=bash api_assemble.sh
\ No newline at end of file
if __name__ == '__main__':
    print("api_assemble.py")
\ No newline at end of file
echo "api_assemble.sh"
python api_assemble.py
\ No newline at end of file
type=command
dependencies=file_pretreatment,file_handle,api_assemble
command=echo "......Scheduling end"
\ No newline at end of file
type=command
dependencies=start
command=bash file_handle.sh
\ No newline at end of file
import sys
import pymysql
import json


def conn(sql):
    # Run a single statement against the demo_test MySQL database and return all rows.
    db = pymysql.connect(host="192.168.1.90", user="root", password="root", database="demo_test")
    cursor = db.cursor()
    cursor.execute(sql)
    data = cursor.fetchall()
    db.commit()
    db.close()
    return data
def getColumns(column):
    # Build the select/export/create statements from the field metadata rows
    # (field_name, field_type, modify_field_name, modify_field_type, clean_rule).
    try:
        print(column)
        list_cols = [[] for i in range(len(column))]
        for i in range(0, len(column)):
            # Prefer the modified field name/type when present, otherwise keep the original.
            if column[i][2] is not None:
                list_cols[i].append(column[i][2])
            else:
                list_cols[i].append(column[i][0])
            if column[i][3] is not None:
                list_cols[i].append(column[i][3])
            else:
                list_cols[i].append(column[i][1])
            list_cols[i].append(column[i][4])
        print(list_cols)
        col = ""
        col_file = ""
        col_create = ""
        for i in range(0, len(list_cols)):
            if list_cols[i][2] == "md5" or list_cols[i][2] == "base64":
                # Masked columns: keep both the hashed value and the original value as t_<name>.
                col += list_cols[i][2] + '(' + list_cols[i][0] + ') as ' + list_cols[i][0] + ", " + list_cols[i][0] + ' as t_' + list_cols[i][0] + ', '
                # col_file += list_cols[i][2] + '(' + list_cols[i][0] + ') as ' + list_cols[i][0] + ', '
                col_file += list_cols[i][0] + ', '
            else:
                col += list_cols[i][0] + ', '
                col_file += list_cols[i][0] + ', '
            col_create += list_cols[i][0] + " string, "
        src_sql = 'select %s from %s' % (col[0:-2], 'a')
        md5_sql = 'select %s from %s' % (col_file[0:-2], 'impala')
        dir = '/test/dt'
        create_sql = 'create external table a (%s) stored as textfile location "%s" ' % (col_create[0:-2], dir)
        print("---------------------->", src_sql)
        print("---------------------->", md5_sql)
        print("---------------------->", create_sql)
        return [src_sql, md5_sql, create_sql]
    except Exception as e:
        print("getColumns exception:{0}".format(e))
if __name__ == '__main__':
    # file_id = sys.argv[1]
    # hdfs_path = sys.argv[2]
    # file_handle.sh passes file_deposit and hdfs_path as arguments, but for now
    # the values below are hard-coded and the argv lines above stay commented out.
    file_id = 1
    hdfs_path = "/user/datawarehouse/2020-12-10/user_info"
    get_col_info_sql = '''select field_name,field_type,modify_field_name,modify_field_type,clean_rule
    from t_file_save_field_info where file_deposit = ''' + str(file_id)
    cols = conn(get_col_info_sql)
    getColumns(cols)
    # TODO: run create_sql to build the external table
    # TODO: run src_sql to land the data in impala
    # TODO: run md5_sql to export the data for ftp
    # TODO: update the handling status
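    # --- Hedged sketch of the TODO steps above (not part of the original commit). ---
    # Assumes impala-shell is on PATH; "target_table", the export path and the status
    # value '02' are placeholders; t_file_handle_status is the table that file_handle.sh polls.
    # import os
    # src_sql, md5_sql, create_sql = getColumns(cols)
    # os.system('impala-shell -q "%s"' % create_sql)                         # build the external table
    # os.system('impala-shell -q "insert into target_table %s"' % src_sql)   # land masked data in impala
    # os.system('impala-shell -B -o /tmp/ftp_export.csv -q "%s"' % md5_sql)  # export columns for the ftp drop
    # conn("update t_file_handle_status set deposit_status='02' where file_deposit=" + str(file_id))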
#!/bin/bash
set -e
hostname="192.168.1.90"
port="3306"
username="root"
password="root"
dbname="demo_test"
select_db_sql="select file_deposit from t_file_handle_status where deposit_status='01'"
file_deposit=$(mysql -h${hostname} -P${port} -u${username} -p${password} ${dbname} -s -e "${select_db_sql}")
echo "file_deposit执行结果返回:$file_deposit"
if [ ${#file_deposit} -gt 0 ];
then
echo "轮循到新增文件,文件id为:$file_deposit"
get_file_deposit_info_sql="select file_name from t_file_deposit where file_deposit_id=$file_deposit"
get_file_deposit_info=$(mysql -h${hostname} -P${port} -u${username} -p${password} ${dbname} -s -e "${get_file_deposit_info_sql}")
zip_file_name=`echo $get_file_deposit_info | awk -F' ' '{print $1}'`
file_name=`echo $zip_file_name | cut -d \. -f 1`
hdfs_path="/user/datawarehouse/`date +%Y-%m-%d`/$file_name"
python3 file_handle.py $file_deposit $hdfs_path
else
echo "未轮循到新增文件..."
fi
\ No newline at end of file
type=command
dependencies=start
command=echo "file_pretreatment"
# command=bash round_robin_mysql.sh
\ No newline at end of file
import datetime
import getpass
import glob
import os
import shutil
import zipfile
import sys
import pandas as pd
import numpy as np
import re
import pymysql


def unzip_all(source_dir, dest_dir):
    try:
        it = os.scandir(source_dir)
        for entry in it:
            if entry.is_file() and os.path.splitext(entry.name)[1] == '.zip':
                if not os.path.exists(dest_dir):
                    os.mkdir(dest_dir)
                    zipfile.ZipFile(entry.path).extractall(path=dest_dir)
                else:
                    zipfile.ZipFile(entry.path).extractall(path=dest_dir)
    except Exception as e:
        print("unzip_all error:{0}".format(e))
def copy_file(srcPath, dtPath):
    success_code = 0
    try:
        for root, dirs, files in os.walk(srcPath, True):
            for fileName in files:
                if fileName.endswith('.csv'):
                    shutil.copy(os.path.join(root, fileName), dtPath)
                    success_code = 1
        return success_code
    except Exception as e:
        print("copy_file error:{0}".format(e))
def analysis_csv(file_id, csv_path):
    csvFiles = glob.glob(csv_path + "/*")
    list_ = []
    list_error_file = []
    for files in csvFiles:
        try:
            df = pd.read_csv(files, index_col=None, header=0)
            list_.append(df)
        except Exception as e:
            list_error_file.append(files)
            continue
    data = pd.concat(list_, ignore_index=True)
    print(data)
    ten_json = data.head(2).to_json(orient='index')
    update_sql = "update t_file_pretreatment_status set explore_json='" + ten_json + "' where file_deposit=" + file_id
    # Strip leading/trailing whitespace from every value.
    stripstr = lambda x: x.strip() if isinstance(x, str) else x
    data = data.applymap(stripstr)
    for i in data.keys():
        col = str(data[i][0])
        compile1 = re.compile(r'\d{4}[-/.]\d{1,2}[-/.]\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}|\d{4}[-/.]\d{1,2}[-/.]\d{1,2}')
        match_all = compile1.findall(col)
        if len(match_all) != 0:
            # Convert datetime-like columns (full timestamps and plain dates).
            if len(col) == 19:
                data[i] = pd.to_datetime(data[i])
            elif len(col) == 10:
                data[i] = pd.to_datetime(data[i])
    str_col = ""
    for i in range(0, len(data.dtypes)):
        str_col += "(" + str(file_id) + ", '" + str(data.columns.values[i]) + "', '" + str(data.dtypes[i]) + \
                   "', '" + str(datetime.datetime.now()) + "', '" + str(getpass.getuser()) + "'),"
    insert_sql = "insert into t_file_save_field_info (file_deposit,field_name,field_type,cre_time,cre_person) values " + str_col
    return list_error_file, update_sql, insert_sql[0:-1]
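# Note on the expected arguments (derived from file_pretreatment.sh below):
# $get_file_deposit_info expands unquoted into four separate values, so sys.argv
# is expected to look like
#   [script, file_deposit_id, file_name, file_address, file_format,
#    hdfs_path, unzip_output_path, copy_output_path]
# which is why __main__ reads indices 1, 3, 5, 6 and 7.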
if __name__ == '__main__':
    get_file_deposit_info = sys.argv
    print(get_file_deposit_info)
    get_file_id = get_file_deposit_info[1]
    get_ftp_path = get_file_deposit_info[3]
    get_output_hdfs_path = get_file_deposit_info[5]
    get_unzip_output_path = get_file_deposit_info[6]
    get_copy_output_path = get_file_deposit_info[7]
    unzip_all(get_ftp_path, get_unzip_output_path)
    su_code = copy_file(get_unzip_output_path, get_copy_output_path)
    if su_code == 1:
        os.system("hdfs dfs -rm -r /user/tuomin_test")
        os.system("hdfs dfs -mkdir /user/tuomin_test")
        pre_list = analysis_csv(get_file_id, get_copy_output_path)
        print("list of csv files that could not be read:" + str(pre_list[0]))
        print("json sample data update sql:" + str(pre_list[1]))
        print("field metadata insert sql:" + str(pre_list[2]))
        db = pymysql.connect(host="192.168.1.90", user="root", password="root", database="demo_test")
        cursor = db.cursor()
        cursor.execute(pre_list[1])
        cursor.execute(pre_list[2])
        db.commit()
        db.close()
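# A hedged alternative (not in the original commit): the insert built in analysis_csv
# concatenates values directly into the SQL text; pymysql also supports parameterized
# inserts, e.g. (a sketch; `data` refers to the DataFrame inside analysis_csv):
#
#   rows = [(get_file_id, str(c), str(t), datetime.datetime.now(), getpass.getuser())
#           for c, t in zip(data.columns.values, data.dtypes)]
#   cursor.executemany(
#       "insert into t_file_save_field_info "
#       "(file_deposit, field_name, field_type, cre_time, cre_person) "
#       "values (%s, %s, %s, %s, %s)", rows)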
#!/bin/bash
set -e
hostname="192.168.1.90"
port="3306"
username="root"
password="root"
dbname="demo_test"
select_db_sql="select file_deposit from t_file_pretreatment_status where pretreatment_status='01'"
file_deposit=$(mysql -h${hostname} -P${port} -u${username} -p${password} ${dbname} -s -e "${select_db_sql}")
echo "file_deposit执行结果返回:$file_deposit"
if [ ${#file_deposit} -gt 0 ];
then
echo "轮循到新增文件,文件id为:$file_deposit"
get_file_deposit_info_sql="select file_deposit_id,file_name,file_address,file_format from t_file_deposit where file_deposit_id=$file_deposit"
get_file_deposit_info=$(mysql -h${hostname} -P${port} -u${username} -p${password} ${dbname} -s -e "${get_file_deposit_info_sql}")
zip_file_name=`echo $get_file_deposit_info | awk -F' ' '{print $2}'`
file_name=`echo $zip_file_name | cut -d \. -f 1`
hdfs_path="/user/datawarehouse/`date +%Y-%m-%d`/$file_name"
unzip_output_path="/root/azkaban/dst"
copy_output_path="/root/azkaban/dt"
python3 file_pretreatment.py $get_file_deposit_info $hdfs_path $unzip_output_path $copy_output_path
# With "set -e", a failing "hdfs dfs -test" would abort the script before "$?"
# is checked, so test directly in the if conditions instead.
if hdfs dfs -test -d /user/tuomin_test; then
echo "=====1===="
if ! hdfs dfs -test -d $hdfs_path; then
echo "=====2===="
hdfs dfs -mkdir -p $hdfs_path
hdfs dfs -put $copy_output_path $hdfs_path
else
echo "=====3===="
hdfs dfs -rm -r $hdfs_path
hdfs dfs -mkdir -p $hdfs_path
hdfs dfs -put $copy_output_path $hdfs_path
fi
echo "=========4====="
fi
else
echo "未轮循到新增文件..."
fi
\ No newline at end of file
type=command
command=echo "Scheduling began....."
\ No newline at end of file
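# Flow sketch (read from the dependencies= lines in the job files above):
#   start -> api_assemble, file_handle, file_pretreatment -> end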