Cayos Cochinos

contact

Julien Benoist <julien@benoist.name>
285 Grand View avenue,
San Francisco, CA 94114
cell: +1 415-238-7908


Kø (data structure): a queue is a data structure in which the individual data elements are removed in the same order in which they were inserted.

What is libko?

Libko is a user-space, message-based, persistent queue library that can be used to exchange messages between unrelated processes in an event-driven manner.

Download:

Jan 31 2011: libko-1.0.0.tar.bz2, md5: a79068539e7baa1eb4e4ba3abd9d6e6a.

Features

Asymptotic properties

operation    running time    i/o
enqueue      O(1)            O(n)
dequeue      O(1)            O(1)*
*: assuming KO_DIRECT and no contention.

Design

Libko relies on fast userspace mutexes (futex) for inter-process communication. The kernel is only entered for contended operations.
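To illustrate the mechanism (this is not libko's code), the following minimal sketch places a PTHREAD_PROCESS_SHARED mutex in an anonymous shared mapping and lets a parent and a forked child increment a counter under it. On Linux with glibc, such mutexes are implemented on top of futexes, so an uncontended lock or unlock completes without entering the kernel. The struct shared layout and the shared_counter_demo() function are hypothetical names chosen for the example.

```c
#define _DEFAULT_SOURCE 1          /* MAP_ANONYMOUS on glibc */
#include <pthread.h>
#include <sys/mman.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical shared region; not libko's queue file layout. */
struct shared {
    pthread_mutex_t lock;
    long counter;
};

/* Parent and child each increment a shared counter `iterations` times
 * under a process-shared mutex. Returns the final value, or -1 on error. */
static long shared_counter_demo(int iterations)
{
    struct shared *s = mmap(NULL, sizeof *s, PROT_READ | PROT_WRITE,
                            MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (s == MAP_FAILED)
        return -1;

    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    /* Required for a mutex living in memory shared between processes. */
    pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
    pthread_mutex_init(&s->lock, &attr);
    pthread_mutexattr_destroy(&attr);
    s->counter = 0;

    pid_t pid = fork();
    if (pid < 0) {
        munmap(s, sizeof *s);
        return -1;
    }

    /* Both processes run this loop. */
    for (int i = 0; i < iterations; i++) {
        pthread_mutex_lock(&s->lock);   /* futex fast path when uncontended */
        s->counter++;
        pthread_mutex_unlock(&s->lock);
    }

    if (pid == 0)
        _exit(0);                       /* child is done */

    waitpid(pid, NULL, 0);
    long result = s->counter;
    pthread_mutex_destroy(&s->lock);
    munmap(s, sizeof *s);
    return result;
}
```

Build with -pthread. Without the PTHREAD_PROCESS_SHARED attribute, using the mutex from two processes is undefined behavior, which is why a queue shared by unrelated processes needs it.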
For the record, here is an apples-to-oranges comparison between libko and POSIX message queues:

                     POSIX mq          libko
message max:         32'768            2^32
message size max:    1 MB              4 GB
queues max:          256 (non-root)    unlimited
dequeue i/o:         O(n)              O(1)*
*: assuming KO_DIRECT.

Documentation

NAME

ko — inter-process message queue in user-space.

SYNOPSIS

#include <ko.h>

int ko_create(const char *pathname, size_t capacity);
ko_t ko_open(const char *pathname, int flags);
koid_t ko_peek(ko_t queue, komsg_t *kmsg, const struct timespec *abstime);
koid_t ko_enqueue(ko_t queue, komsg_t *kmsg, ko_seqid_t id);
koid_t ko_dequeue(ko_t queue, komsg_t *kmsg, const struct timespec *abstime);
int ko_close(ko_t queue);
int ko_make_empty(ko_t queue);
int ko_destroy(const char *path);

DESCRIPTION

The ko_create() function shall create a new named queue pathname on the file system, with a size limit of capacity bytes. The number of elements stored in the queue is not statically defined, and individual messages can be of variable size with no fixed per-message limit, as long as each message fits within the queue capacity. However, ko needs 8 additional bytes per message for housekeeping purposes.
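Because every message costs its own length plus 8 bytes, the capacity needed to hold a given set of messages at the same time is a simple sum. A minimal sketch, assuming capacity accounts only for message data plus this per-message overhead (any fixed file header is ignored); ko_capacity_for() and KO_MSG_OVERHEAD are hypothetical names, not part of libko.

```c
#include <stddef.h>
#include <stdint.h>

/* Per-message housekeeping overhead documented for ko_create(). */
#define KO_MSG_OVERHEAD 8u

/* Hypothetical helper: bytes of capacity needed to store all `count`
 * messages of the given lengths simultaneously. 64-bit arithmetic avoids
 * wrap-around when lengths approach the 4 GB message limit. */
static uint64_t ko_capacity_for(const uint32_t *lengths, size_t count)
{
    uint64_t total = 0;

    for (size_t i = 0; i < count; i++)
        total += (uint64_t)lengths[i] + KO_MSG_OVERHEAD;
    return total;
}
```

For messages of 100, 20 and 0 bytes this yields 144 bytes, a value that could then be passed as the capacity argument of ko_create().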

The ko_open() function opens the named queue pathname with mode flags:

KO_DIRECT [zero-copy dequeue operations.]

KO_NONBLOCK [non-blocking dequeue operations.]

KO_CIRCULAR [oldest messages get drained when the queue is full.]

KO_INSTRUMENT [trace information is written to a circular buffer for each operation.]

KO_INTEGRITY [CRC32 is used for message integrity.]

ko_enqueue() enqueues message kmsg in queue queue. If a sequence number id is specified, the queue is checked for a message with a matching id; if such a duplicate exists, id is returned and nothing is enqueued. If KO_CIRCULAR is set and there is not enough free space to store the submitted message, the oldest message(s) are drained until the space requirements are met.
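The enqueue rules above (optional duplicate check on the sequence id, draining the oldest messages under KO_CIRCULAR) can be modelled in a few lines. This is a hypothetical, single-process sketch, not libko's implementation: struct toy_queue, toy_enqueue(), the fixed slot count, the convention that an id of 0 means "no id", and the 1/0/-1 return values are all assumptions made for illustration. It does charge the documented 8 bytes of overhead per message against the capacity.

```c
#include <stddef.h>
#include <stdint.h>

#define TOY_OVERHEAD 8    /* per-message bytes, as documented for ko */
#define TOY_SLOTS    64   /* hypothetical fixed slot count for this model */

/* Hypothetical in-memory model of a byte-bounded FIFO; not libko's code. */
struct toy_queue {
    size_t   capacity;            /* byte limit, like ko_create()'s capacity */
    size_t   used;                /* bytes in use, including overhead */
    size_t   head, count;         /* ring buffer indices */
    uint32_t len[TOY_SLOTS];
    uint64_t id[TOY_SLOTS];       /* 0 means "no sequence id" (assumption) */
};

/* Returns 1 if enqueued, 0 if a message with the same non-zero id is
 * already queued (nothing enqueued), -1 if the message does not fit. */
static int toy_enqueue(struct toy_queue *q, uint32_t len, uint64_t id,
                       int circular)
{
    size_t need = (size_t)len + TOY_OVERHEAD;

    if (need > q->capacity)
        return -1;                            /* can never fit */

    if (id != 0)
        for (size_t i = 0; i < q->count; i++)
            if (q->id[(q->head + i) % TOY_SLOTS] == id)
                return 0;                     /* duplicate: skip */

    while (q->used + need > q->capacity || q->count == TOY_SLOTS) {
        if (!circular)
            return -1;                        /* queue full */
        /* KO_CIRCULAR-like policy: drain the oldest message. */
        q->used -= (size_t)q->len[q->head] + TOY_OVERHEAD;
        q->head = (q->head + 1) % TOY_SLOTS;
        q->count--;
    }

    size_t tail = (q->head + q->count) % TOY_SLOTS;
    q->len[tail] = len;
    q->id[tail]  = id;
    q->count++;
    q->used += need;
    return 1;
}
```

With a 100-byte capacity, two 30-byte messages use 76 bytes; a third one makes the circular model drop the oldest message first, while the non-circular model refuses it.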

The komsg_t structure is defined as follows:

typedef struct {
       void *msg_ptr;       /* pointer to message data */
    uint32_t msg_len;       /* message data length */
} komsg_t;

ko_dequeue() dequeues one message from queue queue. If KO_DIRECT is not set, the message is copied into the application-provided buffer or, if the kmsg structure members are NULL, into a heap buffer allocated by the library, which the caller must release with free(). If KO_DIRECT is set, kmsg is filled with the message size and its address in shared memory, unless the msg_len member of kmsg has all bits set (0xffffffff), in which case the dequeue operation instead passes the message to the callback function whose address is stored in msg_ptr (see examples below). If KO_NONBLOCK is not set, the operation blocks until data is available or the absolute time described by abstime has passed.
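ko_dequeue() and ko_peek() take an absolute deadline rather than a relative timeout. A minimal sketch for building one, assuming the library compares abstime against CLOCK_REALTIME (the default clock of pthread_cond_timedwait(); the source does not state which clock libko uses). deadline_after_ms() is a hypothetical helper, not part of libko.

```c
#define _POSIX_C_SOURCE 200809L
#include <time.h>

/* Hypothetical helper: absolute CLOCK_REALTIME deadline `ms` milliseconds
 * from now (ms >= 0), for the abstime argument of ko_dequeue()/ko_peek(). */
static struct timespec deadline_after_ms(long ms)
{
    struct timespec ts;

    clock_gettime(CLOCK_REALTIME, &ts);
    ts.tv_sec  += ms / 1000;
    ts.tv_nsec += (ms % 1000) * 1000000L;
    if (ts.tv_nsec >= 1000000000L) {     /* keep tv_nsec in [0, 1e9) */
        ts.tv_sec  += 1;
        ts.tv_nsec -= 1000000000L;
    }
    return ts;
}
```

For instance, passing the result of deadline_after_ms(500) as abstime would bound a blocking dequeue to roughly half a second.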

ko_peek() reads one message from queue queue without removing it. It is mostly used internally, but can also be used externally as a zero-copy mechanism if the queue is opened in KO_DIRECT mode. On success, kmsg is filled with the message size and its address in shared memory, and the queue stays locked until the process terminates or explicitly calls ko_dequeue(). If KO_NONBLOCK is not set, the operation blocks until data is available or the absolute time described by abstime has passed.

ko_make_empty() flushes out all the elements of queue queue.

ko_close() closes named queue queue.

ko_destroy() permanently destroys the queue located at path, without checking whether it is still referenced by any process.

RETURN VALUES

If successful, ko_open() returns a pointer to the opened queue, or NULL with errno set on error.

If successful, ko_create(), ko_close(), ko_make_empty() and ko_destroy() return zero, or -1 with errno set on error.

If successful, ko_peek(), ko_dequeue() and ko_enqueue() return a non-zero sequence id, or 0 with errno set on error.

ERRORS

EIO [message integrity checksum failed.]

EINVAL [invalid argument.]

EEXIST [queue file already exists.]

ENOBUFS [application-supplied buffer is too small to hold the message.]

EPROTO [queue file magic number does not match the expected value.]

ENOMEM [message queue is full or message size bigger than queue capacity.]

EAGAIN [dequeue would block.]

NOTES

If KO_INTEGRITY is specified, ko_enqueue() computes a CRC32 checksum for each message and ko_dequeue() verifies it, and ko_open() performs an integrity check of all queued messages.
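The source does not say which CRC-32 variant libko implements. The sketch below assumes the common IEEE 802.3 polynomial in its reflected form (0xEDB88320), computed bit by bit for clarity; crc32_ieee() is a hypothetical name. Conceptually, the writer stores the checksum alongside the message and the reader recomputes it, reporting EIO on a mismatch.

```c
#include <stddef.h>
#include <stdint.h>

/* Bitwise CRC-32 (IEEE 802.3, reflected polynomial 0xEDB88320,
 * initial value and final XOR 0xFFFFFFFF). Assumed variant, see above. */
static uint32_t crc32_ieee(const void *data, size_t len)
{
    const unsigned char *p = data;
    uint32_t crc = 0xFFFFFFFFu;

    while (len--) {
        crc ^= *p++;
        for (int bit = 0; bit < 8; bit++)
            /* shift right; XOR in the polynomial if the low bit was set */
            crc = (crc >> 1) ^ (0xEDB88320u & (0u - (crc & 1u)));
    }
    return crc ^ 0xFFFFFFFFu;
}
```

The standard check value for this variant is 0xCBF43926 over the ASCII string "123456789". Production code usually switches to a 256-entry lookup table, which computes the same result a byte at a time.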

Queue files are versioned; data types are integral and are stored in network byte order. A queue file should therefore be architecture- and platform-independent; however, due to difficulties with the pthread API, 32-bit and 64-bit processes cannot operate on the same queue at runtime.
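Storing integers in network byte order is what keeps the file contents independent of host endianness. A minimal sketch of the idea, using byte-wise helpers put_be32() and get_be32() (hypothetical names) instead of htonl()/ntohl(), so that fields at arbitrary offsets in a mapped file can be accessed without alignment concerns. Which fields libko encodes this way, and at what offsets, is not documented here.

```c
#include <stdint.h>

/* Write a 32-bit value in network (big-endian) byte order,
 * regardless of the host's endianness. */
static void put_be32(unsigned char out[4], uint32_t v)
{
    out[0] = (unsigned char)(v >> 24);
    out[1] = (unsigned char)(v >> 16);
    out[2] = (unsigned char)(v >> 8);
    out[3] = (unsigned char)v;
}

/* Read a big-endian 32-bit value back into host byte order. */
static uint32_t get_be32(const unsigned char in[4])
{
    return ((uint32_t)in[0] << 24) | ((uint32_t)in[1] << 16) |
           ((uint32_t)in[2] << 8)  |  (uint32_t)in[3];
}
```

Since both helpers work on individual bytes, a little-endian and a big-endian host produce and read back identical file contents.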

The library ships with the ko-stat utility, which can be used to query the metadata or instrumentation traces of queue files. Instrumentation needs

EXAMPLES

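dequeue into library-allocated buffers:

#include <stdlib.h>
#include <string.h>
#include <ko.h>

komsg_t kmsg = { NULL, 0 };
ko_t q = ko_open("/tmp/.iqueue", 0);

for (;;) {
    /* NULL members: ko_dequeue() allocates the message buffer */
    if (!ko_dequeue(q, &kmsg, NULL))
        break;                          /* errno describes the error */

    /* ... */

    /* release the library-allocated buffer */
    free(kmsg.msg_ptr);

    /* reset kmsg for the next dequeue */
    memset(&kmsg, 0, sizeof(kmsg));
}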

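dequeue into application-supplied buffers:

#include <errno.h>
#include <stdlib.h>
#include <ko.h>

ko_t q = ko_open("/tmp/.iqueue", 0);
uint32_t bufsize = 128;
komsg_t kmsg = { malloc(bufsize), 0 };

for (;;) {
    /* assumes msg_len is in/out: buffer size in, message size out */
    kmsg.msg_len = bufsize;
    if (!ko_dequeue(q, &kmsg, NULL)) {
        void *p;

        if (errno != ENOBUFS)           /* not a buffer-size problem */
            break;
        /* supplied buffer too small: double it and retry */
        if ((p = realloc(kmsg.msg_ptr, bufsize *= 2)) == NULL)
            break;
        kmsg.msg_ptr = p;
        continue;
    }

    /* ... */
}

free(kmsg.msg_ptr);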

insecure zero-copy dequeues; avoid them unless the application guarantees exclusive access to the queue:

komsg_t kmsg;
ko_t q = ko_open("/tmp/.iqueue", KO_DIRECT);
        
ko_dequeue(q, &kmsg, NULL);
/* queue is not locked 
   .... */

secure zero-copy dequeues:

komsg_t kmsg;
ko_t q = ko_open("/tmp/.iqueue", KO_DIRECT);
        
ko_peek(q, &kmsg, NULL);
/* queue is locked
   ... */

/* trim message & release queue */
ko_dequeue(q, &kmsg, NULL);

zero-copy callback based dequeues:

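#include <ko.h>

/* handler */
static void process_message(const char *data, int size)
{
    /* ... */
}

int main(void)
{
    /* msg_len with all bits set: msg_ptr holds a callback, not a buffer */
    komsg_t kmsg = { (void *)process_message, 0xffffffff };
    ko_t q = ko_open("/tmp/.iqueue", KO_DIRECT);

    if (q == NULL)
        return 1;

    ko_dequeue(q, &kmsg, NULL);
    ko_close(q);
    return 0;
}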

using instrumentation traces on queues opened with KO_INSTRUMENT:

$ (cd ~/src/libko/ && make clean all test)

[..]

$ ko-inspect /tmp/test.queue
file: /tmp/test.queue (rev: 8)
arch: 32-bit (little-endian)
storage: 0/3600 bytes, 0 elements [head:0 tail 0]
pthread objects: ok
crc32: ok
traceinfo:
#1 ko_peek() by pid:4389[0x804943c <./ko_test>] on Thu Feb 10 22:06:39 2011
#2 ko_dequeue() by pid:4389[0x8049398 <./ko_test>] on Thu Feb 10 22:06:39 2011
#3 ko_enqueue() by pid:4390[0x804974f <./ko_test>] on Thu Feb 10 22:06:39 2011
#0 ko_dequeue() by pid:4391[0x8049d7a <./ko_test>] on Thu Feb 10 22:06:39 2011

$ addr2line -e ./ko_test 0x8049398
/home/jbenoist/src/libko/ko_test.c:116

SEE ALSO

pthreads(7), futex(7), mq_overview(7)