添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

Psycopg是Python用於操作PostgreSQL的庫。Hologres相容PostgreSQL 11,因此您可以通過psycopg訪問Hologres。本文將指導您使用psycopg2訪問Hologres,樣本使用的作業環境為基於CentOS 7系統的Python 3.8版本。

安裝Python3.8

您可以基於Miniconda、Anaconda安裝Python 3.8環境。如下內容以CentOS 7系統為例,安裝Python 3.8版本。

  1. 安裝Python 3.8。

    您可以下載對應版本的Python,執行如下命令進行安裝。

    # yum install centos-release-scl
    # yum install rh-python38
    # scl enable rh-python38 bash
    # python --version
    Python 3.8.6
  2. 安裝 psycopg2 模組。

    執行如下命令安裝psycopg2模組。

     # pip install psycopg2-binary

串連Hologres

Python3.8環境和psycopg2安裝完成之後,您可以執行如下操作並串連Hologres。

  1. 載入psycopg2。

    如果需要使用psycopg2,您可以執行命令 import psycopg2 載入安裝的psycopg2。

  2. 建立資料庫連接。

    您可以使用 psycopg2.connect() 函數串連Hologres,具體文法和參數說明如下所示。

    conn = psycopg2.connect(host="<Endpoint>",
                            port=<Port>,
                            dbname="<databases>",
                            user="<Access ID>",
                            password="<Access Key>",
                            keepalives=<keepalives>,
                            keepalives_idle=<keepalives_idle>,
                            keepalives_interval=<keepalives_interval>,
                            keepalives_count=<keepalives_count>,
                            application_name="<Application Name>"
    

    參數

    描述

    host

    Hologres執行個體的網路地址。

    進入 Hologres管理主控台 的執行個體詳情頁,從 網路資訊 擷取網路地址。

    port

    Hologres的執行個體連接埠。

    您可以進入 Hologres管理主控台 的執行個體詳情頁,從 網路資訊 擷取連接埠。

    dbname

    Hologres建立的資料庫名稱。

    user

    當前阿里雲帳號的AccessKey ID。

    您可以單擊 AccessKey 管理 ,擷取AccessKey ID。

    password

    當前阿里雲帳號的AccessKey Secret。

    您可以單擊 AccessKey 管理 ,擷取AccessKey Secret。

    application_name

    可選,應用程式名稱,用於記錄查詢日誌時識別SQL代表的應用含義。

    說明

    配置該參數,有助於您在慢Query清單中根據Application Name快速定位您的發起請求的應用。

    keepalives

    可選(推薦使用),串連方式:

    • 1表示使用長串連。

    • 0表示非長串連。

    keepalives_idle

    空閑時,保持串連連通的時間間隔,單位秒(s)。

    keepalives_interval

    沒得到回應時,等待重新嘗試保持連通的時間間隔,單位秒(s)。

    keepalives_count

    嘗試重新保持連通最大次數。

    程式碼範例如下。

    conn = psycopg2.connect(host="<Endpoint>",
                            port=<Port>,
                            dbname="<databases>",
                            user="<Access ID>",
                            password="<Access Key>",
                            keepalives=1,  # 保持串連
                            keepalives_idle=130,  # 空閑時,每130秒保持串連連通
                            keepalives_interval=10,   # 沒得到回應時,等待10秒重新嘗試保持連通
                            keepalives_count=15,   # 嘗試最多15次重新保持連通
                            application_name="<Application Name>"
    

使用Hologres

當您成功串連Hologres資料庫之後,即可通過psycopg2進行資料開發操作。如下內容將指導您建立表、插入資料、查詢和釋放資源等操作。如果需要使用Fixed Plan能力實現更高效能的讀寫操作,需要配置相關GUC參數,請參見 Fixed Plan加速SQL執行

  1. 建立遊標。

    在進行資料開發之前,您需要執行命令 cur = conn.cursor() 來建立串連的遊標。

  2. 資料開發。

    1. 建立表

      您可以執行如下命令,建立一個表 holo_test 並定義表的資料類型為integer。您也可以根據業務需求定義表名稱和資料類型。

      cur.execute("CREATE TABLE holo_test (num integer);")
    2. 插入資料

      您可以執行如下命令,為建立的表 holo_test 插入資料1~1000。

      cur.execute("INSERT INTO holo_test SELECT generate_series(%s, %s)", (1, 1000))
    3. 查詢資料

      cur.execute("SELECT sum(num) FROM holo_test;")
      cur.fetchone()
  3. 提交事務。

    在查詢資料的命令之後,您需要執行命令 conn.commit() 提交事務,此操作可以確保操作已經提交。也可以把 autocommit 參數設定為true,實現SQL命令的自動認可。

  4. 釋放資源。

    為避免影響後續的操作,當操作執行完成後,您需要執行如下命令關閉遊標並斷開資料庫連接。

    cur.close()
    conn.close()

Pandas DataFrame快速寫入Hologres最佳實務

使用Python時,經常會使用Pandas將資料轉換為DataFrame,並對DataFrame進行處理,最終將DataFrame匯入Hologres,此時希望將DataFrame快速匯入Hologres。匯入時候常用 to_sql 函數,詳情請參見 Pandas

需要Pandas為V1.4.2及以上版本,您可以執行如下命令強制安裝V1.5.1版本的Pandas庫。

# pip install Pandas==1.5.1

推薦使用 to_sql 函數的callable方式,使用copy方式匯入資料,範例的Python代碼如下。

# 載入依賴
import pandas as pd
import psycopg2
# 產生連接字串
host="hgpostcn-cn-xxxxxx-cn-hangzhou.hologres.aliyuncs.com"
port=80
dbname="demo"
user="LTAI5xxxxx"
password="fa8Kdgxxxxx"
application_name="Python Test"
conn = "postgresql+psycopg2://{}:{}@{}:{}/{}?application_name={}".format(user, password, host, port, dbname,application_name)
print(conn)
# 產生dataframe
data = [('1','1','1'),('2','2','2')]
cols = ('col1','col2','col3')
pd_data = pd.DataFrame(data, columns=cols)
# 定義callable函數
import csv
from io import StringIO
def psql_insert_copy(table, conn, keys, data_iter):
    Execute SQL statement inserting data
    Parameters
    ----------
    table : pandas.io.sql.SQLTable
    conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
    keys : list of str
        Column names
    data_iter : Iterable that iterates the values to be inserted
    # gets a DBAPI connection that can provide a cursor
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)
        columns = ', '.join('"{}"'.format(k) for k in keys)
        if table.schema:
            table_name = '{}.{}'.format(table.schema, table.name)
        else:
            table_name = table.name
        sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
            table_name, columns)
        cur.copy_expert(sql=sql, file=s_buf)
# 匯入資料
pd_data.to_sql(
    name="pd_data",
    con=conn,
    if_exists="append",
    index=False,
    method=psql_insert_copy
)

查看歷史查詢,驗證已經使用COPY方式寫入資料至Hologres。 歷史慢Query