Pythonでデータベース操作を行う際には、SQLAlchemyというライブラリが便利です。今回は、SQLAlchemyを使ってMySQLに接続し、データベースを操作する方法について解説します。
SQLAlchemyのインストール
まずは、SQLAlchemyをインストールします。以下のコマンドを実行してください。
pip install sqlalchemy
MySQLに接続する
SQLAlchemyを使ってMySQLに接続するには、以下のようにします。
from sqlalchemy import create_engine engine = create_engine('mysql+pymysql://ユーザー名:パスワード@ホスト名:ポート番号/データベース名?charset=utf8')
create_engine関数に接続情報を指定します。mysql+pymysqlはMySQL用のドライバで、ユーザー名、パスワード、ホスト名、ポート番号、データベース名は接続するMySQLサーバーの情報です。charset=utf8は文字コードを指定しています。
データベースに接続する
create_engine関数でエンジンを作成した後、connectメソッドを呼び出してデータベースに接続します。
conn = engine.connect()
テーブルを作成する
次に、テーブルを作成してみましょう。以下のように、Tableクラスを使ってテーブルを定義します。
from sqlalchemy import Table, Column, Integer, String, MetaData metadata = MetaData() users = Table('users', metadata, Column('id', Integer, primary_key=True), Column('name', String(255)), Column('age', Integer), ) metadata.create_all(engine)
上記の例では、usersというテーブルを定義しています。Columnクラスを使ってカラムを定義し、Tableクラスに渡しています。metadata.create_all(engine)を呼び出すことで、実際にデータベース上にテーブルを作成することができます。
データを挿入する
テーブルが作成できたら、データを挿入してみましょう。以下のように、insertメソッドを使ってデータを挿入できます。
conn.execute(users.insert(), [
{'name': 'Alice', 'age': 25},
{'name': 'Bob', 'age': 30},
])
データを取得する
データを挿入した後は、selectメソッドを使ってデータを取得することができます。
result = conn.execute(users.select()) for row in result: print(row)
条件を指定して特定の行を取得する
データベース内のデータを取得する際に、特定の条件を指定するには、SQLAlchemyのselect()関数を使用します。以下のように、条件を指定してデータを取得することができます。
from sqlalchemy import select # name列が'Alice'の行を取得する stmt = select([users]).where(users.c.name == 'Alice') result = conn.execute(stmt)
上記の例では、select()関数にusersテーブルを指定し、where()メソッドを使ってname列が'Alice'である行を条件として指定しています。execute()メソッドを使ってSQL文を実行し、結果をresultに格納しています。
複数の条件を指定して特定の行を取得する
また、select()関数には、複数の条件を指定することもできます。以下の例では、and_()関数を使って複数の条件を指定しています。
from sqlalchemy import and_ # name列が'Alice'で、age列が25の行を取得する stmt = select([users]).where(and_(users.c.name == 'Alice', users.c.age == 25)) result = conn.execute(stmt)
上記の例では、and_()関数を使ってname列が'Alice'でかつage列が25である行を条件として指定しています。
列を指定して取得する
さらに、select()関数には、取得する列を指定することもできます。以下の例では、select()関数にname列とage列を指定しています。
# name列とage列を取得する
stmt = select([users.c.name, users.c.age])
result = conn.execute(stmt)
上記の例では、select()関数にusers.c.nameとusers.c.ageを指定しています。この場合、execute()メソッドの結果は、各行のname列とage列の値を含むタプルのリストとなります。
データを更新する
データベース内のデータを更新するには、SQLAlchemyのupdate()関数を使用します。具体的には、以下のような方法でデータを更新することができます。
from sqlalchemy import update # データベース内のデータを更新する stmt = update(users).where(users.c.name == 'Bob').values(age=30) conn.execute(stmt)
上記の例では、usersテーブル内でname列が'Bob'である行のage列を30に更新しています。update()関数には、where()メソッドを使って条件を指定し、values()メソッドを使って更新する値を指定します。
エラーと対処法
OperationalError
接続に失敗した場合や操作に失敗した場合、SQLAlchemyから例外が発生します。例外をキャッチして、適切に処理することが重要です。
たとえば、接続に失敗した場合はOperationalError例外が発生します。以下のように、try-exceptブロックを使って例外をキャッチしてエラーメッセージを表示することができます。
from sqlalchemy.exc import OperationalError try: conn = engine.connect() except OperationalError as e: print(f'接続エラー: {e}')
IntegrityError
また、データベースに挿入しようとしたデータがテーブルの定義に違反している場合には、IntegrityError例外が発生します。以下のように、try-exceptブロックを使って例外をキャッチしてエラーメッセージを表示することができます。
from sqlalchemy.exc import IntegrityError try: conn.execute(users.insert(), [ {'name': 'Alice', 'age': 'abc'}, ]) except IntegrityError as e: print(f'挿入エラー: {e}')
InvalidRequestError
更新対象のデータが存在しない場合には、InvalidRequestError例外が発生します。例外を適切に処理することで、アプリケーションの安定性を確保することができます。
from sqlalchemy.exc import InvalidRequestError try: stmt = update(users).where(users.c.name == 'Bob').values(age=30) result = conn.execute(stmt) except InvalidRequestError as e: print(f'更新エラー: {e}')
まとめ
以上が、SQLAlchemyを使ってMySQLに接続し、データベースを操作する方法の基本的な解説でした。SQLAlchemyを使うことで、Pythonから簡単にデータベース操作を行うことができます。また、例外を適切に処理することで、安定したアプリケーションを作ることができます。Pythonを利用したシステム開発の学習には下記のような講座の利用が有効です。