こんにちは、Shimaco(@shimacotrip)です。
DataframeからRDBに書き込みたい時。タイトル通りの便利で素敵なPythonライブラリを見つけたのですが、日本語・英語ともに記事がないため、備忘録の意味を込めて記事にしました。
あくまで超初心者の備忘録ですので「そんなの知ってるわい」という方はどうぞ読み飛ばしてください。
結論だけ読みたい方はこちらへ。2023/6/24(土)追記:読者様からお寄せいただいた情報によると関数が変更になっているようです。先日私も試して動作したので、追記致します。
※今回はSparkがベースのDatabricksという分散処理プラットフォーム上で、Pythonで記述した場合が前提になっています。
PostgresのUPESRTとは
PostgresではUPSERTは用意されておらず、ON CONFLICT句を使います。詳しくはPostgresの公式ぽい?ドキュメントをご覧ください。
※しばらくONとCONFLICT逆に書いてました、、恥ずかしい。。
2023/6/24(土)追記:先日、MySQLへのUpsertも試したところできました。接続文字列をMySQLにすればOKです。
ただし、Spark上でSQLは記述できるものの、DataframeをどうやってSelectしてINSERTするのか?このへん自分の中で解決できませんでした。
※2021/12/30追記:未検証ですが、もしかしてDataframeをSparkの一時的なテーブルに登録して、そこからSELECT+INSERT ON CONFLICTを発行すればよかったり…?
Spark環境からPostgresへ
SparkのDataframeを直接Postgresに書き込むには、
- JDBCドライバー
- Psycopg
の2つの選択肢があります。
一つ目のJDBCドライバーはODBCのJava版です。
ODBCはWindows環境で実行するもので、SparkはJavaなのでJDBCを使います。
※ODBC使ってる事例見たことないのですが使えるのでしょうか。。
これらの操作はAppendでしており、これが結構時間かかります。このため、時短のために他に方法ないのかな?と調べました。
もう一つのpsycopgを使ったUPSERTで探したところ、sqlalchemyというライブラリのto_sqlを使う方法がありました。if_existsというオプションがあり、appendとかいろいろ振舞いを設定できるわけですが、if_exist=’upsert_overwrite’というオプションがあり、こちらも先日確認したら最新のsqlalchemyではupsert_overwriteのオプションが消えており、sqlalchemyではもはやupsertができなっているようでした。
2023/6/24(土)追記:先日、Pangresで、接続文字列をMySQLにすればMySQLへのUpsertも試したところできました。
試してみること、そんなオプションないよって怒られる。
よく見たら、MySQLのサンプルでしたというオチ。さて、他を探さなくては…!
sqlalchemy は何がベースなのかな?と思っていたら、PythonにはRDBに接続するAPI仕様であるPEP249が定められており、これに基づいて各RDBへの接続を実装するようです。
Pangres最強でした
気を取り直して、引き続き手掛かりを探すためMySQLのサンプルからヒントを得て、「Postgres if_exis」と検索してみます。
すると、Pangresというライブラリが出てまいりました。え、なんですかそれ。使っている人の記事は出てきませんでした(少なくとも2021/6当時は)。
ライブラリの公式ドキュメントにはサンプルが記載されており、私のような初心者にも優しいです。
ドキュメントを読む限り、内部的にON COFLICTが実行されているようですね。
ふむふむ。興味深いライブラリです。
※注意事項として、2.3.1という最新バージョンがありますが、使い方載っていなくてわかりません。私には理解できないだけかもしれません(笑)少なくとも、1.3.1と同じ記述では怒られます。使い方分かったら追記します。
備忘録を込めてちょっと解説していきます。
まず、schemaは、Postgresのデフォルトはpublicだと思いますが、ご自身の環境に合わせてください。
接続には、to_sqlを実行するとき同様にsqlalchemyを使います。create_engineの引数はご自身の環境に合わせてください。dfはSparkじゃなくてpandasらしいので、PangresはPandasのライブラリの親戚のようで、Spark.DataframeからPandas.Dataframeに変換する必要があります。
変換には、.toPandas()というおまじないを1行書きます。
upsertするには、pg_upsertupsertという関数を使います。
※関数が変更のため、新旧残しておきます。
※DataframeとPandas.Dataframeの違いの理解もあやふやですね、ここは理解できたら追記します。※2021/12/30追記: 環境がSpark上であったので、ごちゃまぜに理解していたのですが、集計はSparkの世界で行いますので、Spark.Dataframeで行います。
書き込みはPandasの世界で行いますので、Pangresに書き込んでもらうには、SparkからPandas.Dataframeに変換して、Pangresが扱える状態にしてあげます。と理解致しました。
import pandas as pd
from sqlalchemy import create_engine
from pangres import pg_upsert
from pangres import upsert
# configure schema, table_name and engine
schema = 'public'
table_name = 'pg_upsert_test'
engine = create_engine('postgresql://user:password@localhost:5432/mydatabase')
# create/get data
df = df.toPandas()
df.set_index('××', inplace = True)
# update table
pg_upsert(engine=engine,
upsert(engine=engine,
df=df,
schema=schema,
table_name=table_name,
if_exists='upsert_overwrite',
if_exists='update',
create_table=False
create_schema=True,
add_new_columns=True,
adapt_dtype_of_empty_db_columns=True,
clean_column_names=True,
chunksize=10000
)
これで完璧!と思ったら、インデックス(df.set_index)にはまってしまいました。
これはつまり、UPSERTには重複判定のための制約が必要なんですが、Postgresのテーブル側にはユニークな制約を設定してなかったわ。
重複判定に使うカラムをユニークな制約を設定しました。
それに合わせてPandas.Dataframe側にも書き込む前にインデックスを設定します。私は2つの列で重複判定するため、2つの列でユニークな制約を設定しました。
※df.set_index(省略)のところです。
すると、無事INSERTできました。
2回目、全く同じデータを書き込んだあとに件数数えると件数が変わらず、きちんとUPSERTが効いてることがわかりました!
途中ちょっとハマりましたが無事したいことが実現できました。
JDBCドライバーを使って、df.writeで書き込んでいたのですが、Pangresを使うとより爆速になりまして大満足です。
最後に
COPYコマンドを実行するという記事も見かけましたが、どこかに一時的にCSVを保存しないといけないわけで気が進みませんでした。(Databricksの場合はDBFSかな?)
削除も自動でやってもいいですけど、途中エラーで止まった場合は一生そこにファイルが残り続けそうな嫌な予感がしておりやめました。
以上、同じ悩みを抱えている方の参考になれば幸いです。
最後まで読んで下さりありがとうございます!