Commit d1dd8d95 authored by ARCHER's avatar ARCHER

use copy instead of insert - upsert - primary keys from df index

temp table are used if append, then insert into des df
parent bc619374
......@@ -2,12 +2,19 @@
postgis extensions for geopandas
geopandas is lackaing to_postgis method (see https://github.com/geopandas/geopandas/pull/457 )
geopandas is lacking to_postgis method (see https://github.com/geopandas/geopandas/pull/457 )
add a to_postgis method to GeoDataFrame, with following extensions:
- gfd.geometry is converted to Geometry (if crs/srid) or Geography if no srid
- shapely.geometry types supported
Performance: add method='copy' to allowed to_sql method's. (this is the default : use method='default' or method=None to fallback legacy)
pandas to_sql postgres 'COPY' instead of 'INSERT' is not implemented.
geopandas_postgis override to_sql def to provide method='copy' ( compatible with if_exist='append', using tempora
```
from sqlalchemy import create_engine
......@@ -18,6 +25,6 @@ geopandas_postgis.add_postgis(gpd)
gdf=gpd.GeoDataFrame(df,geometry='geom')
# dump to postgis
gdf.to_postgis('table_name',create_engine('postgresql://user:pass@host:5432/base_name')
gdf.to_postgis('table_name',create_engine('postgresql://user:pass@host:5432/base_name'))
```
\ No newline at end of file
......@@ -3,6 +3,11 @@ from psycopg2.extensions import adapt, register_adapter, AsIs, QuotedString
import shapely as shp
import shapely.geometry as shpg
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects import postgresql
from sqlalchemy import text;
import csv
from io import StringIO
import pandas as pd
import os
import logging
......@@ -19,6 +24,21 @@ shapely_handled_types=[
shpg.MultiLineString,
shpg.MultiPolygon]
#temporary table for pandas
#https://gist.github.com/alecxe/44682f79b18f0c82a99c
class TemporaryPandasTable(pd.io.sql.SQLTable):
def _execute_create(self):
# Inserting table into database, add to MetaData object
self.table = self.table.tometadata(self.pd_sql.meta)
# allow creation of temporary tables
self.table._prefixes.append('TEMPORARY')
self.table.create()
# methods that use a temporary table wrapper if the table allreaydy exists
temporary_table_methods=['copy']
def get_srid(crs):
"""
Extract the srid from a crs dict or proj string. If no srid can be
......@@ -42,6 +62,115 @@ def get_srid(crs):
srid = crs.split('epsg:')[1].split(' ')[0]
return int(srid)
# Alternative to_sql() *method* for DBs that support COPY FROM
def method_copy(table, conn, keys, data_iter):
# https://pandas.pydata.org/pandas-docs/version/0.24.2/user_guide/io.html#io-sql-method
# https://github.com/pandas-dev/pandas/pull/21401
logger.debug("using method_copy")
# gets a DBAPI connection that can provide a cursor
dbapi_conn = conn.connection
with dbapi_conn.cursor() as cur:
s_buf = StringIO()
writer = csv.writer(s_buf)
writer.writerows(data_iter)
s_buf.seek(0)
columns = ', '.join('"{}"'.format(k) for k in keys)
if table.schema:
table_name = '{}.{}'.format(table.schema, table.name)
else:
table_name = table.name
sql = 'COPY "{}" ({}) FROM STDIN WITH CSV'.format(
table_name, columns)
cur.copy_expert(sql=sql, file=s_buf)
to_sql_methods = {
'copy' : method_copy,
}
def to_sql(df,*args,**kwargs):
"""
improved to_sql to take postgres dialect benefits.
method is 'copy' by default.
if attempting to append to an existing table, temporary table is created,
and inserted into previous table, taking care of on_conflict
index columns names from df are used as primary key
"""
logger.debug("using wrapped to_sql")
table=args[0]
con=args[1]
# get method
if 'method' not in kwargs:
logger.debug('setting "copy" as default method for to_sql')
kwargs['method'] = 'copy'
Session = sessionmaker(bind=con)
# use a session connection
session = Session()
con1=session.connection().connect()
tmp_table = None
new_table = False
if 'if_exists' in kwargs and con1.dialect.has_table(con1, table):
if kwargs['method'] in temporary_table_methods:
# will use a temporary table as a buffer ( for upsert copy )
logger.info("Temporary table used for fast update")
pandas_engine = pd.io.sql.pandasSQL_builder(con1)
tmp_table = "_%s" % table
tmp_table_omr = TemporaryPandasTable("'_%s'" % table, pandas_engine, frame=df, if_exists="replace")
tmp_table_omr.create()
else:
new_table=True
# replace method with func if available
if kwargs['method'] in to_sql_methods:
kwargs['method']=to_sql_methods[kwargs['method']]
try:
# use connection session
args=list(args)
if tmp_table is not None:
args[0]=tmp_table
args[1]=con1
args=tuple(args)
ret=df.to_sql_legacy(*args,**kwargs)
if new_table:
# this was probably allready done if it's an append ...
logger.debug('defining primary key from pandas index')
con1.execute('''alter table "%s" add constraint "%s_pk" primary key( %s )''' % (table,table,",".join([ '"%s"' %q for q in df.index.names])))
if tmp_table is not None:
logger.debug("merging tmp table")
con1.execute('''insert into "%s" select * from "%s" on conflict do nothing''' % (table,tmp_table))
con1.execute('''drop table "%s"''' % tmp_table)
session.commit()
except:
session.rollback()
raise
finally:
session.close()
return ret
def to_postgis(gdf,*args,**kwargs):
"""
like to_sql from geopandas, with following extensions:
......@@ -80,17 +209,19 @@ def to_postgis(gdf,*args,**kwargs):
session = Session()
con1=session.connection().connect()
if 'if_exists' in kwargs and kwargs['if_exists'] == 'append' and con1.dialect.has_table(con1, table):
# get actual column srid
column_srid=con1.execute('''SELECT Find_SRID('', '%s', '%s');''' % (table , gdf.geometry.name)).fetchone()[0]
if 'if_exists' in kwargs and con1.dialect.has_table(con1, table):
if kwargs['if_exists'] not in ['fail','replace']:
# table will be modified
# get actual column srid
column_srid=con1.execute('''SELECT Find_SRID('', '%s', '%s');''' % (table , gdf.geometry.name)).fetchone()[0]
if column_srid != srid:
raise ValueError("geopandas srid %s doesn't match postgis Find_SRID %s" % (srid , column_srid))
if column_srid != srid:
raise ValueError("geopandas srid %s doesn't match postgis Find_SRID %s" % (srid , column_srid))
if column_srid != 0:
# overwrite column srid so we don't need to give a srid for each wkt (we be done at the end)
con1.execute('''SELECT UpdateGeometrySRID('%s','%s',0);''' % (table , gdf.geometry.name))
if column_srid != 0:
# overwrite column srid so we don't need to give a srid for each wkt (we be done at the end)
con1.execute('''SELECT UpdateGeometrySRID('%s','%s',0);''' % (table , gdf.geometry.name))
ret=None
try:
......@@ -99,10 +230,11 @@ def to_postgis(gdf,*args,**kwargs):
args[1]=con1
args=tuple(args)
ret=gdf.to_sql(*args,**kwargs)
if srid != 0:
con1.execute('''SELECT UpdateGeometrySRID('%s','%s',%s);''' % (table , gdf.geometry.name , srid))
session.commit()
except:
......@@ -126,6 +258,9 @@ def add_postgis(gpd):
register_adapter(Shapely_geo,shapelyAdapter)
gpd.GeoDataFrame.to_postgis=to_postgis
gpd.GeoDataFrame.to_sql_legacy = gpd.GeoDataFrame.to_sql
gpd.GeoDataFrame.to_sql = to_sql
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment