Commit 3247ad3d authored by kinomin

Initialize the project

parent bb3e8a89
@@ -11,30 +11,39 @@ from sources import Sources, Sinks
def getColumns(x):
sql_list = []
sql_file_list = []
list = [i for i in json.loads(x).get('col')]
for i in range(0, len(list)):
if json.loads(x).get('tm').get(list[i]) == "md5":
sql_list.append(' md5('+list[i]+') as '+list[i]+", "+list[i]+' as t_'+list[i])
sql_file_list.append(' md5('+list[i]+') as '+list[i])
elif json.loads(x).get('tm').get(list[i]) == "rtl":
sql_list.append(' rtl('+list[i]+') as '+list[i]+", "+list[i]+' as t_'+list[i])
sql_file_list.append(' rtl('+list[i]+') as '+list[i])
else:
sql_list.append(list[i])
sql_list.append(' ' + list[i])
sql_file_list.append(' ' + list[i])
col = ""
col_file = ""
for i in sql_list:
if i == list[len(list) - 1]:
col += i
else:
col += i + ','
col += i + ','
for i in sql_file_list:
col_file += i + ','
sql = "select " + col + " current_timestamp() as create_time from kino"
sql_file = "select " + col_file[0:-1] + " from kino"
print("---------------------->", sql)
return sql
print("---------------------->", sql_file)
return [sql, sql_file]
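For reference, the rewritten getColumns boils down to the sketch below (not part of the commit): it parses the record JSON once, builds the masked expression per column, and returns both the table query (original value kept as t_<col>, plus a create_time column) and the plain file query. It assumes the record JSON has the shape {"col": [...], "tm": {"<col>": "md5" | "rtl"}} and that rtl() is a SQL function already available in the session.

```python
import json

def get_columns(record_json):
    """Equivalent sketch of getColumns: returns [sql, sql_file] for the 'kino' view."""
    parsed = json.loads(record_json)          # parse once instead of per column
    cols = parsed.get('col', [])
    transforms = parsed.get('tm', {})
    sql_cols, file_cols = [], []
    for c in cols:
        t = transforms.get(c)
        if t in ('md5', 'rtl'):               # masked column, keep the original as t_<col>
            sql_cols.append(f" {t}({c}) as {c}, {c} as t_{c}")
            file_cols.append(f" {t}({c}) as {c}")
        else:                                 # untouched column
            sql_cols.append(f" {c}")
            file_cols.append(f" {c}")
    sql = "select " + ",".join(sql_cols) + ", current_timestamp() as create_time from kino"
    sql_file = "select " + ",".join(file_cols) + " from kino"
    return [sql, sql_file]
```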
if __name__ == '__main__':
# Get the column names, etc.
record = Sources.getRecordByIdResultT1(sys.argv[1])
col = json.loads(str(record)[2:-3]).get('col')
sql = getColumns(str(record)[2:-3])
sql_lists = getColumns(str(record)[2:-3])
sql = sql_lists[0]
sql_file = sql_lists[1]
sink_method = json.loads(str(record)[2:-3]).get('sink_method')
# Spark initialization
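The str(record)[2:-3] slicing depends on the exact repr of whatever Sources.getRecordByIdResultT1 returns; the snippet below is an illustration only, assuming the record is a one-element tuple wrapping the JSON string (the real return type is not visible in this diff).

```python
import json

# Hypothetical record value, shaped like a one-row/one-column DB result (illustration only).
record = ('{"col": ["id", "name"], "tm": {"name": "md5"}, "sink_method": "hdfs"}',)

raw_json = str(record)[2:-3]   # drops the leading "('" and the trailing "',)" of the tuple's repr
meta = json.loads(raw_json)
print(meta['sink_method'])     # -> hdfs
```

Indexing the tuple directly (record[0]) would avoid the string slicing and keeps working even if the JSON ever contains a single quote.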
@@ -47,11 +56,16 @@ if __name__ == '__main__':
t_table = df.createTempView('kino')
sinkDF = spark.sql(sql)
sinkDF.show()
sinkFileDF = spark.sql(sql_file)
sinkFileDF.show()
if sink_method.lower() == 'hdfs':
# TODO sink to HDFS
Sinks.sinkHDFS(sinkDF, Constants.HDFS_PATH, "csv", Constants.SAVE_MODE_OVERWRITE)
Sinks.sinkHDFS(sinkDF=sinkFileDF,
path=Constants.HDFS_PATH,
delimiter=",",
format="csv",
mode=Constants.SAVE_MODE_OVERWRITE)
elif sink_method.lower() == 'mysql':
# TODO sink to MySQL
Sinks.sinkMySql(sinkDF,
......
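Putting the pieces together, a small local sanity check of the two generated queries could look like the following (illustration only, reusing the get_columns sketch above; the column names and masking rules are invented for the example, and md5() is a built-in Spark SQL function).

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("kino-demo").getOrCreate()

df = spark.createDataFrame([(1, "alice"), (2, "bob")], ["id", "name"])
df.createTempView("kino")

sql, sql_file = get_columns('{"col": ["id", "name"], "tm": {"name": "md5"}}')
spark.sql(sql).show()        # id, masked name, t_name (original), create_time
spark.sql(sql_file).show()   # id and masked name only
```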
@@ -4,5 +4,9 @@ output_file=$2
echo "传入的数据库id为: ${id}"
echo "目标文件路径为: " ${output_file}
# nohup spark-submit --jars /usr/bigdata/spark-3.0.1-bin-hadoop3.2/jars/mariadb-java-client-2.1.2.jar dependencies.py ${id} ${output_file} > /root/test.log 2>&1 &
spark-submit --jars /usr/bigdata/spark-3.0.1-bin-hadoop3.2/jars/mariadb-java-client-2.1.2.jar dependencies.py ${id} ${output_file}
\ No newline at end of file
#nohup spark-submit --jars /usr/bigdata/spark-3.0.1-bin-hadoop3.2/jars/mariadb-java-client-2.1.2.jar dependencies.py ${id} ${output_file} > /root/test.log 2>&1 &
spark-submit \
--jars /usr/bigdata/spark-3.0.1-bin-hadoop3.2/jars/mariadb-java-client-2.1.2.jar \
--conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=./log4j.properties" \
--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=./log4j.properties" \
dependencies.py ${id} ${output_file}
\ No newline at end of file
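The script forwards two positional arguments to dependencies.py; on the Python side they arrive via sys.argv. The sketch below shows that mapping (only the first argument's use is visible in this diff; the variable names are illustrative).

```python
import sys

record_id = sys.argv[1]    # ${id}          -> passed to Sources.getRecordByIdResultT1
output_path = sys.argv[2]  # ${output_file} -> target path; its use is not shown in this diff
```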
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=-------->%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN
# Settings to quiet third party logs that are too verbose
log4j.logger.org.sparkproject.jetty=WARN
log4j.logger.org.sparkproject.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
\ No newline at end of file
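The pattern and levels above apply to the driver and executor JVMs started by the submit script; from the PySpark side the same log4j 1.x backend can be reached through the JVM gateway, for example to tighten the level at runtime or to emit application log lines in the same format (sketch only; the logger name is illustrative).

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("dependencies").getOrCreate()
spark.sparkContext.setLogLevel("WARN")   # runtime override of the rootCategory level

# Log through the log4j 1.x backend configured by log4j.properties.
log4j = spark.sparkContext._jvm.org.apache.log4j
logger = log4j.LogManager.getLogger("dependencies")
logger.warn("driver-side message, rendered with the --------> conversion pattern")
```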
@@ -27,20 +27,22 @@ def sinkMySql(sinkDF, driver, url, user, password, table, mode=Constants.SAVE_MO
fileType: csv
file: ftp://192.168.31.45:21/wanjia/sample.csv
'''
def sinkFTP(sinkDF, host, user, password, filetype, file):
def sinkFTP(sinkDF, host, user, password, filetype, file, delimiter):
sinkDF.write.format("com.springml.spark.sftp") \
.option("host", host) \
.option("username", user) \
.option("password", password) \
.option("fileType", filetype) \
.option("delimiter", ";") \
.option("delimiter", delimiter) \
.save(file)
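A call matching the new sinkFTP signature might look like the following (illustration only: the host is taken from the docstring above, the credentials and DataFrame are placeholders, and the third-party com.springml.spark.sftp package has to be on the classpath, e.g. via --packages).

```python
from sources import Sinks    # same import style as dependencies.py

# sinkFileDF: the DataFrame produced by spark.sql(sql_file) in dependencies.py
Sinks.sinkFTP(sinkDF=sinkFileDF,
              host="192.168.31.45",
              user="ftp_user",          # placeholder
              password="ftp_password",  # placeholder
              filetype="csv",
              file="/wanjia/sample.csv",
              delimiter=",")
```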
# sink HDFS
def sinkHDFS(sinkDF, path, format="parquet", mode=Constants.SAVE_MODE_OVERWRITE):
def sinkHDFS(sinkDF, path, delimiter, format="parquet", mode=Constants.SAVE_MODE_OVERWRITE):
sinkDF\
.write\
.format(format)\
.format(format) \
.option("delimiter", delimiter) \
.option("quote", '"') \
.mode(mode)\
.save(path)
......
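A quick local round trip shows the effect of the delimiter and quote options when format="csv" (illustration only; the path is a placeholder, and "overwrite" is assumed to be the value behind Constants.SAVE_MODE_OVERWRITE).

```python
from pyspark.sql import SparkSession
from sources import Sinks    # same import style as dependencies.py

spark = SparkSession.builder.master("local[1]").appName("sink-check").getOrCreate()
df = spark.createDataFrame([(1, "a,b"), (2, 'c"d')], ["id", "val"])

# Write with the new delimiter parameter, then read it back with matching options.
Sinks.sinkHDFS(sinkDF=df, path="/tmp/sink_check", delimiter=",",
               format="csv", mode="overwrite")
spark.read.option("delimiter", ",").option("quote", '"').csv("/tmp/sink_check").show()
```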