Commit 47682d05 authored by zhangminghui's avatar zhangminghui

zmh update

parent fea442f1
#!/usr/local/bin/python #!/usr/local/bin/python
import json import json
import sys import os
from json import JSONDecodeError
from common import Constants from wanjia.spark.common import Constants
from pyspark.sql import SparkSession from wanjia.spark.sources import Sources
from pyspark.sql.types import StringType from wanjia.spark.sources.Sources import updateStatusById
from rules import MD5
from sources import Sources, Sinks
def getColumns(x): def getColumns(x):
try: try:
sql_list = [] sql_list = []
sql_file_list = [] sql_file_list = []
sql_create_list = []
list = [i for i in json.loads(x).get('col')] list = [i for i in json.loads(x).get('col')]
for i in range(0, len(list)): for i in range(0, len(list)):
if json.loads(x).get('tm').get(list[i]) == "md5": if json.loads(x).get('tm').get(list[i]) == "md5":
...@@ -24,6 +24,7 @@ def getColumns(x): ...@@ -24,6 +24,7 @@ def getColumns(x):
else: else:
sql_list.append(' ' + list[i]) sql_list.append(' ' + list[i])
sql_file_list.append(' ' + list[i]) sql_file_list.append(' ' + list[i])
sql_create_list.append(' ' + list[i] + " string")
col = "" col = ""
col_file = "" col_file = ""
for i in sql_list: for i in sql_list:
...@@ -31,63 +32,46 @@ def getColumns(x): ...@@ -31,63 +32,46 @@ def getColumns(x):
for i in sql_file_list: for i in sql_file_list:
col_file += i + ',' col_file += i + ','
except Exception as e:
col_create = ""
for i in sql_create_list:
col_create += i + ','
except IndexError as e:
print("index error:{0}".format(e)) print("index error:{0}".format(e))
except JSONDecodeError as e:
print("json pase exception:{0}".format(e))
src_sql = 'select %s from %s'%(col[0:-1], 'a')
md5_sql = 'select %s from %s' % (col_file[0:-1], 'a')
dir ='/data/test/src'
create_sql = 'create external table a (%s) stored as txt location "%s" ' %(col_create[0:-1],dir)
print("---------------------->", src_sql)
print("---------------------->", md5_sql)
print("---------------------->", create_sql)
return [src_sql, md5_sql,create_sql]
sql = "select " + col + " current_timestamp() as create_time from kino"
sql_file = "select " + col_file[0:-1] + " from kino"
print("---------------------->", sql)
print("---------------------->", sql_file)
return [sql, sql_file]
if __name__ == '__main__': if __name__ == '__main__':
# 拿到 列名 等 try:
record = Sources.getRecordByIdResultT1(sys.argv[1]) # 拿到 列名 等
col = json.loads(str(record)[2:-3]).get('col') # record = Sources.getRecordByIdResultT1(sys.argv[1])
sql_lists = getColumns(str(record)[2:-3]) record = Sources.getRecordByIdResultT1(1)
sql = sql_lists[0] print(record)
sql_file = sql_lists[1] col = json.loads(record.get("json")).get('col')
sink_method = json.loads(str(record)[2:-3]).get('sink_method') sql_lists = getColumns(record.get("json"))
src_sql = sql_lists[0]
# spark 初始化 md5_sql = sql_lists[1]
spark = SparkSession.Builder().appName('sql').master('local').getOrCreate() create_sql= sql_lists[2]
os.system("impala-shell -q ${create_sql}")
spark.udf.register('md5', MD5.md5, StringType())
print('=======>', sys.argv[2]) #sink_method = json.loads(str(record)[2:-3]).get('sink_method')
except JSONDecodeError as e:
df = Sources.readCsv(spark, sys.argv[2], col) print("JSONDecodeError:{0}".format(e))
updateStatusById(Constants.SELECT_CODE)
t_table = df.createTempView('kino')
sinkDF = spark.sql(sql)
sinkFileDF = spark.sql(sql_file)
sinkFileDF.show()
if sink_method.lower() == 'hdfs':
# TODO 落 HDFS
Sinks.sinkHDFS(sinkDF=sinkFileDF,
path=Constants.HDFS_PATH,
delimiter=",",
format="csv",
mode=Constants.SAVE_MODE_OVERWRITE)
elif sink_method.lower() == 'mysql':
# TODO 落 MySQL
Sinks.sinkMySql(sinkDF,
mode=Constants.SAVE_MODE_OVERWRITE,
driver=Constants.MY_MYSQL_DRIVER,
url=Constants.MY_MYSQL_URL,
user=Constants.MY_MYSQL_USER,
password=Constants.MY_MYSQL_PASSWORD,
table="test1")
elif sink_method.lower() == 'ftp':
print("落FTP")
else:
print("...")
# TODO 修改状态, 数据校验(读取-写出 数据量)
Sinks.updateMysqlStatus(i_status=1, i_readcount=df.count(), i_sinkcount=sinkDF.count(), i_id=1)
# 关闭spark会话
spark.stop()
......
...@@ -19,4 +19,14 @@ SAVE_MODE_APPEND = 'append' ...@@ -19,4 +19,14 @@ SAVE_MODE_APPEND = 'append'
SAVE_MODE_IGNORE = 'ignore' SAVE_MODE_IGNORE = 'ignore'
SAVE_MODE_ERROR = 'error' SAVE_MODE_ERROR = 'error'
HDFS_PATH = 'hdfs://hadoop1:9000/wanjia' HDFS_PATH = 'hdfs://hadoop1:9000/wanjia'
\ No newline at end of file
"""
:parameter:-1 表示复制文件失败,-2表示md5函数解析失败,-3
"""
COPY_FILE_CODE = -1
DATA_DEAL_CODE = -2
UPLOAD_CSV_FTP_CODE = -3
SELECT_CODE = -4
SUCCESS_CODE = 0
import pymysql from json import JSONDecodeError
from pyspark import SparkContext
from sources import Sinks
def getMySqlConnect(): import pymysql
try:
# 连接数据库
conn = pymysql.connect(
host="hadoop1", # 连接主机名,也可以用ip地址,例如127.0.0.1
user="root", # 连接用户名
passwd="123", # 用户密码
db="wanjia", # 要连接的数据库名
charset="utf8", # 指定编码格式
autocommit=True, # 如果插入数据,, 是否自动提交? 和conn.commit()功能一致。
)
except Exception as e:
print("connection error:{0}".format(e))
Sinks.updateMysqlStatus(i_status=1, i_readcount=0, i_sinkcount=0, i_id=1)
return conn
from wanjia.spark.common import Constants
from wanjia.spark.utils import DbUtils as dbutils
def getRecordByIdResultT1(id): def getRecordByIdResultT1(id):
conn = getMySqlConnect()
# 创建游标对象,用来给数据库发送sql语句
try: try:
cur = conn.cursor() conn, cur = dbutils.create_conn()
serch_data = """select t1 from test where id = {table_id}""".format(table_id=id) # 创建游标对象,用来给数据库发送sql语句
result = cur.execute(serch_data) serch_data = """select json from test1 where id = %s """ %(id)
cur.execute(serch_data)
# 获取下一个查询结果集 # 获取下一个查询结果集
conn.commit()
record = cur.fetchone() record = cur.fetchone()
except Exception as e: except ConnectionError as e:
Sinks.updateMysqlStatus(i_status=1, i_readcount=0, i_sinkcount=0, i_id=1)
print("executor failed:{0}".format(e)) print("executor failed:{0}".format(e))
updateStatusById(Constants.SELECT_CODE)
finally: finally:
cur.close() # 关闭数据库连接 # 关闭数据库连接
conn.close() dbutils.close_conn(conn, cur)
return record return record
def updateStatusById(status):
def readCsv(spark, path, col):
try: try:
df = spark.read.format('csv').option('inferSchema', 'true').load(path).toDF(*col) id = 1
except Exception as e: conn, cur = dbutils.create_conn()
Sinks.updateMysqlStatus(i_status=1, i_readcount=0, i_sinkcount=0, i_id=1) # 创建游标对象,用来给数据库发送sql语句
print("executor failed:{0}".format(e)) #serch_data = """select t1 from test where id = {table_id}""".format(table_id=id)
return df update_data = 'update test set status=%s where id =%s' %(status, id)
result = cur.execute(update_data)
def readTxt(spark, path, delimiter, col): conn.commit()
try: # 获取下一个查询结果集
rddFile = spark.SparkContext.textFile(path)
rddMap = rddFile.map(lambda x: x.split(delimiter))
df = spark.createDataFrame(rddMap, col)
except Exception as e: except Exception as e:
Sinks.updateMysqlStatus(i_status=1, i_readcount=0, i_sinkcount=0, i_id=1)
print("executor failed:{0}".format(e)) print("executor failed:{0}".format(e))
return df finally:
\ No newline at end of file # 关闭数据库连接
dbutils.close_conn(conn, cur)
return result
\ No newline at end of file
...@@ -4,29 +4,14 @@ import os ...@@ -4,29 +4,14 @@ import os
import shutil import shutil
import zipfile import zipfile
from wanjia.spark.common import Constants
from wanjia.spark.sources import Sources as sources
pyName = sys.argv[0] # sys.argv[0] 类似于shell中的$0, 但不是脚本名称,而是脚本的路径 # pyName = sys.argv[0] # sys.argv[0] 类似于shell中的$0, 但不是脚本名称,而是脚本的路径
zipPath = sys.argv[1] # sys.argv[1] 表示传入的第一个参数 # zipPath = sys.argv[1] # sys.argv[1] 表示传入的第一个参数
inputPath = sys.argv[2] # sys.argv[2] 表示传入的第二个参数 # inputPath = sys.argv[2] # sys.argv[2] 表示传入的第二个参数
outputPath = sys.argv[3] # outputPath = sys.argv[3]
# os.system("hdfs dfs -put /srcPath /dstPath") # os.system("hdfs dfs -put /srcPath /dstPath")
def get_file(srcPath, dstPath):
if not os.path.exists(srcPath):
os.mkdir(srcPath)
print("srcPath mkdir success")
if not os.path.exists(dstPath):
os.mkdir(srcPath)
print("dstPath mkdir success")
else:
res = []
for root, dirs, files in os.walk(srcPath, True):
for fileName in files:
name, suf = os.path.splitext(fileName)
if fileName.endswith('.csv'):
shutil.copy(os.path.join(root, fileName), dstPath)
def unzip_single(srcPath, dstPath): def unzip_single(srcPath, dstPath):
''' 解压单个文件到目标文件夹。 ''' 解压单个文件到目标文件夹。
...@@ -48,22 +33,30 @@ def unzip_all(source_dir, dest_dir): ...@@ -48,22 +33,30 @@ def unzip_all(source_dir, dest_dir):
if entry.is_file() and os.path.splitext(entry.name)[1] == '.zip': if entry.is_file() and os.path.splitext(entry.name)[1] == '.zip':
unzip_single(entry.path, dest_dir) unzip_single(entry.path, dest_dir)
# def unzip_file(zip_file_name, destination_path):
# archive = zipfile.ZipFile(zip_file_name, mode='r')
# for file in archive.namelist():
# print(file)
# name, suf = os.path.splitext(file)
# if file.endswith('.zip'):
# archive.extract(file, destination_path)
# print("解压失败")
def copy_file(srcPath, dstPath):
if not os.path.exists(srcPath):
os.mkdir(srcPath)
print("srcPath mkdir success")
if not os.path.exists(dstPath):
os.mkdir(srcPath)
print("dstPath mkdir success")
else:
for root, dirs, files in os.walk(srcPath, True):
for fileName in files:
if fileName.endswith('.csv'):
shutil.copy(os.path.join(root, fileName), dstPath)
if __name__ == '__main__': if __name__ == '__main__':
# if len(sys.argv) == 3:
# source_dir, dest_dir = os.path.abspath(sys.argv[0].strip('"')), sys.argv[1] try:
print('解压、复制中...') # unzip_all(source_dir, dest_dir)
unzip_all(zipPath, inputPath) # copy_file(srcPath, dstPath)
get_file(inputPath, outputPath) unzip_all("D:\\ftp.zip", "D:\\dst")
# unzip_all("D:\\src", "D:\\dst") copy_file("D:\\dst", "D:\\dt")
# get_file("D:\\src", "D:\\dst") except FileNotFoundError as e:
print("unzip failed:{0}".format(e))
status = Constants.COPY_FILE_CODE
sources.updateStatusById(status)
import pymysql
from dbutils.pooled_db import PooledDB
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=1, # 链接池中最多共享的链接数量
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。
host='localhost', #数据库ip
port=3306, #数据库端口
user='root', #数据库用户名
password='root', #数据库密码
database='demo_test', #数据库库名
charset='utf8' #数据库编码
)
def create_conn():
conn = POOL.connection()
cursor = conn.cursor(pymysql.cursors.DictCursor)
return conn, cursor
def close_conn(conn, cursor):
conn.close()
cursor.close()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment