Source code for config_from_pg.db

"""Database helper module.
Modified from https://gist.github.com/rustprooflabs/3b8564a8e7b7fe611436b30a95b7cd17,
adapted to psycopg 3 from psycopg2.
"""
import getpass
import psycopg


[docs] def prepare(): """Ensures latest `pgconfig.settings` view exists in DB to generate config. """ print('Preparing database objects...') ensure_schema_exists() ensure_view_exists() print('Database objects ready.')
[docs] def ensure_schema_exists(): """Ensures the `pgconfig` schema exists.""" sql_raw = 'CREATE SCHEMA IF NOT EXISTS pgconfig;' _execute_query(sql_raw, params=None, qry_type='ddl')
[docs] def ensure_view_exists(): """Ensures the view `pgconfig.settings` exists.""" sql_file = 'create_pgconfig_settings_view.sql' with open(sql_file) as f: sql_raw = f.read() _execute_query(sql_raw, params=None, qry_type='ddl')
[docs] def select_one(sql_raw: str, params: dict) -> dict: """ Runs SELECT query that will return zero or 1 rows. Parameters ----------------- sql_raw : str params : dict Params is required, can be `None` if query returns a single row such as `SELECT version();` Returns ----------------- data : dict """ return _execute_query(sql_raw, params, 'sel_single')
[docs] def select_multi(sql_raw, params=None) -> list: """ Runs SELECT query that will return multiple. `params` is optional. Parameters ----------------- sql_raw : str params : dict Params is optional, defaults to `None`. Returns ------------------ data : list List of dictionaries. """ return _execute_query(sql_raw, params, 'sel_multi')
[docs] def get_db_string() -> str: """Prompts user for details to create connection string Returns ------------------------ database_string : str """ db_host = input('Database host [127.0.0.1]: ') or '127.0.0.1' db_port = input('Database port [5432]: ') or '5432' db_name = input('Database name: ') db_user = input('Enter PgSQL username: ') db_pw = getpass.getpass('Enter password (empty for pgpass): ') or None if db_pw is None: database_string = 'postgresql://{user}@{host}:{port}/{dbname}' else: database_string = 'postgresql://{user}:{pw}@{host}:{port}/{dbname}' return database_string.format(user=db_user, pw=db_pw, host=db_host, port=db_port, dbname=db_name)
[docs] DB_STRING = get_db_string()
[docs] def get_db_conn(): """Uses DB_STRING to establish psycopg connection.""" db_string = DB_STRING try: conn = psycopg.connect(db_string) except psycopg.OperationalError as err: err_msg = f'DB Connection Error - Error: {err}' print(err_msg) return False return conn
[docs] def _execute_query(sql_raw, params, qry_type): """ Handles executing all types of queries based on the `qry_type` passed in. Returns False if there are errors during connection or execution. if results == False: print('Database error') else: print(results) You cannot use `if not results:` b/c 0 results is a false negative. """ try: conn = get_db_conn() except psycopg.ProgrammingError as err: print(f'Connection not configured properly. Err: {err}') return False if not conn: return False cur = conn.cursor(row_factory=psycopg.rows.dict_row) try: cur.execute(sql_raw, params) if qry_type == 'sel_single': results = cur.fetchone() elif qry_type == 'sel_multi': results = cur.fetchall() elif qry_type == 'ddl': conn.commit() results = True else: raise Exception('Invalid query type defined.') except psycopg.BINARYProgrammingError as err: print('Database error via psycopg. %s', err) results = False except psycopg.IntegrityError as err: print('PostgreSQL integrity error via psycopg. %s', err) results = False finally: conn.close() return results