yodqueue/src/yodqueue.c
2022-12-01 00:38:00 +01:00

174 lines
3.6 KiB
C

#include "yodqueue.h"
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
yodqueue_t yodqueue_init(void)
{
yodqueue_t ret;
ret.fd_data = -1;
ret.fd_meta = -1;
return ret;
}
#define YODQUEUE_FILE_MODE 0664
int yodqueue_open(yodqueue_t *yq, const char *fname, int oflag)
{
if(!yq) return -1;
size_t fnamelen = strlen(fname);
// allocate memory for "{fname}.data" / "{fname}.meta"
char * fname2 = malloc(fnamelen + 6);
if(!fname2) return -1;
memcpy(fname2, fname, fnamelen);
memcpy(fname2 + fnamelen, ".data", 6);
if((oflag & O_RDWR) || (oflag & O_WRONLY))
oflag |= O_APPEND;
yq->fd_data = open(fname2, oflag, YODQUEUE_FILE_MODE);
if(yq->fd_data == -1) goto error;
memcpy(fname2 + fnamelen, ".meta", 5);
yq->fd_meta = open(fname2, oflag, YODQUEUE_FILE_MODE);
if(yq->fd_data == -1) goto error;
return 0;
error:
free(fname2);
return -1;
}
int yodqueue_openat(yodqueue_t *yq, int pfd, const char *fname, int oflag)
{
if(!yq) return -1;
size_t fnamelen = strlen(fname);
// allocate memory for "{fname}.data" / "{fname}.meta"
char * fname2 = malloc(fnamelen + 6);
if(!fname2) return -1;
memcpy(fname2, fname, fnamelen);
memcpy(fname2 + fnamelen, ".data", 6);
yq->fd_data = openat(pfd, fname2, oflag, YODQUEUE_FILE_MODE);
if(yq->fd_data == -1) goto error;
memcpy(fname2 + fnamelen, ".meta", 5);
yq->fd_meta = openat(pfd, fname2, oflag, YODQUEUE_FILE_MODE);
if(yq->fd_data == -1) goto error;
return 0;
error:
free(fname2);
return -1;
}
int yodqueue_fini(yodqueue_t *yq) {
if(!yq) return 0;
int ret = 0;
if(yq->fd_data != -1)
ret |= close(yq->fd_data);
if(yq->fd_meta != -1)
ret |= close(yq->fd_meta);
yq->fd_data = -1;
yq->fd_meta = -1;
free(yq);
return ret;
}
static int yodqueue_flock(int fd, int op)
{
int ret = -1;
do {
ret = flock(fd, op);
} while(ret == -1 && errno == EINTR);
return ret;
}
int yodqueue_append(yodqueue_t *yq, const void *data, size_t datalen)
{
if(-1 == yodqueue_flock(yq->fd_data, LOCK_EX))
return -1;
if(-1 == yodqueue_flock(yq->fd_meta, LOCK_EX))
goto errunl_data;
ssize_t wtmp = write(yq->fd_data, data, datalen);
off_t otmp;
uint32_t offser;
if(wtmp == -1) goto errunl_both;
otmp = lseek(yq->fd_data, 0, SEEK_END);
if(otmp > 0xffffffff) goto errunl_both;
offser = htonl(otmp);
wtmp = write(yq->fd_meta, (const void*)&offser, 4);
if(wtmp == -1) goto errunl_both;
yodqueue_flock(yq->fd_meta, LOCK_UN);
yodqueue_flock(yq->fd_data, LOCK_UN);
return 0;
errunl_both:
const int tmp_errno = errno;
yodqueue_flock(yq->fd_meta, LOCK_UN);
errno = tmp_errno;
errunl_data:
const int tmp_errno2 = errno;
yodqueue_flock(yq->fd_data, LOCK_UN);
errno = tmp_errno2;
return -1;
}
uint32_t yodqueue_translate_pos(yodqueue_t *yq, uint32_t id)
{
if(-1 == yodqueue_flock(yq->fd_meta, LOCK_SH))
return 0;
if(((off_t) -1) == lseek(yq->fd_meta, ((off_t) id) * 4, SEEK_SET))
goto errunl_meta;
uint32_t offser;
size_t xoff = 0;
while(xoff < 4)
{
ssize_t tmp = read(yq->fd_meta, ((void*)&offser) + xoff, 4 - xoff);
if(tmp <= 0) goto errunl_meta;
xoff += (size_t) tmp;
}
offser = ntohl(offser);
yodqueue_flock(yq->fd_meta, LOCK_UN);
return offser;
errunl_meta:
const int tmp_errno = errno;
yodqueue_flock(yq->fd_meta, LOCK_UN);
errno = tmp_errno;
return 0;
}
int yodqueue_rlock(yodqueue_t *yq) {
return yodqueue_flock(yq->fd_data, LOCK_SH);
}
int yodqueue_unlock(yodqueue_t *yq) {
return yodqueue_flock(yq->fd_data, LOCK_UN);
}