What is leader election?
A common scenario where leader election is needed is running scheduled (cron) jobs in a backend service that has multiple instances.
If you have just one instance, life is simple — that instance runs all the jobs.
But when you scale horizontally and run multiple instances, you want to make sure that a scheduled job runs only once, on one leader instance — not duplicated across all instances.
This is where leader election comes in.
Using Advisory Locks in PostgreSQL
PostgreSQL offers advisory locks, which are a simple and effective way to implement leader election directly in the database.
Here’s how it works:
-- Attempt to become the leader by acquiring an exclusive advisory lock
SELECT pg_try_advisory_lock(2929) AS is_leader;
-- If is_leader = true → this session is the leader
-- If is_leader = false → another session is leader
This query tries to acquire a global, unique lock with the identifier 2929. It returns:
- true: This session has successfully acquired the lock → you are the leader
- false: Another session holds the lock → you are not the leader
The pg_try_advisory_lock
function is non-blocking. It returns immediately whether or not the lock was acquired. The lock is automatically released when the database session closes. You can also manually release it using pg_advisory_unlock.
Common use cases for advisory locks
- Job schedulers: Ensure only one worker processes a job at a time
- Leader election: Select a single primary worker in a distributed system
- Rate limiting: Prevent overlapping access to a shared resource
Code Example: Leader election in Rust using sqlx
async fn try_to_become_leader(
db: PgPool,
advisory_lock_id: i64,
timeout_duration: std::time::Duration,
) -> Result<Option<PoolConnection<Postgres>>, Error> {
let mut connection = db.acquire().await.map_err(Error::Connection)?;
let is_leader = match timeout(
timeout_duration,
sqlx::query_scalar("SELECT pg_try_advisory_lock($1)")
.bind(advisory_lock_id)
.fetch_one(&mut *connection),
)
.await
{
Ok(Ok(is_leader)) => is_leader,
Ok(Err(err)) => {
eprintln!("scheduler: error querying advisory lock: {}", err);
return Err(Error::AdvisoryLock(err));
}
Err(_) => {
eprintln!("scheduler: timeout while querying advisory lock");
return Err(Error::Timeout);
}
};
if is_leader {
Ok(Some(connection))
} else {
Ok(None)
}
}
Making lock IDs more readable
PostgreSQL advisory locks only accept a 64-bit integer as the lock identifier. It helps to define named constants or enums to make your code easier to understand.
Option 1: Constants
// Define a newtype for PostgresLockID
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PostgresLockID(pub i64);
// Constants representing specific lock IDs
const POSTGRES_LOCK_ID_SCHEDULER: PostgresLockID = PostgresLockID(1);
const POSTGRES_LOCK_ID_WORKER: PostgresLockID = PostgresLockID(2);
Option 2: Enum
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PostgresLockID {
Scheduler = 1,
Worker = 2,
// ...
}
impl PostgresLockID {
pub fn as_i64(self) -> i64 {
self as i64
}
}
Option 3: Hash-based IDs
You could also use a hashing function (like a checksum) to turn a string identifier into a unique i64 value.
A note on PgBouncer compatibility
Advisory locks do not work reliably with PgBouncer when it is in transaction pooling mode (pool_mode=transaction).
In transaction pooling mode, connections are frequently swapped between clients. This causes advisory locks to be released unexpectedly when transactions end.
If your application relies on advisory locks, either:
- Configure PgBouncer to use
pool_mode = session
, or - Avoid PgBouncer for that particular workload
Using separate connections for leader election and normal queries
ENV variables
# === Database connection for general queries (PgBouncer transaction mode) ===
DATABASE_URL=postgres://user:password@127.0.0.1:6432/mydb
# === Database connection for leader election / advisory locks (PgBouncer session mode) ===
DATABASE_URL_SESSION=postgres://user:password@127.0.0.1:6433/mydb
Usage in Rust using SQLX
use sqlx::{PgPool, postgres::PgPoolOptions};
use std::env;
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
let normal_url = env::var("DATABASE_URL")?;
let session_url = env::var("DATABASE_URL_SESSION")?;
let normal_pool = PgPoolOptions::new().connect(&normal_url).await?;
let session_pool = PgPoolOptions::new().connect(&session_url).await?;
// normal_pool → use for normal queries
// session_pool → use for leader election/advisory locks
Ok(())
}