#!/usr/local/bin/python
import json
import os
import subprocess
from json import JSONDecodeError

from wanjia.spark.common import Constants
from wanjia.spark.sources import Sources
from wanjia.spark.sources.Sources import updateStatusById


def getColumns(x, table='a', location='/data/test/src'):
    """Build the select / masked-select / create-table SQL for a column spec.

    Args:
        x: JSON text. Its ``col`` entry is iterated for column names and its
            ``tm`` entry is looked up per column; a value of ``"md5"`` or
            ``"rtl"`` wraps the column in the SQL function of that name.
        table: Table name used in all three statements.
        location: Path placed in the ``location`` clause of the DDL.

    Returns:
        ``[src_sql, md5_sql, create_sql]`` where ``src_sql`` selects each
        masked column plus its original value aliased ``t_<col>``,
        ``md5_sql`` selects only the masked form, and ``create_sql`` declares
        every column as ``string``.

    Raises:
        JSONDecodeError: ``x`` is not valid JSON. The error is printed and
            re-raised so the caller's handler runs, instead of failing later
            on an unbound local.
    """
    try:
        # Parse once; the spec does not change while the columns are walked.
        spec = json.loads(x)
    except JSONDecodeError as e:
        print("json pase exception:{0}".format(e))
        raise

    columns = list(spec.get('col'))
    transforms = spec.get('tm')

    select_items = []
    masked_items = []
    create_items = []
    for name in columns:
        func = transforms.get(name)
        if func in ("md5", "rtl"):
            masked = ' {0}({1}) as {1}'.format(func, name)
            # Keep the unmasked value next to the masked one as t_<col>.
            select_items.append('{0}, {1} as t_{1}'.format(masked, name))
            masked_items.append(masked)
        else:
            select_items.append(' ' + name)
            masked_items.append(' ' + name)
        create_items.append(' ' + name + " string")

    src_sql = 'select %s from %s' % (','.join(select_items), table)
    md5_sql = 'select %s from %s' % (','.join(masked_items), table)
    # NOTE(review): Impala documents TEXTFILE as the text format keyword;
    # confirm that "txt" is accepted by the target cluster.
    create_sql = 'create external table %s (%s) stored as txt location "%s" ' % (
        table, ','.join(create_items), location)

    print("---------------------->", src_sql)
    print("---------------------->", md5_sql)
    print("---------------------->", create_sql)
    return [src_sql, md5_sql, create_sql]


def _run_impala_query(sql):
    """Run *sql* through ``impala-shell -q`` and report any failure."""
    try:
        # An argument list hands the statement over verbatim; the previous
        # shell string "${create_sql}" was expanded by the shell, not Python,
        # so the generated SQL never reached impala-shell.
        status = subprocess.run(
            ["impala-shell", "-q", sql], check=False).returncode
    except OSError as e:
        print("failed to launch impala-shell:{0}".format(e))
        return
    if status != 0:
        print("impala-shell exited with status {0}".format(status))


def main():
    """Load the source record, build its SQL and create the external table."""
    try:
        # Fetch the record that carries the column names and masking rules.
        # TODO(review): the record id is hard-coded to 1; a commented-out
        # earlier variant read it from sys.argv[1].
        record = Sources.getRecordByIdResultT1(1)
        print(record)
        _src_sql, _md5_sql, create_sql = getColumns(record.get("json"))
    except JSONDecodeError as e:
        print("JSONDecodeError:{0}".format(e))
        # NOTE(review): the original line breaks were lost; this status
        # update is assumed to belong to the error handler.
        updateStatusById(Constants.SELECT_CODE)
        return
    _run_impala_query(create_sql)


if __name__ == '__main__':
    main()