DataframeをそのままPostgresにUPSERTできちゃうPangres

IT
この記事は約7分で読めます。

こんにちは、Shimaco(@shimacotrip)です。

DataframeからRDBに書き込みたい時。タイトル通りの便利で素敵なPythonライブラリを見つけたのですが、日本語・英語ともに記事がないため、備忘録の意味を込めて記事にしました。
あくまで超初心者の備忘録ですので「そんなの知ってるわい」という方はどうぞ読み飛ばしてください。
結論だけ読みたい方はこちらへ。

※今回はSparkがベースのDatabricksという分散処理プラットフォーム上で、Pythonで記述した場合が前提になっています。

PostgresのUPESRTとは

PostgresではUPSERTは用意されておらず、ON CONFLICT句を使います。詳しくはPostgresの公式ぽい?ドキュメントをご覧ください。
※しばらくONとCONFLICT逆に書いてました、、恥ずかしい。。

ただし、Spark上でSQLは記述できるものの、DataframeをどうやってSelectしてINSERTするのか?このへん自分の中で解決できませんでした。
※2021/12/30追記:未検証ですが、もしかしてDataframeをSparkの一時的なテーブルに登録して、そこからSELECT+INSERT ON CONFLICTを発行すればよかったり…?

Spark環境からPostgresへ

SparkのDataframeを直接Postgresに書き込むには、

  • JDBCドライバー
  • Psycopg

の2つの選択肢があります。

一つ目のJDBCドライバーはODBCのJava版です。
ODBCはWindows環境で実行するもので、SparkはJavaなのでJDBCを使います。
※ODBC使ってる事例見たことないのですが使えるのでしょうか。。

これらの操作はAppendでしており、これが結構時間かかります。このため、時短のために他に方法ないのかな?と調べました。

もう一つのpsycopgを使ったUPSERTで探したところ、sqlalchemyというライブラリのto_sqlを使う方法がありました。if_existsというオプションがあり、appendとかいろいろ振舞いを設定できるわけですが、if_exist=’upsert_overwrite’というオプションがあり、つまり、既に存在する場合は上書きするのでUPESRTができるということです。

試してみること、そんなオプションないよって怒られる
よく見たら、MySQLのサンプルでしたというオチ。さて、他を探さなくては…!

sqlalchemy は何がベースなのかな?と思っていたら、PythonにはRDBに接続するAPI仕様であるPEP249が定められており、これに基づいて各RDBへの接続を実装するようです。

Pangres最強でした

気を取り直して、引き続き手掛かりを探すためMySQLのサンプルからヒントを得て「Postgres if_exis」と検索してみます。

すると、Pangresというライブラリが出てまいりました。え、なんですかそれ。使っている人の記事は出てきませんでした(少なくとも2021/6当時は)。

ライブラリの公式ドキュメントにはサンプルが記載されており、私のような初心者にも優しいです。
ドキュメントを読む限り、内部的にON COFLICTが実行されているようですね

ふむふむ、興味深いライブラリです。

※注意事項として、2.3.1という最新バージョンがありますが、使い方載っていなくてわかりません。私には理解できないだけかもしれません(笑)少なくとも、1.3.1と同じ記述では怒られます。使い方分かったら追記します。

備忘録を込めてちょっと解説していきます。

まず、schemaは、Postgresのデフォルトはpublicだと思いますが、ご自身の環境に合わせてください。
接続には、to_sqlを実行するとき同様にsqlalchemyを使います。create_engineの引数はご自身の環境に合わせてください。
dfはSparkじゃなくてpandasらしいので、PangresはPandasのライブラリの親戚のようで、Spark.DataframeからPandas.Dataframeに変換する必要があります。
変換には、.toPandas()というおまじないを1行書きます
postgresには、pg_upsertなるものを使います。


※DataframeとPandas.Dataframeの違いの理解もあやふやですね、ここは理解できたら追記します。※2021/12/30追記: 環境がSpark上であったので、ごちゃまぜに理解していたのですが、集計はSparkの世界で行いますので、Spark.Dataframeで行います。
書き込みはPandasの世界で行いますので、Pangresに書き込んでもらうには、SparkからPandas.Dataframeに変換して、Pangresが扱える状態にしてあげます。と理解致しました。

import pandas as pd
from sqlalchemy import create_engine
from pangres import pg_upsert

# configure schema, table_name and engine
schema = 'public'
table_name = 'pg_upsert_test'
engine = create_engine('postgresql://user:password@localhost:5432/mydatabase')

# create/get data
df = df.toPandas()
df.set_index('××', inplace = True)

# create or update table
# if_exists = 'upsert_overwrite' makes a ON CONFLICT DO UPDATE
pg_upsert(engine=engine,
          df=df,
          schema=schema,
          table_name=table_name,
          if_exists='upsert_overwrite',
          create_schema=True, 
          add_new_columns=True, 
          adapt_dtype_of_empty_db_columns=True, 
          clean_column_names=True,
          chunksize=10000) 

これで完璧!と思ったら、インデックス(df.set_index)にはまってしまいました。

これはつまり、UPSERTには重複判定のための、プライマリーキー制約が必要なんですが、Postgresのテーブル側にはプライマリキー設定してなかったわ。

なにをプライマリーキーに設定しようかねえ?というところから始まり、セオリー通り重複判定に使いたいカラムをプライマリーキー制約を設定しました。(RDBも初心者、、)
それに合わせてPandas.Dataframe側にも書き込む前にインデックスを設定します。※df.set_index(省略)のところです。

すると、無事INSERTできました。
2回目、全く同じデータを書き込んだあとに、Select countで件数数えると件数が変わらず、きちんとUPSERTが効いてることがわかりました!

途中ちょっとハマりましたが無事したいことが実現できました。
JDBCドライバーを使って、df.writeで書き込んでいたのですが、Pangresを使うとより爆速になりまして大満足です

最後に

COPYコマンドを実行するという記事も見かけましたが、どこかに一時的にCSVを保存しないといけないわけで気が進みませんでした。(Databricksの場合はDBFSかな?)

削除も自動でやってもいいですけど、途中エラーで止まった場合は一生そこにファイルが残り続けそうな嫌な予感がしておりやめました。

以上、同じ悩みを抱えている方の参考になれば幸いです。
最後まで読んで下さりありがとうございます!

IT
この記事を書いた人
shimaco

旅を愛し旅に生きる31歳女子。
35か国/国内47都道府県を旅しました。
「自分の足で歩く」のが好きで、ほぼ全て個人手配です。
「個人で旅をしてみたい。」そんな人の背中を押すような情報をお届けします。

shimacoをフォロー
ShimacoTrip
タイトルとURLをコピーしました