Commit bb3e8a89 authored by kinomin

Initialize project

parent 7e34eec0
common/Constants.py

# MySQL connection parameters
MY_MYSQL_DRIVER = 'org.mariadb.jdbc.Driver'
MY_MYSQL_URL = 'jdbc:mysql://hadoop1:3306/wanjia?useSSL=false'
MY_MYSQL_USER = 'root'
MY_MYSQL_PASSWORD = '123'

# Save-mode parameters
'''
:param mode: specifies the behavior of the save operation when data already exists.
* ``append``: Append contents of this :class:`DataFrame` to existing data.
* ``overwrite``: Overwrite existing data.
* ``ignore``: Silently ignore this operation if data already exists.
* ``error`` or ``errorifexists`` (default case): Throw an exception if data already exists.
'''
SAVE_MODE_OVERWRITE = 'overwrite'
SAVE_MODE_APPEND = 'append'
SAVE_MODE_IGNORE = 'ignore'
SAVE_MODE_ERROR = 'error'

HDFS_PATH = 'hdfs://hadoop1:9000/wanjia'
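These constants are meant to be fed straight into Spark's DataFrameWriter. A minimal usage sketch, not part of the commit (the demo DataFrame and app name are made up; the HDFS path must actually be reachable):

# Illustrative only: pass the shared constants to DataFrameWriter.
from pyspark.sql import SparkSession
from common import Constants

spark = SparkSession.Builder().appName('constants-demo').master('local').getOrCreate()
df = spark.createDataFrame([(1, 'a')], ['id', 'name'])
df.write.mode(Constants.SAVE_MODE_APPEND).format('parquet').save(Constants.HDFS_PATH)
spark.stop()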
dependencies.py  (main Spark job; the commit removes the former inline helpers getMySqlConnect/getRecordByIdResultT1/md5 and the hard-coded JDBC/FTP/HDFS writes from this file and moves them into sources/Sources.py, rules/MD5.py and sources/Sinks.py, shown below)

#!/usr/local/bin/python
import json
import sys

from common import Constants
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from rules import MD5
from sources import Sources, Sinks


def getColumns(x):
    sql_list = []
    # ... (sql-building body collapsed in the diff view) ...
    print("---------------------->", sql)
    return sql


if __name__ == '__main__':
    # fetch the column list and other metadata for this job
    record = Sources.getRecordByIdResultT1(sys.argv[1])
    col = json.loads(str(record)[2:-3]).get('col')
    sql = getColumns(str(record)[2:-3])
    sink_method = json.loads(str(record)[2:-3]).get('sink_method')

    # initialize Spark
    spark = SparkSession.Builder().appName('sql').master('local').getOrCreate()
    spark.udf.register('md5', MD5.md5, StringType())
    print('=======>', sys.argv[2])

    df = Sources.readCsv(spark, sys.argv[2], col)
    df.createTempView('kino')   # register the input as temp view "kino" for the generated SQL
    sinkDF = spark.sql(sql)
    sinkDF.show()

    if sink_method.lower() == 'hdfs':
        # TODO sink to HDFS
        Sinks.sinkHDFS(sinkDF, Constants.HDFS_PATH, "csv", Constants.SAVE_MODE_OVERWRITE)
    elif sink_method.lower() == 'mysql':
        # TODO sink to MySQL
        Sinks.sinkMySql(sinkDF,
                        mode=Constants.SAVE_MODE_OVERWRITE,
                        driver=Constants.MY_MYSQL_DRIVER,
                        url=Constants.MY_MYSQL_URL,
                        user=Constants.MY_MYSQL_USER,
                        password=Constants.MY_MYSQL_PASSWORD,
                        table="test1")
    elif sink_method.lower() == 'ftp':
        print("sink to FTP")
    else:
        print("...")

    # TODO update the job status; validate the data (rows read vs. rows written)
    Sinks.updateMysqlStatus(i_status=1, i_readcount=df.count(), i_sinkcount=sinkDF.count(), i_id=1)

    # stop the Spark session
    spark.stop()
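For context, getRecordByIdResultT1 returns the raw cursor.fetchone() tuple, whose single t1 column evidently holds a JSON string with at least the keys col and sink_method; the str(record)[2:-3] slice strips the tuple and quote wrapping. A minimal sketch of what such a record might look like (the field values are assumptions, not taken from the commit):

import json

# Hypothetical metadata row, shaped like what cursor.fetchone() returns: a 1-tuple
# whose only element is the JSON string stored in column t1 of the `test` table.
record = ('{"col": ["id", "name", "phone"], "sink_method": "mysql"}',)

# str(record) is "('{...}',)", so the [2:-3] slice strips the leading "('"
# and the trailing "',)" to recover the bare JSON text.
payload = str(record)[2:-3]
meta = json.loads(payload)
print(meta['col'], meta['sink_method'])   # ['id', 'name', 'phone'] mysql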
(shell wrapper that submits dependencies.py; only the diff hunk starting at line 3 is shown. The commit comments out the backgrounded nohup submit, adds a foreground spark-submit, and drops two stale commented-out submit variants.)

id=$1
output_file=$2
echo "database id passed in: ${id}"
echo "target file path: " ${output_file}
# nohup spark-submit --jars /usr/bigdata/spark-3.0.1-bin-hadoop3.2/jars/mariadb-java-client-2.1.2.jar dependencies.py ${id} ${output_file} > /root/test.log 2>&1 &
spark-submit --jars /usr/bigdata/spark-3.0.1-bin-hadoop3.2/jars/mariadb-java-client-2.1.2.jar dependencies.py ${id} ${output_file}
exception/MyErrorException.py

class MyError(Exception):
    def __init__(self, ErrorInfo):
        super().__init__(ErrorInfo)   # was super().__init__(self); pass the message to Exception instead
        self.errorinfo = ErrorInfo

    def __str__(self):
        return self.errorinfo


if __name__ == '__main__':
    try:
        raise MyError('zero exception')
    except MyError as e:
        print("my exception occurred:", e.errorinfo)
rules/MD5.py

import hashlib


def md5(col):
    # hash the stringified column value and return its hex digest;
    # registered as a Spark SQL UDF in dependencies.py
    h = hashlib.md5()
    h.update(str(col).encode('utf-8'))
    return h.hexdigest()
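Since the body of getColumns is collapsed in the diff, the generated SQL is not visible; presumably it applies this UDF to the sensitive columns. A minimal sketch of how the registered md5 UDF would be used (the demo table and column names are illustrative only):

# Illustrative only: register the function as a Spark SQL UDF and apply it
# in a query against the temp view built from the input data.
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from rules import MD5

spark = SparkSession.Builder().appName('md5-demo').master('local').getOrCreate()
spark.udf.register('md5', MD5.md5, StringType())

df = spark.createDataFrame([(1, '13800000000')], ['id', 'phone'])
df.createTempView('kino')
spark.sql("select id, md5(phone) as phone from kino").show()   # phone column is masked
spark.stop()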
sources/Sinks.py

from common import Constants
import exception.MyErrorException as ex
from sources import Sources


# write out to MySQL
def sinkMySql(sinkDF, driver, url, user, password, table, mode=Constants.SAVE_MODE_OVERWRITE):
    sinkDF.write.mode(mode) \
        .format("jdbc") \
        .option("truncate", "true") \
        .option("batchsize", 10000) \
        .option("isolationLevel", "NONE") \
        .option("driver", driver) \
        .option("url", url) \
        .option("user", user) \
        .option("password", password) \
        .option("dbtable", table) \
        .save()


# write out to FTP/SFTP (requires the third-party spark-sftp connector on the classpath)
'''
Example parameters:
    host: 192.168.31.45
    username: raomin@9zdata.cn
    password: 9zdata123.
    fileType: csv
    file: ftp://192.168.31.45:21/wanjia/sample.csv
'''
def sinkFTP(sinkDF, host, user, password, filetype, file):
    sinkDF.write.format("com.springml.spark.sftp") \
        .option("host", host) \
        .option("username", user) \
        .option("password", password) \
        .option("fileType", filetype) \
        .option("delimiter", ";") \
        .save(file)


# write out to HDFS
def sinkHDFS(sinkDF, path, format="parquet", mode=Constants.SAVE_MODE_OVERWRITE):
    sinkDF \
        .write \
        .format(format) \
        .mode(mode) \
        .save(path)


# update the job status row in MySQL
def updateMysqlStatus(i_status, i_readcount, i_sinkcount, i_id):
    conn = Sources.getMySqlConnect()
    # create a cursor to send SQL to the database
    cur = conn.cursor()
    try:
        update_sql = """update uptable set status={status}, readcount={readcount}, sinkcount={sinkcount} where id = {id}""" \
            .format(status=i_status, readcount=i_readcount, sinkcount=i_sinkcount, id=i_id)
        cur.execute(update_sql)
    except ex.MyError as e:
        # note: only catches MyError raised explicitly; pymysql raises its own exception types
        print("update error", e.errorinfo)
    finally:
        cur.close()   # close the cursor and the connection
        conn.close()
sources/Sources.py

import pymysql
import exception.MyErrorException as ex


def getMySqlConnect():
    try:
        # connect to the database
        conn = pymysql.connect(
            host="hadoop1",      # host name, or an IP address such as 127.0.0.1
            user="root",         # user name
            passwd="123",        # password
            db="wanjia",         # database to connect to
            charset="utf8",      # character encoding
            autocommit=True,     # auto-commit writes (equivalent to calling conn.commit())
        )
    except pymysql.MySQLError as e:
        # the original caught ex.MyError, which pymysql never raises; wrap the driver error instead
        print("connection error", e)
        raise ex.MyError("connection error: {}".format(e))
    return conn


def getRecordByIdResultT1(id):
    conn = getMySqlConnect()
    # create a cursor to send SQL to the database
    cur = conn.cursor()
    record = None
    try:
        # NOTE: str.format interpolation is injection-prone; see the parameterized sketch below
        query = """select t1 from test where id = {table_id}""".format(table_id=id)
        cur.execute(query)
        # fetch the next row of the result set
        record = cur.fetchone()
    except pymysql.MySQLError as e:
        print("executor failed", e)
    finally:
        cur.close()   # close the cursor and the connection
        conn.close()
    return record


def readCsv(spark, path, col):
    df = spark.read.format('csv').option('inferSchema', 'true').load(path).toDF(*col)
    return df
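As a side note, a parameterized query avoids the injection risk of building the statement with str.format. A minimal sketch under the same module context (the _safe function name is illustrative, not part of the commit):

# Illustrative only: let pymysql quote and escape the id value itself.
def getRecordByIdResultT1_safe(id):
    conn = getMySqlConnect()
    cur = conn.cursor()
    try:
        cur.execute("select t1 from test where id = %s", (id,))
        return cur.fetchone()
    finally:
        cur.close()
        conn.close()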
unzip.py

#!/usr/local/bin/python
import sys
import os
import shutil
import zipfile
import exception.MyErrorException as ex

pyName = sys.argv[0]      # sys.argv[0] is like $0 in shell, but it is the script's path rather than its name
zipPath = sys.argv[1]     # sys.argv[1]: the first argument passed in
inputPath = sys.argv[2]   # sys.argv[2]: the second argument passed in
outputPath = sys.argv[3]

# os.system("hdfs dfs -put /srcPath /dstPath")


def get_file(srcPath, dstPath):
    # copy every .csv file found under srcPath into dstPath
    try:
        if not os.path.exists(srcPath):
            print("srcPath not exist")
        else:
            res = []
            for root, dirs, files in os.walk(srcPath, True):
                for fileName in files:
                    name, suf = os.path.splitext(fileName)
                    if fileName.endswith('.csv'):
                        shutil.copy(os.path.join(root, fileName), dstPath)
    except ex.MyError as e:
        print("failed", e.errorinfo)


def unzip_file(zip_file_name, destination_path):
    # extract every entry of the archive into destination_path
    try:
        archive = zipfile.ZipFile(zip_file_name, mode='r')
        for file in archive.namelist():
            archive.extract(file, destination_path)
    except ex.MyError as e:
        print("unzip failed", e.errorinfo)
        sys.exit()


if __name__ == '__main__':
    print('unzipping and copying...')
    # ... (remaining lines collapsed in the diff view)
(shell script that unzips the input files and exports job parameters; only the diff hunk starting at line 2 is shown, and two lines removed by the commit are not visible in the captured diff)

echo -e "\033[34m========== file unzip/copy started \033[0m=========="
python3 unzip.py /root/wanjia/files/zip_file/20201102/20201102.zip /root/wanjia/files/input_file/20201102 /root/wanjia/files/output_file/20201102
echo -e "\033[34m========== file unzip/copy finished \033[0m=========="
echo ""
echo -e "\033[34m========== writing the id into the environment variables \033[0m============"
# echo '{"id": 1}' >> ${JOB_OUTPUT_PROP_FILE}
echo '{"id": 1, "output_file": "/root/wanjia/files/output_file/20201102"}' >> ${JOB_OUTPUT_PROP_FILE}