Commit 1cae46b3 authored by 宋朋

update

parent 3247ad3d
import pymysql import pymysql
import exception.MyErrorException as ex import exception.MyErrorException as ex
from pyspark import SparkContext
def getMySqlConnect(): def getMySqlConnect():
try: try:
...@@ -28,11 +29,21 @@ def getRecordByIdResultT1(id): ...@@ -28,11 +29,21 @@ def getRecordByIdResultT1(id):
record = cur.fetchone() record = cur.fetchone()
except ex.MyError as e: except ex.MyError as e:
print("executor failed", e.errorinfo) print("executor failed", e.errorinfo)
conn.rollback()
finally: finally:
cur.close() # 关闭数据库连接 cur.close() # 关闭数据库连接
conn.close() conn.close()
return record return record
def readCsv(spark, path, col):
    """Load a CSV file into a DataFrame and rename its columns.

    Args:
        spark: Object exposing a ``read`` DataFrame reader (presumably a
            SparkSession -- confirm against callers).
        path: Location handed to the reader's ``load``.
        col: Iterable of column names, unpacked into ``toDF``.

    Returns:
        The DataFrame produced by ``toDF(*col)``.
    """
    # Let the reader infer column types instead of treating all as strings.
    reader = spark.read.format('csv').option('inferSchema', 'true')
    loaded = reader.load(path)
    return loaded.toDF(*col)
def readTxt(spark, path, col, delimiter):
    """Read a delimited text file into a DataFrame.

    Args:
        spark: Session object exposing ``sparkContext`` and
            ``createDataFrame`` (presumably a SparkSession -- confirm
            against callers).
        path: Location of the text file to read.
        col: Schema / column names passed to ``createDataFrame``.
        delimiter: Separator used to split every line into fields.

    Returns:
        The DataFrame built from the split lines.
    """
    # textFile is an instance method, so it must be called on the session's
    # live SparkContext, and with the caller's path rather than a fixed
    # file name.
    lines = spark.sparkContext.textFile(path)
    rows = lines.map(lambda line: line.split(delimiter))
    return spark.createDataFrame(rows, col)
...@@ -3,38 +3,69 @@ import sys ...@@ -3,38 +3,69 @@ import sys
import os import os
import shutil import shutil
import zipfile import zipfile
import exception.MyErrorException as ex
pyName = sys.argv[0] #sys.argv[0] 类似于shell中的$0, 但不是脚本名称,而是脚本的路径
zipPath = sys.argv[1] #sys.argv[1] 表示传入的第一个参数 # pyName = sys.argv[0] # sys.argv[0] 类似于shell中的$0, 但不是脚本名称,而是脚本的路径
inputPath = sys.argv[2] #sys.argv[2] 表示传入的第二个参数 # zipPath = sys.argv[1] # sys.argv[1] 表示传入的第一个参数
outputPath = sys.argv[3] # inputPath = sys.argv[2] # sys.argv[2] 表示传入的第二个参数
# outputPath = sys.argv[3]
#问题:解压的时候要考虑密码吗???
# os.system("hdfs dfs -put /srcPath /dstPath") # os.system("hdfs dfs -put /srcPath /dstPath")
def get_file(srcPath, dstPath):
    """Copy every ``.csv`` file found under srcPath into dstPath.

    Both directories are created when they do not exist yet. The source
    tree is walked recursively and the files are copied flat into
    dstPath, so files sharing a name in different sub-folders overwrite
    each other.

    Args:
        srcPath: Directory that is searched recursively for CSV files.
        dstPath: Directory that receives the copies.
    """
    if not os.path.exists(srcPath):
        os.makedirs(srcPath)
        print("srcPath mkdir success")
    if not os.path.exists(dstPath):
        # The destination itself has to be created here; re-creating
        # srcPath at this point always failed with FileExistsError.
        os.makedirs(dstPath)
        print("dstPath mkdir success")
    for root, _dirs, files in os.walk(srcPath, True):
        for fileName in files:
            if fileName.endswith('.csv'):
                shutil.copy(os.path.join(root, fileName), dstPath)
def unzip_single(srcPath, dstPath):
    """Extract a single zip archive into the destination folder.

    A ``RuntimeError`` raised during extraction is printed and swallowed;
    any other exception propagates to the caller.

    Args:
        srcPath: Path of the zip archive to open.
        dstPath: Folder the archive members are extracted into.
    """
    # The context manager closes the archive on every exit path, including
    # exceptions that are not caught below.
    with zipfile.ZipFile(srcPath) as zf:
        try:
            zf.extractall(path=dstPath)
        except RuntimeError as e:
            print(e)
for root, dirs, files in os.walk(srcPath, True):
for fileName in files:
def unzip_all(source_dir, dest_dir):
    """Extract one archive, or every zip archive directly inside a folder.

    Args:
        source_dir: Either a single archive file or a directory. For a
            directory only its direct entries are examined (no recursion).
        dest_dir: Folder that receives the extracted members.
    """
    if not os.path.isdir(source_dir):
        # Not a directory: treat the path as a single archive file.
        unzip_single(source_dir, dest_dir)
        return
    # The with-block closes the scandir iterator even if extraction raises
    # part-way through the listing.
    with os.scandir(source_dir) as entries:
        for entry in entries:
            # Compare the extension case-insensitively so "X.ZIP" is
            # picked up as well.
            is_zip = os.path.splitext(entry.name)[1].lower() == '.zip'
            if entry.is_file() and is_zip:
                unzip_single(entry.path, dest_dir)
archive = zipfile.ZipFile(zip_file_name, mode='r')
for file in archive.namelist(): # def unzip_file(zip_file_name, destination_path):
archive.extract(file, destination_path) # archive = zipfile.ZipFile(zip_file_name, mode='r')
except ex.MyError as e: # for file in archive.namelist():
print("解压失败", e.errorinfo) # print(file)
sys.exit() # name, suf = os.path.splitext(file)
# if file.endswith('.zip'):
# archive.extract(file, destination_path)
# print("解压失败")
if __name__ == '__main__':
    # Console banner shown before the work starts.
    print('解压、复制中...')
    # Source and destination folders are hard-coded Windows paths.
    folders = ("D:\\src", "D:\\dst")
    unzip_all(*folders)
    get_file(*folders)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment