Pythonで、Accessにアクセスする

目次

Pythonで、Accessにアクセスする

PythonからAccessにアクセスして、データベースにアクセスするクラスライブラリを作りました。

クラス化のメリット

・メインプログラムを簡素化できる
メインプログラムの記述が単純化されます。
データベース処理とエラー処理をクラス側で行う。
メインプログラムは、データ処理の知識がいらない。

・他のプログラムでクラスを流用できる
エラーの無いプログラムを利用できる
メインプログラムの標準化・可読性が向上する

・複数のデータを同じ手法で取り扱いできる
別のインスタンスとして取り扱える

・メモリ(インスタンス)上で取り扱いできる
データベースへのアクセスが減る

必要な準備 

Python で Microsoft Access データベースに接続するには、主に ODBC(Open Database Connectivity)を使用します。

Python インストール

Python がインストールされていない場合は、公式サイトからインストールしてください。

Python.org
Welcome to Python.org The official home of the Python Programming Language

必要なライブラリインストール

Access データベースと Python を接続するためには、pyodbc ライブラリを使用します。以下のコマンドでインストールします:

  pip install pyodbc

Access データベースファイル準備

接続したい Access ファイル(例:database.accdb または database.mdb)を用意します。

ODBC ドライバ確認

Access データベースに接続するには ODBC ドライバが必要です。Windows 環境では通常、以下のどちらかがインストールされています:

Microsoft Access Database Engine

Microsoft Office

必要に応じて Access Database Engine をインストールしてください。

ODBCドライバーの確認

Windows管理ツール
→ODBCデータソース →ドライバー
→Microsoft Access Driver (*.mdb, *.accdb)

接続文字列の作成

# 接続文字列 
connection_string = (
      "Driver={Microsoft Access Driver (*.mdb, *.accdb)};"
      f"DBQ={db_path};"
 )

Accessデータベースの配置場所

ドライブ指定

db_path = r”Z:\\Database_Master\\eng001_Ver7.accdb”

ネットワークドライブ指定

db_path = r”\\\\landisk-0010\\Youtube\\Database_Master\\eng001_Ver7.accdb”

#
# pip install pyodbc
#

import pyodbc as mydb

# データベースファイルのパスを指定
database_path = "Z:\\Database_Master\\eng001_Ver7.accdb"
# 接続文字列
connection_string = (
    r"Driver={Microsoft Access Driver (*.mdb, *.accdb)};"
    f"DBQ={database_path};"
)

try:
    # データベースに接続
    connection = mydb.connect(connection_string)
    cursor = connection.cursor()

    # クエリの実行例: テーブルのデータを取得
    cursor.execute("SELECT * FROM user_master")
    rows = cursor.fetchall()

    # 結果セットがある場合のみ表示
    if cursor.description:  # フィールド名が存在する場合のみ実行
        field_names = [desc[0] for desc in cursor.description]  # フィールド名を取得
        for item in field_names:
            print(item, "\t", end="")
        print("")  # 改行

        data_array = cursor.fetchall()  # 結果をリストとして取得
        for row in data_array:
            for item in row:
                print(item, "\t", end="")  # 各行を表示
            print("")  # 改行
    else:
        print("クエリが成功しましたが、結果セットがありません。")

except mydb.Error as err:
    print(f"MySQLエラー: {err}")
except Exception as e:
    print(f"その他のエラー: {e}")
finally:                        # 切断処理
    if connection is not None :
        connection.close()
        print("接続を閉じました")
        

ソースコード

import pyodbc as mydb
# import os
#db_path = os.getenv("DB_PATH", r"Z:\\Database_Master\\eng001_Ver7.accdb")

class DbAccessLib:
    """メンバー変数の登録"""
    # db_path = r"Z:\\Database_Master\\eng001_Ver7.accdb"  # Path to your database
    db_path = r"\\\\landisk-0010\\Youtube\\Database_Master\\eng001_Ver7.accdb"  # Path to your database
    db_name = "Access"
    tb_name = "顧客マスタ"
    data_field = []
    data_array = []

    def __init__(self, db_path=None,db_name=None,tb_name=None):
        """ DbAccessLib クラスのコンストラクタ。 """
        # デフォルトのデータベースパスを設定
        if db_path is None:
            db_path = r"Z:\\Database_Master\\eng001_Ver7.accdb"
        if db_name is None:
            db_name = "Access"
        if tb_name is None:
            tb_name = "顧客マスタ"
        # データベース接続文字列の構築
        self.connection_string = (
            "Driver={Microsoft Access Driver (*.mdb, *.accdb)};"
            f"DBQ={db_path};"
        )
        self.db_path = db_path
        self.db_name = db_name
        self.tb_name = tb_name

    # Function to get data array from the database
    def get_table_data(self, tb_name):
        query = "SELECT * FROM " + tb_name + " ;"  # SQL query to fetch data
        try:
            # Connecting to the Access database
            with mydb.connect(self.connection_string) as connection:
                with connection.cursor() as cursor:
                    cursor.execute(query)                  
                    if cursor.description:  # If field names exist
                        self.data_field = [desc[0] for desc in cursor.description]  # Get field names
                        self.data_array = cursor.fetchall()  # Fetch results
                        return self.data_field, self.data_array, False, "クエリが成功しました" # Return success, no error message
                    else:
                        return None, None, False, "結果セットがありません。"
        except mydb.Error as err:
           return None, None, True,  f"データベースエラー: {err}"
        except Exception as e:
           return None, None, True , f"その他のエラー: {e}"

    def dorop_tabale(self,tb_name):
        sqlstr = f"DROP TABLE {tb_name} ; "
        success, error = self.execute_sql( sqlstr )
        return success, error 
    
    # Function to add a new record to the database
    def add_record(self, Record_fields, Record_data, table_name=None ):
        try:
            placeholders = ", ".join(["?" for _ in Record_fields])
            query = f"INSERT INTO {table_name} ({', '.join(Record_fields)}) VALUES ({placeholders})"
            print(query)
            # データベース接続
            with mydb.connect(self.connection_string) as connection:
                with connection.cursor() as cursor:
                    cursor.execute(query, Record_data)  # 実際の値を渡す
                    connection.commit()
                    return True, None
        except mydb.Error as err:
            return False, f"データベースエラー: {err}"
        except Exception as e:
            return False, f"その他のエラー: {e}"

    def delete_record(self, Record_name, Record_id ,tb_name):
        # Confirm deletion
        try:
            query = f"DELETE FROM  {tb_name} WHERE {Record_name} = ?"
            # Execute the delete query safely using parameters
            with mydb.connect(self.connection_string) as connection:
                with connection.cursor() as cursor:
                    cursor.execute(query, (Record_id,))  # Use parameterized query
                    connection.commit()
                    return True, None
        except mydb.Error as err:
            return False, f"データベースエラー: {err}"
        except Exception as e:
            return False, f"レコードの削除中にエラーが発生しました: {e}"

    # Function to update the record in the database
    def update_record(self, tb_name, data_fields, data_values):
        try:
            field_name = data_fields[0]
            Record_id = int(data_values[0])
            # Create an UPDATE SQL query to modify the record
            set_clause = ", ".join([f"{field} = ?" for field in data_fields])
            query = f"UPDATE { tb_name } SET {set_clause} WHERE {field_name} = ?"
            print(query)
            # Connecting to the database
            with mydb.connect(self.connection_string) as connection:
                with connection.cursor() as cursor:
                    cursor.execute(query, (*data_values, Record_id))  # Execute the query with updated values
                    connection.commit()
                    return True, None
        except mydb.Error as err:
            return False, f"データベースエラー: {err}"
        except Exception as e:
            return False, f"その他のエラー: {e}"

    def execute_sql(self, sqlstr, params=None):
        """ クエリを実行する """
        try: # エラー処理
            with mydb.connect(self.connection_string) as connection:
                with connection.cursor() as cursor:
                    cursor.execute(sqlstr, params or [])
                    return True, "SQL成功"
        except mydb.ProgrammingError as prog_err:
            return False, f"プログラミングエラー: {prog_err}"
        except mydb.DatabaseError as db_err:
            return False, f"データベースエラー: {db_err}"
        except mydb.Error as odbc_err:
            return False, f"ODBCエラー: {odbc_err}"
        except Exception as e:
            return False, f"ODBCエラー: {e}"
        
    def execute_query(self, query, params=None):
        """ クエリを実行する """
        try: # エラー処理
            with mydb.connect(self.connection_string) as connection:
                with connection.cursor() as cursor:
                    cursor.execute(query, params or [])
                    if cursor.description:  # If field names exist
                        self.data_field = [desc[0] for desc in cursor.description]  # Get field names
                        self.data_array = cursor.fetchall()  # Fetch results
                        return self.data_field, self.data_array, True, "クエリが成功しました" # Return success, no error message
                    else:
                        return None, None, True, "結果セットがありません。"           
        except mydb.ProgrammingError as prog_err:
            return None, None, False ,f"プログラミングエラー: {prog_err}"
        except mydb.DatabaseError as db_err:
            return None, None, False ,f"データベースエラー: {db_err}"
        except mydb.Error as odbc_err:
            return None, None, False ,f"ODBCエラー: {odbc_err}"
        except Exception as e:
            return None, None, False ,f"エラー: {e}"
    
    def print_Data_Array(self, data_field=None, data_array=None ): 
            if data_field == None :
                data_field = self.data_field
            if data_array == None :
                data_array = self.data_array 
            for item in data_field:
                print(item, ":", end="")
            print("")  # 改行
            for row in data_array :
                for item in row:
                    print(item, ":", end="")  # 各行を表示
                print("")  # 改行

""" メインプログラム """
def Data_customer():
    data = [
        ['顧客番号', '顧客名', '郵便番号', '住所1', '住所2', '住所3', '電話番号'],
        [2001, '三田沙也加', '125-0031', '東京都', '練馬区富士見台X-X-X', '', '03-36XX-XXXX'],
        [2002, '竹原由美', '177-0034', '神奈川県', '横浜市港北区下田町X-X-X', '下田ビル2F', '03-38XX-XXXX'],  
        [2003, '林香奈子', '223-0064', '東京都', '世田谷区代田X-X-X', '城田ビル', '045-56X-XXXX'],
        [2004, '町井秀人', '155-0033', '東京都', '練馬区氷川台X-X-X', '', '03-54XX-XXXX'],
        [2005, '三井雅人', '179-0084', '神奈川県', '相模原市南区大野台X-X-X', '', '03-66XX-XXXX'],
        [2006, '牧野卓', '252-0331', '東京都', '品川区大井町', '大井ビル', '042-32X-XXXX'],
        [2007, '須田秀樹', '244-0817', '東京都', '大田区石川町X-X-X', '', '045-82X-XXXX'],
        [2008, '駒井よし子', '145-0061', '神奈川県', '横浜市神奈川区新子安X-X-X', '', '03-98XX-XXXX'],      
        [2009, '三田沙也加', '125-0031', '東京都', '練馬区富士見台X-X-X', '', '03-36XX-XXXX'],
        [2010, '竹原由美', '177-0034', '神奈川県', '横浜市港北区下田町X-X-X', '下田ビル2F', '03-38XX-XXXX'],
        [2011, '林香奈子', '223-0064', '東京都', '世田谷区代田X-X-X', '城田ビル', '045-56X-XXXX'],
        [2012, '町井秀人', '155-0033', '東京都', '練馬区氷川台X-X-X', '', '03-54XX-XXXX'],
        [2013, '三井雅人', '179-0084', '神奈川県', '相模原市南区大野台X-X-X', '', '03-66XX-XXXX'],
        [2014, '牧野卓', '252-0331', '東京都', '品川区大井町', '大井ビル', '042-32X-XXXX'],
        [2015, '須田秀樹', '244-0817', '東京都', '大田区石川町X-X-X', '', '045-82X-XXXX'],
        [2016, '駒井よし子', '145-0061', '神奈川県', '横浜市神奈川区新子安X-X-X', '', '03-98XX-XXXX'],
        [2017, '三田沙也加', '125-0031', '東京都', '練馬区富士見台X-X-X', '', '03-36XX-XXXX'],
        [2018, '竹原由美', '177-0034', '神奈川県', '横浜市港北区下田町X-X-X', '下田ビル2F', '03-38XX-XXXX'], 
        [2019, '林香奈子', '223-0064', '東京都', '世田谷区代田X-X-X', '城田ビル', '045-56X-XXXX'],
        [2020, '牧野卓', '252-0331', '東京都', '品川区大井町', '大井ビル', '042-32X-XXXX']
    ]
    data_field = data[0]
    data_array = data[1:]  

    return data_field, data_array

def drop_tabale(db_Lib, tb_name ):
    success, error = db_Lib.dorop_tabale(tb_name)
    if success:
        print(f"{tb_name} を削除しました。")
    else:
        print(f"エラー: {error}")

def create_tabale_custmer( db_Lib, tb_name ):
    # テーブル作成のSQL文
    sqlstr = """
        CREATE TABLE 顧客マスタ (
        id AUTOINCREMENT PRIMARY KEY,
        顧客番号 LONG NOT NULL,
        顧客名 TEXT,
        郵便番号 TEXT,
        住所1 TEXT,
        住所2 TEXT,
        住所3 TEXT,
        電話番号 TEXT
        ); 
    """
    success, error = db_Lib.execute_sql( sqlstr )
    if success:
        print("顧客マスタ 作成しました。")
    else:
        print(f"エラー: {error}")

def insert_data( db_Lib, tb_name ):
        data_field, data_values = Data_customer()
        for data_value in data_values:
            success, error =db_Lib.add_record( data_field, data_value , tb_name  )
            if success:
                print("更新に成功しました。")
            else:
                print(f"更新にエラー: {error}")

        db_Lib.print_Data_Array()

def update_data( db_Lib, tb_name ):
    data_fields= ['顧客番号', '顧客名', '郵便番号', '住所1', '住所2', '住所3', '電話番号']
    data_values = [2028, '三井正人', '179-0084', '神奈川県', '相模原市南区大野台X-X-X', '', '03-66XX-XXXX']
    success, error = db_Lib.update_record( tb_name , data_fields, data_values )
    if success:
        print("更新に成功しました。")
    else:
        print(f"更新にエラー: {error}")

def delete_data( db_Lib, tb_name ):
    Record_id = '2029'
    Record_field = '顧客番号'
    success, error = db_Lib.delete_record( Record_field,Record_id, tb_name )
    if success:
        print("削除に成功しました。")
    else:
        print(f"削除エラー: {error}")

def add_data( db_Lib, tb_name ):
    # 新しいレコードを追加(IDは自動生成されるため指定しない)
    data_array = ['2029', '三井かずと', '179-0084', '神奈川県', '相模原市南区大野台X-X-X', '', '03-66XX-XXXX']
    data_field= ['顧客番号', '顧客名', '郵便番号', '住所1', '住所2', '住所3', '電話番号']
    success, error = db_Lib.add_record( data_field, data_array,tb_name, )
    if success:
        print("追加に成功しました。")
    else:
        print(f"追加エラー: {error}")

def select_data( db_Lib, tb_name):
    sqlstr =  f"SELECT * FROM {tb_name} WHERE 顧客番号 = ? ; "
    param = 2016
    data_field, data_array, success, error = db_Lib.execute_query(sqlstr, param )
    if success:
        print("取得に成功しました。")
    else:
        print(f"取得エラー: {error}")


def get_table_data(db_Lib, tb_name):
    data_field, data_array, success, error = db_Lib.get_table_data(tb_name)
    if success:
        print("取得に成功しました。")
    else:
        print(f"取得エラー: {error}")

def Main():
    db_path = r"Z:\\Database_Master\\eng001_Ver7.accdb"  # Path to your database
    db_name = "Access"
    tb_name = "顧客マスタ"
    db_Lib = DbAccessLib(db_path,db_name,tb_name)
    # テーブル削除
    drop_tabale( db_Lib, tb_name )
    # テーブル作成
    create_tabale_custmer( db_Lib, tb_name )
    # データの追加(初期)
    insert_data( db_Lib, tb_name )
    # データの追加
    add_data( db_Lib, tb_name )
    # データの削除
    delete_data( db_Lib, tb_name )
    # データの更新
    update_data( db_Lib, tb_name )
    # データの検索
    select_data( db_Lib, tb_name)
    db_Lib.print_Data_Array()

if __name__ == "__main__":
    Main()

よかったらシェアしてね!
目次