Commit 0265ea6a authored by ARCHER's avatar ARCHER

delete row instead drop (cascade pb), and only update changed rows

parent 8f304b1c
......@@ -4,8 +4,9 @@ import shapely as shp
import shapely.geometry as shpg
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects import postgresql
from sqlalchemy import text, MetaData, inspect;
from sqlalchemy import text, MetaData, inspect
from geoalchemy2 import select
from sqlalchemy.sql import except_all
import csv
from io import StringIO
......@@ -17,7 +18,7 @@ import logging
logging.basicConfig()
logger = logging.getLogger(os.path.basename(__file__))
logger.setLevel(logging.DEBUG)
logging.getLogger('sqlalchemy.dialects.postgresql').setLevel(logging.INFO)
logging.getLogger('sqlalchemy.dialects.postgresql').setLevel(logging.DEBUG)
shapely_handled_types=[
shpg.Polygon,
......@@ -90,7 +91,7 @@ def method_copy(table, conn, keys, data_iter):
sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
table_name, columns)
logger.debug('method_copy sql code: %s' % sql)
#logger.debug('method_copy sql code: %s' % sql)
cur.copy_expert(sql=sql, file=s_buf)
to_sql_methods = {
......@@ -148,7 +149,7 @@ def to_sql(df,*args,**kwargs):
# force append, because old table is empty
kwargs['if_exists'] = 'append'
elif (kwargs['if_exists'] == 'append') and (table in metadata.tables):
if (kwargs['if_exists'] == 'append') and (table in metadata.tables):
# will use a temporary table as a buffer ( for upsert copy )
logger.info("Temporary table used for fast update")
......@@ -182,14 +183,22 @@ def to_sql(df,*args,**kwargs):
con.execute('''alter table "%s" add constraint "%s_pk" primary key( %s )''' % (table,table,",".join([ '"%s"' %q for q in df.index.names])))
if tmp_table is not None:
table_orm=metadata.tables[table]
tmp_table_orm = tmp_table_pd.table
logger.debug("merging tmp table")
# exclude rows from tmp_table that are already in dest table
new_rows=except_all(tmp_table_orm.select(),table_orm.select())
# https://stackoverflow.com/questions/41724658/how-to-do-a-proper-upsert-using-sqlalchemy-on-postgresql
# get existing table orm
table_orm=metadata.tables[table]
tmp_table_orm = tmp_table_pd.table
stmt = postgresql.insert(table_orm).from_select(tmp_table_orm.columns,select([tmp_table_orm]))
#stmt = postgresql.insert(table_orm).from_select(tmp_table_orm.columns,select=session.query(tmp_table_orm))
stmt = postgresql.insert(table_orm).from_select(tmp_table_orm.columns,new_rows)
on_conflict_stmt = stmt.on_conflict_do_update(
index_elements=table_orm.primary_key.columns,
set_={
......@@ -200,7 +209,8 @@ def to_sql(df,*args,**kwargs):
}
)
try:
con.execute(on_conflict_stmt)
res=con.execute(on_conflict_stmt)
logger.info("insert/update %s rows" % res.rowcount)
except:
logger.error("couln't merge tables. Did they have same columns ?")
raise
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment