Commit 660b2e37 authored by BH's avatar BH

20201218 优化数据相关操作,完成新接口

parent b4c13ad8
# coding: utf-8
import builtins
import traceback
from abc import ABCMeta, abstractmethod
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.base.logger import log
from base.configs import tp_cfg
class StoreDBBase(object):
__metaclass__ = ABCMeta
def __init__(self, host='127.0.0.1', port='8123', user='default', pwd='123456', db="default", **kwargs):
'''初始化构造函数'''
super(StoreDBBase, self).__init__()
self.host = host
self.port = port
self.user = user
self.pwd = pwd
connect_args = {
"server_host": self.host,
"port": self.port,
"user": self.user,
"password": self.pwd,
"db": db
}
connection_str = self.connection_str(connect_args)
self.engine = create_engine(connection_str, pool_size=100, pool_recycle=3600, pool_timeout=20)
# self._session = self.get_session(self.engine)
@abstractmethod
def connection_str(self, conf):
raise NotImplementedError
@abstractmethod
def get_session(self, engine):
raise NotImplementedError
@property
def session(self):
if not self._session:
self._session = self.get_session(self.engine)
return self._session
def insert_list(self, table, data_list):
"""
插入列表
:param table:
:param data_list:
:return:
"""
session = self.get_session(self.engine)
try:
session.execute(table.__table__.insert(), data_list)
except Exception as e:
log.e('DataBase INSERT ERROR error:{} data:{}'.format(e, data_list))
finally:
session.close()
def insert_list_for_one(self, table, data_list):
"""
插入列表
:param table:
:param data_list:
:return:
"""
session = self.get_session(self.engine)
try:
for item in data_list:
session.execute(table.__table__.insert(), [item])
except Exception as e:
log.e('DataBase INSERT ERROR error:{} data:{}'.format(e, data_list))
finally:
session.close()
def insert_single(self, table, data):
"""
插入单条
:param table:
:param data:
:return:
"""
session = self.get_session(self.engine)
try:
session.execute(table.__table__.insert(), [data])
except Exception as e:
log.e('DataBase INSERT ERROR error:{} data:{}'.format(e, data))
finally:
session.close()
@staticmethod
def generate_sql(name, fileters, fields='', limit=None):
assert isinstance(fileters, (dict, str))
if isinstance(fileters, str):
return str
where_sql = StoreDBBase.where_sql(fileters)
# 无条件默认查询1000条
# if not where_sql.strip() and not limit:
# limit = 1000
if fields and isinstance(fields, list):
fields = ','.join(fields)
else:
fields = '*'
_sql = u'select {fields} from {table} {where_sql}'.format(fields=fields, table=name, where_sql=where_sql)
print('sql', _sql)
if limit and isinstance(limit, int):
_sql = '{sql} limit {num}'.format(sql=_sql, num=limit)
return _sql
@staticmethod
def where_sql(filters):
if not isinstance(filters, dict):
return ''
where_param = []
for q in filters.keys():
# 区间查询
if isinstance(filters[q], dict):
operate_sql = StoreDBBase.operate_sql(filters[q], q)
where_param.append(' ({}) '.format(operate_sql))
# 定值查询
elif isinstance(filters[q], (str,)):
where_param.append(u" ({field} {operate} '{value}') ".format(field=q, operate='=', value=filters[q]))
elif isinstance(filters[q], (int,)):
where_param.append(u" ({field} {operate} {value}) ".format(field=q, operate='=', value=filters[q]))
else:
continue
_where_sql = u' and '.join(where_param)
return u' {} {} '.format(u' where ' if _where_sql else u'', _where_sql)
@staticmethod
def operate_sql(intervals, q):
# todo 根据表,自动转换字段类型
result = []
for item in intervals.keys():
# todo python3 无long
if isinstance(intervals[item], (int, float)):
value = '{}'.format(intervals[item])
elif isinstance(intervals[item], str):
value = " '{}' ".format(intervals[item])
# 时间戳转换
elif isinstance(intervals[item], list):
value = "( '{}')".format("','".join(intervals[item]))
else:
value = str(intervals[item])
value = " '{}' ".format(value)
result.append(' {field} {operate} {value} '.format(field=q, operate=item, value=value))
return ' and '.join(result)
def _query(self, table_name, q):
sql = self.generate_sql(table_name, q)
log.i('query sql:{}'.format(str(sql)))
session = self.get_session(self.engine)
def query(self, table_name, q, fields):
sql = self.generate_sql(table_name, q, fields)
log.i('query sql:{}'.format(str(sql)))
session = self.get_session(self.engine)
cursor = session.execute(sql)
try:
fields = cursor._metadata.keys
return [dict(zip(fields, item)) for item in cursor.fetchall()]
finally:
cursor.close()
session.close()
def partial_query(self, table_name, q):
sql = self.generate_sql(table_name, q)
log.i('partial_query sql:{}'.format(str(sql)))
session = self.get_session(self.engine)
cursor = session.execute(sql)
try:
fields = cursor._metadata.keys
while True:
data = cursor.fetchmany(1000)
if not data:
return
yield [dict(zip(fields, item)) for item in data]
finally:
cursor.close()
session.close()
def fields(self, table_name):
sql = self.generate_sql(table_name, {}, limit=1)
log.i('query sql:{}'.format(str(sql)))
session = self.get_session(self.engine)
cursor = session.execute(sql)
try:
fields = cursor._metadata.keys
return fields
finally:
cursor.close()
session.close()
def execute(self, sql):
session = self.get_session(self.engine)
try:
# self.logger.info('execute sql:{}'.format(str(sql)))
session.execute(sql)
except:
log.e("execute sql error:{}".format(traceback.format_exc()))
finally:
session.close()
class MySql(StoreDBBase):
def __init__(self, host='127.0.0.1', port='8123', user='default', pwd='123456', db="micr_pay", **kwargs):
'''初始化构造函数'''
cfg = tp_cfg()
if 'mysql' != cfg.database.type:
raise Exception("Configure the mysql type!")
super(MySql, self).__init__(cfg.database.mysql_host, cfg.database.mysql_port, cfg.database.mysql_user,
cfg.database.mysql_password, cfg.database.mysql_db)
def connection_str(self, conf):
connection = 'mysql+pymysql://{user}:{password}@{server_host}:{port}/{db}'.format(**conf)
print(connection)
return connection
def get_session(self, engine):
Session = sessionmaker(bind=engine)
session = Session()
return session
def get_plugin_db():
"""
:rtype : TPDatabase
"""
if '__plugin_db__' not in builtins.__dict__:
builtins.__dict__['__plugin_db__'] = MySql()
return builtins.__dict__['__plugin_db__']
...@@ -287,9 +287,13 @@ controllers = [ ...@@ -287,9 +287,13 @@ controllers = [
(r'/plugin/host_list', plugin.GetHostListHandler), (r'/plugin/host_list', plugin.GetHostListHandler),
# 服务器详情接口 # 服务器详情接口
(r'/plugin/host_info', plugin.GetHostInfoHandler), (r'/plugin/host_info', plugin.GetHostInfoHandler),
# 空闲主机接口
(r'/plugin/free_host', plugin.FreeHostHandler),
# 服务器远程信息 # 服务器远程信息
(r'/plugin/session_info', plugin.GetSessionInfoHandler), (r'/plugin/session_info', plugin.GetSessionInfoHandler),
# 绑定支付宝账户 # 绑定支付宝账户
(r'/plugin/bind_list', plugin.BindAccountListHandler),
# 绑定支付宝账户
(r'/plugin/bind_pay_account', plugin.BindPayAccountHandler), (r'/plugin/bind_pay_account', plugin.BindPayAccountHandler),
# 账户状态 # 账户状态
(r'/plugin/account_status', plugin.AccountStatusHandler), (r'/plugin/account_status', plugin.AccountStatusHandler),
...@@ -305,6 +309,10 @@ controllers = [ ...@@ -305,6 +309,10 @@ controllers = [
(r'/plugin/password_update', plugin.PasswordUpdateHandler), (r'/plugin/password_update', plugin.PasswordUpdateHandler),
# 下载地址获取接口 # 下载地址获取接口
(r'/plugin/bat_download_url', plugin.BatDownloadHandler), (r'/plugin/bat_download_url', plugin.BatDownloadHandler),
# token 获取
(r'/plugin/token', plugin.TokenHandler),
# 状态推送接口
(r'/plugin/push_status', plugin.PushStatusHandler),
# websocket for real-time information # websocket for real-time information
(r'/plugin/exe_download_url', plugin.ExeDownloadHandler), (r'/plugin/exe_download_url', plugin.ExeDownloadHandler),
# ws-client call 'http://ip:7190/ws/action/' # ws-client call 'http://ip:7190/ws/action/'
......
# coding: utf-8 # coding: utf-8
import builtins
import datetime import datetime
import traceback
from abc import ABCMeta, abstractmethod
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.base.logger import log from app.base.logger import log
from app.base.db import get_db, SQL from app.base.db import get_db, SQL
from app.const import TPE_EXISTS, TPE_DATABASE, TPE_OK, TP_GROUP_HOST, TPE_NOT_EXISTS, TPE_FAILED from app.const import TPE_EXISTS, TPE_DATABASE, TPE_OK, TP_GROUP_HOST, TPE_NOT_EXISTS, TPE_FAILED
from base.configs import tp_cfg
host_id_lock = {} host_id_lock = {}
...@@ -85,52 +92,6 @@ def get_shop_bind_info(host_id): ...@@ -85,52 +92,6 @@ def get_shop_bind_info(host_id):
return bind return bind
def get_account_info(host_id, mch_no):
s = SQL(get_db())
s.select_from('remote_account_host_bind',
['id', 'mch_no', 'comp_id', 'host_id', 'host_assigned', 'account_source', 'account',
'login_status', 'mch_name', 'create_time'], alt_name='a')
# 判断
if mch_no:
s.where('a.mch_no="{}"'.format(mch_no))
elif host_id:
s.where('a.host_id={}'.format(host_id))
err = s.query()
if err != TPE_OK:
return err, None
if s.recorder:
host_id = host_id or s.recorder[0].host_id
# 未发现设备
if not host_id:
return TPE_DATABASE, None
sh = SQL(get_db())
sh.select_from('remote_host',
['id', 'assets_num', 'os_type', 'ip', 'status', 'username', 'password', 'name', 'remark', ],
alt_name='h')
sh.where('h.id={}'.format(host_id))
err = sh.query()
if err != TPE_OK:
return err, None
# 使用id查询可能查询多个
if len(sh.recorder) != 1:
return TPE_DATABASE, None
result = sh.recorder[0]
bind = s.recorder
for i in bind:
i['password'] = '******'
result['bind'] = s.recorder
return TPE_OK, result
def free_host(): def free_host():
db = get_db() db = get_db()
sql = """select a.id as id sql = """select a.id as id
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment