What is leader election?

A common scenario where leader election is needed is running scheduled (cron) jobs in a backend service that has multiple instances.
If you have just one instance, life is simple — that instance runs all the jobs.
But when you scale horizontally and run multiple instances, you want to make sure that a scheduled job runs only once, on one leader instance — not duplicated across all instances.
This is where leader election comes in.

Using Advisory Locks in PostgreSQL

PostgreSQL offers advisory locks, which are a simple and effective way to implement leader election directly in the database.

Here’s how it works:

-- Attempt to become the leader by acquiring an exclusive advisory lock
SELECT pg_try_advisory_lock(2929) AS is_leader;

-- If is_leader = true → this session is the leader
-- If is_leader = false → another session is leader

This query tries to acquire a global, unique lock with the identifier 2929. It returns:

  • true: This session has successfully acquired the lock → you are the leader
  • false: Another session holds the lock → you are not the leader

The pg_try_advisory_lock function is non-blocking. It returns immediately whether or not the lock was acquired. The lock is automatically released when the database session closes. You can also manually release it using pg_advisory_unlock.

Common use cases for advisory locks

  • Job schedulers: Ensure only one worker processes a job at a time
  • Leader election: Select a single primary worker in a distributed system
  • Rate limiting: Prevent overlapping access to a shared resource

Code Example: Leader election in Rust using sqlx

async fn try_to_become_leader(
    db: PgPool,
    advisory_lock_id: i64,
    timeout_duration: std::time::Duration,
) -> Result<Option<PoolConnection<Postgres>>, Error> {
    let mut connection = db.acquire().await.map_err(Error::Connection)?;

    let is_leader = match timeout(
        timeout_duration,
        sqlx::query_scalar("SELECT pg_try_advisory_lock($1)")
            .bind(advisory_lock_id)
            .fetch_one(&mut *connection),
    )
    .await
    {
        Ok(Ok(is_leader)) => is_leader,
        Ok(Err(err)) => {
            eprintln!("scheduler: error querying advisory lock: {}", err);
            return Err(Error::AdvisoryLock(err));
        }
        Err(_) => {
            eprintln!("scheduler: timeout while querying advisory lock");
            return Err(Error::Timeout);
        }
    };

    if is_leader {
        Ok(Some(connection))
    } else {
        Ok(None)
    }
}

Making lock IDs more readable

PostgreSQL advisory locks only accept a 64-bit integer as the lock identifier. It helps to define named constants or enums to make your code easier to understand.

Option 1: Constants

// Define a newtype for PostgresLockID
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PostgresLockID(pub i64);

// Constants representing specific lock IDs
const POSTGRES_LOCK_ID_SCHEDULER: PostgresLockID = PostgresLockID(1);
const POSTGRES_LOCK_ID_WORKER: PostgresLockID = PostgresLockID(2);

Option 2: Enum

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PostgresLockID {
    Scheduler = 1,
    Worker = 2,
    // ...
}

impl PostgresLockID {
    pub fn as_i64(self) -> i64 {
        self as i64
    }
}

Option 3: Hash-based IDs

You could also use a hashing function (like a checksum) to turn a string identifier into a unique i64 value.

A note on PgBouncer compatibility

Advisory locks do not work reliably with PgBouncer when it is in transaction pooling mode (pool_mode=transaction).

In transaction pooling mode, connections are frequently swapped between clients. This causes advisory locks to be released unexpectedly when transactions end.

If your application relies on advisory locks, either:

  • Configure PgBouncer to use pool_mode = session, or
  • Avoid PgBouncer for that particular workload

Using separate connections for leader election and normal queries

ENV variables

# === Database connection for general queries (PgBouncer transaction mode) ===
DATABASE_URL=postgres://user:password@127.0.0.1:6432/mydb

# === Database connection for leader election / advisory locks (PgBouncer session mode) ===
DATABASE_URL_SESSION=postgres://user:password@127.0.0.1:6433/mydb

Usage in Rust using SQLX

use sqlx::{PgPool, postgres::PgPoolOptions};
use std::env;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let normal_url = env::var("DATABASE_URL")?;
    let session_url = env::var("DATABASE_URL_SESSION")?;

    let normal_pool = PgPoolOptions::new().connect(&normal_url).await?;
    let session_pool = PgPoolOptions::new().connect(&session_url).await?;

    // normal_pool → use for normal queries
    // session_pool → use for leader election/advisory locks

    Ok(())
}