WIP: journal + transactions

I'm not even sure this code works, but I thought it would be lame to lose even *that* amount of code.
add-journal
Yury Kurlykov 2020-04-22 01:16:06 +10:00
parent c3401c2317
commit 64be6a4466
Signed by: t1meshift
GPG Key ID: B133F3167ABF94D8
15 changed files with 1115 additions and 11 deletions

View File

@ -12,4 +12,7 @@ add_library(YeltsinDB STATIC
inc/YeltsinDB/constants.h
inc/YeltsinDB/types.h
inc/YeltsinDB/macro.h
)
inc/YeltsinDB/journal.h src/journal.c
inc/YeltsinDB/journal/transaction.h src/journal/transaction.c
inc/YeltsinDB/journal/transaction_op.h
inc/YDB_ext/vec.h inc/YDB_ext/vec.c)

20
inc/YDB_ext/LICENSE_vec Normal file
View File

@ -0,0 +1,20 @@
Copyright (c) 2014 rxi
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

120
inc/YDB_ext/vec.c Normal file
View File

@ -0,0 +1,120 @@
#ifdef __cplusplus
extern "C" {
#endif
/**
* Copyright (c) 2014 rxi
*
* This library is free software; you can redistribute it and/or modify it
* under the terms of the MIT license. See LICENSE for details.
*/
#include "vec.h"
int vec_expand_(char **data, int *length, int *capacity, int memsz) {
if (*length + 1 > *capacity) {
void *ptr;
int n = (*capacity == 0) ? 1 : *capacity << 1;
ptr = realloc(*data, n * memsz);
if (ptr == NULL) return -1;
*data = ptr;
*capacity = n;
}
return 0;
}
int vec_reserve_(char **data, int *length, int *capacity, int memsz, int n) {
(void) length;
if (n > *capacity) {
void *ptr = realloc(*data, n * memsz);
if (ptr == NULL) return -1;
*data = ptr;
*capacity = n;
}
return 0;
}
int vec_reserve_po2_(
char **data, int *length, int *capacity, int memsz, int n
) {
int n2 = 1;
if (n == 0) return 0;
while (n2 < n) n2 <<= 1;
return vec_reserve_(data, length, capacity, memsz, n2);
}
int vec_compact_(char **data, int *length, int *capacity, int memsz) {
if (*length == 0) {
free(*data);
*data = NULL;
*capacity = 0;
return 0;
} else {
void *ptr;
int n = *length;
ptr = realloc(*data, n * memsz);
if (ptr == NULL) return -1;
*capacity = n;
*data = ptr;
}
return 0;
}
int vec_insert_(char **data, int *length, int *capacity, int memsz,
int idx
) {
int err = vec_expand_(data, length, capacity, memsz);
if (err) return err;
memmove(*data + (idx + 1) * memsz,
*data + idx * memsz,
(*length - idx) * memsz);
return 0;
}
void vec_splice_(char **data, int *length, int *capacity, int memsz,
int start, int count
) {
(void) capacity;
memmove(*data + start * memsz,
*data + (start + count) * memsz,
(*length - start - count) * memsz);
}
void vec_swapsplice_(char **data, int *length, int *capacity, int memsz,
int start, int count
) {
(void) capacity;
memmove(*data + start * memsz,
*data + (*length - count) * memsz,
count * memsz);
}
void vec_swap_(char **data, int *length, int *capacity, int memsz,
int idx1, int idx2
) {
unsigned char *a, *b, tmp;
int count;
(void) length;
(void) capacity;
if (idx1 == idx2) return;
a = (unsigned char*) *data + idx1 * memsz;
b = (unsigned char*) *data + idx2 * memsz;
count = memsz;
while (count--) {
tmp = *a;
*a = *b;
*b = tmp;
a++, b++;
}
}
#ifdef __cplusplus
}
#endif

189
inc/YDB_ext/vec.h Normal file
View File

@ -0,0 +1,189 @@
#ifdef __cplusplus
extern "C" {
#endif
/**
* Copyright (c) 2014 rxi
*
* This library is free software; you can redistribute it and/or modify it
* under the terms of the MIT license. See LICENSE for details.
*/
#ifndef VEC_H
#define VEC_H
#include <stdlib.h>
#include <string.h>
#define VEC_VERSION "0.2.1"
#define vec_unpack_(v)\
(char**)&(v)->data, &(v)->length, &(v)->capacity, sizeof(*(v)->data)
#define vec_t(T)\
struct { T *data; int length, capacity; }
#define vec_init(v)\
memset((v), 0, sizeof(*(v)))
#define vec_deinit(v)\
( free((v)->data),\
vec_init(v) )
#define vec_push(v, val)\
( vec_expand_(vec_unpack_(v)) ? -1 :\
((v)->data[(v)->length++] = (val), 0), 0 )
#define vec_pop(v)\
(v)->data[--(v)->length]
#define vec_splice(v, start, count)\
( vec_splice_(vec_unpack_(v), start, count),\
(v)->length -= (count) )
#define vec_swapsplice(v, start, count)\
( vec_swapsplice_(vec_unpack_(v), start, count),\
(v)->length -= (count) )
#define vec_insert(v, idx, val)\
( vec_insert_(vec_unpack_(v), idx) ? -1 :\
((v)->data[idx] = (val), 0), (v)->length++, 0 )
#define vec_sort(v, fn)\
qsort((v)->data, (v)->length, sizeof(*(v)->data), fn)
#define vec_swap(v, idx1, idx2)\
vec_swap_(vec_unpack_(v), idx1, idx2)
#define vec_truncate(v, len)\
((v)->length = (len) < (v)->length ? (len) : (v)->length)
#define vec_clear(v)\
((v)->length = 0)
#define vec_first(v)\
(v)->data[0]
#define vec_last(v)\
(v)->data[(v)->length - 1]
#define vec_reserve(v, n)\
vec_reserve_(vec_unpack_(v), n)
#define vec_compact(v)\
vec_compact_(vec_unpack_(v))
#define vec_pusharr(v, arr, count)\
do {\
int i__, n__ = (count);\
if (vec_reserve_po2_(vec_unpack_(v), (v)->length + n__) != 0) break;\
for (i__ = 0; i__ < n__; i__++) {\
(v)->data[(v)->length++] = (arr)[i__];\
}\
} while (0)
#define vec_extend(v, v2)\
vec_pusharr((v), (v2)->data, (v2)->length)
#define vec_find(v, val, idx)\
do {\
for ((idx) = 0; (idx) < (v)->length; (idx)++) {\
if ((v)->data[(idx)] == (val)) break;\
}\
if ((idx) == (v)->length) (idx) = -1;\
} while (0)
#define vec_remove(v, val)\
do {\
int idx__;\
vec_find(v, val, idx__);\
if (idx__ != -1) vec_splice(v, idx__, 1);\
} while (0)
#define vec_reverse(v)\
do {\
int i__ = (v)->length / 2;\
while (i__--) {\
vec_swap((v), i__, (v)->length - (i__ + 1));\
}\
} while (0)
#define vec_foreach(v, var, iter)\
if ( (v)->length > 0 )\
for ( (iter) = 0;\
(iter) < (v)->length && (((var) = (v)->data[(iter)]), 1);\
++(iter))
#define vec_foreach_rev(v, var, iter)\
if ( (v)->length > 0 )\
for ( (iter) = (v)->length - 1;\
(iter) >= 0 && (((var) = (v)->data[(iter)]), 1);\
--(iter))
#define vec_foreach_ptr(v, var, iter)\
if ( (v)->length > 0 )\
for ( (iter) = 0;\
(iter) < (v)->length && (((var) = &(v)->data[(iter)]), 1);\
++(iter))
#define vec_foreach_ptr_rev(v, var, iter)\
if ( (v)->length > 0 )\
for ( (iter) = (v)->length - 1;\
(iter) >= 0 && (((var) = &(v)->data[(iter)]), 1);\
--(iter))
int vec_expand_(char **data, int *length, int *capacity, int memsz);
int vec_reserve_(char **data, int *length, int *capacity, int memsz, int n);
int vec_reserve_po2_(char **data, int *length, int *capacity, int memsz,
int n);
int vec_compact_(char **data, int *length, int *capacity, int memsz);
int vec_insert_(char **data, int *length, int *capacity, int memsz,
int idx);
void vec_splice_(char **data, int *length, int *capacity, int memsz,
int start, int count);
void vec_swapsplice_(char **data, int *length, int *capacity, int memsz,
int start, int count);
void vec_swap_(char **data, int *length, int *capacity, int memsz,
int idx1, int idx2);
typedef vec_t(void*) vec_void_t;
typedef vec_t(char*) vec_str_t;
typedef vec_t(int) vec_int_t;
typedef vec_t(char) vec_char_t;
typedef vec_t(float) vec_float_t;
typedef vec_t(double) vec_double_t;
#endif
#ifdef __cplusplus
}
#endif

View File

@ -44,4 +44,46 @@ enum YDB_v1_page_offsets {
YDB_v1_page_prev_offset = YDB_v1_page_next_offset + YDB_v1_page_next_size,
YDB_v1_page_row_count_offset = YDB_v1_page_prev_offset + YDB_v1_page_prev_size,
YDB_v1_page_data_offset = YDB_v1_page_row_count_offset + YDB_v1_page_row_count_size,
};
#define YDB_JRNL_SIGN "JRNL"
#define YDB_JRNL_SIGN_SIZE (sizeof(YDB_JRNL_SIGN)-1)
enum YDB_jrnl_sizes {
YDB_jrnl_first_transaction_size = 8,
YDB_jrnl_last_transaction_size = 8,
YDB_jrnl_transaction_prev_size = 8,
YDB_jrnl_transaction_next_size = 8,
YDB_jrnl_transaction_timestamp_size = 8,
YDB_jrnl_transaction_flags_size = 1,
YDB_jrnl_transaction_op_type_size = 1,
YDB_jrnl_transaction_op_datasz_size = 4,
};
enum YDB_jrnl_offsets {
YDB_jrnl_first_transaction_offset = YDB_JRNL_SIGN_SIZE,
YDB_jrnl_last_transaction_offset = YDB_jrnl_first_transaction_offset + YDB_jrnl_first_transaction_size,
YDB_jrnl_data_offset = YDB_jrnl_last_transaction_offset + YDB_jrnl_last_transaction_size,
};
enum YDB_jrnl_transaction_offsets {
YDB_jrnl_transaction_prev_offset = 0,
YDB_jrnl_transaction_next_offset = YDB_jrnl_transaction_prev_offset + YDB_jrnl_transaction_prev_size,
YDB_jrnl_transaction_timestamp_offset = YDB_jrnl_transaction_next_offset + YDB_jrnl_transaction_next_size,
YDB_jrnl_transaction_flags_offset = YDB_jrnl_transaction_timestamp_offset + YDB_jrnl_transaction_timestamp_size,
YDB_jrnl_transaction_data_offset = YDB_jrnl_transaction_flags_offset + YDB_jrnl_transaction_flags_size,
};
enum YDB_jrnl_transaction_op_offsets {
YDB_jrnl_transaction_op_type_offset = 0,
YDB_jrnl_transaction_op_datasz_offset = YDB_jrnl_transaction_op_type_offset + YDB_jrnl_transaction_op_type_size,
YDB_jrnl_transaction_op_data_offset = YDB_jrnl_transaction_op_datasz_offset + YDB_jrnl_transaction_op_datasz_size,
};
enum YDB_jrnl_transaction_ops {
YDB_jrnl_op_page_alloc = 0x01,
YDB_jrnl_op_page_modify = 0x02,
YDB_jrnl_op_page_remove = 0x03,
YDB_jrnl_op_rollback = 0xFE,
YDB_jrnl_op_complete = 0xFF,
};

View File

@ -70,6 +70,54 @@
* @brief The addresses of pages are the same.
*/
#define YDB_ERR_SAME_PAGE_ADDRESS (-14)
/**
* @brief The transaction is not initialized.
*/
#define YDB_ERR_TRANSACTION_NOT_INITIALIZED (-15)
/**
* @brief Transaction operation push failed.
*/
#define YDB_ERR_TRANSACTION_OP_PUSH_FAILED (-16)
/**
* @brief Index of transaction operation is out of range.
*/
#define YDB_ERR_TRANSACTION_OP_OUT_OF_RANGE (-17)
/**
* @brief The journal is not initialized.
*/
#define YDB_ERR_JOURNAL_NOT_INITIALIZED (-18)
/**
* @brief The journal is in use.
*/
#define YDB_ERR_JOURNAL_IN_USE (-19)
/**
* @brief The journal is not in use.
*/
#define YDB_ERR_JOURNAL_NOT_IN_USE (-20)
/**
* @brief The journal file does not exist.
*/
#define YDB_ERR_JOURNAL_NOT_EXIST (-21)
/**
* @brief The journal file does already exist.
*/
#define YDB_ERR_JOURNAL_EXIST (-22)
/**
* @brief The journal file is corrupted.
*/
#define YDB_ERR_JOURNAL_FILE_CORRUPTED (-23)
/**
* @brief The journal is not consistent.
*/
#define YDB_ERR_JOURNAL_NOT_CONSISTENT (-24)
/**
* @brief No more transactions left in the journal.
*/
#define YDB_ERR_NO_MORE_TRANSACTIONS (-25)
/**
* @brief The journal is empty.
*/
#define YDB_ERR_JOURNAL_EMPTY (-26)
/**
* @brief An unknown error has occurred.
*/

44
inc/YeltsinDB/journal.h Normal file
View File

@ -0,0 +1,44 @@
#pragma once
#ifdef __cplusplus
extern "C" {
#endif
#include <YeltsinDB/types.h>
#include <YeltsinDB/journal/transaction.h>
/**
* @file journal.h
* @todo Doxygen.
*/
struct __YDB_Journal;
typedef struct __YDB_Journal YDB_Journal;
YDB_Journal* ydb_journal_init();
void ydb_journal_destroy(YDB_Journal* jrnl);
YDB_Error ydb_journal_file_create(YDB_Journal* jrnl, char* path);
YDB_Error ydb_journal_file_open(YDB_Journal* jrnl, char* path);
YDB_Error ydb_journal_file_close(YDB_Journal* jrnl);
YDB_Error ydb_journal_file_check_consistency(YDB_Journal* jrnl);
YDB_Error ydb_journal_seek_to_begin(YDB_Journal* jrnl);
YDB_Error ydb_journal_seek_to_end(YDB_Journal* jrnl);
YDB_Error ydb_journal_prev_transaction(YDB_Journal* jrnl);
YDB_Error ydb_journal_next_transaction(YDB_Journal* jrnl);
YDB_Error ydb_journal_append_transaction(YDB_Journal* jrnl, YDB_Transaction* transaction);
YDB_Transaction* ydb_journal_get_current_transaction(YDB_Journal* jrnl);
#ifdef __cplusplus
}
#endif

View File

@ -0,0 +1,49 @@
#pragma once
#ifdef __cplusplus
extern "C" {
#endif
/**
* @file journal/transaction.h
* @todo Doxygen.
*/
#include <YeltsinDB/journal/transaction_op.h>
struct __YDB_Transaction;
typedef struct __YDB_Transaction YDB_Transaction;
/**
* @brief Create a new transaction.
* @return A transaction instance.
*/
YDB_Transaction* ydb_transaction_create();
/**
* @brief Destroy a transaction object.
* @param t Transaction object.
*/
void ydb_transaction_destroy(YDB_Transaction* t);
YDB_Timestamp ydb_transaction_timestamp_get(YDB_Transaction* t);
YDB_Error ydb_transaction_timestamp_set(YDB_Transaction* t, YDB_Timestamp timestamp);
YDB_Flags ydb_transaction_flags_get(YDB_Transaction* t);
YDB_Error ydb_transaction_flags_set(YDB_Transaction* t, YDB_Flags flags);
uint32_t ydb_transaction_ops_size_get(YDB_Transaction* t);
YDB_Error ydb_transaction_push_op(YDB_Transaction* t, YDB_TransactionOp* op);
YDB_Error ydb_transaction_pop_op(YDB_Transaction* t);
YDB_TransactionOp* ydb_transaction_op_at(YDB_Transaction* t, uint32_t index);
YDB_Transaction* ydb_transaction_clone(YDB_Transaction* t);
#ifdef __cplusplus
}
#endif

View File

@ -0,0 +1,38 @@
#pragma once
#ifdef __cplusplus
extern "C" {
#endif
#include <YeltsinDB/macro.h>
/**
* @file journal/transaction_op.h
* @todo Doxygen.
*/
struct __YDB_TransactionOp {
YDB_OpCode opcode;
YDB_OpDataSize size;
void* data;
};
typedef struct __YDB_TransactionOp YDB_TransactionOp;
void ydb_transaction_op_destroy(YDB_TransactionOp* op) {
free(op->data);
free(op);
}
YDB_TransactionOp* ydb_transaction_op_clone(YDB_TransactionOp* op) {
THROW_IF_NULL(op, NULL);
YDB_TransactionOp* result = malloc(sizeof(YDB_TransactionOp));
result->opcode = op->opcode;
result->size = op->size;
result->data = malloc(op->size);
memcpy(result->data, op->data, op->size);
return result;
}
#ifdef __cplusplus
}
#endif

View File

@ -11,6 +11,13 @@
typedef int16_t YDB_Error;
/** @brief YeltsinDB data file offset type. */
typedef uint64_t YDB_Offset;
/** */
/** @brief YeltsinDB page size type. */
typedef uint16_t YDB_PageSize;
/** @brief YeltsinDB flags container type. */
typedef uint8_t YDB_Flags;
/** @brief YeltsinDB journal operation code type. */
typedef uint8_t YDB_OpCode;
/** @brief YeltsinDB Unix timestamp type. */
typedef int64_t YDB_Timestamp;
/** @brief YeltsinDB operation data size type. */
typedef uint32_t YDB_OpDataSize;

View File

@ -135,6 +135,8 @@ YDB_Error ydb_seek_to_begin(YDB_Engine* instance);
*/
YDB_Error ydb_seek_to_end(YDB_Engine* instance);
YDB_Error ydb_commit_changes(YDB_Engine* instance);
// TODO: rebuild page offsets, etc.
/**

View File

@ -3,11 +3,11 @@
## Specification
1. `JRNL` file signature (4 bytes)
2. The offset to the first transaction (8 bytes)
3. The offset to the last transaction (8 bytes)
2. The offset to the first transaction (8 bytes) **(could be 0 if no transactions)**
3. The offset to the last transaction (8 bytes) **(could be 0 if no transactions)**
4. Transactions
1. Previous transaction offset (8 bytes)
2. Next transaction offset (8 bytes)
1. Previous transaction offset (8 bytes) **(could be 0 if the first one)**
2. Next transaction offset (8 bytes) **(could be 0 if the last one)**
3. Creation date in Unix timestamp format (8 bytes)
4. Transaction flags (1 byte)
5. Operations list
@ -20,12 +20,16 @@
### Flags
**TODO**. Now there is the only option -- `JRNL_COMPLETE` on bit 0.
It means that the transaction was completely written to the storage.
### Consistency
If the last transaction was not written completely, journal rollback is started.
That means that it will be cleaned since it has not even been written to storage.
If the incomplete transaction is the only one in journal, the journal file is truncated
to 20 bytes and first and last transaction offsets are being nulled.
If the last transaction is complete in journal but not in storage (flag `JRNL_COMPLETE` is
not set), redo is being done. That means the transaction starts again since the journal is
consistent.

409
src/journal.c Normal file
View File

@ -0,0 +1,409 @@
#ifdef __cplusplus
extern "C" {
#endif
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <YeltsinDB/types.h>
#include <YeltsinDB/error_code.h>
#include <YeltsinDB/constants.h>
#include <YeltsinDB/macro.h>
#include <YeltsinDB/journal/transaction.h>
#include <YeltsinDB/journal/transaction_op.h>
#include <YeltsinDB/journal.h>
struct __YDB_Journal {
FILE *fd;
YDB_Offset first_transaction_offset;
YDB_Offset last_transaction_offset;
YDB_Offset prev_transaction_offset;
YDB_Offset curr_transaction_offset;
YDB_Offset next_transaction_offset;
YDB_Transaction* curr_transaction;
uint8_t is_consistent;
uint8_t in_use;
};
inline static YDB_Error __ydb_jrnl_truncate_incomplete(YDB_Journal* j, YDB_Offset last_complete_offset) {
THROW_IF_NULL(j, YDB_ERR_JOURNAL_NOT_INITIALIZED);
// The only transaction is incomplete
if (last_complete_offset == 0) {
j->prev_transaction_offset = 0;
j->next_transaction_offset = 0;
j->curr_transaction_offset = 0;
j->first_transaction_offset = 0;
j->last_transaction_offset = 0;
fseeko(j->fd, YDB_jrnl_first_transaction_offset, SEEK_SET);
fwrite(&j->first_transaction_offset, sizeof(YDB_Offset), 1, j->fd);
fwrite(&j->last_transaction_offset, sizeof(YDB_Offset), 1, j->fd);
ftruncate(fileno(j->fd), YDB_jrnl_data_offset);
fsync(fileno(j->fd));
return YDB_ERR_SUCCESS;
}
fseeko(j->fd, last_complete_offset + YDB_jrnl_transaction_prev_offset, SEEK_SET);
YDB_Offset prev_offset;
fread(&prev_offset, sizeof(YDB_Offset), 1, j->fd);
REASSIGN_FROM_LE(prev_offset);
YDB_Offset next_offset;
fread(&next_offset, sizeof(YDB_Offset), 1, j->fd);
REASSIGN_FROM_LE(next_offset);
fseeko(j->fd, last_complete_offset + YDB_jrnl_transaction_next_offset, SEEK_SET);
YDB_Offset new_next_offset = 0;
fwrite(&new_next_offset, sizeof(YDB_Offset), 1, j->fd);
j->last_transaction_offset = last_complete_offset;
fseeko(j->fd, YDB_jrnl_last_transaction_offset, SEEK_SET);
YDB_Offset last_complete_offset_le = TO_LE(last_complete_offset);
fwrite(&last_complete_offset_le, sizeof(YDB_Offset), 1, j->fd);
ftruncate(fileno(j->fd), next_offset);
fsync(fileno(j->fd));
return YDB_ERR_SUCCESS;
}
inline static YDB_TransactionOp* __ydb_read_op(YDB_Journal* j) {
THROW_IF_NULL(j, NULL);
YDB_TransactionOp* r = malloc(sizeof(YDB_TransactionOp));
fread(&r->opcode, sizeof(YDB_OpCode), 1, j->fd);
fread(&r->size, sizeof(YDB_OpDataSize), 1, j->fd);
r->data = malloc(r->size);
fread(r->data, r->size, 1, j->fd);
return r;
}
inline static YDB_Error __ydb_jrnl_read(YDB_Journal* j, YDB_Offset off) {
THROW_IF_NULL(j, YDB_ERR_JOURNAL_NOT_INITIALIZED);
j->curr_transaction_offset = off;
fseeko(j->fd, off, SEEK_SET);
YDB_Offset prev_offset, next_offset;
YDB_Timestamp timestamp;
YDB_Flags flags;
fread(&prev_offset, sizeof(YDB_Offset), 1, j->fd);
fread(&next_offset, sizeof(YDB_Offset), 1, j->fd);
fread(&timestamp, sizeof(YDB_Timestamp), 1, j->fd);
fread(&flags, sizeof(YDB_Flags), 1, j->fd);
j->prev_transaction_offset = prev_offset;
j->next_transaction_offset = next_offset;
YDB_Transaction* t = ydb_transaction_create();
ydb_transaction_timestamp_set(t, timestamp);
ydb_transaction_flags_set(t, flags);
YDB_TransactionOp* op;
do {
op = __ydb_read_op(j);
if (op->opcode != YDB_jrnl_op_complete) {
ydb_transaction_push_op(t, op);
}
} while (op->opcode != YDB_jrnl_op_complete);
return YDB_ERR_SUCCESS;
}
YDB_Journal *ydb_journal_init() {
YDB_Journal* result = calloc(1, sizeof(YDB_Journal));
return result;
}
void ydb_journal_destroy(YDB_Journal *jrnl) {
ydb_journal_file_close(jrnl);
free(jrnl);
}
YDB_Error ydb_journal_file_create(YDB_Journal *jrnl, char *path) {
THROW_IF_NULL(jrnl, YDB_ERR_JOURNAL_NOT_INITIALIZED);
THROW_IF_NULL(!jrnl->in_use, YDB_ERR_JOURNAL_NOT_IN_USE);
if (access(path, F_OK) != -1) {
// TODO: Windows does not check W_OK correctly, use other methods.
// TODO: if can't read/write, throw other error
return YDB_ERR_JOURNAL_EXIST;
}
FILE *f = fopen(path, "wb");
char tpl[] = YDB_JRNL_SIGN // File signature
"\x00\x00\x00\x00\x00\x00\x00\x00" // First transaction offset
"\x00\x00\x00\x00\x00\x00\x00\x00"; // Last transaction offset
fwrite(tpl, sizeof(tpl)-1, 1, f);
fclose(f);
return ydb_journal_file_open(jrnl, path);
}
YDB_Error ydb_journal_file_open(YDB_Journal *jrnl, char *path) {
THROW_IF_NULL(jrnl, YDB_ERR_INSTANCE_NOT_INITIALIZED);
THROW_IF_NULL(!jrnl->in_use, YDB_ERR_INSTANCE_IN_USE);
if (access(path, F_OK) == -1) {
// TODO: Windows does not check W_OK correctly, use other methods.
// TODO: if can't read/write, throw other error
return YDB_ERR_JOURNAL_NOT_EXIST;
}
jrnl->fd = fopen(path, "rb+");
THROW_IF_NULL(jrnl->fd, YDB_ERR_UNKNOWN); // TODO file open error
char signature[YDB_JRNL_SIGN_SIZE];
fread(signature, 1, YDB_JRNL_SIGN_SIZE, jrnl->fd);
int sig_match = memcmp(signature, YDB_JRNL_SIGN, YDB_JRNL_SIGN_SIZE) == 0;
if (!sig_match) {
return YDB_ERR_JOURNAL_FILE_CORRUPTED;
}
// TODO check for read error
fread(&jrnl->first_transaction_offset, sizeof(YDB_Offset), 1, jrnl->fd);
REASSIGN_FROM_LE(jrnl->first_transaction_offset);
fread(&jrnl->last_transaction_offset, sizeof(YDB_Offset), 1, jrnl->fd);
REASSIGN_FROM_LE(jrnl->last_transaction_offset);
jrnl->in_use = -1;
jrnl->curr_transaction_offset = jrnl->first_transaction_offset;
return ydb_journal_file_check_consistency(jrnl);
}
YDB_Error ydb_journal_file_close(YDB_Journal *jrnl) {
THROW_IF_NULL(jrnl, YDB_ERR_JOURNAL_NOT_INITIALIZED);
THROW_IF_NULL(jrnl->in_use, YDB_ERR_JOURNAL_NOT_IN_USE);
jrnl->first_transaction_offset = 0;
jrnl->last_transaction_offset = 0;
jrnl->prev_transaction_offset = 0;
jrnl->curr_transaction_offset = 0;
jrnl->next_transaction_offset = 0;
ydb_transaction_destroy(jrnl->curr_transaction);
jrnl->curr_transaction = NULL;
fclose(jrnl->fd);
jrnl->in_use = 0;
return YDB_ERR_SUCCESS;
}
YDB_Error ydb_journal_file_check_consistency(YDB_Journal *jrnl) {
THROW_IF_NULL(jrnl, YDB_ERR_JOURNAL_NOT_INITIALIZED);
THROW_IF_NULL(jrnl->in_use, YDB_ERR_JOURNAL_NOT_IN_USE);
// Empty journal is always consistent lol
if (jrnl->last_transaction_offset == 0) {
jrnl->is_consistent = -1;
return YDB_ERR_SUCCESS;
}
YDB_Offset last_complete_offset = 0;
YDB_Offset prev_offset = 0;
YDB_Offset next_offset = 0;
fseeko(jrnl->fd, jrnl->last_transaction_offset, SEEK_SET);
if (fread(&prev_offset, sizeof(YDB_Offset), 1, jrnl->fd) != sizeof(YDB_Offset)) {
if (feof(jrnl->fd)) {
clearerr(jrnl->fd);
perror("Mozhno nenado...\n");
abort();
// TODO try to find last transaction from begin
} else {
return YDB_ERR_UNKNOWN; // FIXME
}
}
REASSIGN_FROM_LE(prev_offset);
last_complete_offset = prev_offset;
if (fread(&next_offset, sizeof(YDB_Offset), 1, jrnl->fd) != sizeof(YDB_Offset)) {
if (feof(jrnl->fd)) {
clearerr(jrnl->fd);
__ydb_jrnl_truncate_incomplete(jrnl, last_complete_offset);
jrnl->is_consistent = -1;
return YDB_ERR_SUCCESS;
} else {
return YDB_ERR_UNKNOWN; // FIXME
}
}
REASSIGN_FROM_LE(next_offset);
if (next_offset != 0) {
perror("Kavo? Ne ponyal shyas...\n");
abort(); // Аборт -- это грех, кста
}
fseeko(jrnl->fd, jrnl->last_transaction_offset + YDB_jrnl_transaction_flags_offset, SEEK_SET);
if (feof(jrnl->fd)) {
__ydb_jrnl_truncate_incomplete(jrnl, last_complete_offset);
jrnl->is_consistent = -1;
return YDB_ERR_SUCCESS;
} else if (ferror(jrnl->fd)) {
return YDB_ERR_UNKNOWN; //FIXME
}
YDB_Flags flags = 0;
fread(&flags, sizeof(YDB_Flags), 1, jrnl->fd);
if (flags & 1) { //FIXME magic
// A transaction has been written to the storage.
return YDB_ERR_SUCCESS;
} else {
YDB_OpCode op = 0;
while (op != YDB_jrnl_op_complete) {
YDB_OpDataSize size;
fread(&op, sizeof(YDB_TransactionOp), 1, jrnl->fd);
fread(&size, sizeof(YDB_OpDataSize), 1, jrnl->fd);
REASSIGN_FROM_LE(size);
if (feof(jrnl->fd) || ferror(jrnl->fd)) break;
fseeko(jrnl->fd, size, SEEK_CUR);
}
if (op == YDB_jrnl_op_complete) {
//fseeko(jrnl->fd, jrnl->last_transaction_offset + YDB_jrnl_transaction_flags_offset, SEEK_SET);
//fwrite(flags | 1, sizeof(YDB_Flags), 1, jrnl->fd); //FIXME magic // Also flag should be written when the data in storage is consistent
//fsync(fileno(jrnl->fd));
jrnl->is_consistent = -1;
return YDB_ERR_SUCCESS;
}
__ydb_jrnl_truncate_incomplete(jrnl, last_complete_offset);
}
jrnl->is_consistent = -1;
YDB_Error e = ydb_journal_seek_to_begin(jrnl);
if (e == YDB_ERR_SUCCESS || e == YDB_ERR_JOURNAL_EMPTY) {
return YDB_ERR_SUCCESS;
}
}
YDB_Error ydb_journal_seek_to_begin(YDB_Journal *jrnl) {
THROW_IF_NULL(jrnl, YDB_ERR_JOURNAL_NOT_INITIALIZED);
THROW_IF_NULL(jrnl->in_use, YDB_ERR_JOURNAL_NOT_IN_USE);
THROW_IF_NULL(jrnl->is_consistent, YDB_ERR_JOURNAL_NOT_CONSISTENT);
THROW_IF_NULL(jrnl->first_transaction_offset, YDB_ERR_JOURNAL_EMPTY);
return __ydb_jrnl_read(jrnl, jrnl->first_transaction_offset);
}
YDB_Error ydb_journal_seek_to_end(YDB_Journal *jrnl) {
THROW_IF_NULL(jrnl, YDB_ERR_JOURNAL_NOT_INITIALIZED);
THROW_IF_NULL(jrnl->in_use, YDB_ERR_JOURNAL_NOT_IN_USE);
THROW_IF_NULL(jrnl->is_consistent, YDB_ERR_JOURNAL_NOT_CONSISTENT);
THROW_IF_NULL(jrnl->last_transaction_offset, YDB_ERR_JOURNAL_EMPTY);
return __ydb_jrnl_read(jrnl, jrnl->last_transaction_offset);
}
YDB_Error ydb_journal_prev_transaction(YDB_Journal *jrnl) {
THROW_IF_NULL(jrnl, YDB_ERR_JOURNAL_NOT_INITIALIZED);
THROW_IF_NULL(jrnl->in_use, YDB_ERR_JOURNAL_NOT_IN_USE);
THROW_IF_NULL(jrnl->is_consistent, YDB_ERR_JOURNAL_NOT_CONSISTENT);
if (jrnl->prev_transaction_offset != 0) {
__ydb_jrnl_read(jrnl, jrnl->prev_transaction_offset);
return YDB_ERR_SUCCESS;
}
return YDB_ERR_NO_MORE_TRANSACTIONS;
}
YDB_Error ydb_journal_next_transaction(YDB_Journal *jrnl) {
THROW_IF_NULL(jrnl, YDB_ERR_JOURNAL_NOT_INITIALIZED);
THROW_IF_NULL(jrnl->in_use, YDB_ERR_JOURNAL_NOT_IN_USE);
THROW_IF_NULL(jrnl->is_consistent, YDB_ERR_JOURNAL_NOT_CONSISTENT);
if (jrnl->next_transaction_offset != 0) {
__ydb_jrnl_read(jrnl, jrnl->prev_transaction_offset);
return YDB_ERR_SUCCESS;
}
return YDB_ERR_NO_MORE_TRANSACTIONS;
}
YDB_Error ydb_journal_append_transaction(YDB_Journal *jrnl, YDB_Transaction *transaction) {
THROW_IF_NULL(jrnl, YDB_ERR_JOURNAL_NOT_INITIALIZED);
THROW_IF_NULL(jrnl->in_use, YDB_ERR_JOURNAL_NOT_IN_USE);
THROW_IF_NULL(jrnl->is_consistent, YDB_ERR_JOURNAL_NOT_CONSISTENT);
YDB_Transaction* t = ydb_transaction_clone(transaction);
fseeko(jrnl->fd, 0, SEEK_END);
YDB_Offset file_end = ftello(jrnl->fd);
YDB_Offset file_end_le = TO_LE(file_end);
YDB_Offset prev = jrnl->last_transaction_offset;
YDB_Offset prev_le = TO_LE(prev);
YDB_Offset next_le = 0;
YDB_Timestamp ts_le = TO_LE(ydb_transaction_timestamp_get(t));
YDB_Flags flags = ydb_transaction_flags_get(t);
fseeko(jrnl->fd, file_end, SEEK_SET);
fwrite(&prev_le, sizeof(YDB_Offset), 1, jrnl->fd);
fwrite(&next_le, sizeof(YDB_Offset), 1, jrnl->fd);
fwrite(&ts_le, sizeof(YDB_Timestamp), 1, jrnl->fd);
fwrite(&flags, sizeof(YDB_Flags), 1, jrnl->fd);
jrnl->last_transaction_offset = file_end;
fseeko(jrnl->fd, YDB_jrnl_last_transaction_offset, SEEK_SET);
fwrite(&file_end_le, sizeof(YDB_Offset), 1, jrnl->fd);
fsync(fileno(jrnl->fd)); // Checkpoint
if (prev != 0) {
fseeko(jrnl->fd, prev + YDB_jrnl_transaction_next_offset, SEEK_SET);
} else {
fseeko(jrnl->fd, YDB_jrnl_first_transaction_offset, SEEK_SET);
}
fwrite(&file_end_le, sizeof(YDB_Offset), 1, jrnl->fd);
fsync(fileno(jrnl->fd)); // Checkpoint
fseeko(jrnl->fd, file_end, SEEK_SET);
uint32_t op_cnt = ydb_transaction_ops_size_get(t);
for (uint32_t i = 0; i < op_cnt; ++i) {
YDB_TransactionOp* op = ydb_transaction_op_at(t, i);
fwrite(&op->opcode, sizeof(YDB_OpCode), 1, jrnl->fd);
fwrite(&op->size, sizeof(YDB_OpDataSize), 1, jrnl->fd);
fwrite(&op->data, op->size, 1, jrnl->fd);
fsync(fileno(jrnl->fd));
}
// Write completion op
fputc(YDB_jrnl_op_complete, jrnl->fd);
YDB_OpDataSize sz = 0;
#ifndef NO_EASTER_EGGS
char easter[] = "Andrew is a pi-door-ass";
sz = sizeof(easter) - 1;
fwrite(&sz, sizeof(YDB_OpDataSize), 1, jrnl->fd);
fwrite(easter, sz, 1, jrnl->fd);
#else
fwrite(&sz, sizeof(YDB_OpDataSize), 1, jrnl->fd);
#endif
fsync(fileno(jrnl->fd));
return __ydb_jrnl_read(jrnl, file_end);
}
YDB_Transaction *ydb_journal_get_current_transaction(YDB_Journal *jrnl) {
THROW_IF_NULL(jrnl, NULL);
THROW_IF_NULL(jrnl->in_use, NULL);
THROW_IF_NULL(jrnl->is_consistent, NULL);
THROW_IF_NULL(jrnl->first_transaction_offset, NULL);
return jrnl->curr_transaction;
}
#ifdef __cplusplus
}
#endif

101
src/journal/transaction.c Normal file
View File

@ -0,0 +1,101 @@
#ifdef __cplusplus
extern "C" {
#endif
#include <YeltsinDB/types.h>
#include <YDB_ext/vec.h>
#include <YeltsinDB/journal/transaction_op.h>
#include <YeltsinDB/macro.h>
#include <YeltsinDB/error_code.h>
#include <YeltsinDB/journal/transaction.h>
typedef vec_t(YDB_TransactionOp*) vec_op_t;
struct __YDB_Transaction {
YDB_Timestamp timestamp;
YDB_Flags flags;
vec_op_t ops;
};
YDB_Transaction *ydb_transaction_create() {
YDB_Transaction* result = calloc(1, sizeof(YDB_Transaction));
vec_init(&result->ops);
return result;
}
void ydb_transaction_destroy(YDB_Transaction *t) {
THROW_IF_NULL(t, (void)0);
int i; YDB_TransactionOp* op;
vec_foreach(&t->ops, op, i) {
ydb_transaction_op_destroy(t->ops.data[i]);
}
vec_deinit(&t->ops);
free(t);
}
YDB_Timestamp ydb_transaction_timestamp_get(YDB_Transaction *t) {
THROW_IF_NULL(t, 1 << sizeof(YDB_Timestamp)); // NOLINT minimal signed int value
return t->timestamp;
}
YDB_Error ydb_transaction_timestamp_set(YDB_Transaction *t, YDB_Timestamp timestamp) {
THROW_IF_NULL(t, YDB_ERR_TRANSACTION_NOT_INITIALIZED);
t->timestamp = timestamp;
return YDB_ERR_SUCCESS;
}
YDB_Flags ydb_transaction_flags_get(YDB_Transaction *t) {
THROW_IF_NULL(t, 0); // NOLINT minimal signed int value
return t->flags;
}
YDB_Error ydb_transaction_flags_set(YDB_Transaction *t, YDB_Flags flags) {
THROW_IF_NULL(t, YDB_ERR_TRANSACTION_NOT_INITIALIZED);
t->flags = flags;
return YDB_ERR_SUCCESS;
}
uint32_t ydb_transaction_ops_size_get(YDB_Transaction *t) {
THROW_IF_NULL(t, 0);
return t->ops.length;
}
YDB_Error ydb_transaction_push_op(YDB_Transaction* t, YDB_TransactionOp *op) {
THROW_IF_NULL(t, YDB_ERR_TRANSACTION_NOT_INITIALIZED);
YDB_TransactionOp* op_copy = ydb_transaction_op_clone(op);
if(vec_push(&t->ops, op_copy)) {
return YDB_ERR_TRANSACTION_OP_PUSH_FAILED;
}
return YDB_ERR_SUCCESS;
}
YDB_Error ydb_transaction_pop_op(YDB_Transaction *t) {
THROW_IF_NULL(t, YDB_ERR_TRANSACTION_NOT_INITIALIZED);
uint32_t vec_back = t->ops.length - 1;
ydb_transaction_op_destroy(t->ops.data[vec_back]);
vec_pop(&t->ops);
return YDB_ERR_SUCCESS;
}
YDB_TransactionOp *ydb_transaction_op_at(YDB_Transaction *t, uint32_t index) {
THROW_IF_NULL(t, NULL);
if (index > ydb_transaction_ops_size_get(t)) {
return NULL;
}
return t->ops.data[index];
}
YDB_Transaction *ydb_transaction_clone(YDB_Transaction *t) {
THROW_IF_NULL(t, NULL);
YDB_Transaction* r = ydb_transaction_create();
r->timestamp = t->timestamp;
r->flags = t->flags;
int i; YDB_TransactionOp* op;
vec_foreach(&t->ops, op, i) {
vec_push(&r->ops, ydb_transaction_op_clone(op));
}
return r;
}
#ifdef __cplusplus
}
#endif

View File

@ -11,6 +11,11 @@ extern "C" {
#include <YeltsinDB/constants.h>
#include <YeltsinDB/macro.h>
#include <YeltsinDB/table_page.h>
#ifndef NO_JOURNAL
#include <YeltsinDB/journal.h>
#include <YeltsinDB/journal/transaction.h>
#include <YeltsinDB/journal/transaction_op.h>
#endif
#include <YeltsinDB/ydb.h>
struct __YDB_Engine {
@ -29,16 +34,27 @@ struct __YDB_Engine {
uint8_t in_use; /**< "In use" flag. */
char *filename; /**< Current table data file name. */
FILE *fd; /**< Current table data file descriptor. */
#ifndef NO_JOURNAL
YDB_Journal* jrnl;
#endif
};
YDB_Engine *ydb_init_instance() {
YDB_Engine *new_instance = calloc(1, sizeof(YDB_Engine));
#ifndef NO_JOURNAL
new_instance->jrnl = ydb_journal_init();
#endif
return new_instance;
}
void ydb_terminate_instance(YDB_Engine *instance) {
ydb_unload_table(instance);
#ifndef NO_JOURNAL
ydb_journal_destroy(instance->jrnl);
#endif
// And after all that, the instance could be freed
free(instance);
}
@ -91,6 +107,14 @@ static YDB_Error __ydb_read_page(YDB_Engine *inst) {
// Moves file position to allocated block.
// Also changes last_free_page_offset.
static YDB_Offset __ydb_allocate_page_and_seek(YDB_Engine *inst) {
// TODO journal
#ifndef NO_JOURNAL
YDB_Transaction* t = ydb_transaction_create();
YDB_TransactionOp* op_alloc = malloc(sizeof(YDB_TransactionOp));
op_alloc->opcode = YDB_jrnl_op_page_alloc;
op_alloc->size = 24;
op_alloc->data = malloc(op_alloc->size);
#endif
// TODO change signature of the function. It should return an error code, I think.
YDB_Offset result = 0;
// If no free pages in the table, then...
@ -118,7 +142,7 @@ static YDB_Offset __ydb_allocate_page_and_seek(YDB_Engine *inst) {
inst->last_page_offset = result;
fseek(inst->fd, YDB_v1_last_page_offset, SEEK_SET);
fwrite(&allocated_page_offset_le, sizeof(YDB_Offset), 1, inst->fd);
fflush(inst->fd);
fsync(fileno(inst->fd));
} else {
// Return last free page offset
result = inst->last_free_page_offset;
@ -143,7 +167,7 @@ static YDB_Offset __ydb_allocate_page_and_seek(YDB_Engine *inst) {
YDB_Offset lfp_offset_le = TO_LE(inst->last_free_page_offset);
fwrite(&lfp_offset_le, sizeof(YDB_Offset), 1, inst->fd);
}
fflush(inst->fd);
fsync(fileno(inst->fd));
fseek(inst->fd, result, SEEK_SET);
return result;
}
@ -303,7 +327,7 @@ YDB_Error ydb_append_page(YDB_Engine* instance, YDB_TablePage* page) {
fwrite(&rc_le, sizeof(rc), 1, instance->fd);
fwrite(d, sizeof(d), 1, instance->fd);
fflush(instance->fd);
fsync(fileno(instance->fd));
__ydb_read_page(instance);
@ -343,7 +367,7 @@ YDB_Error ydb_replace_current_page(YDB_Engine *instance, YDB_TablePage *page) {
fwrite(page_data, sizeof(page_data), 1, instance->fd);
// Flush buffer
fflush(instance->fd);
fsync(fileno(instance->fd));
ydb_page_free(instance->curr_page);
instance->curr_page = page;
@ -398,7 +422,7 @@ YDB_Error ydb_delete_current_page(YDB_Engine *instance) {
fwrite(&cp_le, sizeof(YDB_Offset), 1, instance->fd);
// Flush buffer
fflush(instance->fd);
fsync(fileno(instance->fd));
// Seek to the next page if it's not the last, else seek to the previous one
if (instance->next_page_offset != 0) {
@ -431,6 +455,10 @@ YDB_Error ydb_seek_to_end(YDB_Engine *instance) {
return __ydb_read_page(instance);
}
YDB_Error ydb_commit_changes(YDB_Engine *instance) {
return 0;
}
#ifdef __cplusplus
}
#endif