# Source code for the batcat.Redshift module.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
author:     Ewen Wang
email:      wolfgangwong2012@gmail.com
license:    Apache License 2.0
"""
from io import StringIO, BytesIO
from datetime import datetime
from redshift_connector import connect

import json
import random

import numpy as np
import pandas as pd

import boto3
import botocore.session as Session
from botocore.exceptions import WaiterError
from botocore.waiter import WaiterModel
from botocore.waiter import create_waiter_with_client

def read_data_from_redshift(query, host, password, port=5439, database='dev', user='awsuser', date_start=None, date_end=None):
    """Read a DataFrame from Redshift using a host/password connection.

    ``query`` is treated as a ``str.format`` template: the first positional
    placeholder receives ``date_start`` and the second receives ``date_end``.

    Args:
        query (str): Query template used to obtain data from Redshift.
        host (str): Redshift endpoint host name.
        password (str): Password for ``user``.
        port (int): Redshift port, default 5439.
        database (str): Database name, default ``'dev'``.
        user (str): Database user, default ``'awsuser'``.
        date_start (str): Date to start, strftime('%Y/%m/%d').
        date_end (str): Date to end, strftime('%Y/%m/%d').

    Returns:
        df (pandas.DataFrame): Result of ``cursor.fetch_dataframe()`` for the query.
    """
    # NOTE(review): values are spliced in with str.format, not bound as SQL
    # parameters -- do not pass untrusted input as date_start / date_end.
    # Formatting happens before connecting so a malformed template fails
    # without opening a connection.
    query = query.format(date_start, date_end)
    conn = connect(host=host, port=port, database=database, user=user, password=password)
    try:
        cursor = conn.cursor()
        cursor.execute(query)
        return cursor.fetch_dataframe()
    finally:
        # Always release the connection, including when execute() raises.
        conn.close()
def save_df_to_redshift(df, host=None, password=None, port=5439, database='dev', user='awsuser', table_name=None, schema='public', if_exists='append', index=True, index_label=None, chunksize=None, dtype=None, method=None):
    """Save a pandas.DataFrame to Redshift with host and password.

    Refer to `pandas.to_sql
    <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html>`_
    for more information.

    Args:
        df (pandas.DataFrame): Target dataframe.
        host (str): In the form [name].[id].[region].redshift.amazonaws.com.
        password (str): Redshift configuration.
        port (int): Usually 5439.
        database (str): Redshift configuration.
        user (str): Redshift configuration.
        table_name (str): Target table name.
        schema (str): Specify the schema (if database flavor supports this).
            If None, use default schema.
        if_exists (str): How to behave if the table already exists,
            {'fail', 'replace', 'append'}, default 'append'.
            (1) fail: Raise a ValueError.
            (2) replace: Drop the table before inserting new values.
            (3) append: Insert new values to the existing table.
        index (bool): Write DataFrame index as a column, default True.
            Uses `index_label` as the column name in the table.
        index_label (str or sequence): Column label for index column(s),
            default None. If None is given (default) and `index` is True, then
            the index names are used. A sequence should be given if the
            DataFrame uses MultiIndex.
        chunksize (int, optional): Specify the number of rows in each batch to
            be written at a time. By default, all rows will be written at once.
        dtype (dict or scalar, optional): Specifying the datatype for columns.
            If a dictionary is used, the keys should be the column names and
            the values should be the SQLAlchemy types or strings for the
            sqlite3 legacy mode. If a scalar is provided, it will be applied
            to all columns.
        method (str): Controls the SQL insertion clause used:
            (1) None: Uses standard SQL ``INSERT`` clause (one per row).
            (2) 'multi': Pass multiple values in a single ``INSERT`` clause.
            (3) callable with signature ``(pd_table, conn, keys, data_iter)``.
            Details and a sample callable implementation can be found in the
            section `insert method
            <https://pandas.pydata.org/docs/user_guide/io.html#io-sql-method>`_ .

    Returns:
        None
    """
    from urllib.parse import quote_plus
    from sqlalchemy import create_engine

    # Percent-encode the credentials: a raw '@', ':' or '/' in the password
    # would otherwise be parsed as part of the URL structure.
    url = 'postgresql+psycopg2://{}:{}@{}:{}/{}'.format(
        quote_plus(str(user)), quote_plus(str(password)), host, port, database)
    con = create_engine(url)
    try:
        df.to_sql(name=table_name, con=con, schema=schema, if_exists=if_exists,
                  index=index, index_label=index_label, chunksize=chunksize,
                  dtype=dtype, method=method)
    finally:
        # The engine owns a connection pool; dispose of it so connections are
        # not left open after the write.
        con.dispose()
    return None
def get_secret(secret_name, region):
    """Get configurations from AWS Secrets Manager.

    Args:
        secret_name (str): A secret name set up in AWS Secrets Manager.
        region (str): The region name of AWS.

    Returns:
        secret (dict): ``name``, ``region`` and ``arn`` of the secret, merged
        with the key/value pairs of the JSON-decoded secret payload.

    Raises:
        botocore.exceptions.ClientError: If the secret cannot be retrieved;
            the AWS error message is printed before re-raising.
    """
    import base64
    from botocore.exceptions import ClientError

    secret = {'name': secret_name, 'region': region}
    session = boto3.session.Session()
    client = session.client(service_name='secretsmanager', region_name=region)
    try:
        response = client.get_secret_value(SecretId=secret_name)
    except ClientError as e:
        print("Error retrieving secret. Error: " + e.response['Error']['Message'])
        # Re-raise: returning a dict without 'arn'/'db' only defers the
        # failure to an obscure KeyError in the caller.
        raise
    secret['arn'] = response['ARN']

    # Depending on whether the secret is a string or binary, one of these
    # fields will be populated.
    if 'SecretString' in response:
        payload = response['SecretString']
    else:
        # NOTE(review): boto3 may already return SecretBinary as raw bytes;
        # confirm whether this extra b64decode is required.
        payload = base64.b64decode(response['SecretBinary'])
    return {**secret, **json.loads(payload)}
def _get_waiter(waiter_name, delay=100, max_attempts=30):
    """Build a botocore waiter definition for a Redshift Data API statement.

    The waiter polls ``DescribeStatement``. It succeeds when ``Status`` is
    ``FINISHED``. It retries while ``Status`` is ``PICKED``, ``STARTED`` or
    ``SUBMITTED``. It fails when ``Status`` is ``FAILED`` or ``ABORTED``.

    Args:
        waiter_name (str): Name the waiter is registered under; pass the same
            name to ``create_waiter_with_client``.
        delay (int): Seconds between polls.
        max_attempts (int): Maximum number of polls before the waiter gives up.

    Returns:
        tuple: ``(waiter_config, waiter_model)`` -- the raw config dict and the
        ``WaiterModel`` built from it.
    """
    waiter_config = {
        'version': 2,
        'waiters': {
            waiter_name: {
                'operation': 'DescribeStatement',
                'delay': delay,
                'maxAttempts': max_attempts,
                'acceptors': [
                    {
                        "matcher": "path",
                        "expected": "FINISHED",
                        "argument": "Status",
                        "state": "success",
                    },
                    {
                        "matcher": "pathAny",
                        "expected": ["PICKED", "STARTED", "SUBMITTED"],
                        "argument": "Status",
                        "state": "retry",
                    },
                    {
                        "matcher": "pathAny",
                        "expected": ["FAILED", "ABORTED"],
                        "argument": "Status",
                        "state": "failure",
                    },
                ],
            },
        },
    }
    return waiter_config, WaiterModel(waiter_config)


# Value keys of a Data API field, checked in this order. A NULL field carries
# ``isNull`` instead and falls through to NaN, as does any other value type.
_DATA_API_VALUE_KEYS = ('stringValue', 'doubleValue', 'longValue', 'booleanValue')


def _make_datarow(output):
    """Convert the ``Records`` of a Data API result into a list of row lists.

    Args:
        output (dict): A ``get_statement_result`` response; only its
            ``Records`` entry (a list of records, each a list of field dicts)
            is read.

    Returns:
        list[list]: One list per record holding each field's Python value, or
        ``np.nan`` when the field has none of the recognised value keys.
    """
    return [
        [next((field[key] for key in _DATA_API_VALUE_KEYS if key in field), np.nan)
         for field in record]
        for record in output['Records']
    ]
def read_data_from_redshift_by_secret(secret_name=None, region=None, query=None, date_start=None, date_end=None, delay=100):
    """Read a DataFrame from Redshift via the Data API and AWS Secrets Manager.

    ``query`` is a ``str.format`` template. Its first and second positional
    placeholders receive ``date_start`` and ``date_end``. The statement is
    submitted with ``execute_statement``. The function then waits for it to
    finish, and every page of ``get_statement_result`` is collected.

    Args:
        secret_name (str): The name of the secret in AWS Secrets Manager; the
            secret must provide ``db`` and ``dbClusterIdentifier``.
        region (str): AWS region name.
        query (str): Query template used to obtain data from Redshift.
        date_start (str): Date to start, strftime('%Y/%m/%d').
        date_end (str): Date to end, strftime('%Y/%m/%d').
        delay (int): Seconds between status polls while waiting for the query.

    Returns:
        df (pandas.DataFrame): Target dataframe, one column per result column.
    """
    query = query.format(date_start, date_end)
    secret = get_secret(secret_name, region)

    # Data API client.
    session = boto3.Session(
        botocore_session=Session.get_session(),
        region_name=secret['region'],
    )
    client_redshift = session.client("redshift-data")

    # A single waiter that polls every `delay` seconds.
    waiter_name = 'DataAPIExecution'
    _, waiter_model = _get_waiter(waiter_name, delay=delay)
    custom_waiter = create_waiter_with_client(waiter_name, waiter_model, client_redshift)

    res = client_redshift.execute_statement(
        Database=secret['db'],
        SecretArn=secret['arn'],
        Sql=query,
        ClusterIdentifier=secret['dbClusterIdentifier'],
    )

    # Best effort: a waiter error is reported and the result fetch is still
    # attempted.
    try:
        custom_waiter.wait(Id=res['Id'])
        print("Done waiting to finish Data API.")
    except WaiterError as e:
        print(e)

    # get_statement_result is paginated via NextToken; read every page so
    # large results are not silently truncated.
    output = client_redshift.get_statement_result(Id=res['Id'])
    col_labels = [column['label'] for column in output["ColumnMetadata"]]
    resrows = _make_datarow(output)
    while output.get('NextToken'):
        output = client_redshift.get_statement_result(Id=res['Id'], NextToken=output['NextToken'])
        resrows.extend(_make_datarow(output))

    # Build from the row lists directly: np.array() would coerce mixed-type
    # rows into one string dtype, and it fails on an empty result.
    return pd.DataFrame(resrows, columns=col_labels)
if __name__ == '__main__':
    # This module only provides library helpers and defines no ``main()``
    # function, so running it directly is a no-op.
    pass