diff --git a/README.md b/README.md index c9ecc59796bbcca8360c11f4f9e535315c09a7e3..98e3ef5a418f21accd8b2dd77f5e7d59fe1d5202 100644 --- a/README.md +++ b/README.md @@ -2,12 +2,19 @@ postgis extensions for geopandas -geopandas is lackaing to_postgis method (see https://github.com/geopandas/geopandas/pull/457 ) +geopandas is lacking to_postgis method (see https://github.com/geopandas/geopandas/pull/457 ) add a to_postgis method to GeoDataFrame, with following extensions: - gfd.geometry is converted to Geometry (if crs/srid) or Geography if no srid - shapely.geometry types supported + + +Performance: add method='copy' to allowed to_sql method's. (this is the default : use method='default' or method=None to fallback legacy) + +pandas to_sql postgres 'COPY' instead of 'INSERT' is not implemented. +geopandas_postgis override to_sql def to provide method='copy' ( compatible with if_exist='append', using tempora + ``` from sqlalchemy import create_engine @@ -18,6 +25,6 @@ geopandas_postgis.add_postgis(gpd) gdf=gpd.GeoDataFrame(df,geometry='geom') # dump to postgis -gdf.to_postgis('table_name',create_engine('postgresql://user:pass@host:5432/base_name') +gdf.to_postgis('table_name',create_engine('postgresql://user:pass@host:5432/base_name')) ``` \ No newline at end of file diff --git a/geopandas_postgis/geopandas_postgis.py b/geopandas_postgis/geopandas_postgis.py index 003fa36d58446ca1e3dfce5a8686627577569b25..c4336ea77b0f639c3db3d9847496a5cd8c8563ce 100644 --- a/geopandas_postgis/geopandas_postgis.py +++ b/geopandas_postgis/geopandas_postgis.py @@ -3,6 +3,11 @@ from psycopg2.extensions import adapt, register_adapter, AsIs, QuotedString import shapely as shp import shapely.geometry as shpg from sqlalchemy.orm import sessionmaker +from sqlalchemy.dialects import postgresql +from sqlalchemy import text; +import csv +from io import StringIO +import pandas as pd import os import logging @@ -19,6 +24,21 @@ shapely_handled_types=[ shpg.MultiLineString, shpg.MultiPolygon] +#temporary table for pandas +#https://gist.github.com/alecxe/44682f79b18f0c82a99c +class TemporaryPandasTable(pd.io.sql.SQLTable): + def _execute_create(self): + # Inserting table into database, add to MetaData object + self.table = self.table.tometadata(self.pd_sql.meta) + + # allow creation of temporary tables + self.table._prefixes.append('TEMPORARY') + + self.table.create() +# methods that use a temporary table wrapper if the table allreaydy exists +temporary_table_methods=['copy'] + + def get_srid(crs): """ Extract the srid from a crs dict or proj string. If no srid can be @@ -42,6 +62,115 @@ def get_srid(crs): srid = crs.split('epsg:')[1].split(' ')[0] return int(srid) + + +# Alternative to_sql() *method* for DBs that support COPY FROM +def method_copy(table, conn, keys, data_iter): + # https://pandas.pydata.org/pandas-docs/version/0.24.2/user_guide/io.html#io-sql-method + # https://github.com/pandas-dev/pandas/pull/21401 + logger.debug("using method_copy") + # gets a DBAPI connection that can provide a cursor + dbapi_conn = conn.connection + with dbapi_conn.cursor() as cur: + s_buf = StringIO() + writer = csv.writer(s_buf) + writer.writerows(data_iter) + s_buf.seek(0) + + columns = ', '.join('"{}"'.format(k) for k in keys) + if table.schema: + table_name = '{}.{}'.format(table.schema, table.name) + else: + table_name = table.name + + sql = 'COPY "{}" ({}) FROM STDIN WITH CSV'.format( + table_name, columns) + cur.copy_expert(sql=sql, file=s_buf) + +to_sql_methods = { + 'copy' : method_copy, + } + +def to_sql(df,*args,**kwargs): + """ + improved to_sql to take postgres dialect benefits. + + method is 'copy' by default. + if attempting to append to an existing table, temporary table is created, + and inserted into previous table, taking care of on_conflict + + index columns names from df are used as primary key + """ + + logger.debug("using wrapped to_sql") + + + table=args[0] + con=args[1] + + # get method + if 'method' not in kwargs: + logger.debug('setting "copy" as default method for to_sql') + kwargs['method'] = 'copy' + + + Session = sessionmaker(bind=con) + + # use a session connection + session = Session() + con1=session.connection().connect() + + tmp_table = None + new_table = False + if 'if_exists' in kwargs and con1.dialect.has_table(con1, table): + if kwargs['method'] in temporary_table_methods: + # will use a temporary table as a buffer ( for upsert copy ) + logger.info("Temporary table used for fast update") + pandas_engine = pd.io.sql.pandasSQL_builder(con1) + tmp_table = "_%s" % table + tmp_table_omr = TemporaryPandasTable("'_%s'" % table, pandas_engine, frame=df, if_exists="replace") + tmp_table_omr.create() + else: + new_table=True + + # replace method with func if available + if kwargs['method'] in to_sql_methods: + kwargs['method']=to_sql_methods[kwargs['method']] + + + try: + # use connection session + args=list(args) + if tmp_table is not None: + args[0]=tmp_table + args[1]=con1 + args=tuple(args) + ret=df.to_sql_legacy(*args,**kwargs) + + if new_table: + # this was probably allready done if it's an append ... + logger.debug('defining primary key from pandas index') + con1.execute('''alter table "%s" add constraint "%s_pk" primary key( %s )''' % (table,table,",".join([ '"%s"' %q for q in df.index.names]))) + + if tmp_table is not None: + logger.debug("merging tmp table") + con1.execute('''insert into "%s" select * from "%s" on conflict do nothing''' % (table,tmp_table)) + con1.execute('''drop table "%s"''' % tmp_table) + session.commit() + except: + session.rollback() + raise + finally: + session.close() + + return ret + + + + + + + def to_postgis(gdf,*args,**kwargs): """ like to_sql from geopandas, with following extensions: @@ -80,17 +209,19 @@ def to_postgis(gdf,*args,**kwargs): session = Session() con1=session.connection().connect() - - if 'if_exists' in kwargs and kwargs['if_exists'] == 'append' and con1.dialect.has_table(con1, table): - # get actual column srid - column_srid=con1.execute('''SELECT Find_SRID('', '%s', '%s');''' % (table , gdf.geometry.name)).fetchone()[0] + if 'if_exists' in kwargs and con1.dialect.has_table(con1, table): + if kwargs['if_exists'] not in ['fail','replace']: + # table will be modified + # get actual column srid + column_srid=con1.execute('''SELECT Find_SRID('', '%s', '%s');''' % (table , gdf.geometry.name)).fetchone()[0] + + if column_srid != srid: + raise ValueError("geopandas srid %s doesn't match postgis Find_SRID %s" % (srid , column_srid)) - if column_srid != srid: - raise ValueError("geopandas srid %s doesn't match postgis Find_SRID %s" % (srid , column_srid)) + if column_srid != 0: + # overwrite column srid so we don't need to give a srid for each wkt (we be done at the end) + con1.execute('''SELECT UpdateGeometrySRID('%s','%s',0);''' % (table , gdf.geometry.name)) - if column_srid != 0: - # overwrite column srid so we don't need to give a srid for each wkt (we be done at the end) - con1.execute('''SELECT UpdateGeometrySRID('%s','%s',0);''' % (table , gdf.geometry.name)) ret=None try: @@ -99,10 +230,11 @@ def to_postgis(gdf,*args,**kwargs): args[1]=con1 args=tuple(args) ret=gdf.to_sql(*args,**kwargs) - + if srid != 0: con1.execute('''SELECT UpdateGeometrySRID('%s','%s',%s);''' % (table , gdf.geometry.name , srid)) + session.commit() except: @@ -126,6 +258,9 @@ def add_postgis(gpd): register_adapter(Shapely_geo,shapelyAdapter) gpd.GeoDataFrame.to_postgis=to_postgis + gpd.GeoDataFrame.to_sql_legacy = gpd.GeoDataFrame.to_sql + gpd.GeoDataFrame.to_sql = to_sql +