1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-08 14:22:09 +03:00

Merge branch 'develop' into MCOL-3536

This commit is contained in:
David Hall
2020-06-01 15:09:44 -05:00
38 changed files with 683 additions and 182 deletions

211
.drone.jsonnet Normal file
View File

@@ -0,0 +1,211 @@
local platforms = {
"develop": ["centos:7", "centos:8", "debian:9", "debian:10", "ubuntu:18.04", "ubuntu:20.04"],
'develop-1.4': ["centos:7", "centos:8", "debian:9", "debian:10", "ubuntu:16.04", "ubuntu:18.04", "ubuntu:20.04"]
};
local codebase_map = {
// "develop": "git clone --recurse-submodules --branch mariadb-10.5.3 --depth 1 https://github.com/MariaDB/server .",
develop: 'git clone --recurse-submodules --branch bb-10.5-cs --depth 1 https://github.com/MariaDB/server .',
"develop-1.4": "git clone --recurse-submodules --branch 10.4-enterprise --depth 1 https://github.com/mariadb-corporation/MariaDBEnterprise .",
};
local builddir = 'verylongdirnameforverystrangecpackbehavior';
local cmakeflags = '-DCMAKE_BUILD_TYPE=Release -DPLUGIN_COLUMNSTORE=YES -DPLUGIN_MROONGA=NO -DPLUGIN_ROCKSDB=NO -DPLUGIN_TOKUDB=NO -DPLUGIN_CONNECT=NO -DPLUGIN_SPIDER=NO -DPLUGIN_OQGRAPH=NO -DPLUGIN_PERFSCHEMA=NO -DPLUGIN_SPHINX=NO';
local rpm_build_deps = 'yum install -y git cmake make gcc gcc-c++ libaio-devel openssl-devel boost-devel bison snappy-devel flex libcurl-devel libxml2-devel ncurses-devel automake libtool policycoreutils-devel rpm-build lsof iproute pam-devel perl-DBI cracklib-devel expect readline-devel';
local deb_build_deps = 'apt update && apt install --yes --no-install-recommends git ca-certificates devscripts equivs build-essential libboost-all-dev libdistro-info-perl flex pkg-config automake libtool lsb-release bison chrpath cmake dh-apparmor dh-systemd gdb libaio-dev libcrack2-dev libjemalloc-dev libjudy-dev libkrb5-dev libncurses5-dev libpam0g-dev libpcre3-dev libreadline-gplv2-dev libsnappy-dev libssl-dev libsystemd-dev libxml2-dev unixodbc-dev uuid-dev zlib1g-dev libcurl4-openssl-dev dh-exec libpcre2-dev libzstd-dev psmisc socat expect net-tools rsync lsof libdbi-perl iproute2 gawk && mk-build-deps debian/control && dpkg -i mariadb-10*.deb || true && apt install -fy --no-install-recommends';
local platformMap(branch, platform) =
local branch_cmakeflags_map = {
develop: ' -DBUILD_CONFIG=mysql_release -DWITH_WSREP=OFF',
'develop-1.4': ' -DBUILD_CONFIG=enterprise',
};
local platform_map = {
'opensuse/leap:15': 'zypper install -y ' + rpm_build_deps + ' && cmake ' + cmakeflags + branch_cmakeflags_map[branch] + ' -DRPM=sles15 && make -j$(nproc) package',
'centos:7': rpm_build_deps + ' && cmake ' + cmakeflags + branch_cmakeflags_map[branch] + ' -DRPM=centos7 && make -j$(nproc) package',
'centos:8': "sed -i 's/enabled=0/enabled=1/' /etc/yum.repos.d/CentOS-PowerTools.repo && " + rpm_build_deps + ' && cmake ' + cmakeflags + branch_cmakeflags_map[branch] + ' -DRPM=centos8 && make -j$(nproc) package',
'debian:9': deb_build_deps + " && CMAKEFLAGS='" + cmakeflags + branch_cmakeflags_map[branch] + " -DDEB=stretch' debian/autobake-deb.sh",
'debian:10': deb_build_deps + " && CMAKEFLAGS='" + cmakeflags + branch_cmakeflags_map[branch] + " -DDEB=buster' debian/autobake-deb.sh",
'ubuntu:16.04': deb_build_deps + " && CMAKEFLAGS='" + cmakeflags + branch_cmakeflags_map[branch] + " -DDEB=xenial' debian/autobake-deb.sh",
'ubuntu:18.04': deb_build_deps + " && CMAKEFLAGS='" + cmakeflags + branch_cmakeflags_map[branch] + " -DDEB=bionic' debian/autobake-deb.sh",
'ubuntu:20.04': deb_build_deps + " && CMAKEFLAGS='" + cmakeflags + branch_cmakeflags_map[branch] + " -DDEB=focal' debian/autobake-deb.sh",
};
platform_map[platform];
local Pipeline(branch, platform, event) = {
local pipeline = self,
_volumes:: {
mdb: {
name: 'mdb',
path: '/mdb',
},
},
tests:: {
name: 'tests',
image: platform,
commands: [
(if platform == 'centos:7' then 'yum install -y sysvinit-tools' else '' ),
(if platform == 'centos:8' then 'yum install -y diffutils' else '' ),
'yum install -y lz4 wget git rsyslog',
"sed -i '/OmitLocalLogging/d' /etc/rsyslog.conf",
"sed -i 's/off/on/' /etc/rsyslog.conf",
"rm -f /etc/rsyslog.d/listen.conf",
'rsyslogd',
'yum install -y result/*.rpm',
'kill $(pidof rsyslogd) && while pidof rsyslogd; do sleep 2; done',
'rsyslogd',
'bash -o pipefail ./build/columnstore_startup.sh',
'git clone --recurse-submodules --branch ' + branch + ' --depth 1 https://github.com/mariadb-corporation/mariadb-columnstore-regression-test',
'wget -qO- https://cspkg.s3.amazonaws.com/testData.tar.lz4 | lz4 -dc - | tar xf - -C mariadb-columnstore-regression-test/',
'cd mariadb-columnstore-regression-test/mysql/queries/nightly/alltest',
"./go.sh --sm_unit_test_dir=/drone/src/storage-manager" + (if event == 'pull_request' then ' --tests=test000.sh' else '' ),
'cat go.log',
'test -f testErrorLogs.tgz && mv testErrorLogs.tgz /drone/src/result/ || echo no-errors-archive',
],
},
kind: 'pipeline',
type: 'docker',
name: std.join(" ", [branch, platform, event]),
clone: {
depth: 10,
},
steps: [
{
name: 'submodules',
image: 'alpine/git',
commands: [
'git submodule update --recursive --remote',
'git config cmake.update-submodules no',
'ls -la /drone/src/storage-manager',
],
},
{
name: 'clone-mdb',
image: 'alpine/git',
volumes: [pipeline._volumes.mdb],
commands: [
'mkdir -p /mdb/' + builddir + ' && cd /mdb/' + builddir,
codebase_map[branch],
'git config cmake.update-submodules no',
'rm -rf storage/columnstore',
'cp -r /drone/src /mdb/' + builddir + '/storage/columnstore',
],
},
{
name: 'build',
image: platform,
volumes: [pipeline._volumes.mdb],
environment: {
DEBIAN_FRONTEND: 'noninteractive',
TRAVIS: 'true',
},
commands: [
'cd /mdb/' + builddir,
"sed -i -e '/-DBUILD_CONFIG=mysql_release/d' debian/rules",
"sed -i -e '/Package: libmariadbd19/,/^$/d' debian/control",
"sed -i -e '/Package: libmariadbd-dev/,/^$/d' debian/control",
"sed -i -e '/Package: mariadb-backup/,/^$/d' debian/control",
"sed -i -e '/Package: mariadb-plugin-connect/,/^$/d' debian/control",
"sed -i -e '/Package: mariadb-plugin-cracklib-password-check/,/^$/d' debian/control",
"sed -i -e '/Package: mariadb-plugin-gssapi-*/,/^$/d' debian/control",
"sed -i -e '/wsrep/d' debian/mariadb-server-*.install",
"sed -i -e 's/Depends: galera.*/Depends:/' debian/control",
"sed -i -e 's/\"galera-enterprise-4\"//' cmake/cpack_rpm.cmake",
platformMap(branch, platform),
],
},
{
name: 'list pkgs',
image: 'centos:7',
volumes: [pipeline._volumes.mdb],
commands: [
'cd /mdb/' + builddir,
'mkdir /drone/src/result',
'cp *.rpm /drone/src/result 2>/dev/null || true',
'cp ../*.deb /drone/src/result 2>/dev/null || true',
'! test -n "$(find /drone/src/result -prune -empty)" && ls /drone/src/result',
],
},
] +
(if branch=='develop-1.4' && std.split(platform, ":")[0]=="centos" then [pipeline.tests] else []) +
[
{
name: 'publish',
image: 'plugins/s3',
when: {
status: ['success', 'failure'],
// event: ['cron'],
},
settings: {
bucket: 'cspkg',
access_key: {
from_secret: 'aws_access_key_id',
},
secret_key: {
from_secret: 'aws_secret_access_key',
},
source: 'result/*',
target: branch + '/${DRONE_BUILD_NUMBER}/' + std.strReplace(platform, ':', ''),
strip_prefix: 'result/',
},
},
],
volumes: [pipeline._volumes.mdb + {"temp": {}}],
trigger: {
event: [event],
branch: [branch],
} + (if event == 'cron' then {
cron: ['nightly-'+ std.strReplace(branch, '.', '-')]
} else {})
};
local FinalPipeline(branch, event) = {
kind: "pipeline",
name: std.join(" ", ["after", branch, event]),
steps: [
{
name: "notify",
image: "plugins/slack",
settings: {
room: "#drone_test",
webhook: {
from_secret: "slack_webhook"
},
template: (if event == 'cron' then "*Nightly" else "*Pull Request <https://github.com/{{repo.owner}}/{{repo.name}}/pull/{{build.pull}}|#{{build.pull}}>" ) +
" build <{{build.link}}|{{build.number}}> {{#success build.status}}succeeded{{else}}failed{{/success}}*.
*Branch*: <https://github.com/{{repo.owner}}/{{repo.name}}/tree/{{build.branch}}|{{build.branch}}>
*Commit*: <https://github.com/{{repo.owner}}/{{repo.name}}/commit/{{build.commit}}|{{truncate build.commit 8}}> {{truncate build.message.title 100 }}
*Author*: {{ build.author }}
*Duration*: {{since build.started}}
*Artifacts*: https://cspkg.s3.amazonaws.com/index.html?prefix={{build.branch}}/{{build.number}}"
},
},
],
trigger: {
event: [event],
branch: [branch],
status: [
"success",
"failure"
],
} + (if event == 'cron' then {
cron: ['nightly-'+ std.strReplace(branch, '.', '-')]
} else {}),
depends_on: std.map(function(p) std.join(" ", [branch, p, event]), platforms[branch])
};
[
Pipeline(b, p, e)
for b in ['develop', 'develop-1.4']
for p in platforms[b]
for e in ['pull_request', 'cron']
] + [
FinalPipeline(b, e)
for b in ['develop', 'develop-1.4']
for e in ['pull_request', 'cron']
]

12
.gitignore vendored
View File

@@ -21,6 +21,7 @@ CMakeCache.txt
CMakeFiles
CMakeScripts
Makefile
VERSION.dep
cmake_install.cmake
install_manifest.txt
CTestTestfile.cmake
@@ -133,8 +134,18 @@ oam/install_scripts/columnstore.service
oam/install_scripts/columnstoreSyslogSetup.sh
oam/install_scripts/columnstore_module_installer.sh
oam/install_scripts/disable-rep-columnstore.sh
oam/install_scripts/mariadb-columnstore.service
oam/install_scripts/mariadb-command-line.sh
oam/install_scripts/master-rep-columnstore.sh
oam/install_scripts/mcs-controllernode.service
oam/install_scripts/mcs-ddlproc.service
oam/install_scripts/mcs-dmlproc.service
oam/install_scripts/mcs-exemgr.service
oam/install_scripts/mcs-loadbrm.service
oam/install_scripts/mcs-primproc.service
oam/install_scripts/mcs-stop-controllernode.sh
oam/install_scripts/mcs-workernode.service
oam/install_scripts/mcs-writeengineserver.service
oam/install_scripts/mcs_module_installer.sh
oam/install_scripts/slave-rep-columnstore.sh
oam/install_scripts/startupTests.sh
@@ -147,4 +158,5 @@ bin
external
gitversionEngine
mcsconfig.h
storage-manager/testS3Connection
storage-manager/unit_tests

View File

@@ -1,41 +1,6 @@
CMAKE_MINIMUM_REQUIRED(VERSION 2.8.12)
# Avoid warnings in higher versions
if("${CMAKE_MAJOR_VERSION}.${CMAKE_MINOR_VERSION}" GREATER 2.6)
CMAKE_POLICY(VERSION 2.8)
endif()
# explicitly set the policy to OLD
# (cannot use NEW, not everyone is on cmake-2.8.12 yet)
IF(POLICY CMP0022)
CMAKE_POLICY(SET CMP0022 OLD)
ENDIF()
# We use the LOCATION target property (CMP0026)
# and get_target_property() for non-existent targets (CMP0045)
# and INSTALL_NAME_DIR (CMP0042)
IF(CMAKE_VERSION VERSION_EQUAL "3.0.0" OR
CMAKE_VERSION VERSION_GREATER "3.0.0")
CMAKE_POLICY(SET CMP0026 OLD)
CMAKE_POLICY(SET CMP0045 OLD)
CMAKE_POLICY(SET CMP0042 OLD)
ENDIF()
MESSAGE(STATUS "Running cmake version ${CMAKE_VERSION}")
OPTION(USE_CCACHE "reduce compile time with ccache." FALSE)
if(NOT USE_CCACHE)
set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "")
set_property(GLOBAL PROPERTY RULE_LAUNCH_LINK "")
else()
find_program(CCACHE_FOUND ccache)
if(CCACHE_FOUND)
set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE ccache)
set_property(GLOBAL PROPERTY RULE_LAUNCH_LINK ccache)
endif(CCACHE_FOUND)
endif()
IF(NOT INSTALL_LAYOUT)
IF(NOT CMAKE_BUILD_TYPE)
SET(CMAKE_BUILD_TYPE RELWITHDEBINFO CACHE STRING
@@ -123,16 +88,37 @@ ENDIF ()
SET_PROPERTY(DIRECTORY PROPERTY EP_BASE ${CMAKE_CURRENT_BINARY_DIR}/external)
LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_LIST_DIR}/cmake)
FIND_PACKAGE(Boost 1.53.0 REQUIRED COMPONENTS system filesystem thread regex date_time chrono atomic)
FIND_PACKAGE(BISON REQUIRED)
check_cxx_source_compiles("#include <filesystem>\n void main(){}" HAS_STD_FILESYSTEM)
check_cxx_source_compiles("#include <experimental/filesystem>\n void main(){}" HAS_STD_EXPERIMENTAL_FILESYSTEM)
SET (ENGINE_SRC_DIR ${CMAKE_CURRENT_SOURCE_DIR})
INCLUDE(columnstore_version)
OPTION(USE_CCACHE "reduce compile time with ccache." FALSE)
if(NOT USE_CCACHE)
set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "")
set_property(GLOBAL PROPERTY RULE_LAUNCH_LINK "")
else()
find_program(CCACHE_FOUND ccache)
if(CCACHE_FOUND)
set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE ccache)
set_property(GLOBAL PROPERTY RULE_LAUNCH_LINK ccache)
endif(CCACHE_FOUND)
endif()
FIND_PACKAGE(Boost 1.53.0 COMPONENTS system filesystem thread regex date_time chrono atomic)
IF (NOT Boost_FOUND)
MESSAGE_ONCE(CS_NO_BOOST "Required Boost libraries not found!")
return()
ENDIF()
FIND_PACKAGE(BISON)
IF (NOT BISON_FOUND)
MESSAGE_ONCE(CS_NO_BISON "bison not found!")
return()
ENDIF()
check_cxx_source_compiles("#include <filesystem>\n void main(){}" HAS_STD_FILESYSTEM)
check_cxx_source_compiles("#include <experimental/filesystem>\n void main(){}" HAS_STD_EXPERIMENTAL_FILESYSTEM)
SET (PACKAGE columnstore)
SET (PACKAGE_NAME columnstore)
SET (PACKAGE_TARNAME columnstore)
@@ -143,30 +129,32 @@ SET (PACKAGE_STRING columnstore-${PACKAGE_VERSION})
INCLUDE (configureEngine)
FIND_PROGRAM(LEX_EXECUTABLE flex DOC "path to the flex executable")
if(NOT LEX_EXECUTABLE)
FIND_PROGRAM(LEX_EXECUTABLE lex DOC "path to the lex executable")
if(NOT LEX_EXECUTABLE)
message(FATAL_ERROR "flex/lex not found!")
MESSAGE_ONCE(CS_NO_LEX "flex/lex not found!")
return()
endif()
endif()
FIND_PACKAGE(LibXml2)
if (NOT LIBXML2_FOUND)
MESSAGE(FATAL_ERROR "Could not find a usable libxml2 development environment!")
MESSAGE_ONCE(CS_NO_LIBXML "Could not find a usable libxml2 development environment!")
return()
endif()
INCLUDE (FindSnappy)
find_package(Snappy)
if (NOT SNAPPY_FOUND)
MESSAGE(FATAL_ERROR "Snappy not found please install snappy-devel for CentOS/RedHat or libsnappy-dev for Ubuntu/Debian")
MESSAGE_ONCE(CS_NO_SNAPPY "Snappy not found please install snappy-devel for CentOS/RedHat or libsnappy-dev for Ubuntu/Debian")
return()
endif()
FIND_PROGRAM(AWK_EXECUTABLE awk DOC "path to the awk executable")
if(NOT AWK_EXECUTABLE)
message(FATAL_ERROR "awk not found!")
MESSAGE_ONCE(CS_NO_AWK "awk not found!")
return()
endif()
IF (NOT INSTALL_LAYOUT)
@@ -240,8 +228,8 @@ IF (NOT SERVER_BUILD_DIR)
SET (SERVER_BUILD_DIR ${SERVER_SOURCE_ROOT_DIR})
ENDIF()
MESSAGE("SERVER_BUILD_INCLUDE_DIR = ${SERVER_BUILD_INCLUDE_DIR}")
MESSAGE("SERVER_SOURCE_ROOT_DIR = ${SERVER_SOURCE_ROOT_DIR}")
MESSAGE_ONCE(SERVER_BUILD_INCLUDE_DIR "SERVER_BUILD_INCLUDE_DIR = ${SERVER_BUILD_INCLUDE_DIR}")
MESSAGE_ONCE(SERVER_SOURCE_ROOT_DIR "SERVER_SOURCE_ROOT_DIR = ${SERVER_SOURCE_ROOT_DIR}")
IF (INSTALL_LAYOUT)
SET (MARIADB_CLIENT_LIBS libmariadb)
@@ -398,9 +386,9 @@ IF (INSTALL_LAYOUT)
set(SUSE_VERSION_NUMBER "${CMAKE_MATCH_1}")
ENDIF ()
if (${SUSE_VERSION_NUMBER} EQUAL 12)
SETA(CPACK_RPM_columnstore-engine_PACKAGE_REQUIRES "expect" "boost-devel >= 1.54.0" "snappy" "jemalloc" "net-tools" PARENT_SCOPE)
SETA(CPACK_RPM_columnstore-engine_PACKAGE_REQUIRES "expect" "boost-devel >= 1.54.0" "snappy" "jemalloc" "net-tools MariaDB-server" PARENT_SCOPE)
else ()
SETA(CPACK_RPM_columnstore-engine_PACKAGE_REQUIRES "expect" "boost >= 1.53.0" "snappy" "jemalloc" "net-tools" PARENT_SCOPE)
SETA(CPACK_RPM_columnstore-engine_PACKAGE_REQUIRES "expect" "boost >= 1.53.0" "snappy" "jemalloc" "net-tools MariaDB-server" PARENT_SCOPE)
endif()
SET(CPACK_RPM_columnstore-engine_PRE_INSTALL_SCRIPT_FILE ${CMAKE_CURRENT_SOURCE_DIR}/build/preInstall_storage_engine.sh PARENT_SCOPE)

View File

@@ -17,6 +17,10 @@
# SNAPPY_LIBRARIES The snappy library/libraries
# SNAPPY_INCLUDE_DIR The location of snappy headers
if(DEFINED SNAPPY_ROOT_DIR)
set(Snappy_FIND_QUIET)
endif()
find_path(SNAPPY_ROOT_DIR
NAMES include/snappy.h
)

View File

@@ -34,7 +34,7 @@ IF(NOT "${CS_MAJOR_VERSION}" MATCHES "[0-9]+" OR
ENDIF()
SET(VERSION "${CS_MAJOR_VERSION}.${CS_MINOR_VERSION}.${CS_PATCH_VERSION}${CS_EXTRA_VERSION}")
MESSAGE(STATUS "MariaDB-Columnstore ${VERSION}")
MESSAGE("== MariaDB-Columnstore ${VERSION}")
IF (NOT INSTALL_LAYOUT)
SET(CPACK_PACKAGE_VERSION_MAJOR ${CS_MAJOR_VERSION})
SET(CPACK_PACKAGE_VERSION_MINOR ${CS_MINOR_VERSION})

View File

@@ -77,13 +77,13 @@ IF (EXISTS "/etc/SuSE-release")
set(SUSE_VERSION_NUMBER "${CMAKE_MATCH_1}")
ENDIF ()
if (${REDHAT_VERSION_NUMBER} EQUAL 6)
SETA(CPACK_RPM_columnstore-engine_PACKAGE_REQUIRES "MariaDB-columnstore-shared" "snappy" "net-tools")
SETA(CPACK_RPM_columnstore-engine_PACKAGE_REQUIRES "MariaDB-columnstore-shared" "snappy" "net-tools" "MariaDB-server")
# Disable auto require as this will also try to pull Boost via RPM
SET(CPACK_RPM_PACKAGE_AUTOREQPROV " no")
elseif (${SUSE_VERSION_NUMBER} EQUAL 12)
SETA(CPACK_RPM_columnstore-engine_PACKAGE_REQUIRES "boost-devel >= 1.54.0" "libsnappy1" "jemalloc" "net-tools")
SETA(CPACK_RPM_columnstore-engine_PACKAGE_REQUIRES "boost-devel >= 1.54.0" "libsnappy1" "jemalloc" "net-tools" "MariaDB-server")
else ()
SETA(CPACK_RPM_columnstore-engine_PACKAGE_REQUIRES "boost >= 1.53.0" "snappy" "jemalloc" "net-tools")
SETA(CPACK_RPM_columnstore-engine_PACKAGE_REQUIRES "boost >= 1.53.0" "snappy" "jemalloc" "net-tools" "MariaDB-server")
endif()
SET(CPACK_RPM_columnstore-engine_PRE_INSTALL_SCRIPT_FILE ${CMAKE_SOURCE_DIR}/build/preInstall_storage_engine.sh)

View File

@@ -337,7 +337,7 @@ void storeNumericField(Field** f, int64_t value, CalpontSystemCatalog::ColType&
if (ct.colDataType == CalpontSystemCatalog::DECIMAL)
dataconvert::DataConvert::decimalToString(value, (unsigned)ct.scale, tmp, 25, ct.colDataType);
else
snprintf(tmp, 25, "%ld", value);
snprintf(tmp, 25, "%lld", (long long)value);
f2->store(tmp, strlen(tmp), f2->charset());
break;

View File

@@ -443,7 +443,7 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n
{
case Item_sum::UDF_SUM_FUNC:
{
uint64_t bRespectNulls = (ac->getUDAFContext().getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS)) ? 0 : 1;
unsigned long bRespectNulls = (ac->getUDAFContext().getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS)) ? 0 : 1;
char sRespectNulls[18];
sprintf(sRespectNulls, "%lu", bRespectNulls);
srcp.reset(new ConstantColumn(sRespectNulls, (uint64_t)bRespectNulls, ConstantColumn::NUM)); // IGNORE/RESPECT NULLS. 1 => RESPECT

View File

@@ -20,6 +20,7 @@ configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-writeengineserver.service.in" "$
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-dmlproc.service.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-dmlproc.service" @ONLY)
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-ddlproc.service.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-ddlproc.service" @ONLY)
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-loadbrm.service.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-loadbrm.service" @ONLY)
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-storagemanager.service.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-storagemanager.service" @ONLY)
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-stop-controllernode.sh.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-stop-controllernode.sh" @ONLY)
install(PROGRAMS columnstore-post-install
@@ -44,6 +45,8 @@ install(PROGRAMS columnstore-post-install
mariadb-command-line.sh
mcs_module_installer.sh
mcs-stop-controllernode.sh
mcs-loadbrm.py
mcs-start-storagemanager.py
DESTINATION ${ENGINE_BINDIR} COMPONENT columnstore-engine)
install(FILES mariadb-columnstore.service
@@ -62,6 +65,7 @@ install(FILES mariadb-columnstore.service
mcs-dmlproc.service
mcs-ddlproc.service
mcs-loadbrm.service
mcs-storagemanager.service
DESTINATION ${ENGINE_SUPPORTDIR} COMPONENT columnstore-engine)
install(FILES module DESTINATION ${ENGINE_DATADIR}/local COMPONENT columnstore-engine)

View File

@@ -105,6 +105,9 @@ if [ $user = "root" ]; then
cp @ENGINE_SUPPORTDIR@/mcs-writeengineserver.service /lib/systemd/system/. >/dev/null 2>&1
cp @ENGINE_SUPPORTDIR@/mcs-loadbrm.service /usr/lib/systemd/system/. >/dev/null 2>&1
cp @ENGINE_SUPPORTDIR@/mcs-loadbrm.service /lib/systemd/system/. >/dev/null 2>&1
cp @ENGINE_SUPPORTDIR@/mcs-storagemanager.service /usr/lib/systemd/system/. >/dev/null 2>&1
cp @ENGINE_SUPPORTDIR@/mcs-storagemanager.service /lib/systemd/system/. >/dev/null 2>&1
systemctl enable mariadb-columnstore >/dev/null 2>&1
systemctl enable mcs-controllernode > /dev/null 2>&1
@@ -115,6 +118,7 @@ if [ $user = "root" ]; then
systemctl enable mcs-workernode > /dev/null 2>&1
systemctl enable mcs-writeengineserver > /dev/null 2>&1
systemctl enable mcs-loadbrm > /dev/null 2>&1
systemctl enable mcs-storagemanager > /dev/null 2>&1
else
chkconfig=`which chkconfig 2>/dev/null`
if [ -n "$chkconfig" ]; then
@@ -177,6 +181,9 @@ if [ -z "aws" ]; then
fi
postConfigure
systemctl cat mariadb-columnstore.service > /dev/null 2>&1
if [ $? -eq 0 ] && [ $(running_systemd) -eq 0 ]; then
systemctl start mariadb-columnstore
# Wait for all columnstore to be ready, DDLProc is final process in startup order
@@ -184,6 +191,7 @@ while [ -z "$(pgrep -x DDLProc)" ];
do
sleep 1
done
fi
dbbuilder 7 > $tmpDir/dbbuilder.log

View File

@@ -17,7 +17,7 @@ systemctl cat mariadb-columnstore.service > /dev/null 2>&1
if [ $? -eq 0 ] && [ $(running_systemd) -eq 0 ]; then
systemctl stop mariadb-columnstore >/dev/null 2>&1
else
PROGS='load_brm workernode controllernode PrimProc ExeMgr DMLProc DDLProc WriteEngineServer'
PROGS='StorageManager workernode controllernode PrimProc ExeMgr DMLProc DDLProc WriteEngineServer'
kill $(pidof $PROGS) > /dev/null
sleep 3
kill -9 $(pidof $PROGS) > /dev/null
@@ -70,6 +70,7 @@ if [ -n "$systemctl" ] && [ $(running_systemd) -eq 0 ]; then
systemctl disable mcs-workernode > /dev/null 2>&1
systemctl disable mcs-writeengineserver > /dev/null 2>&1
systemctl disable mcs-loadbrm > /dev/null 2>&1
systemctl disable mcs-storagemanager > /dev/null 2>&1
rm -f /usr/lib/systemd/system/mariadb-columnstore.service
rm -f /lib/systemd/system/mariadb-columnstore.service
@@ -89,6 +90,9 @@ if [ -n "$systemctl" ] && [ $(running_systemd) -eq 0 ]; then
rm -f /lib/systemd/system/mcs-writeengineserver.service
rm -f /usr/lib/systemd/system/mcs-loadbrm.service
rm -f /lib/systemd/system/mcs-loadbrm.service
rm -f /usr/lib/systemd/system/mcs-storagemanager.service
rm -f /lib/systemd/system/mcs-storagemanager.service
systemctl daemon-reload
else
chkconfig=`which chkconfig 2>/dev/null`

View File

@@ -5,7 +5,7 @@ After=mcs-workernode.service
[Service]
Type=forking
Environment="SKIP_OAM_INIT=1"
ExecStartPre=/usr/bin/env bash -c "systemctl start mcs-workernode"
ExecStart=@ENGINE_BINDIR@/controllernode
Restart=on-failure
ExecStop=@ENGINE_BINDIR@/mcs-stop-controllernode.sh $MAINPID

View File

@@ -1,11 +1,10 @@
[Unit]
Description=mcs-ddlproc
PartOf=mcs-exemgr.service
PartOf=mcs-writeengineserver.service
After=mcs-dmlproc.service
[Service]
Type=simple
Environment="SKIP_OAM_INIT=1"
ExecStart=@ENGINE_BINDIR@/DDLProc
Restart=on-failure
TimeoutStopSec=2

View File

@@ -1,11 +1,10 @@
[Unit]
Description=mcs-dmlproc
PartOf=mcs-exemgr.service
After=mcs-exemgr.service
PartOf=mcs-writeengineserver.service
After=mcs-writeengineserver.service
[Service]
Type=simple
Environment="SKIP_OAM_INIT=1"
ExecStart=@ENGINE_BINDIR@/DMLProc
Restart=on-failure
TimeoutStopSec=2

View File

@@ -1,12 +1,11 @@
[Unit]
Description=mcs-exemgr
PartOf=mcs-writeengineserver.service
After=mcs-writeengineserver.service
PartOf=mcs-primproc.service
After=mcs-primproc.service
[Service]
Type=simple
Environment="SKIP_OAM_INIT=1"
ExecStart=@ENGINE_BINDIR@/ExeMgr
ExecStart=/usr/bin/env bash -c "LD_PRELOAD=$(ldconfig -p | grep -m1 libjemalloc | awk '{print $1}') exec @ENGINE_BINDIR@/ExeMgr"
Restart=on-failure
TimeoutStopSec=2

View File

@@ -0,0 +1,68 @@
#!/usr/bin/env python3
import configparser
import subprocess
import xml.etree.ElementTree as ET
sm_config = configparser.ConfigParser()
sm_config.read('/etc/columnstore/storagemanager.cnf')
cs_config = ET.parse('/etc/columnstore/Columnstore.xml')
config_root = cs_config.getroot()
storage = sm_config['ObjectStorage']['service']
region = sm_config['S3']['region']
bucket = sm_config['S3']['bucket']
loadbrm = '/usr/bin/load_brm /var/lib/columnstore/data1/systemFiles/dbrm/{0}'
brm_saves_current = ''
if storage.lower() == 's3' and not region.lower() == 'some_region' and not bucket.lower() == 'some_bucket':
# load s3
brm = 'data1/systemFiles/dbrm/BRM_saves_current'
config_root.find('./Installation/DBRootStorageType').text = "StorageManager"
config_root.find('./StorageManager/Enabled').text = "Y"
if config_root.find('./SystemConfig/DataFilePlugin') is None:
config_root.find('./SystemConfig').append(ET.Element("DataFilePlugin"))
config_root.find('./SystemConfig/DataFilePlugin').text = "libcloudio.so"
cs_config.write('/etc/columnstore/Columnstore.xml')
try:
brm_saves_current = subprocess.check_output(['smcat', brm])
except subprocess.CalledProcessError as e:
# will happen when brm file does not exist
pass
else:
pmCount = int(config_root.find('./SystemModuleConfig/ModuleCount3').text)
brm = '/var/lib/columnstore/data1/systemFiles/dbrm/BRM_saves_current'
if pmCount > 1:
# load multinode dbrm
try:
brm_saves_current = subprocess.check_output(['cat', brm])
if not brm_saves_current:
# local dbrm empty, need to pull from main node
pass
except subprocess.CalledProcessError as e:
# will happen when brm file does not exist
pass
else:
# load local dbrm
try:
brm_saves_current = subprocess.check_output(['cat', brm])
except subprocess.CalledProcessError as e:
# will happen when brm file does not exist
pass
if brm_saves_current:
cmd = loadbrm.format(brm_saves_current.decode('utf-8'))
try:
retcode = subprocess.call(cmd, shell=True)
if retcode < 0:
#print("Child was terminated by signal", -retcode, file=sys.stderr)
pass
except OSError as e:
#print("Execution failed:", e, file=sys.stderr)
pass

View File

@@ -1,13 +1,11 @@
[Unit]
Description=loadbrm
PartOf=mcs-workernode.service
Before=mcs-workernode.service
ConditionPathExists=/var/lib/columnstore/data1/systemFiles/dbrm/BRM_saves_current
After=mcs-storagemanager.service
[Service]
Type=simple
Environment="SKIP_OAM_INIT=1"
ExecStart=/usr/bin/env bash -c "/usr/bin/load_brm /var/lib/columnstore/data1/systemFiles/dbrm/$(cat /var/lib/columnstore/data1/systemFiles/dbrm/BRM_saves_current)"
ExecStart=@ENGINE_BINDIR@/mcs-loadbrm.py
[Install]
WantedBy=mariadb-columnstore.service

View File

@@ -6,8 +6,8 @@ After=mcs-controllernode.service
[Service]
Type=simple
Environment="SKIP_OAM_INIT=1"
ExecStart=@ENGINE_BINDIR@/PrimProc
ExecStart=/usr/bin/env bash -c "LD_PRELOAD=$(ldconfig -p | grep -m1 libjemalloc | awk '{print $1}') exec @ENGINE_BINDIR@/PrimProc"
ExecStartPost=/bin/sleep 2
Restart=on-failure
TimeoutStopSec=2

View File

@@ -0,0 +1,15 @@
#!/usr/bin/env python3
import configparser
import sys
config = configparser.ConfigParser()
config.read('/etc/columnstore/storagemanager.cnf')
storage = config['ObjectStorage']['service']
region = config['S3']['region']
bucket = config['S3']['bucket']
if storage.lower() == 's3' and not region.lower() == 'some_region' and not bucket.lower() == 'some_bucket':
sys.exit(0)
sys.exit(1)

View File

@@ -0,0 +1,16 @@
[Unit]
Description=storagemanager
PartOf=mcs-workernode.service
Before=mcs-workernode.service
ConditionPathExists=/etc/columnstore/storagemanager.cnf
# FailureAction="exit"
# FailureActionExitStatus=0
[Service]
Type=simple
ExecStartPre=@ENGINE_BINDIR@/mcs-start-storagemanager.py
ExecStart=/usr/bin/env bash -c "LD_PRELOAD=$(ldconfig -p | grep -m1 libjemalloc | awk '{print $1}') exec @ENGINE_BINDIR@/StorageManager"
[Install]
WantedBy=mariadb-columnstore.service
WantedBy=mcs-workernode.service

View File

@@ -5,7 +5,6 @@ After=mcs-loadbrm.service
[Service]
Type=forking
Environment="SKIP_OAM_INIT=1"
ExecStart=@ENGINE_BINDIR@/workernode DBRM_Worker1
Restart=on-failure
ExecStop=-@ENGINE_BINDIR@/save_brm

View File

@@ -1,12 +1,11 @@
[Unit]
Description=WriteEngineServer
PartOf=mcs-primproc.service
After=mcs-primproc.service
PartOf=mcs-exemgr.service
After=mcs-exemgr.service
[Service]
Type=simple
Environment="SKIP_OAM_INIT=1"
ExecStart=@ENGINE_BINDIR@/WriteEngineServer
ExecStart=/usr/bin/env bash -c "LD_PRELOAD=$(ldconfig -p | grep -m1 libjemalloc | awk '{print $1}') exec @ENGINE_BINDIR@/WriteEngineServer"
Restart=on-failure
TimeoutStopSec=2

View File

@@ -22,10 +22,7 @@ checkForError() {
# See if engine columnstore exist
#---------------------------------------------------------------------------
echo "checking for engine columnstore..."
mysql \
--user=root \
--execute='show engines;' \
| grep -i columnstore
su -s /bin/sh -c 'mysql --execute="show engines"' mysql 2> ${tmpdir}/post-mysql-install.log | grep -i columnstore
#
# Add compressiontype column to SYSCOLUMN if applicable

View File

@@ -74,7 +74,7 @@ bool AppendTask::run()
while (readCount < cmd->count)
{
uint toRead = min(cmd->count - readCount, bufsize);
uint toRead = min(static_cast<uint>(cmd->count - readCount), bufsize);
success = read(&databuf[0], toRead);
check_error("AppendTask read data", false);
if (success==0)

View File

@@ -174,7 +174,7 @@ string use_envvar(const boost::smatch &envvar)
string expand_numbers(const boost::smatch &match)
{
long num = stol(match[1].str());
long long num = stol(match[1].str());
char suffix = (char) ::tolower(match[2].str()[0]);
if (suffix == 't')

View File

@@ -488,6 +488,8 @@ ssize_t IOCoordinator::_write(const boost::filesystem::path &filename, const uin
//log error and abort
l_errno = errno;
logger->log(LOG_ERR,"IOCoordinator::write(): Failed newObject.");
metadata.removeEntry(newObject.offset);
replicator->remove(cachePath/firstDir/newObject.key);
errno = l_errno;
if (count == 0) // if no data has been written yet, it's safe to return -1 here.
return -1;
@@ -497,7 +499,7 @@ ssize_t IOCoordinator::_write(const boost::filesystem::path &filename, const uin
{
// remove the object created above; can't have 0-length objects
metadata.removeEntry(newObject.offset);
replicator->remove(firstDir/newObject.key);
replicator->remove(cachePath/firstDir/newObject.key);
goto out;
}
else if ((uint)err < writeLength)
@@ -505,7 +507,27 @@ ssize_t IOCoordinator::_write(const boost::filesystem::path &filename, const uin
dataRemaining -= err;
count += err;
iocBytesWritten += err;
metadata.updateEntryLength(newObject.offset, (err + objectOffset));
// get a new name for the object
string oldKey = newObject.key;
newObject.key = metadata.getNewKeyFromOldKey(newObject.key, err + objectOffset);
ostringstream os;
os << "IOCoordinator::write(): renaming " << oldKey << " to " << newObject.key;
logger->log(LOG_DEBUG, os.str().c_str());
int renameErr = ::rename((cachePath/firstDir/oldKey).string().c_str(), (cachePath/firstDir/newObject.key).string().c_str());
int renameErrno = errno;
if (renameErr < 0)
{
ostringstream oss;
char buf[80];
oss << "IOCoordinator::write(): Failed to rename " << (cachePath/firstDir/oldKey).string() << " to " <<
(cachePath/firstDir/newObject.key).string() << "! Got " << strerror_r(renameErrno, buf, 80);
logger->log(LOG_ERR, oss.str().c_str());
newObject.key = oldKey;
}
// rename and resize the object in metadata
metadata.updateEntry(newObject.offset, newObject.key, (err + objectOffset));
cache->newObject(firstDir, newObject.key,err + objectOffset);
newObjectKeys.push_back(newObject.key);
goto out;
@@ -618,6 +640,8 @@ ssize_t IOCoordinator::append(const char *_filename, const uint8_t *data, size_t
l_errno = errno;
//log error and abort
logger->log(LOG_ERR,"IOCoordinator::append(): Failed newObject.");
metadata.removeEntry(newObject.offset);
replicator->remove(cachePath/firstDir/newObject.key);
errno = l_errno;
// if no data was written successfully yet, it's safe to return -1 here.
if (count == 0)
@@ -627,21 +651,41 @@ ssize_t IOCoordinator::append(const char *_filename, const uint8_t *data, size_t
else if (err == 0)
{
metadata.removeEntry(newObject.offset);
replicator->remove(firstDir/newObject.key);
replicator->remove(cachePath/firstDir/newObject.key);
goto out;
}
count += err;
dataRemaining -= err;
iocBytesWritten += err;
if (err < (int64_t) writeLength)
{
string oldKey = newObject.key;
newObject.key = metadata.getNewKeyFromOldKey(newObject.key, err + newObject.offset);
ostringstream os;
os << "IOCoordinator::append(): renaming " << oldKey << " to " << newObject.key;
logger->log(LOG_DEBUG, os.str().c_str());
int renameErr = ::rename((cachePath/firstDir/oldKey).string().c_str(), (cachePath/firstDir/newObject.key).string().c_str());
int renameErrno = errno;
if (renameErr < 0)
{
ostringstream oss;
char buf[80];
oss << "IOCoordinator::append(): Failed to rename " << (cachePath/firstDir/oldKey).string() << " to " <<
(cachePath/firstDir/newObject.key).string() << "! Got " << strerror_r(renameErrno, buf, 80);
logger->log(LOG_ERR, oss.str().c_str());
newObject.key = oldKey;
}
metadata.updateEntry(newObject.offset, newObject.key, err);
}
cache->newObject(firstDir, newObject.key,err);
newObjectKeys.push_back(newObject.key);
if (err < (int64_t) writeLength)
{
//logger->log(LOG_ERR,"IOCoordinator::append(): newObject failed to complete write, %u of %u bytes written.",count,length);
// make the object reflect length actually written
metadata.updateEntryLength(newObject.offset, err);
goto out;
}
}
@@ -964,9 +1008,12 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2)
errno = ENOENT;
return -1;
}
if (bf::exists(metaFile2))
{
deleteMetaFile(metaFile2);
++filesDeleted;
}
// since we don't implement mkdir(), assume the caller did that and
// create any necessary parent dirs for filename2
try
@@ -1001,8 +1048,13 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2)
for (const auto &object : objects)
{
bf::path journalFile = journalPath/firstDir1/(object.key + ".journal");
metadataObject newObj = meta2.addMetadataObject(filename2, object.length);
assert(newObj.offset == object.offset);
// originalLength = the length of the object before journal entries.
// the length in the metadata is the length after journal entries
size_t originalLength = MetadataFile::getLengthFromKey(object.key);
metadataObject newObj = meta2.addMetadataObject(filename2, originalLength);
if (originalLength != object.length)
meta2.updateEntryLength(newObj.offset, object.length);
err = cs->copyObject(object.key, newObj.key);
if (err)
{
@@ -1135,7 +1187,10 @@ boost::shared_array<uint8_t> IOCoordinator::mergeJournal(const char *object, con
objFD = ::open(object, O_RDONLY);
if (objFD < 0)
{
*_bytesReadOut = 0;
return ret;
}
ScopedCloser s1(objFD);
ret.reset(new uint8_t[len]);
@@ -1148,11 +1203,12 @@ boost::shared_array<uint8_t> IOCoordinator::mergeJournal(const char *object, con
int err = ::read(objFD, &ret[count], len - count);
if (err < 0)
{
char buf[80];
logger->log(LOG_CRIT, "IOC::mergeJournal(): failed to read %s, got '%s'", object, strerror_r(errno, buf, 80));
int l_errno = errno;
char buf[80];
logger->log(LOG_CRIT, "IOC::mergeJournal(): failed to read %s, got '%s'", object, strerror_r(l_errno, buf, 80));
ret.reset();
errno = l_errno;
*_bytesReadOut = count;
return ret;
}
else if (err == 0)
@@ -1171,17 +1227,18 @@ boost::shared_array<uint8_t> IOCoordinator::mergeJournal(const char *object, con
size_t mjimBytesRead = 0;
int mjimerr = mergeJournalInMem(ret, len, journal, &mjimBytesRead);
if (mjimerr)
{
ret.reset();
return ret;
}
l_bytesRead += mjimBytesRead;
*_bytesReadOut = l_bytesRead;
return ret;
}
journalFD = ::open(journal, O_RDONLY);
if (journalFD < 0)
{
*_bytesReadOut = l_bytesRead;
return ret;
}
ScopedCloser s2(journalFD);
boost::shared_array<char> headertxt = seekToEndOfHeader1(journalFD, &l_bytesRead);
@@ -1219,17 +1276,21 @@ boost::shared_array<uint8_t> IOCoordinator::mergeJournal(const char *object, con
err = ::read(journalFD, &ret[startReadingAt - offset + count], lengthOfRead - count);
if (err < 0)
{
int l_errno = errno;
char buf[80];
logger->log(LOG_ERR, "mergeJournal: got %s", strerror_r(errno, buf, 80));
logger->log(LOG_ERR, "mergeJournal: got %s", strerror_r(l_errno, buf, 80));
ret.reset();
return ret;
errno = l_errno;
l_bytesRead += count;
goto out;
}
else if (err == 0)
{
logger->log(LOG_ERR, "mergeJournal: got early EOF. offset=%ld, len=%ld, jOffset=%ld, jLen=%ld,"
" startReadingAt=%ld, lengthOfRead=%ld", offset, len, offlen[0], offlen[1], startReadingAt, lengthOfRead);
ret.reset();
return ret;
l_bytesRead += count;
goto out;
}
count += err;
}
@@ -1243,6 +1304,7 @@ boost::shared_array<uint8_t> IOCoordinator::mergeJournal(const char *object, con
// skip over this journal entry
::lseek(journalFD, offlen[1], SEEK_CUR);
}
out:
*_bytesReadOut = l_bytesRead;
return ret;
}

View File

@@ -101,6 +101,7 @@ int LocalStorage::copy(const bf::path &source, const bf::path &dest)
if (err)
{
errno = err.value();
::unlink(dest.string().c_str());
return -1;
}
return 0;
@@ -216,6 +217,7 @@ int LocalStorage::putObject(boost::shared_array<uint8_t> data, size_t len, const
l_errno = errno;
//logger->log(LOG_CRIT, "LocalStorage::putObject(): Failed to write to %s, got '%s'", c_dest, strerror_r(errno, buf, 80));
close(fd);
::unlink(c_dest);
errno = l_errno;
bytesWritten += count;
return err;

View File

@@ -461,8 +461,8 @@ void MetadataFile::printObjects() const
{
BOOST_FOREACH(const boost::property_tree::ptree::value_type &v, jsontree->get_child("objects"))
{
printf("Name: %s Length: %lu Offset: %lu\n", v.second.get<string>("key").c_str(),
v.second.get<size_t>("length"), v.second.get<off_t>("offset"));
printf("Name: %s Length: %zu Offset: %lld\n", v.second.get<string>("key").c_str(),
v.second.get<size_t>("length"), (long long)v.second.get<off_t>("offset"));
}
}

View File

@@ -77,7 +77,6 @@ class MetadataFile
// removes p from the json cache. p should be a fully qualified metadata file
static void deletedMeta(const boost::filesystem::path &p);
// TBD: this may have to go; there may be no use case where only the uuid needs to change.
static std::string getNewKeyFromOldKey(const std::string &oldKey, size_t length=0);
static std::string getNewKey(std::string sourceName, size_t offset, size_t length);
static off_t getOffsetFromKey(const std::string &key);

View File

@@ -380,14 +380,22 @@ void PrefixCache::newJournalEntry(size_t size)
void PrefixCache::deletedJournal(size_t size)
{
boost::unique_lock<boost::mutex> s(lru_mutex);
assert(currentCacheSize >= size);
if (currentCacheSize >= size)
currentCacheSize -= size;
else
{
ostringstream oss;
oss << "PrefixCache::deletedJournal(): Detected an accounting error.";
logger->log(LOG_WARNING, oss.str().c_str());
currentCacheSize = 0;
}
}
void PrefixCache::deletedObject(const string &key, size_t size)
{
boost::unique_lock<boost::mutex> s(lru_mutex);
assert(currentCacheSize >= size);
M_LRU_t::iterator mit = m_lru.find(key);
assert(mit != m_lru.end());
@@ -397,7 +405,15 @@ void PrefixCache::deletedObject(const string &key, size_t size)
doNotEvict.erase(mit->lit);
lru.erase(mit->lit);
m_lru.erase(mit);
if (currentCacheSize >= size)
currentCacheSize -= size;
else
{
ostringstream oss;
oss << "PrefixCache::deletedObject(): Detected an accounting error.";
logger->log(LOG_WARNING, oss.str().c_str());
currentCacheSize = 0;
}
}
}

View File

@@ -128,7 +128,7 @@ int Replicator::newObject(const boost::filesystem::path &filename, const uint8_t
OPEN(objectFilename.c_str(), O_WRONLY | O_CREAT);
size_t count = 0;
while (count < length) {
err = ::pwrite(fd, &data[count], length - count, offset);
err = ::pwrite(fd, &data[count], length - count, offset + count);
if (err <= 0)
{
if (count > 0) // return what was successfully written
@@ -154,6 +154,67 @@ int Replicator::newNullObject(const boost::filesystem::path &filename,size_t len
return err;
}
ssize_t Replicator::_pwrite(int fd, const void *data, size_t length, off_t offset)
{
ssize_t err;
size_t count = 0;
uint8_t *bData = (uint8_t *) data;
do
{
err = ::pwrite(fd, &bData[count], length - count, offset + count);
if (err < 0 || (err == 0 && errno != EINTR))
{
if (count > 0)
return count;
else
return err;
}
count += err;
} while (count < length);
return count;
}
ssize_t Replicator::_write(int fd, const void *data, size_t length)
{
ssize_t err;
size_t count = 0;
uint8_t *bData = (uint8_t *) data;
do
{
err = ::write(fd, &bData[count], length - count);
if (err < 0 || (err == 0 && errno != EINTR))
{
if (count > 0)
return count;
else
return err;
}
count += err;
} while (count < length);
return count;
}
/* XXXPAT: I think we'll have to rewrite this function some; we'll have to at least clearly define
what happens in the various error scenarios.
To be more resilent in the face of hard errors, we may also want to redefine what a journal file is.
If/when we cannot fix the journal file in the face of an error, there are scenarios that the read code
will not be able to cope with. Ex, a journal entry that says it's 200 bytes long, but there are only
really 100 bytes. The read code has no way to tell the difference if there is an entry that follows
the bad entry, and that will cause an unrecoverable error.
Initial thought on a sol'n. Make each journal entry its own file in a tmp dir, ordered by a sequence
number in the filename. Then, one entry cannot affect the others, and the end of the file is unambiguously
the end of the data. On successful write, move the file to where it should be. This would also prevent
the readers from ever seeing bad data, and possibly reduce the size of some critical sections.
Benefits would be data integrity, and possibly add'l parallelism. The downside is of course, a higher
number of IO ops for the same operation.
*/
int Replicator::addJournalEntry(const boost::filesystem::path &filename, const uint8_t *data, off_t offset, size_t length)
{
int fd, err;
@@ -177,7 +238,7 @@ int Replicator::addJournalEntry(const boost::filesystem::path &filename, const u
bHeaderChanged = true;
// create new journal file with header
string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset).str();
err = ::write(fd, header.c_str(), header.length() + 1);
err = _write(fd, header.c_str(), header.length() + 1);
l_errno = errno;
repHeaderDataWritten += (header.length() + 1);
if ((uint)err != (header.length() + 1))
@@ -238,26 +299,32 @@ int Replicator::addJournalEntry(const boost::filesystem::path &filename, const u
{
bHeaderChanged = true;
string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset).str();
err = ::pwrite(fd, header.c_str(), header.length() + 1,0);
err = _pwrite(fd, header.c_str(), header.length() + 1,0);
l_errno = errno;
repHeaderDataWritten += (header.length() + 1);
if ((uint)err != (header.length() + 1))
{
// only the header was possibly changed rollback attempt
mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Updating journal header failed. "
"Attempting to rollback and continue.");
int rollbackErr = ::pwrite(fd, headerRollback.c_str(), headerRollback.length() + 1,0);
int rollbackErr = _pwrite(fd, headerRollback.c_str(), headerRollback.length() + 1,0);
if ((uint)rollbackErr == (headerRollback.length() + 1))
mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Rollback of journal header success.");
else
mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Rollback of journal header failed!");
errno = l_errno;
if (err < 0)
return err;
else
return 0;
}
}
}
off_t entryHeaderOffset = ::lseek(fd, 0, SEEK_END);
err = ::write(fd, offlen, JOURNAL_ENTRY_HEADER_SIZE);
err = _write(fd, offlen, JOURNAL_ENTRY_HEADER_SIZE);
l_errno = errno;
repHeaderDataWritten += JOURNAL_ENTRY_HEADER_SIZE;
if (err != JOURNAL_ENTRY_HEADER_SIZE)
{
@@ -266,12 +333,16 @@ int Replicator::addJournalEntry(const boost::filesystem::path &filename, const u
{
mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: write journal entry header failed. Attempting to rollback and continue.");
//attempt to rollback top level header
int rollbackErr = ::pwrite(fd, headerRollback.c_str(), headerRollback.length() + 1,0);
int rollbackErr = _pwrite(fd, headerRollback.c_str(), headerRollback.length() + 1,0);
if ((uint)rollbackErr != (headerRollback.length() + 1))
{
mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Rollback of journal header failed! (%s)",
strerror_r(errno, errbuf, 80));
errno = l_errno;
if (err < 0)
return err;
else
return 0;
}
}
int rollbackErr = ::ftruncate(fd,entryHeaderOffset);
@@ -279,13 +350,17 @@ int Replicator::addJournalEntry(const boost::filesystem::path &filename, const u
{
mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Truncate to previous EOF failed! (%s)",
strerror_r(errno, errbuf, 80));
errno = l_errno;
if (err < 0)
return err;
else
return 0;
}
l_errno = errno;
return err;
}
while (count < length) {
err = ::write(fd, &data[count], length - count);
err = _write(fd, &data[count], length - count);
if (err < 0 )
{
l_errno = errno;
@@ -301,7 +376,7 @@ int Replicator::addJournalEntry(const boost::filesystem::path &filename, const u
if (thisEntryMaxOffset > currentMaxOffset)
{
string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset).str();
int rollbackErr = ::pwrite(fd, header.c_str(), header.length() + 1,0);
int rollbackErr = _pwrite(fd, header.c_str(), header.length() + 1,0);
if ((uint)rollbackErr != (header.length() + 1))
{
mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Update of journal header failed! (%s)",
@@ -312,7 +387,7 @@ int Replicator::addJournalEntry(const boost::filesystem::path &filename, const u
}
// Update the journal entry header
offlen[1] = count;
int rollbackErr = ::pwrite(fd, offlen, JOURNAL_ENTRY_HEADER_SIZE,entryHeaderOffset);
int rollbackErr = _pwrite(fd, offlen, JOURNAL_ENTRY_HEADER_SIZE,entryHeaderOffset);
if ((uint)rollbackErr != JOURNAL_ENTRY_HEADER_SIZE)
{
mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Update of journal entry header failed! (%s)",
@@ -337,7 +412,7 @@ int Replicator::addJournalEntry(const boost::filesystem::path &filename, const u
"Attempting to rollback and continue.", strerror_r(l_errno, errbuf, 80));
//attempt to rollback top level header
string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % 0).str();
int rollbackErr = ::pwrite(fd, header.c_str(), header.length() + 1,0);
int rollbackErr = _pwrite(fd, header.c_str(), header.length() + 1,0);
if ((uint)rollbackErr != (header.length() + 1))
{
mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Rollback of journal header failed (%s)!",

View File

@@ -58,6 +58,11 @@ class Replicator
private:
Replicator();
// a couple helpers
ssize_t _write(int fd, const void *data, size_t len);
ssize_t _pwrite(int fd, const void *data, size_t len, off_t offset);
Config *mpConfig;
SMLogging *mpLogger;
std::string msJournalPath;

View File

@@ -409,6 +409,7 @@ void Synchronizer::process(list<string>::iterator name)
s.unlock();
bool success = false;
int retryCount = 0;
while (!success)
{
assert(!s.owns_lock());
@@ -434,7 +435,10 @@ void Synchronizer::process(list<string>::iterator name)
success = true;
}
catch(exception &e) {
logger->log(LOG_CRIT, "Synchronizer::process(): error sync'ing %s opFlags=%d, got '%s'. Retrying...", key.c_str(),
// these are often self-resolving, so we will suppress logging it for 10 iterations, then escalate
// to error, then to crit
//if (++retryCount >= 10)
logger->log((retryCount < 20 ? LOG_ERR : LOG_CRIT), "Synchronizer::process(): error sync'ing %s opFlags=%d, got '%s'. Retrying...", key.c_str(),
pending->opFlags, e.what());
success = false;
sleep(1);
@@ -463,18 +467,33 @@ void Synchronizer::synchronize(const string &sourceFile, list<string>::iterator
{
ScopedReadLock s(ioc, sourceFile);
string &key = *it;
string key = *it;
size_t pos = key.find_first_of('/');
bf::path prefix = key.substr(0, pos);
string cloudKey = key.substr(pos + 1);
char buf[80];
bool exists = false;
int err;
bf::path objectPath = cachePath/key;
MetadataFile md(sourceFile, MetadataFile::no_create_t(),true);
if (!md.exists())
{
logger->log(LOG_DEBUG, "synchronize(): no metadata found for %s. It must have been deleted.", sourceFile.c_str());
try
{
if (!bf::exists(objectPath))
return;
size_t size = bf::file_size(objectPath);
replicator->remove(objectPath);
cache->deletedObject(prefix, cloudKey, size);
cs->deleteObject(cloudKey);
}
catch (exception &e)
{
logger->log(LOG_DEBUG, "synchronize(): failed to remove orphaned object '%s' from the cache, got %s",
objectPath.string().c_str(), e.what());
}
return;
}
@@ -495,7 +514,6 @@ void Synchronizer::synchronize(const string &sourceFile, list<string>::iterator
if (exists)
return;
// TODO: should be safe to check with Cache instead of a file existence check
exists = cache->exists(prefix, cloudKey);
if (!exists)
{
@@ -503,14 +521,15 @@ void Synchronizer::synchronize(const string &sourceFile, list<string>::iterator
return;
}
err = cs->putObject((cachePath / key).string(), cloudKey);
err = cs->putObject(objectPath.string(), cloudKey);
if (err)
throw runtime_error(string("synchronize(): uploading ") + key + ", got " + strerror_r(errno, buf, 80));
numBytesRead += mdEntry.length;
bytesReadBySync += mdEntry.length;
numBytesUploaded += mdEntry.length;
++objectsSyncedWithNoJournal;
replicator->remove((cachePath/key), Replicator::NO_LOCAL);
replicator->remove(objectPath, Replicator::NO_LOCAL);
}
void Synchronizer::synchronizeDelete(const string &sourceFile, list<string>::iterator &it)
@@ -535,6 +554,29 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
if (!md.exists())
{
logger->log(LOG_DEBUG, "synchronizeWithJournal(): no metadata found for %s. It must have been deleted.", sourceFile.c_str());
try
{
bf::path objectPath = cachePath/key;
if (bf::exists(objectPath))
{
size_t objSize = bf::file_size(objectPath);
replicator->remove(objectPath);
cache->deletedObject(prefix, cloudKey, objSize);
cs->deleteObject(cloudKey);
}
bf::path jPath = journalPath/(key + ".journal");
if (bf::exists(jPath))
{
size_t jSize = bf::file_size(jPath);
replicator->remove(jPath);
cache->deletedJournal(prefix, jSize);
}
}
catch(exception &e)
{
logger->log(LOG_DEBUG, "synchronizeWithJournal(): failed to remove orphaned object '%s' from the cache, got %s",
(cachePath/key).string().c_str(), e.what());
}
return;
}
@@ -683,7 +725,7 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
while (count < size)
{
err = ::write(newFD, data.get(), size - count);
err = ::write(newFD, &data[count], size - count);
if (err < 0)
{
::unlink(newCachePath.string().c_str());
@@ -693,9 +735,22 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
count += err;
}
numBytesWritten += size;
assert(bf::file_size(oldCachePath) == MetadataFile::getLengthFromKey(cloudKey));
cache->rename(prefix, cloudKey, newCloudKey, size - MetadataFile::getLengthFromKey(cloudKey));
size_t oldSize = bf::file_size(oldCachePath);
cache->rename(prefix, cloudKey, newCloudKey, size - oldSize);
replicator->remove(oldCachePath);
// This condition is probably irrelevant for correct functioning now,
// but it should be very rare so what the hell.
if (oldSize != MetadataFile::getLengthFromKey(cloudKey))
{
ostringstream oss;
oss << "Synchronizer::synchronizeWithJournal(): detected a mismatch between file size and " <<
"length stored in the object name. object name = " << cloudKey << " length-in-name = " <<
MetadataFile::getLengthFromKey(cloudKey) << " real-length = " << oldSize;
logger->log(LOG_WARNING, oss.str().c_str());
}
}
mergeDiff += size - originalSize;

View File

@@ -74,7 +74,7 @@ bool WriteTask::run()
while (readCount < cmd->count)
{
uint toRead = min(cmd->count - readCount, bufsize);
uint toRead = min(static_cast<uint>(cmd->count - readCount), bufsize);
success = read(&databuf[0], toRead);
check_error("WriteTask read data", false);
if (success==0)

View File

@@ -433,7 +433,7 @@ bool writetask()
WriteTask w(clientSock, hdr->payloadLen);
ssize_t result = ::write(sessionSock, cmd, hdr->payloadLen);
assert(result==(hdr->payloadLen));
assert(result == static_cast<ssize_t>(hdr->payloadLen));
w.run();
@@ -1065,7 +1065,7 @@ bool copytask(bool connectionTest=false)
len -= 2;
ssize_t result = ::write(sessionSock, buf, len);
assert(result==len);
assert(result==static_cast<ssize_t>(len));
int err=0;
@@ -1805,7 +1805,7 @@ void shortMsg()
WriteTask w(clientSock, hdrWrite->payloadLen);
ssize_t result = ::write(sessionSock, cmdWrite, hdrWrite->payloadLen);
assert(result==(hdrWrite->payloadLen));
assert(result==static_cast<ssize_t>(hdrWrite->payloadLen));
w.run();

View File

@@ -132,35 +132,18 @@ namespace helpers
const char* convNumToStr(int64_t val, char* dst, int radix)
{
if (radix == 16 || radix == -16)
#ifdef _MSC_VER
sprintf(dst, "%llX", val);
sprintf(dst, "%llX", (long long)val);
#else
sprintf(dst, "%lX", val);
#endif
else if (radix == 8 || radix == -8)
#ifdef _MSC_VER
sprintf(dst, "%llo", val);
sprintf(dst, "%llo", (long long)val);
#else
sprintf(dst, "%lo", val);
#endif
else if (radix == 10)
{
uint64_t uval = static_cast<uint64_t>(val);
#ifdef _MSC_VER
sprintf(dst, "%llu", uval);
#else
sprintf(dst, "%lu", uval);
#endif
sprintf(dst, "%llu", (unsigned long long)val);
}
else if (radix == -10)
#ifdef _MSC_VER
sprintf(dst, "%lld", val);
sprintf(dst, "%lld", (long long)val);
#else
sprintf(dst, "%ld", val);
#endif
else if (radix == 2 || radix == -2)
{
char tmp[65];

View File

@@ -122,7 +122,7 @@ protected:
// [ the default format in treenode.h is fixed-point notation ]
char buf[20];
long double floatVal;
int64_t exponent;
int exponent;
long double base;
switch (fp->data()->resultType().colDataType)
@@ -157,7 +157,7 @@ protected:
{
snprintf(buf, 20, "%.5Lf", base);
fFloatStr = execplan::removeTrailing0(buf, 20);
snprintf(buf, 20, "e%02ld", exponent);
snprintf(buf, 20, "e%02d", exponent);
fFloatStr += buf;
}

View File

@@ -1,21 +1,5 @@
set(S3API_DIR ${CMAKE_CURRENT_SOURCE_DIR}/libmarias3 CACHE INTERNAL "S3API_DIR")
find_package(Git QUIET)
if(GIT_FOUND AND EXISTS ${ENGINE_SRC_DIR}/.git)
# Update submodules as needed
option(GIT_SUBMODULE "Check submodules during build" ON)
if(GIT_SUBMODULE)
message(STATUS "Submodule update")
execute_process(COMMAND ${GIT_EXECUTABLE} submodule update --init --recursive
WORKING_DIRECTORY ${ENGINE_SRC_DIR}
RESULT_VARIABLE GIT_SUBMOD_RESULT)
if(NOT GIT_SUBMOD_RESULT EQUAL "0")
message(FATAL_ERROR "git submodule update --init failed with ${GIT_SUBMOD_RESULT}, please checkout submodules")
endif()
endif()
endif()
SET(S3_SOURCES ${S3API_DIR}/src/debug.c
${S3API_DIR}/src/error.c
${S3API_DIR}/src/marias3.c