目次
Pythonで、Accessにアクセスする
PythonからAccessにアクセスして、データベースにアクセスするクラスライブラリを作りました。
クラス化のメリット
・メインプログラムを簡素化できる
メインプログラムの記述が単純化されます。
データベース処理とエラー処理をクラス側で行う。
メインプログラムは、データ処理の知識がいらない。
・他のプログラムでクラスを流用できる
エラーの無いプログラムを利用できる
メインプログラムの標準化・可読性が向上する
・複数のデータを同じ手法で取り扱いできる
別のインスタンスとして取り扱える
・メモリ(インスタンス)上で取り扱いできる
データベースへのアクセスが減る
必要な準備
Python で Microsoft Access データベースに接続するには、主に ODBC(Open Database Connectivity)を使用します。
Python のインストール
Python がインストールされていない場合は、公式サイトからインストールしてください。
必要なライブラリのインストール
Access データベースと Python を接続するためには、pyodbc ライブラリを使用します。以下のコマンドでインストールします:
pip install pyodbc
Access データベースファイルの準備
接続したい Access ファイル(例:database.accdb または database.mdb)を用意します。
ODBC ドライバの確認
Access データベースに接続するには ODBC ドライバが必要です。Windows 環境では通常、以下のどちらかがインストールされています:
Microsoft Access Database Engine
Microsoft Office
必要に応じて Access Database Engine をインストールしてください。
ODBCドライバーの確認
Windows管理ツール
→ODBCデータソース →ドライバー
→Microsoft Access Driver (*.mdb, *.accdb)
接続文字列の作成
# 接続文字列 connection_string = ( "Driver={Microsoft Access Driver (*.mdb, *.accdb)};" f"DBQ={db_path};" )
Accessデータベースの配置場所
ドライブ指定
db_path = r”Z:\\Database_Master\\eng001_Ver7.accdb”
ネットワークドライブ指定
db_path = r”\\\\landisk-0010\\Youtube\\Database_Master\\eng001_Ver7.accdb”
# # pip install pyodbc # import pyodbc as mydb # データベースファイルのパスを指定 database_path = "Z:\\Database_Master\\eng001_Ver7.accdb" # 接続文字列 connection_string = ( r"Driver={Microsoft Access Driver (*.mdb, *.accdb)};" f"DBQ={database_path};" ) try: # データベースに接続 connection = mydb.connect(connection_string) cursor = connection.cursor() # クエリの実行例: テーブルのデータを取得 cursor.execute("SELECT * FROM user_master") rows = cursor.fetchall() # 結果セットがある場合のみ表示 if cursor.description: # フィールド名が存在する場合のみ実行 field_names = [desc[0] for desc in cursor.description] # フィールド名を取得 for item in field_names: print(item, "\t", end="") print("") # 改行 data_array = cursor.fetchall() # 結果をリストとして取得 for row in data_array: for item in row: print(item, "\t", end="") # 各行を表示 print("") # 改行 else: print("クエリが成功しましたが、結果セットがありません。") except mydb.Error as err: print(f"MySQLエラー: {err}") except Exception as e: print(f"その他のエラー: {e}") finally: # 切断処理 if connection is not None : connection.close() print("接続を閉じました")
ソースコード
import pyodbc as mydb # import os #db_path = os.getenv("DB_PATH", r"Z:\\Database_Master\\eng001_Ver7.accdb") class DbAccessLib: """メンバー変数の登録""" # db_path = r"Z:\\Database_Master\\eng001_Ver7.accdb" # Path to your database db_path = r"\\\\landisk-0010\\Youtube\\Database_Master\\eng001_Ver7.accdb" # Path to your database db_name = "Access" tb_name = "顧客マスタ" data_field = [] data_array = [] def __init__(self, db_path=None,db_name=None,tb_name=None): """ DbAccessLib クラスのコンストラクタ。 """ # デフォルトのデータベースパスを設定 if db_path is None: db_path = r"Z:\\Database_Master\\eng001_Ver7.accdb" if db_name is None: db_name = "Access" if tb_name is None: tb_name = "顧客マスタ" # データベース接続文字列の構築 self.connection_string = ( "Driver={Microsoft Access Driver (*.mdb, *.accdb)};" f"DBQ={db_path};" ) self.db_path = db_path self.db_name = db_name self.tb_name = tb_name # Function to get data array from the database def get_table_data(self, tb_name): query = "SELECT * FROM " + tb_name + " ;" # SQL query to fetch data try: # Connecting to the Access database with mydb.connect(self.connection_string) as connection: with connection.cursor() as cursor: cursor.execute(query) if cursor.description: # If field names exist self.data_field = [desc[0] for desc in cursor.description] # Get field names self.data_array = cursor.fetchall() # Fetch results return self.data_field, self.data_array, False, "クエリが成功しました" # Return success, no error message else: return None, None, False, "結果セットがありません。" except mydb.Error as err: return None, None, True, f"データベースエラー: {err}" except Exception as e: return None, None, True , f"その他のエラー: {e}" def dorop_tabale(self,tb_name): sqlstr = f"DROP TABLE {tb_name} ; " success, error = self.execute_sql( sqlstr ) return success, error # Function to add a new record to the database def add_record(self, Record_fields, Record_data, table_name=None ): try: placeholders = ", ".join(["?" for _ in Record_fields]) query = f"INSERT INTO {table_name} ({', '.join(Record_fields)}) VALUES ({placeholders})" print(query) # データベース接続 with mydb.connect(self.connection_string) as connection: with connection.cursor() as cursor: cursor.execute(query, Record_data) # 実際の値を渡す connection.commit() return True, None except mydb.Error as err: return False, f"データベースエラー: {err}" except Exception as e: return False, f"その他のエラー: {e}" def delete_record(self, Record_name, Record_id ,tb_name): # Confirm deletion try: query = f"DELETE FROM {tb_name} WHERE {Record_name} = ?" # Execute the delete query safely using parameters with mydb.connect(self.connection_string) as connection: with connection.cursor() as cursor: cursor.execute(query, (Record_id,)) # Use parameterized query connection.commit() return True, None except mydb.Error as err: return False, f"データベースエラー: {err}" except Exception as e: return False, f"レコードの削除中にエラーが発生しました: {e}" # Function to update the record in the database def update_record(self, tb_name, data_fields, data_values): try: field_name = data_fields[0] Record_id = int(data_values[0]) # Create an UPDATE SQL query to modify the record set_clause = ", ".join([f"{field} = ?" for field in data_fields]) query = f"UPDATE { tb_name } SET {set_clause} WHERE {field_name} = ?" print(query) # Connecting to the database with mydb.connect(self.connection_string) as connection: with connection.cursor() as cursor: cursor.execute(query, (*data_values, Record_id)) # Execute the query with updated values connection.commit() return True, None except mydb.Error as err: return False, f"データベースエラー: {err}" except Exception as e: return False, f"その他のエラー: {e}" def execute_sql(self, sqlstr, params=None): """ クエリを実行する """ try: # エラー処理 with mydb.connect(self.connection_string) as connection: with connection.cursor() as cursor: cursor.execute(sqlstr, params or []) return True, "SQL成功" except mydb.ProgrammingError as prog_err: return False, f"プログラミングエラー: {prog_err}" except mydb.DatabaseError as db_err: return False, f"データベースエラー: {db_err}" except mydb.Error as odbc_err: return False, f"ODBCエラー: {odbc_err}" except Exception as e: return False, f"ODBCエラー: {e}" def execute_query(self, query, params=None): """ クエリを実行する """ try: # エラー処理 with mydb.connect(self.connection_string) as connection: with connection.cursor() as cursor: cursor.execute(query, params or []) if cursor.description: # If field names exist self.data_field = [desc[0] for desc in cursor.description] # Get field names self.data_array = cursor.fetchall() # Fetch results return self.data_field, self.data_array, True, "クエリが成功しました" # Return success, no error message else: return None, None, True, "結果セットがありません。" except mydb.ProgrammingError as prog_err: return None, None, False ,f"プログラミングエラー: {prog_err}" except mydb.DatabaseError as db_err: return None, None, False ,f"データベースエラー: {db_err}" except mydb.Error as odbc_err: return None, None, False ,f"ODBCエラー: {odbc_err}" except Exception as e: return None, None, False ,f"エラー: {e}" def print_Data_Array(self, data_field=None, data_array=None ): if data_field == None : data_field = self.data_field if data_array == None : data_array = self.data_array for item in data_field: print(item, ":", end="") print("") # 改行 for row in data_array : for item in row: print(item, ":", end="") # 各行を表示 print("") # 改行 """ メインプログラム """ def Data_customer(): data = [ ['顧客番号', '顧客名', '郵便番号', '住所1', '住所2', '住所3', '電話番号'], [2001, '三田沙也加', '125-0031', '東京都', '練馬区富士見台X-X-X', '', '03-36XX-XXXX'], [2002, '竹原由美', '177-0034', '神奈川県', '横浜市港北区下田町X-X-X', '下田ビル2F', '03-38XX-XXXX'], [2003, '林香奈子', '223-0064', '東京都', '世田谷区代田X-X-X', '城田ビル', '045-56X-XXXX'], [2004, '町井秀人', '155-0033', '東京都', '練馬区氷川台X-X-X', '', '03-54XX-XXXX'], [2005, '三井雅人', '179-0084', '神奈川県', '相模原市南区大野台X-X-X', '', '03-66XX-XXXX'], [2006, '牧野卓', '252-0331', '東京都', '品川区大井町', '大井ビル', '042-32X-XXXX'], [2007, '須田秀樹', '244-0817', '東京都', '大田区石川町X-X-X', '', '045-82X-XXXX'], [2008, '駒井よし子', '145-0061', '神奈川県', '横浜市神奈川区新子安X-X-X', '', '03-98XX-XXXX'], [2009, '三田沙也加', '125-0031', '東京都', '練馬区富士見台X-X-X', '', '03-36XX-XXXX'], [2010, '竹原由美', '177-0034', '神奈川県', '横浜市港北区下田町X-X-X', '下田ビル2F', '03-38XX-XXXX'], [2011, '林香奈子', '223-0064', '東京都', '世田谷区代田X-X-X', '城田ビル', '045-56X-XXXX'], [2012, '町井秀人', '155-0033', '東京都', '練馬区氷川台X-X-X', '', '03-54XX-XXXX'], [2013, '三井雅人', '179-0084', '神奈川県', '相模原市南区大野台X-X-X', '', '03-66XX-XXXX'], [2014, '牧野卓', '252-0331', '東京都', '品川区大井町', '大井ビル', '042-32X-XXXX'], [2015, '須田秀樹', '244-0817', '東京都', '大田区石川町X-X-X', '', '045-82X-XXXX'], [2016, '駒井よし子', '145-0061', '神奈川県', '横浜市神奈川区新子安X-X-X', '', '03-98XX-XXXX'], [2017, '三田沙也加', '125-0031', '東京都', '練馬区富士見台X-X-X', '', '03-36XX-XXXX'], [2018, '竹原由美', '177-0034', '神奈川県', '横浜市港北区下田町X-X-X', '下田ビル2F', '03-38XX-XXXX'], [2019, '林香奈子', '223-0064', '東京都', '世田谷区代田X-X-X', '城田ビル', '045-56X-XXXX'], [2020, '牧野卓', '252-0331', '東京都', '品川区大井町', '大井ビル', '042-32X-XXXX'] ] data_field = data[0] data_array = data[1:] return data_field, data_array def drop_tabale(db_Lib, tb_name ): success, error = db_Lib.dorop_tabale(tb_name) if success: print(f"{tb_name} を削除しました。") else: print(f"エラー: {error}") def create_tabale_custmer( db_Lib, tb_name ): # テーブル作成のSQL文 sqlstr = """ CREATE TABLE 顧客マスタ ( id AUTOINCREMENT PRIMARY KEY, 顧客番号 LONG NOT NULL, 顧客名 TEXT, 郵便番号 TEXT, 住所1 TEXT, 住所2 TEXT, 住所3 TEXT, 電話番号 TEXT ); """ success, error = db_Lib.execute_sql( sqlstr ) if success: print("顧客マスタ 作成しました。") else: print(f"エラー: {error}") def insert_data( db_Lib, tb_name ): data_field, data_values = Data_customer() for data_value in data_values: success, error =db_Lib.add_record( data_field, data_value , tb_name ) if success: print("更新に成功しました。") else: print(f"更新にエラー: {error}") db_Lib.print_Data_Array() def update_data( db_Lib, tb_name ): data_fields= ['顧客番号', '顧客名', '郵便番号', '住所1', '住所2', '住所3', '電話番号'] data_values = [2028, '三井正人', '179-0084', '神奈川県', '相模原市南区大野台X-X-X', '', '03-66XX-XXXX'] success, error = db_Lib.update_record( tb_name , data_fields, data_values ) if success: print("更新に成功しました。") else: print(f"更新にエラー: {error}") def delete_data( db_Lib, tb_name ): Record_id = '2029' Record_field = '顧客番号' success, error = db_Lib.delete_record( Record_field,Record_id, tb_name ) if success: print("削除に成功しました。") else: print(f"削除エラー: {error}") def add_data( db_Lib, tb_name ): # 新しいレコードを追加(IDは自動生成されるため指定しない) data_array = ['2029', '三井かずと', '179-0084', '神奈川県', '相模原市南区大野台X-X-X', '', '03-66XX-XXXX'] data_field= ['顧客番号', '顧客名', '郵便番号', '住所1', '住所2', '住所3', '電話番号'] success, error = db_Lib.add_record( data_field, data_array,tb_name, ) if success: print("追加に成功しました。") else: print(f"追加エラー: {error}") def select_data( db_Lib, tb_name): sqlstr = f"SELECT * FROM {tb_name} WHERE 顧客番号 = ? ; " param = 2016 data_field, data_array, success, error = db_Lib.execute_query(sqlstr, param ) if success: print("取得に成功しました。") else: print(f"取得エラー: {error}") def get_table_data(db_Lib, tb_name): data_field, data_array, success, error = db_Lib.get_table_data(tb_name) if success: print("取得に成功しました。") else: print(f"取得エラー: {error}") def Main(): db_path = r"Z:\\Database_Master\\eng001_Ver7.accdb" # Path to your database db_name = "Access" tb_name = "顧客マスタ" db_Lib = DbAccessLib(db_path,db_name,tb_name) # テーブル削除 drop_tabale( db_Lib, tb_name ) # テーブル作成 create_tabale_custmer( db_Lib, tb_name ) # データの追加(初期) insert_data( db_Lib, tb_name ) # データの追加 add_data( db_Lib, tb_name ) # データの削除 delete_data( db_Lib, tb_name ) # データの更新 update_data( db_Lib, tb_name ) # データの検索 select_data( db_Lib, tb_name) db_Lib.print_Data_Array() if __name__ == "__main__": Main()