Commit 86a1d43f authored by ARCHER's avatar ARCHER

do not create temporary table for to_sql

A temporary table doesn't work because of a connection/transaction problem.
Instead, create a real table and drop it afterwards.
parent 3c9728cc
......@@ -107,6 +107,14 @@ def to_sql(df,*args,**kwargs):
and inserted into previous table, taking care of on_conflict
index columns names from df are used as primary key
additional options:
temp_method={'pandas'|'sql'}
method to create temporary table.
'sql' : will create a real table *like* the destination table, and drop it at the end.
'pandas' : will use TemporaryPandasTable, with the same column order
'pandas' by default.
"""
logger.debug("using wrapped to_sql")
......@@ -132,9 +140,16 @@ def to_sql(df,*args,**kwargs):
if 'schema' not in kwargs:
kwargs['schema'] = None
if 'temp_method' not in kwargs:
kwargs['temp_method'] = 'pandas'
if kwargs['schema'] is None:
kwargs['schema'] = inspect(con).default_schema_name
# replace method with func if available
if kwargs['method'] in to_sql_methods:
kwargs['method']=to_sql_methods[kwargs['method']]
metadata = MetaData(con,reflect=True)
......@@ -162,14 +177,15 @@ def to_sql(df,*args,**kwargs):
except Exception as e:
raise ValueError("Columns mismatch. need more checks for useful debug msg: %s" % str(e))
temporary_method='PANDAS'
logger.info("Temporary table used for fast update with method %s" % temporary_method)
if temporary_method == 'LIKE':
# create a temporary table like destination table, but empty
logger.info("Temporary table used for fast update with method %s" % kwargs['temp_method'])
if kwargs['temp_method'] == 'sql':
# create a *fake* temporary table like destination table, but empty
# we can't create a real temporary table because they are not visible to to_sql_legacy
# so we will have to drop it
tmp_table = "_%s" % table
con.execute("""
DROP TABLE IF EXISTS "%s";
CREATE TEMP TABLE "%s" (
CREATE TABLE "%s" (
LIKE "%s"
) ;""" % (tmp_table, tmp_table, table))
tmp_table_orm = Table(tmp_table, metadata, autoload=True, autoload_with=con) # metadata.tables[tmp_table]
......@@ -179,7 +195,7 @@ def to_sql(df,*args,**kwargs):
logger.debug("Column mismatch between tmp table %s and dest table %s: %s" % (tmp_table_orm.description, table_orm.description, str(columns_diff_names)))
else:
elif kwargs['temp_method'] == 'pandas':
#will use a temporary table as a buffer ( for upsert copy )
pandas_engine = pd.io.sql.pandasSQL_builder(con,meta=metadata)
tmp_table = "_%s" % table
......@@ -192,17 +208,14 @@ def to_sql(df,*args,**kwargs):
else:
new_table=True
# replace method with func if available
if kwargs['method'] in to_sql_methods:
kwargs['method']=to_sql_methods[kwargs['method']]
# use connection session
args=list(args)
if tmp_table is not None:
args[0]=tmp_table
args[1] = con
args=tuple(args)
del kwargs['temp_method']
ret=df.to_sql_legacy(*args,**kwargs)
if new_table and (None not in list(df.index.names)):
......@@ -217,16 +230,12 @@ def to_sql(df,*args,**kwargs):
if columns_diff_names:
raise ValueError("Column mismatch between tmp table %s and dest table %s: %s" % (tmp_table_orm.description, table_orm.description, str(columns_diff_names)))
# get non common rows
new_rows=except_all(tmp_table_orm.select(),table_orm.select())
# https://stackoverflow.com/questions/41724658/how-to-do-a-proper-upsert-using-sqlalchemy-on-postgresql
# get existing table orm
stmt = postgresql.insert(table_orm).from_select(tmp_table_orm.columns,new_rows)
if non_pk_cols :
on_conflict_stmt = stmt.on_conflict_do_update(
index_elements=table_orm.primary_key.columns,
......@@ -239,8 +248,8 @@ def to_sql(df,*args,**kwargs):
res=con.execute(on_conflict_stmt)
logger.info("insert/update %s rows" % res.rowcount)
logger.debug("dropping tmp_table %s" % tmp_table)
tmp_table_orm.drop(con)
tmp_table_orm.drop(con) # only useful if a fake table was used
return ret
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment