python – 将临时表与SQLAlchemy一起使用
|
我试图使用临时表与SQLAlchemy,并将其连接到现有的表.这就是我到目前为止所拥有的 engine = db.get_engine(db.app,'MY_DATABASE')
df = pd.DataFrame({"id": [1,2,3],"value": [100,200,300],"date": [date.today(),date.today(),date.today()]})
temp_table = db.Table('#temp_table',db.Column('id',db.Integer),db.Column('value',db.Column('date',db.DateTime))
temp_table.create(engine)
df.to_sql(name='tempdb.dbo.#temp_table',con=engine,if_exists='append',index=False)
query = db.session.query(ExistingTable.id).join(temp_table,temp_table.c.id == ExistingTable.id)
out_df = pd.read_sql(query.statement,engine)
temp_table.drop(engine)
return out_df.to_dict('records')
这不会返回任何结果,因为to_sql的插入语句没有运行(我认为这是因为它们是使用sp_prepexec运行的,但我并不完全确定). 然后我尝试写出SQL语句(CREATE TABLE #temp_table …,INSERT INTO #temp_table …,SELECT [id] FROM …)然后运行pd.read_sql(query,engine).我收到错误消息
我想这是因为声明不仅仅是SELECT? 我该如何解决这个问题(任何一个解决方案都可以工作,虽然第一个会更好,因为它避免了硬编码的SQL).要清楚,我无法修改现有数据库中的模式 – 它是供应商数据库. 解决方法如果要插入临时表中的记录数量很小/中等,一种可能性是使用文字子查询或值CTE而不是创建临时表.# MODEL
class ExistingTable(Base):
__tablename__ = 'existing_table'
id = sa.Column(sa.Integer,primary_key=True)
name = sa.Column(sa.String)
# ...
假设还将以下数据插入临时表: # This data retrieved from another database and used for filtering
rows = [
(1,100,datetime.date(2017,1,1)),(3,300,3,(5,500,5,]
创建包含该数据的CTE或子查询: stmts = [
# @NOTE: optimization to reduce the size of the statement:
# make type cast only for first row,for other rows DB engine will infer
sa.select([
sa.cast(sa.literal(i),sa.Integer).label("id"),sa.cast(sa.literal(v),sa.Integer).label("value"),sa.cast(sa.literal(d),sa.DateTime).label("date"),]) if idx == 0 else
sa.select([sa.literal(i),sa.literal(v),sa.literal(d)]) # no type cast
for idx,(i,v,d) in enumerate(rows)
]
subquery = sa.union_all(*stmts)
# Choose one option below.
# I personally prefer B because one could reuse the CTE multiple times in the same query
# subquery = subquery.alias("temp_table") # option A
subquery = subquery.cte(name="temp_table") # option B
使用所需的连接和过滤器创建最终查询: query = (
session
.query(ExistingTable.id)
.join(subquery,subquery.c.id == ExistingTable.id)
# .filter(subquery.c.date >= XXX_DATE)
)
# TEMP: Test result output
for res in query:
print(res)
最后,获取pandas数据框: out_df = pd.read_sql(query.statement,engine)
result = out_df.to_dict('records') (编辑:哈尔滨站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
- Python,名称未定义
- python – 我的Model主键如何以特定的数字开头?
- ImportError:没有模块名称’matplotlib’Python 3.3
- 如何在Python中单个测试的持续时间内替换类变量?
- python – 从defaultdict获取原始密钥集
- python – 有效地减去不同形状的numpy数组
- python – Django 1.7 makemigrations – ValueError:无法
- python – 列表中元素的顺序是否会导致for循环中的错误?
- python-2.7 – 在Python 2.7中手动构建ConfigParser的深层副
- python – 重新分发字典值列表
