CockroachDB and Postgres SELECT FOR UPDATE and SKIP LOCKED

Download full source code.

CockroachDB and Postgres have a couple of features I use that make it easier to handle concurrent access to the same data.

Say you have several client applications that want to process rows in the table, but you don’t want them to process the same row.

With both databases, you can use the SELECT FOR UPDATE statement to lock the row, and the SKIP LOCKED statement to skip rows that are already locked.

Client A will select the top row in the table that needs to be processed, lock it, and then process it. Client B will select the next top row that needs to be processed and is not locked, lock it, and process it. Client A and B will never select the same row, because they will never select a locked row.

To demonstrate this I’m going to use a simple statement SELECT * FROM products LIMIT 1 FOR UPDATE SKIP LOCKED with Entity Framework.

The SELECT statement and subsequent UPDATE must be done inside a transaction for this work.

Because I want to run two transactions at the same time, I need two instances of the SalesContext. Aside from that, the code is very simple.  The rest of the source code is in the attached zip file.

 1using PostgresSkipLockedRows.Data;
 2using Microsoft.EntityFrameworkCore;
 3
 4SalesContext salesContext1 = new SalesContext(new DbContextOptionsBuilder<SalesContext>()
 5 .UseNpgsql("Host=localhost;Port=5432;Database=salesdb;Username=postgres;Password=postgres").Options);
 6
 7SalesContext salesContext2 = new SalesContext(new DbContextOptionsBuilder<SalesContext>()
 8 .UseNpgsql("Host=localhost;Port=5432;Database=salesdb;Username=postgres;Password=postgres").Options);
 9
10await salesContext1.Database.EnsureCreatedAsync(); // create the database
11await salesContext1.SeedAsync().ConfigureAwait(false); // seed the database
12
13var task1 = ProcessTopRow(1000, salesContext1); // delay 1 second, making the transaction hold the lock longer
14var task2 = ProcessTopRow(0, salesContext2); // no delay, it will process the second row
15
16await Task.WhenAll(task1, task2).ConfigureAwait(false);
17
18async Task ProcessTopRow(int delay, SalesContext salesContext)
19{
20    using var transaction = salesContext.Database.BeginTransaction();
21    try
22    {
23        var result = salesContext.Products.FromSql($"SELECT * FROM products LIMIT 1 FOR UPDATE SKIP LOCKED").Single(); // the select statement
24        await Task.Delay(delay); // simulate some processing time
25        Console.WriteLine($"{delay} {result}");
26        //update the data
27        //transaction.Commit(); // not going to commit in this example
28    }
29    catch (Exception ex)
30    {
31        Console.WriteLine(ex.Message);
32        transaction.Rollback();
33    }
34}

That’s all there is to it, but this is a very powerful feature.

Download full source code.

comments powered by Disqus

Related