
python - Write to different schema in postgresql using pandas - Stack Overflow


Hi, I have a Python script that reads some data from PostgreSQL (the schemas of the tables in a database), performs some transformations, and then writes the result back to a PostgreSQL table in the same database but under a different schema. However, my code keeps creating the table and writing the data into the same schema.

Below is my code. I am specifying the schema name together with the table name in the variable 'table_name', but that is not helping.

import pandas as pd
import psycopg2
from sqlalchemy import create_engine, JSON
import json

# Function to convert PostgreSQL data types to Snowflake data types
def convert_data_type(pg_data_type):
    conversion_dict = {
        'integer': 'NUMBER',
        'bigint': 'NUMBER',
        'smallint': 'NUMBER',
        'serial': 'NUMBER',
        'bigserial': 'NUMBER',
        'decimal': 'NUMBER',
        'numeric': 'NUMBER',
        'real': 'FLOAT',
        'double precision': 'FLOAT',
        'money': 'FLOAT',
        'character varying': 'VARCHAR',
        'varchar': 'VARCHAR',
        'character': 'CHAR',
        'char': 'CHAR',
        'text': 'STRING',
        'bytea': 'BINARY',
        'timestamp without time zone': 'TIMESTAMP_NTZ',
        'timestamp with time zone': 'TIMESTAMP_TZ',
        'date': 'DATE',
        'time without time zone': 'TIME_NTZ',
        'time with time zone': 'TIME_TZ',
        'boolean': 'BOOLEAN'
    }
    return conversion_dict.get(pg_data_type, pg_data_type)

pg_dbname = "my_db_name"
pg_user = "my_user"
pg_password = "My_pw"
pg_host = "My_host"
pg_port = "5432"


# Connect to PostgreSQL database
conn = psycopg2.connect(
    dbname=pg_dbname,
    user=pg_user,
    password=pg_password,
    host=pg_host,
    port=pg_port
)

# Create a cursor object
cur = conn.cursor()

# Query to get table schemas
cur.execute("""
SELECT table_schema, table_name, column_name, data_type 
FROM information_schema.columns 
WHERE table_schema = 'public'
ORDER BY table_name
""")

# Fetch all results
results = cur.fetchall()

# Close the cursor and connection
cur.close()
conn.close()

# Process the results and create a DataFrame
data = []
for row in results:
    table_schema, table_name, column_name, data_type = row
    converted_data_type = convert_data_type(data_type)
    data.append([table_schema, table_name, column_name, data_type, converted_data_type])

df = pd.DataFrame(data, columns=['table_schema', 'table_name', 'column_name', 'original_data_type', 'converted_data_type'])

# Grouping data and making dictionary
result = (
    df.groupby(['table_schema', 'table_name'])
    .apply(lambda x: pd.Series({
        'columns': dict(zip(x['column_name'], x['converted_data_type'])),
        'original_columns': dict(zip(x['column_name'], x['original_data_type']))
    }))
    .reset_index()
)

# Create SQLAlchemy engine
engine = create_engine(f'postgresql+psycopg2://{pg_user}:{pg_password}@{pg_host}:{pg_port}/{pg_dbname}')

# Define the table name in the new schema
table_name = 'project_demo.mapping_table'

# Insert the DataFrame into the PostgreSQL table in the new schema
result.to_sql(table_name, engine, if_exists='replace', index=False, dtype={'columns': JSON, 'original_columns': JSON})

print("Data inserted successfully!")

I tried changing the schema name in the variable, but it made no difference.


asked Feb 4 at 15:17 by Darkmaster; edited Feb 4 at 17:41 by Gord Thompson

1 Answer


I googled "pandas to_sql", which pointed me here: https://pandas.pydata./docs/reference/api/pandas.DataFrame.to_sql.html. There is a schema keyword argument listed. So, just do result.to_sql('mapping_table', engine, schema='project_demo', ...). By passing a dotted schema.table as the first arg, your current code will instead try to write to a table called "project_demo.mapping_table" in the public schema.
