diff --git a/CMakeLists.txt b/CMakeLists.txt index 34d906e..207c1f6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -12,4 +12,7 @@ add_library(YeltsinDB STATIC inc/YeltsinDB/constants.h inc/YeltsinDB/types.h inc/YeltsinDB/macro.h - ) \ No newline at end of file + inc/YeltsinDB/journal.h src/journal.c + inc/YeltsinDB/journal/transaction.h src/journal/transaction.c + inc/YeltsinDB/journal/transaction_op.h + inc/YDB_ext/vec.h inc/YDB_ext/vec.c) \ No newline at end of file diff --git a/inc/YDB_ext/LICENSE_vec b/inc/YDB_ext/LICENSE_vec new file mode 100644 index 0000000..03b6555 --- /dev/null +++ b/inc/YDB_ext/LICENSE_vec @@ -0,0 +1,20 @@ +Copyright (c) 2014 rxi + + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +of the Software, and to permit persons to whom the Software is furnished to do +so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/inc/YDB_ext/vec.c b/inc/YDB_ext/vec.c new file mode 100644 index 0000000..38a1b48 --- /dev/null +++ b/inc/YDB_ext/vec.c @@ -0,0 +1,120 @@ +#ifdef __cplusplus +extern "C" { +#endif +/** + * Copyright (c) 2014 rxi + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the MIT license. See LICENSE for details. + */ + +#include "vec.h" + + +int vec_expand_(char **data, int *length, int *capacity, int memsz) { + if (*length + 1 > *capacity) { + void *ptr; + int n = (*capacity == 0) ? 1 : *capacity << 1; + ptr = realloc(*data, n * memsz); + if (ptr == NULL) return -1; + *data = ptr; + *capacity = n; + } + return 0; +} + + +int vec_reserve_(char **data, int *length, int *capacity, int memsz, int n) { + (void) length; + if (n > *capacity) { + void *ptr = realloc(*data, n * memsz); + if (ptr == NULL) return -1; + *data = ptr; + *capacity = n; + } + return 0; +} + + +int vec_reserve_po2_( + char **data, int *length, int *capacity, int memsz, int n +) { + int n2 = 1; + if (n == 0) return 0; + while (n2 < n) n2 <<= 1; + return vec_reserve_(data, length, capacity, memsz, n2); +} + + +int vec_compact_(char **data, int *length, int *capacity, int memsz) { + if (*length == 0) { + free(*data); + *data = NULL; + *capacity = 0; + return 0; + } else { + void *ptr; + int n = *length; + ptr = realloc(*data, n * memsz); + if (ptr == NULL) return -1; + *capacity = n; + *data = ptr; + } + return 0; +} + + +int vec_insert_(char **data, int *length, int *capacity, int memsz, + int idx +) { + int err = vec_expand_(data, length, capacity, memsz); + if (err) return err; + memmove(*data + (idx + 1) * memsz, + *data + idx * memsz, + (*length - idx) * memsz); + return 0; +} + + +void vec_splice_(char **data, int *length, int *capacity, int memsz, + int start, int count +) { + (void) capacity; + memmove(*data + start * memsz, + *data + (start + count) * memsz, + (*length - start - count) * memsz); +} + + +void vec_swapsplice_(char **data, int *length, int *capacity, int memsz, + int start, int count +) { + (void) capacity; + memmove(*data + start * memsz, + *data + (*length - count) * memsz, + count * memsz); +} + + +void vec_swap_(char **data, int *length, int *capacity, int memsz, + int idx1, int idx2 +) { + unsigned char *a, *b, tmp; + int count; + (void) length; + (void) capacity; + if (idx1 == idx2) return; + a = (unsigned char*) *data + idx1 * memsz; + b = (unsigned char*) *data + idx2 * memsz; + count = memsz; + while (count--) { + tmp = *a; + *a = *b; + *b = tmp; + a++, b++; + } +} + +#ifdef __cplusplus +} +#endif diff --git a/inc/YDB_ext/vec.h b/inc/YDB_ext/vec.h new file mode 100644 index 0000000..003804a --- /dev/null +++ b/inc/YDB_ext/vec.h @@ -0,0 +1,189 @@ +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Copyright (c) 2014 rxi + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the MIT license. See LICENSE for details. + */ + +#ifndef VEC_H +#define VEC_H + +#include +#include + +#define VEC_VERSION "0.2.1" + + +#define vec_unpack_(v)\ + (char**)&(v)->data, &(v)->length, &(v)->capacity, sizeof(*(v)->data) + + +#define vec_t(T)\ + struct { T *data; int length, capacity; } + + +#define vec_init(v)\ + memset((v), 0, sizeof(*(v))) + + +#define vec_deinit(v)\ + ( free((v)->data),\ + vec_init(v) ) + + +#define vec_push(v, val)\ + ( vec_expand_(vec_unpack_(v)) ? -1 :\ + ((v)->data[(v)->length++] = (val), 0), 0 ) + + +#define vec_pop(v)\ + (v)->data[--(v)->length] + + +#define vec_splice(v, start, count)\ + ( vec_splice_(vec_unpack_(v), start, count),\ + (v)->length -= (count) ) + + +#define vec_swapsplice(v, start, count)\ + ( vec_swapsplice_(vec_unpack_(v), start, count),\ + (v)->length -= (count) ) + + +#define vec_insert(v, idx, val)\ + ( vec_insert_(vec_unpack_(v), idx) ? -1 :\ + ((v)->data[idx] = (val), 0), (v)->length++, 0 ) + + +#define vec_sort(v, fn)\ + qsort((v)->data, (v)->length, sizeof(*(v)->data), fn) + + +#define vec_swap(v, idx1, idx2)\ + vec_swap_(vec_unpack_(v), idx1, idx2) + + +#define vec_truncate(v, len)\ + ((v)->length = (len) < (v)->length ? (len) : (v)->length) + + +#define vec_clear(v)\ + ((v)->length = 0) + + +#define vec_first(v)\ + (v)->data[0] + + +#define vec_last(v)\ + (v)->data[(v)->length - 1] + + +#define vec_reserve(v, n)\ + vec_reserve_(vec_unpack_(v), n) + + +#define vec_compact(v)\ + vec_compact_(vec_unpack_(v)) + + +#define vec_pusharr(v, arr, count)\ + do {\ + int i__, n__ = (count);\ + if (vec_reserve_po2_(vec_unpack_(v), (v)->length + n__) != 0) break;\ + for (i__ = 0; i__ < n__; i__++) {\ + (v)->data[(v)->length++] = (arr)[i__];\ + }\ + } while (0) + + +#define vec_extend(v, v2)\ + vec_pusharr((v), (v2)->data, (v2)->length) + + +#define vec_find(v, val, idx)\ + do {\ + for ((idx) = 0; (idx) < (v)->length; (idx)++) {\ + if ((v)->data[(idx)] == (val)) break;\ + }\ + if ((idx) == (v)->length) (idx) = -1;\ + } while (0) + + +#define vec_remove(v, val)\ + do {\ + int idx__;\ + vec_find(v, val, idx__);\ + if (idx__ != -1) vec_splice(v, idx__, 1);\ + } while (0) + + +#define vec_reverse(v)\ + do {\ + int i__ = (v)->length / 2;\ + while (i__--) {\ + vec_swap((v), i__, (v)->length - (i__ + 1));\ + }\ + } while (0) + + +#define vec_foreach(v, var, iter)\ + if ( (v)->length > 0 )\ + for ( (iter) = 0;\ + (iter) < (v)->length && (((var) = (v)->data[(iter)]), 1);\ + ++(iter)) + + +#define vec_foreach_rev(v, var, iter)\ + if ( (v)->length > 0 )\ + for ( (iter) = (v)->length - 1;\ + (iter) >= 0 && (((var) = (v)->data[(iter)]), 1);\ + --(iter)) + + +#define vec_foreach_ptr(v, var, iter)\ + if ( (v)->length > 0 )\ + for ( (iter) = 0;\ + (iter) < (v)->length && (((var) = &(v)->data[(iter)]), 1);\ + ++(iter)) + + +#define vec_foreach_ptr_rev(v, var, iter)\ + if ( (v)->length > 0 )\ + for ( (iter) = (v)->length - 1;\ + (iter) >= 0 && (((var) = &(v)->data[(iter)]), 1);\ + --(iter)) + + + +int vec_expand_(char **data, int *length, int *capacity, int memsz); +int vec_reserve_(char **data, int *length, int *capacity, int memsz, int n); +int vec_reserve_po2_(char **data, int *length, int *capacity, int memsz, + int n); +int vec_compact_(char **data, int *length, int *capacity, int memsz); +int vec_insert_(char **data, int *length, int *capacity, int memsz, + int idx); +void vec_splice_(char **data, int *length, int *capacity, int memsz, + int start, int count); +void vec_swapsplice_(char **data, int *length, int *capacity, int memsz, + int start, int count); +void vec_swap_(char **data, int *length, int *capacity, int memsz, + int idx1, int idx2); + + +typedef vec_t(void*) vec_void_t; +typedef vec_t(char*) vec_str_t; +typedef vec_t(int) vec_int_t; +typedef vec_t(char) vec_char_t; +typedef vec_t(float) vec_float_t; +typedef vec_t(double) vec_double_t; + +#endif + +#ifdef __cplusplus +} +#endif \ No newline at end of file diff --git a/inc/YeltsinDB/constants.h b/inc/YeltsinDB/constants.h index 939e568..6961770 100644 --- a/inc/YeltsinDB/constants.h +++ b/inc/YeltsinDB/constants.h @@ -44,4 +44,46 @@ enum YDB_v1_page_offsets { YDB_v1_page_prev_offset = YDB_v1_page_next_offset + YDB_v1_page_next_size, YDB_v1_page_row_count_offset = YDB_v1_page_prev_offset + YDB_v1_page_prev_size, YDB_v1_page_data_offset = YDB_v1_page_row_count_offset + YDB_v1_page_row_count_size, +}; + +#define YDB_JRNL_SIGN "JRNL" +#define YDB_JRNL_SIGN_SIZE (sizeof(YDB_JRNL_SIGN)-1) + +enum YDB_jrnl_sizes { + YDB_jrnl_first_transaction_size = 8, + YDB_jrnl_last_transaction_size = 8, + YDB_jrnl_transaction_prev_size = 8, + YDB_jrnl_transaction_next_size = 8, + YDB_jrnl_transaction_timestamp_size = 8, + YDB_jrnl_transaction_flags_size = 1, + YDB_jrnl_transaction_op_type_size = 1, + YDB_jrnl_transaction_op_datasz_size = 4, +}; + +enum YDB_jrnl_offsets { + YDB_jrnl_first_transaction_offset = YDB_JRNL_SIGN_SIZE, + YDB_jrnl_last_transaction_offset = YDB_jrnl_first_transaction_offset + YDB_jrnl_first_transaction_size, + YDB_jrnl_data_offset = YDB_jrnl_last_transaction_offset + YDB_jrnl_last_transaction_size, +}; + +enum YDB_jrnl_transaction_offsets { + YDB_jrnl_transaction_prev_offset = 0, + YDB_jrnl_transaction_next_offset = YDB_jrnl_transaction_prev_offset + YDB_jrnl_transaction_prev_size, + YDB_jrnl_transaction_timestamp_offset = YDB_jrnl_transaction_next_offset + YDB_jrnl_transaction_next_size, + YDB_jrnl_transaction_flags_offset = YDB_jrnl_transaction_timestamp_offset + YDB_jrnl_transaction_timestamp_size, + YDB_jrnl_transaction_data_offset = YDB_jrnl_transaction_flags_offset + YDB_jrnl_transaction_flags_size, +}; + +enum YDB_jrnl_transaction_op_offsets { + YDB_jrnl_transaction_op_type_offset = 0, + YDB_jrnl_transaction_op_datasz_offset = YDB_jrnl_transaction_op_type_offset + YDB_jrnl_transaction_op_type_size, + YDB_jrnl_transaction_op_data_offset = YDB_jrnl_transaction_op_datasz_offset + YDB_jrnl_transaction_op_datasz_size, +}; + +enum YDB_jrnl_transaction_ops { + YDB_jrnl_op_page_alloc = 0x01, + YDB_jrnl_op_page_modify = 0x02, + YDB_jrnl_op_page_remove = 0x03, + YDB_jrnl_op_rollback = 0xFE, + YDB_jrnl_op_complete = 0xFF, }; \ No newline at end of file diff --git a/inc/YeltsinDB/error_code.h b/inc/YeltsinDB/error_code.h index 4c22e39..6711093 100644 --- a/inc/YeltsinDB/error_code.h +++ b/inc/YeltsinDB/error_code.h @@ -70,6 +70,54 @@ * @brief The addresses of pages are the same. */ #define YDB_ERR_SAME_PAGE_ADDRESS (-14) +/** + * @brief The transaction is not initialized. + */ +#define YDB_ERR_TRANSACTION_NOT_INITIALIZED (-15) +/** + * @brief Transaction operation push failed. + */ +#define YDB_ERR_TRANSACTION_OP_PUSH_FAILED (-16) +/** + * @brief Index of transaction operation is out of range. + */ +#define YDB_ERR_TRANSACTION_OP_OUT_OF_RANGE (-17) +/** + * @brief The journal is not initialized. + */ +#define YDB_ERR_JOURNAL_NOT_INITIALIZED (-18) +/** + * @brief The journal is in use. + */ +#define YDB_ERR_JOURNAL_IN_USE (-19) +/** + * @brief The journal is not in use. + */ +#define YDB_ERR_JOURNAL_NOT_IN_USE (-20) +/** + * @brief The journal file does not exist. + */ +#define YDB_ERR_JOURNAL_NOT_EXIST (-21) +/** + * @brief The journal file does already exist. + */ +#define YDB_ERR_JOURNAL_EXIST (-22) +/** + * @brief The journal file is corrupted. + */ +#define YDB_ERR_JOURNAL_FILE_CORRUPTED (-23) +/** + * @brief The journal is not consistent. + */ +#define YDB_ERR_JOURNAL_NOT_CONSISTENT (-24) +/** + * @brief No more transactions left in the journal. + */ +#define YDB_ERR_NO_MORE_TRANSACTIONS (-25) +/** + * @brief The journal is empty. + */ +#define YDB_ERR_JOURNAL_EMPTY (-26) /** * @brief An unknown error has occurred. */ diff --git a/inc/YeltsinDB/journal.h b/inc/YeltsinDB/journal.h new file mode 100644 index 0000000..a7e5ac7 --- /dev/null +++ b/inc/YeltsinDB/journal.h @@ -0,0 +1,44 @@ +#pragma once + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include + +/** + * @file journal.h + * @todo Doxygen. + */ + +struct __YDB_Journal; +typedef struct __YDB_Journal YDB_Journal; + +YDB_Journal* ydb_journal_init(); + +void ydb_journal_destroy(YDB_Journal* jrnl); + +YDB_Error ydb_journal_file_create(YDB_Journal* jrnl, char* path); + +YDB_Error ydb_journal_file_open(YDB_Journal* jrnl, char* path); + +YDB_Error ydb_journal_file_close(YDB_Journal* jrnl); + +YDB_Error ydb_journal_file_check_consistency(YDB_Journal* jrnl); + +YDB_Error ydb_journal_seek_to_begin(YDB_Journal* jrnl); + +YDB_Error ydb_journal_seek_to_end(YDB_Journal* jrnl); + +YDB_Error ydb_journal_prev_transaction(YDB_Journal* jrnl); + +YDB_Error ydb_journal_next_transaction(YDB_Journal* jrnl); + +YDB_Error ydb_journal_append_transaction(YDB_Journal* jrnl, YDB_Transaction* transaction); + +YDB_Transaction* ydb_journal_get_current_transaction(YDB_Journal* jrnl); + +#ifdef __cplusplus +} +#endif \ No newline at end of file diff --git a/inc/YeltsinDB/journal/transaction.h b/inc/YeltsinDB/journal/transaction.h new file mode 100644 index 0000000..845a07d --- /dev/null +++ b/inc/YeltsinDB/journal/transaction.h @@ -0,0 +1,49 @@ +#pragma once + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * @file journal/transaction.h + * @todo Doxygen. + */ + +#include + +struct __YDB_Transaction; +typedef struct __YDB_Transaction YDB_Transaction; + +/** + * @brief Create a new transaction. + * @return A transaction instance. + */ +YDB_Transaction* ydb_transaction_create(); + +/** + * @brief Destroy a transaction object. + * @param t Transaction object. + */ +void ydb_transaction_destroy(YDB_Transaction* t); + +YDB_Timestamp ydb_transaction_timestamp_get(YDB_Transaction* t); + +YDB_Error ydb_transaction_timestamp_set(YDB_Transaction* t, YDB_Timestamp timestamp); + +YDB_Flags ydb_transaction_flags_get(YDB_Transaction* t); + +YDB_Error ydb_transaction_flags_set(YDB_Transaction* t, YDB_Flags flags); + +uint32_t ydb_transaction_ops_size_get(YDB_Transaction* t); + +YDB_Error ydb_transaction_push_op(YDB_Transaction* t, YDB_TransactionOp* op); + +YDB_Error ydb_transaction_pop_op(YDB_Transaction* t); + +YDB_TransactionOp* ydb_transaction_op_at(YDB_Transaction* t, uint32_t index); + +YDB_Transaction* ydb_transaction_clone(YDB_Transaction* t); + +#ifdef __cplusplus +} +#endif \ No newline at end of file diff --git a/inc/YeltsinDB/journal/transaction_op.h b/inc/YeltsinDB/journal/transaction_op.h new file mode 100644 index 0000000..ff074d0 --- /dev/null +++ b/inc/YeltsinDB/journal/transaction_op.h @@ -0,0 +1,38 @@ +#pragma once + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +/** + * @file journal/transaction_op.h + * @todo Doxygen. + */ + +struct __YDB_TransactionOp { + YDB_OpCode opcode; + YDB_OpDataSize size; + void* data; +}; +typedef struct __YDB_TransactionOp YDB_TransactionOp; + +void ydb_transaction_op_destroy(YDB_TransactionOp* op) { + free(op->data); + free(op); +} + +YDB_TransactionOp* ydb_transaction_op_clone(YDB_TransactionOp* op) { + THROW_IF_NULL(op, NULL); + YDB_TransactionOp* result = malloc(sizeof(YDB_TransactionOp)); + result->opcode = op->opcode; + result->size = op->size; + result->data = malloc(op->size); + memcpy(result->data, op->data, op->size); + return result; +} + +#ifdef __cplusplus +} +#endif \ No newline at end of file diff --git a/inc/YeltsinDB/types.h b/inc/YeltsinDB/types.h index 69c0f90..d887600 100644 --- a/inc/YeltsinDB/types.h +++ b/inc/YeltsinDB/types.h @@ -11,6 +11,13 @@ typedef int16_t YDB_Error; /** @brief YeltsinDB data file offset type. */ typedef uint64_t YDB_Offset; -/** */ +/** @brief YeltsinDB page size type. */ typedef uint16_t YDB_PageSize; +/** @brief YeltsinDB flags container type. */ typedef uint8_t YDB_Flags; +/** @brief YeltsinDB journal operation code type. */ +typedef uint8_t YDB_OpCode; +/** @brief YeltsinDB Unix timestamp type. */ +typedef int64_t YDB_Timestamp; +/** @brief YeltsinDB operation data size type. */ +typedef uint32_t YDB_OpDataSize; \ No newline at end of file diff --git a/inc/YeltsinDB/ydb.h b/inc/YeltsinDB/ydb.h index 58af0bb..5995bb4 100644 --- a/inc/YeltsinDB/ydb.h +++ b/inc/YeltsinDB/ydb.h @@ -135,6 +135,8 @@ YDB_Error ydb_seek_to_begin(YDB_Engine* instance); */ YDB_Error ydb_seek_to_end(YDB_Engine* instance); +YDB_Error ydb_commit_changes(YDB_Engine* instance); + // TODO: rebuild page offsets, etc. /** diff --git a/journal_file.md b/journal_file.md index 10dee54..57268b4 100644 --- a/journal_file.md +++ b/journal_file.md @@ -3,11 +3,11 @@ ## Specification 1. `JRNL` file signature (4 bytes) -2. The offset to the first transaction (8 bytes) -3. The offset to the last transaction (8 bytes) +2. The offset to the first transaction (8 bytes) **(could be 0 if no transactions)** +3. The offset to the last transaction (8 bytes) **(could be 0 if no transactions)** 4. Transactions - 1. Previous transaction offset (8 bytes) - 2. Next transaction offset (8 bytes) + 1. Previous transaction offset (8 bytes) **(could be 0 if the first one)** + 2. Next transaction offset (8 bytes) **(could be 0 if the last one)** 3. Creation date in Unix timestamp format (8 bytes) 4. Transaction flags (1 byte) 5. Operations list @@ -20,12 +20,16 @@ ### Flags **TODO**. Now there is the only option -- `JRNL_COMPLETE` on bit 0. +It means that the transaction was completely written to the storage. ### Consistency If the last transaction was not written completely, journal rollback is started. That means that it will be cleaned since it has not even been written to storage. +If the incomplete transaction is the only one in journal, the journal file is truncated +to 20 bytes and first and last transaction offsets are being nulled. + If the last transaction is complete in journal but not in storage (flag `JRNL_COMPLETE` is not set), redo is being done. That means the transaction starts again since the journal is consistent. diff --git a/src/journal.c b/src/journal.c new file mode 100644 index 0000000..ad6da8e --- /dev/null +++ b/src/journal.c @@ -0,0 +1,409 @@ +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +struct __YDB_Journal { + FILE *fd; + YDB_Offset first_transaction_offset; + YDB_Offset last_transaction_offset; + + YDB_Offset prev_transaction_offset; + YDB_Offset curr_transaction_offset; + YDB_Offset next_transaction_offset; + + YDB_Transaction* curr_transaction; + + uint8_t is_consistent; + uint8_t in_use; +}; + +inline static YDB_Error __ydb_jrnl_truncate_incomplete(YDB_Journal* j, YDB_Offset last_complete_offset) { + THROW_IF_NULL(j, YDB_ERR_JOURNAL_NOT_INITIALIZED); + + // The only transaction is incomplete + if (last_complete_offset == 0) { + j->prev_transaction_offset = 0; + j->next_transaction_offset = 0; + j->curr_transaction_offset = 0; + j->first_transaction_offset = 0; + j->last_transaction_offset = 0; + + fseeko(j->fd, YDB_jrnl_first_transaction_offset, SEEK_SET); + fwrite(&j->first_transaction_offset, sizeof(YDB_Offset), 1, j->fd); + fwrite(&j->last_transaction_offset, sizeof(YDB_Offset), 1, j->fd); + + ftruncate(fileno(j->fd), YDB_jrnl_data_offset); + fsync(fileno(j->fd)); + return YDB_ERR_SUCCESS; + } + + fseeko(j->fd, last_complete_offset + YDB_jrnl_transaction_prev_offset, SEEK_SET); + + YDB_Offset prev_offset; + fread(&prev_offset, sizeof(YDB_Offset), 1, j->fd); + REASSIGN_FROM_LE(prev_offset); + + YDB_Offset next_offset; + fread(&next_offset, sizeof(YDB_Offset), 1, j->fd); + REASSIGN_FROM_LE(next_offset); + + fseeko(j->fd, last_complete_offset + YDB_jrnl_transaction_next_offset, SEEK_SET); + YDB_Offset new_next_offset = 0; + fwrite(&new_next_offset, sizeof(YDB_Offset), 1, j->fd); + + j->last_transaction_offset = last_complete_offset; + fseeko(j->fd, YDB_jrnl_last_transaction_offset, SEEK_SET); + YDB_Offset last_complete_offset_le = TO_LE(last_complete_offset); + fwrite(&last_complete_offset_le, sizeof(YDB_Offset), 1, j->fd); + + ftruncate(fileno(j->fd), next_offset); + fsync(fileno(j->fd)); + + return YDB_ERR_SUCCESS; +} + +inline static YDB_TransactionOp* __ydb_read_op(YDB_Journal* j) { + THROW_IF_NULL(j, NULL); + YDB_TransactionOp* r = malloc(sizeof(YDB_TransactionOp)); + fread(&r->opcode, sizeof(YDB_OpCode), 1, j->fd); + fread(&r->size, sizeof(YDB_OpDataSize), 1, j->fd); + r->data = malloc(r->size); + fread(r->data, r->size, 1, j->fd); + return r; +} + +inline static YDB_Error __ydb_jrnl_read(YDB_Journal* j, YDB_Offset off) { + THROW_IF_NULL(j, YDB_ERR_JOURNAL_NOT_INITIALIZED); + + j->curr_transaction_offset = off; + fseeko(j->fd, off, SEEK_SET); + + YDB_Offset prev_offset, next_offset; + YDB_Timestamp timestamp; + YDB_Flags flags; + + fread(&prev_offset, sizeof(YDB_Offset), 1, j->fd); + fread(&next_offset, sizeof(YDB_Offset), 1, j->fd); + fread(×tamp, sizeof(YDB_Timestamp), 1, j->fd); + fread(&flags, sizeof(YDB_Flags), 1, j->fd); + + j->prev_transaction_offset = prev_offset; + j->next_transaction_offset = next_offset; + + YDB_Transaction* t = ydb_transaction_create(); + ydb_transaction_timestamp_set(t, timestamp); + ydb_transaction_flags_set(t, flags); + + YDB_TransactionOp* op; + do { + op = __ydb_read_op(j); + if (op->opcode != YDB_jrnl_op_complete) { + ydb_transaction_push_op(t, op); + } + } while (op->opcode != YDB_jrnl_op_complete); + + return YDB_ERR_SUCCESS; +} + +YDB_Journal *ydb_journal_init() { + YDB_Journal* result = calloc(1, sizeof(YDB_Journal)); + return result; +} + +void ydb_journal_destroy(YDB_Journal *jrnl) { + ydb_journal_file_close(jrnl); + free(jrnl); +} + +YDB_Error ydb_journal_file_create(YDB_Journal *jrnl, char *path) { + THROW_IF_NULL(jrnl, YDB_ERR_JOURNAL_NOT_INITIALIZED); + THROW_IF_NULL(!jrnl->in_use, YDB_ERR_JOURNAL_NOT_IN_USE); + + if (access(path, F_OK) != -1) { + // TODO: Windows does not check W_OK correctly, use other methods. + // TODO: if can't read/write, throw other error + return YDB_ERR_JOURNAL_EXIST; + } + + FILE *f = fopen(path, "wb"); + + char tpl[] = YDB_JRNL_SIGN // File signature + "\x00\x00\x00\x00\x00\x00\x00\x00" // First transaction offset + "\x00\x00\x00\x00\x00\x00\x00\x00"; // Last transaction offset + fwrite(tpl, sizeof(tpl)-1, 1, f); + + fclose(f); + + return ydb_journal_file_open(jrnl, path); +} + +YDB_Error ydb_journal_file_open(YDB_Journal *jrnl, char *path) { + THROW_IF_NULL(jrnl, YDB_ERR_INSTANCE_NOT_INITIALIZED); + THROW_IF_NULL(!jrnl->in_use, YDB_ERR_INSTANCE_IN_USE); + + if (access(path, F_OK) == -1) { + // TODO: Windows does not check W_OK correctly, use other methods. + // TODO: if can't read/write, throw other error + return YDB_ERR_JOURNAL_NOT_EXIST; + } + + jrnl->fd = fopen(path, "rb+"); + THROW_IF_NULL(jrnl->fd, YDB_ERR_UNKNOWN); // TODO file open error + + char signature[YDB_JRNL_SIGN_SIZE]; + fread(signature, 1, YDB_JRNL_SIGN_SIZE, jrnl->fd); + int sig_match = memcmp(signature, YDB_JRNL_SIGN, YDB_JRNL_SIGN_SIZE) == 0; + if (!sig_match) { + return YDB_ERR_JOURNAL_FILE_CORRUPTED; + } + + // TODO check for read error + fread(&jrnl->first_transaction_offset, sizeof(YDB_Offset), 1, jrnl->fd); + REASSIGN_FROM_LE(jrnl->first_transaction_offset); + fread(&jrnl->last_transaction_offset, sizeof(YDB_Offset), 1, jrnl->fd); + REASSIGN_FROM_LE(jrnl->last_transaction_offset); + + jrnl->in_use = -1; + jrnl->curr_transaction_offset = jrnl->first_transaction_offset; + + return ydb_journal_file_check_consistency(jrnl); +} + +YDB_Error ydb_journal_file_close(YDB_Journal *jrnl) { + THROW_IF_NULL(jrnl, YDB_ERR_JOURNAL_NOT_INITIALIZED); + THROW_IF_NULL(jrnl->in_use, YDB_ERR_JOURNAL_NOT_IN_USE); + + jrnl->first_transaction_offset = 0; + jrnl->last_transaction_offset = 0; + jrnl->prev_transaction_offset = 0; + jrnl->curr_transaction_offset = 0; + jrnl->next_transaction_offset = 0; + ydb_transaction_destroy(jrnl->curr_transaction); + jrnl->curr_transaction = NULL; + fclose(jrnl->fd); + jrnl->in_use = 0; + + return YDB_ERR_SUCCESS; +} + +YDB_Error ydb_journal_file_check_consistency(YDB_Journal *jrnl) { + THROW_IF_NULL(jrnl, YDB_ERR_JOURNAL_NOT_INITIALIZED); + THROW_IF_NULL(jrnl->in_use, YDB_ERR_JOURNAL_NOT_IN_USE); + + // Empty journal is always consistent lol + if (jrnl->last_transaction_offset == 0) { + jrnl->is_consistent = -1; + return YDB_ERR_SUCCESS; + } + + YDB_Offset last_complete_offset = 0; + YDB_Offset prev_offset = 0; + YDB_Offset next_offset = 0; + + fseeko(jrnl->fd, jrnl->last_transaction_offset, SEEK_SET); + if (fread(&prev_offset, sizeof(YDB_Offset), 1, jrnl->fd) != sizeof(YDB_Offset)) { + if (feof(jrnl->fd)) { + clearerr(jrnl->fd); + perror("Mozhno nenado...\n"); + abort(); + // TODO try to find last transaction from begin + } else { + return YDB_ERR_UNKNOWN; // FIXME + } + } + REASSIGN_FROM_LE(prev_offset); + + last_complete_offset = prev_offset; + + if (fread(&next_offset, sizeof(YDB_Offset), 1, jrnl->fd) != sizeof(YDB_Offset)) { + if (feof(jrnl->fd)) { + clearerr(jrnl->fd); + __ydb_jrnl_truncate_incomplete(jrnl, last_complete_offset); + jrnl->is_consistent = -1; + return YDB_ERR_SUCCESS; + } else { + return YDB_ERR_UNKNOWN; // FIXME + } + } + REASSIGN_FROM_LE(next_offset); + + if (next_offset != 0) { + perror("Kavo? Ne ponyal shyas...\n"); + abort(); // Аборт -- это грех, кста + } + + fseeko(jrnl->fd, jrnl->last_transaction_offset + YDB_jrnl_transaction_flags_offset, SEEK_SET); + if (feof(jrnl->fd)) { + __ydb_jrnl_truncate_incomplete(jrnl, last_complete_offset); + jrnl->is_consistent = -1; + return YDB_ERR_SUCCESS; + } else if (ferror(jrnl->fd)) { + return YDB_ERR_UNKNOWN; //FIXME + } + + YDB_Flags flags = 0; + fread(&flags, sizeof(YDB_Flags), 1, jrnl->fd); + + if (flags & 1) { //FIXME magic + // A transaction has been written to the storage. + return YDB_ERR_SUCCESS; + } else { + YDB_OpCode op = 0; + + while (op != YDB_jrnl_op_complete) { + YDB_OpDataSize size; + fread(&op, sizeof(YDB_TransactionOp), 1, jrnl->fd); + fread(&size, sizeof(YDB_OpDataSize), 1, jrnl->fd); + REASSIGN_FROM_LE(size); + + if (feof(jrnl->fd) || ferror(jrnl->fd)) break; + fseeko(jrnl->fd, size, SEEK_CUR); + } + + if (op == YDB_jrnl_op_complete) { + //fseeko(jrnl->fd, jrnl->last_transaction_offset + YDB_jrnl_transaction_flags_offset, SEEK_SET); + //fwrite(flags | 1, sizeof(YDB_Flags), 1, jrnl->fd); //FIXME magic // Also flag should be written when the data in storage is consistent + //fsync(fileno(jrnl->fd)); + jrnl->is_consistent = -1; + return YDB_ERR_SUCCESS; + } + __ydb_jrnl_truncate_incomplete(jrnl, last_complete_offset); + } + + jrnl->is_consistent = -1; + YDB_Error e = ydb_journal_seek_to_begin(jrnl); + if (e == YDB_ERR_SUCCESS || e == YDB_ERR_JOURNAL_EMPTY) { + return YDB_ERR_SUCCESS; + } +} + +YDB_Error ydb_journal_seek_to_begin(YDB_Journal *jrnl) { + THROW_IF_NULL(jrnl, YDB_ERR_JOURNAL_NOT_INITIALIZED); + THROW_IF_NULL(jrnl->in_use, YDB_ERR_JOURNAL_NOT_IN_USE); + THROW_IF_NULL(jrnl->is_consistent, YDB_ERR_JOURNAL_NOT_CONSISTENT); + THROW_IF_NULL(jrnl->first_transaction_offset, YDB_ERR_JOURNAL_EMPTY); + + return __ydb_jrnl_read(jrnl, jrnl->first_transaction_offset); +} + +YDB_Error ydb_journal_seek_to_end(YDB_Journal *jrnl) { + THROW_IF_NULL(jrnl, YDB_ERR_JOURNAL_NOT_INITIALIZED); + THROW_IF_NULL(jrnl->in_use, YDB_ERR_JOURNAL_NOT_IN_USE); + THROW_IF_NULL(jrnl->is_consistent, YDB_ERR_JOURNAL_NOT_CONSISTENT); + THROW_IF_NULL(jrnl->last_transaction_offset, YDB_ERR_JOURNAL_EMPTY); + + return __ydb_jrnl_read(jrnl, jrnl->last_transaction_offset); +} + +YDB_Error ydb_journal_prev_transaction(YDB_Journal *jrnl) { + THROW_IF_NULL(jrnl, YDB_ERR_JOURNAL_NOT_INITIALIZED); + THROW_IF_NULL(jrnl->in_use, YDB_ERR_JOURNAL_NOT_IN_USE); + THROW_IF_NULL(jrnl->is_consistent, YDB_ERR_JOURNAL_NOT_CONSISTENT); + + if (jrnl->prev_transaction_offset != 0) { + __ydb_jrnl_read(jrnl, jrnl->prev_transaction_offset); + return YDB_ERR_SUCCESS; + } + return YDB_ERR_NO_MORE_TRANSACTIONS; +} + +YDB_Error ydb_journal_next_transaction(YDB_Journal *jrnl) { + THROW_IF_NULL(jrnl, YDB_ERR_JOURNAL_NOT_INITIALIZED); + THROW_IF_NULL(jrnl->in_use, YDB_ERR_JOURNAL_NOT_IN_USE); + THROW_IF_NULL(jrnl->is_consistent, YDB_ERR_JOURNAL_NOT_CONSISTENT); + + if (jrnl->next_transaction_offset != 0) { + __ydb_jrnl_read(jrnl, jrnl->prev_transaction_offset); + return YDB_ERR_SUCCESS; + } + return YDB_ERR_NO_MORE_TRANSACTIONS; +} + +YDB_Error ydb_journal_append_transaction(YDB_Journal *jrnl, YDB_Transaction *transaction) { + THROW_IF_NULL(jrnl, YDB_ERR_JOURNAL_NOT_INITIALIZED); + THROW_IF_NULL(jrnl->in_use, YDB_ERR_JOURNAL_NOT_IN_USE); + THROW_IF_NULL(jrnl->is_consistent, YDB_ERR_JOURNAL_NOT_CONSISTENT); + + YDB_Transaction* t = ydb_transaction_clone(transaction); + + fseeko(jrnl->fd, 0, SEEK_END); + YDB_Offset file_end = ftello(jrnl->fd); + YDB_Offset file_end_le = TO_LE(file_end); + + YDB_Offset prev = jrnl->last_transaction_offset; + YDB_Offset prev_le = TO_LE(prev); + YDB_Offset next_le = 0; + YDB_Timestamp ts_le = TO_LE(ydb_transaction_timestamp_get(t)); + YDB_Flags flags = ydb_transaction_flags_get(t); + + fseeko(jrnl->fd, file_end, SEEK_SET); + fwrite(&prev_le, sizeof(YDB_Offset), 1, jrnl->fd); + fwrite(&next_le, sizeof(YDB_Offset), 1, jrnl->fd); + fwrite(&ts_le, sizeof(YDB_Timestamp), 1, jrnl->fd); + fwrite(&flags, sizeof(YDB_Flags), 1, jrnl->fd); + + jrnl->last_transaction_offset = file_end; + fseeko(jrnl->fd, YDB_jrnl_last_transaction_offset, SEEK_SET); + fwrite(&file_end_le, sizeof(YDB_Offset), 1, jrnl->fd); + + fsync(fileno(jrnl->fd)); // Checkpoint + + if (prev != 0) { + fseeko(jrnl->fd, prev + YDB_jrnl_transaction_next_offset, SEEK_SET); + } else { + fseeko(jrnl->fd, YDB_jrnl_first_transaction_offset, SEEK_SET); + } + fwrite(&file_end_le, sizeof(YDB_Offset), 1, jrnl->fd); + + fsync(fileno(jrnl->fd)); // Checkpoint + + fseeko(jrnl->fd, file_end, SEEK_SET); + uint32_t op_cnt = ydb_transaction_ops_size_get(t); + for (uint32_t i = 0; i < op_cnt; ++i) { + YDB_TransactionOp* op = ydb_transaction_op_at(t, i); + fwrite(&op->opcode, sizeof(YDB_OpCode), 1, jrnl->fd); + fwrite(&op->size, sizeof(YDB_OpDataSize), 1, jrnl->fd); + fwrite(&op->data, op->size, 1, jrnl->fd); + fsync(fileno(jrnl->fd)); + } + // Write completion op + fputc(YDB_jrnl_op_complete, jrnl->fd); + YDB_OpDataSize sz = 0; +#ifndef NO_EASTER_EGGS + char easter[] = "Andrew is a pi-door-ass"; + sz = sizeof(easter) - 1; + fwrite(&sz, sizeof(YDB_OpDataSize), 1, jrnl->fd); + fwrite(easter, sz, 1, jrnl->fd); +#else + fwrite(&sz, sizeof(YDB_OpDataSize), 1, jrnl->fd); +#endif + fsync(fileno(jrnl->fd)); + + return __ydb_jrnl_read(jrnl, file_end); +} + +YDB_Transaction *ydb_journal_get_current_transaction(YDB_Journal *jrnl) { + THROW_IF_NULL(jrnl, NULL); + THROW_IF_NULL(jrnl->in_use, NULL); + THROW_IF_NULL(jrnl->is_consistent, NULL); + THROW_IF_NULL(jrnl->first_transaction_offset, NULL); + + return jrnl->curr_transaction; +} + +#ifdef __cplusplus +} +#endif \ No newline at end of file diff --git a/src/journal/transaction.c b/src/journal/transaction.c new file mode 100644 index 0000000..dc6f02f --- /dev/null +++ b/src/journal/transaction.c @@ -0,0 +1,101 @@ +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include +#include +#include + +typedef vec_t(YDB_TransactionOp*) vec_op_t; + +struct __YDB_Transaction { + YDB_Timestamp timestamp; + YDB_Flags flags; + vec_op_t ops; +}; + +YDB_Transaction *ydb_transaction_create() { + YDB_Transaction* result = calloc(1, sizeof(YDB_Transaction)); + vec_init(&result->ops); + return result; +} + +void ydb_transaction_destroy(YDB_Transaction *t) { + THROW_IF_NULL(t, (void)0); + int i; YDB_TransactionOp* op; + vec_foreach(&t->ops, op, i) { + ydb_transaction_op_destroy(t->ops.data[i]); + } + vec_deinit(&t->ops); + free(t); +} +YDB_Timestamp ydb_transaction_timestamp_get(YDB_Transaction *t) { + THROW_IF_NULL(t, 1 << sizeof(YDB_Timestamp)); // NOLINT minimal signed int value + return t->timestamp; +} +YDB_Error ydb_transaction_timestamp_set(YDB_Transaction *t, YDB_Timestamp timestamp) { + THROW_IF_NULL(t, YDB_ERR_TRANSACTION_NOT_INITIALIZED); + t->timestamp = timestamp; + return YDB_ERR_SUCCESS; +} + +YDB_Flags ydb_transaction_flags_get(YDB_Transaction *t) { + THROW_IF_NULL(t, 0); // NOLINT minimal signed int value + return t->flags; +} + +YDB_Error ydb_transaction_flags_set(YDB_Transaction *t, YDB_Flags flags) { + THROW_IF_NULL(t, YDB_ERR_TRANSACTION_NOT_INITIALIZED); + t->flags = flags; + return YDB_ERR_SUCCESS; +} + +uint32_t ydb_transaction_ops_size_get(YDB_Transaction *t) { + THROW_IF_NULL(t, 0); + return t->ops.length; +} + +YDB_Error ydb_transaction_push_op(YDB_Transaction* t, YDB_TransactionOp *op) { + THROW_IF_NULL(t, YDB_ERR_TRANSACTION_NOT_INITIALIZED); + YDB_TransactionOp* op_copy = ydb_transaction_op_clone(op); + if(vec_push(&t->ops, op_copy)) { + return YDB_ERR_TRANSACTION_OP_PUSH_FAILED; + } + return YDB_ERR_SUCCESS; +} + +YDB_Error ydb_transaction_pop_op(YDB_Transaction *t) { + THROW_IF_NULL(t, YDB_ERR_TRANSACTION_NOT_INITIALIZED); + uint32_t vec_back = t->ops.length - 1; + ydb_transaction_op_destroy(t->ops.data[vec_back]); + vec_pop(&t->ops); + return YDB_ERR_SUCCESS; +} + +YDB_TransactionOp *ydb_transaction_op_at(YDB_Transaction *t, uint32_t index) { + THROW_IF_NULL(t, NULL); + if (index > ydb_transaction_ops_size_get(t)) { + return NULL; + } + return t->ops.data[index]; +} + +YDB_Transaction *ydb_transaction_clone(YDB_Transaction *t) { + THROW_IF_NULL(t, NULL); + YDB_Transaction* r = ydb_transaction_create(); + r->timestamp = t->timestamp; + r->flags = t->flags; + int i; YDB_TransactionOp* op; + vec_foreach(&t->ops, op, i) { + vec_push(&r->ops, ydb_transaction_op_clone(op)); + } + + return r; +} + +#ifdef __cplusplus +} +#endif \ No newline at end of file diff --git a/src/ydb.c b/src/ydb.c index e6043b9..e178b1f 100644 --- a/src/ydb.c +++ b/src/ydb.c @@ -11,6 +11,11 @@ extern "C" { #include #include #include +#ifndef NO_JOURNAL +#include +#include +#include +#endif #include struct __YDB_Engine { @@ -29,16 +34,27 @@ struct __YDB_Engine { uint8_t in_use; /**< "In use" flag. */ char *filename; /**< Current table data file name. */ FILE *fd; /**< Current table data file descriptor. */ + +#ifndef NO_JOURNAL + YDB_Journal* jrnl; +#endif }; YDB_Engine *ydb_init_instance() { YDB_Engine *new_instance = calloc(1, sizeof(YDB_Engine)); +#ifndef NO_JOURNAL + new_instance->jrnl = ydb_journal_init(); +#endif return new_instance; } void ydb_terminate_instance(YDB_Engine *instance) { ydb_unload_table(instance); +#ifndef NO_JOURNAL + ydb_journal_destroy(instance->jrnl); +#endif + // And after all that, the instance could be freed free(instance); } @@ -91,6 +107,14 @@ static YDB_Error __ydb_read_page(YDB_Engine *inst) { // Moves file position to allocated block. // Also changes last_free_page_offset. static YDB_Offset __ydb_allocate_page_and_seek(YDB_Engine *inst) { + // TODO journal +#ifndef NO_JOURNAL + YDB_Transaction* t = ydb_transaction_create(); + YDB_TransactionOp* op_alloc = malloc(sizeof(YDB_TransactionOp)); + op_alloc->opcode = YDB_jrnl_op_page_alloc; + op_alloc->size = 24; + op_alloc->data = malloc(op_alloc->size); +#endif // TODO change signature of the function. It should return an error code, I think. YDB_Offset result = 0; // If no free pages in the table, then... @@ -118,7 +142,7 @@ static YDB_Offset __ydb_allocate_page_and_seek(YDB_Engine *inst) { inst->last_page_offset = result; fseek(inst->fd, YDB_v1_last_page_offset, SEEK_SET); fwrite(&allocated_page_offset_le, sizeof(YDB_Offset), 1, inst->fd); - fflush(inst->fd); + fsync(fileno(inst->fd)); } else { // Return last free page offset result = inst->last_free_page_offset; @@ -143,7 +167,7 @@ static YDB_Offset __ydb_allocate_page_and_seek(YDB_Engine *inst) { YDB_Offset lfp_offset_le = TO_LE(inst->last_free_page_offset); fwrite(&lfp_offset_le, sizeof(YDB_Offset), 1, inst->fd); } - fflush(inst->fd); + fsync(fileno(inst->fd)); fseek(inst->fd, result, SEEK_SET); return result; } @@ -303,7 +327,7 @@ YDB_Error ydb_append_page(YDB_Engine* instance, YDB_TablePage* page) { fwrite(&rc_le, sizeof(rc), 1, instance->fd); fwrite(d, sizeof(d), 1, instance->fd); - fflush(instance->fd); + fsync(fileno(instance->fd)); __ydb_read_page(instance); @@ -343,7 +367,7 @@ YDB_Error ydb_replace_current_page(YDB_Engine *instance, YDB_TablePage *page) { fwrite(page_data, sizeof(page_data), 1, instance->fd); // Flush buffer - fflush(instance->fd); + fsync(fileno(instance->fd)); ydb_page_free(instance->curr_page); instance->curr_page = page; @@ -398,7 +422,7 @@ YDB_Error ydb_delete_current_page(YDB_Engine *instance) { fwrite(&cp_le, sizeof(YDB_Offset), 1, instance->fd); // Flush buffer - fflush(instance->fd); + fsync(fileno(instance->fd)); // Seek to the next page if it's not the last, else seek to the previous one if (instance->next_page_offset != 0) { @@ -431,6 +455,10 @@ YDB_Error ydb_seek_to_end(YDB_Engine *instance) { return __ydb_read_page(instance); } +YDB_Error ydb_commit_changes(YDB_Engine *instance) { + return 0; +} + #ifdef __cplusplus } #endif \ No newline at end of file