Kø (Danish: "queue", the data structure): a queue is a data structure in which the individual data elements are removed in the same order in which they were inserted.
What is libko?
Libko is a user-space, message-based, persistent queue library that can be used to exchange messages between unrelated processes in an event-driven manner.
Download:
Jan 31 2011: libko-1.0.0.tar.bz2, md5: a79068539e7baa1eb4e4ba3abd9d6e6a.
Features
- zero dependencies.
- persistent storage model.
- messages may be up to 4GB in size.
- a queue can contain up to 2^32 messages.
- messages can be dequeued in a zero-copy manner.
- circular-buffer behaviour can be triggered (see: KO_CIRCULAR).
- sequence ids may be used to synchronize queued messages, thus avoiding duplicates.
- dequeue behaviour can be blocking, non-blocking, or deadline-based (see: KO_NONBLOCK).
Asymptotic properties
| operation | running time | i/o |
| enqueue | O(1) | O(n) |
| dequeue | O(1) | O(1)* |
* with KO_DIRECT and no contention.
Design
Libko relies on fast userspace mutexes (futex) for inter-process communication.
The kernel is only entered for contended operations.
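The fast-path idea can be illustrated with a small lock built directly on the Linux futex system call. This is a minimal sketch and not libko's actual code: the names sketch_lock_t, sketch_lock() and sketch_unlock() are hypothetical, and this page does not say whether libko calls futex(2) itself or goes through process-shared pthread objects. An uncontended lock or unlock is a single atomic instruction; only the contended paths make a system call.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <linux/futex.h>
#include <stdatomic.h>
#include <stddef.h>
#include <sys/syscall.h>
#include <unistd.h>

/* Hypothetical lock word: 0 = free, 1 = locked, 2 = locked with waiters. */
typedef struct {
    atomic_int state;
} sketch_lock_t;

/* Thin wrapper around the raw futex system call (Linux only). */
static long sketch_futex(atomic_int *addr, int op, int val)
{
    return syscall(SYS_futex, addr, op, val, NULL, NULL, 0);
}

/* Compare-and-swap that returns the value seen before the operation. */
static int sketch_cmpxchg(atomic_int *a, int expected, int desired)
{
    atomic_compare_exchange_strong(a, &expected, desired);
    return expected;
}

void sketch_lock(sketch_lock_t *l)
{
    assert(l != NULL);
    int c = sketch_cmpxchg(&l->state, 0, 1);
    if (c == 0)
        return;                          /* fast path: no system call */
    if (c != 2)
        c = atomic_exchange(&l->state, 2);
    while (c != 0) {                     /* slow path: sleep in the kernel */
        sketch_futex(&l->state, FUTEX_WAIT, 2);
        c = atomic_exchange(&l->state, 2);
    }
}

void sketch_unlock(sketch_lock_t *l)
{
    assert(l != NULL);
    if (atomic_fetch_sub(&l->state, 1) != 1) {
        /* There were waiters: release the lock and wake one of them. */
        atomic_store(&l->state, 0);
        sketch_futex(&l->state, FUTEX_WAKE, 1);
    }
}
```

For the lock to work between unrelated processes, the sketch_lock_t would have to live in memory shared by all of them, for example in a mapped queue file.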
For the record, here's an apples-to-oranges comparison between libko and POSIX message queues:
| | POSIX mq | libko |
| message max: | 32'768 | 2^32 |
| message size max: | 1 MB | 4 GB |
| queues max: | 256 (non-root) | unlimited |
| dequeue i/o: | O(n) | O(1)* |
* with KO_DIRECT.
Documentation
NAME
ko — inter-process message queue in user-space.
SYNOPSIS
#include <ko.h>
int ko_create(const char *pathname, size_t capacity);
ko_t ko_open(const char *pathname, int flags);
koid_t ko_peek(ko_t queue, komsg_t *kmsg, const struct timespec *abstime);
koid_t ko_enqueue(ko_t queue, komsg_t *kmsg, ko_seqid_t id);
koid_t ko_dequeue(ko_t queue, komsg_t *kmsg, const struct timespec *abstime);
int ko_close(ko_t queue);
int ko_make_empty(ko_t queue);
int ko_destroy(const char *path);
DESCRIPTION
The ko_create() function shall create a new
named queue pathname on the file system, with a size
limit of capacity bytes. The number of elements stored
in the queue is not statically defined, and individual messages can be of
variable size, limited only by the queue capacity and the 4 GB message
maximum. However, ko needs 8 additional bytes per
message for housekeeping purposes.
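The 8-byte overhead matters when sizing a queue for many small messages. The following is a rough sketch of the arithmetic, assuming that the overhead is the only cost per message and that no fixed header is taken out of capacity (the page does not say either way). The names SKETCH_MSG_OVERHEAD and sketch_max_messages() are hypothetical and not part of libko.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Per-message housekeeping overhead stated in the description. */
#define SKETCH_MSG_OVERHEAD 8u

/* Upper bound on how many msg_len-byte messages fit in capacity bytes. */
size_t sketch_max_messages(size_t capacity, uint32_t msg_len)
{
    size_t per_msg = (size_t)msg_len + SKETCH_MSG_OVERHEAD;

    assert(per_msg > msg_len); /* guards against size_t overflow on 32-bit builds */
    return capacity / per_msg;
}
```

Under these assumptions, a 3600-byte queue holds at most 33 messages of 100 bytes each, since every message occupies 108 bytes.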
The ko_open() function opens the named queue
pathname with mode flags:
KO_DIRECT
[zero-copy dequeue operations.]
KO_NONBLOCK
[non-blocking dequeue operations.]
KO_CIRCULAR
[oldest messages get drained when the queue is full.]
KO_INSTRUMENT
[trace information is written in a circular buffer for each
operation.]
KO_INTEGRITY
[CRC32 is used for message integrity.]
ko_enqueue() enqueues message
kmsg in queue queue. If a sequence
number id is specified, it is checked against the ids of the
messages already queued; if a duplicate
exists, id is returned and nothing is actually enqueued.
If KO_CIRCULAR is set and there is not enough
free space to store the submitted message, the oldest message(s) will be
drained until space requirements are met.
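The two enqueue rules (duplicate suppression by sequence id, and draining in circular mode) can be modelled with a toy in-memory queue. This is a sketch under simplifying assumptions and not libko's implementation: it tracks only ids and lengths, uses a hypothetical fixed slot table, and requires a non-zero id. All toy_ names are invented for illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define TOY_SLOTS    8   /* hypothetical fixed slot count, for brevity */
#define TOY_OVERHEAD 8u  /* per-message housekeeping bytes, as stated for ko */

typedef struct {
    uint32_t ids[TOY_SLOTS];   /* sequence id of each queued message */
    uint32_t lens[TOY_SLOTS];  /* payload length of each queued message */
    size_t   head;             /* slot of the oldest message */
    size_t   count;            /* number of queued messages */
    size_t   used;             /* bytes in use, overhead included */
    size_t   capacity;         /* size limit in bytes */
    int      circular;         /* non-zero mimics KO_CIRCULAR */
} toy_queue_t;

/* Drop the oldest message and give its bytes back. */
static void toy_drain_oldest(toy_queue_t *q)
{
    q->used -= (size_t)q->lens[q->head] + TOY_OVERHEAD;
    q->head = (q->head + 1) % TOY_SLOTS;
    q->count--;
}

/* Returns id if the message was queued or was a duplicate, 0 on failure. */
uint32_t toy_enqueue(toy_queue_t *q, uint32_t len, uint32_t id)
{
    size_t need = (size_t)len + TOY_OVERHEAD;

    assert(q != NULL);
    if (id == 0 || need > q->capacity)
        return 0;

    /* Duplicate check: an id that is already queued is not queued twice. */
    for (size_t i = 0; i < q->count; i++)
        if (q->ids[(q->head + i) % TOY_SLOTS] == id)
            return id;

    /* Make room, draining the oldest messages only in circular mode. */
    while (q->count > 0 &&
           (q->used + need > q->capacity || q->count == TOY_SLOTS)) {
        if (!q->circular)
            return 0;
        toy_drain_oldest(q);
    }

    size_t tail = (q->head + q->count) % TOY_SLOTS;
    q->ids[tail] = id;
    q->lens[tail] = len;
    q->used += need;
    q->count++;
    return id;
}
```

With a 40-byte capacity and 12-byte messages, two messages fill the toy queue; a third one pushes out the oldest in circular mode and is rejected otherwise.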
The komsg_t structure is defined as
follows:
typedef struct {
void *msg_ptr; /* pointer to message data */
uint32_t msg_len; /* message data length */
} komsg_t;
ko_dequeue() dequeues one message from queue
queue. If KO_DIRECT is not set, the
message is copied into application-provided buffers or, if the
kmsg structure members are zeroed (NULL pointer, zero length), into
library-allocated heap memory. If KO_DIRECT is set, kmsg will
be filled with the message size and a pointer to the message in shared memory,
unless the kmsg member msg_len has all of its bits set
(0xffffffff), in which case the dequeue operation will instead process
the message with the callback function specified at the
msg_ptr address (see examples below). If
KO_NONBLOCK is not set, the operation will block until
data is available or the absolute time described by abstime
has passed.
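Because abstime is an absolute time rather than a timeout, a caller that wants to wait "at most N milliseconds" has to add N to the current time and normalise the result. A minimal sketch of that computation follows; sketch_deadline_after() is a hypothetical helper and not part of libko.

```c
#include <assert.h>
#include <time.h>

/* Return the absolute time that lies ms milliseconds after now. */
struct timespec sketch_deadline_after(struct timespec now, long ms)
{
    struct timespec abstime = now;

    assert(ms >= 0 && now.tv_nsec >= 0 && now.tv_nsec < 1000000000L);
    abstime.tv_sec += ms / 1000;
    abstime.tv_nsec += (ms % 1000) * 1000000L;
    if (abstime.tv_nsec >= 1000000000L) {   /* carry nanoseconds into seconds */
        abstime.tv_sec += 1;
        abstime.tv_nsec -= 1000000000L;
    }
    return abstime;
}
```

The value of now would typically come from clock_gettime(). Which clock libko compares abstime against is not stated on this page; CLOCK_REALTIME, the default for pthread_cond_timedwait(), is a plausible assumption that should be checked against the library source.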
ko_peek() peeks at one message from queue
queue without removing it. It is mostly used
internally, but can be used externally as a zero-copy operation if
the queue is opened in KO_DIRECT mode. On success,
kmsg is filled with the message size and a pointer to the message
in shared memory, and the queue stays locked until the process terminates
or explicitly calls ko_dequeue(). If
KO_NONBLOCK is not set, the operation will block until
data is available or the absolute time described by abstime
has passed.
ko_make_empty() flushes out all the elements of
queue queue.
ko_close() closes named queue
queue.
ko_destroy() permanently destroys the queue lying
at path without checking whether it is still
referenced.
RETURN VALUES
If successful, ko_open() returns a pointer to
the queue just opened, or NULL with errno set on error.
If successful, ko_create(),
ko_close(), ko_make_empty() and
ko_destroy() return zero, or -1 with
errno set on error.
If successful, ko_peek(),
ko_dequeue() and ko_enqueue()
return a non-zero sequence id, or 0 with errno set
on error.
ERRORS
EIO
[message integrity checksum failed.]
EINVAL
[invalid argument.]
EEXIST
[queue file already exists.]
ENOBUFS
[application-supplied buffer does not have enough space to hold the
message.]
EPROTO
[supplied queue file magic number does not match the expected
value.]
ENOMEM
[message queue is full or message size bigger than queue
capacity.]
EAGAIN
[dequeue would block.]
NOTES
If KO_INTEGRITY is specified,
ko_enqueue() and ko_dequeue()
respectively compute and verify a CRC32 signature, and
ko_open() performs an integrity check of all
queued messages.
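For readers unfamiliar with the checksum, here is a compact bitwise CRC32. It is a sketch that assumes the common IEEE 802.3 variant (reflected polynomial 0xEDB88320, as used by zlib and Ethernet); this page does not say which CRC32 variant libko uses, and sketch_crc32() is a hypothetical name.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Bitwise CRC32, IEEE 802.3 variant (assumed, not confirmed for libko). */
uint32_t sketch_crc32(const void *data, size_t len)
{
    const unsigned char *p = data;
    uint32_t crc = 0xFFFFFFFFu;

    assert(data != NULL || len == 0);
    for (size_t i = 0; i < len; i++) {
        crc ^= p[i];
        for (int bit = 0; bit < 8; bit++) {
            /* Shift out one bit; apply the polynomial if that bit was set. */
            crc = (crc >> 1) ^ (0xEDB88320u & (0u - (crc & 1u)));
        }
    }
    return ~crc;
}
```

A producer would store the checksum next to the message when enqueuing, and a consumer would recompute it when dequeuing and treat a mismatch as the EIO case listed under ERRORS.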
Queue files are versioned; data types are integral and are stored in network byte order. A queue file should therefore be architecture and platform independent. However, due to difficulties with the pthread API, inter-operation between 32-bit and 64-bit processes at runtime is not possible.
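Network byte order means that the most significant byte is stored first, regardless of the host CPU. As an illustration only (the actual on-disk layout of queue files is not described here), a 32-bit field could be written and read as follows; sketch_put_be32() and sketch_get_be32() are hypothetical helpers.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Store v into buf[0..3], most significant byte first. */
void sketch_put_be32(unsigned char *buf, uint32_t v)
{
    assert(buf != NULL);
    buf[0] = (unsigned char)(v >> 24);
    buf[1] = (unsigned char)(v >> 16);
    buf[2] = (unsigned char)(v >> 8);
    buf[3] = (unsigned char)v;
}

/* Read a 32-bit value stored most significant byte first. */
uint32_t sketch_get_be32(const unsigned char *buf)
{
    assert(buf != NULL);
    return ((uint32_t)buf[0] << 24) | ((uint32_t)buf[1] << 16) |
           ((uint32_t)buf[2] << 8)  |  (uint32_t)buf[3];
}
```

Writing bytes explicitly gives the same file contents on little-endian and big-endian hosts, which is what makes such a file portable between architectures.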
The library is shipped with the ko-stat utility
which can be used to query the meta-data or instrumentation traces of
queue files. Instrumentation needs
EXAMPLES
dequeue into library-allocated buffers:
komsg_t kmsg = { NULL, 0 };
ko_t q = ko_open("/tmp/.iqueue", 0);
for (;;) {
    /* a zeroed kmsg asks the library to allocate the buffer */
    if (!ko_dequeue(q, &kmsg, NULL))
        break; /* 0 means failure, errno is set */
    /* ... */
    /* release memory */
    free(kmsg.msg_ptr);
    /* clean up kmsg for next dequeue */
    memset(&kmsg, 0, sizeof(komsg_t));
}
dequeue into application-provided buffers:
komsg_t kmsg;
ko_t q = ko_open("/tmp/.iqueue", 0);
kmsg.msg_len = 128;
kmsg.msg_ptr = malloc(kmsg.msg_len);
for (;;) {
    if (!ko_dequeue(q, &kmsg, NULL) && errno == ENOBUFS) {
        /* supplied buffer too small: double it and retry */
        kmsg.msg_ptr = realloc(kmsg.msg_ptr, (kmsg.msg_len *= 2));
        continue;
    }
    /* ... */
}
insecure zero-copy dequeues (should be avoided unless exclusive queue access is guaranteed by the application):
komsg_t kmsg;
ko_t q = ko_open("/tmp/.iqueue", KO_DIRECT);
ko_dequeue(q, &kmsg, NULL);
/* queue is not locked
.... */
secure zero-copy dequeues:
komsg_t kmsg;
ko_t q = ko_open("/tmp/.iqueue", KO_DIRECT);
ko_peek(q, &kmsg, NULL);
/* queue is locked
... */
/* trim message & release queue */
ko_dequeue(q, &kmsg, NULL);
zero-copy callback-based dequeues:
/* handler */
static void process_msg(const char *data, int size)
{
    /* ... */
}
int main(int argc, char **argv)
{
    /* msg_len with all bits set selects callback mode; msg_ptr holds the handler */
    komsg_t kmsg = { (void *)process_msg, 0xffffffff };
    ko_t q = ko_open("/tmp/.iqueue", KO_DIRECT);
    ko_dequeue(q, &kmsg, NULL);
    return 0;
}
using instrumentation traces on queues opened with KO_INSTRUMENT:
$ (cd ~/src/libko/ && make clean all test)
[..]
$ ko-inspect /tmp/test.queue
file: /tmp/test.queue (rev: 8)
arch: 32-bit (little-endian)
storage: 0/3600 bytes, 0 elements [head:0 tail 0]
pthread objects: ok
crc32: ok
traceinfo:
#1 ko_peek() by pid:4389[0x804943c <./ko_test>] on Thu Feb 10 22:06:39 2011
#2 ko_dequeue() by pid:4389[0x8049398 <./ko_test>] on Thu Feb 10 22:06:39 2011
#3 ko_enqueue() by pid:4390[0x804974f <./ko_test>] on Thu Feb 10 22:06:39 2011
#0 ko_dequeue() by pid:4391[0x8049d7a <./ko_test>] on Thu Feb 10 22:06:39 2011
$ addr2line -e ./ko_test 0x8049398
/home/jbenoist/src/libko/ko_test.c:116