Commit da60ef33 authored Dec 15, 2020 by zhangminghui
update
parent c6b61ee2

Showing 17 changed files with 933 additions and 0 deletions (+933 -0)
wanjia_tuomin/data_process/DbUtils.py              +60  -0
wanjia_tuomin/data_process/MyFTP.py                +252 -0
wanjia_tuomin/data_process/api_assemble.job        +3   -0
wanjia_tuomin/data_process/api_assemble.py         +2   -0
wanjia_tuomin/data_process/checkData.py            +32  -0
wanjia_tuomin/data_process/constants.py            +113 -0
wanjia_tuomin/data_process/end.job                 +3   -0
wanjia_tuomin/data_process/file_handle.job         +3   -0
wanjia_tuomin/data_process/file_handle.py          +122 -0
wanjia_tuomin/data_process/file_handle.sh          +31  -0
wanjia_tuomin/data_process/file_pretreatment.job   +4   -0
wanjia_tuomin/data_process/file_pretreatment.py    +142 -0
wanjia_tuomin/data_process/modifyStatus.py         +45  -0
wanjia_tuomin/data_process/prodece_api.py          +16  -0
wanjia_tuomin/data_process/produce_api.sh          +46  -0
wanjia_tuomin/data_process/round_robin_mysql.sh    +57  -0
wanjia_tuomin/data_process/start.job               +2   -0
wanjia_tuomin/data_process/DbUtils.py 0 → 100644

import pymysql
from dbutils.pooled_db import PooledDB

from wanjia_tuomin.micko import constants

"""
Database connection pool
creator=pymysql,        # module used to connect to the database; pymysql here
maxconnections=10,      # maximum number of connections the pool allows; set to 10 here, tune as needed
mincached=2,            # idle connections opened at initialization; 0 means none
maxcached=5,            # maximum number of idle connections kept in the pool; 0 or None means no limit
maxshared=1,            # maximum number of shared connections in the pool
blocking=True,          # whether to block when the pool has no free connection; True waits, False raises an error
maxusage=None,          # maximum number of times one connection may be reused; None means unlimited
setsession=[],          # list of commands run before each session starts
host=constants.ETL_HOST,          # database IP
port=3306,                        # database port
user=constants.ETL_USER,          # database username
password=constants.ETL_PASSWORD,  # database password
database=constants.ETL_DBNAME,    # database name
charset='utf8'                    # database encoding
"""
POOL = PooledDB(creator=pymysql, maxconnections=10, mincached=2, maxcached=5,
                maxshared=1, blocking=True, maxusage=None, setsession=[],
                host=constants.ETL_HOST, port=3306, user=constants.ETL_USER,
                password=constants.ETL_PASSWORD, database=constants.ETL_DBNAME,
                charset='utf8')


def create_conn():
    """
    :return: a connection and a cursor
    """
    conn = POOL.connection()
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    return conn, cursor


def close_conn(conn, cursor):
    """
    :param conn: connection
    :param cursor: cursor
    :return:
    """
    # close the cursor before the connection
    cursor.close()
    conn.close()
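
A minimal usage sketch for the pool helpers above (the query is a placeholder; the import path follows the one used elsewhere in this commit):

    from wanjia_tuomin.micko import DbUtils

    conn, cursor = DbUtils.create_conn()
    try:
        cursor.execute("select 1 as ok")  # placeholder query
        print(cursor.fetchall())          # DictCursor returns [{'ok': 1}]
    finally:
        DbUtils.close_conn(conn, cursor)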
wanjia_tuomin/data_process/MyFTP.py 0 → 100644

#!/usr/bin/python
# -*- coding: UTF-8 -*-
from ftplib import FTP
import os
import socket

"""
Upload files to FTP
"""


class MyFTP:

    def __init__(self, host, port=21):
        """Initialize the FTP client
        Args:
            host: IP address
            port: port number
        """
        self.host = host
        self.port = port
        self.ftp = FTP()
        # reset the encoding
        self.ftp.encoding = 'gbk'
        self.file_list = []

    def login(self, username, password):
        """Log in to the FTP server
        Args:
            username: username
            password: password
        """
        try:
            # timeout in seconds
            timeout = 60
            socket.setdefaulttimeout(timeout)
            # 0: active mode, 1: passive mode
            self.ftp.set_pasv(True)
            self.debug_print('Trying to connect to %s' % self.host)
            self.ftp.connect(self.host, self.port)
            self.debug_print('Connected to %s' % self.host)
            self.debug_print('Trying to log in to %s' % self.host)
            self.ftp.login(username, password)
            self.debug_print('Logged in to %s' % self.host)
            self.debug_print(self.ftp.welcome)
        except Exception as err:
            self.deal_error("FTP connection or login failed, error: %s" % err)
            pass

    # def is_same_size(self, local_file, remote_file):
    #     """Check whether the remote and local files have the same size
    #     Args:
    #         local_file: local file
    #         remote_file: remote file
    #     """
    #     try:
    #         remote_file_size = self.ftp.size(remote_file)
    #     except Exception as err:
    #         remote_file_size = -1
    #
    #     try:
    #         local_file_size = os.path.getsize(local_file)
    #     except Exception as err:
    #         local_file_size = -1
    #
    #     self.debug_print('local_file_size:%d , remote_file_size:%d' % (local_file_size, remote_file_size))
    #     if remote_file_size == local_file_size:
    #         return 1
    #     else:
    #         return 0

    # def download_file(self, local_file, remote_file):
    #     """Download a file from FTP
    #     Args:
    #         local_file: local file
    #         remote_file: remote file
    #     """
    #     self.debug_print("download_file()---> local_path = %s ,remote_path = %s" % (local_file, remote_file))
    #
    #     if self.is_same_size(local_file, remote_file):
    #         self.debug_print('%s is the same size, no need to download' % local_file)
    #         return
    #     else:
    #         try:
    #             self.debug_print('>>>>>>>>>>>>downloading file %s ... ...' % local_file)
    #             buf_size = 1024
    #             file_handler = open(local_file, 'wb')
    #             self.ftp.retrbinary('RETR %s' % remote_file, file_handler.write, buf_size)
    #             file_handler.close()
    #         except Exception as err:
    #             self.debug_print('Error downloading file, exception: %s ' % err)
    #             return

    # def download_file_tree(self, local_path, remote_path):
    #     """Download several files from a remote directory into a local directory
    #     Args:
    #         local_path: local path
    #         remote_path: remote path
    #     """
    #     print("download_file_tree()---> local_path = %s ,remote_path = %s" % (local_path, remote_path))
    #     try:
    #         self.ftp.cwd(remote_path)
    #     except Exception as err:
    #         self.debug_print('Remote directory %s does not exist, continuing...' % remote_path + " , error: %s" % err)
    #         return
    #
    #     if not os.path.isdir(local_path):
    #         self.debug_print('Local directory %s does not exist, creating it first' % local_path)
    #         os.makedirs(local_path)
    #
    #     self.debug_print('Changed to directory: %s' % self.ftp.pwd())
    #
    #     self.file_list = []
    #     # method callback
    #     self.ftp.dir(self.get_file_list)
    #
    #     remote_names = self.file_list
    #     self.debug_print('Remote directory listing: %s' % remote_names)
    #     for item in remote_names:
    #         file_type = item[0]
    #         file_name = item[1]
    #         local = os.path.join(local_path, file_name)
    #         if file_type == 'd':
    #             print("download_file_tree()---> downloading directory: %s" % file_name)
    #             self.download_file_tree(local, file_name)
    #         elif file_type == '-':
    #             print("download_file()---> downloading file: %s" % file_name)
    #             self.download_file(local, file_name)
    #     self.ftp.cwd("..")
    #     self.debug_print('Back to parent directory %s' % self.ftp.pwd())
    #     return True

    def upload_file(self, local_file, remote_file):
        """Upload a local file to FTP
        Args:
            local_file: local file
            remote_file: remote file
        """
        if not os.path.isfile(local_file):
            self.debug_print('%s does not exist' % local_file)
            return
        if self.is_same_size(local_file, remote_file):
            self.debug_print('Skipping identical file: %s' % local_file)
            return
        buf_size = 1024
        file_handler = open(local_file, 'rb')
        self.ftp.storbinary('STOR %s' % remote_file, file_handler, buf_size)
        file_handler.close()
        self.debug_print('Uploaded %s successfully!' % local_file)

    # def upload_file_tree(self, local_path, remote_path):
    #     """Upload the files in a local directory to FTP
    #     Args:
    #         local_path: local path
    #         remote_path: remote path
    #     """
    #     if not os.path.isdir(local_path):
    #         self.debug_print('Local directory %s does not exist' % local_path)
    #         return
    #
    #     self.ftp.cwd(remote_path)
    #     self.debug_print('Changed to remote directory: %s' % self.ftp.pwd())
    #
    #     local_name_list = os.listdir(local_path)
    #     for local_name in local_name_list:
    #         src = os.path.join(local_path, local_name)
    #         if os.path.isdir(src):
    #             try:
    #                 self.ftp.mkd(local_name)
    #             except Exception as err:
    #                 self.debug_print("Directory %s already exists, error: %s" % (local_name, err))
    #             self.debug_print("upload_file_tree()---> uploading directory: %s" % local_name)
    #             self.upload_file_tree(src, local_name)
    #         else:
    #             self.debug_print("upload_file_tree()---> uploading file: %s" % local_name)
    #             self.upload_file(src, local_name)
    #     self.ftp.cwd("..")

    def close(self):
        """Quit FTP
        """
        self.debug_print("close()---> quitting FTP")
        self.ftp.quit()
        self.log_file.close()  # NOTE: no log_file is ever opened in this commit; see the sketch below

    # def debug_print(self, s):
    #     """ Print a log line
    #     """
    #     self.write_log(s)

    # def deal_error(self, e):
    #     """ Handle an error or exception
    #     Args:
    #         e: the exception
    #     """
    #     log_str = 'An error occurred: %s' % e
    #     self.write_log(log_str)
    #     sys.exit()

    # def write_log(self, log_str):
    #     """ Write a log entry
    #     Args:
    #         log_str: the log line
    #     """
    #     time_now = time.localtime()
    #     date_now = time.strftime('%Y-%m-%d', time_now)
    #     format_log_str = "%s ---> %s \n " % (date_now, log_str)
    #     print(format_log_str)
    #     self.log_file.write(format_log_str)

    # def get_file_list(self, line):
    #     """ Collect the file list
    #     Args:
    #         line:
    #     """
    #     file_arr = self.get_file_name(line)
    #     # drop . and ..
    #     if file_arr[1] not in ['.', '..']:
    #         self.file_list.append(file_arr)

    # def get_file_name(self, line):
    #     """ Extract the file name
    #     Args:
    #         line:
    #     """
    #     pos = line.rfind(':')
    #     while (line[pos] != ' '):
    #         pos += 1
    #     while (line[pos] == ' '):
    #         pos += 1
    #     file_arr = [line[0], line[pos:]]
    #     return file_arr

# if __name__ == "__main__":
#
#     host_address = "192.168.153.1"
#     username = "sp"
#     password = "sp940219sp"
#     my_ftp = MyFTP(host_address)
#     my_ftp.login(username, password)
#
#     # upload a csv file
#     my_ftp.upload_file("C:/Users/songpeng/Desktop/test1/1.csv", "/22.csv")
#     my_ftp.close()
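
As committed, the live code still depends on helpers this file keeps commented out: login() and upload_file() call debug_print(), deal_error() and is_same_size(), and close() touches the never-created log_file. A minimal sketch of stand-ins (an assumption: logging to stdout instead of the commented-out file log) that could be pasted into the class:

    import sys

    # inside class MyFTP:

    def debug_print(self, s):
        # stdout instead of the file log
        print(s)

    def deal_error(self, e):
        print('An error occurred: %s' % e)
        sys.exit()

    def is_same_size(self, local_file, remote_file):
        # 1 if remote and local sizes match, else 0; -1 marks an unreadable side
        try:
            remote_file_size = self.ftp.size(remote_file)
        except Exception:
            remote_file_size = -1
        try:
            local_file_size = os.path.getsize(local_file)
        except Exception:
            local_file_size = -1
        return 1 if remote_file_size == local_file_size else 0

With these in place, close() could also drop the self.log_file.close() line.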
wanjia_tuomin/data_process/api_assemble.job 0 → 100644

type=command
dependencies=start
command=sh produce_api.sh $1 $2
\ No newline at end of file
wanjia_tuomin/data_process/api_assemble.py 0 → 100644

if __name__ == '__main__':
    print("api_assemble.py")
wanjia_tuomin/data_process/checkData.py 0 → 100644

import os
import shutil

"""
After an error, the corresponding files have to be deleted
"""


def checkFileExists(copyPath, dstPath):
    """
    :param copyPath: directory the files were copied into
    :param dstPath: target directory
    :return:
    """
    if os.path.exists(copyPath):
        shutil.rmtree(copyPath)
    if os.path.exists(dstPath):
        shutil.rmtree(dstPath)


def checkHdfsFileExists(hdfsPath):
    """
    :param hdfsPath: HDFS path
    :return:
    """
    # hdfs dfs -test -d exits 0 when the directory exists; interpolate the path
    # (the commit tested the literal string "hdfsPath" and inverted the check)
    if os.system("hdfs dfs -test -d %s" % hdfsPath) == 0:
        os.system("hdfs dfs -rm -r %s" % hdfsPath)

# if __name__ == '__main__':
#     rockBackData("D:\\csv\\1", "D:\\csv\\2")
wanjia_tuomin/data_process/constants.py 0 → 100644

"""
Definitions of the various error types:
"""

"""
File pretreatment:
PRE_FILE_WAIT_PROCESS = 1    uploaded
PRE_FILE_PROCESSING = 2      being processed
PRE_FILE_PROCESS_FINISH = 3  processing finished
"""
PRE_FILE_WAIT_PROCESS = 1
PRE_FILE_PROCESSING = 2
PRE_FILE_PROCESS_FINISH = 3

"""
File handling statuses:
FILE_WAIT_PROCESS = 1    waiting to be processed
FILE_PROCESSING = 2      being processed
FILE_PROCESS_FINISH = 3  processing finished
"""
FILE_WAIT_PROCESS = 1
FILE_PROCESSING = 2
FILE_PROCESS_FINISH = 3

"""
Failure codes for each stage:
UNZIP_CODE = 1             failed to unzip the file
FTP_CODE = 2               bad FTP information
UNCODE_CODE = 3            encoding error
PARSE_CSV_FILE = 4         exception while parsing a csv file
CLEAN_CODE = 5             cleaning error
PARSE_COLS = 6             exception while parsing column names
FINAL_RESULT_SUCCESS = 0   final result: file processed successfully
FINAL_RESULT_FAILED = -1   final result: file processing failed
TIME_FORMAT_CODE = 7       time format error
"""
UNZIP_CODE = 1
FTP_CODE = 2
UNCODE_CODE = 3
PARSE_CSV_FILE = 4
CLEAN_CODE = 5
PARSE_COLS = 6
FINAL_RESULT_SUCCESS = 0
FINAL_RESULT_FAILED = -1
TIME_FORMAT_CODE = 7

"""
ETL stage:
ETL_WAIT_PROCESS = 1       ETL waiting to be processed
ETL_PROCESSING = 2         ETL being processed
ETL_PROCESSING_FINISH = 3  ETL processing finished
PRODUCT_API_FINISH = 4     API generation finished
"""

"""
Database connection:
HOST=''       database IP address
USER=''       database username
PASS_WORD=''  database password
DB_NAME=''    database name
"""
HOST = ''
USER = ''
PASS_WORD = ''
DB_NAME = ''

"""
Updated by:
SP_UPDATE_PERSON = 1   sp
ZMH_UPDATE_PERSON = 2  zmh
RM_UPDATE_PERSON = 3   rm
"""
SP_UPDATE_PERSON = 1
ZMH_UPDATE_PERSON = 2
RM_UPDATE_PERSON = 3

"""
Final processing result:
DEAL_SUCCESS = 200   success
DEAL_FAILED = 404    failure
"""
DEAL_SUCCESS = 200
DEAL_FAILED = 404

"""
ETL processing table:
ETL_HOST      database IP address
ETL_USER      database username
ETL_PASSWORD  database password
ETL_DBNAME    database name
"""
ETL_HOST = "rm-wz9n399q2avsy3k6m4o.mysql.rds.aliyuncs.com"
ETL_USER = "root"
ETL_PASSWORD = "I%ou$buy!ok"
ETL_DBNAME = "wj-mkt-project"

"""
File pretreatment defaults
DEFAULT_PRETREATMENT_STATUS_RESULT=''  default file status
DEFAULT_ERROR_INFO=''                  default error message
"""
DEFAULT_PRETREATMENT_STATUS_RESULT = ''
DEFAULT_ERROR_INFO = ''

"""
TO DO:
Result of each stage's execution
SUCCESS_CODE = 0  executed successfully
FAILED_CODE = 1   execution failed
"""
SUCCESS_CODE = 0
FAILED_CODE = 1
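
For context, these status constants and defaults are consumed together by modifyStatus.updateStatusById(); a typical failure-path call (the file id and error text here are made up) looks like:

    import time

    from wanjia_tuomin.micko import constants, modifyStatus

    localTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    # file 1 is still in pretreatment, failed at the unzip stage, updated by sp
    modifyStatus.updateStatusById(constants.PRE_FILE_PROCESSING, constants.UNZIP_CODE,
                                  "unzip failed: bad zip header", localTime,
                                  constants.SP_UPDATE_PERSON, 1)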
wanjia_tuomin/data_process/end.job 0 → 100644

type=command
dependencies=file_pretreatment,file_handle,api_assemble
command=echo "......Scheduling end"
\ No newline at end of file
wanjia_tuomin/data_process/file_handle.job 0 → 100644

type=command
dependencies=start
command=sh file_handle.sh
\ No newline at end of file
wanjia_tuomin/data_process/file_handle.py 0 → 100644

import sys
from wanjia_tuomin.micko import modifyStatus, DbUtils, constants, MyFTP
import time
import os

localTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print(localTime)  # print the timestamp, not the time module

"""
Extract the needed fields from the records passed in
"""


def getColumns(column):
    """
    :param column: records queried from the database
    :return: the fields actually needed
    """
    try:
        print(column)
        # For each row: fall back to the original field when the user-modified
        # one is None, otherwise use the field the user modified
        list_cols = [[] for i in range(len(column))]
        for i in range(0, len(column)):
            if column[i][2] is not None:
                list_cols[i].append(column[i][2])
            else:
                list_cols[i].append(column[i][0])
            if column[i][3] is not None:
                list_cols[i].append(column[i][3])
            else:
                list_cols[i].append(column[i][1])
            list_cols[i].append(column[i][4])
        print(list_cols)
        # all columns
        col = ""
        # rule columns
        col_file = ""
        col_create = ""
        col_list = []
        # clean-up rule
        for i in range(0, len(list_cols)):
            if list_cols[i][2] == "md5" or list_cols[i][2] == "base64":
                col += list_cols[i][2] + '(' + list_cols[i][0] + ') as ' + list_cols[i][0] + ", " \
                       + list_cols[i][0] + ' as t_' + list_cols[i][0] + ', '
                # col_file += list_cols[i][2] + '(' + list_cols[i][0] + ') as ' + list_cols[i][0] + ', '
                col_file += list_cols[i][0] + ', '
                col_list.append(list_cols[i][0])
                col_list.append('t_' + list_cols[i][0])
            else:
                col += list_cols[i][0] + ', '
                col_file += list_cols[i][0] + ', '
                col_list.append(list_cols[i][0])
            col_create += list_cols[i][0] + " string, "
        create_tmp_sql = 'create table impala_tmp as select %s from %s' % (col[0:-2], 'a')
        md5_sql = 'select %s from %s' % (col_file[0:-2], 'impala_tmp')
        impala_sql = ""
        for i in range(0, len(col_list)):
            # zero-pad single-digit aliases: c01..c09, then c10, c11, ...
            if i < 9:
                impala_sql += col_list[i] + " as c0" + str(i + 1) + ", "
            else:
                impala_sql += col_list[i] + " as c" + str(i + 1) + ", "
        impala_sql = 'insert into %s as select %s from %s' % ("impala_table", impala_sql[0:-2], 'impala_tmp')
        dir = '/test/dt'
        create_sql = 'create external table a (%s) stored as txt location "%s" ' % (col_create[0:-2], dir)
        print("---------------------->", create_sql)
        print("---------------------->", create_tmp_sql)
        print("---------------------->", md5_sql)
        print("---------------------->", impala_sql)
        return [create_sql, create_tmp_sql, md5_sql, impala_sql]
    except Exception as e:
        # update file_status set FILE_PROCESSING, update file_code set PARSE_COLS
        modifyStatus.updateStatusById(constants.FILE_PROCESSING, constants.PARSE_COLS, e,
                                      localTime, constants.SP_UPDATE_PERSON, file_id)
        print("json parse exception:{0}".format(e))


if __name__ == '__main__':
    try:
        file_id = sys.argv[1]
        hdfs_path = sys.argv[2]
        # file_id = 1
        # hdfs_path = "/user/datawarehouse/2020-12-10/user_info"
        # modifyStatus.updateStatusById(constants.DEAL_FILE_PROCESSING,constants.DEFAULT_PRETREATMENT_STATUS_RESULT,constants.DEFAULT_ERROR_INFO,localTime,constants.ZMH_UPDATE_PERSON,file_id)
        db, cursor = DbUtils.create_conn()
        cursor = db.cursor()
        get_col_info_sql = '''select field_name,field_type,modify_field_name,modify_field_type,clean_rule
        from t_file_save_field_info where file_deposit = ''' + str(file_id)
        cursor.execute(get_col_info_sql)
        cols = cursor.fetchall()
        db.commit()
        list_sqls = getColumns(cols)
        # execute the create-table sql
        os.system('impala-shell -q "%s"' % (list_sqls[0]))
        # execute the create-tmp-table sql
        os.system('impala-shell -q "%s"' % (list_sqls[1]))
        query = list_sqls[2]
        # execute src_sql into the impala table
        os.system('impala-shell -q "%s"' % (list_sqls[3]))
        # execute src_sql into a csv
        dir = ''
        os.system('impala-shell -B -q "%s" -o %s \'--output_delimiter=,\' ' % (query, dir))
        # upload the file to ftp
        host_address = "192.168.153.1"
        username = "sp"
        password = "sp940219sp"
        my_ftp = MyFTP.MyFTP(host_address)  # MyFTP is the module; instantiate the class inside it
        my_ftp.login(username, password)
        my_ftp.upload_file("C:/Users/songpeng/Desktop/test1/1.csv", "/22.csv")
        my_ftp.close()
        # update file_status set FILE_PROCESS_FINISH, update file_code set FINAL_RESULT_SUCCESS
        modifyStatus.updateStatusById(constants.FILE_PROCESS_FINISH, constants.FINAL_RESULT_SUCCESS,
                                      "success", localTime, constants.ZMH_UPDATE_PERSON, file_id)
    except Exception as e:
        print("execute table error{}".format(e))
        # update file_status set FILE_PROCESSING, update file_code set FINAL_RESULT_FAILED
        modifyStatus.updateStatusById(constants.FILE_PROCESSING, constants.FINAL_RESULT_FAILED,
                                      "failed", localTime, constants.ZMH_UPDATE_PERSON, file_id)
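
To make the SQL assembly in getColumns() concrete, here is a hand-traced call; the two rows are hypothetical and follow the column order selected by get_col_info_sql (field_name, field_type, modify_field_name, modify_field_type, clean_rule):

    rows = (
        ('id',    'int',    None,  None,     None),   # no rename, no clean rule
        ('phone', 'string', 'tel', 'string', 'md5'),  # renamed to tel, masked with md5
    )
    # getColumns(rows) prints and returns SQL of this shape:
    #   create external table a (id string, tel string) stored as txt location "/test/dt"
    #   create table impala_tmp as select id, md5(tel) as tel, tel as t_tel from a
    #   select id, tel from impala_tmp
    #   insert into impala_table as select id as c01, tel as c02, t_tel as c03 from impala_tmp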
wanjia_tuomin/data_process/file_handle.sh 0 → 100644

#!/bin/bash
set -e
# database IP address
hostname="192.168.1.90"
# database port
port="3306"
# database username
username="root"
# database password
password="root"
# database name
dbname="demo_test"

# look up the file id by status
select_db_sql="select file_deposit from t_file_handle_status where deposit_status='01'"
file_deposit=$(mysql -h${hostname} -P${port} -u${username} -p${password} ${dbname} -s -e "${select_db_sql}")
echo "file_deposit query returned: $file_deposit"

# check the previous result
if [ ${#file_deposit} -gt 0 ]; then
    echo "Polled a newly added file, file id: $file_deposit"
    # query the needed fields by the file id above
    get_file_deposit_info_sql="select file_name from t_file_deposit where file_deposit_id=$file_deposit"
    get_file_deposit_info=$(mysql -h${hostname} -P${port} -u${username} -p${password} ${dbname} -s -e "${get_file_deposit_info_sql}")
    # name of the compressed file
    zip_file_name=`echo $get_file_deposit_info | awk -F ' ' '{print $1}'`
    file_name=`echo $zip_file_name | cut -d \. -f 1`
    hdfs_path="/user/datawarehouse/`date +%Y-%m-%d`/$file_name"
    python3 file_handle.py $file_deposit $hdfs_path
else
    echo "No newly added file polled..."
fi
\ No newline at end of file
wanjia_tuomin/data_process/file_pretreatment.job 0 → 100644

type=command
dependencies=start
command=echo "file_pretreatment"
command.1=sh round_robin_mysql.sh
\ No newline at end of file
wanjia_tuomin/data_process/file_pretreatment.py 0 → 100644

import datetime
import glob
import os
import shutil
import zipfile
import sys
import pandas as pd
import numpy as np
import re
from wanjia_tuomin.micko import modifyStatus, constants, DbUtils
import time

localTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print(localTime)  # print the timestamp, not the time module


def unzip_all(source_dir, dest_dir):
    """
    :param source_dir: source directory
    :param dest_dir: target directory
    :return:
    """
    try:
        it = os.scandir(source_dir)
        for entry in it:
            if entry.is_file() and os.path.splitext(entry.name)[1] == '.zip':
                if not os.path.exists(dest_dir):
                    # os.mkdir(dest_dir)
                    print("file not exists")
                    # zipfile.ZipFile(entry.path).extractall(path=dest_dir)
                else:
                    zipfile.ZipFile(entry.path).extractall(path=dest_dir)
    except Exception as e:
        # update file_status set PRE_FILE_PROCESSING, update file_code set UNZIP_CODE
        modifyStatus.updateStatusById(constants.PRE_FILE_PROCESSING, constants.UNZIP_CODE, e,
                                      localTime, constants.SP_UPDATE_PERSON, file_id)
        print("unzip_all error:{0}".format(e))
        return


def copy_file(srcPath, dtPath):
    try:
        for root, dirs, files in os.walk(srcPath, True):
            for fileName in files:
                if fileName.endswith('.csv') or fileName.endswith('.txt'):
                    shutil.copy(os.path.join(root, fileName), dtPath)
    except Exception as e:
        # update file_status set PRE_FILE_PROCESSING, update file_code set FTP_CODE
        modifyStatus.updateStatusById(constants.PRE_FILE_PROCESSING, constants.FTP_CODE, e,
                                      localTime, constants.ZMH_UPDATE_PERSON, file_id)
        print("copy_file error:{0}".format(e))
    return 1


def analysis_csv(file_id, csv_path):
    """
    :param file_id: file id
    :param csv_path: csv path
    :return:
    """
    csvFiles = glob.glob(csv_path + "/*")
    list_ = []
    list_error_file = []
    for files in csvFiles:
        try:
            df = pd.read_csv(files, index_col=None, header=0)
            list_.append(df)
        except Exception as e:
            # update file_status set PRE_FILE_PROCESSING, update file_code set PARSE_CSV_FILE
            modifyStatus.updateStatusById(constants.PRE_FILE_PROCESSING, constants.PARSE_CSV_FILE, e,
                                          localTime, constants.ZMH_UPDATE_PERSON, file_id)
            list_error_file.append(files)
            continue
    data = pd.concat(list_, ignore_index=True)
    print(data)
    ten_json = data.head(2).to_json(orient='index')
    update_sql = "update t_file_pretreatment_status set explore_json='" + ten_json \
                 + "' where file_deposit=" + file_id
    # trim
    try:
        stripstr = lambda x: x.strip() if isinstance(x, np.unicode) else x
        data = data.applymap(stripstr)
        for i in data.keys():
            col = str(data[i][0])
            compile1 = re.compile(
                '\d{4}[-/.]\d{1,2}[-/.]\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}|\d{4}[-/.]\d{1,2}[-/.]\d{1,2}')
            match_all = compile1.findall(col)
            if len(match_all) != 0:
                if len(col) == 19:
                    data[i] = pd.to_datetime(data[i])
                elif len(col) == 10:
                    data[i] = pd.to_datetime(data[i])
        str_col = ""
        for i in range(0, len(data.dtypes)):
            # example: (id,'name','int','2012-12-02 00:00:00','sp'),(id,'name','int','2012-12-02 00:00:00','sp'),
            str_col += "(" + str(file_id) + ", '" + str(data.columns.values[i]) + "', '" + str(data.dtypes[i]) + \
                       "', '" + str(datetime.datetime.now()) + "', '" + str(constants.ZMH_UPDATE_PERSON) + "'),"
        # example: insert into table (id,name,age) values ((1,'zh','se','2012-12-02 00:00:00'),(1,'sg','er','2012-12-02 00:00:00'))
        insert_sql = "insert into t_file_save_field_info (file_deposit,field_name,field_type,cre_time,cre_person) values " \
                     + str_col
    except Exception as e:
        # update file_status set PRE_FILE_PROCESSING, update file_code set TIME_FORMAT_CODE
        modifyStatus.updateStatusById(constants.PRE_FILE_PROCESSING, constants.TIME_FORMAT_CODE, e,
                                      localTime, constants.ZMH_UPDATE_PERSON, file_id)
        print("time format error{}".format(e))
    return list_error_file, update_sql, insert_sql[0:-1]


if __name__ == '__main__':
    get_file_deposit_info = sys.argv
    print(get_file_deposit_info)
    # argv layout as passed by round_robin_mysql.sh:
    # [1..4] = file_deposit_id, file_name, file_address, file_format
    # [5] = hdfs_path, [6] = unzip_output_path, [7] = copy_output_path
    file_id = get_file_deposit_info[1]
    get_ftp_path = get_file_deposit_info[3]
    get_output_hdfs_path = get_file_deposit_info[5]
    get_unzip_output_path = get_file_deposit_info[6]
    get_copy_output_path = get_file_deposit_info[7]
    # update file_status set FILE_WAIT_PROCESS, update file_code set DEFAULT_PRETREATMENT_STATUS_RESULT
    modifyStatus.updateStatusById(constants.FILE_WAIT_PROCESS, constants.DEFAULT_PRETREATMENT_STATUS_RESULT,
                                  constants.DEFAULT_ERROR_INFO, localTime, constants.SP_UPDATE_PERSON, file_id)
    unzip_all(get_ftp_path, get_unzip_output_path)
    su_code = copy_file(get_unzip_output_path, get_copy_output_path)
    if su_code == 1:
        os.system("hdfs dfs -rm -r /user/tuomin_test")
        os.system("hdfs dfs -mkdir /user/tuomin_test")
        pre_list = analysis_csv(file_id, get_copy_output_path)
        # print("csv files that could not be read: " + str(pre_list[0]))
        # print("update sql for the sample json rows: " + str(pre_list[1]))
        # print("insert sql for the data fields: " + str(pre_list[2]))
        db, cursor = DbUtils.create_conn()
        # db = pymysql.connect("192.168.1.90", "root", "root", "demo_test")
        cursor = db.cursor()
        cursor.execute(pre_list[1])
        cursor.execute(pre_list[2])
        db.commit()
        DbUtils.close_conn(db, cursor)
        # update file_status set FILE_PROCESSING_FINISH, update file_code set DEAL_SUCCESS
        modifyStatus.updateStatusById(constants.FILE_PROCESSING_FINISH, constants.DEAL_SUCCESS, "success",
                                      localTime, constants.SP_UPDATE_PERSON, file_id)
\ No newline at end of file
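
A quick check of the date-detection regex used in analysis_csv() (the sample values are made up):

    import re

    compile1 = re.compile(
        '\d{4}[-/.]\d{1,2}[-/.]\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}|\d{4}[-/.]\d{1,2}[-/.]\d{1,2}')
    print(compile1.findall('2020-12-15 09:30:00'))  # ['2020-12-15 09:30:00'] -> len 19, datetime column
    print(compile1.findall('2020/12/15'))           # ['2020/12/15']          -> len 10, date column
    print(compile1.findall('hello'))                # [] -> left as a plain string column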
wanjia_tuomin/data_process/modifyStatus.py 0 → 100644

import pymysql
from wanjia_tuomin.micko import constants, DbUtils

"""
Update the status:
"""


def updateStatusById(pretreatment_status, pretreatment_status_result, error_info, upt_time, upt_person, file_deposit):
    """
    :param pretreatment_status: file pretreatment status
    :param pretreatment_status_result: pretreatment result code (has a default value)
    :param error_info: error message (has a default value)
    :param upt_time: update time
    :param upt_person: updated by
    :param file_deposit: file id
    :return:
    """
    try:
        db, cursor = DbUtils.create_conn()
        # use the cursor to execute sql
        cursor = db.cursor()
        # serch_data = """select t1 from test where id = {table_id}""".format(table_id=id)
        # update_data = 'update test set status=%s where id =%s' %(status, id)
        if constants.DEFAULT_PRETREATMENT_STATUS_RESULT == pretreatment_status_result \
                or constants.DEFAULT_ERROR_INFO == error_info:
            # result code and error message still hold their defaults: update only status, time and person
            update_data = 'update %s set pretreatment_status=%s,upt_time=%s,upt_person=%s where file_deposit=%s' \
                          % ("table_name", pretreatment_status, upt_time, upt_person, file_deposit)
        else:
            update_data = 'update %s set pretreatment_status=%s,pretreatment_status_result=%s,error_info=%s,' \
                          'upt_time=%s,upt_person=%s where file_deposit=%s' \
                          % ("table_name", pretreatment_status, pretreatment_status_result, error_info,
                             upt_time, upt_person, file_deposit)
        result = cursor.execute(update_data)
        db.commit()
        # get result
    except Exception as e:
        print("executor failed:{}".format(e))
    finally:
        # close conn
        cursor.close()
        db.close()
    return result
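
The %-formatted UPDATE above interpolates values unquoted, which is fragile and open to SQL injection; a parameterized sketch (assumption: 't_file_pretreatment_status' stands in for the real table name, which this commit leaves as the literal "table_name"):

    update_data = ('update t_file_pretreatment_status '
                   'set pretreatment_status=%s, upt_time=%s, upt_person=%s '
                   'where file_deposit=%s')
    # the driver quotes and escapes each value itself
    cursor.execute(update_data, (pretreatment_status, upt_time, upt_person, file_deposit))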
wanjia_tuomin/data_process/prodece_api.py 0 → 100644

import requests
import json

# api address to request
url = "http://127.0.0.1:8090/dmpapi/basicVideoService/getResponseById"
# request parameters
data = {'xxxxx': 'xxxx'}
# perform the request
response = requests.get(url, params=data)
# inspect the url that was executed
print('\nurl the request executed:\n', response.url)
# get the response body
print('\nresponse body:\n', response.text)
# parse the returned json data
data_json = json.loads(response.text)
# after parsing the json, take the rows to insert and write them back to the database
print('\nvalue of data in the parsed json:\n', data_json['data'])
\ No newline at end of file
wanjia_tuomin/data_process/produce_api.sh 0 → 100644

#!/bin/bash
set -e
# database IP address
hostname="47.113.20.243"
# database port
port="3306"
# database username
username="root"
# database password
password="I%ou$buy!ok"
# database name
dbname="wj-mkt-project"

# get the goods id by status
select_db_sql="select goods_api from ${dbname}.t_data_goods_api_params where handle_status='01'"
goods_api=$(mysql -h${hostname} -P${port} -u${username} -p${password} ${dbname} -s -e "${select_db_sql}")
echo "goods_api query returned: $goods_api"

# check whether the previous command succeeded
if [ ${#goods_api} -gt 0 ]; then
    echo "Polled a newly added record, goods_api: $goods_api"
    # query the needed field names and types by the goods id
    get_file_deposit_info_sql="select fields_name,fields_type from ${dbname}.t_data_test_gateway_api where goods_api=$goods_api"
    get_file_deposit_info=$(mysql -h${hostname} -P${port} -u${username} -p${password} ${dbname} -s -e "${get_file_deposit_info_sql}")
    # field name
    field_name=`echo $get_file_deposit_info | awk -F ' ' '{print $1}'`
    # field type
    field_type=`echo $get_file_deposit_info | awk -F ' ' '{print $2}'`
    python3 prodece_api.py $1 $2   # the script committed here is named prodece_api.py
else
    echo "No newly added record polled......"
fi
echo "api_assemble.sh"
python api_assemble.py
\ No newline at end of file
wanjia_tuomin/data_process/round_robin_mysql.sh 0 → 100644

#!/bin/bash
set -e
# database IP address
hostname="192.168.1.90"
# database port
port="3306"
# database username
username="root"
# database password
password="root"
# database name
dbname="demo_test"
# file pretreatment statuses
preStatus="pretreatment"
processingStatus="pretreatment in progress"
finishStatus="pretreatment finished"

# get the file id by the file pretreatment status
select_db_sql="select file_deposit from t_file_pretreatment_status where pretreatment_status='01'"
file_deposit=$(mysql -h${hostname} -P${port} -u${username} -p${password} ${dbname} -s -e "${select_db_sql}")
echo "file_deposit query returned: $file_deposit"

# check whether the previous command succeeded
if [ ${#file_deposit} -gt 0 ]; then
    echo "Polled a newly added file, file id: $file_deposit"
    #update
    # get the file name, address and format by the file id
    get_file_deposit_info_sql="select file_deposit_id,file_name,file_address,file_format from t_file_deposit where file_deposit_id=$file_deposit"
    get_file_deposit_info=$(mysql -h${hostname} -P${port} -u${username} -p${password} ${dbname} -s -e "${get_file_deposit_info_sql}")
    # name of the zip file
    zip_file_name=`echo $get_file_deposit_info | awk -F ' ' '{print $2}'`
    # file name
    file_name=`echo $zip_file_name | cut -d \. -f 1`
    # hdfs path
    hdfs_path="/user/datawarehouse/`date +%Y-%m-%d`/$file_name"
    unzip_output_path="/root/azkaban/dst"
    copy_output_path="/root/azkaban/dt"
    python3 file_pretreatment.py $get_file_deposit_info $hdfs_path $unzip_output_path $copy_output_path
    # check whether the hdfs directory exists
    hdfs dfs -test -d /user/tuomin_test
    if [ $? = 0 ]; then
        echo "==========1=========="
        hdfs dfs -test -d $hdfs_path
        if [ $? != 0 ]; then
            echo "==========2=========="
            hdfs dfs -mkdir -p $hdfs_path
            hdfs dfs -put $copy_output_path $hdfs_path
        else
            echo "==========3=========="
            hdfs dfs -rm -r $hdfs_path
            hdfs dfs -mkdir -p $hdfs_path
            hdfs dfs -put $copy_output_path $hdfs_path
        fi
        echo "==========4=========="
    fi
else
    echo "No newly added file polled......"
fi
\ No newline at end of file
wanjia_tuomin/data_process/start.job 0 → 100644

type=command
command=echo "Scheduling began....."
\ No newline at end of file
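
Read together, the five .job files define the Azkaban flow for this pipeline; from their dependencies lines the DAG is:

    start ("Scheduling began.....")
      |-> file_pretreatment   (round_robin_mysql.sh)
      |-> file_handle         (file_handle.sh)
      |-> api_assemble        (produce_api.sh)
            \-> end ("......Scheduling end") waits on all three branches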