My friend Jason and I were chatting about work we were doing, and he brought up a database deadlock he was having trouble reproducing. The query in question does a batch insert to a certain table. We process rows into this table in such a way that a particular row might be processed by multiple batches at once: this is fine, we have uniqueness constraints that ensure everything works out.

We already knew the deadlock had something to do with the uniqueness constraint. If one transaction tries to insert A then B and another transaction tries to insert B then A, then an unlucky ordering of operations will result in a deadlock. What we were unsure about was whether batch inserts, which all occur in the same database statement, should work the same way. Together, we found a Stack Exchange post that pretty well explained what was going on. We resolved to write a small test that demonstrates the problem.

Let's examine the dynamics here with a single table. We only need a single unique column.

CREATE TABLE foo (
    bar INTEGER UNIQUE
);

We can force a deadlock by starting two transactions in two separate terminals and inserting two rows in different orders.

postgres=# /* transaction 1 */
postgres-# begin;
BEGIN
postgres=*# insert into foo values (0);
INSERT 0 1
postgres=*#
postgres=# /* transaction 2 */
postgres-# begin;
BEGIN
postgres=*# insert into foo values (1);
INSERT 0 1
postgres=*#
postgres=*# /* transaction 1 */
postgres-*# insert into foo values (1);
/* pauses because transaction 2 has already inserted 1 */
postgres=*# /* transaction 2 */
postgres-*# insert into foo values (0);
ERROR:  deadlock detected
DETAIL:  Process 130 waits for ShareLock on transaction 751; blocked by process 127.
Process 127 waits for ShareLock on transaction 752; blocked by process 130.
HINT:  See server log for query details.
CONTEXT:  while inserting index tuple (0,14) in relation "foo_bar_key"
postgres=!#

This is all very straightforward, but how do we reproduce this using batch inserts, to confirm that the fix suggested in the Stack Exchange post, sorting the batch before inserting it, will work for us? Let's write some code to insert batches in a random order until we reproduce the issue.

Let's start a Rust playground with tokio and tokio-postgres. We also add rand to generate random values for our one column.

[package]
name = "playground"
version = "0.1.0"
edition = "2021"

[dependencies]
rand = "0.8.5"
tokio = { version = "1.37.0", features = ["full"] }
tokio-postgres = "0.7.10"

We'll need a function that does a batch insert of rows into the database. Let's call it do_batch_insert().

use tokio_postgres::{Client, Error};

async fn do_batch_insert(client: &mut Client) -> Result<(), Error> {
    // TODO
}

do_batch_insert() takes a mutable reference to a tokio_postgres::Client so that it can perform database operations. It is async and returns a Result.

We need to build the batch insert we're going to do as a SQL statement. The bar column on the foo table is an integer, so let's sample random integers without replacement so that we don't violate the uniqueness constraint within a single batch.

// use rand::seq::SliceRandom;
// use tokio_postgres::types::ToSql;
// async fn do_batch_insert(client: &mut Client) -> Result<(), Error> {
    let numbers: Vec<i32> = (0..32).collect();
    let params: Vec<String> = (1..=32).map(|param| format!("(${param})")).collect();
    let statement = "INSERT INTO foo(bar) VALUES ".to_owned() + &params.join(", ");
    let mut rng = rand::thread_rng();
    let args: Vec<&(dyn ToSql + Sync)> = numbers
        .choose_multiple(&mut rng, 32)
        .map(|num| num as &(dyn ToSql + Sync))
        .collect();

This builds a query of the following form.

INSERT INTO foo(bar) VALUES ($1), ($2), ($3), ...

In the variable args, we store a randomly chosen set of 32 integers using rand::seq::SliceRandom::choose_multiple().
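To make the placeholder construction concrete, here's a std-only sketch of just the statement-building step. The function name build_insert_statement is ours, introduced for illustration; it isn't part of the article's code.

```rust
/// Build a batch INSERT statement with `n` positional placeholders,
/// e.g. "INSERT INTO foo(bar) VALUES ($1), ($2), ($3)" for n = 3.
fn build_insert_statement(n: usize) -> String {
    // Placeholders are 1-indexed in Postgres: $1, $2, ..., $n.
    let params: Vec<String> = (1..=n).map(|param| format!("(${param})")).collect();
    "INSERT INTO foo(bar) VALUES ".to_owned() + &params.join(", ")
}

fn main() {
    let statement = build_insert_statement(3);
    assert_eq!(statement, "INSERT INTO foo(bar) VALUES ($1), ($2), ($3)");
    println!("{statement}");
}
```

Each row gets its own parenthesized placeholder, so all 32 rows arrive in a single statement rather than 32 separate INSERTs.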

Finally, we run this statement in a transaction, then explicitly roll it back. We do this because we're trying to reproduce a deadlock between uncommitted transactions that are inserting the same row. We don't want the transaction to error due to inserting rows that have already been committed by other transactions: we only want it to error because of a deadlock due to in-flight work. Rolling back the transaction before exiting the function makes sure our database is always in a clean state.

//    let args: Vec<&(dyn ToSql + Sync)> = numbers
//        .choose_multiple(&mut rng, 32)
//        .map(|num| num as &(dyn ToSql + Sync))
//        .collect();
    let transaction = client.transaction().await?;
    transaction.query(&statement, &args[..]).await?;
    transaction.rollback().await?;
    Ok(())
// }

Next, we'll write a run() function that calls our do_batch_insert() function in a loop, printing any errors that it encounters. run() will also set up a connection to the database.

/// Create a connection and make batch inserts in a loop.
async fn run() {
    let (mut client, connection) =
        tokio_postgres::connect("host=localhost user=postgres password=password", NoTls)
            .await
            .unwrap();
    tokio::spawn(async move {
        if let Err(err) = connection.await {
            panic!("connection error: {err}");
        }
    });
    loop {
        if let Err(err) = do_batch_insert(&mut client).await {
            eprintln!("{err}");
        }
    }
}

Now, we'll call run() from two tokio tasks to simulate concurrent work.

#[tokio::main]
async fn main() {
    let t1 = tokio::spawn(run());
    let t2 = tokio::spawn(run());
    let _ = t1.await;
    let _ = t2.await;
}

If you try to build this now, you'll get the following error.

error: future cannot be sent between threads safely
   --> src/main.rs:43:18
    |
43  |     tokio::spawn(run());
    |                  ^^^^^ future returned by `run` is not `Send`
    |
    = help: within `impl Future<Output = ()>`, the trait `Send` is not implemented for `Rc<UnsafeCell<ReseedingRng<rand_chacha::chacha::ChaCha12Core, OsRng>>>`, which is required by `impl Future<Output = ()>: Send`
note: future is not `Send` as this value is used across an await

If you look back at our do_batch_insert() function, it doesn't use rng across .await points. However, Rust considers the value dropped at the end of the function, which counts as a use. Manually calling drop() on rng won't fix the error, because the compiler's analysis of which values are held across an .await is conservative and doesn't account for early drops (a known rustc limitation). We can fix the issue by introducing a new scope which rng does not escape.

    // let statement = "INSERT INTO foo(bar) VALUES ".to_owned() + &params.join(", ");
    let args: Vec<&(dyn ToSql + Sync)> = {
        let mut rng = rand::thread_rng();
        numbers
            .choose_multiple(&mut rng, 32)
            .map(|num| num as &(dyn ToSql + Sync))
            .collect()
    };
    // let transaction = client.transaction().await?;

If we build and run this code, we see the following output.

$ cargo run
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s
     Running `target/debug/playground`
db error: ERROR: deadlock detected
DETAIL: Process 95 waits for ShareLock on transaction 4376; blocked by process 96.
Process 96 waits for ShareLock on transaction 4377; blocked by process 95.
HINT: See server log for query details.
db error: ERROR: deadlock detected
DETAIL: Process 96 waits for ShareLock on transaction 6747; blocked by process 95.
Process 95 waits for ShareLock on transaction 6746; blocked by process 96.
HINT: See server log for query details.

Success! We can also validate the suggested fix: sorting the rows before inserting them so they're inserted in a consistent order. We'll add the sort to do_batch_insert().

    //  let mut rng = rand::thread_rng();
        let mut args: Vec<&i32> = numbers.choose_multiple(&mut rng, 32).collect();
        args.sort_unstable();
        args.into_iter()
            .map(|num| num as &(dyn ToSql + Sync))
            .collect()
    // };

Running this version produces no errors, validating the fix.
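One detail worth noting about the sort: sort_unstable() on a Vec<&i32> orders by the pointed-to values, not by reference address, because Ord for references delegates to the referent. A quick std-only check (the helper sorted_refs and its input are ours, for illustration only):

```rust
/// Collect references to the input integers, sort the references,
/// and return the values in their sorted order.
fn sorted_refs(numbers: &[i32]) -> Vec<i32> {
    let mut refs: Vec<&i32> = numbers.iter().collect();
    // Ord for &i32 compares the pointed-to i32s, so this sorts by value.
    refs.sort_unstable();
    refs.into_iter().copied().collect()
}

fn main() {
    let sorted = sorted_refs(&[3, 1, 2]);
    assert_eq!(sorted, vec![1, 2, 3]);
    println!("{sorted:?}");
}
```

This is why sorting the Vec<&i32> returned by choose_multiple() is enough to give every batch a consistent insert order.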