Commit 651c9f26 authored by kinomin

Initialize the project

parent 1cae46b3
 import pymysql
 import exception.MyErrorException as ex
 from pyspark import SparkContext
+from sources import Sinks
 def getMySqlConnect():
     try:
         # connect to the database
         conn = pymysql.connect(
             host="hadoop1",  # host name; an IP address such as 127.0.0.1 also works
             user="root",     # user name
@@ -15,6 +17,7 @@ def getMySqlConnect():
         )
     except ex.MyError as e:
         print("connection error", e.errorinfo)
+        Sinks.updateMysqlStatus(i_status=1, i_readcount=0, i_sinkcount=0, i_id=1)
     return conn
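Every failure path in this commit now calls Sinks.updateMysqlStatus(i_status=1, i_readcount=0, i_sinkcount=0, i_id=1), but the sources.Sinks module itself is not part of the diff. A minimal sketch of what such a helper might look like, assuming a MySQL table job_status(id, status, readcount, sinkcount) in a database named wanjia; only the keyword arguments come from the call sites, everything else here is an assumption:

# Hypothetical sketch of sources.Sinks.updateMysqlStatus, not the committed code.
# Assumes a table job_status(id, status, readcount, sinkcount) in an assumed database "wanjia".
import pymysql

def updateMysqlStatus(i_status, i_readcount, i_sinkcount, i_id):
    conn = pymysql.connect(host="hadoop1", user="root", password="xxx", database="wanjia")
    try:
        with conn.cursor() as cur:
            cur.execute(
                "UPDATE job_status SET status=%s, readcount=%s, sinkcount=%s WHERE id=%s",
                (i_status, i_readcount, i_sinkcount, i_id),
            )
        conn.commit()
    finally:
        conn.close()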
@@ -28,6 +31,7 @@ def getRecordByIdResultT1(id):
         # fetch the next row of the result set
         record = cur.fetchone()
     except ex.MyError as e:
+        Sinks.updateMysqlStatus(i_status=1, i_readcount=0, i_sinkcount=0, i_id=1)
         print("executor failed", e.errorinfo)
         conn.rollback()
     finally:
@@ -37,13 +41,20 @@ def getRecordByIdResultT1(id):
 def readCsv(spark, path, col):
-    df = spark.read.format('csv').option('inferSchema', 'true').load(path).toDF(*col)
+    try:
+        df = spark.read.format('csv').option('inferSchema', 'true').load(path).toDF(*col)
+    except ex.MyError as e:
+        Sinks.updateMysqlStatus(i_status=1, i_readcount=0, i_sinkcount=0, i_id=1)
+        print("executor failed", e.errorinfo)
     return df
-def readTxt(spark, path, col, delimiter):
-    rddFile = SparkContext.textFile("1.txt")
-    rddMap = rddFile.map(lambda x: x.split(delimiter))
-    df = spark.createDataFrame(rddMap, col)
-    return df
+def readTxt(spark, path, delimiter, *col):
+    try:
+        rddFile = SparkContext.textFile(path)
+        rddMap = rddFile.map(lambda x: x.split(delimiter))
+        df = spark.createDataFrame(rddMap, *col)
+    except ex.MyError as e:
+        Sinks.updateMysqlStatus(i_status=1, i_readcount=0, i_sinkcount=0, i_id=1)
+        print("executor failed", e.errorinfo)
+    return df
\ No newline at end of file
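As committed, readTxt still calls textFile on the SparkContext class rather than on an instance, and unpacks col into createDataFrame; Spark failures also surface as Py4J or analysis exceptions rather than the project's ex.MyError, so these except clauses may never fire. A minimal working variant, assuming spark is a SparkSession and col is a plain list of column names (a sketch, not the project's implementation):

# Sketch of a working readTxt under the assumptions above, not the committed code.
def readTxt(spark, path, delimiter, col):
    rddFile = spark.sparkContext.textFile(path)          # textFile is an instance method of SparkContext
    rddMap = rddFile.map(lambda x: x.split(delimiter))   # split each line into a list of fields
    return spark.createDataFrame(rddMap, col)            # col supplies the column names

# Usage sketch (hypothetical path and column names):
# df = readTxt(spark, '/data/1.txt', ',', ['id', 'name', 'age'])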
@@ -2,13 +2,14 @@
 import sys
 import os
 import shutil
+import exception.MyErrorException as ex
 import zipfile
-# pyName = sys.argv[0]  # sys.argv[0] is like $0 in shell, but it is the script's path rather than its name
-# zipPath = sys.argv[1]  # sys.argv[1] is the first command-line argument
-# inputPath = sys.argv[2]  # sys.argv[2] is the second command-line argument
-# outputPath = sys.argv[3]
+pyName = sys.argv[0]  # sys.argv[0] is like $0 in shell, but it is the script's path rather than its name
+zipPath = sys.argv[1]  # sys.argv[1] is the first command-line argument
+inputPath = sys.argv[2]  # sys.argv[2] is the second command-line argument
+outputPath = sys.argv[3]
 # Question: does unzipping need to handle passwords???
 # os.system("hdfs dfs -put /srcPath /dstPath")
@@ -36,7 +37,7 @@ def unzip_single(srcPath, dstPath):
     zf = zipfile.ZipFile(srcPath)
     try:
         zf.extractall(path=dstPath)
-    except RuntimeError as e:
+    except ex.MyError as e:
         print(e)
     zf.close()
@@ -65,7 +66,7 @@ if __name__ == '__main__':
     # if len(sys.argv) == 3:
     #     source_dir, dest_dir = os.path.abspath(sys.argv[0].strip('"')), sys.argv[1]
     print('Unzipping and copying...')
-    # unzip_file(zipPath, inputPath)
-    # get_file(inputPath, outputPath)
-    unzip_all("D:\\src", "D:\\dst")
-    get_file("D:\\src", "D:\\dst")
+    unzip_all(zipPath, inputPath)
+    get_file(inputPath, outputPath)
+    # unzip_all("D:\\src", "D:\\dst")
+    # get_file("D:\\src", "D:\\dst")
 #! /bin/bash
+set -e
 echo -e "\033[34m==========File unzip/copy started\033[0m=========="
-python3 unzip.py /root/wanjia/files/zip_file/20201102/20201102.zip /root/wanjia/files/input_file/20201102 /root/wanjia/files/output_file/20201102
+python3 unzip.py /root/wanjia/files/zip_file/20201102/202011021111.zip /root/wanjia/files/input_file/20201102 /root/wanjia/files/output_file/20201102
 echo -e "\033[34m==========File unzip/copy finished\033[0m=========="
 echo ""
 echo -e "\033[34m==========Writing id to environment variables\033[0m============"
......
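One caveat on the new set -e: it only aborts the wrapper when a command exits with a non-zero status, and unzip.py as committed catches exceptions and merely prints them, so a failed extraction still returns 0. A sketch of a main block that propagates failure; the broad except and the sys.exit(1) call are assumptions, not part of this commit:

# Sketch: exit non-zero on failure so the bash wrapper's `set -e` stops the pipeline.
# Reuses zipPath/inputPath/outputPath and unzip_all/get_file from unzip.py above.
import sys

if __name__ == '__main__':
    try:
        print('Unzipping and copying...')
        unzip_all(zipPath, inputPath)
        get_file(inputPath, outputPath)
    except Exception as e:
        print('unzip/copy failed:', e)
        sys.exit(1)  # non-zero exit lets `set -e` abort the calling shell script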