Cayos Cochinos

contact

Julien Benoist <julien@benoist.name>
285 Grand View avenue,
San Francisco, CA 94114
cell: +1 415-238-7908


Introduction

For years, Moore's law for microprocessors (that is, the trend in transistor density for the money) consistently meant higher clock rates. A period of three and a half years pretty much meant twice the clock rate and twice the cache for the same money.

While CPU manufacturers still deliver microprocessors with an increasing number of transistors, clock rates clearly stabilized a few years ago, somewhere around 3 GHz, due to growing power dissipation issues. The new trend is towards higher parallelism, through processors with multiple cores and multiple hardware threads. It might go beyond the scope of this document, but I wish the industry would also focus on addressing power dissipation and aim for better overall power efficiency. In fact, if you have not purchased a computer since the 90s, you might be shocked by the average PSU wattage and cooling infrastructure you will need if you intend to assemble a computer today.

Application level parallelism

While the hardware industry has embraced parallelism, the software industry is just beginning: operating systems have been featuring multitasking for decades and do provide multithreading APIs, but parallelism is still a long way from being a reality at the applications level. There are a few reasons:

That said, there are a lot of multithreaded applications today, and most of them deal with concurrency issues using the same techniques. This page introduces other alternatives using examples in the C language for simplicity.

The atomicity issue

The biggest problem in multithreaded programming is the non-atomic nature of many operations we are used to performing in mainstream programming languages. Most of those languages were designed at a time when applications were mainly single-threaded, and offer no, or poor, support for atomic operations.
The easiest way to explain the problem is to write a simple program spawning multiple threads which concurrently increment a global counter.

the thread-unsafe approach
# file: normal.c

 1 #include <assert.h>
 2 #include <pthread.h>
 3 #include <stdio.h>
 4 #include <stdlib.h>
 5 
 6 volatile unsigned long counter = 0;
 7	
 8 void *worker(void *arg)
 9 {
10	int i = 0, iterations = *((int *)arg);
11	for (i = 0; i < iterations; ++i)  
12		++counter;
13	return NULL;
14 }
15	
16 int main(int argc, char **argv)
17 {
18	int i, count;
19	pthread_t *t;
20	if (argc != 3) {
21		fprintf(stderr, "%s: <nthreads> <iterations>\n", argv[0]);
22		return 1;
23	}
24	count = atoi(argv[2]);
25	/* create # threads */
26	t = (pthread_t *)calloc(atoi(argv[1]), sizeof(pthread_t));
27	for (i = 0; i < atoi(argv[1]); ++i) 
28		assert(!pthread_create(&(t[i]), NULL, worker, (void *)&count));
29	/* wait for the completion of all the threads */
30	for (i = 0; i < atoi(argv[1]); ++i)
31		assert(!pthread_join(t[i], NULL));
32	/* print counter value */
33	printf("all thread done -> counter=%lu\n", counter);
34	return 0;
35 }

We can run it on a single-core machine for a million increments per thread, first with 1 and then with 64 worker threads:

$ ./normal 1 1000000
all thread done -> counter=1000000
$ ./normal 64 1000000
all thread done -> counter=7546383

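This program will not get the counter right with more than one worker thread (64 threads should yield 64000000). Contrary to popular belief, this holds even on single-core architectures, whatever the type of the counter: a thread can be preempted in the middle of the update. Declaring counter volatile does not help either: volatile only forces the compiler to load and store the variable on every access, it does not make the read-modify-write sequence atomic. A widespread confusion with regard to atomicity is the belief that C types such as sig_atomic_t guarantee atomicity: they don't. The only guarantee about sig_atomic_t is that an object of that type is never read or written partially in the presence of asynchronous signals; it says nothing about read-modify-write operations such as ++, or about concurrent threads.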

# file: normal.c

 6 volatile unsigned long counter = 0;
 7	
 8 void *worker(void *arg)
 9 {
10	int i = 0, iterations = *((int *)arg);
11	for (i = 0; i < iterations; ++i)  
12		++counter;
13	return NULL;
14 }

The operation at line 12 is not atomic, as the disassembled function shows:

000000000040075c <worker>:
40075c:       48 89 7c 24 e8          mov    %rdi,-0x18(%rsp)
400761:       c7 44 24 fc 00 00 00    movl   $0x0,-0x4(%rsp)
400768:       00 
400769:       c7 44 24 fc 00 00 00    movl   $0x0,-0x4(%rsp)
400770:       00 
400771:       eb 17                   jmp    40078a <worker+0x2e>
400773:       48 8b 05 76 06 20 00    mov    0x200676(%rip),%rax        # 600df0 <counter>
40077a:       48 83 c0 01             add    $0x1,%rax
40077e:       48 89 05 6b 06 20 00    mov    %rax,0x20066b(%rip)        # 600df0 <counter>
400785:       83 44 24 fc 01          addl   $0x1,-0x4(%rsp)
40078a:       48 8b 44 24 e8          mov    -0x18(%rsp),%rax
40078f:       8b 00                   mov    (%rax),%eax
400791:       3b 44 24 fc             cmp    -0x4(%rsp),%eax
400795:       7f dc                   jg     400773 <worker+0x17>
400797:       b8 00 00 00 00          mov    $0x0,%eax
40079c:       c3                      retq   

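This one-line C statement actually compiles to three CPU instructions: a load of counter into %rax (400773), an increment of %rax (40077a) and a store of %rax back into counter (40077e).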

A thread can be preempted, or run concurrently on another core, between any two of these instructions, leading to lost updates such as:

# thread-0                # thread-1
-                         -
MOV counter,%rax                              # %rax has a value of 0 in T0
..                        MOV counter,%rax    # %rax has a value of 0 in T1
ADD $0x1,%rax             ..                  # %rax has a value of 1 in T0
..                        ADD $0x1,%rax       # %rax has a value of 1 in T1
MOV %rax,counter          ..                  # counter has a value of 1
                          MOV %rax,counter    # counter still has a value of 1
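The interleaving above can be replayed deterministically in a single thread, with one local variable standing in for each thread's copy of %rax. A minimal sketch; lost_update is a hypothetical name:

```c
/* Replays the interleaving above step by step: rax0 and rax1 play the
   role of the %rax register of thread-0 and thread-1. */
unsigned long lost_update(void)
{
	unsigned long counter = 0;
	unsigned long rax0, rax1;

	rax0 = counter;		/* thread-0: MOV counter,%rax */
	rax1 = counter;		/* thread-1: MOV counter,%rax */
	rax0 += 1;		/* thread-0: ADD $0x1,%rax */
	rax1 += 1;		/* thread-1: ADD $0x1,%rax */
	counter = rax0;		/* thread-0: MOV %rax,counter */
	counter = rax1;		/* thread-1: overwrites thread-0's store */
	return counter;		/* two increments, but the result is 1 */
}
```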

Most multithreaded applications solve this problem by turning the update into a critical section:

the critical section approach
# file: locked.c

 6 volatile unsigned long counter = 0;
 7 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
 8	
 9 void *worker(void *arg)
10 {
11	int i = 0, iterations = *((int *)arg);
12	for (i = 0; i < iterations; ++i)  {
13		pthread_mutex_lock(&mutex);	/* BEGIN critical section */
14		++counter;
15		pthread_mutex_unlock(&mutex);	/* END critical section */
16	}
17	return NULL;
18 }
19	

Protecting the counter update with a mutual exclusion primitive guarantees that only one thread at a time executes the code between pthread_mutex_lock and pthread_mutex_unlock.

Critical sections have several attractive aspects:

But they imply the following drawbacks:

CPUs have provided hardware support for lockfree software for a while, and operating system code relies heavily on those techniques, yet they are rarely found at the application level. The main reason is that the major programming languages were designed for scenarios where atomicity did not matter and therefore did not expose atomic primitives: the only way to use them was to write low-level assembly.
Things are changing with the growing need for better parallel software: programming environments are starting to ease the writing of lockfree applications by exposing atomic primitives:

Intel processors have provided the XADD instruction since the 80486; used with the LOCK prefix, it can solve the problem above in a lockfree manner. XADD can be used to implement an atomic add-and-fetch function that obeys the following pseudo-code:

/* atomic */
unsigned long add_and_fetch(unsigned long *addr, unsigned long add)
{ 
	*addr += add; 
	return *addr; 
}

Recent gcc versions (4.1 and later) provide easy access to this instruction through the Atomic Builtins:

type __sync_add_and_fetch (type *ptr, type value, ...)
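The add-and-fetch builtin has a fetch-and-add twin; the two differ only in the value they return. A small sketch, where demo_add is a hypothetical helper:

```c
/* __sync_fetch_and_add returns the value *before* the addition,
   __sync_add_and_fetch returns the value *after* it.
   Both are full memory barriers according to the gcc documentation. */
unsigned long demo_add(volatile unsigned long *counter, unsigned long *before)
{
	*before = __sync_fetch_and_add(counter, 5);	/* old value */
	return __sync_add_and_fetch(counter, 5);	/* new value */
}
```

Starting from a counter of 10, demo_add stores 10 in *before and returns 20.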

Older versions can use the following inline assembly glue:

# file: atomic.h

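static inline unsigned long AAF(volatile unsigned long *mem, unsigned long add)
{
	unsigned long __tmp = add;
	/* XADD exchanges 'add' with *mem and stores their sum into *mem, so
	 * 'add' ends up holding the previous value of *mem. No size suffix:
	 * the operand size follows unsigned long (32 or 64 bits). */
	__asm__ __volatile__("lock; xadd %0,%1"
			:"+r" (add),
			"+m" (*mem)
			: : "memory");
	return add + __tmp;	/* previous value + increment = new value */
}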

Thus, we can write a new version of our example updating the shared counter atomically using the XADD instruction:

the atomic approach
# file: aaf.c

11 void *worker(void *arg)
12 {
13	int i = 0, iterations = *((int *)arg);
14	for (i = 0; i < iterations; ++i)  
15		AAF(&counter, 1);
16	return NULL;
17 }
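The aaf.c excerpt only shows the worker. A self-contained sketch of the whole program, assuming gcc or clang and POSIX threads, with the __sync_add_and_fetch builtin in place of the AAF glue; aaf_worker and run_atomic_counter are hypothetical names:

```c
#include <pthread.h>
#include <stdlib.h>

static volatile unsigned long shared_counter = 0;

/* each worker performs 'iterations' atomic increments */
static void *aaf_worker(void *arg)
{
	int i, iterations = *(int *)arg;
	for (i = 0; i < iterations; ++i)
		__sync_add_and_fetch(&shared_counter, 1);	/* one LOCK-prefixed instruction on x86 */
	return NULL;
}

/* runs nthreads workers and returns the final value of the counter */
unsigned long run_atomic_counter(int nthreads, int iterations)
{
	pthread_t *t = calloc(nthreads, sizeof(pthread_t));
	int i;

	if (!t)
		return 0;
	shared_counter = 0;
	for (i = 0; i < nthreads; ++i)
		if (pthread_create(&t[i], NULL, aaf_worker, &iterations) != 0)
			abort();
	for (i = 0; i < nthreads; ++i)
		pthread_join(t[i], NULL);	/* 'iterations' stays valid until here */
	free(t);
	return shared_counter;
}
```

Unlike normal.c, run_atomic_counter(64, 1000000) should always return 64000000, whatever the scheduling.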

The resulting program is essentially lockfree and makes much better use of computing resources. To compare the performance of both implementations, we can use the following script:

1 #!/bin/ksh93
2 for pg in aaf locked; do
3	for ((i=1; $i <= 16; ++i)); do
4		let nops=100000000/$i
5 		echo -n "$i $nops "
6		/usr/bin/time -f "%e" ./$pg $i $nops 1> /dev/null
7 	done
8 done

This script measures the wall-clock time needed by both implementations of the shared-counter program, with one to sixteen threads competing for exactly, or close to, 100 million updates of the counter:

[Figure 1]  [Figure 2]  [Figure 3]

We observe the following:

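To get back to the counter example, many similar updates can be done atomically without a lock. Here is the list of fetch-and-op primitives that the GCC Atomic Builtins provide (on x86, only addition and subtraction map directly onto XADD; the other operations are typically compiled to a LOCK-prefixed instruction or to a compare-and-swap loop):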

/* primitives returning 'ptr' value *before* the update */
type __sync_fetch_and_add (type *ptr, type value, ...)
type __sync_fetch_and_sub (type *ptr, type value, ...)
type __sync_fetch_and_or (type *ptr, type value, ...)
type __sync_fetch_and_and (type *ptr, type value, ...)
type __sync_fetch_and_xor (type *ptr, type value, ...)
type __sync_fetch_and_nand (type *ptr, type value, ...)

/* primitives returning 'ptr' value *after* the update */
type __sync_add_and_fetch (type *ptr, type value, ...)
type __sync_sub_and_fetch (type *ptr, type value, ...)
type __sync_or_and_fetch (type *ptr, type value, ...)
type __sync_and_and_fetch (type *ptr, type value, ...)
type __sync_xor_and_fetch (type *ptr, type value, ...)
type __sync_nand_and_fetch (type *ptr, type value, ...)
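Beyond counting, the fetch-and-op builtins make it easy, for instance, to let exactly one thread claim a flag bit. A sketch, where set_flag_once, clear_flag and the FLAG_* constants are hypothetical:

```c
#define FLAG_READY	0x1UL
#define FLAG_DIRTY	0x2UL

/* Atomically sets 'flag' in *flags; returns 1 only for the caller that
   actually set it, i.e. when the bit was clear before the update. */
int set_flag_once(volatile unsigned long *flags, unsigned long flag)
{
	unsigned long old = __sync_fetch_and_or(flags, flag);
	return !(old & flag);
}

/* Atomically clears 'flag' and returns the flags that remain set. */
unsigned long clear_flag(volatile unsigned long *flags, unsigned long flag)
{
	return __sync_and_and_fetch(flags, ~flag);
}
```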

A quick note before going further with atomic operations: a good way to achieve good parallel performance is to keep data local whenever possible. While the counter example above is simple enough to illustrate various ways to update shared resources concurrently, it is also exactly the kind of problem that does not need concurrent updates:

The local approach
# file: local.c

  9 volatile unsigned long counter = 0;
 10 
 11 void *worker(void *arg)
 12 {
 13	int i = 0, iterations = *((int *)arg);
 14 	unsigned long *local;
 15 	local = calloc(1, sizeof(unsigned long));
 16 	for (i = 0; i < iterations; ++i)  
 17 		++*local;
 18 	return (void *)local;
 19 }
 20 
 21 int main(int argc, char **argv)
 22 {
 23 	int i, count;
 24 	unsigned long *ret;
 25 	pthread_t *t;
 26 	if (argc != 3) {
 27 		fprintf(stderr, "%s <#-threads> <#-iterations>\n", argv[0]);
 28 		return 1;
 29 	}
 30 	count = atoi(argv[2]);
 31 	t = (pthread_t *)calloc(atoi(argv[1]), sizeof(pthread_t));
 32 	/* create # threads */
 33 	for (i = 0; i < atoi(argv[1]); ++i) 
 34 		assert(!pthread_create(&(t[i]), NULL, worker, (void *)&count));
 35 	/* wait for the completion of all the threads */
 36 	for (i = 0; i < atoi(argv[1]); ++i) {
 37 		assert(!pthread_join(t[i], (void **)&ret));
 38 		counter += *ret;
 39 		free(ret);
 40 	}
 41 	/* print counter value */
 42 	printf("all thread done -> counter=%lu\n", counter);
 43 	return 0;
 44 }

In this case, as the problem allows, each thread increments its own private counter at line 17 (allocated by the thread and never shared with the others), and the results are simply aggregated into counter by the main thread (line 38).
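A variant of local.c that avoids one heap allocation per thread: each thread counts in a stack variable and writes its result once into a slot of an array owned by the main thread; pthread_join then makes that write visible to the main thread. A sketch with hypothetical names (struct local_arg, local_worker, run_local) and a fixed count of four threads, assuming POSIX threads:

```c
#include <pthread.h>
#include <stdlib.h>

/* per-thread argument: input iteration count, output partial count */
struct local_arg {
	int iterations;
	unsigned long partial;
};

static void *local_worker(void *p)
{
	struct local_arg *a = p;
	unsigned long local = 0;		/* private, lives on this thread's stack */
	int i;

	for (i = 0; i < a->iterations; ++i)
		++local;
	a->partial = local;			/* published once, at the end */
	return NULL;
}

unsigned long run_local(int iterations)
{
	struct local_arg args[4];
	pthread_t t[4];
	unsigned long total = 0;
	int i;

	for (i = 0; i < 4; ++i) {
		args[i].iterations = iterations;
		if (pthread_create(&t[i], NULL, local_worker, &args[i]) != 0)
			abort();
	}
	for (i = 0; i < 4; ++i) {
		pthread_join(t[i], NULL);	/* synchronizes with the worker's write */
		total += args[i].partial;
	}
	return total;
}
```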

That said, many problems still need shared updates and require more advanced techniques.

Operations as transactions

The most powerful CPU feature available to the programmer for lockfree behavior is the CMPXCHG instruction, also known as compare-and-swap and commonly abbreviated CAS. In pseudo-code, it does:

/* atomic */
bool compare_and_swap (unsigned long *mem, unsigned long old, unsigned long new)
{
	if (*mem == old) {
		*mem = new;
		return true;
	}
	return false;
}

Recent gcc versions provide easy access to this instruction through the Atomic Builtins:

bool __sync_bool_compare_and_swap (type *ptr, type oldval, type newval, ...)
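The bool variant only reports whether the swap happened, while the related __sync_val_compare_and_swap builtin returns the value found in memory before the attempt. A small sketch of both; cas_demo is a hypothetical name:

```c
#include <stdbool.h>

/* Applies three compare-and-swap attempts to the same variable and
   returns its final value. */
unsigned long cas_demo(bool *first, bool *second, unsigned long *seen)
{
	volatile unsigned long v = 1;

	*first = __sync_bool_compare_and_swap(&v, 1, 2);	/* 1 == 1: v becomes 2 */
	*second = __sync_bool_compare_and_swap(&v, 1, 3);	/* 2 != 1: v unchanged */
	*seen = __sync_val_compare_and_swap(&v, 2, 4);		/* returns 2, v becomes 4 */
	return v;
}
```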

Older versions can use the following inline assembly glue:

# file: atomic.h

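static inline char CAS(volatile unsigned long *mem, unsigned long old, unsigned long new)
{
	unsigned long r;
	/* CMPXCHG compares the accumulator (old) with *mem and stores new into
	 * *mem if they are equal; the accumulator ends up holding the previous
	 * value of *mem, so the swap happened if, and only if, r == old.
	 * No size suffix: the operand size follows unsigned long (32 or 64 bits). */
	__asm__ __volatile__("lock; cmpxchg %2,%1"
			: "=a" (r), "+m" (*mem)
			: "r" (new), "0" (old)
			: "memory");
	return r == old ? 1 : 0;
}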

In other words, the CAS operation succeeds and updates *mem to new if, and only if, *mem still holds old. Thus, we can write a new implementation of our shared counter example:

# file: cas.c

11 void *worker(void *arg)
12 {
13	int i = 0, iterations = *((int *)arg);
14	typeof(counter) expectedval;
15	for (i = 0; i < iterations; ++i)  {
16		do {
17 			expectedval = counter;
18 		} while (!CAS(&counter, expectedval, expectedval + 1));
19 	}
20	return NULL;
21 }

At line 17, the modified function stores the value of counter in a local variable expectedval; then, at line 18, it immediately attempts a compare-and-swap on counter. If counter still matches expectedval, the CAS succeeds and counter is set to expectedval + 1. If it fails, because a concurrent thread updated counter after its value was stored in expectedval at line 17, the loop continues: expectedval is reloaded with the new value of counter and the CAS is retried.
Thus, in this version of the shared counter program, the update of counter is done in a transactional manner: read, compute, then commit only if nothing changed in between, otherwise retry.
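The same read, compute, commit pattern applies to updates that have no dedicated instruction. As a sketch, an atomic maximum built on __sync_bool_compare_and_swap, assuming gcc or clang and POSIX threads; atomic_max, max_worker and run_max are hypothetical names:

```c
#include <pthread.h>
#include <stdlib.h>

static volatile unsigned long max_seen = 0;

/* CAS loop: read, compute, commit only if the value has not changed
   in the meantime, otherwise retry with the fresh value. */
void atomic_max(volatile unsigned long *mem, unsigned long candidate)
{
	unsigned long expected;
	do {
		expected = *mem;
		if (candidate <= expected)
			return;			/* nothing to commit */
	} while (!__sync_bool_compare_and_swap(mem, expected, candidate));
}

static void *max_worker(void *arg)
{
	unsigned long base = (unsigned long)arg, i;
	for (i = 0; i < 100000; ++i)
		atomic_max(&max_seen, base * 100000 + i);
	return NULL;
}

/* 4 threads push values concurrently; returns the maximum observed */
unsigned long run_max(void)
{
	pthread_t t[4];
	unsigned long i;

	max_seen = 0;
	for (i = 0; i < 4; ++i)
		if (pthread_create(&t[i], NULL, max_worker, (void *)i) != 0)
			abort();
	for (i = 0; i < 4; ++i)
		pthread_join(t[i], NULL);
	return max_seen;
}
```

Note the early return: when the candidate is not larger, there is nothing to commit, so no CAS is attempted at all.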

The ABA problem

The ABA problem can be demonstrated with a slightly different implementation of the shared counter program, where the CAS operations are done on references (pointers) rather than on values:

triggering ABA
# file: aba.c

 9 volatile unsigned long *curval = NULL;
10 volatile unsigned long threads = 0;
11 
12 struct argument {
13 	int count;
14 	unsigned long *buffer;
15 };
16 
17 void *worker(struct argument *arg)
18 {
19 	int i = 0;
20 	volatile unsigned long *old;
21 	AAF(&threads, 1);
22 	for (i = 0; i < arg->count; ++i)  {
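23 		for (;;) {	/* transaction, retried until the CAS of line 28 succeeds */
24 			old = curval;
25 			if (old == arg->buffer && threads > 1) 
26 				continue;	/* back to line 24 (in a do/while, continue would jump to the CAS test) */
27 			*(arg->buffer) = !old ? 1 : (*old + 1);
28 			if (CAS((unsigned long *)&curval, (uintptr_t)old, (uintptr_t)arg->buffer)) break;
29 		} }	/* end of the transaction and of the outer loop */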
30 	AN(&threads, -1);
31 	return NULL;
32 }
33 
34 int main(int argc, char **argv)
35 {
36 	int i, count;
37 	pthread_t *t;
38 	struct argument *arg;
39 	if (argc != 3) {
40 		fprintf(stderr, "%s: <nthreads> <iterations>\n", argv[0]);
41 		return 1;
42 	}
43 	count = atoi(argv[2]);
44 	t = (pthread_t *)calloc(atoi(argv[1]), sizeof(pthread_t));
45 	/* create # threads */
46 	for (i = 0; i < atoi(argv[1]); ++i) {
47 		arg = (struct argument *)calloc(1, 
48 			sizeof(struct argument) + sizeof(unsigned long));
49 		arg->count = atoi(argv[2]);
50 		arg->buffer = (unsigned long *)(arg + 1);
51 		assert(!pthread_create(&(t[i]), NULL, (void *(*)(void *))worker, arg));
52 	}
53 	/* wait for the completion of all the threads */
54 	for (i = 0; i < atoi(argv[1]); ++i)
55 		assert(!pthread_join(t[i], NULL));
56 	/* print counter value */
57 	printf("all thread done -> values=%lu\n", *curval);
58 	return 0;
59 }
60 

This program is slightly different: instead of having each thread update a shared counter, we have a shared pointer curval (line 9) that each thread updates with the address of its own private copy of the counter, buffer (line 14). Thus, at any given time, curval holds the address of the latest version of the counter.
To increment the counter, a thread dereferences curval to get the latest value, sets its own copy to this value plus one (line 27), and then, at line 28, attempts a compare-and-swap on curval to replace it with the address of its own buffer. If the CAS operation fails because another thread updated curval in the meantime, the code jumps back to line 24 and the whole transaction is retried.
There is one more new thing in this program: an active thread counter, threads (line 10), updated at lines 21 and 30. It works around a race that may appear if one thread completes two iterations of the outer loop (lines 22-29) while none of the other threads has managed a single update of curval: lines 25-26 make sure the transaction only proceeds if curval does not already point to the thread's own buffer, unless all the other threads are done with their job.

We can now run the program on a multi-core machine to see whether the ABA problem shows up:

$ ./aba 1 1000000
all thread done -> values=1000000
$ ./aba 64 1000000
all thread done -> values=18017949

The counter is inconsistent with more than one thread.
The ABA problem happens when a thread reads a shared variable and finds it set to A, then gets preempted by another thread which sets the variable to B, and then back to A. When the first thread resumes and compares the shared variable with its snapshot, as a CAS does, it wrongly concludes that "nothing changed" because the variable is still set to A.
In many circumstances, especially if the shared variable holds references, the ABA effect can lead to rather serious inconsistencies. The following scenario applies to our program:

# thread-1                        # thread-2                        # thread-3
-                                 -                                 -
*(arg->buffer) = 1                .                                 .                                 # T1-buff is set to 1
CAS(&curval, old, arg->buffer);   .                                 .                                 # curval:T1-buff, *curval:1
.                                 old = curval;                     .                                 # old points to T1-buff
.                                 *(arg->buffer) = *old + 1;        .                                 # T2-buff is set to 2
.                                 .                                 .
.                                 .                                 .
.                                 .                                 old = curval;                     # old points to T1-buff
.                                 .                                 *(arg->buffer) = *old + 1;        # T3-buff is set to 2
.                                 .                                 CAS(&curval, old, arg->buffer);   # curval:T3-buff, *curval:2
old = curval;                     .                                 .                                 # old points to T3-buff
*(arg->buffer) = *old + 1;        .                                 .                                 # T1-buff is set to 3
CAS(&curval, old, arg->buffer);   .                                 .                                 # curval:T1-buff, *curval:3
.                                 CAS(&curval, old, arg->buffer);   .                                 # curval:T2-buff, *curval:2
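Because the table above is a fixed interleaving, it can be replayed deterministically in a single thread. A sketch, where aba_replay is a hypothetical name and three stack variables stand in for the per-thread buffers:

```c
#include <stdbool.h>

/* Replays the scenario above with the gcc CAS builtin on the shared
   pointer; old1/old2/old3 are the snapshots taken by T1/T2/T3. */
unsigned long aba_replay(bool *stale_cas_succeeded)
{
	unsigned long t1 = 0, t2 = 0, t3 = 0;	/* T1-buff, T2-buff, T3-buff */
	unsigned long *volatile curval = 0;
	unsigned long *old1, *old2, *old3;

	t1 = 1;						/* T1: first value */
	__sync_bool_compare_and_swap(&curval, (unsigned long *)0, &t1);
	old2 = curval;					/* T2 snapshots T1-buff */
	t2 = *old2 + 1;					/* T2-buff = 2 */
	old3 = curval;					/* T3 snapshots T1-buff */
	t3 = *old3 + 1;					/* T3-buff = 2 */
	__sync_bool_compare_and_swap(&curval, old3, &t3);	/* curval: T3-buff (B) */
	old1 = curval;					/* T1 snapshots T3-buff */
	t1 = *old1 + 1;					/* T1-buff = 3 */
	__sync_bool_compare_and_swap(&curval, old1, &t1);	/* curval: T1-buff (A again) */
	/* T2 resumes: curval equals its stale snapshot, so its CAS succeeds */
	*stale_cas_succeeded = __sync_bool_compare_and_swap(&curval, old2, &t2);
	return *curval;					/* 2, although 3 had been committed */
}
```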

The classic workaround against ABA-based race conditions relies on the double-word compare-and-swap instruction (DWCAS), not to be confused with double-compare-and-swap (DCAS), which operates on two independent memory locations and is not available on x86 processors. The term double-word must be understood here as twice the architecture word (i.e. not Intel's DWORD, which is two 16-bit words), that is 64 bits (8 bytes) on 32-bit architectures and 128 bits (16 bytes) on 64-bit architectures.

This instruction is known as CMPXCHG8B on 32-bit CPUs and CMPXCHG16B on 64-bit ones. Recent gcc versions provide easy access to this instruction through the Atomic Builtins:

bool __sync_bool_compare_and_swap (type *ptr, type oldval, type newval, ...)

Note that on 64-bit machines, gcc requires the -mcx16 flag to provide CMPXCHG16B. Older versions can still use the following inline assembly glue:

# file: atomic.h

static inline char DWCAS(volatile DWORD *mem, DWORD old, DWORD new)
{
	unsigned long old_h = old >> SHIFT, old_l = old;
	unsigned long new_h = new >> SHIFT, new_l = new;
	char r;

	/* CMPXCHG16B compares RDX:RAX (old) with the 16-byte aligned *mem;
	   if they are equal, it stores RCX:RBX (new) into *mem and sets ZF,
	   which SETZ copies into r */
	__asm__ __volatile__("lock; cmpxchg16b %0; setz %1"
		     : "+m" (*mem),
		       "=q" (r),
		       "+a" (old_l),
		       "+d" (old_h)
		     : "b" (new_l),
		       "c" (new_h)
		     : "cc", "memory");
	return r;
}

The DWCAS instruction can help avoid ABA situations because it atomically manipulates more than just a memory address: a pointer can be updated together with an additional word, such as a counter incremented on every update.
We can use DWCAS to fix our program:

avoiding ABA
# file: aba-avoidance.c

 9 static volatile DWORD curval = 0;	
10 volatile unsigned long threads = 0;
11 
12 struct argument {
13 	int count;
14 	unsigned long *buffer;
15 };
16 
17 struct object {
18 	unsigned long *address;
19 	unsigned long refcount;
20 };
21 
22 #define OBJECT(obj) ((volatile struct object *)&(obj))
23 
24 void *worker(struct argument *arg)
25 {
26 	int i = 1;
27 	volatile DWORD expectedval , newval;
28 	AAF(&threads, 1);
29 	for (i = 0; i < arg->count; ++i)  {
30 		for (;;) {
31 			expectedval = curval;
32 			if (OBJECT(expectedval)->address == arg->buffer && threads > 1)
33 				continue;
34 			*(arg->buffer) =  !OBJECT(expectedval)->address ? 1 :
35 				*OBJECT(expectedval)->address + 1;
36 			OBJECT(newval)->address = arg->buffer;
37 			OBJECT(newval)->refcount = OBJECT(expectedval)->refcount + 1;
38 			if (DWCAS(&curval, expectedval, newval))
39 				break;
40 		} 
41 	}
42 	AN(&threads, -1);
43 	return NULL;
44 }

The logic is exactly the same, except that instead of storing bare addresses of arg->buffer into the shared curval variable, those addresses are packed into the double-word object data structure defined at lines 17-20. Each address is now accompanied by an unsigned long serial number, refcount, which counts the number of updates done to curval (line 37) and prevents an ABA race on the addresses themselves: a stale snapshot carries an old refcount, so its DWCAS fails even if the address matches.
Note that the ABA issue is not strictly eliminated, since refcount can wrap around, but it becomes very unlikely for most workloads.

Running the modified program on a multi-core machine confirms that the fix works:

$ ./aba-avoidance 64 1000000
all thread done -> curval:64000000
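When pointers can be replaced by indices into a preallocated array, a version counter can be packed next to the index in a single 64-bit word, so that an ordinary CAS is enough and no DWCAS is needed. This is a different technique from the one used in aba-avoidance.c; a sketch with hypothetical names (tagged_cas, TAG_INDEX, TAG_VERSION, TAG_MAKE), assuming gcc or clang:

```c
#include <stdbool.h>
#include <stdint.h>

/* A 32-bit slot index in the low half, a 32-bit version in the high half. */
#define TAG_INDEX(w)	((uint32_t)(w))
#define TAG_VERSION(w)	((uint32_t)((w) >> 32))
#define TAG_MAKE(i, v)	(((uint64_t)(v) << 32) | (uint32_t)(i))

/* Installs new_index only if the word still holds exactly 'seen'
   (same index *and* same version); every success bumps the version. */
bool tagged_cas(volatile uint64_t *word, uint64_t seen, uint32_t new_index)
{
	uint64_t next = TAG_MAKE(new_index, TAG_VERSION(seen) + 1);
	return __sync_bool_compare_and_swap(word, seen, next);
}
```

As with refcount, the 32-bit version eventually wraps around, so ABA becomes unlikely rather than impossible.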

Real world example: fixed size lockfree queue

Just as MapReduce can do much more than count words, the techniques described above can do much more than manage counters.

Queues are abstract data types widely used in producer/consumer algorithms. Many good implementations are lock-based and may suffer from high contention with thousands of concurrent threads adding and consuming data. The techniques described above can be used to implement a fully lockfree, array-based queue:

lockfree queue primitives
typedef struct _queue_t *queue_t;
queue_t queue_create(size_t);
void *queue_dequeue(queue_t);
int queue_enqueue(queue_t, void *);

The desired depth of the queue is passed to queue_create at creation time. queue_dequeue returns the address of an element, or NULL when the queue is empty. queue_enqueue takes the address of the element to queue and returns 1 if the operation succeeded, or 0 if the queue is full.

internal data-structures, initialization
# file: lfq.h

31 #define E(val) ((element_t *)&val)
32 
33 typedef struct {
34 	unsigned long ptr;
35 	unsigned long ref;
36 } element_t; 
37 
38 struct _queue_t {
39 	size_t depth;
40 	element_t *e;
41 	unsigned long rear;
42 	unsigned long front;
43 };
44 
45 queue_t queue_create(size_t depth)
46 {
47 	queue_t q = (queue_t)malloc(sizeof(struct _queue_t));
48 	if (q) {
49 		q->depth = depth;
50 		q->rear = q->front = 0;
51 		q->e = (element_t *)calloc(depth, sizeof(element_t));
52 	}
53 	return q;
54 }

element_t is the double-word structure that holds the address of an enqueued element along with a count of the updates done to its slot.
struct _queue_t holds the queue metadata: the capacity in depth, the boundary counters rear and front, and e, the contiguous storage for up to depth elements.
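Stripped of concurrency, the bookkeeping of struct _queue_t is a classic ring buffer indexed by two ever-increasing counters: a slot is addressed as counter % depth, the queue is empty when rear == front and full when rear == front + depth. A single-threaded sketch, with hypothetical names (struct ring, ring_put, ring_get) and a fixed storage of four slots:

```c
#include <stddef.h>

/* rear and front only ever grow; depth must not exceed 4 here */
struct ring {
	size_t depth;
	unsigned long rear, front;
	void *slot[4];
};

/* returns 1 on success, 0 when the ring is full */
int ring_put(struct ring *r, void *data)
{
	if (r->rear == r->front + r->depth)
		return 0;
	r->slot[r->rear % r->depth] = data;
	r->rear++;
	return 1;
}

/* returns the oldest element, or NULL when the ring is empty */
void *ring_get(struct ring *r)
{
	void *data;

	if (r->front == r->rear)
		return NULL;
	data = r->slot[r->front % r->depth];
	r->slot[r->front % r->depth] = NULL;	/* mirror the dequeue's null ptr */
	r->front++;
	return data;
}
```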

the enqueue algorithm
# file: lfq.c

e56 int queue_enqueue(volatile queue_t q, void *data)
e57 {
e58 	DWORD old, new;
e59 	unsigned long rear, front;
e60 	do {
e61 		rear = q->rear;	
e62 		old = *((DWORD *)&(q->e[rear % q->depth]));
e63 		front = q->front;
e64 		if (rear != q->rear) 
e65 			continue;
e66 		if (rear == (q->front + q->depth))  {
e67 			if (q->e[front % q->depth].ptr && front == q->front) 
e68 				return 0;
e69 			CAS(&(q->front), front, front + 1);
e70 			continue;
e71 		}
e72 		if (!E(old)->ptr) {
e73 			E(new)->ptr = (uintptr_t)data;
e74 			E(new)->ref  = ((element_t *)&old)->ref + 1;
e75 			if (DWCAS((DWORD *)&(q->e[rear % q->depth]), old, new)) {
e76 				CAS(&(q->rear), rear, rear + 1);
e77 				return 1;
e78 			}
e79 		} else if (q->e[rear % q->depth].ptr)
e80 			CAS(&(q->rear), rear, rear + 1);
e81 	} while(1);
e82 }

The transaction starts at line e61: a snapshot of q->rear is taken, followed by a full copy of the element at rear (line e62). At line e64, a complementary consistency check of the snapshotted rear is done; this is a non-mandatory optimization.
At line e66 begins the verification of whether the queue is full. It only begins there because the current value of q->front is not enough to conclude that the queue is full: a concurrent thread might be just about to complete a successful dequeue at line d104, in which case q->front is lagging behind and the queue actually has one free slot. The two checks of line e67 make sure the queue is really full, in which case 0 is returned at line e68.
The code at line e69 is another non-mandatory optimization, for the case where q->front is known to be lagging behind: the thread simply helps other threads advance q->front before jumping back to line e60 and retrying the whole transaction.
At line e72, a check similar to the one of line e67 verifies that the slot snapshotted at line e62 is empty (its ptr is null). If it is not, a concurrent thread has completed the DWCAS of line e75 on that slot but has not advanced q->rear yet, so q->rear is lagging behind: the thread then re-checks the live slot at line e79, helps increment q->rear at line e80, and jumps back to line e60 to retry the transaction.
If the test of line e72 succeeds, the data pointer and an incremented ref are written into the double word new on the stack (lines e73-e74), and the DWCAS operation is attempted at line e75. If it succeeds, the thread attempts to advance q->rear and returns 1; if it fails, the whole set of operations is restarted at line e60.
You will notice that the results of the CAS operations of lines e69, e76 and e80 are never checked. This is the expected behavior: whenever a thread witnesses q->rear or q->front lagging behind, because a concurrent thread has not yet reached line e76 or d104, it lazily tries to help with a CAS; if that CAS fails, another thread has already advanced the index.
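The helping idiom of lines e69, e76 and e80 can be isolated in a few lines. A sketch, where help_advance is a hypothetical name, showing that several helpers working from the same snapshot advance the index exactly once:

```c
/* Lazy helping: advance *index by one if it still equals the snapshot.
   The result is deliberately ignored: a failed CAS only means that
   another thread has already advanced the index. */
void help_advance(volatile unsigned long *index, unsigned long snapshot)
{
	(void)__sync_bool_compare_and_swap(index, snapshot, snapshot + 1);
}
```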

the dequeue algorithm
# file: lfq.c

 d84 void *queue_dequeue(volatile queue_t q)
 d85 {
 d86 	DWORD old, new;
 d87 	unsigned long front, rear;
 d88 	do {
 d89 		front = q->front;
 d90 		old = *((DWORD *)&(q->e[front % q->depth]));
 d91 		rear = q->rear;
 d92 		if (front != q->front) 
 d93 			continue;
 d94 		if (front == q->rear) {
 d95 			if (!q->e[rear % q->depth].ptr && rear == q->rear)
 d96 				return NULL;
 d97 			CAS(&(q->rear), rear, rear + 1);
 d98 			continue;
 d99 		}
d100 		if (E(old)->ptr) {
d101 			E(new)->ptr = 0;
d102 			E(new)->ref = E(old)->ref + 1;
d103 			if (DWCAS((DWORD *)&(q->e[front % q->depth]), old, new)) {
d104 				CAS(&(q->front), front, front + 1);
d105 				return (void *)((element_t *)&old)->ptr;
d106 			}
d107 		} else if (!q->e[front % q->depth].ptr)
d108 			CAS(&(q->front), front, front + 1);
d109 	} while(1);
d110 }

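The dequeue operation is exactly symmetric to the enqueue: it empties the slot at front instead of filling the slot at rear, and returns NULL instead of 0 when the queue is empty.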

To verify the correctness of the queue implementation, the following program spawns an even number of worker threads, with the first half producing content and pushing it with queue_enqueue, and the other half consuming it with queue_dequeue.

verification program
# file: test.c

34 static size_t iterations;
35 static volatile unsigned long input = 0;
36 static volatile unsigned long output = 0;
37 
38 void *producer(void *arg)
39 {
40 	int i = 0;
41 	unsigned long *ptr;
42 	queue_t q = (queue_t)arg;
43 	for (i = 0; i < iterations; ++i) {
44 		ptr = (unsigned long *)malloc(sizeof(unsigned long));
45 		assert(ptr);
46 		*ptr = AAF(&input, 1);
47 		while (!queue_enqueue(q, (void *)ptr)) 
48 			;
49 	}
50 	return NULL;
51 }
52 
53 void *consumer(void *arg)
54 {
55 	int i = 0;
56 	unsigned long *ptr;
57 	queue_t q = (queue_t)arg;
58 	for (i = 0; i < iterations; ++i) {
59 		while (!(ptr = (unsigned long *)queue_dequeue(q)))
60 			;
61 		AAF(&output, *ptr);
62 		*ptr = 0;
63 		free((void *)ptr);
64 	}
65 	return NULL;
66 }
67 
68 int main(int argc, char **argv)
69 {
70 	int i;
71 	pthread_t *t;
72 	unsigned long verif = 0;
73 	queue_t q = queue_create(2);
74 	if (argc != 3) {
75 		fprintf(stderr, "%s: <nthreads> <iterations>\n", argv[0]);
76 		return 1;
77 	} 
78 	if (atoi(argv[1]) % 2) {
79 		fprintf(stderr, "%s: need an even number of threads\n", argv[0]);
80 		return 1;
81 	}
82 	t = (pthread_t *)calloc(atoi(argv[1]), sizeof(pthread_t));
83 	iterations = atoi(argv[2]);
84 	for (i = 0; i < atoi(argv[1]); ++i) 
85 		assert(!pthread_create(&(t[i]), NULL, 
86 			(i % 2) ? consumer : producer, q));
87 	for (i = 0; i < atoi(argv[1]); ++i) 
88 		assert(!pthread_join(t[i], NULL));
89 	for (i = 0; i <= input; ++i)
90 		verif += i;
91 	printf("input SUM[0..%lu]=%lu output=%lu\n", input, verif, output);
92 	return 0;
93 }

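The content produced consists of the numbers between 1 and iterations * threads / 2 (lines 44-46).
Threads maintain two shared counters: input, atomically incremented by producers to generate each new number (line 46), and output, to which consumers atomically add every number they dequeue (line 61).

When all threads complete, the main thread computes the sum of all the numbers between 1 and input to check the value of output.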

Running the test on a multi-core machine gives matching sums, as expected from a correct implementation:

$ ./lfq-test 64 1000000  
input SUM[0..32000000]=512000016000000 output=512000016000000
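The expected value printed above follows from the closed form n(n+1)/2: 32000000 × 32000001 / 2 = 512000016000000. A sketch comparing it with the loop of lines 89-90; sum_to and sum_loop are hypothetical names:

```c
/* Sum of 0..n in closed form, n(n+1)/2; halving the even factor first
   keeps the intermediate product within 64 bits for larger n. */
unsigned long long sum_to(unsigned long long n)
{
	return (n % 2 == 0) ? (n / 2) * (n + 1) : n * ((n + 1) / 2);
}

/* Same value computed with a loop, as test.c does. */
unsigned long long sum_loop(unsigned long long n)
{
	unsigned long long i, s = 0;
	for (i = 0; i <= n; ++i)
		s += i;
	return s;
}
```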

References

Most of the examples cited above can be found here, or downloaded as a bundle here. They have been verified in 64-bit and 32-bit mode on Linux and Darwin. If you build them on Darwin with an old version of gcc without support for the Atomic Builtins, you will most likely need to disable PIC with the -mdynamic-no-pic flag to compile the inline assembly.

Feel free to email me with any comments or suggestions.