diff --git a/.gitignore b/.gitignore index f8246dc9f..c7aa228d3 100644 --- a/.gitignore +++ b/.gitignore @@ -39,7 +39,6 @@ dbcon/ddlpackage/ddl-scan.h dbcon/dmlpackage/dml-scan.cpp dbcon/dmlpackage/dml-scan.h ddlproc/DDLProc -decomsvr/DecomSvr dmlproc/DMLProc exemgr/ExeMgr oamapps/calpontDB/calpontDBWrite diff --git a/CMakeLists.txt b/CMakeLists.txt index 44aed9bc8..4dd1f6ede 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -240,7 +240,6 @@ ADD_SUBDIRECTORY(dmlproc) ADD_SUBDIRECTORY(procmon) ADD_SUBDIRECTORY(procmgr) ADD_SUBDIRECTORY(oamapps) -ADD_SUBDIRECTORY(decomsvr) ADD_SUBDIRECTORY(primitives) ADD_SUBDIRECTORY(tools) ADD_SUBDIRECTORY(writeengine/server) diff --git a/cpackEngineRPM.cmake b/cpackEngineRPM.cmake index be5df3462..71d5e1f26 100644 --- a/cpackEngineRPM.cmake +++ b/cpackEngineRPM.cmake @@ -132,7 +132,6 @@ SET(CPACK_RPM_platform_USER_FILELIST "/usr/local/mariadb/columnstore/bin/post-mysqld-install" "/usr/local/mariadb/columnstore/bin/pre-uninstall" "/usr/local/mariadb/columnstore/bin/PrimProc" -"/usr/local/mariadb/columnstore/bin/DecomSvr" "/usr/local/mariadb/columnstore/bin/upgrade-columnstore.sh" "/usr/local/mariadb/columnstore/bin/run.sh" "/usr/local/mariadb/columnstore/bin/columnstore" diff --git a/decomsvr/CMakeLists.txt b/decomsvr/CMakeLists.txt deleted file mode 100644 index 5c4e5e6a7..000000000 --- a/decomsvr/CMakeLists.txt +++ /dev/null @@ -1,14 +0,0 @@ - -include_directories( ${ENGINE_COMMON_INCLUDES} ) - - -########### next target ############### - -set(DecomSvr_SRCS quicklz.c server.cpp) - -add_executable(DecomSvr ${DecomSvr_SRCS}) - -target_link_libraries(DecomSvr ${ENGINE_LDFLAGS} ${Boost_LIBRARIES} pthread rt) - -install(TARGETS DecomSvr DESTINATION ${ENGINE_BINDIR} COMPONENT platform) - diff --git a/decomsvr/DecomSvr.rc b/decomsvr/DecomSvr.rc deleted file mode 100644 index 38e974faa..000000000 --- a/decomsvr/DecomSvr.rc +++ /dev/null @@ -1,102 +0,0 @@ -// Microsoft Visual C++ generated resource script. -// -#include "resource.h" - -#define APSTUDIO_READONLY_SYMBOLS -///////////////////////////////////////////////////////////////////////////// -// -// Generated from the TEXTINCLUDE 2 resource. -// -#include "afxres.h" - -///////////////////////////////////////////////////////////////////////////// -#undef APSTUDIO_READONLY_SYMBOLS - -///////////////////////////////////////////////////////////////////////////// -// English (U.S.) resources - -#if !defined(AFX_RESOURCE_DLL) || defined(AFX_TARG_ENU) -#ifdef _WIN32 -LANGUAGE LANG_ENGLISH, SUBLANG_ENGLISH_US -#pragma code_page(1252) -#endif //_WIN32 - -#ifdef APSTUDIO_INVOKED -///////////////////////////////////////////////////////////////////////////// -// -// TEXTINCLUDE -// - -1 TEXTINCLUDE -BEGIN - "resource.h\0" -END - -2 TEXTINCLUDE -BEGIN - "#include ""afxres.h""\r\n" - "\0" -END - -3 TEXTINCLUDE -BEGIN - "\r\n" - "\0" -END - -#endif // APSTUDIO_INVOKED - - -///////////////////////////////////////////////////////////////////////////// -// -// Version -// - -VS_VERSION_INFO VERSIONINFO - FILEVERSION 4,6,0,0 - PRODUCTVERSION 4,6,0,0 - FILEFLAGSMASK 0x17L -#ifdef _DEBUG - FILEFLAGS 0x1L -#else - FILEFLAGS 0x0L -#endif - FILEOS 0x4L - FILETYPE 0x1L - FILESUBTYPE 0x0L -BEGIN - BLOCK "StringFileInfo" - BEGIN - BLOCK "040904b0" - BEGIN - VALUE "CompanyName", "InfiniDB, Inc." - VALUE "FileDescription", "InfiniDB V1 Decompression Server" - VALUE "FileVersion", "4.6.0-0" - VALUE "InternalName", "DecomSvr" - VALUE "LegalCopyright", "Copyright (C) 2014" - VALUE "OriginalFilename", "DecomSvr.exe" - VALUE "ProductName", "InfiniDB" - VALUE "ProductVersion", "4.6.0.0 Beta" - END - END - BLOCK "VarFileInfo" - BEGIN - VALUE "Translation", 0x409, 1200 - END -END - -#endif // English (U.S.) resources -///////////////////////////////////////////////////////////////////////////// - - - -#ifndef APSTUDIO_INVOKED -///////////////////////////////////////////////////////////////////////////// -// -// Generated from the TEXTINCLUDE 3 resource. -// - - -///////////////////////////////////////////////////////////////////////////// -#endif // not APSTUDIO_INVOKED - diff --git a/decomsvr/DecomSvr.vcxproj b/decomsvr/DecomSvr.vcxproj deleted file mode 100644 index 8e3bb55a4..000000000 --- a/decomsvr/DecomSvr.vcxproj +++ /dev/null @@ -1,290 +0,0 @@ - - - - - Debug - Win32 - - - Debug - x64 - - - EnterpriseRelease - Win32 - - - EnterpriseRelease - x64 - - - Release - Win32 - - - Release - x64 - - - - {E7F6F8A8-9DDE-4FC4-8F80-25AB98922E6F} - DecomSvr - Win32Proj - - - - Application - v110 - MultiByte - true - - - Application - v110 - MultiByte - true - - - Application - v110 - MultiByte - - - Application - v110 - MultiByte - true - - - Application - v110 - MultiByte - true - - - Application - v110 - MultiByte - - - - - - - - - - - - - - - - - - - - - - - - - <_ProjectFileVersion>11.0.50727.1 - - - $(SolutionDir)..\..\$(PlatformName)\$(ConfigurationName)\ - $(SolutionDir)..\..\obj\$(ProjectName)\$(PlatformName)\$(ConfigurationName)\ - true - - - $(SolutionDir)..\..\$(Platform)\$(Configuration)\ - $(SolutionDir)..\..\obj\$(ProjectName)\$(Platform)\$(Configuration)\ - true - - - $(SolutionDir)..\..\$(PlatformName)\$(ConfigurationName)\ - $(SolutionDir)..\..\obj\$(ProjectName)\$(PlatformName)\$(ConfigurationName)\ - false - - - $(SolutionDir)..\..\$(Platform)\$(Configuration)\ - $(SolutionDir)..\..\obj\$(ProjectName)\$(Platform)\$(Configuration)\ - false - - - $(SolutionDir)..\..\$(PlatformName)\$(ConfigurationName)\ - $(SolutionDir)..\..\obj\$(ProjectName)\$(PlatformName)\$(ConfigurationName)\ - false - - - $(SolutionDir)..\..\$(Platform)\$(Configuration)\ - $(SolutionDir)..\..\obj\$(ProjectName)\$(Platform)\$(Configuration)\ - false - - - - Disabled - $(SolutionDir)..\..\boost_1_54_0;$(SolutionDir)..\utils\winport;%(AdditionalIncludeDirectories) - WIN32;_DEBUG;_CONSOLE;%(PreprocessorDefinitions) - true - EnableFastChecks - MultiThreadedDebugDLL - - Level3 - EditAndContinue - - - ws2_32.lib;%(AdditionalDependencies) - $(SolutionDir)..\..\boost_1_54_0;%(AdditionalLibraryDirectories) - true - Console - MachineX86 - - - - - X64 - - - Disabled - $(SolutionDir)..\..\boost_1_54_0;$(SolutionDir)..\utils\winport;%(AdditionalIncludeDirectories) - WIN32;_DEBUG;_CONSOLE;%(PreprocessorDefinitions) - true - EnableFastChecks - MultiThreadedDebugDLL - - Level3 - ProgramDatabase - - - ws2_32.lib;%(AdditionalDependencies) - $(SolutionDir)..\..\boost_1_54_0;$(SolutionDir)..\..\x64\Debug;%(AdditionalLibraryDirectories) - true - Console - MachineX64 - - - - - MaxSpeed - true - $(SolutionDir)..\..\boost_1_54_0;$(SolutionDir)..\utils\winport;%(AdditionalIncludeDirectories) - WIN32;NDEBUG;_CONSOLE;%(PreprocessorDefinitions) - MultiThreadedDLL - true - - Level3 - ProgramDatabase - 4996;4244;4267;%(DisableSpecificWarnings) - - - ws2_32.lib;%(AdditionalDependencies) - $(SolutionDir)..\..\boost_1_54_0;%(AdditionalLibraryDirectories) - true - Console - true - true - MachineX86 - - - - - X64 - - - MaxSpeed - true - $(SolutionDir)..\..\boost_1_54_0;$(SolutionDir)..\utils\winport;%(AdditionalIncludeDirectories) - WIN32;NDEBUG;_CONSOLE;%(PreprocessorDefinitions) - MultiThreadedDLL - true - - Level3 - ProgramDatabase - 4996;4244;4267;%(DisableSpecificWarnings) - - - ws2_32.lib;%(AdditionalDependencies) - $(SolutionDir)..\..\boost_1_54_0;C:$(SolutionDir)..\..x64\Release;%(AdditionalLibraryDirectories) - true - Console - true - true - MachineX64 - - - $(SolutionDir)..\..\signit "InfiniDB V1 Decompression Server" $(SolutionDir)..\..x64\Release\DecomSvr.exe - - - - - MaxSpeed - true - $(SolutionDir)..\..\boost_1_54_0;$(SolutionDir)..\utils\winport;%(AdditionalIncludeDirectories) - WIN32;NDEBUG;_CONSOLE;%(PreprocessorDefinitions) - MultiThreadedDLL - true - - Level3 - ProgramDatabase - 4996;4244;4267;%(DisableSpecificWarnings) - - - ws2_32.lib;%(AdditionalDependencies) - $(SolutionDir)..\..\boost_1_54_0;%(AdditionalLibraryDirectories) - true - Console - true - true - MachineX86 - - - - - X64 - - - MaxSpeed - true - $(SolutionDir)..\..\boost_1_54_0;$(SolutionDir)..\utils\winport;%(AdditionalIncludeDirectories) - WIN32;NDEBUG;_CONSOLE;%(PreprocessorDefinitions) - MultiThreadedDLL - true - - Level3 - ProgramDatabase - 4996;4244;4267;%(DisableSpecificWarnings) - false - - - ws2_32.lib;%(AdditionalDependencies) - $(SolutionDir)..\..\boost_1_54_0;$(SolutionDir)..\..\x64\EnterpriseRelease;%(AdditionalLibraryDirectories) - false - Console - true - true - MachineX64 - - - $(SolutionDir)..\..\signit "InfiniDB V1 Decompression Server" $(SolutionDir)..\..\x64\EnterpriseRelease\DecomSvr.exe - - - - - - - - - - - - - - - {4f0851d3-b782-4f12-b748-73efa2da586b} - - - - - - \ No newline at end of file diff --git a/decomsvr/DecomSvr.vcxproj.filters b/decomsvr/DecomSvr.vcxproj.filters deleted file mode 100644 index 3166cce9d..000000000 --- a/decomsvr/DecomSvr.vcxproj.filters +++ /dev/null @@ -1,35 +0,0 @@ - - - - - {4FC737F1-C7A5-4376-A066-2A32D752A2FF} - cpp;c;cc;cxx;def;odl;idl;hpj;bat;asm;asmx - - - {93995380-89BD-4b04-88EB-625FBE52EBFB} - h;hpp;hxx;hm;inl;inc;xsd - - - {67DA6AB6-F800-4c08-8B7A-83BB121AAD01} - rc;ico;cur;bmp;dlg;rc2;rct;bin;rgs;gif;jpg;jpeg;jpe;resx;tiff;tif;png;wav - - - - - Source Files - - - Source Files - - - - - Header Files - - - - - Resource Files - - - \ No newline at end of file diff --git a/decomsvr/cli.cpp b/decomsvr/cli.cpp deleted file mode 100644 index e7b257a1c..000000000 --- a/decomsvr/cli.cpp +++ /dev/null @@ -1,112 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -//#define NDEBUG -#include -#include -using namespace std; - -/* - Protocol definition: - On the control fifo: - This server waits for the other to send it: - 1. The name of the data fifo to open and read/write (string) - 2. The number of bytes of compressed data to read (number) - - On the data fifo: - The server then reads the compressed data from the data fifo: - 1. The compressed data - then it decompresses it, and sends back to the client: - 1. The number of bytes in the uncompressed stream (number) - 2. The uncompressed data - - strings are sent like this: - uint32_t string len - len bytes of the string - numbers are sent like this: - uint64_t the number - - This server expects numeric values to be in its native byte order, so - the sender needs to do it that way. -*/ - -namespace -{ -const string MessageFifo("/tmp/idbdsfifo"); -} - -int main(int argc, char** argv) -{ -again: - int fd = open(MessageFifo.c_str(), O_WRONLY | O_NONBLOCK); - - if (fd < 0) - { - if (errno == ENXIO) - { - cerr << "waiting for DS to startup..." << endl; - sleep(1); - goto again; - } - - throw runtime_error("while opening fifo for write"); - } - - uint32_t u32; - uint64_t u64; - string s; - ssize_t wrc; - - s = "/tmp/cdatafifo"; - mknod(s.c_str(), S_IFIFO | 0666, 0); - u32 = s.length(); - wrc = write(fd, &u32, 4); - assert(wrc == 4); - wrc = write(fd, s.c_str(), u32); - assert(wrc == u32); - - u64 = 707070; - write(fd, &u64, 8); - - close(fd); - - fd = open(s.c_str(), O_WRONLY); - assert(fd >= 0); - - char* b = new char[u64]; - assert(b); - - wrc = write (fd, b, u64); - assert(wrc == u64); - - delete [] b; - - close(fd); - fd = open(s.c_str(), O_RDONLY); - assert(fd >= 0); - - wrc = read(fd, &u64, 8); - assert(wrc == 8); - - b = new char[u64]; - assert(b); - - cout << "going to read " << u64 << " bytes of uncompressed data" << endl << flush; - wrc = read(fd, b, u64); - assert(wrc == u64); - cout << "read " << u64 << " bytes of uncompressed data" << endl; - - delete [] b; - - close(fd); - - unlink(s.c_str()); - - return 0; -} - diff --git a/decomsvr/quicklz.c b/decomsvr/quicklz.c deleted file mode 100644 index 374212902..000000000 --- a/decomsvr/quicklz.c +++ /dev/null @@ -1,848 +0,0 @@ -// Fast data compression library -// Copyright (C) 2006-2011 Lasse Mikkel Reinhold -// lar@quicklz.com -// -// QuickLZ can be used for free under the GPL 1, 2 or 3 license (where anything -// released into public must be open source) or under a commercial license if such -// has been acquired (see http://www.quicklz.com/order.html). The commercial license -// does not cover derived or ported versions created by third parties under GPL. - -// 1.5.0 final - -#include "quicklz.h" - -#if QLZ_VERSION_MAJOR != 1 || QLZ_VERSION_MINOR != 5 || QLZ_VERSION_REVISION != 0 - #error quicklz.c and quicklz.h have different versions -#endif - -#if (defined(__X86__) || defined(__i386__) || defined(i386) || defined(_M_IX86) || defined(__386__) || defined(__x86_64__) || defined(_M_X64)) - #define X86X64 -#endif - -#define MINOFFSET 2 -#define UNCONDITIONAL_MATCHLEN 6 -#define UNCOMPRESSED_END 4 -#define CWORD_LEN 4 - -#if QLZ_COMPRESSION_LEVEL == 1 && defined QLZ_PTR_64 && QLZ_STREAMING_BUFFER == 0 - #define OFFSET_BASE source - #define CAST (ui32)(size_t) -#else - #define OFFSET_BASE 0 - #define CAST -#endif - -int qlz_get_setting(int setting) -{ - switch (setting) - { - case 0: return QLZ_COMPRESSION_LEVEL; - case 1: return sizeof(qlz_state_compress); - case 2: return sizeof(qlz_state_decompress); - case 3: return QLZ_STREAMING_BUFFER; -#ifdef QLZ_MEMORY_SAFE - case 6: return 1; -#else - case 6: return 0; -#endif - case 7: return QLZ_VERSION_MAJOR; - case 8: return QLZ_VERSION_MINOR; - case 9: return QLZ_VERSION_REVISION; - } - return -1; -} - -#if QLZ_COMPRESSION_LEVEL == 1 -static int same(const unsigned char *src, size_t n) -{ - while(n > 0 && *(src + n) == *src) - n--; - return n == 0 ? 1 : 0; -} -#endif - -static void reset_table_compress(qlz_state_compress *state) -{ - int i; - for(i = 0; i < QLZ_HASH_VALUES; i++) - { -#if QLZ_COMPRESSION_LEVEL == 1 - state->hash[i].offset = 0; -#else - state->hash_counter[i] = 0; -#endif - } -} - -static void reset_table_decompress(qlz_state_decompress *state) -{ - int i; - (void)state; - (void)i; -#if QLZ_COMPRESSION_LEVEL == 2 - for(i = 0; i < QLZ_HASH_VALUES; i++) - { - state->hash_counter[i] = 0; - } -#endif -} - -static __inline ui32 hash_func(ui32 i) -{ -#if QLZ_COMPRESSION_LEVEL == 2 - return ((i >> 9) ^ (i >> 13) ^ i) & (QLZ_HASH_VALUES - 1); -#else - return ((i >> 12) ^ i) & (QLZ_HASH_VALUES - 1); -#endif -} - -static __inline ui32 fast_read(void const *src, ui32 bytes) -{ -#ifndef X86X64 - unsigned char *p = (unsigned char*)src; - switch (bytes) - { - case 4: - return(*p | *(p + 1) << 8 | *(p + 2) << 16 | *(p + 3) << 24); - case 3: - return(*p | *(p + 1) << 8 | *(p + 2) << 16); - case 2: - return(*p | *(p + 1) << 8); - case 1: - return(*p); - } - return 0; -#else - if (bytes >= 1 && bytes <= 4) - return *((ui32*)src); - else - return 0; -#endif -} - -static __inline ui32 hashat(const unsigned char *src) -{ - ui32 fetch, hash; - fetch = fast_read(src, 3); - hash = hash_func(fetch); - return hash; -} - -static __inline void fast_write(ui32 f, void *dst, size_t bytes) -{ -#ifndef X86X64 - unsigned char *p = (unsigned char*)dst; - - switch (bytes) - { - case 4: - *p = (unsigned char)f; - *(p + 1) = (unsigned char)(f >> 8); - *(p + 2) = (unsigned char)(f >> 16); - *(p + 3) = (unsigned char)(f >> 24); - return; - case 3: - *p = (unsigned char)f; - *(p + 1) = (unsigned char)(f >> 8); - *(p + 2) = (unsigned char)(f >> 16); - return; - case 2: - *p = (unsigned char)f; - *(p + 1) = (unsigned char)(f >> 8); - return; - case 1: - *p = (unsigned char)f; - return; - } -#else - switch (bytes) - { - case 4: - *((ui32*)dst) = f; - return; - case 3: - *((ui32*)dst) = f; - return; - case 2: - *((ui16 *)dst) = (ui16)f; - return; - case 1: - *((unsigned char*)dst) = (unsigned char)f; - return; - } -#endif -} - - -size_t qlz_size_decompressed(const char *source) -{ - ui32 n, r; - n = (((*source) & 2) == 2) ? 4 : 1; - r = fast_read(source + 1 + n, n); - r = r & (0xffffffff >> ((4 - n)*8)); - return r; -} - -size_t qlz_size_compressed(const char *source) -{ - ui32 n, r; - n = (((*source) & 2) == 2) ? 4 : 1; - r = fast_read(source + 1, n); - r = r & (0xffffffff >> ((4 - n)*8)); - return r; -} - -size_t qlz_size_header(const char *source) -{ - size_t n = 2*((((*source) & 2) == 2) ? 4 : 1) + 1; - return n; -} - - -static __inline void memcpy_up(unsigned char *dst, const unsigned char *src, ui32 n) -{ - // Caution if modifying memcpy_up! Overlap of dst and src must be special handled. -#ifndef X86X64 - unsigned char *end = dst + n; - while(dst < end) - { - *dst = *src; - dst++; - src++; - } -#else - ui32 f = 0; - do - { - *(ui32 *)(dst + f) = *(ui32 *)(src + f); - f += MINOFFSET + 1; - } - while (f < n); -#endif -} - -static __inline void update_hash(qlz_state_decompress *state, const unsigned char *s) -{ -#if QLZ_COMPRESSION_LEVEL == 1 - ui32 hash; - hash = hashat(s); - state->hash[hash].offset = s; - state->hash_counter[hash] = 1; -#elif QLZ_COMPRESSION_LEVEL == 2 - ui32 hash; - unsigned char c; - hash = hashat(s); - c = state->hash_counter[hash]; - state->hash[hash].offset[c & (QLZ_POINTERS - 1)] = s; - c++; - state->hash_counter[hash] = c; -#endif - (void)state; - (void)s; -} - -#if QLZ_COMPRESSION_LEVEL <= 2 -static void update_hash_upto(qlz_state_decompress *state, unsigned char **lh, const unsigned char *max) -{ - while(*lh < max) - { - (*lh)++; - update_hash(state, *lh); - } -} -#endif - -static size_t qlz_compress_core(const unsigned char *source, unsigned char *destination, size_t size, qlz_state_compress *state) -{ - const unsigned char *last_byte = source + size - 1; - const unsigned char *src = source; - unsigned char *cword_ptr = destination; - unsigned char *dst = destination + CWORD_LEN; - ui32 cword_val = 1U << 31; - const unsigned char *last_matchstart = last_byte - UNCONDITIONAL_MATCHLEN - UNCOMPRESSED_END; - ui32 fetch = 0; - unsigned int lits = 0; - - (void) lits; - - if(src <= last_matchstart) - fetch = fast_read(src, 3); - - while(src <= last_matchstart) - { - if ((cword_val & 1) == 1) - { - // store uncompressed if compression ratio is too low - if (src > source + (size >> 1) && dst - destination > src - source - ((src - source) >> 5)) - return 0; - - fast_write((cword_val >> 1) | (1U << 31), cword_ptr, CWORD_LEN); - - cword_ptr = dst; - dst += CWORD_LEN; - cword_val = 1U << 31; - fetch = fast_read(src, 3); - } -#if QLZ_COMPRESSION_LEVEL == 1 - { - const unsigned char *o; - ui32 hash, cached; - - hash = hash_func(fetch); - cached = fetch ^ state->hash[hash].cache; - state->hash[hash].cache = fetch; - - o = state->hash[hash].offset + OFFSET_BASE; - state->hash[hash].offset = CAST(src - OFFSET_BASE); - -#ifdef X86X64 - if ((cached & 0xffffff) == 0 && o != OFFSET_BASE && (src - o > MINOFFSET || (src == o + 1 && lits >= 3 && src > source + 3 && same(src - 3, 6)))) - { - if(cached != 0) - { -#else - if (cached == 0 && o != OFFSET_BASE && (src - o > MINOFFSET || (src == o + 1 && lits >= 3 && src > source + 3 && same(src - 3, 6)))) - { - if (*(o + 3) != *(src + 3)) - { -#endif - hash <<= 4; - cword_val = (cword_val >> 1) | (1U << 31); - fast_write((3 - 2) | hash, dst, 2); - src += 3; - dst += 2; - } - else - { - const unsigned char *old_src = src; - size_t matchlen; - hash <<= 4; - - cword_val = (cword_val >> 1) | (1U << 31); - src += 4; - - if(*(o + (src - old_src)) == *src) - { - src++; - if(*(o + (src - old_src)) == *src) - { - size_t q = last_byte - UNCOMPRESSED_END - (src - 5) + 1; - size_t remaining = q > 255 ? 255 : q; - src++; - while(*(o + (src - old_src)) == *src && (size_t)(src - old_src) < remaining) - src++; - } - } - - matchlen = src - old_src; - if (matchlen < 18) - { - fast_write((ui32)(matchlen - 2) | hash, dst, 2); - dst += 2; - } - else - { - fast_write((ui32)(matchlen << 16) | hash, dst, 3); - dst += 3; - } - } - fetch = fast_read(src, 3); - lits = 0; - } - else - { - lits++; - *dst = *src; - src++; - dst++; - cword_val = (cword_val >> 1); -#ifdef X86X64 - fetch = fast_read(src, 3); -#else - fetch = (fetch >> 8 & 0xffff) | (*(src + 2) << 16); -#endif - } - } -#elif QLZ_COMPRESSION_LEVEL >= 2 - { - const unsigned char *o, *offset2; - ui32 hash, matchlen, k, m, best_k = 0; - unsigned char c; - size_t remaining = (last_byte - UNCOMPRESSED_END - src + 1) > 255 ? 255 : (last_byte - UNCOMPRESSED_END - src + 1); - (void)best_k; - - - //hash = hashat(src); - fetch = fast_read(src, 3); - hash = hash_func(fetch); - - c = state->hash_counter[hash]; - - offset2 = state->hash[hash].offset[0]; - if(offset2 < src - MINOFFSET && c > 0 && ((fast_read(offset2, 3) ^ fetch) & 0xffffff) == 0) - { - matchlen = 3; - if(*(offset2 + matchlen) == *(src + matchlen)) - { - matchlen = 4; - while(*(offset2 + matchlen) == *(src + matchlen) && matchlen < remaining) - matchlen++; - } - } - else - matchlen = 0; - for(k = 1; k < QLZ_POINTERS && c > k; k++) - { - o = state->hash[hash].offset[k]; -#if QLZ_COMPRESSION_LEVEL == 3 - if(((fast_read(o, 3) ^ fetch) & 0xffffff) == 0 && o < src - MINOFFSET) -#elif QLZ_COMPRESSION_LEVEL == 2 - if(*(src + matchlen) == *(o + matchlen) && ((fast_read(o, 3) ^ fetch) & 0xffffff) == 0 && o < src - MINOFFSET) -#endif - { - m = 3; - while(*(o + m) == *(src + m) && m < remaining) - m++; -#if QLZ_COMPRESSION_LEVEL == 3 - if ((m > matchlen) || (m == matchlen && o > offset2)) -#elif QLZ_COMPRESSION_LEVEL == 2 - if (m > matchlen) -#endif - { - offset2 = o; - matchlen = m; - best_k = k; - } - } - } - o = offset2; - state->hash[hash].offset[c & (QLZ_POINTERS - 1)] = src; - c++; - state->hash_counter[hash] = c; - -#if QLZ_COMPRESSION_LEVEL == 3 - if(matchlen > 2 && src - o < 131071) - { - ui32 u; - size_t offset = src - o; - - for(u = 1; u < matchlen; u++) - { - hash = hashat(src + u); - c = state->hash_counter[hash]++; - state->hash[hash].offset[c & (QLZ_POINTERS - 1)] = src + u; - } - - cword_val = (cword_val >> 1) | (1U << 31); - src += matchlen; - - if(matchlen == 3 && offset <= 63) - { - *dst = (unsigned char)(offset << 2); - dst++; - } - else if (matchlen == 3 && offset <= 16383) - { - ui32 f = (ui32)((offset << 2) | 1); - fast_write(f, dst, 2); - dst += 2; - } - else if (matchlen <= 18 && offset <= 1023) - { - ui32 f = ((matchlen - 3) << 2) | ((ui32)offset << 6) | 2; - fast_write(f, dst, 2); - dst += 2; - } - - else if(matchlen <= 33) - { - ui32 f = ((matchlen - 2) << 2) | ((ui32)offset << 7) | 3; - fast_write(f, dst, 3); - dst += 3; - } - else - { - ui32 f = ((matchlen - 3) << 7) | ((ui32)offset << 15) | 3; - fast_write(f, dst, 4); - dst += 4; - } - } - else - { - *dst = *src; - src++; - dst++; - cword_val = (cword_val >> 1); - } -#elif QLZ_COMPRESSION_LEVEL == 2 - - if(matchlen > 2) - { - cword_val = (cword_val >> 1) | (1U << 31); - src += matchlen; - - if (matchlen < 10) - { - ui32 f = best_k | ((matchlen - 2) << 2) | (hash << 5); - fast_write(f, dst, 2); - dst += 2; - } - else - { - ui32 f = best_k | (matchlen << 16) | (hash << 5); - fast_write(f, dst, 3); - dst += 3; - } - } - else - { - *dst = *src; - src++; - dst++; - cword_val = (cword_val >> 1); - } -#endif - } -#endif - } - while (src <= last_byte) - { - if ((cword_val & 1) == 1) - { - fast_write((cword_val >> 1) | (1U << 31), cword_ptr, CWORD_LEN); - cword_ptr = dst; - dst += CWORD_LEN; - cword_val = 1U << 31; - } -#if QLZ_COMPRESSION_LEVEL < 3 - if (src <= last_byte - 3) - { -#if QLZ_COMPRESSION_LEVEL == 1 - ui32 hash, fetch; - fetch = fast_read(src, 3); - hash = hash_func(fetch); - state->hash[hash].offset = CAST(src - OFFSET_BASE); - state->hash[hash].cache = fetch; -#elif QLZ_COMPRESSION_LEVEL == 2 - ui32 hash; - unsigned char c; - hash = hashat(src); - c = state->hash_counter[hash]; - state->hash[hash].offset[c & (QLZ_POINTERS - 1)] = src; - c++; - state->hash_counter[hash] = c; -#endif - } -#endif - *dst = *src; - src++; - dst++; - cword_val = (cword_val >> 1); - } - - while((cword_val & 1) != 1) - cword_val = (cword_val >> 1); - - fast_write((cword_val >> 1) | (1U << 31), cword_ptr, CWORD_LEN); - - // min. size must be 9 bytes so that the qlz_size functions can take 9 bytes as argument - return dst - destination < 9 ? 9 : dst - destination; -} - -static size_t qlz_decompress_core(const unsigned char *source, unsigned char *destination, size_t size, qlz_state_decompress *state, const unsigned char *history) -{ - const unsigned char *src = source + qlz_size_header((const char *)source); - unsigned char *dst = destination; - const unsigned char *last_destination_byte = destination + size - 1; - ui32 cword_val = 1; - const unsigned char *last_matchstart = last_destination_byte - UNCONDITIONAL_MATCHLEN - UNCOMPRESSED_END; - unsigned char *last_hashed = destination - 1; - const unsigned char *last_source_byte = source + qlz_size_compressed((const char *)source) - 1; - static const ui32 bitlut[16] = {4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0}; - - (void) last_source_byte; - (void) last_hashed; - (void) state; - (void) history; - - for(;;) - { - ui32 fetch; - - if (cword_val == 1) - { -#ifdef QLZ_MEMORY_SAFE - if(src + CWORD_LEN - 1 > last_source_byte) - return 0; -#endif - cword_val = fast_read(src, CWORD_LEN); - src += CWORD_LEN; - } - -#ifdef QLZ_MEMORY_SAFE - if(src + 4 - 1 > last_source_byte) - return 0; -#endif - - fetch = fast_read(src, 4); - - if ((cword_val & 1) == 1) - { - ui32 matchlen; - const unsigned char *offset2; - -#if QLZ_COMPRESSION_LEVEL == 1 - ui32 hash; - cword_val = cword_val >> 1; - hash = (fetch >> 4) & 0xfff; - offset2 = (const unsigned char *)(size_t)state->hash[hash].offset; - - if((fetch & 0xf) != 0) - { - matchlen = (fetch & 0xf) + 2; - src += 2; - } - else - { - matchlen = *(src + 2); - src += 3; - } - -#elif QLZ_COMPRESSION_LEVEL == 2 - ui32 hash; - unsigned char c; - cword_val = cword_val >> 1; - hash = (fetch >> 5) & 0x7ff; - c = (unsigned char)(fetch & 0x3); - offset2 = state->hash[hash].offset[c]; - - if((fetch & (28)) != 0) - { - matchlen = ((fetch >> 2) & 0x7) + 2; - src += 2; - } - else - { - matchlen = *(src + 2); - src += 3; - } - -#elif QLZ_COMPRESSION_LEVEL == 3 - ui32 offset; - cword_val = cword_val >> 1; - if ((fetch & 3) == 0) - { - offset = (fetch & 0xff) >> 2; - matchlen = 3; - src++; - } - else if ((fetch & 2) == 0) - { - offset = (fetch & 0xffff) >> 2; - matchlen = 3; - src += 2; - } - else if ((fetch & 1) == 0) - { - offset = (fetch & 0xffff) >> 6; - matchlen = ((fetch >> 2) & 15) + 3; - src += 2; - } - else if ((fetch & 127) != 3) - { - offset = (fetch >> 7) & 0x1ffff; - matchlen = ((fetch >> 2) & 0x1f) + 2; - src += 3; - } - else - { - offset = (fetch >> 15); - matchlen = ((fetch >> 7) & 255) + 3; - src += 4; - } - - offset2 = dst - offset; -#endif - -#ifdef QLZ_MEMORY_SAFE - if(offset2 < history || offset2 > dst - MINOFFSET - 1) - return 0; - - if(matchlen > (ui32)(last_destination_byte - dst - UNCOMPRESSED_END + 1)) - return 0; -#endif - - memcpy_up(dst, offset2, matchlen); - dst += matchlen; - -#if QLZ_COMPRESSION_LEVEL <= 2 - update_hash_upto(state, &last_hashed, dst - matchlen); - last_hashed = dst - 1; -#endif - } - else - { - if (dst < last_matchstart) - { - unsigned int n = bitlut[cword_val & 0xf]; -#ifdef X86X64 - *(ui32 *)dst = *(ui32 *)src; -#else - memcpy_up(dst, src, 4); -#endif - cword_val = cword_val >> n; - dst += n; - src += n; -#if QLZ_COMPRESSION_LEVEL <= 2 - update_hash_upto(state, &last_hashed, dst - 3); -#endif - } - else - { - while(dst <= last_destination_byte) - { - if (cword_val == 1) - { - src += CWORD_LEN; - cword_val = 1U << 31; - } -#ifdef QLZ_MEMORY_SAFE - if(src >= last_source_byte + 1) - return 0; -#endif - *dst = *src; - dst++; - src++; - cword_val = cword_val >> 1; - } - -#if QLZ_COMPRESSION_LEVEL <= 2 - update_hash_upto(state, &last_hashed, last_destination_byte - 3); // todo, use constant -#endif - return size; - } - - } - } -} - -size_t qlz_compress(const void *source, char *destination, size_t size, qlz_state_compress *state) -{ - size_t r; - ui32 compressed; - size_t base; - - if(size == 0 || size > 0xffffffff - 400) - return 0; - - if(size < 216) - base = 3; - else - base = 9; - -#if QLZ_STREAMING_BUFFER > 0 - if (state->stream_counter + size - 1 >= QLZ_STREAMING_BUFFER) -#endif - { - reset_table_compress(state); - r = base + qlz_compress_core((const unsigned char *)source, (unsigned char*)destination + base, size, state); -#if QLZ_STREAMING_BUFFER > 0 - reset_table_compress(state); -#endif - if(r == base) - { - memcpy(destination + base, source, size); - r = size + base; - compressed = 0; - } - else - { - compressed = 1; - } - state->stream_counter = 0; - } -#if QLZ_STREAMING_BUFFER > 0 - else - { - unsigned char *src = state->stream_buffer + state->stream_counter; - - memcpy(src, source, size); - r = base + qlz_compress_core(src, (unsigned char*)destination + base, size, state); - - if(r == base) - { - memcpy(destination + base, src, size); - r = size + base; - compressed = 0; - reset_table_compress(state); - } - else - { - compressed = 1; - } - state->stream_counter += size; - } -#endif - if(base == 3) - { - *destination = (unsigned char)(0 | compressed); - *(destination + 1) = (unsigned char)r; - *(destination + 2) = (unsigned char)size; - } - else - { - *destination = (unsigned char)(2 | compressed); - fast_write((ui32)r, destination + 1, 4); - fast_write((ui32)size, destination + 5, 4); - } - - *destination |= (QLZ_COMPRESSION_LEVEL << 2); - *destination |= (1 << 6); - *destination |= ((QLZ_STREAMING_BUFFER == 0 ? 0 : (QLZ_STREAMING_BUFFER == 100000 ? 1 : (QLZ_STREAMING_BUFFER == 1000000 ? 2 : 3))) << 4); - -// 76543210 -// 01SSLLHC - - return r; -} - -size_t qlz_decompress(const char *source, void *destination, qlz_state_decompress *state) -{ - size_t dsiz = qlz_size_decompressed(source); - -#if QLZ_STREAMING_BUFFER > 0 - if (state->stream_counter + qlz_size_decompressed(source) - 1 >= QLZ_STREAMING_BUFFER) -#endif - { - if((*source & 1) == 1) - { - reset_table_decompress(state); - dsiz = qlz_decompress_core((const unsigned char *)source, (unsigned char *)destination, dsiz, state, (const unsigned char *)destination); - } - else - { - memcpy(destination, source + qlz_size_header(source), dsiz); - } - state->stream_counter = 0; - reset_table_decompress(state); - } -#if QLZ_STREAMING_BUFFER > 0 - else - { - unsigned char *dst = state->stream_buffer + state->stream_counter; - if((*source & 1) == 1) - { - dsiz = qlz_decompress_core((const unsigned char *)source, dst, dsiz, state, (const unsigned char *)state->stream_buffer); - } - else - { - memcpy(dst, source + qlz_size_header(source), dsiz); - reset_table_decompress(state); - } - memcpy(destination, dst, dsiz); - state->stream_counter += dsiz; - } -#endif - return dsiz; -} - diff --git a/decomsvr/quicklz.h b/decomsvr/quicklz.h deleted file mode 100644 index 56915b56d..000000000 --- a/decomsvr/quicklz.h +++ /dev/null @@ -1,150 +0,0 @@ -#ifndef QLZ_HEADER -#define QLZ_HEADER - -// Fast data compression library -// Copyright (C) 2006-2011 Lasse Mikkel Reinhold -// lar@quicklz.com -// -// QuickLZ can be used for free under the GPL 1, 2 or 3 license (where anything -// released into public must be open source) or under a commercial license if such -// has been acquired (see http://www.quicklz.com/order.html). The commercial license -// does not cover derived or ported versions created by third parties under GPL. - -// You can edit following user settings. Data must be decompressed with the same -// setting of QLZ_COMPRESSION_LEVEL and QLZ_STREAMING_BUFFER as it was compressed -// (see manual). If QLZ_STREAMING_BUFFER > 0, scratch buffers must be initially -// zeroed out (see manual). First #ifndef makes it possible to define settings from -// the outside like the compiler command line. - -// 1.5.0 final - -#ifndef QLZ_COMPRESSION_LEVEL - -// 1 gives fastest compression speed. 3 gives fastest decompression speed and best -// compression ratio. -//#define QLZ_COMPRESSION_LEVEL 1 -//#define QLZ_COMPRESSION_LEVEL 2 -#define QLZ_COMPRESSION_LEVEL 3 - -// If > 0, zero out both states prior to first call to qlz_compress() or qlz_decompress() -// and decompress packets in the same order as they were compressed -#define QLZ_STREAMING_BUFFER 0 -//#define QLZ_STREAMING_BUFFER 100000 -//#define QLZ_STREAMING_BUFFER 1000000 - -// Guarantees that decompression of corrupted data cannot crash. Decreases decompression -// speed 10-20%. Compression speed not affected. -#define QLZ_MEMORY_SAFE -#endif - -#define QLZ_VERSION_MAJOR 1 -#define QLZ_VERSION_MINOR 5 -#define QLZ_VERSION_REVISION 0 - -// Using size_t, memset() and memcpy() -#include - -// Verify compression level -#if QLZ_COMPRESSION_LEVEL != 1 && QLZ_COMPRESSION_LEVEL != 2 && QLZ_COMPRESSION_LEVEL != 3 -#error QLZ_COMPRESSION_LEVEL must be 1, 2 or 3 -#endif - -typedef unsigned int ui32; -typedef unsigned short int ui16; - -// Decrease QLZ_POINTERS for level 3 to increase compression speed. Do not touch any other values! -#if QLZ_COMPRESSION_LEVEL == 1 -#define QLZ_POINTERS 1 -#define QLZ_HASH_VALUES 4096 -#elif QLZ_COMPRESSION_LEVEL == 2 -#define QLZ_POINTERS 4 -#define QLZ_HASH_VALUES 2048 -#elif QLZ_COMPRESSION_LEVEL == 3 -#define QLZ_POINTERS 16 -#define QLZ_HASH_VALUES 4096 -#endif - -// Detect if pointer size is 64-bit. It's not fatal if some 64-bit target is not detected because this is only for adding an optional 64-bit optimization. -#if defined _LP64 || defined __LP64__ || defined __64BIT__ || _ADDR64 || defined _WIN64 || defined __arch64__ || __WORDSIZE == 64 || (defined __sparc && defined __sparcv9) || defined __x86_64 || defined __amd64 || defined __x86_64__ || defined _M_X64 || defined _M_IA64 || defined __ia64 || defined __IA64__ -#define QLZ_PTR_64 -#endif - -// hash entry -typedef struct -{ -#if QLZ_COMPRESSION_LEVEL == 1 - ui32 cache; -#if defined QLZ_PTR_64 && QLZ_STREAMING_BUFFER == 0 - unsigned int offset; -#else - const unsigned char* offset; -#endif -#else - const unsigned char* offset[QLZ_POINTERS]; -#endif - -} qlz_hash_compress; - -typedef struct -{ -#if QLZ_COMPRESSION_LEVEL == 1 - const unsigned char* offset; -#else - const unsigned char* offset[QLZ_POINTERS]; -#endif -} qlz_hash_decompress; - - -// states -typedef struct -{ -#if QLZ_STREAMING_BUFFER > 0 - unsigned char stream_buffer[QLZ_STREAMING_BUFFER]; -#endif - size_t stream_counter; - qlz_hash_compress hash[QLZ_HASH_VALUES]; - unsigned char hash_counter[QLZ_HASH_VALUES]; -} qlz_state_compress; - - -#if QLZ_COMPRESSION_LEVEL == 1 || QLZ_COMPRESSION_LEVEL == 2 -typedef struct -{ -#if QLZ_STREAMING_BUFFER > 0 - unsigned char stream_buffer[QLZ_STREAMING_BUFFER]; -#endif - qlz_hash_decompress hash[QLZ_HASH_VALUES]; - unsigned char hash_counter[QLZ_HASH_VALUES]; - size_t stream_counter; -} qlz_state_decompress; -#elif QLZ_COMPRESSION_LEVEL == 3 -typedef struct -{ -#if QLZ_STREAMING_BUFFER > 0 - unsigned char stream_buffer[QLZ_STREAMING_BUFFER]; -#endif -#if QLZ_COMPRESSION_LEVEL <= 2 - qlz_hash_decompress hash[QLZ_HASH_VALUES]; -#endif - size_t stream_counter; -} qlz_state_decompress; -#endif - - -#if defined (__cplusplus) -extern "C" { -#endif - -// Public functions of QuickLZ -size_t qlz_size_decompressed(const char* source); -size_t qlz_size_compressed(const char* source); -size_t qlz_compress(const void* source, char* destination, size_t size, qlz_state_compress* state); -size_t qlz_decompress(const char* source, void* destination, qlz_state_decompress* state); -int qlz_get_setting(int setting); - -#if defined (__cplusplus) -} -#endif - -#endif - diff --git a/decomsvr/resource.h b/decomsvr/resource.h deleted file mode 100644 index 17a793ca1..000000000 --- a/decomsvr/resource.h +++ /dev/null @@ -1,14 +0,0 @@ -//{{NO_DEPENDENCIES}} -// Microsoft Visual C++ generated include file. -// Used by DecomSvr.rc - -// Next default values for new objects -// -#ifdef APSTUDIO_INVOKED -#ifndef APSTUDIO_READONLY_SYMBOLS -#define _APS_NEXT_RESOURCE_VALUE 101 -#define _APS_NEXT_COMMAND_VALUE 40001 -#define _APS_NEXT_CONTROL_VALUE 1001 -#define _APS_NEXT_SYMED_VALUE 101 -#endif -#endif diff --git a/decomsvr/server.cpp b/decomsvr/server.cpp deleted file mode 100644 index f49dc35bc..000000000 --- a/decomsvr/server.cpp +++ /dev/null @@ -1,978 +0,0 @@ -/* Copyright (C) 2014 InfiniDB, Inc. - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License - as published by the Free Software Foundation; version 2 of - the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, - MA 02110-1301, USA. */ - -/* - Protocol definition: - On the control socket: - This server waits for the other end to send it: - 1. The prefix name of the data fifos to open and read/write (string). The incoming, - compressed data is on "name".c and this server will write uncompressed - data on "name".u. - - On the data fifos: - The server then reads the compressed data from the cdata fifo: - 1. The number of bytes of compressed data to read (number) - 2. The compressed data - then it decompresses it, and writes to the udata fifo: - 1. The number of bytes in the uncompressed stream (number) - 2. The uncompressed data - - strings are sent like this: - uint32_t string len - len bytes of the string - numbers are sent like this: - uint64_t the number - - This server expects numeric values to be in its native byte order, so - the sender needs to send them that way. -*/ - -//#define _FILE_OFFSET_BITS 64 -//#define _LARGEFILE64_SOURCE - -#ifdef _MSC_VER -#define WIN32_LEAN_AND_MEAN -#define NOMINMAX -#include -#include -#include -#include -#include -#include -#else -#include -#include -#include -#include -#endif -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#ifndef _MSC_VER -#include -#endif -using namespace std; - -#include -#include -#include -using namespace boost; - -#include "quicklz.h" -#ifndef _MSC_VER -#include "config.h" -#endif - -//#define SINGLE_THREADED - -namespace -{ -#ifdef _MSC_VER -typedef SOCKET SockType; -#define SockReadFcn reads -#else -typedef int SockType; -#define SockReadFcn readn -#endif - -short PortNo; -SockType ListenSock; - -// version 1.1 of the chunk data has a short header -const uint8_t CHUNK_MAGIC1 = 0xff; -const int SIG_OFFSET = 0; -const int CHECKSUM_OFFSET = 1; -const int LEN_OFFSET = 5; -const uint32_t HEADER_SIZE = 9; - -const int ERR_OK = 0; -const int ERR_CHECKSUM = -1; -const int ERR_DECOMPRESS = -2; -const int ERR_BADINPUT = -3; - -/* version 1.2 of the chunk data changes the hash function used to calculate - * checksums. We can no longer use the algorithm used in ver 1.1. Everything - * else is the same - */ -const uint8_t CHUNK_MAGIC2 = 0xfe; - -// A version of idbassert_s & log() that doesn't require linking the logging lib. -// Things printed to stderr go to /tmp/decomsvr.err -#ifndef __STRING -#define __STRING(x) #x -#endif -#define idbassert_s(x, s) do { \ - if (!(x)) { \ - std::ostringstream os; \ -\ - os << __FILE__ << "@" << __LINE__ << ": assertion \'" << __STRING(x) << "\' failed. Error msg \'" << s << "\'"; \ - std::cerr << os.str() << std::endl; \ - throw runtime_error(os.str()); \ - } \ -} while (0) - -void log(const string& s) -{ - cerr << s << endl; -} - -struct DecomMessage -{ - DecomMessage() : isValid(false) { } - ~DecomMessage() { } - - string toString() const; - - bool isValid; - string pipeName; -}; - -string DecomMessage::toString() const -{ - ostringstream oss; - oss << "valid: " << boolalpha << isValid << ", " << - "pipepfx: " << pipeName; - return oss.str(); -} - -ostream& operator<<(ostream& os, const DecomMessage& rhs) -{ - os << rhs.toString(); - return os; -} - -class ThreadFunc -{ -public: - ThreadFunc(const DecomMessage& dm) : fDm(dm) { } - ~ThreadFunc() { } - - void operator()(); - -private: - //Defaults okay - //ThreadFunc(const ThreadFunc& rhs); - //ThreadFunc& operator=(const ThreadFunc& rhs); - - DecomMessage fDm; -}; - -bool serverInit() -{ -#ifndef _MSC_VER - - //Set parent PID to init - setsid(); - - //Handle certain signals (we want these to return EINTR so we can throw) - //SIGPIPE - //I don't think we'll get any of these from init (except possibly HUP, but that's an indication - // of bad things anyway) - //SIGHUP? - //SIGUSR1? - //SIGUSR2? - //SIGPOLL? - struct sigaction sa; - memset(&sa, 0, sizeof(struct sigaction)); - sa.sa_handler = SIG_IGN; - sigaction(SIGPIPE, &sa, 0); - sigaction(SIGHUP, &sa, 0); - sigaction(SIGUSR1, &sa, 0); - sigaction(SIGUSR2, &sa, 0); -#ifndef __FreeBSD__ - sigaction(SIGPOLL, &sa, 0); -#endif - int fd; - close(2); - fd = open("/tmp/decomsrv.err", O_CREAT | O_TRUNC | O_WRONLY, 0644); - - if (fd >= 0 && fd != 2) - { - dup2(fd, 2); - close(fd); - } - -#endif - return true; -} - -bool initCtlFifo() -{ -#ifdef _MSC_VER - WSAData wsadata; - const WORD minVersion = MAKEWORD(2, 2); - WSAStartup(minVersion, &wsadata); -#endif - ListenSock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); - idbassert_s(ListenSock >= 0, string("socket create error: ") + strerror(errno)); - //if (ListenSock < 0) throw runtime_error(string("socket create error: ") + strerror(errno)); -#ifndef _MSC_VER - int optval = 1; - setsockopt(ListenSock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast(&optval), sizeof(optval)); -#endif - int rc = 0; - struct sockaddr_in serv_addr; - struct in_addr la; - inet_aton("127.0.0.1", &la); - memset(&serv_addr, 0, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_addr.s_addr = la.s_addr; - serv_addr.sin_port = htons(PortNo); - const int MaxTries = 5 * 60 / 10; - int tries = 0; -again: - rc = ::bind(ListenSock, (sockaddr*)&serv_addr, sizeof(serv_addr)); - - if (rc < 0) - { -#ifdef _MSC_VER - int x = WSAGetLastError(); - - if (x == WSAEADDRINUSE) -#else - if (errno == EADDRINUSE) -#endif - { - //cerr << "Addr in use..." << endl; - if (++tries >= MaxTries) - { - log("Waited too long for socket to bind...giving up"); - //cerr << "Waited too long for socket to bind...giving up" << endl; - exit(1); - } - - sleep(10); - goto again; - } - - idbassert_s(0, string("socket bind error: ") + strerror(errno)); - //throw runtime_error(string("socket bind error: ") + strerror(errno)); - } - - rc = listen(ListenSock, 16); - idbassert_s(rc >= 0, string("socket listen error") + strerror(errno)); - //if (rc < 0) throw runtime_error(string("socket listen error") + strerror(errno)); - - return true; -} - -#ifndef _MSC_VER -void readn(int fd, void* buf, const size_t wanted) -{ - size_t needed = wanted; - size_t sofar = 0; - char* p = reinterpret_cast(buf); - ssize_t rrc = -1; - pollfd fds[1]; - int en = 0; - int prc = 0; - ostringstream oss; - unsigned zerocount = 0; - - fds[0].fd = fd; - fds[0].events = POLLIN; - - while (wanted > sofar) - { - fds[0].revents = 0; - errno = 0; - prc = poll(fds, 1, -1); - en = errno; - - if (prc <= 0) - { - if (en == EAGAIN || en == EINTR || en == 512) - continue; - - oss << "decomsvr::readn: poll() returned " << prc << " (" << strerror(en) << ")"; - idbassert_s(0, oss.str()); - } - - //no data on fd - if ((fds[0].revents & POLLIN) == 0) - { - oss << "decomsvr::readn: revents for fd " << fds[0].fd << " was " << fds[0].revents; - idbassert_s(0, oss.str()); - } - - errno = 0; - rrc = read(fd, (p + sofar), needed); - en = errno; - - if (rrc < 0) - { - if (en == EAGAIN || en == EINTR || en == 512) - continue; - - oss << "decomsvr::readn: read() returned " << rrc << " (" << strerror(en) << ")"; - idbassert_s(0, oss.str()); - } - - if (rrc == 0) - { - ostringstream os; - zerocount++; - - if (zerocount >= 10) - { - os << "decomsvr::readn(): too many zero-length reads!"; - idbassert_s(0, oss.str()); - } - - os << "decomsvr::readn(): zero-length read on fd " << fd; - log(os.str()); - sleep(1); - } - else - zerocount = 0; - - needed -= rrc; - sofar += rrc; - } -} - -size_t writen(int fd, const void* data, size_t nbytes) -{ - size_t nleft; - ssize_t nwritten; - const char* bufp = (const char*) data; - nleft = nbytes; - - while (nleft > 0) - { - // the O_NONBLOCK flag is not set, this is a blocking I/O. - if ((nwritten = ::write(fd, bufp, nleft)) < 0) - { - if (errno == EINTR) - nwritten = 0; - else - { - // save the error no first - int e = errno; - string errorMsg = "decomsvr: write() error: "; - scoped_array buf(new char[80]); -#if STRERROR_R_CHAR_P - const char* p; - - if ((p = strerror_r(e, buf.get(), 80)) != 0) - errorMsg += p; - -#else - int p; - - if ((p = strerror_r(e, buf.get(), 80)) == 0) - errorMsg += buf.get(); - -#endif - idbassert_s(0, errorMsg); - } - } - - nleft -= nwritten; - bufp += nwritten; - } - - return nbytes; -} -#else -void reads(SOCKET fd, void* buf, const size_t wanted) -{ - size_t needed = wanted; - size_t sofar = 0; - char* p = reinterpret_cast(buf); - ssize_t rrc = -1; - pollfd fds[1]; - int en = 0; - - fds[0].fd = fd; - fds[0].events = POLLIN; - - while (wanted > sofar) - { - fds[0].revents = 0; - poll(fds, 1, -1); - errno = 0; - rrc = recv(fd, (p + sofar), (int)needed, 0); - en = errno; - - if (rrc < 0) - { - if (en == EAGAIN || en == EINTR) - continue; - - ostringstream oss; - oss << "read() returned " << rrc << " (" << strerror(en) << ")"; - idbassert_s(0, oss.str()); - } - - needed -= rrc; - sofar += rrc; - } -} -#endif - -uint32_t readNumber32(SockType fd) -{ - uint32_t np; - SockReadFcn(fd, &np, 4); - return np; -} - -string readString(SockType fd) -{ - string s; - uint32_t len = readNumber32(fd); - idbassert_s(len <= 64, "while reading a string len (>64)"); - //if (len > 64) - // throw runtime_error("while reading a string len (>64)"); - char* buf = (char*)alloca(len + 1); //this should be at most 65 bytes and should always succeed - SockReadFcn(fd, buf, len); - buf[len] = 0; - s = buf; - return s; -} - -DecomMessage getNextMsg(SockType fd) -{ - DecomMessage dm; - - try - { - dm.pipeName = readString(fd); - dm.isValid = true; - } - catch (runtime_error& rex) - { - cerr << "re reading ctl msg: " << rex.what() << endl; - dm.pipeName = ""; - } - catch (...) - { - cerr << "ex reading ctl msg" << endl; - dm.pipeName = ""; - } - - return dm; -} - -// Murmur3 from code.google.com - -uint64_t fmix(uint64_t k) -{ - k ^= k >> 33; - k *= 0xff51afd7ed558ccdULL; - k ^= k >> 33; - k *= 0xc4ceb9fe1a85ec53ULL; - k ^= k >> 33; - - return k; -} - -uint64_t rotl64(uint64_t x, int8_t r) -{ - return (x << r) | (x >> (64 - r)); -} - -class Hasher128 -{ -public: - inline uint64_t operator()(const char* data, uint64_t len) const - { - const int nblocks = len / 16; - - uint64_t h1 = 0; - uint64_t h2 = 0; - - const uint64_t c1 = 0x87c37b91114253d5ULL; - const uint64_t c2 = 0x4cf5ad432745937fULL; - - //---------- - // body - - const uint64_t* blocks = (const uint64_t*) (data); - - for (int i = 0; i < nblocks; i++) - { - uint64_t k1 = blocks[i * 2 + 0]; - uint64_t k2 = blocks[i * 2 + 1]; - - k1 *= c1; - k1 = rotl64(k1, 31); - k1 *= c2; - h1 ^= k1; - - h1 = rotl64(h1, 27); - h1 += h2; - h1 = h1 * 5 + 0x52dce729; - - k2 *= c2; - k2 = rotl64(k2, 33); - k2 *= c1; - h2 ^= k2; - - h2 = rotl64(h2, 31); - h2 += h1; - h2 = h2 * 5 + 0x38495ab5; - } - - //---------- - // tail - - const uint8_t* tail = (const uint8_t*) (data + nblocks * 16); - - uint64_t k1 = 0; - uint64_t k2 = 0; - - switch (len & 15) - { - case 15: - k2 ^= uint64_t(tail[14]) << 48; - - case 14: - k2 ^= uint64_t(tail[13]) << 40; - - case 13: - k2 ^= uint64_t(tail[12]) << 32; - - case 12: - k2 ^= uint64_t(tail[11]) << 24; - - case 11: - k2 ^= uint64_t(tail[10]) << 16; - - case 10: - k2 ^= uint64_t(tail[9]) << 8; - - case 9: - k2 ^= uint64_t(tail[8]) << 0; - k2 *= c2; - k2 = rotl64(k2, 33); - k2 *= c1; - h2 ^= k2; - - case 8: - k1 ^= uint64_t(tail[7]) << 56; - - case 7: - k1 ^= uint64_t(tail[6]) << 48; - - case 6: - k1 ^= uint64_t(tail[5]) << 40; - - case 5: - k1 ^= uint64_t(tail[4]) << 32; - - case 4: - k1 ^= uint64_t(tail[3]) << 24; - - case 3: - k1 ^= uint64_t(tail[2]) << 16; - - case 2: - k1 ^= uint64_t(tail[1]) << 8; - - case 1: - k1 ^= uint64_t(tail[0]) << 0; - k1 *= c1; - k1 = rotl64(k1, 31); - k1 *= c2; - h1 ^= k1; - }; - - //---------- - // finalization - - h1 ^= len; - - h2 ^= len; - - h1 += h2; - - h2 += h1; - - h1 = fmix(h1); - - h2 = fmix(h2); - - h1 += h2; - - h2 += h1; - - return h1; - } -}; - -int uncompressBlock(const char* in, const size_t inLen, unsigned char* out, - unsigned int& outLen) -{ - int rc = ERR_OK; - int qlzrc = 0; - - boost::scoped_ptr scratch(new qlz_state_decompress()); - - uint32_t realChecksum; - uint32_t storedChecksum; - uint32_t storedLen; - uint8_t storedMagic; - Hasher128 hasher; - - outLen = 0; - - if (inLen < 1) - { - return ERR_BADINPUT; - } - - storedMagic = *((uint8_t*) &in[SIG_OFFSET]); - - if (storedMagic == CHUNK_MAGIC1 || storedMagic == CHUNK_MAGIC2) - { - if (inLen < HEADER_SIZE) - { - return ERR_BADINPUT; - } - - storedChecksum = *((uint32_t*) &in[CHECKSUM_OFFSET]); - storedLen = *((uint32_t*) (&in[LEN_OFFSET])); - - if (inLen < storedLen + HEADER_SIZE) - { - return ERR_BADINPUT; - } - - /* We can no longer verify the checksum on ver 1.1 */ - if (storedMagic == CHUNK_MAGIC2) - { - realChecksum = hasher(&in[HEADER_SIZE], storedLen); - - if (storedChecksum != realChecksum) - { - return ERR_CHECKSUM; - } - } - - qlzrc = qlz_decompress(&in[HEADER_SIZE], out, scratch.get()); - } - else - qlzrc = qlz_decompress(in, out, scratch.get()); - - if (qlzrc == 0) - rc = ERR_DECOMPRESS; - else - outLen = qlzrc; - - return rc; -} - -struct ScopedCleaner -{ -#ifdef _MSC_VER - ScopedCleaner() : handle(INVALID_HANDLE_VALUE) { } - ~ScopedCleaner() - { - if (handle != INVALID_HANDLE_VALUE) CloseHandle(handle); - } - - HANDLE handle; -#else - ScopedCleaner() : fd(-1) { } - ~ScopedCleaner() - { - if (fd >= 0) close(fd); - } - - int fd; -#endif -}; - -void ThreadFunc::operator()() -{ - string cfifo = fDm.pipeName + ".c"; - string ufifo = fDm.pipeName + ".u"; - uint64_t ccount = 0; - ssize_t rrc = -1; - ScopedCleaner cleaner; -#ifdef _MSC_VER - HANDLE h; - h = CreateFile(cfifo.c_str(), GENERIC_READ, 0, 0, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, 0); - idbassert_s(h != INVALID_HANDLE_VALUE, "while opening cpipe"); - //if (!(h != INVALID_HANDLE_VALUE)) - // throw runtime_error("while opening cpipe"); - cleaner.handle = h; - - DWORD nread; - BOOL drrc; - drrc = ReadFile(h, &ccount, 8, &nread, 0); - idbassert_s(drrc != 0 && nread == 8 && ccount < 8 * 1024 * 1024, "while reading from cpipe"); - //if (!(drrc != 0 && nread == 8 && ccount < 8 * 1024 * 1024)) - // throw runtime_error("while reading from cpipe"); - - scoped_array in(new char[ccount]); - - drrc = ReadFile(h, in.get(), (DWORD)ccount, &nread, 0); - idbassert_s(drrc != 0 && nread == ccount, "while reading from cpipe"); - //if (!(drrc != 0 && nread == ccount)) - // throw runtime_error("while reading from cpipe"); - - CloseHandle(h); - cleaner.handle = INVALID_HANDLE_VALUE; -#else - scoped_array in; - int fd = -1; - - try - { - fd = open(cfifo.c_str(), O_RDONLY); - idbassert_s(fd >= 0, "when opening data fifo for input"); - - cleaner.fd = fd; - - readn(fd, &ccount, 8); - - in.reset(new char[ccount]); - - readn(fd, in.get(), ccount); - - close(fd); - cleaner.fd = -1; - } - catch (std::exception& ) - { - //This is a protocol error and returning here will clean up resources on - //the stack unwind. - return; - } - -#endif -#ifdef _MSC_VER - h = CreateFile(ufifo.c_str(), GENERIC_WRITE, 0, 0, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, 0); - idbassert_s(h != INVALID_HANDLE_VALUE, "while opening upipe"); - //if (!(h != INVALID_HANDLE_VALUE)) - // throw runtime_error("while opening upipe"); - cleaner.handle = h; - - uint64_t outlen = 512 * 1024 * 8; - unsigned int ol = static_cast(outlen); - - scoped_array out(new char[outlen]); - - int crc = uncompressBlock(in.get(), ccount, reinterpret_cast(out.get()), ol); - - if (crc != ERR_OK) - outlen = 0; - else - outlen = ol; - - BOOL dwrc; - DWORD nwritten; - dwrc = WriteFile(h, &outlen, 8, &nwritten, 0); - idbassert_s(dwrc != 0 && nwritten == 8, "while writing to upipe"); - //if (!(dwrc != 0 && nwritten == 8)) - // throw runtime_error("while writing to upipe"); - - dwrc = WriteFile(h, out.get(), (DWORD)outlen, &nwritten, 0); - idbassert_s(dwrc != 0 && nwritten == outlen, "while writing to upipe"); - //if (!(dwrc != 0 && nwritten == outlen)) - // throw runtime_error("while writing to upipe"); - - FlushFileBuffers(h); - CloseHandle(h); - cleaner.handle = INVALID_HANDLE_VALUE; -#else - scoped_array out; - - try - { - fd = open(ufifo.c_str(), O_WRONLY); - idbassert_s(fd >= 0, "when opening data fifo for output"); - //if (fd < 0) - // throw runtime_error("when opening data fifo for output"); - - cleaner.fd = fd; - - uint64_t outlen = 512 * 1024 * 8; - unsigned int ol = outlen; - - out.reset(new char[outlen]); - - int crc = uncompressBlock(in.get(), ccount, reinterpret_cast(out.get()), ol); - - if (crc != ERR_OK) - outlen = 0; - else - outlen = ol; - - rrc = writen(fd, &outlen, 8); - idbassert_s(rrc == 8, "when writing len to data fifo"); - - rrc = writen(fd, out.get(), outlen); - idbassert_s(rrc == (ssize_t)outlen, "when writing data to data fifo"); - - close(fd); - cleaner.fd = -1; - } - catch (std::exception& ) - { - //There was an error writing the uncompressed data back to PrimProc. Cleanup by - //unwinding the stack - return; - } - -#endif -} - -#ifndef _MSC_VER -void cleanupFifos() -{ - //find all existing fifos and try get rid of them.... - DIR* dirp = 0; - struct dirent* direntp = 0; - char fifoname[PATH_MAX]; - int fd = -1; - - dirp = opendir("/tmp"); - strcpy(fifoname, "/tmp/"); - - direntp = readdir(dirp); - - while (direntp != 0) - { - if (memcmp(direntp->d_name, "cdatafifo", 9) == 0) - { - strcpy(&fifoname[5], direntp->d_name); - fd = open(fifoname, O_RDONLY | O_NONBLOCK); - - //opening and closing this fifo will cause PP to unblock and retry - if (fd >= 0) - { - close(fd); - } - else - { - fd = open(fifoname, O_WRONLY | O_NONBLOCK); - - if (fd >= 0) - close(fd); - } - } - - direntp = readdir(dirp); - } - - closedir(dirp); -} -#endif - -} - -int main(int argc, char** argv) -{ - int c; - - PortNo = 0; - char* p = getenv("IDB_DECOMSVR_PORT"); - - if (p && *p) - PortNo = atoi(p); - - if (PortNo <= 0) - PortNo = 9199; - -#ifdef _MSC_VER - ListenSock = INVALID_SOCKET; -#else - ListenSock = -1; -#endif - opterr = 0; - - while ((c = getopt(argc, argv, "p:")) != -1) - switch (c) - { - case 'p': - PortNo = atoi(optarg); - break; - - case '?': - default: - break; - } - - if (!serverInit()) - { - log("Could not initialize the Decompression Server!"); - //cerr << "Could not initialize the Decompression Server!" << endl; - return 1; - } - - initCtlFifo(); - -#ifndef _MSC_VER - cleanupFifos(); -#endif - - DecomMessage m; - - for (;;) - { -#ifdef _MSC_VER - SOCKET dataSock = INVALID_SOCKET; -#else - int dataSock = -1; -#endif - dataSock = accept(ListenSock, 0, 0); -#ifdef _MSC_VER - idbassert_s(dataSock != INVALID_SOCKET, string("socket accept error: ") + strerror(errno)); - //if (dataSock == INVALID_SOCKET) - // throw runtime_error(string("socket accept error: ") + strerror(errno)); -#else - //if (dataSock < 0) - idbassert_s(dataSock >= 0, string("socket accept error: ") + strerror(errno)); -#endif - m = getNextMsg(dataSock); - shutdown(dataSock, SHUT_RDWR); -#ifdef _MSC_VER - closesocket(dataSock); -#else - close(dataSock); -#endif - - if (m.isValid) - { - ThreadFunc tf(m); -#ifdef SINGLE_THREADED - tf(); -#else - thread t(tf); -#endif - } - else - idbassert_s(0, "Invalid msg"); - - //cerr << "Invalid msg" << endl; - } - - return 0; -} - diff --git a/oam/etc/ProcessConfig.xml b/oam/etc/ProcessConfig.xml index 8a0c3618f..8088135bb 100644 --- a/oam/etc/ProcessConfig.xml +++ b/oam/etc/ProcessConfig.xml @@ -58,16 +58,6 @@ LOADSHARE off - - DecomSvr - pm - /$INSTALLDIR/bin/DecomSvr - 2 - 15 - - LOADSHARE - off - PrimProc pm diff --git a/oam/etc/ProcessConfig.xml.singleserver b/oam/etc/ProcessConfig.xml.singleserver index b01ca5585..9c543ae38 100644 --- a/oam/etc/ProcessConfig.xml.singleserver +++ b/oam/etc/ProcessConfig.xml.singleserver @@ -58,16 +58,6 @@ LOADSHARE off - - DecomSvr - pm - /usr/local/mariadb/columnstore/bin/DecomSvr - 2 - 15 - - LOADSHARE - off - PrimProc pm diff --git a/procmon/processmonitor.cpp b/procmon/processmonitor.cpp index 30b239ec1..823060ad2 100644 --- a/procmon/processmonitor.cpp +++ b/procmon/processmonitor.cpp @@ -2730,20 +2730,6 @@ pid_t ProcessMonitor::startProcess(string processModuleType, string processName, return oam::API_MINOR_FAILURE; } - if (processLocation.find("DecomSvr") != string::npos) - { - // DecomSvr app is special - - sleep(1); - //record the process information into processList - config.buildList(processModuleType, processName, processLocation, arg_list, - launchID, newProcessID, oam::ACTIVE, BootLaunch, RunType, - DepProcessName, DepModuleName, LogFile); - - //Update Process Status: Mark Process oam::ACTIVE state - updateProcessInfo(processName, oam::ACTIVE, newProcessID); - } - //FYI - NEEDS TO STAY HERE TO HAVE newProcessID //record the process information into processList @@ -2797,14 +2783,6 @@ pid_t ProcessMonitor::startProcess(string processModuleType, string processName, close(i); } - // open STDIN, STDOUT & STDERR for trapDaemon and DecomSvr - if (processName == "DecomSvr" ) - { - open("/dev/null", O_RDONLY); //Should be fd 0 - open("/dev/null", O_WRONLY); //Should be fd 1 - open("/dev/null", O_WRONLY); //Should be fd 2 - } - else { int fd; fd = open("/dev/null", O_RDONLY);