Commit 256c8700 authored by kinomin

Initialize project

parent fe96331f
Pipeline #287 canceled with stages
#dependencies.job
type=command
dependencies=start
command=sh dependencies.sh ${id} ${output_file}
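# ${id} and ${output_file} are expected to come from the upstream "start" job:
# unzip.sh appends {"id": ..., "output_file": ...} to ${JOB_OUTPUT_PROP_FILE}
# (see unzip.sh further below), and Azkaban passes those values into this job.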
#!/usr/local/bin/python
import hashlib
import json
import sys
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
# import os
import pymysql
# os.system("pip install pyspark")
def getMySqlConnect():
    # Open a connection to the metadata database
    conn = pymysql.connect(
        host="hadoop1",     # host name (an IP such as 127.0.0.1 also works)
        user="root",        # user name
        passwd="123",       # password
        db="wanjia",        # database to connect to
        charset="utf8",     # character encoding
        autocommit=True,    # commit automatically on insert (same effect as conn.commit())
    )
    return conn
def getRecordByIdResultT1(id):
    conn = getMySqlConnect()
    # Create a cursor for sending SQL to the database
    cur = conn.cursor()
    search_sql = """select t1 from test where id = {table_id}""".format(table_id=id)
    cur.execute(search_sql)
    # Fetch the first (and only expected) row of the result set
    record = cur.fetchone()
    cur.close()   # close the cursor
    conn.close()  # close the database connection
    return record
def getColumns(x):
    # Build the SELECT list from the metadata JSON: 'col' lists the column names and
    # 'tm' maps a column name to its masking rule ("md5" or "rtl").
    meta = json.loads(x)
    columns = meta.get('col')
    rules = meta.get('tm', {})
    select_items = []
    for c in columns:
        rule = rules.get(c)
        if rule == "md5":
            # keep the raw value as t_<col> and store the masked value as <col>
            select_items.append(c + ' as t_' + c + ', md5(' + c + ') as ' + c)
        elif rule == "rtl":
            # 'rtl' would also have to be registered as a Spark UDF before this SQL can run
            select_items.append(c + ' as t_' + c + ', rtl(' + c + ') as ' + c)
        else:
            # no masking rule: keep the column as-is
            select_items.append(c)
    sql = "select " + ", ".join(select_items) + ", current_timestamp() as create_time from kino"
    print("---------------------->", sql)
    return sql
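# Example (hypothetical metadata, for illustration only): for
#   x = '{"col": ["name", "phone"], "tm": {"phone": "md5"}}'
# getColumns(x) returns
#   select name, phone as t_phone, md5(phone) as phone, current_timestamp() as create_time from kino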
def md5(col):
    hasher = hashlib.md5()
    hasher.update(str(col).encode('utf-8'))
    return hasher.hexdigest()
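# md5() is registered below as the Spark SQL UDF 'md5'; it returns the 32-character
# hex digest that replaces the raw value for columns whose masking rule is "md5".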
# Data masking job: arguments passed in by dependencies.sh
id = sys.argv[1]
output_file = sys.argv[2]
if __name__ == '__main__':
    # Fetch the column names and masking rules from the metadata table
    record = getRecordByIdResultT1(id)
    # record is a 1-tuple; strip the tuple wrapper to get the raw JSON string
    meta_json = str(record)[2:-3]
    col = json.loads(meta_json).get('col')
    sql = getColumns(meta_json)
    print(sql)
    # Initialize Spark
    spark = SparkSession.builder.appName('sql').master('local').getOrCreate()
    spark.udf.register('md5', md5, StringType())
    print('-------->', output_file)
    df = spark.read.format('csv').option('inferSchema', 'true').load(output_file).toDF(*col)
    df.count()  # force a full read as a quick sanity check
    df.createTempView('kino')
    sinkDF = spark.sql(sql)
    sinkDF.show()
    '''
    :param mode: specifies the behavior of the save operation when data already exists.
    * ``append``: Append contents of this :class:`DataFrame` to existing data.
    * ``overwrite``: Overwrite existing data.
    * ``ignore``: Silently ignore this operation if data already exists.
    * ``error`` or ``errorifexists`` (default case): Throw an exception if data already exists.
    '''
    sinkDF.write.mode("overwrite") \
        .format("jdbc") \
        .option("truncate", "true") \
        .option("batchsize", 10000) \
        .option("isolationLevel", "NONE") \
        .option("driver", "org.mariadb.jdbc.Driver") \
        .option("url", 'jdbc:mysql://hadoop1:3306/wanjia?useSSL=false') \
        .option("user", 'root') \
        .option("password", '123') \
        .option("dbtable", "test1") \
        .save()
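    # With mode("overwrite") and option("truncate", "true"), Spark truncates the existing
    # test1 table and keeps its schema instead of dropping and recreating it.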
    # TODO: update the job status
    # Write the result out to FTP
    # sinkDF.write.format("com.springml.spark.sftp") \
    #     .option("host", "192.168.31.45") \
    #     .option("username", "raomin@9zdata.cn") \
    #     .option("password", "9zdata123.") \
    #     .option("fileType", "csv") \
    #     .option("delimiter", ";") \
    #     .save("ftp://192.168.31.45:21/wanjia/sample.csv")
    # Note: DataFrameWriter.text() can only write a DataFrame with a single string column
    sinkDF.write.mode("overwrite").text("ftp://192.168.31.45/wanjia/sample.csv")
    # sinkDF.write.format("parquet").mode("overwrite").save('hdfs://hadoop1:9000/wanjia')
    # Stop the Spark session
    spark.stop()
#! /bin/bash
id=$1
output_file=$2
echo "传入的数据库id为: ${id}"
echo "目标文件路径为: " ${output_file}
#spark-submit --jars /usr/bigdata/spark-3.0.1-bin-hadoop3.2/jars/mariadb-java-client-2.1.2.jar --jars /usr/bigdata/spark-3.0.1-bin-hadoop3.2/jars/mysql-connector-java.jar dependencies.py ${id} ${output_file}
#python3 dependencies.py --jars /usr/bigdata/spark-3.0.1-bin-hadoop3.2/jars/mariadb-java-client-2.1.2.jar ${id} ${output_file}
nohup spark-submit --jars /usr/bigdata/spark-3.0.1-bin-hadoop3.2/jars/mariadb-java-client-2.1.2.jar dependencies.py ${id} ${output_file} > /root/test.log 2>&1 &
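# Invoked by Azkaban via dependencies.job as: sh dependencies.sh <id> <output_file>;
# the Spark driver output goes to /root/test.log.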
#start.job
type = command
command = sh unzip.sh
#!/usr/local/bin/python
import sys
import os
import shutil
import zipfile
pyName = sys.argv[0]      # sys.argv[0] is the path of the script itself (like $0 in shell)
zipPath = sys.argv[1]     # first argument: path of the zip archive
inputPath = sys.argv[2]   # second argument: directory to extract into
outputPath = sys.argv[3]  # third argument: directory the .csv files are copied to
def get_file(srcPath, dstPath):
    # Walk srcPath recursively and copy every .csv file found into dstPath
    for root, dirs, files in os.walk(srcPath, True):
        for fileName in files:
            if fileName.endswith('.csv'):
                shutil.copy(os.path.join(root, fileName), dstPath)
def unzip_file(zip_file_name, destination_path):
    # Extract every entry of the archive into destination_path
    with zipfile.ZipFile(zip_file_name, mode='r') as archive:
        for file in archive.namelist():
            archive.extract(file, destination_path)
if __name__ == '__main__':
    print('Extracting and copying...')
    unzip_file(zipPath, inputPath)
    get_file(inputPath, outputPath)
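# Example invocation (matching unzip.sh below):
#   python3 unzip.py <zip_path> <extract_dir> <csv_output_dir>
# The archive is extracted into <extract_dir>, then every .csv found there is copied to <csv_output_dir>.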
#! /bin/bash
echo -e "\033[34m==========文件解压、复制开始\033[0m=========="
python3 unzip.py /root/wanjia/files/zip_file/20201102/20201102.zip /root/wanjia/files/input_file/20201102 /root/wanjia/files/output_file/20201102
echo -e "\033[34m==========文件解压、复制结束\033[0m=========="
echo ""
echo -e "\033[34m==========将id写入环境变量\033[0m============"
# echo '{"id": 1}' >> ${JOB_OUTPUT_PROP_FILE}
echo '{"id": 1, "output_file": "/root/wanjia/files/output_file/20201102"}' >> ${JOB_OUTPUT_PROP_FILE}