PHP前端开发

如何使用Python读取Hive数据库?

百变鹏仔 2个月前 (01-21) #Python
文章标签 如何使用

实际业务读取hive数据库的代码

import loggingimport pandas as pdfrom impala.dbapi import connectimport sqlalchemyfrom sqlalchemy.orm import sessionmakerimport osimport timeimport osimport datetimefrom dateutil.relativedelta import relativedeltafrom typing import Dict, Listimport loggingimport threadingimport pandas as pdimport pickleclass HiveHelper(object):    def __init__(        self,        host='10.2.32.22',        port=21051,        database='ur_ai_dw',        auth_mechanism='LDAP',        user='urbi',        password='Ur#730xd',        logger:logging.Logger=None        ):        self.host = host        self.port = port        self.database = database        self.auth_mechanism = auth_mechanism        self.user = user        self.password = password        self.logger = logger        self.impala_conn = None        self.conn = None        self.cursor = None        self.engine = None        self.session = None    def create_table_code(self, file_name):        '''创建表类代码'''        os.system(f'sqlacodegen {self.connection_str} &gt; {file_name}')        return self.conn    def get_conn(self):        '''创建连接或获取连接'''        if self.conn is None:            engine = self.get_engine()            self.conn = engine.connect()        return self.conn    def get_impala_conn(self):        '''创建连接或获取连接'''        if self.impala_conn is None:            self.impala_conn = connect(                host=self.host,                port=self.port,                database=self.database,                auth_mechanism=self.auth_mechanism,                user=self.user,                password=self.password                )        return self.impala_conn    def get_engine(self):        '''创建连接或获取连接'''        if self.engine is None:            self.engine = sqlalchemy.create_engine('impala://', creator=self.get_impala_conn)        return self.engine    def get_cursor(self):        '''创建连接或获取连接'''        if self.cursor is None:            self.cursor = self.conn.cursor()        return self.cursor    def get_session(self) -&gt; sessionmaker:        '''创建连接或获取连接'''        if self.session is None:            engine = self.get_engine()            Session = sessionmaker(bind=engine)            self.session = Session()        return self.session    def close_conn(self):        '''关闭连接'''        if self.conn is not None:            self.conn.close()            self.conn = None        self.dispose_engine()        self.close_impala_conn()    def close_impala_conn(self):        '''关闭impala连接'''        if self.impala_conn is not None:            self.impala_conn.close()            self.impala_conn = None    def close_session(self):        '''关闭连接'''        if self.session is not None:            self.session.close()            self.session = None        self.dispose_engine()    def dispose_engine(self):        '''释放engine'''        if self.engine is not None:            # self.engine.dispose(close=False)            self.engine.dispose()            self.engine = None    def close_cursor(self):        '''关闭cursor'''        if self.cursor is not None:            self.cursor.close()            self.cursor = None    def get_data(self, sql, auto_close=True) -&gt; pd.DataFrame:        '''查询数据'''        conn = self.get_conn()        data = None        try:            # 异常重试3次            for i in range(3):                try:                    data = pd.read_sql(sql, conn)                    break                except Exception as ex:                    if i == 2:                        raise ex # 往外抛出异常                    time.sleep(60) # 一分钟后重试        except Exception as ex:            self.logger.exception(ex)            raise ex # 往外抛出异常        finally:            if auto_close:                self.close_conn()        return datapassclass VarsHelper():    def __init__(self, save_dir, auto_save=True):        self.save_dir = save_dir        self.auto_save = auto_save        self.values = {}        if not os.path.exists(os.path.dirname(self.save_dir)):            os.makedirs(os.path.dirname(self.save_dir))        if os.path.exists(self.save_dir):            with open(self.save_dir, 'rb') as f:                self.values = pickle.load(f)                f.close()    def set_value(self, key, value):        self.values[key] = value        if self.auto_save:            self.save_file()    def get_value(self, key):        return self.values[key]    def has_key(self, key):        return key in self.values.keys()    def save_file(self):        with open(self.save_dir, 'wb') as f:            pickle.dump(self.values, f)            f.close()passclass GlobalShareArgs():    args = {        "debug": False    }    def get_args():        return GlobalShareArgs.args    def set_args(args):        GlobalShareArgs.args = args    def set_args_value(key, value):        GlobalShareArgs.args[key] = value    def get_args_value(key, default_value=None):        return GlobalShareArgs.args.get(key, default_value)    def contain_key(key):        return key in GlobalShareArgs.args.keys()    def update(args):        GlobalShareArgs.args.update(args)passclass ShareArgs():    args = {        "labels_dir":"./hjx/shop_group/month_w_amt/data/labels", # 标签目录        "labels_output_dir":"./hjx/shop_group/month_w_amt/data/labels_output", # 聚类导出标签目录        "common_datas_dir":"./hjx/data", # 共用数据目录。ur_bi_dw的公共        "only_predict": False, # 只识别,不训练        "delete_model": True, # 先删除模型,仅在训练时使用        "export_excel": False, # 导出excel        "classes": 12, # 聚类数        "batch_size": 16,        "hidden_size": 32,        "max_nrof_epochs": 100,        "learning_rate": 0.0005,        "loss_type": "categorical_crossentropy",        "avg_model_num": 10,        "steps_per_epoch": 4.0, # 4.0        "lr_callback_patience": 4,         "lr_callback_cooldown": 1,        "early_stopping_callback_patience": 6,        "get_data": True,    }    def get_args():        return ShareArgs.args    def set_args(args):        ShareArgs.args = args    def set_args_value(key, value):        ShareArgs.args[key] = value    def get_args_value(key, default_value=None):        return ShareArgs.args.get(key, default_value)    def contain_key(key):        return key in ShareArgs.args.keys()    def update(args):        ShareArgs.args.update(args)passclass UrBiGetDatasBase():    # 线程锁列表,同保存路径共用锁    lock_dict:Dict[str, threading.Lock] = {}    # 时间列表,用于判断是否超时    time_dict:Dict[str, datetime.datetime] = {}    # 用于记录是否需要更新超时时间    get_data_timeout_dict:Dict[str, bool] = {}    def __init__(        self,        host='10.2.32.22',        port=21051,        database='ur_ai_dw',        auth_mechanism='LDAP',        user='urbi',        password='Ur#730xd',        save_dir=None,        logger:logging.Logger=None,        ):        self.save_dir = save_dir        self.logger = logger        self.db_helper = HiveHelper(            host=host,            port=port,            database=database,            auth_mechanism=auth_mechanism,            user=user,            password=password,            logger=logger            )        # 创建子目录        if self.save_dir is not None and not os.path.exists(self.save_dir):            os.makedirs(self.save_dir)        self.vars_helper = None        if GlobalShareArgs.get_args_value('debug'):            self.vars_helper = VarsHelper('./hjx/data/vars/UrBiGetDatas')     def close(self):        '''关闭连接'''        self.db_helper.close_conn()    def get_last_time(self, key_name) -&gt; bool:        '''获取是否超时'''        # 转静态路径,确保唯一性        key_name = os.path.abspath(key_name)        if self.vars_helper is not None and self.vars_helper.has_key('UrBiGetDatasBase.time_list'):            UrBiGetDatasBase.time_dict = self.vars_helper.get_value('UrBiGetDatasBase.time_list')        timeout = 12 # 12小时        if GlobalShareArgs.get_args_value('debug'):            timeout = 24 # 24小时        get_data_timeout = False        if key_name not in UrBiGetDatasBase.time_dict.keys() or (datetime.datetime.today() - UrBiGetDatasBase.time_dict[key_name]).total_seconds()&gt;(timeout*60*60):            self.logger.info('超时%d小时,重新查数据:%s', timeout, key_name)            # UrBiGetDatasBase.time_list[key_name] = datetime.datetime.today()            get_data_timeout = True        else:            self.logger.info('未超时%d小时,跳过查数据:%s', timeout, key_name)        # if self.vars_helper is not None :        #     self.vars_helper.set_value('UrBiGetDatasBase.time_list', UrBiGetDatasBase.time_list)        UrBiGetDatasBase.get_data_timeout_dict[key_name] = get_data_timeout        return get_data_timeout    def save_last_time(self, key_name):        '''更新状态超时'''        # 转静态路径,确保唯一性        key_name = os.path.abspath(key_name)        if UrBiGetDatasBase.get_data_timeout_dict[key_name]:            UrBiGetDatasBase.time_dict[key_name] = datetime.datetime.today()        if self.vars_helper is not None :            UrBiGetDatasBase.time_dict[key_name] = datetime.datetime.today()            self.vars_helper.set_value('UrBiGetDatasBase.time_list', UrBiGetDatasBase.time_dict)    def get_lock(self, key_name) -&gt; threading.Lock:        '''获取锁'''        # 转静态路径,确保唯一性        key_name = os.path.abspath(key_name)        if key_name not in UrBiGetDatasBase.lock_dict.keys():            UrBiGetDatasBase.lock_dict[key_name] = threading.Lock()        return UrBiGetDatasBase.lock_dict[key_name]    def get_data_of_date(        self,        save_dir,        sql,        sort_columns:List[str],        del_index_list=[-1], # 删除最后下标        start_date = datetime.datetime(2017, 1, 1), # 开始时间        offset = relativedelta(months=3), # 时间间隔        date_format_fun = lambda d: '%04d%02d01' % (d.year, d.month), # 查询语句中替代时间参数的格式化        filename_format_fun = lambda d: '%04d%02d.csv' % (d.year, d.month), # 查询语句中替代时间参数的格式化        stop_date = '20700101', # 超过时间则停止        data_format_fun = None, # 格式化数据        ):        '''分时间增量读取数据'''        # 创建文件夹        if not os.path.exists(save_dir):            os.makedirs(save_dir)        else:            #删除最后一个文件            file_list = os.listdir(save_dir)            if len(file_list)&gt;0:                file_list.sort()                for del_index in del_index_list:                    os.remove(os.path.join(save_dir,file_list[del_index]))                    print('删除最后一个文件:', file_list[del_index])        select_index = -1        # start_date = datetime.datetime(2017, 1, 1)        while True:            end_date = start_date + offset            start_date_str = date_format_fun(start_date)            end_date_str = date_format_fun(end_date)            self.logger.info('date: %s-%s', start_date_str, end_date_str)            file_path = os.path.join(save_dir, filename_format_fun(start_date))            # self.logger.info('file_path: %s', file_path)            if not os.path.exists(file_path):                data:pd.DataFrame = self.db_helper.get_data(sql % (start_date_str, end_date_str))                if data is None:                    break                self.logger.info('data: %d', len(data))                # self.logger.info('data: %d', data.columns)                if len(data)&gt;0:                    select_index+=1                    if data_format_fun is not None:                        data = data_format_fun(data)                    # 排序                    data = data.sort_values(sort_columns)                    data.to_csv(file_path)                elif select_index!=-1:                    break                elif stop_date = %s            and dd.date_key =%s and weather.date_key=%s and sales.date_key=%s and delivery.date_key<h3>代码说明和领悟</h3><p>每个类的具体作用说明,代码需要根据下面的文字说明进行“食用”:</p><p>(第一层)HiveHelper完成了连接数据库、关闭数据库连接、生成事务、执行、引擎、连接等功能</p><p>VarsHelper提供了一个简单的持久化功能,可以将对象以文件的形式存放在磁盘上。并提供设置值、获取值、判断值是否存在的方法</p><p>GlobalShareArgs提供了一个字典,并且提供了获取字典、设置字典、设置字典键值对、设置字典键的值、判断键是否在字典中、更新字典等方法</p><p>ShareArgs跟GlobalShareArgs类似,只是一开始字典的初始化的键值对比较多</p><p><span>立即学习</span>“<a href="https://pan.quark.cn/s/00968c3c2c15" style="text-decoration: underline !important; color: blue; font-weight: bolder;" rel="nofollow" target="_blank">Python免费学习笔记(深入)</a>”;</p><p>(第二层)UrBiGetDataBase类,提供了线程锁字典、时间字典、超时判断字典,都是类变量;使用了HiveHelper类,但注意,不是继承。在具体的sql读数时,提供了线程固定和时间判断</p><p>(第三层)UrBiGetDatas类,获取hive数据库那边的日期数据、店铺数据、会员数据、天气数据、天气城市数据、商品数据、店铺生命周期数据、全国商品生命周期数据、商品开发码数数据、实际销售金额、实际配货金额、商品畅滞销数据、商品成本价数据、尺码映射数据等。</p><p>(第四层)get_common_data函数,使用URBiGetData类读取日期、店铺、天气、天气城市、货品、实际销量数据,并缓存到文件夹./yongjian/data/ur_bi_data下面</p><p>CustomUrBiGetData类,继承了UrBiGetDatasBase类,读取销售目标金额、点系列面积数据。</p><p>(这个也是第四层)get_datas函数,通过CustomUrBiGetData类,读取年月销售目标金额。</p><p>总的函数:(这个是总的调用入口函数)get_data_ur_bi_dw函数,调用了get_common_data和get_datas函数进行读取数据,然后将数据保存到某个文件夹目录下面。</p><p>举一反三,如果你不是hive数据库,你可以将第一层这个底层更换成mysql。主页有解释如果进行更换。第二层不需要改变,第三层就是你想要进行读取的数据表,不同的数据库你想要读取的数据表也不同,所以sql需要你在这里写,套用里面的方法即可,基本上就是修改sql就好了。</p><p>这种方法的好处在于,数据不会重复读取,并且读取的数据都可以得到高效的使用。</p><h3>后续附上修改成mysql的一个例子代码</h3><pre class="brush:py;">import loggingimport pandas as pdfrom impala.dbapi import connectimport sqlalchemyfrom sqlalchemy.orm import sessionmakerimport osimport timeimport osimport datetimefrom dateutil.relativedelta import relativedeltafrom typing import Dict, Listimport loggingimport threadingimport pandas as pdimport pickleclass MySqlHelper(object):    def __init__(        self,        host='192.168.15.144',        port=3306,        database='test_ims',        user='spkjz_writer',        password='7cmoP3QDtueVJQj2q4Az',        logger:logging.Logger=None        ):        self.host = host        self.port = port        self.database = database        self.user = user        self.password = password        self.logger = logger        self.connection_str = 'mysql+pymysql://%s:%s@%s:%d/%s' %(            self.user, self.password, self.host, self.port, self.database        )        self.conn = None        self.cursor = None        self.engine = None        self.session = None    def create_table_code(self, file_name):        '''创建表类代码'''        os.system(f'sqlacodegen {self.connection_str} &gt; {file_name}')        return self.conn    def get_conn(self):        '''创建连接或获取连接'''        if self.conn is None:            engine = self.get_engine()            self.conn = engine.connect()        return self.conn    def get_engine(self):        '''创建连接或获取连接'''        if self.engine is None:            self.engine = sqlalchemy.create_engine(self.connection_str)        return self.engine    def get_cursor(self):        '''创建连接或获取连接'''        if self.cursor is None:            self.cursor = self.conn.cursor()        return self.cursor    def get_session(self) -&gt; sessionmaker:        '''创建连接或获取连接'''        if self.session is None:            engine = self.get_engine()            Session = sessionmaker(bind=engine)            self.session = Session()        return self.session    def close_conn(self):        '''关闭连接'''        if self.conn is not None:            self.conn.close()            self.conn = None        self.dispose_engine()    def close_session(self):        '''关闭连接'''        if self.session is not None:            self.session.close()            self.session = None        self.dispose_engine()    def dispose_engine(self):        '''释放engine'''        if self.engine is not None:            # self.engine.dispose(close=False)            self.engine.dispose()            self.engine = None    def close_cursor(self):        '''关闭cursor'''        if self.cursor is not None:            self.cursor.close()            self.cursor = None    def get_data(self, sql, auto_close=True) -&gt; pd.DataFrame:        '''查询数据'''        conn = self.get_conn()        data = None        try:            # 异常重试3次            for i in range(3):                try:                    data = pd.read_sql(sql, conn)                    break                except Exception as ex:                    if i == 2:                        raise ex # 往外抛出异常                    time.sleep(60) # 一分钟后重试        except Exception as ex:            self.logger.exception(ex)            raise ex # 往外抛出异常        finally:            if auto_close:                self.close_conn()        return datapassclass VarsHelper():    def __init__(self, save_dir, auto_save=True):        self.save_dir = save_dir        self.auto_save = auto_save        self.values = {}        if not os.path.exists(os.path.dirname(self.save_dir)):            os.makedirs(os.path.dirname(self.save_dir))        if os.path.exists(self.save_dir):            with open(self.save_dir, 'rb') as f:                self.values = pickle.load(f)                f.close()    def set_value(self, key, value):        self.values[key] = value        if self.auto_save:            self.save_file()    def get_value(self, key):        return self.values[key]    def has_key(self, key):        return key in self.values.keys()    def save_file(self):        with open(self.save_dir, 'wb') as f:            pickle.dump(self.values, f)            f.close()passclass GlobalShareArgs():    args = {        "debug": False    }    def get_args():        return GlobalShareArgs.args    def set_args(args):        GlobalShareArgs.args = args    def set_args_value(key, value):        GlobalShareArgs.args[key] = value    def get_args_value(key, default_value=None):        return GlobalShareArgs.args.get(key, default_value)    def contain_key(key):        return key in GlobalShareArgs.args.keys()    def update(args):        GlobalShareArgs.args.update(args)passclass ShareArgs():    args = {        "labels_dir":"./hjx/shop_group/month_w_amt/data/labels", # 标签目录        "labels_output_dir":"./hjx/shop_group/month_w_amt/data/labels_output", # 聚类导出标签目录        "common_datas_dir":"./hjx/data", # 共用数据目录。ur_bi_dw的公共        "only_predict": False, # 只识别,不训练        "delete_model": True, # 先删除模型,仅在训练时使用        "export_excel": False, # 导出excel        "classes": 12, # 聚类数        "batch_size": 16,        "hidden_size": 32,        "max_nrof_epochs": 100,        "learning_rate": 0.0005,        "loss_type": "categorical_crossentropy",        "avg_model_num": 10,        "steps_per_epoch": 4.0, # 4.0        "lr_callback_patience": 4,         "lr_callback_cooldown": 1,        "early_stopping_callback_patience": 6,        "get_data": True,    }    def get_args():        return ShareArgs.args    def set_args(args):        ShareArgs.args = args    def set_args_value(key, value):        ShareArgs.args[key] = value    def get_args_value(key, default_value=None):        return ShareArgs.args.get(key, default_value)    def contain_key(key):        return key in ShareArgs.args.keys()    def update(args):        ShareArgs.args.update(args)passclass IMSGetDatasBase():    # 线程锁列表,同保存路径共用锁    lock_dict:Dict[str, threading.Lock] = {}    # 时间列表,用于判断是否超时    time_dict:Dict[str, datetime.datetime] = {}    # 用于记录是否需要更新超时时间    get_data_timeout_dict:Dict[str, bool] = {}    def __init__(        self,        host='192.168.15.144',        port=3306,        database='test_ims',        user='spkjz_writer',        password='Ur#7cmoP3QDtueVJQj2q4Az',        save_dir=None,        logger:logging.Logger=None,        ):        self.save_dir = save_dir        self.logger = logger        self.db_helper = MySqlHelper(            host=host,            port=port,            database=database,            user=user,            password=password,            logger=logger            )        # 创建子目录        if self.save_dir is not None and not os.path.exists(self.save_dir):            os.makedirs(self.save_dir)        self.vars_helper = None        if GlobalShareArgs.get_args_value('debug'):            self.vars_helper = VarsHelper('./hjx/data/vars/IMSGetDatas') # 把超时时间保存到文件,注释该行即可停掉,只用于调试    def close(self):        '''关闭连接'''        self.db_helper.close_conn()    def get_last_time(self, key_name) -&gt; bool:        '''获取是否超时'''        # 转静态路径,确保唯一性        key_name = os.path.abspath(key_name)        if self.vars_helper is not None and self.vars_helper.has_key('IMSGetDatasBase.time_list'):            IMSGetDatasBase.time_dict = self.vars_helper.get_value('IMSGetDatasBase.time_list')        timeout = 12 # 12小时        if GlobalShareArgs.get_args_value('debug'):            timeout = 24 # 24小时        get_data_timeout = False        if key_name not in IMSGetDatasBase.time_dict.keys() or (datetime.datetime.today() - IMSGetDatasBase.time_dict[key_name]).total_seconds()&gt;(4*60*60):            self.logger.info('超时%d小时,重新查数据:%s', timeout, key_name)            # IMSGetDatasBase.time_list[key_name] = datetime.datetime.today()            get_data_timeout = True        else:            self.logger.info('未超时%d小时,跳过查数据:%s', timeout, key_name)        # if self.vars_helper is not None :        #     self.vars_helper.set_value('IMSGetDatasBase.time_list', IMSGetDatasBase.time_list)        IMSGetDatasBase.get_data_timeout_dict[key_name] = get_data_timeout        return get_data_timeout    def save_last_time(self, key_name):        '''更新状态超时'''        # 转静态路径,确保唯一性        key_name = os.path.abspath(key_name)        if IMSGetDatasBase.get_data_timeout_dict[key_name]:            IMSGetDatasBase.time_dict[key_name] = datetime.datetime.today()        if self.vars_helper is not None :            IMSGetDatasBase.time_dict[key_name] = datetime.datetime.today()            self.vars_helper.set_value('IMSGetDatasBase.time_list', IMSGetDatasBase.time_dict)    def get_lock(self, key_name) -&gt; threading.Lock:        '''获取锁'''        # 转静态路径,确保唯一性        key_name = os.path.abspath(key_name)        if key_name not in IMSGetDatasBase.lock_dict.keys():            IMSGetDatasBase.lock_dict[key_name] = threading.Lock()        return IMSGetDatasBase.lock_dict[key_name]    def get_data_of_date(        self,        save_dir,        sql,        sort_columns:List[str],        del_index_list=[-1], # 删除最后下标        start_date = datetime.datetime(2017, 1, 1), # 开始时间        offset = relativedelta(months=3), # 时间间隔        date_format_fun = lambda d: '%04d%02d01' % (d.year, d.month), # 查询语句中替代时间参数的格式化        filename_format_fun = lambda d: '%04d%02d.csv' % (d.year, d.month), # 查询语句中替代时间参数的格式化        stop_date = '20700101', # 超过时间则停止        ):        '''分时间增量读取数据'''        # 创建文件夹        if not os.path.exists(save_dir):            os.makedirs(save_dir)        else:            #删除最后一个文件            file_list = os.listdir(save_dir)            if len(file_list)&gt;0:                file_list.sort()                for del_index in del_index_list:                    os.remove(os.path.join(save_dir,file_list[del_index]))                    print('删除最后一个文件:', file_list[del_index])        select_index = -1        # start_date = datetime.datetime(2017, 1, 1)        while True:            end_date = start_date + offset            start_date_str = date_format_fun(start_date)            end_date_str = date_format_fun(end_date)            self.logger.info('date: %s-%s', start_date_str, end_date_str)            file_path = os.path.join(save_dir, filename_format_fun(start_date))            # self.logger.info('file_path: %s', file_path)            if not os.path.exists(file_path):                data:pd.DataFrame = self.db_helper.get_data(sql % (start_date_str, end_date_str))                if data is None:                    break                self.logger.info('data: %d', len(data))                # self.logger.info('data: %d', data.columns)                if len(data)&gt;0:                    select_index+=1                    # 排序                    data = data.sort_values(sort_columns)                    data.to_csv(file_path)                elif select_index!=-1:                    break                elif stop_date