Commit 879c50e3 authored by kinomin's avatar kinomin

初始化项目

parent 256c8700
Pipeline #289 canceled with stages
...@@ -36,19 +36,19 @@ def getColumns(x): ...@@ -36,19 +36,19 @@ def getColumns(x):
list = [i for i in json.loads(x).get('col')] list = [i for i in json.loads(x).get('col')]
for i in range(0, len(list)): for i in range(0, len(list)):
if json.loads(x).get('tm').get(list[i]) == "md5": if json.loads(x).get('tm').get(list[i]) == "md5":
sql_list.append('list[i] as t_'+list[i]+'md5('+list[i]+') as '+list[i]) sql_list.append(' md5('+list[i]+') as '+list[i]+", "+list[i]+' as t_'+list[i])
elif json.loads(x).get('tm').get(list[i]) == "rtl": elif json.loads(x).get('tm').get(list[i]) == "rtl":
sql_list.append('list[i] as t_'+list[i]+'rtl('+list[i]+') as '+list[i]) sql_list.append(' rtl('+list[i]+') as '+list[i]+", "+list[i]+' as t_'+list[i])
else: else:
print("") sql_list.append(list[i])
col = "" col = ""
for i in list: for i in sql_list:
if i == list[len(list) - 1]: if i == list[len(list) - 1]:
col += i col += i
else: else:
col += i + ' ,' col += i + ','
sql = "select " + col + ", current_timestamp() as create_time from kino" sql = "select " + col + " current_timestamp() as create_time from kino"
print("---------------------->", sql) print("---------------------->", sql)
return sql return sql
...@@ -72,7 +72,7 @@ if __name__ == '__main__': ...@@ -72,7 +72,7 @@ if __name__ == '__main__':
spark = SparkSession.Builder().appName('sql').master('local').getOrCreate() spark = SparkSession.Builder().appName('sql').master('local').getOrCreate()
spark.udf.register('md5', md5, StringType()) spark.udf.register('md5', md5, StringType())
print('-------->', output_file) print('=======>', output_file)
df = spark.read.format('csv').option('inferSchema', 'true').load(output_file).toDF(*col) df = spark.read.format('csv').option('inferSchema', 'true').load(output_file).toDF(*col)
df.count df.count
t_table = df.createTempView('kino') t_table = df.createTempView('kino')
...@@ -108,7 +108,8 @@ if __name__ == '__main__': ...@@ -108,7 +108,8 @@ if __name__ == '__main__':
# .option("fileType", "csv") \ # .option("fileType", "csv") \
# .option("delimiter", ";") \ # .option("delimiter", ";") \
# .save("ftp://192.168.31.45:21/wanjia/sample.csv") # .save("ftp://192.168.31.45:21/wanjia/sample.csv")
sinkDF.write.mode("overwrite").text("ftp://192.168.31.45/wanjia/sample.csv")
# sinkDF.write.mode("overwrite").text("ftp://192.168.31.45/wanjia/sample.csv")
# sinkDF.write.format("parquet").mode("overwrite").save('hdfs://hadoop1:9000/wanjia') # sinkDF.write.format("parquet").mode("overwrite").save('hdfs://hadoop1:9000/wanjia')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment