Commit 3c9728cc authored by ARCHER's avatar ARCHER

reorder df cols to match table. allow empty idx. no 'like' table anymore

parent 254093e5
......@@ -4,8 +4,8 @@ import shapely as shp
import shapely.geometry as shpg
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects import postgresql
from sqlalchemy import text, MetaData, inspect, Table
from geoalchemy2 import select
from sqlalchemy import text, MetaData, inspect#, Table
from geoalchemy2 import select, Table
from sqlalchemy.sql import except_all
import csv
......@@ -144,23 +144,41 @@ def to_sql(df,*args,**kwargs):
if kwargs['if_exists'] == 'replace':
if table in metadata.tables :
# delete all rows in the table (not drop), then append, to prevent drop cascade
logger.info("deleting all rows in existing table %s" % table)
logger.info("replace mode : deleting all rows in existing table %s" % table)
con.execute("delete from %s" % table)
# force append, because old table is empty
# force append, because old table is empty. Temporary table not needed
kwargs['if_exists'] = 'append'
if (kwargs['if_exists'] == 'append') and (table in metadata.tables):
elif (kwargs['if_exists'] == 'append') and (table in metadata.tables):
# dest table exists. we need to be sure that df has same columns ordering
table_orm=metadata.tables[table]
non_pk_cols=[ c.name for c in table_orm.columns
if c not in list(table_orm.primary_key.columns)
]
if list(df.keys()) != non_pk_cols:
logger.debug("df columns are not in the same order as existing table. Rearranging them")
try:
df=df[non_pk_cols]
except Exception as e:
raise ValueError("Columns mismatch. need more checks for useful debug msg: %s" % str(e))
temporary_method='PANDAS'
logger.info("Temporary table used for fast update with method %s" % temporary_method)
if temporary_method == 'LIKE':
# create a temporary table like destination table
# create a temporary table like destination table, but empty
tmp_table = "_%s" % table
con.execute("""
DROP TABLE IF EXISTS "%s";
CREATE TEMP TABLE "%s" (
LIKE "%s" INCLUDING CONSTRAINTS INCLUDING DEFAULTS INCLUDING INDEXES
LIKE "%s"
) ;""" % (tmp_table, tmp_table, table))
tmp_table_orm = Table(tmp_table, metadata, autoload=True, autoload_with=con) # metadata.tables[tmp_table]
columns_diff_names = set((c.name for c in tmp_table_orm.columns)) ^ set((c.name for c in table_orm.columns))
if columns_diff_names:
logger.debug("Column mismatch between tmp table %s and dest table %s: %s" % (tmp_table_orm.description, table_orm.description, str(columns_diff_names)))
else:
#will use a temporary table as a buffer ( for upsert copy )
pandas_engine = pd.io.sql.pandasSQL_builder(con,meta=metadata)
......@@ -187,25 +205,20 @@ def to_sql(df,*args,**kwargs):
args=tuple(args)
ret=df.to_sql_legacy(*args,**kwargs)
if new_table:
if new_table and (None not in list(df.index.names)):
logger.debug('defining primary key from pandas index')
con.execute('''alter table "%s" add constraint "%s_pk" primary key( %s )''' % (table,table,",".join([ '"%s"' %q for q in df.index.names])))
if tmp_table is not None:
table_orm=metadata.tables[table]
#tmp_table_orm = tmp_table_pd.table
logger.debug("merging tmp table")
columns_diff = set(tmp_table_orm.columns) ^ set(tmp_table_orm.columns)
if columns_diff:
raise ValueError("Missing column name in src or dst: %s" % str(columns_diff))
columns_diff_names = set((c.name for c in tmp_table_orm.columns)) ^ set((c.name for c in table_orm.columns))
if columns_diff_names:
raise ValueError("Column mismatch between tmp table %s and dest table %s: %s" % (tmp_table_orm.description, table_orm.description, str(columns_diff_names)))
# exclude rows from tmp_table that are already in dest table
new_rows=except_all(tmp_table_orm.select(),table_orm.select())
......@@ -214,10 +227,7 @@ def to_sql(df,*args,**kwargs):
# get existing table orm
stmt = postgresql.insert(table_orm).from_select(tmp_table_orm.columns,new_rows)
non_pk_cols=[ c.name for c in table_orm.columns
if c not in list(table_orm.primary_key.columns)
]
if non_pk_cols:
if non_pk_cols :
on_conflict_stmt = stmt.on_conflict_do_update(
index_elements=table_orm.primary_key.columns,
set_={ k: getattr(stmt.excluded, k) for k in non_pk_cols }
......@@ -225,13 +235,11 @@ def to_sql(df,*args,**kwargs):
else:
logger.debug("only primary key , so do nothing on update")
on_conflict_stmt = stmt.on_conflict_do_nothing()
try:
res=con.execute(on_conflict_stmt)
logger.info("insert/update %s rows" % res.rowcount)
except:
logger.error("couln't merge tables. Did they have same columns ?")
raise
res=con.execute(on_conflict_stmt)
logger.info("insert/update %s rows" % res.rowcount)
logger.debug("dropping tmp_table %s" % tmp_table)
tmp_table_orm.drop(con)
......@@ -244,7 +252,7 @@ def get_all_geometry_names(gdf):
""" return all geoemetry columns names (ie those not only the one defined by gdf.geometry """
all=[]
for c in gdf.keys():
if hasattr(gdf[c][0], '__geom__'):
if hasattr(gdf[c].iloc[0], '__geom__'):
all.append(c)
return all
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment