Introduction
For years, Moore's law for microprocessors (that is, the trend in transistor density per unit of cost) consistently translated into higher clock rates. Every three and a half years or so, the same money bought twice the clock rate and twice the cache.
While CPU manufacturers still deliver microprocessors with an increasing number of transistors, clock rates clearly stabilized a few years ago, somewhere around 3 GHz, due to growing power dissipation issues. The new trend is towards higher parallelism, through processors with multiple cores and multiple hardware threads. It might go beyond the scope of this document, but I wish the industry would also focus on addressing existing power dissipation issues and aim for better overall power efficiency. In fact, if you have not purchased a computer since the 90s, you might be shocked by the PSU wattage and cooling infrastructure you will need if you intend to assemble a computer today.
Application level parallelism
While the hardware industry has embraced parallelism, the software industry is only beginning to: operating systems have featured multitasking for decades and do provide multithreading APIs, but parallelism is still a long way from being a reality at the application level. There are a few reasons:
- Applications have mainly been written as single-threaded programs for decades.
- Multithreaded software implies concurrency and synchronization issues that make it much harder to write.
- Many widespread programming languages were conceived and designed in a single-threaded era and are therefore fundamentally unsuited to parallelism.
- The hardware is still missing a critical piece for easy parallelism: transactional memory.
That said, there are many multithreaded applications today, and most of them deal with concurrency issues using the same lock-based techniques. This page introduces alternatives, using examples in the C language for simplicity.
The atomicity issue
The biggest problem in multithreaded programming is the non-atomic nature of many operations we are used to performing with mainstream programming languages. Most of those languages were designed at a time when applications
were mainly single-threaded, and offer no, or only poor, support for atomic operations.
The easiest way to explain the problem is to write a simple program spawning multiple threads which
concurrently increment a global counter.
the thread unsafe approach
# file: normal.c
 1 #include <assert.h>
 2 #include <pthread.h>
 3 #include <stdio.h>
 4 #include <stdlib.h>
 5
 6 volatile unsigned long counter = 0;
 7
 8 void *worker(void *arg)
 9 {
10     int i = 0, iterations = *((int *)arg);
11     for (i = 0; i < iterations; ++i)
12         ++counter;
13     return NULL;
14 }
15
16 int main(int argc, char **argv)
17 {
18     int i, count;
19     pthread_t *t;
20     if (argc != 3) {
21         fprintf(stderr, "%s: <nthreads> <iterations>\n", argv[0]);
22         return 1;
23     }
24     count = atoi(argv[2]);
25     /* create # threads */
26     t = (pthread_t *)calloc(atoi(argv[1]), sizeof(pthread_t));
27     for (i = 0; i < atoi(argv[1]); ++i)
28         assert(!pthread_create(&(t[i]), NULL, worker, (void *)&count));
29     /* wait for the completion of all the threads */
30     for (i = 0; i < atoi(argv[1]); ++i)
31         assert(!pthread_join(t[i], NULL));
32     /* print counter value */
33     printf("all thread done -> counter=%lu\n", counter);
34     return 0;
35 }
We can run it on a single-core machine with a million increments per thread, first with 1 and then with 64 worker threads (in the second case, the expected final value is 64000000):
$ ./normal 1 1000000
all thread done -> counter=1000000
$ ./normal 64 1000000
all thread done -> counter=7546383
This program does not get the counter right with more than one worker thread, whatever the type of the counter, and, contrary to popular belief, this holds even on single-core architectures. A widespread
confusion regarding atomicity is the belief that C types such as sig_atomic_t
guarantee atomicity: they don't. The only guarantee sig_atomic_t provides is that an object of that type
is never partially read or written when an asynchronous signal interrupts the access; it says nothing about read-modify-write sequences such as ++counter, nor about concurrent threads.
# file: normal.c
 6 volatile unsigned long counter = 0;
 7
 8 void *worker(void *arg)
 9 {
10     int i = 0, iterations = *((int *)arg);
11     for (i = 0; i < iterations; ++i)
12         ++counter;
13     return NULL;
14 }
The operation at line 12 is non-atomic. A look at the disassembled function:
000000000040075c <worker>:
  40075c: 48 89 7c 24 e8          mov    %rdi,-0x18(%rsp)
  400761: c7 44 24 fc 00 00 00    movl   $0x0,-0x4(%rsp)
  400768: 00
  400769: c7 44 24 fc 00 00 00    movl   $0x0,-0x4(%rsp)
  400770: 00
  400771: eb 17                   jmp    40078a <worker+0x2e>
  400773: 48 8b 05 76 06 20 00    mov    0x200676(%rip),%rax        # 600df0 <counter>
  40077a: 48 83 c0 01             add    $0x1,%rax
  40077e: 48 89 05 6b 06 20 00    mov    %rax,0x20066b(%rip)        # 600df0 <counter>
  400785: 83 44 24 fc 01          addl   $0x1,-0x4(%rsp)
  40078a: 48 8b 44 24 e8          mov    -0x18(%rsp),%rax
  40078f: 8b 00                   mov    (%rax),%eax
  400791: 3b 44 24 fc             cmp    -0x4(%rsp),%eax
  400795: 7f dc                   jg     400773 <worker+0x17>
  400797: b8 00 00 00 00          mov    $0x0,%eax
  40079c: c3                      retq
This disassembly clearly shows that the one-line C statement actually compiles to 3 CPU instructions:
- a MOV from counter to register %RAX,
- an ADD of 0x01 to register %RAX,
- and a MOV from register %RAX back to counter.
This leads to potential inconsistencies, such as the following interleaving:
# thread-0            # thread-1
---------             ---------
MOV counter,%rax      ..                    # %rax has a value of 0 in T0
..                    MOV counter,%rax      # %rax has a value of 0 in T1
ADD 0x01,%rax         ..                    # %rax has a value of 1 in T0
..                    ADD 0x01,%rax         # %rax has a value of 1 in T1
MOV %rax,counter      ..                    # counter has a value of 1
..                    MOV %rax,counter      # counter still has a value of 1, not 2
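The interleaving above can be replayed deterministically in a single thread, which makes the lost update easy to check. This is a hypothetical illustration, not code from the article: each local variable stands in for one thread's copy of %rax, and the statements follow the rows of the table in order.

```c
/* Hypothetical replay of the table above: rax_t0 and rax_t1 stand for the
 * %rax register of thread-0 and thread-1; counter is the shared variable. */
unsigned long simulate_lost_update(void)
{
    unsigned long counter = 0;
    unsigned long rax_t0, rax_t1;

    rax_t0 = counter;   /* T0: MOV counter,%rax -> 0 */
    rax_t1 = counter;   /* T1: MOV counter,%rax -> 0 */
    rax_t0 += 1;        /* T0: ADD 0x01,%rax    -> 1 */
    rax_t1 += 1;        /* T1: ADD 0x01,%rax    -> 1 */
    counter = rax_t0;   /* T0: MOV %rax,counter -> 1 */
    counter = rax_t1;   /* T1: MOV %rax,counter -> 1, T0's increment is lost */
    return counter;
}

/* Same six instructions, but thread-0 runs all three before thread-1 starts. */
unsigned long simulate_serialized(void)
{
    unsigned long counter = 0, rax;

    rax = counter; rax += 1; counter = rax;   /* thread-0 */
    rax = counter; rax += 1; counter = rax;   /* thread-1 */
    return counter;
}
```

Only the order of the individual instructions differs between the two functions, yet the first yields 1 and the second 2: this is exactly what the scheduler can do to normal.c, even on a single core.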
Most multithreaded applications solve this problem by turning this operation into a critical section:
the critical section approach
# file: locked.c
volatile unsigned long counter = 0;
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void *worker(void *arg)
{
    int i = 0, iterations = *((int *)arg);
    for (i = 0; i < iterations; ++i) {
        pthread_mutex_lock(&mutex);     /* BEGIN critical section */
        ++counter;
        pthread_mutex_unlock(&mutex);   /* END critical section */
    }
    return NULL;
}
Synchronizing the counter update with a mutual exclusion primitive guarantees that only
one thread at a time executes it.
Critical sections have several attractive aspects:
- Operating systems provide several readily available APIs.
- There is no shortage of books and literature on critical section techniques.
- Combined with condition variables, critical sections make it relatively painless to build powerful producer/consumer engines.
But they come with the following drawbacks:
- Mutexes are costly to acquire, especially under contention.
- Applications with a high level of contention make poor use of the available resources.
- The programmer has to deal with synchronization issues that can quickly get very complex and lead to races and deadlocks.
- Finally, the average programmer is not trained to deal with those issues.
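To illustrate the producer/consumer point made above, here is a minimal sketch of a single-slot mailbox built from one mutex and two condition variables. The names (struct mailbox, MAILBOX_INITIALIZER, mailbox_put, mailbox_get) are hypothetical and not part of the article's code; only standard POSIX threads calls are used.

```c
#include <pthread.h>

/* Hypothetical single-slot mailbox: the mutex protects the slot, and two
 * condition variables let producers wait for "empty" and consumers wait
 * for "full" instead of spinning. */
struct mailbox {
    pthread_mutex_t lock;
    pthread_cond_t not_empty;
    pthread_cond_t not_full;
    int full;
    unsigned long value;
};

#define MAILBOX_INITIALIZER \
    { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, \
      PTHREAD_COND_INITIALIZER, 0, 0 }

void mailbox_put(struct mailbox *m, unsigned long v)
{
    pthread_mutex_lock(&m->lock);
    while (m->full)                           /* loop: spurious wakeups are allowed */
        pthread_cond_wait(&m->not_full, &m->lock);
    m->value = v;
    m->full = 1;
    pthread_cond_signal(&m->not_empty);       /* wake one waiting consumer */
    pthread_mutex_unlock(&m->lock);
}

unsigned long mailbox_get(struct mailbox *m)
{
    unsigned long v;

    pthread_mutex_lock(&m->lock);
    while (!m->full)
        pthread_cond_wait(&m->not_empty, &m->lock);
    v = m->value;
    m->full = 0;
    pthread_cond_signal(&m->not_full);        /* wake one waiting producer */
    pthread_mutex_unlock(&m->lock);
    return v;
}
```

Using two condition variables, one per condition, means a signal always wakes a thread that can make progress. The sketch also shows the flipside listed above: every transfer goes through the mutex, so heavy traffic serializes on it.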
CPUs have provided the building blocks for lockfree software for a long time. However, while operating system
code relies heavily on those techniques, they are rarely found at the application level. The main reason is
that major programming languages were designed for scenarios where atomicity did not matter and therefore did not expose
atomic primitives: the only way to use them was to write low-level assembly.
Things are changing: with the growing need for better parallel software, programming environments are starting to ease the development of lockfree applications by exposing atomic primitives:
- JDK 5.0 introduced java.util.concurrent.atomic;
- GCC 4.1 introduced the Atomic Builtins.
Since the release of the 80486, Intel processors have provided an XADD instruction that, combined with the LOCK prefix, can solve the above problem in a lockfree manner. The XADD instruction can be used to implement an atomic add-and-fetch function that obeys the following pseudo-code:
/* atomic */
unsigned long add_and_fetch(unsigned long *addr, unsigned long add)
{
*addr += add;
return *addr;
}
Recent gcc versions provide easy access to this instruction through the Atomic Builtins
type __sync_add_and_fetch (type *ptr, type value, ...)
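As a minimal sketch, assuming GCC or Clang, the add_and_fetch() pseudo-code above maps directly onto this builtin; its sibling __sync_fetch_and_add returns the value before the addition instead. The wrapper names are hypothetical.

```c
/* Hypothetical wrappers around the GCC/Clang builtins (GCC >= 4.1). */
static inline unsigned long add_and_fetch(volatile unsigned long *addr, unsigned long add)
{
    /* atomically: *addr += add; returns the value *after* the update */
    return __sync_add_and_fetch(addr, add);
}

static inline unsigned long fetch_and_add(volatile unsigned long *addr, unsigned long add)
{
    /* atomically: *addr += add; returns the value *before* the update */
    return __sync_fetch_and_add(addr, add);
}
```

The name order tells which value comes back: "add_and_fetch" adds first, then fetches. On x86, these typically compile to a LOCK XADD when the result is used.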
Older versions can use the following inline assembly glue:
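# file: atomic.h
static inline unsigned long AAF(volatile unsigned long *mem, unsigned long add)
{
    unsigned long __tmp = add;
    /* LOCK XADD: *mem += add, and add receives the previous value of *mem;
     * no size suffix, so the assembler picks 32 or 64 bits from the register */
    __asm__ __volatile__("lock; xadd %0,%1"
                         : "+r" (add), "+m" (*mem)
                         :
                         : "memory");
    return add + __tmp;   /* previous value + increment = new value */
}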
Thus, we can write a new version of our example updating the shared counter atomically using the XADD instruction:
the atomic approach
# file: aaf.c
void *worker(void *arg)
{
    int i = 0, iterations = *((int *)arg);
    for (i = 0; i < iterations; ++i)
        AAF(&counter, 1);
    return NULL;
}
The resulting program is lockfree and makes much better use of computing resources. To compare the performance of both implementations, we can use the following script:
#!/bin/ksh93
for pg in aaf locked; do
    for ((i=1; $i <= 16; ++i)); do
        let nops=100000000/$i
        echo -n "$i $nops "
        /usr/bin/time -f "%e" ./$pg $i $nops 1> /dev/null
    done
done
This script measures the wall-clock time required by both implementations of the shared-counter program,
with one to sixteen threads sharing exactly (or close to) 100 million updates of the counter:


We observe the following:
- On each machine, the atomic implementation performs much better than the mutex-based one.
- On the 4-way system, with one core, the mutex-based program has a 2.6x penalty over the atomic one; this penalty goes as high as 16.3x when 4 threads are used.
- As seen in fig-3, the atomic version completes earlier as physical cores are added, whereas the situation is quite the opposite for the mutex version.
- Hyperthreading seems to improve the performance of the mutex-based program when more threads than physical cores are executing.
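For readers who prefer not to maintain inline assembly, the aaf.c approach can also be sketched on top of the GCC builtin alone. This is a self-contained variant under that assumption, not the program used for the measurements above; run_counter() and atomic_worker() are hypothetical names.

```c
#include <pthread.h>
#include <stdlib.h>

/* Shared counter, only ever updated through the atomic builtin. */
static volatile unsigned long shared_counter = 0;

static void *atomic_worker(void *arg)
{
    long i, iterations = *(const long *)arg;
    for (i = 0; i < iterations; ++i)
        __sync_add_and_fetch(&shared_counter, 1);   /* lockfree increment */
    return NULL;
}

/* Runs nthreads workers doing `iterations` increments each and returns the
 * final counter; the result is smaller if some threads could not be created. */
unsigned long run_counter(int nthreads, long iterations)
{
    pthread_t *t = calloc((size_t)nthreads, sizeof(pthread_t));
    int i, created = 0;

    if (!t)
        return 0;
    shared_counter = 0;
    for (i = 0; i < nthreads; ++i) {
        if (pthread_create(&t[i], NULL, atomic_worker, &iterations) != 0)
            break;
        ++created;
    }
    for (i = 0; i < created; ++i)   /* join before `iterations` goes out of scope */
        pthread_join(t[i], NULL);
    free(t);
    return shared_counter;
}
```

Unlike normal.c, run_counter(8, 100000) should return 800000 regardless of the number of cores, since each increment is a single atomic read-modify-write.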
Back to the counter example: many similar updates can be done in a lockfree manner using atomic read-modify-write instructions such as XADD. Here is a sample of what the GCC Atomic Builtins
provide:
/* primitives returning the value of *ptr *before* the update */
type __sync_fetch_and_add  (type *ptr, type value, ...)
type __sync_fetch_and_sub  (type *ptr, type value, ...)
type __sync_fetch_and_or   (type *ptr, type value, ...)
type __sync_fetch_and_and  (type *ptr, type value, ...)
type __sync_fetch_and_xor  (type *ptr, type value, ...)
type __sync_fetch_and_nand (type *ptr, type value, ...)
/* primitives returning the value of *ptr *after* the update */
type __sync_add_and_fetch  (type *ptr, type value, ...)
type __sync_sub_and_fetch  (type *ptr, type value, ...)
type __sync_or_and_fetch   (type *ptr, type value, ...)
type __sync_and_and_fetch  (type *ptr, type value, ...)
type __sync_xor_and_fetch  (type *ptr, type value, ...)
type __sync_nand_and_fetch (type *ptr, type value, ...)
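The logical variants are handy for shared flag words. The sketch below (flag names and helpers are hypothetical) uses the return value of __sync_fetch_and_or to tell whether the calling thread is the one that actually set a bit, which a separate read followed by a write could not guarantee.

```c
/* Hypothetical flag bits in a word shared by several threads. */
#define FLAG_READY   0x1UL
#define FLAG_CLOSED  0x2UL

/* Atomically sets `flag`; returns 1 if this call set it, 0 if it was
 * already set (the value *before* the OR tells us). */
int set_flag_once(volatile unsigned long *flags, unsigned long flag)
{
    unsigned long before = __sync_fetch_and_or(flags, flag);
    return !(before & flag);
}

/* Atomically clears `flag` and returns the flag word *after* the update. */
unsigned long clear_flag(volatile unsigned long *flags, unsigned long flag)
{
    return __sync_and_and_fetch(flags, ~flag);
}
```

If several threads race on set_flag_once() for the same bit, exactly one of them gets 1, which makes it a cheap way to elect a thread for one-time work.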
A quick note before going further with atomic operations: a good way to achieve parallel performance is to go local whenever possible. While the counter example above is simple enough to illustrate various ways of updating shared resources concurrently, it is also exactly the kind of problem that does not need concurrent updates at all:
The local approach
# file: local.c
 9 volatile unsigned long counter = 0;
10
11 void *worker(void *arg)
12 {
13     int i = 0, iterations = *((int *)arg);
14     unsigned long *local;
15     local = calloc(1, sizeof(unsigned long));
16     for (i = 0; i < iterations; ++i)
17         ++*local;
18     return (void *)local;
19 }
20
21 int main(int argc, char **argv)
22 {
23     int i, count;
24     unsigned long *ret;
25     pthread_t *t;
26     if (argc != 3) {
27         fprintf(stderr, "%s <#-threads> <#-iterations>\n", argv[0]);
28         return 1;
29     }
30     count = atoi(argv[2]);
31     t = (pthread_t *)calloc(atoi(argv[1]), sizeof(pthread_t));
32     /* create # threads */
33     for (i = 0; i < atoi(argv[1]); ++i)
34         assert(!pthread_create(&(t[i]), NULL, worker, (void *)&count));
35     /* wait for the completion of all the threads */
36     for (i = 0; i < atoi(argv[1]); ++i) {
37         assert(!pthread_join(t[i], (void **)&ret));
38         counter += *ret;
39         free(ret);
40     }
41     /* print counter value */
42     printf("all thread done -> counter=%lu\n", counter);
43     return 0;
44 }
In this case, as the problem allows, each thread increments its own private counter at line
17 (a heap-allocated variable that no other thread touches), and the results are simply aggregated into counter by the main thread at line 38.
That said, many problems still need shared updates and require more advanced techniques.
Operations as transactions
The most powerful CPU feature available to the programmer for achieving lockfree behavior is the
CMPXCHG instruction, also known as compare-and-swap and commonly abbreviated CAS. In pseudo-code,
it does:
/* atomic */
bool compare_and_swap(unsigned long *mem, unsigned long old, unsigned long new)
{
    if (*mem == old) {
        *mem = new;
        return true;
    }
    return false;
}
Recent gcc versions provide easy access to this instruction through the Atomic Builtins:
bool __sync_bool_compare_and_swap (type *ptr, type oldval, type newval, ...)
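A minimal sketch of the builtin's semantics, kept single-threaded so the results are deterministic. __sync_val_compare_and_swap, which returns the value found in memory rather than a success flag, is shown alongside; the wrapper names are hypothetical.

```c
/* Hypothetical wrappers showing the two CAS builtins. */
int cas_bool(volatile unsigned long *mem, unsigned long old, unsigned long new_val)
{
    /* 1 if *mem was equal to old and has been replaced by new_val, else 0 */
    return __sync_bool_compare_and_swap(mem, old, new_val);
}

unsigned long cas_val(volatile unsigned long *mem, unsigned long old, unsigned long new_val)
{
    /* returns the value *mem held before the call; the swap happened iff it equals old */
    return __sync_val_compare_and_swap(mem, old, new_val);
}
```

The "val" flavour is useful in retry loops, since a failed attempt already hands back the fresh value to retry with.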
Older versions can use the following inline assembly glue:
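# file: atomic.h
static inline char CAS(volatile unsigned long *mem, unsigned long old, unsigned long new)
{
    unsigned long r;
    /* LOCK CMPXCHG compares %eax/%rax (old) with *mem: if equal, new is stored
     * into *mem; otherwise *mem is loaded into %eax/%rax. Either way, r ends up
     * holding the previous value of *mem. */
    __asm__ __volatile__("lock; cmpxchg %2,%1"
                         : "=a" (r), "+m" (*mem)
                         : "r" (new), "0" (old)
                         : "memory");
    return r == old ? 1 : 0;
}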
In other words, the CAS operation succeeds and updates *mem to new if, and only if, the value of *mem matches old. Thus, we can write a new implementation of our shared counter
example:
# file: cas.c
11 void *worker(void *arg)
12 {
13     int i = 0, iterations = *((int *)arg);
14     typeof(counter) expectedval;
15     for (i = 0; i < iterations; ++i) {
16         do {
17             expectedval = counter;
18         } while (!CAS(&counter, expectedval, expectedval + 1));
19     }
20     return NULL;
21 }
At line 17, the modified function stores the value of counter in a local variable
expectedval, then at line 18, it immediately attempts a compare-and-swap
operation on counter. If counter matches the value of expectedval,
the CAS operation will succeed and counter will be set to expectedval + 1;
if it fails - because a concurrent thread updated counter shortly after its value was stored in
expectedval at line 17 - the loop will continue, expectedval will be reset
to the new value of counter and the CAS operation will be retried.
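Thus, in this version of the shared counter program, the update of counter is done in
a transactional manner: read a snapshot, compute the new value, and commit it only if nothing changed in between, retrying otherwise.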
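The same snapshot, compute, CAS, retry pattern works for updates that have no dedicated fetch-and-op builtin. As a hedged sketch, here is a hypothetical atomic_max() that raises a shared value to at least a given number; like cas.c, it retries whenever another thread changed the value between the read and the CAS.

```c
/* Hypothetical helper: atomically raises *mem to at least v. Returns the value
 * observed (if it was already >= v), or v once it has been installed. */
unsigned long atomic_max(volatile unsigned long *mem, unsigned long v)
{
    unsigned long seen;

    do {
        seen = *mem;                    /* snapshot, as at line 17 of cas.c */
        if (seen >= v)
            return seen;                /* nothing to commit */
    } while (!__sync_bool_compare_and_swap(mem, seen, v));  /* commit or retry */
    return v;
}
```

The early return keeps the loop cheap when the shared value is already large enough, since no write, and hence no cache-line invalidation, is needed in that case.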
The ABA problem
The ABA problem can be demonstrated by doing a slightly different implementation of the
shared counter program with CAS operations done with references, rather than with values:
triggering ABA
# file: aba.c
 9 volatile unsigned long *curval = NULL;
10 volatile unsigned long threads = 0;
11
12 struct argument {
13     int count;
14     unsigned long *buffer;
15 };
16
17 void *worker(struct argument *arg)
18 {
19     int i = 0;
20     volatile unsigned long *old;
21     AAF(&threads, 1);
22     for (i = 0; i < arg->count; ++i) {
23         do {
24             old = curval;
25             if (old == arg->buffer && threads > 1)
26                 continue;
27             *(arg->buffer) = !old ? 1 : (*old + 1);
28         } while (!CAS((unsigned long *)&curval, (uintptr_t)old, (uintptr_t)arg->buffer));
29     }
30     AN(&threads, -1);
31     return NULL;
32 }
33
34 int main(int argc, char **argv)
35 {
36     int i, count;
37     pthread_t *t;
38     struct argument *arg;
39     if (argc != 3) {
40         fprintf(stderr, "%s: <nthreads> <iterations>\n", argv[0]);
41         return 1;
42     }
43     count = atoi(argv[2]);
44     t = (pthread_t *)calloc(atoi(argv[1]), sizeof(pthread_t));
45     /* create # threads */
46     for (i = 0; i < atoi(argv[1]); ++i) {
47         arg = (struct argument *)calloc(1,
48                 sizeof(struct argument) + sizeof(unsigned long));
49         arg->count = atoi(argv[2]);
50         arg->buffer = (unsigned long *)(arg + 1);
51         assert(!pthread_create(&(t[i]), NULL, (void *(*)(void *))worker, arg));
52     }
53     /* wait for the completion of all the threads */
54     for (i = 0; i < atoi(argv[1]); ++i)
55         assert(!pthread_join(t[i], NULL));
56     /* print counter value */
57     printf("all thread done -> values=%lu\n", *curval);
58     return 0;
59 }
The program is slightly different: instead of having each thread update a shared counter, we have
a shared pointer curval (line 9) that each thread updates with the address
of its own private copy of the counter, buffer (line 14). Thus, at any
given time, curval holds the address of the latest version of the counter.
To increment the counter, a thread dereferences curval to get the latest value of the counter
and sets its own private copy to this value plus one (line 27); then, at line
28, it attempts a compare-and-swap on curval
to update it to the address of its own copy of the counter. If the
CAS operation fails because another thread updated curval in the meantime,
the code jumps back to line 24 and the whole transaction is retried.
There is one more new element in this program: an active-thread counter, threads
(line 10), incremented at line 21 and decremented at line 30. This variable works around
a potential race condition that may appear if one thread manages to complete two iterations of the
outer loop (lines 22-29) while none of the other threads has completed a single
update of curval: lines 25-26 make sure the transaction only proceeds if
curval does not already point to the thread's own buffer,
unless all the concurrent threads are done with their job.
We can now run the program on a multi-core machine to see whether the ABA problem shows up:
$ ./aba 1 1000000
all thread done -> values=1000000
$ ./aba 64 1000000
all thread done -> values=18017949
The counter is inconsistent with more than one thread: 64 threads doing one million iterations each should reach 64000000.
The ABA problem happens when a thread reads the value A from a shared variable
and gets preempted; another thread then sets this variable to B,
and back to A. When the first thread resumes and re-reads the shared variable,
it may wrongly assume that "nothing changed" because the variable is still set to A.
In many circumstances, especially if the shared variable holds references, the ABA
effect can lead to serious inconsistencies. The following scenario applies to our program:
# thread-1                         # thread-2                         # thread-3
----------                         ----------                         ----------
*(arg->buffer) = 1;                .                                  .                                  # T1-buff is set to 1
CAS(&curval, old, arg->buffer);    .                                  .                                  # curval:T1-buff, *curval:1
.                                  old = curval;                      .                                  # old points to T1-buff
.                                  *(arg->buffer) = *old + 1;         .                                  # T2-buff is set to 2
.                                  .                                  .
.                                  .                                  .
.                                  .                                  old = curval;                      # old points to T1-buff
.                                  .                                  *(arg->buffer) = *old + 1;         # T3-buff is set to 2
.                                  .                                  CAS(&curval, old, arg->buffer);    # curval:T3-buff, *curval:2
old = curval;                      .                                  .                                  # old points to T3-buff
*(arg->buffer) = *old + 1;         .                                  .                                  # T1-buff is set to 3
CAS(&curval, old, arg->buffer);    .                                  .                                  # curval:T1-buff, *curval:3
.                                  CAS(&curval, old, arg->buffer);    .                                  # succeeds (ABA): curval:T2-buff, *curval:2
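The essence of the scenario above can be reduced to a few deterministic steps in one thread. In this hypothetical sketch, the shared word holds the character codes 'A', 'B' and 'C' instead of buffer addresses; the point is only that the final CAS succeeds although the value changed twice after the snapshot.

```c
#include <stdint.h>

/* Hypothetical single-threaded replay of an ABA interleaving. Returns 1 if
 * the stale CAS of the first "thread" succeeds, which is the ABA effect. */
int aba_replay(void)
{
    volatile uintptr_t shared = 'A';
    uintptr_t snapshot = shared;                       /* T1 reads A, is preempted */

    __sync_bool_compare_and_swap(&shared, 'A', 'B');   /* T2: A -> B */
    __sync_bool_compare_and_swap(&shared, 'B', 'A');   /* T2: B -> A */

    /* T1 resumes: shared equals its snapshot again, so the CAS succeeds even
     * though the variable was modified twice in the meantime. */
    return __sync_bool_compare_and_swap(&shared, snapshot, 'C');
}
```

CAS only compares bit patterns, so it cannot distinguish "never changed" from "changed and changed back"; that is why a version counter has to become part of the compared value.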
The classic workaround against ABA-based race conditions relies on the double-word compare-and-swap instruction
(DWCAS), not to be confused with double-compare-and-swap (DCAS), which operates on two independent memory locations and is not available on x86.
The term double-word here has to be understood as twice the architecture word (i.e. not DWORD in the x86 sense of two
16-bit words), that is 64 bits (8 bytes) on 32-bit architectures, and 128 bits (16 bytes) on
64-bit architectures.
This instruction is known as CMPXCHG8B on 32-bit CPUs
and CMPXCHG16B on 64-bit ones. Recent gcc
versions provide easy access to it through the same
Atomic Builtin, applied to a double-word type:
bool __sync_bool_compare_and_swap (type *ptr, type oldval, type newval, ...)
Note that on 64-bit machines, gcc requires the -mcx16 flag to provide CMPXCHG16B. Older versions can still use the following inline assembly glue:
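# file: atomic.h
/* x86-64 version. DWORD is assumed to be a 128-bit type (e.g. unsigned __int128)
 * and SHIFT to be 64; a 32-bit version would use CMPXCHG8B instead. */
static inline char DWCAS(volatile DWORD *mem, DWORD old, DWORD new)
{
    unsigned long old_h = old >> SHIFT, old_l = old;
    unsigned long new_h = new >> SHIFT, new_l = new;
    char r;
    /* compares %rdx:%rax (old) with *mem; if equal, stores %rcx:%rbx (new)
     * into *mem and sets ZF, which SETZ copies into r */
    __asm__ __volatile__("lock; cmpxchg16b %1\n\t"
                         "setz %0"
                         : "=q" (r), "+m" (*mem), "+a" (old_l), "+d" (old_h)
                         : "b" (new_l), "c" (new_h)
                         : "cc", "memory");
    return r;
}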
The DWCAS instruction can help avoid ABA situations, as it allows the atomic manipulation of more than
just a memory address, leaving room for additional synchronization data such as an update counter.
We can use DWCAS to fix our program:
avoiding ABA
# file: aba-avoidance.c
 9 static volatile DWORD curval = 0;
10 volatile unsigned long threads = 0;
11
12 struct argument {
13     int count;
14     unsigned long *buffer;
15 };
16
17 struct object {
18     unsigned long *address;
19     unsigned long refcount;
20 };
21
22 #define OBJECT(obj) ((volatile struct object *)&(obj))
23
24 void *worker(struct argument *arg)
25 {
26     int i = 1;
27     volatile DWORD expectedval, newval;
28     AAF(&threads, 1);
29     for (i = 0; i < arg->count; ++i) {
30         for (;;) {
31             expectedval = curval;
32             if (OBJECT(expectedval)->address == arg->buffer && threads > 1)
33                 continue;
34             *(arg->buffer) = !OBJECT(expectedval)->address ? 1 :
35                 *OBJECT(expectedval)->address + 1;
36             OBJECT(newval)->address = arg->buffer;
37             OBJECT(newval)->refcount = OBJECT(expectedval)->refcount + 1;
38             if (DWCAS(&curval, expectedval, newval))
39                 break;
40         }
41     }
42     AN(&threads, -1);
43     return NULL;
44 }
The logic is exactly the same, except that instead of just storing the addresses of arg->buffer into the shared curval
variable, those addresses are packed into the double-word aligned object structure defined at lines 17-20.
Each pointer is now accompanied by an unsigned long serial number, refcount, which counts the updates
made to curval (line 37), preventing an ABA race on the pointer addresses themselves.
Note that the ABA issue is not strictly eliminated, since refcount can wrap around, but it becomes very unlikely for most workloads.
Running the modified program on a multi-core machine confirms that the fix works:
$ ./aba-avoidance 64 1000000
all thread done -> curval:64000000
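When the protected value fits in 32 bits, for instance an index into a preallocated array instead of a full pointer, a common alternative to DWCAS on 64-bit machines is to pack the value and its update counter into a single 64-bit word and use an ordinary CAS. This is a different technique from the DWCAS approach above, shown here as a hedged sketch; PACK, VAL, TAG and tagged_cas are hypothetical names, and the 32-bit tag can wrap around just like refcount.

```c
#include <stdint.h>

/* Hypothetical layout: low 32 bits = value (e.g. an array index),
 * high 32 bits = tag counting successful updates. */
#define PACK(val, tag) (((uint64_t)(uint32_t)(tag) << 32) | (uint32_t)(val))
#define VAL(w)         ((uint32_t)(w))
#define TAG(w)         ((uint32_t)((w) >> 32))

/* Replaces the value with new_val if *word still equals expected (value AND
 * tag). Each successful update bumps the tag, like refcount in
 * aba-avoidance.c, so a recycled value does not fool a stale snapshot. */
int tagged_cas(volatile uint64_t *word, uint64_t expected, uint32_t new_val)
{
    return __sync_bool_compare_and_swap(word, expected,
                                        PACK(new_val, TAG(expected) + 1));
}
```

The trade-off is range: only 32 bits are left for the payload, which is why this trick suits array indices better than raw 64-bit pointers.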
Real world example: fixed size lockfree queue
Just as Map-Reduce can do much more than count words, the techniques described above can do much more than manage counters.
Queues are abstract data types widely used in producer/consumer algorithms. Many good implementations are lock-based
and may be subject to high contention when thousands of concurrent threads add and consume data. The techniques described
above can be used to implement a fully lockfree array-based queue:
lockfree queue primitives
typedef struct _queue_t *queue_t;
queue_t queue_create(size_t);
void *queue_dequeue(queue_t);
int queue_enqueue(queue_t, void *);
The desired depth of the queue is passed to queue_create at creation time. queue_dequeue returns the address of an element,
or NULL when the queue is empty. queue_enqueue takes the address of the element to queue and
returns 1 if the operation succeeded, and 0 if the queue is full.
internal data-structures, initialization
# file: lfq.h
#define E(val) ((element_t *)&(val))

typedef struct {
    unsigned long ptr;   /* address of the enqueued element, 0 when the slot is free */
    unsigned long ref;   /* number of updates made to this slot */
} __attribute__((aligned(2 * sizeof(unsigned long)))) element_t;   /* DWCAS needs double-word alignment */

struct _queue_t {
    size_t depth;
    element_t *e;
    unsigned long rear;
    unsigned long front;
};

queue_t queue_create(size_t depth)
{
    queue_t q = (queue_t)malloc(sizeof(struct _queue_t));
    if (q) {
        q->depth = depth;
        q->rear = q->front = 0;
        q->e = (element_t *)calloc(depth, sizeof(element_t));
    }
    return q;
}
element_t is the double-word aligned structure that holds the address of an enqueued element (ptr) and a count
of the updates made to its slot (ref).
struct _queue_t holds the queue metadata: the capacity in depth, the boundary counters rear
and front, and e, the contiguous storage for up to depth elements. rear and front only ever increase; the slot of a given position is obtained modulo depth.
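The queue relies on rear and front growing monotonically, with slots addressed modulo depth: the queue is empty when front == rear and full when rear == front + depth. The following single-threaded sketch (hypothetical struct ring and helpers, no atomics at all) isolates that index arithmetic; ring_put returns 1 on success and 0 when full, the same convention as queue_enqueue.

```c
#include <stddef.h>

/* Hypothetical single-threaded model of struct _queue_t's index arithmetic:
 * rear and front only grow, and position p lives in slot p % depth. */
struct ring {
    size_t depth;
    unsigned long rear;    /* next position to fill  */
    unsigned long front;   /* next position to drain */
    void **slot;           /* storage for depth entries */
};

int ring_empty(const struct ring *r) { return r->front == r->rear; }
int ring_full(const struct ring *r)  { return r->rear == r->front + r->depth; }

/* Returns 1 on success and 0 if the ring is full. */
int ring_put(struct ring *r, void *data)
{
    if (ring_full(r))
        return 0;
    r->slot[r->rear % r->depth] = data;
    ++r->rear;
    return 1;
}

/* Returns the oldest entry, or NULL if the ring is empty. */
void *ring_get(struct ring *r)
{
    void *data;

    if (ring_empty(r))
        return NULL;
    data = r->slot[r->front % r->depth];
    r->slot[r->front % r->depth] = NULL;   /* free slots hold NULL, like ptr == 0 */
    ++r->front;
    return data;
}
```

In the lockfree version, each of these plain stores becomes a DWCAS on the slot followed by a CAS on rear or front, which is exactly where the "lagging" counters discussed below come from.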
the enqueue algorithm
# file: lfq.c
e56  int queue_enqueue(volatile queue_t q, void *data)
e57  {
e58      DWORD old, new;
e59      unsigned long rear, front;
e60      do {
e61          rear = q->rear;
e62          old = *((DWORD *)&(q->e[rear % q->depth]));
e63          front = q->front;
e64          if (rear != q->rear)
e65              continue;
e66          if (rear == (q->front + q->depth)) {
e67              if (q->e[front % q->depth].ptr && front == q->front)
e68                  return 0;
e69              CAS(&(q->front), front, front + 1);
e70              continue;
e71          }
e72          if (!E(old)->ptr) {
e73              E(new)->ptr = (uintptr_t)data;
e74              E(new)->ref = ((element_t *)&old)->ref + 1;
e75              if (DWCAS((DWORD *)&(q->e[rear % q->depth]), old, new)) {
e76                  CAS(&(q->rear), rear, rear + 1);
e77                  return 1;
e78              }
e79          } else if (q->e[rear % q->depth].ptr)
e80              CAS(&(q->rear), rear, rear + 1);
e81      } while(1);
e82  }
The transaction starts at line e61: a snapshot of q->rear is taken, followed by a full
copy of the element at rear (line e62). At line e64, a complementary
consistency check of the snapshotted value of rear is done; this is a non-mandatory optimization.
At line e66 begins the verification of whether the queue is full. It only "begins" there because the current value
of q->front is not enough to conclude that the queue is full: a concurrent thread might be
about to complete a successful dequeue at line d104, in which case q->front
is lagging behind and the queue actually has one free slot. Thus, the two checks at line e67
make sure the queue really is full, in which case 0 is returned at line e68.
Line e69 is another non-mandatory optimization for the case where q->front is known to lag
behind: the thread simply helps the other threads by advancing q->front, then jumps back to e60
to retry the whole transaction.
At line e72, a consistency check somewhat similar to the one at e67 verifies that the snapshotted slot's
ptr is null. If it is not, a concurrent thread has already completed the DWCAS at e75 on that slot
but has not yet advanced q->rear, so q->rear is lagging behind:
the thread then does an extra consistency check at line e79 before helping increment
q->rear at line e80, and jumps back to e60 to retry the transaction.
If the test at line e72 succeeds, data and the incremented ref are written into the double-word new on the stack
(lines e73-e74), and the DWCAS operation is attempted at line e75. If it succeeds, the thread
attempts to advance q->rear and returns 1; if it fails, the whole set of operations is
restarted at line e60.
You will notice that the results of the CAS operations at lines e69, e76
and e80 are never checked. This is the expected behavior: whenever a thread witnesses
q->rear or q->front "lagging" because a concurrent thread has not yet reached
e76 or d104, it tries to "help" by doing a CAS in a lazy way; if that CAS fails, another thread has already advanced the counter.
the dequeue algorithm
# file: lfq.c
d84  void *queue_dequeue(volatile queue_t q)
d85  {
d86      DWORD old, new;
d87      unsigned long front, rear;
d88      do {
d89          front = q->front;
d90          old = *((DWORD *)&(q->e[front % q->depth]));
d91          rear = q->rear;
d92          if (front != q->front)
d93              continue;
d94          if (front == q->rear) {
d95              if (!q->e[rear % q->depth].ptr && rear == q->rear)
d96                  return NULL;
d97              CAS(&(q->rear), rear, rear + 1);
d98              continue;
d99          }
d100         if (E(old)->ptr) {
d101             E(new)->ptr = 0;
d102             E(new)->ref = E(old)->ref + 1;
d103             if (DWCAS((DWORD *)&(q->e[front % q->depth]), old, new)) {
d104                 CAS(&(q->front), front, front + 1);
d105                 return (void *)((element_t *)&old)->ptr;
d106             }
d107         } else if (!q->e[front % q->depth].ptr)
d108             CAS(&(q->front), front, front + 1);
d109     } while(1);
d110 }
The dequeue operation is exactly symmetric to the enqueue operation.
To check the queue implementation, the following program spawns an even number of
worker threads: the even-numbered ones produce content and push it with queue_enqueue,
while the odd-numbered ones consume it with queue_dequeue.
verification program
# file: test.c
34 static size_t iterations;
35 static volatile unsigned long input = 0;
36 static volatile unsigned long output = 0;
37
38 void *producer(void *arg)
39 {
40     int i = 0;
41     unsigned long *ptr;
42     queue_t q = (queue_t)arg;
43     for (i = 0; i < iterations; ++i) {
44         ptr = (unsigned long *)malloc(sizeof(unsigned long));
45         assert(ptr);
46         *ptr = AAF(&input, 1);
47         while (!queue_enqueue(q, (void *)ptr))
48             ;
49     }
50     return NULL;
51 }
52
53 void *consumer(void *arg)
54 {
55     int i = 0;
56     unsigned long *ptr;
57     queue_t q = (queue_t)arg;
58     for (i = 0; i < iterations; ++i) {
59         while (!(ptr = (unsigned long *)queue_dequeue(q)))
60             ;
61         AAF(&output, *ptr);
62         *ptr = 0;
63         free((void *)ptr);
64     }
65     return NULL;
66 }
67
68 int main(int argc, char **argv)
69 {
70     int i;
71     pthread_t *t;
72     unsigned long verif = 0;
73     queue_t q = queue_create(2);
74     if (argc != 3) {
75         fprintf(stderr, "%s: <nthreads> <iterations>\n", argv[0]);
76         return 1;
77     }
78     if (atoi(argv[1]) % 2) {
79         fprintf(stderr, "%s: need an even number of threads\n", argv[0]);
80         return 1;
81     }
82     t = (pthread_t *)calloc(atoi(argv[1]), sizeof(pthread_t));
83     iterations = atoi(argv[2]);
84     for (i = 0; i < atoi(argv[1]); ++i)
85         assert(!pthread_create(&(t[i]), NULL,
86                                (i % 2) ? consumer : producer, q));
87     for (i = 0; i < atoi(argv[1]); ++i)
88         assert(!pthread_join(t[i], NULL));
89     for (i = 0; i <= input; ++i)
90         verif += i;
91     printf("input SUM[0..%lu]=%lu output=%lu\n", input, verif, output);
92     return 0;
93 }
The produced content consists of the numbers from 1 to iterations * threads / 2
(lines 44-46).
Threads maintain two shared counters:
- input (line 35) is incremented by 1 each time a producer creates a new number (line 46);
- output (line 36) is increased at line 61 by the value pointed to by ptr, which was just dequeued at line 59.
When all threads complete, the main thread computes the sum of all the numbers between 1 and
input, so that it can be compared with output.
Running the test on a multi-core machine shows that both sums match, as expected from a correct implementation:
$ ./lfq-test 64 1000000
input SUM[0..32000000]=512000016000000 output=512000016000000
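The loop at lines 89-90 of test.c can be replaced by the closed form 0 + 1 + ... + n = n(n + 1)/2. The small sketch below (sum_0_to_n is a hypothetical helper) reproduces the reference value printed above for input = 32000000.

```c
/* Closed form of 0 + 1 + ... + n = n (n + 1) / 2. One of n and n + 1 is even;
 * dividing that factor first keeps the intermediate product small. */
unsigned long long sum_0_to_n(unsigned long long n)
{
    return (n % 2 == 0) ? (n / 2) * (n + 1) : n * ((n + 1) / 2);
}
```

For n = 32000000, this gives 16000000 * 32000001 = 512000016000000, matching both values in the output above.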
References
Most of the examples cited above can be found here, or downloaded as a bundle
there. Those examples have been verified in 64-bit and 32-bit
mode on Linux and Darwin. If you build them on Darwin with an old version of gcc that lacks
the Atomic Builtins, you will most likely need to disable PIC with the -mdynamic-no-pic
flag to compile the inline assembly.
- [1] Linux kernel, 'atomic' header for x86. kernel.org.
- [2] Intel 64 and IA-32 Architectures Software Developer's Manual, Volume 3A: System Programming Guide, Part 1. Intel.
- [3] Intel 64 and IA-32 Architectures Software Developer's Manual, Volume 2A: Instruction Set Reference, A-M. Intel.
- [4] Chien-Hua Shann, Ting-Lu Huang, Cheng Chen. A practical nonblocking queue algorithm using compare-and-swap.
- [5] R. Colvin, L. Groves. Formal verification of an array-based nonblocking queue.
- [6] Dmitriy V'jukov. Posts on the Intel Software Network.
Feel free to email me for any comment or suggestion
