I'm working with a PostgreSQL database in Python using psycopg2 and pandas. My workflow involves reading data into Python, uploading it to the database, retrieving it back into Python, and then updating the database. To ensure data integrity, I aim to create a comprehensive mapping between pandas data types and PostgreSQL data types. Currently, I'm encountering an issue where date and time data (including both day and hour) are not being uploaded to PostgreSQL with the correct data type.
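For example, a typical DataFrame I upload looks roughly like this (illustrative column names and values, not my real data):

import pandas as pd

# Illustrative example only - the real data has the same shape of types
df = pd.DataFrame({
    "sensor_id": [1, 2],
    "reading": [0.5, 1.25],
    "measured_at": pd.to_datetime(["2023-01-01 08:00", "2023-01-01 09:30"]),
})
print(df.dtypes)
# sensor_id               int64
# reading               float64
# measured_at    datetime64[ns]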
I'm utilizing the following function to map pandas data types to PostgreSQL data types:
import numpy as np

def map_dtype(dtype):
    """Map a pandas/NumPy dtype to a PostgreSQL column type."""
    if np.issubdtype(dtype, np.integer):
        return 'BIGINT'            # PostgreSQL BIGINT for integer columns
    elif np.issubdtype(dtype, np.floating):
        return 'DOUBLE PRECISION'  # PostgreSQL DOUBLE PRECISION for floating-point numbers
    elif np.issubdtype(dtype, np.datetime64):
        return 'TIMESTAMP'         # PostgreSQL TIMESTAMP for date and time
    elif np.issubdtype(dtype, np.bool_):
        return 'BOOLEAN'           # PostgreSQL BOOLEAN for boolean values
    else:
        return 'TEXT'              # Default mapping to PostgreSQL TEXT for other types
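For context, the col_str passed to the upload function is built from the DataFrame's dtypes roughly like this (a minimal sketch; build_col_str is just an illustrative name, and column names are used as-is without identifier quoting):

def build_col_str(dataframe):
    """Build the column-definition string for CREATE TABLE from a DataFrame."""
    # e.g. 'sensor_id BIGINT, reading DOUBLE PRECISION, measured_at TIMESTAMP'
    return ', '.join(f'{col} {map_dtype(dtype)}'
                     for col, dtype in dataframe.dtypes.items())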
This is the function I'm using to upload the data to the database:
import os

def upload_to_db(path_to_config, tbl_name, col_str, dataframe):
    """Upload a DataFrame to the specified database."""
    conn = connect_to_db(path_to_config)
    if not conn:
        return
    cursor = conn.cursor()
    csv_file_path = f"{tbl_name}.csv"  # temporary file used as the COPY source
    try:
        # Get the current user
        cursor.execute("SELECT current_user;")
        current_user = cursor.fetchone()[0]
        # Define schema based on user
        schema = "developer_schema" if current_user == "dev1" else "public"
        full_table_name = f"{schema}.{tbl_name}"
        cursor.execute(f"DROP TABLE IF EXISTS {full_table_name};")
        # Generate the CREATE TABLE statement with explicit data types
        cursor.execute(f"CREATE TABLE {full_table_name} ({col_str});")
        print(f'Table {full_table_name} was created successfully.')
        # Dump the DataFrame to CSV and stream it into the table via COPY
        dataframe.to_csv(csv_file_path, header=True, index=False, encoding='utf-8')
        with open(csv_file_path, 'r') as my_file:
            print('File opened in memory.')
            sql_statement = f"""
                COPY {full_table_name} FROM STDIN WITH
                CSV
                HEADER
                DELIMITER ','
            """
            cursor.copy_expert(sql=sql_statement, file=my_file)
            print('File copied to the database.')
        cursor.execute(f"GRANT SELECT ON TABLE {full_table_name} TO public;")
        conn.commit()
        print(f'Table {full_table_name} imported to the database successfully.')
    except Exception as e:
        print(f"Error during upload: {e}")
        conn.rollback()
    finally:
        cursor.close()
        conn.close()
        # Clean up the temporary CSV file
        if os.path.exists(csv_file_path):
            try:
                os.remove(csv_file_path)
                print(f"Temporary file {csv_file_path} has been deleted.")
            except Exception as e:
                print(f"Error deleting file {csv_file_path}: {e}")
Is this the best approach? Do you have any suggestions? P.S. I don't want to use SQLAlchemy because it is too slow for the large datasets I work with.