Featured image of post Deadlockにはまった時に調べた事

Deadlockにはまった時に調べた事

目次

1.背景

  • 負荷試験時にdeadlockがデータベースで発生して困ったのでメモ
  • 背景としては、画像解析処理の為にサーバーで解析結果を保存するときに発生した
  • そのdeadlockの時に調べたり改めて確認した時のメモ

2.前提

2.1.ソフトウェアアーキ

  • 言語: Python
  • サーバー
    • APサーバー: FastAPI
    • ASGIサーバー: Uvicorn
    • Webサーバー: Nginx
  • ORM:
    • SQLAlchemy v2
  • RDB:
    • PostgreSQL
    • Read Committed

2.2.サーバー構成

主に次の三つのサーバー(レイヤー)構造で処理をしている。

  • FEサーバー
    • APIリクエスト受付用のFEサーバー
  • BEサーバー
    • 解析リクエストをする、ブローカー用のBEサーバー
  • 解析サーバー
    • 解析処理を行うマイクロサービスのサーバー

2.3.処理フロー

  1. FEサーバーでAPIリクエストを受け取りSQSに貯める
  2. BEサーバーでSQSをポーリングする
  3. BEサーバーで解析の実行計画を決定する
  4. BEサーバーが解析用のマイクロサーバー群に解析を依頼する
  5. その結果を集めて、BEサーバーがデータベースに保存する

3.RDB全般

3.1.デッドロックとは

複数のプロセスやトランザクションが、互いに相手が持っているリソース(ロックなど)を待ち続けることで、永久に進行できなくなる状態のこと。

3.2.ロックモードの種類

PostgreSQL には 8種類のロックモードがある。

ロックモード取得される操作影響
ROW SHARESELECT FOR UPDATESELECT FOR SHAREDELETE をブロック
ROW EXCLUSIVEUPDATE, DELETE, INSERTROW SHARE より強いロック
SHARELOCK TABLE IN SHARE MODEUPDATE / DELETE は可能だが SHARE ROW EXCLUSIVE はブロック
SHARE ROW EXCLUSIVE特定の DDLSHARE より強いが EXCLUSIVE より弱い
EXCLUSIVELOCK TABLE IN EXCLUSIVE MODEすべての行の更新をブロック
ACCESS EXCLUSIVELOCK TABLE IN ACCESS EXCLUSIVE MODEテーブル全体をロック(SELECT すらできない)

3.3.クエリが作成するロックモード

各クエリにが作るロックは以下がある。

SQLロックの種類影響
UPDATE行ロック (ROW EXCLUSIVE)対象の行が他のトランザクションで UPDATE / DELETE されるのを防ぐ
SELECT FOR UPDATE行ロック (ROW SHARE)対象の行を UPDATE / DELETE から保護する
DELETE行ロック (ROW EXCLUSIVE)削除対象の行をロックし、他の UPDATE / DELETE をブロックする
INSERT行ロック (ROW EXCLUSIVE)基本的には競合しないが、UNIQUE 制約があると競合する可能性あり
LOCK TABLE IN EXCLUSIVE MODEテーブルロック他のトランザクションが INSERT, UPDATE, DELETE できなくなる
SELECT(普通の読み取り)ロックなしどのトランザクションも自由に読み取り可能

3.4.SELECT FOR UPDATEについて

  • 注意なのは、SELECT FOR UPDATESELECTに見えるが、実体はUPDATEに近い
  • 例えば、あるTXのSELECT FOR UPDATEは他のTXのSELECT FOR UPDATEをブロックする
  • つまり、SELECT FOR UPDATEは、実際は「UPDATEをしないUPDATE
  • SELECT FOR UPDATE のロック開放はそのTXのCOMMIT時に解放される
  • ちなみに、SELECT FOR UPDATEの最中にSELECTはすることができる
  • ただし、READ COMMITEDではノンリピータブルリードやファントムリードが発生する可能性があるので注意

3.5.リード一貫性の問題(Read Consistency Issues)

3.5.1.リード一貫性の問題とは

Readの問題は次の3つがある。

問題発生条件影響
ダーティリード(Dirty Read)未コミットのデータを他のトランザクションが読める読んだデータが後で ROLLBACK されると、データの整合性が崩れる
ノンリピータブルリード(Non-Repeatable Read)同じトランザクション内で、同じ SELECT でも結果が変わる(他のトランザクションの UPDATE / DELETE の影響)2回の SELECT で異なる値が見える
ファントムリード(Phantom Read)同じ SELECT でも、新しい行 (INSERT) が増えたり、削除されたりするWHERE 条件に一致する行数が変化する

3.5.2.ダーティーリードは

  • 他Commit前のTXのCRUDの変更を読み込める事による問題
  • 特に、Rollbackされると、データ不整合が発生する
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
-- トランザクションA
BEGIN;
UPDATE users SET name = 'Bob' WHERE id = 1;
-- まだ COMMIT していない!

-- トランザクションB
SELECT * FROM users WHERE id = 1;
-- READ UNCOMMITTED なら 'Bob' が見えてしまう(ダーティーリード発生)

-- トランザクションA
ROLLBACK;

トランザクションBは、存在しないはずのデータ ‘Bob’ を見てしまったことになった。

3.5.3.ノンリピータブルリード

  • 他コミット済みのTXのUD(UpdateやDelete)による問題
  • UDによって、既存の行のデータが変わる(値が変わる or 消える)問題
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
-- トランザクションA
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT * FROM users WHERE id = 1;  -- 'Alice'

-- トランザクションB
BEGIN;
UPDATE users SET name = 'Bob' WHERE id = 1;
COMMIT;

-- トランザクションA
SELECT * FROM users WHERE id = 1;  -- 'Bob'(前回と結果が違う!)

同じトランザクション内で SELECT した結果が変わる(ノンリピータブルリード発生)。

3.5.4.ファントムリード

  • 他コミット済みのTXのCUD(InsertやUpdateやDelete)による問題
  • CUDによって、結果セットの行数が変わる(新しい行が増える or 減る)問題
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
-- トランザクションA
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT * FROM users WHERE age >= 18;
-- 3件取得: Alice, Bob, Charlie

-- トランザクションB
BEGIN;
INSERT INTO users (id, name, age) VALUES (4, 'Dave', 22);
COMMIT;

-- トランザクションA
SELECT * FROM users WHERE age >= 18;
-- 4件取得: Alice, Bob, Charlie, Dave(行が増えた!)
  • 検索結果の行数が変わってしまう(ファントムリード発生)
  • REPEATABLE READ では防げず、SERIALIZABLE にする必要がある

3.6.トランザクション分離レベルとReadの問題

トランザクション分離レベルのReadの関係

分離レベルダーティリードノンリピータブルリードファントムリード
READ UNCOMMITTED発生する発生する発生する
READ COMMITTED発生しない発生する発生する
REPEATABLE READ発生しない発生しない発生する
SERIALIZABLE発生しない発生しない発生しない
  • トランザクション内の一貫性を保ちたい(変更途中のデータを見たくない)
    • => REPEATABLE READ / SERIALIZABLE
    • スナップショットを維持し、一貫性を確保
  • リアルタイムの最新データがほしい
    • => READ COMMITTED
    • SELECT のたびに最新の COMMIT 済みデータを取得
  • 未コミットのデータも見たい(※PostgreSQL では不可)
    • => READ UNCOMMITTED

3.7.ACID特性

ACID特性 は、データベースのトランザクションが信頼性を持って処理されるために必要な4つの特性。

特性説明
Atomicity(原子性)トランザクションは「すべて成功」または「すべて失敗」送金処理の途中で失敗したら、残高は元のまま
Consistency(一貫性)トランザクションの前後でデータのルールが守られる存在しない product_idorders に挿入できない
Isolation(分離性)同時実行のトランザクションが影響し合わない在庫チェック中に他のトランザクションの変更が見えない
Durability(永続性)COMMIT したデータは消えないCOMMIT 直後にDBがクラッシュしても、データは保証される

逆に、ACIDを守らないと何が起こるか?

  • 原子性がない
    • 送金処理が途中で止まったら、お金だけ引かれて相手に届かない
  • 一貫性がない
    • 存在しない商品IDの注文が通る
  • 分離性がない
    • 他のトランザクションの影響で、データが意図しない値に変更される
  • 永続性がない
    • COMMIT したのに、障害でデータが消える

3.8.デッドロックの例

データ用意

1
2
3
4
5
6
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    name TEXT NOT NULL
);

INSERT INTO users (name) VALUES ('Alice'), ('Bob');

次の時系列だと、2つのTX間でデッドロックが発生する。T1とT2のどちらもCommitはしていないから。

TXSQL コマンド意味
T1BEGIN TRANSACTION;トランザクション開始
T1UPDATE users SET name = 'Alice Updated' WHERE id = 1;id=1 の行をロック
T2BEGIN TRANSACTION;トランザクション開始
T2UPDATE users SET name = 'Bob Updated' WHERE id = 2;id=2 の行をロック
T1UPDATE users SET name = 'Alice Final' WHERE id = 2;id=2 のロックを待機(T2 がロック中)
T2UPDATE users SET name = 'Bob Final' WHERE id = 1;id=1 のロックを待機(T1 がロック中)
PGDEADLOCK DETECTEDデッドロック発生!どちらかのトランザクションが終了

Read Committedの場合、各 UPDATE 文ごとに行のロックを取得する仕様となっている。

3.9.共有ロックと排他ロック

  • 共有ロック(Shared Lock、Sロック)
    • 他のトランザクションが「読み取り」可能なロック
    • 書き込み(更新・削除)を行うことはできない
  • 排他ロック(Exclusive Lock、Xロック)
    • 他のトランザクションが「読み書き」できなくなるロック
    • 1つのトランザクションだけがデータを操作できる(単独アクセス)

3.10.ロック専用テーブルを利用する例

  • READ COMMITTEDで排他制御をしたい場合はロック専用テーブルを用意する方法もある
  • なお、テーブルは次を用意する
1
2
3
4
5
CREATE TABLE BATCH_LOCK (
    BATCH_ID VARCHAR(50) PRIMARY KEY,  -- バッチ処理の識別子
    LOCK_STATUS CHAR(1) NOT NULL,      -- ロック状態('1'=ロック中、'0'=未ロック)
    LOCK_TIME TIMESTAMP,               -- ロックを取得した時間
);

3.10.1.Updateを使った排他制御

  • READ COMMITTEDなので、UPDATEの前にSELECTLOCK STATUSをチェックすると、Non-repeatable readsの可能性が生まれる
  • 故に、UPDATE文は排他ロックを取得するため、その条件でLOCK_STATUSを指定する事で排他制御が可能となる
  • さらに、UPDATEの後にSELECTをすると、このTXによって更新されたのか、または既に別プロセスによってロックされていたのかを区別ができない
    • 更新が実際に行われたかどうかを知る唯一の方法が、GET DIAGNOSTICSRETURNINGになる
  • 何らかの理由でLOCK_STATUS1になりっぱなしになる対策として、LOCK_TIMEを見て、一定時間ロックされたらフラグを無視する
  • もし、処理に時間がかかる場合は、TTLであるLOCK_TIMEを更新して、他プロセスに処理をブロックされないようにする必要がある

ロック開始:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
BEGIN;

UPDATE BATCH_LOCK 
SET LOCK_STATUS = '1', 
    LOCK_TIME = CURRENT_TIMESTAMP 
WHERE BATCH_ID = 'DAILY_BATCH' 
  AND LOCK_STATUS = '0';

-- 更新件数を確認
GET DIAGNOSTICS affected_rows = ROW_COUNT;
-- affected_rows が 0 ならロック取得失敗

COMMIT;

解除:

1
2
3
4
5
6
7
8
BEGIN;

UPDATE BATCH_LOCK 
SET LOCK_STATUS = '0', 
    LOCK_TIME = CURRENT_TIMESTAMP 
WHERE BATCH_ID = 'DAILY_BATCH';

COMMIT;

ただし、lockされている時にのみ解除する必要があるので注意。

3.10.2.SELECT FOR UPDATEを使った排他制御

  • SELECT FOR UPDATEを使うことも有効
  • GET DIAGNOSTICSを使わなくても、LOCK_STATUS = '0'を指定しているので、SELECTの結果で分かる
  • デッドロックを避けたい場合やシーケンシャルな処理を担保したい場合、nowaitskip lockedが良い
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
BEGIN;

-- 排他ロックを取得
SELECT LOCK_STATUS, LOCK_TIME 
FROM BATCH_LOCK 
WHERE BATCH_ID = 'DAILY_BATCH' AND LOCK_STATUS = '0' 
FOR UPDATE;

-- 結果の行数をチェック(アプリケーションコード側で)
-- 結果が0行なら、すでにロックされている
-- 結果が1行なら、ロック取得成功

-- ロックされていなければ更新
UPDATE BATCH_LOCK 
SET LOCK_STATUS = '1', 
    LOCK_TIME = CURRENT_TIMESTAMP 
WHERE BATCH_ID = 'DAILY_BATCH' 
  AND LOCK_STATUS = '0';

COMMIT;

3.11.レコードがないと排他ロックは取れない

  • そもそも大きな勘違いをしていた
  • つまり、select for updateで取得したデータが空の時は排他ロックはかからない
    • これが結論だった
  • Aというレコードがないなら挿入する、ただし、2つ同じレコードを挿入したくはないというのが要件
  • select for updateでAというレコードを探しても、同時に二つのプロセスからselect for updateがかかったら両方空になる
  • 故に、両方のプロセスで空だからinsertしようとして重複するデータが挿入されて破綻する
  • つまり、レコードがない状態がある場合は、排他ロックが取れないので、重複排除処理をSelect for updateでやるのは難しい
  • それならロック専用テーブルを用意するしかない、もしくは、その重複したくないものでUniqueキーをつけるしかない
  • アプリケーションレベルでのユニーク制約はロック専用テーブルを用意するしかない

3.12.部分インデックス

  • mysqlでは後からユニーク制約をつけたい場合は、仮想カラムを追加して処理する方法もある
  • しかし、Postgresの場合は、条件付きのインデックス(部分インデックス)が作れるのでそれが一番
  • データマイグレーションもする必要もなく、条件付きでデータ整合性を保つことができる
  • 例えば、次のような感じ
    • created_at >= 2022-12-12 => xxxとyyyでは複合ユニークインデックス

4.AP周り

4.1.APレベルでのTX制御は難しい

フロー:

  1. Aを含むコードをselect for update
  2. Aがなかったら、Aをインサート
  3. Commit
  • つまり、Aというレコードを1つだけインサートしたい
  • select for updateで対象のAのレコードがない時は排他ロックがかからずAをインサートする
  • もし、そのコードを2つのプロセスから同時に実行したら、Aというレコードは2つインサートされる可能性がある
  • なぜなら、排他ロックが両方で取れない為、両方Aをインサートする

これの解決策は、次の2つ:

  • unique制約をつける
  • ロック専用テーブルを用意して使う

4.2.PythonのAPログの強化

アクセスログ以外にも、デバッグログもJSON化した方が良かった。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import logging
import json
import sys
from uvicorn.logging import DefaultFormatter

def is_output_a_terminal(stream):
    """
    Check if the provided stream is a terminal.
    """
    return stream.isatty()

class AppJSONFormatter(logging.Formatter):
    def format(self, record):
        full_path = record.pathname
        path_parts = full_path.split("/")

        try:
            my_namespace_index = path_parts.index("my_namespace")
            display_path = "/".join(path_parts[my_namespace_index:])
        except ValueError:
            if len(path_parts) > 3:
                display_path = "/".join(path_parts[-4:])
            else:
                display_path = full_path

        # デフォルトのログフィールド
        log_record = {
            "level": record.levelname,
            "timestamp": self.formatTime(record, "%Y-%m-%d %H:%M:%S"),
            "file": display_path,
            "line": record.lineno,
            "message": record.getMessage(),
        }

        # extra で渡されたカスタムフィールドを追加
        extra_fields = {k: v for k, v in record.__dict__.items() if k not in logging.LogRecord.__dict__}
        log_record.update(extra_fields)

        return json.dumps(log_record, ensure_ascii=False)

use_colors = is_output_a_terminal(sys.stdout)

handler = logging.StreamHandler(sys.stdout)
formatter = AppJSONFormatter()
handler.setFormatter(formatter)

logger = logging.getLogger("app")
logger.setLevel(logging.INFO)
logger.addHandler(handler)

使う時

1
2
3
4
import logging
logger = logging.getLogger("app")

logger.info("ユーザーがログインしました", extra={"UserID": 12345, "SessionID": "abcde12345"})

結果

1
2
3
4
5
6
7
8
9
{
    "level": "INFO",
    "timestamp": "2025-02-14 12:34:56",
    "file": "my_namespace/module/auth.py",
    "line": 42,
    "message": "ユーザーがログインしました",
    "UserID": 12345,
    "SessionID": "abcde12345
}

4.3.アプリケーションバグ

以下のようなアプリケーションのバグや問題の可能性を確認した。

  • txの貼り忘れ
  • rollbackやcommit忘れ
    • commitしたら排他ロックがリリースされるので、変更後に適切にcommit/rollbackされていない事を疑った
    • 次の形で排他処理をまとめる所を囲って確実に実行されるように変更した
      1
      2
      3
      4
      5
      6
      7
      8
      
      try:
         pass
      except:
        await session.rollback()
      else:
        await session.commit()
      finally:
        await session.close()
      
  • commitが早すぎた
    • 途中で早すぎたcommitをしている事を疑った
  • repositoryの読み間違い
    • DDDで実装されているが、repository層でsessionを保持している
    • それが、別sessionではないか疑った
  • 不要なNestesd Transatction
    • 一部がnested transactionだったので、それをシンプルに変えた
    • SAVEPOINTを作ると、ROLLBACK TO SAVEPOINTで戻せるが、シンプルに不要だった
  • repositoryから取得したentityが古い疑惑
    • repositoryから取得したentityが古い可能性がある
    • そのため、既に同じentityがあっても、select for updateで取得したentityを利用した
  • 状態チェックは初めに行うべき
    • たとえSAVEPOINTを使っても、COMMITするとSAVEPOINTからではなくBEGINからのTX全体がCOMMITされるので注意
    • 故に、最初に排他処理を入れるのが正解だった
  • SQLAlchemyのオプション
    • auto_commitauto_flushなど
  • 排他ロックの回数の順序
    • 2回排他ロックのSQLを実行しており、それらのSQLのTXによって実行順番が逆だとデッドロックになる
    • もしくは、SQLは1回だったとしても、orderがバラバラだった場合はデッドロックになる
  • TXの長さ
    • 書き込む直前にTXを張り書き込むのがベター
    • TXが長いと競合する可能性が高まるため

5.SQLAlchemy

5.1.SQLAlchemyのflushとcommit

  • flush:
    • SQLAlchemyのSessionが追跡している変更内容をデータベースに送信する操作
    • 実際のSQLクエリが発行されるが、トランザクションはまだコミットされていない
    • 主にセッション内での整合性チェックや、後続の処理で変更内容を参照したい場合に使用する
  • commit:
    • トランザクションを確定させ、変更内容を永続化する操作
    • 自動的にflushが実行された後、トランザクションがコミットされる
    • 変更内容は他のセッションからも参照可能になる
    • with session.begin():を抜けると自動的にcommitになる

5.2.SQLAlchemyのAuto Begin

SQLAlchemyhにはAuto Beginという機能がある。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
session = Session()

# SELECT文の実行(この時点ではトランザクションは開始されない)
user = session.query(User).first()

# Insertなので、自動的に最初のトランザクションが開始
user1 = User(name="田中一郎")
session.add(user1)
session.commit()  # トランザクション1が終了し、自動的に新しいトランザクションが開始

# Auto Beginで自動的に開始された次のトランザクション
user2 = User(name="鈴木二郎")
session.add(user2)
session.commit()  # トランザクション2が終了し、また新しいトランザクションが開始

# withによる明示的なトランザクション
with session.begin():
    user = User(name="山田太郎")
    session.add(user)
    # withブロックを抜けると自動的にcommitされる

# または
session.begin()
try:
    user = User(name="山田太郎")
    session.add(user)
    session.commit()
except:
    session.rollback()
    raise

FastAPIのcontext managerの例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
def get_db():
    db = SessionLocal()
    try:
        yield db  # NOTE: この時点ではトランザクションは開始されていない
    finally:
        db.close()

@app.post("/users/")
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    db_user = User(name=user.name)  
    db.add(db_user)  # この時点で自動的にトランザクションが開始される
    db.commit()  # トランザクションをコミット

@app.get("/users/{user_id}")
def read_user(user_id: int, db: Session = Depends(get_db)):
    # SELECT操作のみなのでトランザクションは開始されない
    user = db.query(User).filter(User.id == user_id).first()
    return user

ポイント

  • SELECT単体ではトランザクションは開始されない
  • INSERT/UPDATE/DELETEで自動的にトランザクションが開始される
  • commitの後は自動的に新しいトランザクションが開始される
  • 明示的なトランザクション管理(withbegin)も可能

5.3.SQLAlchemyのSelect for update

  • select for updateを使うSQLAlchemyの例は以下
  • user = result.scalars().first()のタイミングでDBに問い合わせがかかり排他ロックがかかる
  • result = await session.execute(stmt)ではないので注意
  • また、commitしない場合は、他のtxはブロックされるので注意
  • commitかrollbackのタイミングで他のTXが読み込めるようになる
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.future import select
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import text

DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)

# Async Engine & Session Setup
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)

async def select_for_update_example(user_id: int):
    async with AsyncSessionLocal() as session:
        async with session.begin():  # トランザクション開始
            try:
                # `SELECT ... FOR UPDATE` で行をロック
                stmt = select(User).where(User.id == user_id).with_for_update()
                result = await session.execute(stmt)
                user = result.scalars().first()

                if user:
                    print(f"Locked User: {user.name}")
                    # ここで更新処理などを行う
                    user.name = "Updated Name"
                    await session.commit()
                else:
                    print("User not found")

            except SQLAlchemyError as e:
                await session.rollback()
                print(f"Error: {e}")

実行する場合

1
2
import asyncio
asyncio.run(select_for_update_example(1))

6.PostgreSQL

6.1.Postgresのデバッグ方法

  • deadlock_timeoutの時間を上げる
  • pg_stat_activitypg_locksでデータを確認する
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
SELECT client_addr, client_port, query
FROM pg_stat_activity
WHERE state = 'active'
  AND wait_event_type = 'Lock'
ORDER BY state_change;

  client_addr   client_port                        query
═══════════════╪═════════════╪════════════════════════════════════════════════════
 192.168.2.120        49578  SELECT pdata FROM parent WHERE pid = 1 FOR UPDATE;
 192.168.2.122        47432  SELECT pdata FROM parent WHERE pid = 1 FOR UPDATE;
(2 rows)

その後、CLIをPSをチェックする。

1
2
$ lsof -P | grep 49578
myapp  3974  mypc  [...]  TCP localhost:49578->dbserver:5432 (ESTABLISHED)

もしくは、pg_locksを見ても良い。

1
2
3
SELECT pid, relation::regclass, mode, granted
FROM pg_locks
WHERE relation = 'users'::regclass;

6.2.Select for updateのオプション

  • no key
    • 弱いロック
    • FKやIDXのKeyの更新しないかつ最新のselectが不要な場合は有効
  • no wait
    • ロックされてたらエラーで中断
    • リトライ処理があるならこれでも良い
  • skip locked
    • ロックされてる行をスキップ

6.3.PostgresのCompatibility Matrix

Postgresの行レベルのロックの強さの比較表:

Requested Lock ModeFOR KEY SHAREFOR SHAREFOR NO KEY UPDATEFOR UPDATE
FOR KEY SHAREX
FOR SHAREXX
FOR NO KEY UPDATEXXX
FOR UPDATEXXXX

see PostgreSQL: Documentation: 17: 13.3. Explicit Locking

NOTE:

  • 「X」で競合する(後がブロックされる)事を意味する
  • 対称行列になっている

6.4.PostgreSQLのログオプションの変更

設定確認

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
mydb=> SHOW deadlock_timeout;
 deadlock_timeout
------------------
 1s
(1 row)

mydb=> SHOW log_lock_waits;
 log_lock_waits
----------------
 off
(1 row)

すべてのステートメントをログに記録する。

1
2
log_statement = 'all'
log_line_prefix = '%m [pid=%p] %q[txid=%x] %u@%d '

CLIから更新する。もしくは、RDSの場合は、パラメーターグループから更新する。

1
2
ALTER SYSTEM SET log_lock_waits = ON;
SELECT pg_reload_conf();

結果は以下のようになる。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
2022-06-20 17:17:50.888 CEST [pid=19718] [txid=0] mypc@test LOG:  statement: BEGIN;
2022-06-20 17:17:52.988 CEST [pid=19718] [txid=0] mypc@test LOG:  statement: INSERT INTO child VALUES (100, 1, 'new child');
2022-06-20 17:17:58.215 CEST [pid=19674] [txid=0] mypc@test LOG:  statement: BEGIN;
2022-06-20 17:18:03.626 CEST [pid=19674] [txid=0] mypc@test LOG:  statement: INSERT INTO child VALUES (101, 1, 'another child');
2022-06-20 17:18:18.979 CEST [pid=19674] [txid=1108] mypc@test LOG:  statement: SELECT pdata FROM parent WHERE pid = 1 FOR UPDATE;
2022-06-20 17:18:31.550 CEST [pid=19718] [txid=1109] mypc@test LOG:  statement: SELECT pdata FROM parent WHERE pid = 1 FOR UPDATE;
2022-06-20 17:18:32.551 CEST [pid=19718] [txid=1109] mypc@test ERROR:  deadlock detected
2022-06-20 17:18:32.551 CEST [pid=19718] [txid=1109] mypc@test DETAIL:  Process 19718 waits for ShareLock on transaction 1108; blocked by process 19674.
        Process 19674 waits for ShareLock on transaction 1109; blocked by process 19718.
        Process 19718: SELECT pdata FROM parent WHERE pid = 1 FOR UPDATE;
        Process 19674: SELECT pdata FROM parent WHERE pid = 1 FOR UPDATE;
2022-06-20 17:18:32.551 CEST [pid=19718] [txid=1109] mypc@test HINT:  See server log for query details.
2022-06-20 17:18:32.551 CEST [pid=19718] [txid=1109] mypc@test CONTEXT:  while locking tuple (0,1) in relation 'parent'
2022-06-20 17:18:32.551 CEST [pid=19718] [txid=1109] mypc@test STATEMENT:  SELECT pdata FROM parent WHERE pid = 1 FOR UPDATE;

6.5.分析ツールの利用

設定ファイル(postgresql.conf)を編集:

1
shared_preload_libraries = 'pg_stat_statements'

サーバーを再起動:

1
$ sudo systemctl restart postgresql

拡張機能を作成:

1
CREATE EXTENSION pg_stat_statements;

デッドロックの 発生したクエリの履歴をpg_stat_statementsから取得:

1
2
3
4
5
SELECT query, calls, total_exec_time, rows
FROM pg_stat_statements
WHERE query LIKE '%analysis%'
ORDER BY total_exec_time DESC
LIMIT 10;

7.結論

次が大切だった:

  • レコードはUpdateではなくInsertをメインに
    • 並列分散処理のシステム設計としてはUpdateを前提とした処理フローは向いていなかった
    • なぜなら、一貫性を持ったデータの更新には排他ロックをする必要があるため
    • しかし、強い一貫性を持ったデータの構造にする必要はそもそもなかった
    • さらに、順序処理を前提としてWaiting処理も入れていたため、それも複雑なシステムの一因だった
    • 並列分散システムなので、フローはシンプルに
  • そもそも排他処理はコストがかかるのでするべきではなかった
    • 排他処理は沼なので、排他をやめてAPのバグやリファクタを先にすすめるべき
    • 後々もっと長い時間でデータの結果整合性を合わせられるようにするべきだった
    • Postgresのログからdeadlockの原因を突き止めるのは難しい
  • 設定の確認
    • SQLAlchemyの設定を深く理解するべき
    • PostgreSQLの設定をデバッグ可能にする
  • ローカルで再現するのを優先するべきき
  • リファクタ・ファースト
    • コードが複雑だと追っかけられないので、先に軽いリファクタをするべきだった。
  • ログ・ファースト
    • マイクロサービスのIn/Outのログも出すべき
    • アクセスログだけではなく、一般のログもJSONで出すべき
    • Don’t guess, just measure

8.参考文献

Built with Hugo
テーマ StackJimmy によって設計されています。