diff --git a/cpackEngineRPM.cmake b/cpackEngineRPM.cmake index d26a1d98f..32552e855 100644 --- a/cpackEngineRPM.cmake +++ b/cpackEngineRPM.cmake @@ -327,6 +327,9 @@ SET(CPACK_RPM_storage-engine_USER_FILELIST "/usr/local/mariadb/columnstore/lib/is_columnstore_tables.so" "/usr/local/mariadb/columnstore/lib/is_columnstore_tables.so.1" "/usr/local/mariadb/columnstore/lib/is_columnstore_tables.so.1.0.0" +"/usr/local/mariadb/columnstore/lib/is_columnstore_files.so" +"/usr/local/mariadb/columnstore/lib/is_columnstore_files.so.1" +"/usr/local/mariadb/columnstore/lib/is_columnstore_files.so.1.0.0" "/usr/local/mariadb/columnstore/mysql/mysql-Columnstore" "/usr/local/mariadb/columnstore/mysql/install_calpont_mysql.sh" "/usr/local/mariadb/columnstore/mysql/syscatalog_mysql.sql" @@ -335,6 +338,7 @@ SET(CPACK_RPM_storage-engine_USER_FILELIST "/usr/local/mariadb/columnstore/mysql/calsetuserpriority.sql" "/usr/local/mariadb/columnstore/mysql/calremoveuserpriority.sql" "/usr/local/mariadb/columnstore/mysql/calshowprocesslist.sql" +"/usr/local/mariadb/columnstore/mysql/columnstore_info.sql" ${ignored}) diff --git a/dbcon/mysql/CMakeLists.txt b/dbcon/mysql/CMakeLists.txt index 7d9b012ca..d478843b0 100644 --- a/dbcon/mysql/CMakeLists.txt +++ b/dbcon/mysql/CMakeLists.txt @@ -62,12 +62,25 @@ target_link_libraries(is_columnstore_extents ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LI set_target_properties(is_columnstore_extents PROPERTIES PREFIX "") set_target_properties(is_columnstore_extents PROPERTIES VERSION 1.0.0 SOVERSION 1) -install(TARGETS calmysql is_columnstore_tables is_columnstore_columns is_columnstore_extents DESTINATION ${ENGINE_LIBDIR} COMPONENT storage-engine) +SET ( is_columnstore_files_SRCS + is_columnstore_files.cpp + ) +add_library(is_columnstore_files SHARED ${is_columnstore_files_SRCS}) + +target_link_libraries(is_columnstore_files ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} threadpool) + +# Don't prepend .so file with 'lib' +set_target_properties(is_columnstore_files PROPERTIES 
PREFIX "") +set_target_properties(is_columnstore_files PROPERTIES VERSION 1.0.0 SOVERSION 1) + + +install(TARGETS calmysql is_columnstore_tables is_columnstore_columns is_columnstore_extents is_columnstore_files DESTINATION ${ENGINE_LIBDIR} COMPONENT storage-engine) install(FILES syscatalog_mysql.sql dumpcat_mysql.sql calsetuserpriority.sql calremoveuserpriority.sql calshowprocesslist.sql + columnstore_info.sql my.cnf DESTINATION ${ENGINE_MYSQLDIR} COMPONENT storage-engine) install(PROGRAMS install_calpont_mysql.sh mysql-Columnstore dumpcat.pl
diff --git a/dbcon/mysql/columnstore_info.sql b/dbcon/mysql/columnstore_info.sql
new file mode 100644
index 000000000..b545ce647
--- /dev/null
+++ b/dbcon/mysql/columnstore_info.sql
@@ -0,0 +1,67 @@
+CREATE DATABASE IF NOT EXISTS columnstore_info;
+USE columnstore_info;
+
+-- Drop any earlier definitions so this script can be re-run, for example on upgrade.
+DROP FUNCTION IF EXISTS `format_filesize`;
+DROP PROCEDURE IF EXISTS `total_usage`;
+DROP PROCEDURE IF EXISTS `table_usage`;
+DROP PROCEDURE IF EXISTS `compression_ratio`;
+
+DELIMITER //
+-- format_filesize(filesize): formats a byte count as a rounded value plus unit,
+-- dividing by 1024 until the value is below 1024. Returns NULL for a NULL input.
+CREATE FUNCTION `format_filesize`(filesize FLOAT) RETURNS varchar(20) CHARSET utf8 DETERMINISTIC
+BEGIN
+
+DECLARE n INT DEFAULT 1;
+
+-- NULL is never below 1024, so without this guard the loop would not terminate.
+-- SUM() over zero rows is one way to get a NULL here.
+IF filesize IS NULL THEN
+    RETURN NULL;
+END IF;
+
+LOOP
+    IF filesize < 1024 THEN
+        RETURN concat(round(filesize, 2), ' ', elt(n, 'Bytes', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB', 'BB'));
+    END IF;
+    SET filesize = filesize / 1024;
+    SET n = n + 1;
+END LOOP;
+
+END //
+
+
+-- total_usage(): total data size from the extents table and total disk usage from the files table.
+CREATE PROCEDURE total_usage ()
+BEGIN
+    SELECT
+        (SELECT format_filesize(sum(data_size)) TOTAL_DATA_SIZE FROM INFORMATION_SCHEMA.COLUMNSTORE_EXTENTS) TOTAL_DATA_SIZE,
+        (SELECT format_filesize(sum(file_size)) TOTAL_DISK_USAGE FROM INFORMATION_SCHEMA.COLUMNSTORE_FILES) TOTAL_DISK_USAGE;
+END //
+
+-- table_usage(t_name): data, dictionary and total disk usage per table.
+-- A NULL t_name reports every table.
+CREATE PROCEDURE table_usage (IN t_name char(64))
+BEGIN
+    SELECT TABLE_NAME, format_filesize(sum(cf.file_size)) DATA_DISK_USAGE, format_filesize(sum(IFNULL(ccf.file_size, 0))) DICT_DISK_USAGE, format_filesize(sum(cf.file_size) + sum(IFNULL(ccf.file_size, 0))) TOTAL_USAGE
+    FROM INFORMATION_SCHEMA.COLUMNSTORE_COLUMNS cc
+    JOIN INFORMATION_SCHEMA.COLUMNSTORE_FILES cf ON cc.object_id = cf.object_id
+    LEFT JOIN INFORMATION_SCHEMA.COLUMNSTORE_FILES ccf ON cc.dictionary_object_id = ccf.object_id
+    WHERE t_name IS NULL OR table_name = t_name
+    GROUP BY table_name;
+END //
+
+-- compression_ratio(): percentage built from SUM(data_size) and SUM(compressed_data_size),
+-- limited to files that report a compressed size.
+-- NOTE(review): the expression is 1 - data / compressed. Confirm that this is the intended
+-- orientation, and that joining extents to files on object_id alone does not count a file
+-- once per extent.
+CREATE PROCEDURE compression_ratio()
+BEGIN
+SELECT CONCAT(((1 - (sum(data_size) / sum(compressed_data_size))) * 100), '%') COMPRESSION_RATIO FROM INFORMATION_SCHEMA.COLUMNSTORE_EXTENTS ce
+JOIN INFORMATION_SCHEMA.COLUMNSTORE_FILES cf ON ce.object_id = cf.object_id
+WHERE compressed_data_size IS NOT NULL;
+END //
+
+DELIMITER ;
diff --git a/dbcon/mysql/install_calpont_mysql.sh b/dbcon/mysql/install_calpont_mysql.sh index d04bc3487..9b55f9448 100755 --- a/dbcon/mysql/install_calpont_mysql.sh +++ b/dbcon/mysql/install_calpont_mysql.sh @@ -33,6 +33,7 @@ INSTALL PLUGIN infinidb SONAME 'libcalmysql.so'; INSTALL PLUGIN columnstore_tables SONAME 'is_columnstore_tables.so'; INSTALL PLUGIN columnstore_columns SONAME 'is_columnstore_columns.so'; INSTALL PLUGIN columnstore_extents SONAME 'is_columnstore_extents.so'; +INSTALL PLUGIN columnstore_files SONAME 'is_columnstore_files.so'; -- these are deprecated names DELETE FROM mysql.func WHERE name='caldisablepartition'; DELETE FROM mysql.func WHERE name='caldroppartition'; @@ -133,6 +134,7 @@ $installdir/mysql/bin/mysql --defaults-file=$df --user=root $pwprompt mysql 2>/d $installdir/mysql/bin/mysql --defaults-file=$df --user=root $pwprompt mysql 2>/dev/null <$installdir/mysql/calsetuserpriority.sql $installdir/mysql/bin/mysql --defaults-file=$df --user=root $pwprompt mysql 2>/dev/null <$installdir/mysql/calremoveuserpriority.sql $installdir/mysql/bin/mysql --defaults-file=$df --user=root
$pwprompt mysql 2>/dev/null <$installdir/mysql/calshowprocesslist.sql +$installdir/mysql/bin/mysql --defaults-file=$df --user=root $pwprompt mysql 2>/dev/null <$installdir/mysql/columnstore_info.sql sed -i 's/infinidb_compression_type=1/infinidb_compression_type=2/' $installdir/mysql/my.cnf >/dev/null 2>&1 diff --git a/dbcon/mysql/is_columnstore_extents.cpp b/dbcon/mysql/is_columnstore_extents.cpp index fa5835888..1d1098eda 100644 --- a/dbcon/mysql/is_columnstore_extents.cpp +++ b/dbcon/mysql/is_columnstore_extents.cpp @@ -24,13 +24,9 @@ #include #include -#include #include "dbrm.h" #include "objectidmanager.h" -#include "we_convertor.h" -#include "we_define.h" -#include "IDBPolicy.h" -#include "we_config.h" + // Required declaration as it isn't in a MairaDB include bool schema_table_store_record(THD *thd, TABLE *table); @@ -53,8 +49,6 @@ ST_FIELD_INFO is_columnstore_extents_fields[] = {"STATE", 64, MYSQL_TYPE_STRING, 0, 0, 0, 0}, // 13 {"STATUS", 64, MYSQL_TYPE_STRING, 0, 0, 0, 0}, // 14 {"DATA_SIZE", 19, MYSQL_TYPE_LONGLONG, 0, 0, 0, 0}, // 15 - {"FILENAME", 1024, MYSQL_TYPE_STRING, 0, 0, 0, 0}, // 16 - {"FILE_SIZE", 19, MYSQL_TYPE_LONGLONG, 0, MY_I_S_MAYBE_NULL, 0, 0}, // 17 {0, 0, MYSQL_TYPE_NULL, 0, 0, 0, 0} }; @@ -65,9 +59,6 @@ static int is_columnstore_extents_fill(THD *thd, TABLE_LIST *tables, COND *cond) std::vector entries; std::vector::iterator iter; std::vector::iterator end; - char oidDirName[WriteEngine::FILE_NAME_SIZE]; - char fullFileName[WriteEngine::FILE_NAME_SIZE]; - char dbDir[WriteEngine::MAX_DB_DIR_LEVEL][WriteEngine::MAX_DB_DIR_NAME_SIZE]; BRM::DBRM *emp = new BRM::DBRM(); if (!emp || !emp->isDBRMReady()) @@ -78,7 +69,7 @@ static int is_columnstore_extents_fill(THD *thd, TABLE_LIST *tables, COND *cond) execplan::ObjectIDManager oidm; BRM::OID_t MaxOID = oidm.size(); - for(BRM::OID_t oid = 0; oid <= MaxOID; oid++) + for(BRM::OID_t oid = 3000; oid <= MaxOID; oid++) { emp->getExtents(oid, entries, false, false, true); if (entries.size() == 0) @@ 
-161,31 +152,20 @@ static int is_columnstore_extents_fill(THD *thd, TABLE_LIST *tables, COND *cond) default: table->field[14]->store("Unknown", strlen("Unknown"), cs); } - table->field[15]->store((iter->HWM + 1) * 8192); - - WriteEngine::Convertor::oid2FileName(oid, oidDirName, dbDir, iter->partitionNum, iter->segmentNum); - snprintf(fullFileName, WriteEngine::FILE_NAME_SIZE, "%s/%s", WriteEngine::Config::getDBRootByNum(iter->dbRoot).c_str(), oidDirName); - table->field[16]->store(fullFileName, strlen(fullFileName), cs); - - if (idbdatafile::IDBPolicy::exists(fullFileName)) - { - table->field[17]->set_notnull(); - table->field[17]->store(idbdatafile::IDBPolicy::size(fullFileName)); - } - else - { - table->field[17]->set_null(); - } + table->field[15]->store((iter->HWM - iter->blockOffset + 1) * 8192); if (schema_table_store_record(thd, table)) + { + delete emp; return 1; + } iter++; } } - + delete emp; return 0; }
diff --git a/dbcon/mysql/is_columnstore_files.cpp b/dbcon/mysql/is_columnstore_files.cpp
new file mode 100644
index 000000000..b39be0ee6
--- /dev/null
+++ b/dbcon/mysql/is_columnstore_files.cpp
@@ -0,0 +1,182 @@
+/* c-basic-offset: 4; tab-width: 4; indent-tabs-mode: nil
+ * vi: set shiftwidth=4 tabstop=4 expandtab:
+ * :indentSize=4:tabSize=4:noTabs=true:
+ *
+ * Copyright (C) 2016 MariaDB Corporaton
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; version 2 of
+ * the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ */
+
+#include "idb_mysql.h"
+
+#include "dbrm.h"
+#include "objectidmanager.h"
+#include "we_convertor.h"
+#include "we_define.h"
+#include "IDBPolicy.h"
+#include "we_config.h"
+#include "we_brm.h"
+
+// Required declaration as it isn't in a MariaDB include
+bool schema_table_store_record(THD *thd, TABLE *table);
+
+// Columns of INFORMATION_SCHEMA.COLUMNSTORE_FILES. The fill code addresses
+// them by position, so keep the trailing index comments in sync.
+ST_FIELD_INFO is_columnstore_files_fields[] =
+{
+    {"OBJECT_ID", 11, MYSQL_TYPE_LONG, 0, 0, 0, 0}, // 0
+    {"SEGMENT_ID", 11, MYSQL_TYPE_LONG, 0, 0, 0, 0}, // 1
+    {"PARTITION_ID", 11, MYSQL_TYPE_LONG, 0, 0, 0, 0}, // 2
+    {"FILENAME", 1024, MYSQL_TYPE_STRING, 0, 0, 0, 0}, // 3
+    {"FILE_SIZE", 19, MYSQL_TYPE_LONGLONG, 0, MY_I_S_MAYBE_NULL, 0, 0}, // 4
+    {"COMPRESSED_DATA_SIZE", 19, MYSQL_TYPE_LONGLONG, 0, MY_I_S_MAYBE_NULL, 0, 0}, // 5
+    {0, 0, MYSQL_TYPE_NULL, 0, 0, 0, 0}
+};
+
+
+/**
+ * Stores one COLUMNSTORE_FILES row per segment file referenced by the extents
+ * of a single object ID.
+ *
+ * @param thd     session passed through to schema_table_store_record()
+ * @param tables  table list whose table receives the rows
+ * @param oid     object ID the extents were fetched for
+ * @param entries extents of oid
+ * @return true if a row could not be stored, false otherwise
+ */
+static bool is_columnstore_files_get_entries(THD *thd, TABLE_LIST *tables, BRM::OID_t oid, std::vector<struct BRM::EMEntry> &entries)
+{
+    CHARSET_INFO *cs = system_charset_info;
+    TABLE *table = tables->table;
+
+    char oidDirName[WriteEngine::FILE_NAME_SIZE];
+    char fullFileName[WriteEngine::FILE_NAME_SIZE];
+    char dbDir[WriteEngine::MAX_DB_DIR_LEVEL][WriteEngine::MAX_DB_DIR_NAME_SIZE];
+    WriteEngine::Config config;
+    config.initConfigCache();
+
+    for (std::vector<struct BRM::EMEntry>::const_iterator iter = entries.begin(); iter != entries.end(); ++iter)
+    {
+        // Only the extent at block offset 0 stands for its file, so a file
+        // spanning several extents is listed once.
+        if (iter->blockOffset > 0)
+            continue;
+
+        table->field[0]->store(oid);
+        table->field[1]->store(iter->segmentNum);
+        table->field[2]->store(iter->partitionNum);
+
+        WriteEngine::Convertor::oid2FileName(oid, oidDirName, dbDir, iter->partitionNum, iter->segmentNum);
+        snprintf(fullFileName, WriteEngine::FILE_NAME_SIZE, "%s/%s", WriteEngine::Config::getDBRootByNum(iter->dbRoot).c_str(), oidDirName);
+        table->field[3]->store(fullFileName, strlen(fullFileName), cs);
+
+        // File and compressed sizes stay NULL when the file does not exist;
+        // the compressed size is also NULL when compressedSize() returns -1.
+        if (idbdatafile::IDBPolicy::exists(fullFileName))
+        {
+            table->field[4]->set_notnull();
+            table->field[4]->store(idbdatafile::IDBPolicy::size(fullFileName));
+
+            const off64_t comp_size = idbdatafile::IDBPolicy::compressedSize(fullFileName);
+            if (comp_size != -1)
+            {
+                table->field[5]->set_notnull();
+                table->field[5]->store(comp_size);
+            }
+            else
+            {
+                table->field[5]->set_null();
+            }
+        }
+        else
+        {
+            table->field[4]->set_null();
+            table->field[5]->set_null();
+        }
+
+        if (schema_table_store_record(thd, table))
+            return true;
+    }
+
+    return false;
+}
+
+/**
+ * fill_table callback for COLUMNSTORE_FILES: walks object IDs from 3000 up
+ * to the size reported by the OID manager and stores the file rows of every
+ * object that has extents.
+ *
+ * @param thd    session the rows are stored for
+ * @param tables table list whose table receives the rows
+ * @param cond   not used
+ * @return 0 on success, 1 if DBRM is not ready or a row could not be stored
+ */
+static int is_columnstore_files_fill(THD *thd, TABLE_LIST *tables, COND *cond)
+{
+    // Automatic storage, so the DBRM instance is released on every return
+    // path, including the not-ready one.
+    BRM::DBRM emp;
+    std::vector<struct BRM::EMEntry> entries;
+
+    if (!emp.isDBRMReady())
+        return 1;
+
+    execplan::ObjectIDManager oidm;
+    BRM::OID_t MaxOID = oidm.size();
+
+    for (BRM::OID_t oid = 3000; oid <= MaxOID; oid++)
+    {
+        emp.getExtents(oid, entries, false, false, true);
+        if (entries.size() == 0)
+            continue;
+
+        if (is_columnstore_files_get_entries(thd, tables, oid, entries))
+            return 1;
+    }
+
+    return 0;
+}
+
+// Plugin init hook: registers the column list and the fill callback.
+static int is_columnstore_files_plugin_init(void *p)
+{
+    ST_SCHEMA_TABLE *schema = static_cast<ST_SCHEMA_TABLE*>(p);
+    schema->fields_info = is_columnstore_files_fields;
+    schema->fill_table = is_columnstore_files_fill;
+    return 0;
+}
+
+static struct st_mysql_information_schema is_columnstore_files_plugin_version =
+{ MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION };
+
+maria_declare_plugin(is_columnstore_files_plugin)
+{
+    MYSQL_INFORMATION_SCHEMA_PLUGIN,
+    &is_columnstore_files_plugin_version,
+    "COLUMNSTORE_FILES",
+    "MariaDB Corporation",
+    "An information schema plugin to list ColumnStore files",
+    PLUGIN_LICENSE_GPL,
+    is_columnstore_files_plugin_init,
+    //is_columnstore_files_plugin_deinit,
+    NULL,
+    0x0100,
+    NULL,
+    NULL,
+    "1.0",
+    MariaDB_PLUGIN_MATURITY_STABLE
+}
+maria_declare_plugin_end;
diff --git a/oam/install_scripts/post-mysqld-install b/oam/install_scripts/post-mysqld-install index 92ea035fa..546945b22 100755 --- a/oam/install_scripts/post-mysqld-install +++ b/oam/install_scripts/post-mysqld-install @@ -85,12 +85,14 @@ if [ -f $installdir/lib/libcalmysql.so.1.0.0 ]; then is_columnstore_tables=$installdir/lib/is_columnstore_tables.so.1.0.0 is_columnstore_columns=$installdir/lib/is_columnstore_columns.so.1.0.0 is_columnstore_extents=$installdir/lib/is_columnstore_extents.so.1.0.0 + is_columnstore_files=$installdir/lib/is_columnstore_files.so.1.0.0 elif [ -f $installdir/lib/libcalmysql.so.1 ]; then libcalmysql=$installdir/lib/libcalmysql.so.1 libudfsdk=$installdir/lib/libudf_mysql.so.1 is_columnstore_tables=$installdir/lib/is_columnstore_tables.so.1 is_columnstore_columns=$installdir/lib/is_columnstore_columns.so.1 is_columnstore_extents=$installdir/lib/is_columnstore_extents.so.1 + is_columnstore_files=$installdir/lib/is_columnstore_files.so.1 else libcalmysql= fi @@ -104,6 +106,7 @@ if [ -n "$libcalmysql" ]; then ln -sf $is_columnstore_tables is_columnstore_tables.so ln -sf $is_columnstore_columns is_columnstore_columns.so ln -sf $is_columnstore_extents is_columnstore_extents.so + ln -sf $is_columnstore_files is_columnstore_files.so fi # cleanup previous install mysql replication files diff --git a/utils/idbdatafile/IDBFileSystem.h b/utils/idbdatafile/IDBFileSystem.h index 2b5c0afbf..d0cc55613 100644 --- a/utils/idbdatafile/IDBFileSystem.h +++ b/utils/idbdatafile/IDBFileSystem.h @@ -84,6 +84,13 @@ public: */ virtual off64_t size(const char* path) const = 0; + /** + * compressedSize() returns the compressed data size of the file + * specified by path. + * Returns the size on success, -1 on error + */ + virtual off64_t compressedSize(const char* path) const = 0; + /** * exists() checks for the existence of a particular path. * Returns true if exists, false otherwise.
diff --git a/utils/idbdatafile/IDBPolicy.h b/utils/idbdatafile/IDBPolicy.h index 81891104c..433bde726 100644 --- a/utils/idbdatafile/IDBPolicy.h +++ b/utils/idbdatafile/IDBPolicy.h @@ -115,6 +115,7 @@ public: */ static int mkdir(const char *pathname); static off64_t size(const char* path); + static off64_t compressedSize(const char* path); static int remove(const char *pathname); static int rename(const char *oldpath, const char *newpath); static bool exists(const char* pathname); @@ -175,6 +176,12 @@ off64_t IDBPolicy::size(const char* path) return IDBPolicy::getFs( path ).size( path ); } +inline +off64_t IDBPolicy::compressedSize(const char* path) +{ + return IDBPolicy::getFs( path ).compressedSize( path ); +} + inline int IDBPolicy::remove(const char *pathname) { diff --git a/utils/idbdatafile/PosixFileSystem.cpp b/utils/idbdatafile/PosixFileSystem.cpp index 38822d2fa..59685c1f9 100644 --- a/utils/idbdatafile/PosixFileSystem.cpp +++ b/utils/idbdatafile/PosixFileSystem.cpp @@ -17,6 +17,8 @@ #include "PosixFileSystem.h" #include "IDBLogger.h" +#include "idbcompress.h" +#include <vector> #include #include @@ -128,6 +130,100 @@ off64_t PosixFileSystem::size(const char* path) const return ret; }
+/**
+ * Reads from pFile until bytesReq bytes have been stored in buffer, or until
+ * a read() call returns zero or a negative value.
+ *
+ * @param pFile    open file to read from
+ * @param buffer   destination with room for at least bytesReq bytes
+ * @param bytesReq number of bytes wanted
+ * @return number of bytes actually stored in buffer
+ */
+static size_t readFillBuffer(
+    idbdatafile::IDBDataFile* pFile,
+    char* buffer,
+    size_t bytesReq)
+{
+    size_t totalBytesRead = 0;
+
+    while (totalBytesRead < bytesReq)
+    {
+        const ssize_t nBytes = pFile->read(buffer + totalBytesRead, bytesReq - totalBytesRead);
+        if (nBytes <= 0)
+            break;
+
+        totalBytesRead += static_cast<size_t>(nBytes);
+    }
+
+    return totalBytesRead;
+}
+
+/**
+ * Reads the compression headers at the start of pFile and returns the end
+ * (offset + length) of the last entry in the chunk pointer list.
+ *
+ * @param pFile open file to read from, expected to be at its beginning; the
+ *              caller keeps ownership
+ * @return that end position, or -1 if a header cannot be read or parsed or
+ *         the pointer list is empty
+ */
+static off64_t lastChunkEnd(idbdatafile::IDBDataFile* pFile)
+{
+    compress::IDBCompressInterface decompressor;
+    const size_t hdrLen = compress::IDBCompressInterface::HDR_BUF_LEN;
+
+    char hdr1[compress::IDBCompressInterface::HDR_BUF_LEN];
+    if (readFillBuffer(pFile, hdr1, hdrLen) != hdrLen)
+        return -1;
+
+    const int64_t ptrSecSize = decompressor.getHdrSize(hdr1) - compress::IDBCompressInterface::HDR_BUF_LEN;
+    if (ptrSecSize <= 0)
+        return -1;
+
+    // std::vector frees the pointer section on every exit path
+    std::vector<char> hdr2(ptrSecSize);
+    if (readFillBuffer(pFile, &hdr2[0], ptrSecSize) != static_cast<size_t>(ptrSecSize))
+        return -1;
+
+    compress::CompChunkPtrList chunkPtrs;
+    if (decompressor.getPtrList(&hdr2[0], ptrSecSize, chunkPtrs) != 0)
+        return -1;
+
+    // An empty list has no last entry to measure
+    if (chunkPtrs.size() == 0)
+        return -1;
+
+    // last header's offset + length will be the data bytes
+    const size_t last = chunkPtrs.size() - 1;
+    return chunkPtrs[last].first + chunkPtrs[last].second;
+}
+
+/**
+ * compressedSize() returns the compressed data size of the file specified by
+ * path, taken from the chunk pointers in its header.
+ * Returns the size on success, -1 on error
+ */
+off64_t PosixFileSystem::compressedSize(const char *path) const
+{
+    IDBDataFile *pFile = 0;
+    off64_t dataSize = -1;
+
+    try
+    {
+        pFile = IDBDataFile::open(IDBDataFile::BUFFERED, path, "r", 0);
+        if (pFile)
+            dataSize = lastChunkEnd(pFile);
+    }
+    catch (...)
+    {
+        dataSize = -1;
+    }
+
+    // Single release point, reached on success, error and exception alike
+    delete pFile;
+    return dataSize;
+}
+
bool PosixFileSystem::exists(const char *pathname) const { boost::filesystem::path dirPath(pathname); diff --git a/utils/idbdatafile/PosixFileSystem.h b/utils/idbdatafile/PosixFileSystem.h index c5c53d162..9c22f805e 100644 --- a/utils/idbdatafile/PosixFileSystem.h +++ b/utils/idbdatafile/PosixFileSystem.h @@ -31,6 +31,7 @@ public: /* virtual */ int mkdir(const char *pathname); /* virtual */ off64_t size(const char* path) const; + /* virtual */ off64_t compressedSize(const char *path) const; /* virtual */ int remove(const char *pathname); /* virtual */ int rename(const char *oldpath, const char *newpath); /* virtual */ bool exists(const char* pathname) const; diff --git a/utils/idbhdfs/hdfs-shared/HdfsFileSystem.cpp b/utils/idbhdfs/hdfs-shared/HdfsFileSystem.cpp index 11ef8bee9..1e5626147 100644 --- a/utils/idbhdfs/hdfs-shared/HdfsFileSystem.cpp +++ b/utils/idbhdfs/hdfs-shared/HdfsFileSystem.cpp @@ -31,6 +31,8 @@ #include "HdfsFsCache.h" #include "IDBLogger.h" +#include "idbcompress.h" +#include <vector> #include #include @@ -119,6 +121,100 @@ off64_t HdfsFileSystem::size(const char* path) const return retval; }
+/**
+ * Reads from pFile until bytesReq bytes have been stored in buffer, or until
+ * a read() call returns zero or a negative value.
+ *
+ * @param pFile    open file to read from
+ * @param buffer   destination with room for at least bytesReq bytes
+ * @param bytesReq number of bytes wanted
+ * @return number of bytes actually stored in buffer
+ */
+static size_t readFillBuffer(
+    idbdatafile::IDBDataFile* pFile,
+    char* buffer,
+    size_t bytesReq)
+{
+    size_t totalBytesRead = 0;
+
+    while (totalBytesRead < bytesReq)
+    {
+        const ssize_t nBytes = pFile->read(buffer + totalBytesRead, bytesReq - totalBytesRead);
+        if (nBytes <= 0)
+            break;
+
+        totalBytesRead += static_cast<size_t>(nBytes);
+    }
+
+    return totalBytesRead;
+}
+
+/**
+ * Reads the compression headers at the start of pFile and returns the end
+ * (offset + length) of the last entry in the chunk pointer list.
+ *
+ * @param pFile open file to read from, expected to be at its beginning; the
+ *              caller keeps ownership
+ * @return that end position, or -1 if a header cannot be read or parsed or
+ *         the pointer list is empty
+ */
+static off64_t lastChunkEnd(idbdatafile::IDBDataFile* pFile)
+{
+    compress::IDBCompressInterface decompressor;
+    const size_t hdrLen = compress::IDBCompressInterface::HDR_BUF_LEN;
+
+    char hdr1[compress::IDBCompressInterface::HDR_BUF_LEN];
+    if (readFillBuffer(pFile, hdr1, hdrLen) != hdrLen)
+        return -1;
+
+    const int64_t ptrSecSize = decompressor.getHdrSize(hdr1) - compress::IDBCompressInterface::HDR_BUF_LEN;
+    if (ptrSecSize <= 0)
+        return -1;
+
+    // std::vector frees the pointer section on every exit path
+    std::vector<char> hdr2(ptrSecSize);
+    if (readFillBuffer(pFile, &hdr2[0], ptrSecSize) != static_cast<size_t>(ptrSecSize))
+        return -1;
+
+    compress::CompChunkPtrList chunkPtrs;
+    if (decompressor.getPtrList(&hdr2[0], ptrSecSize, chunkPtrs) != 0)
+        return -1;
+
+    // An empty list has no last entry to measure
+    if (chunkPtrs.size() == 0)
+        return -1;
+
+    // last header's offset + length will be the data bytes
+    const size_t last = chunkPtrs.size() - 1;
+    return chunkPtrs[last].first + chunkPtrs[last].second;
+}
+
+/**
+ * compressedSize() returns the compressed data size of the file specified by
+ * path, taken from the chunk pointers in its header.
+ * Returns the size on success, -1 on error
+ */
+off64_t HdfsFileSystem::compressedSize(const char *path) const
+{
+    IDBDataFile *pFile = 0;
+    off64_t dataSize = -1;
+
+    try
+    {
+        pFile = IDBDataFile::open(IDBDataFile::HDFS, path, "r", 0);
+        if (pFile)
+            dataSize = lastChunkEnd(pFile);
+    }
+    catch (...)
+    {
+        dataSize = -1;
+    }
+
+    // Single release point, reached on success, error and exception alike
+    delete pFile;
+    return dataSize;
+}
+
bool HdfsFileSystem::exists(const char *pathname) const { int ret = hdfsExists(m_fs,pathname); diff --git a/utils/idbhdfs/hdfs-shared/HdfsFileSystem.h b/utils/idbhdfs/hdfs-shared/HdfsFileSystem.h index 37c8a58c9..f0a80abe1 100644 --- a/utils/idbhdfs/hdfs-shared/HdfsFileSystem.h +++ b/utils/idbhdfs/hdfs-shared/HdfsFileSystem.h @@ -45,6 +45,7 @@ public: /* virtual */ int mkdir(const char *pathname); /* virtual */ off64_t size(const char* path) const; + /* virtual */ off64_t compressedSize(const char* path) const; /* virtual */ int remove(const char *pathname); /* virtual */ int rename(const char *oldpath, const char *newpath); /* virtual */ bool exists(const char* pathname) const;