【 Python + PostgreSQL 】with ステートメントとコネクションプール ( connection pool ) を活用して、効率的で安全なデータベースコネクション、カーソル ( cursor ) の供給、利用ができるようにしよう!(その2) 投稿一覧へ戻る

Tags: Python , PostgreSQL , with , rollback , simpleconnectionpool

Published 2020年7月10日21:08 by T.Tsuyoshi

前回(その1)の記事は こちら からどうぞ。


今回は前回の最後に提起した課題から取り組んでいきましょう。


class ConnectionPool:
def __init__(self):
self.connection_pool = pool.SimpleConnectionPool(minconn=1,
maxconn=1,
database="Learning",
user="postgres",
password="1234",
host="localhost")

def __enter__(self):
""" with ステートメントに入る段階で呼び出されます"""
return self.connection_pool.getconn()

def __exit__(self, exc_type, exc_val, exc_tb):
""" with ステートメントから出る段階で呼び出されます"""
pass



まずは「 1: __exit__ 特殊関数において __enter__ 特殊関数で渡したコネクションをプールに戻すこと」です。
やりたいことは明確です。 __exit__ メソッド内において以下のコードを実行できるようにすることです。


self.connection_pool.putconn(...)



では、現在の実装でこれができない理由は何でしょう?


それは putconn() に渡すべきコネクションが不明、ということです。
__enter__ メソッドで渡したコネクションを指定しなければいけませんが、その情報をこのクラスでは保有していません。
そのコネクションは呼び出し元に投げっ放しになっています。


ではこのクラスを作り直してコネクションの情報を保有しておけるようにしましょう。


connection_pool = pool.SimpleConnectionPool(minconn=1,
maxconn=1,
database="Learning",
user="postgres",
password="1234",
host="localhost")


class ConnectionFromPool:
def __init__(self):
self.connection = None

def __enter__(self):
""" with ブロックに入る段階で呼び出されます"""
self.connection = connection_pool.getconn()
return self.connection

def __exit__(self, exc_type, exc_val, exc_tb):
""" with ブロックから出る段階で呼び出されます"""
connection_pool.putconn(self.connection)



コネクションプールの作成をクラスの外で行うように変更し、渡したコネクションの情報をクラス内で保持するようにしました。
実はこの実装によって、2つ目の課題「 2: with ステートメントが開始される時点でいちいちコネクションプールが作成されないようにする」もクリアしています。


期待通りに動作するか試してみましょう。


class Country:
def __init__(self, name, code, id=None):
self.name = name
self.code = code
self.id = id

def __repr__(self):
return f"Country(name={self.name!r}, code={self.code!r}, id={self.id})"

def save_to_db(self):
with ConnectionFromPool() as connection:
with connection.cursor() as cursor:
cursor.execute('INSERT INTO countries (name, code) VALUES (%s, %s)',
(self.name, self.code))
connection.commit()

@classmethod
def load_from_db_by_name(cls, name):
with ConnectionFromPool() as connection:
with connection.cursor() as cursor:
cursor.execute('SELECT * FROM countries WHERE name = %s', (name,))
country = cursor.fetchone()
return cls(name=country[1], code=country[2], id=country[0])


result = Country.load_from_db_by_name('Germany')
print(result)
# Country(name='Germany', code='DEU', id=4)


new_country = Country('Mexico', 'MEX')
new_country.save_to_db()


result = Country.load_from_db_by_name('Mexico')
print(result)
# Country(name='Mexico', code='MEX', id=5)



大丈夫そうです。
ただ、前回 __exit__ を実装していなかった段階でのテストで、コネクションプールとコネクションに焦点を当てるために、データ保存時にエラーが出ないように Country クラスの save_to_db() 末尾に connection.commit() を追加しました。


ここまでで ConnectionFromPool クラスの __exit__ が期待通りに動作することが確認できましたから、本来 with ブロックを抜ける際に実行されるべきデータ保存のコードも __exit__ に記述しましょう。


以上を全て満たすここまでの実装は以下のようになります。


connection_pool = pool.SimpleConnectionPool(minconn=1,
maxconn=1,
database="Learning",
user="postgres",
password="1234",
host="localhost")


class ConnectionFromPool:
def __init__(self):
self.connection = None

def __enter__(self):
""" with ブロックに入る段階で呼び出されます"""
self.connection = connection_pool.getconn()
return self.connection

def __exit__(self, exc_type, exc_val, exc_tb):
""" with ブロックから出る段階で呼び出されます"""
self.connection.commit()
connection_pool.putconn(self.connection)


class Country:
def __init__(self, name, code, id=None):
self.name = name
self.code = code
self.id = id

def __repr__(self):
return f"Country(name={self.name!r}, code={self.code!r}, id={self.id})"

def save_to_db(self):
with ConnectionFromPool() as connection:
with connection.cursor() as cursor:
cursor.execute('INSERT INTO countries (name, code) VALUES (%s, %s)',
(self.name, self.code))

@classmethod
def load_from_db_by_name(cls, name):
with ConnectionFromPool() as connection:
with connection.cursor() as cursor:
cursor.execute('SELECT * FROM countries WHERE name = %s', (name,))
country = cursor.fetchone()
return cls(name=country[1], code=country[2], id=country[0])


result = Country.load_from_db_by_name('Mexico')
print(result)
# Country(name='Mexico', code='MEX', id=5)


new_country = Country('Indonesia', 'IDN')
new_country.save_to_db()


result = Country.load_from_db_by_name('Indonesia')
print(result)
# Country(name='Indonesia', code='IDN', id=6)



さてさて、まだまだ改善の余地がありますね。


パッと見て気付くのは、今回もコードの重複です。
1つ目の with ステートメントでコネクションを取得し、そのコネクションを利用して2つ目の with ステートメントでカーソル ( cursor ) を取得しています。
これがカーソルを使用したい全ての場所で登場するわけです、これはいただけません。


ちょっと考えてみてください。コネクション自体がしている仕事は何でしょう?
カーソルを供給する、ということだけです。
実際にデータベースへ問い合わせをして結果を取得するのはカーソルの仕事です。
ということは、データベースとやり取りをする場所ではカーソルだけが取得できれば何の問題も無いわけです。


ということで、現在コネクションプールからコネクションを取り出して供給している ConnectionFromPool クラスをカーソルを供給するクラスへと変貌させましょう。


class CursorFromConnectionFromPool:
def __init__(self):
self.connection = None
self.cursor = None

def __enter__(self):
""" with ブロックに入る段階で呼び出されます"""
self.connection = connection_pool.getconn()
self.cursor = self.connection.cursor()
return self.cursor

def __exit__(self, exc_type, exc_val, exc_tb):
""" with ブロックから出る段階で呼び出されます"""
self.cursor.close()
self.connection.commit()
connection_pool.putconn(self.connection)



この変更によって利用する側の with ステートメントのネストが解消されます。


class Country:
def __init__(self, name, code, id=None):
self.name = name
self.code = code
self.id = id

def __repr__(self):
return f"Country(name={self.name!r}, code={self.code!r}, id={self.id})"

def save_to_db(self):
with CursorFromConnectionFromPool() as cursor:
cursor.execute('INSERT INTO countries (name, code) VALUES (%s, %s)',
(self.name, self.code))

@classmethod
def load_from_db_by_name(cls, name):
with CursorFromConnectionFromPool() as cursor:
cursor.execute('SELECT * FROM countries WHERE name = %s', (name,))
country = cursor.fetchone()
return cls(name=country[1], code=country[2], id=country[0])


result = Country.load_from_db_by_name('Indonesia')
print(result)
# Country(name='Indonesia', code='IDN', id=6)


new_country = Country('Canada', 'CAN')
new_country.save_to_db()


result = Country.load_from_db_by_name('Canada')
print(result)
# Country(name='Canada', code='CAN', id=7)



ただしこの実装方法には短所もあります


1つのデータベースコネクションに対し1つのカーソルしか供給できない、という制約を与えてしまうことです。


もし1つ1つが非常に大きいデータを数多く処理しなければいけないような、1つのコネクションに対して複数のカーソルをマルチスレッドで動かす必要があるような場面では最適な方法ではない、ということは認識しておかなければなりません。


あと、この実装方法の現時点での問題点は、ロールバックに対応していない、ということです。


ロールバック ( rollback ) とは、データベースへの処理途中で外部キー制約 ( Foreign Key constraint ) などの違反が見つかった場合に、その処理全体を取り消し、処理が行われる直前の正常な状態までデータベースを戻す(ロールバックする)ことです。


この処理を実装する際の主役は __exit__ 特殊関数に渡されてくる3つのパラメータです。
この3つのパラメータの頭文字 exc_... は exception_... の略です。


class CursorFromConnectionFromPool:
def __init__(self):
self.connection = None
self.cursor = None

def __enter__(self):
""" with ブロックに入る段階で呼び出されます"""
self.connection = connection_pool.getconn()
self.cursor = self.connection.cursor()
return self.cursor

def __exit__(self, exc_type, exc_val, exc_tb):
""" with ブロックから出る段階で呼び出されます"""
if exc_val is not None:
# トランザクション中にエラー発生。現在のコネクション中に生じた変更を全て取り消します。
self.connection.rollbak()
else:
self.cursor.close()
self.connection.commit()

# エラーが発生してもしなくてもコネクションはプールに戻さなければいけません。
connection_pool.putconn(self.connection)



さて、機能的にはかなり満足できる実装になってきました。
しかし立ち止まっていてはいけません、常に上を目指しましょう、しんどいですけど頑張るぞ!


ということで不満点を無理矢理にでも探し出します!


普通であればこれらの実装は複数のファイル ( Python パッケージ ) に分割されて管理されるはずです。


その場合、プログラムの開始コードを記述したファイルでは Country クラスをインポートし、Country クラスでは CursorFromConnectionFromPool クラスをインポートすることになるでしょう。


そのような構成でプログラムを開始した場合、Python では各パッケージの先頭に記述されている (であろう) import 文をたどっていき、ネストの一番奥にある import 文の実行部分を処理しながら戻ってきて、開始コードが記述されているファイルの処理に取り掛かります。


その手順をこのプログラムに当てはめてみると、一番最初に実行されるのは connection_pool = SimpleConnectionPool() になりそうです。


何よりも先にコネクションプールを作成し、必要なコネクションを作成してからプログラムが始まるんです。


もしこのプログラムがグラフィカルユーザーインターフェース ( GUI ) を持っていたとした、ユーザーのためにまず最初に実行すべきは GUI を表示することであってコネクションプールを作ることではないはずです。


さあ、お待ちかねの改良点が見つりました。


何とかして connection_pool = SimpleConnectionPool() がプログラムの一番最初に実行されることを阻止しましょう。


そのためには?
実行文でなくしてしまうことです。クラスを作ってその中に閉じ込めてしまうことです。
プログラム開始時にはクラスは評価されるだけで実行されませんからね。


なおかつ、コネクションプール作成をそのクラスの1つのメソッドとして提供できるようにすれば、いつコネクションプールを作成するのか、という決断をユーザーに委ねることができるようになります。


class Database:
connection_pool = None

@classmethod
def initialize(cls):
if not cls.connection_pool:
cls.connection_pool = pool.SimpleConnectionPool(minconn=1,
maxconn=1,
database="Learning",
user="postgres",
password="1234",
host="localhost")



connection_pool は Database クラスのプロパティであって、1つ1つのオブジェクトに属するのではない、というところに注目です。
これによって、全ての Database オブジェクトは1つの connection_pool を共有するようになります。


さて、この Database クラスを使ってコネクションを取得する場合、Database.connection_pool.getconn() という構文になりますね。
何か長ったらしくないですか? せめて Database.get_connection()、プールに戻すなら Database.return_connection() の方がスマートです。


そんなに面倒臭くはないですから実装しちゃいましょう。


class Database:
connection_pool = None

@classmethod
def initialize(cls):
if not cls.connection_pool:
cls.connection_pool = pool.SimpleConnectionPool(minconn=1,
maxconn=1,
database="Learning",
user="postgres",
password="1234",
host="localhost")

@classmethod
def get_connection(cls):
return cls.connection_pool.getconn()

@classmethod
def return_connection(cls, connection):
cls.connection_pool.putconn(connection)

@classmethod
def close_all_connections(cls):
cls.connection_pool.closeall()



これでコネクションプールに関わる機能を1箇所にまとめることができました。
動作確認です。


class CursorFromConnectionFromPool:
def __init__(self):
self.connection = None
self.cursor = None

def __enter__(self):
""" with ブロックに入る段階で呼び出されます"""
self.connection = Database.get_connection()
self.cursor = self.connection.cursor()
return self.cursor

def __exit__(self, exc_type, exc_val, exc_tb):
""" with ブロックから出る段階で呼び出されます"""
if exc_val is not None:
# トランザクション中にエラー発生。現在のコネクション中に生じた変更を全て取り消します。
self.connection.rollbak()
else:
self.cursor.close()
self.connection.commit()

# エラーが発生してもしなくてもコネクションはプールに戻さなければいけません。
Database.return_connection(self.connection)


class Country:
def __init__(self, name, code, id=None):
self.name = name
self.code = code
self.id = id

def __repr__(self):
return f"Country(name={self.name!r}, code={self.code!r}, id={self.id})"

def save_to_db(self):
with CursorFromConnectionFromPool() as cursor:
cursor.execute('INSERT INTO countries (name, code) VALUES (%s, %s)',
(self.name, self.code))

@classmethod
def load_from_db_by_name(cls, name):
with CursorFromConnectionFromPool() as cursor:
cursor.execute('SELECT * FROM countries WHERE name = %s', (name,))
country = cursor.fetchone()
return cls(name=country[1], code=country[2], id=country[0])


Database.initialize()


result = Country.load_from_db_by_name('Canada')
print(result)
# Country(name='Canada', code='CAN', id=7)


new_country = Country('Greece', 'GRC')
new_country.save_to_db()


result = Country.load_from_db_by_name('Greece')
print(result)
# Country(name='Greece', code='GRC', id=8)



Database.Initialize() を呼ぶことを忘れないでください。コネクションプールが空のままではエラーになります。


さて、いよいよ最終盤です。Database クラスを仕上げましょう。


Python におけるクラス構築の際に常に問題になるのが、private という概念がないことです。
クラスの属性は基本的にすべて public でクラスの外側からアクセスできます。


クロージャー ( closure ) を利用して秘匿することは可能ですが、1つの変数のために、となるとちょっと面倒くさいのも事実です。


ただし、「この属性にはダイレクトにアクセスせず、その属性を提供するためのメソッドを経由してくださいね」ということを明確にしておくことは重要です。


この Database クラスにおいても、Database.connection_pool.getconn() という構文で、Database.get_connection() を経由せずにコネクションプールからコネクションを取得することが可能です。


しかし、将来的に get_connection() に何らかの処理を追加した場合、アプリケーション全体で整合が取れなくなってしまう恐れがあります。


そのためにも最低限の「宣言」はしておくべきでしょう。
Python ではクラス属性の前に __ ( ダブルアンダースコア ) を追加することで「これはプライベートです」という意味合いを持たせることができます (実際に通常の方法ではアクセスできなくなります)。


class Database:
__connection_pool = None

@classmethod
def initialize(cls):
if not cls.__connection_pool:
cls.__connection_pool = pool.SimpleConnectionPool(minconn=1,
maxconn=1,
database="Learning",
user="postgres",
password="1234",
host="localhost")

@classmethod
def get_connection(cls):
return cls.__connection_pool.getconn()

@classmethod
def return_connection(cls, connection):
cls.__connection_pool.putconn(connection)

@classmethod
def close_all_connections(cls):
cls.__connection_pool.closeall()



それともう1つ手直ししましょう。


これは最初の方から気になっていた方がいるかもしれませんが、SimpleConnectionPool のパラメータを外部から渡せるようにすることでプログラムをよりジェネリックにすることです。


つまり Database.initialize() の呼び出しで対象とするデータベースなどをその都度指定できるようにします。


Database.initialize(database="Learning", user="postgres", password="1234", host="localhost")



さて、受け取る側ですが、このように名前付き引数として渡されてくれば「手間無しアンパック受け取り」です。


class Database:
__connection_pool = None

@classmethod
def initialize(cls, **kwargs):
if not cls.__connection_pool:
cls.__connection_pool = pool.SimpleConnectionPool(minconn=1, maxconn=10, **kwargs)

@classmethod
def get_connection(cls):
return cls.__connection_pool.getconn()

@classmethod
def return_connection(cls, connection):
cls.__connection_pool.putconn(connection)

@classmethod
def close_all_connections(cls):
cls.__connection_pool.closeall()



最終確認です。


class CursorFromConnectionFromPool:
def __init__(self):
self.connection = None
self.cursor = None

def __enter__(self):
""" with ブロックに入る段階で呼び出されます"""
self.connection = Database.get_connection()
self.cursor = self.connection.cursor()
return self.cursor

def __exit__(self, exc_type, exc_val, exc_tb):
""" with ブロックから出る段階で呼び出されます"""
if exc_val is not None:
# トランザクション中にエラー発生。現在のコネクション中に生じた変更を全て取り消します。
self.connection.rollbak()
else:
self.cursor.close()
self.connection.commit()

# エラーが発生してもしなくてもコネクションはプールに戻さなければいけません。
Database.return_connection(self.connection)


class Country:
def __init__(self, name, code, id=None):
self.name = name
self.code = code
self.id = id

def __repr__(self):
return f"Country(name={self.name!r}, code={self.code!r}, id={self.id})"

def save_to_db(self):
with CursorFromConnectionFromPool() as cursor:
cursor.execute('INSERT INTO countries (name, code) VALUES (%s, %s)',
(self.name, self.code))

@classmethod
def load_from_db_by_name(cls, name):
with CursorFromConnectionFromPool() as cursor:
cursor.execute('SELECT * FROM countries WHERE name = %s', (name,))
country = cursor.fetchone()
return cls(name=country[1], code=country[2], id=country[0])


Database.initialize(database="Learning", user="postgres", password="1234", host="localhost")


result = Country.load_from_db_by_name('Greece')
print(result)
# Country(name='Greece', code='GRC', id=8)


new_country = Country('Singapore', 'SGP')
new_country.save_to_db()


result = Country.load_from_db_by_name('Singapore')
print(result)
# Country(name='Singapore', code='SGP', id=9)



2回にわたる長い記事になりました。
最後まで読んでいただき有難うございます。
ちょっとでもお役に立つ部分があればよかったのですが...

この投稿をメールでシェアする

0 comments

コメントはまだありません。

コメントを追加する(不適切と思われるコメントは削除する場合があります)