You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
MCOL-406 Improved Information Schema
* Add INFORMATION_SCHEMA.COLUMNSTORE_FILES which contains information about files * Remove file information from COLUMNSTORE_EXTENTS (due to above) * Hide columns with Object ID < 3000 (internal columns) * Fix bad calculation in data_size columns * Fix minor memory leak * Add compressedSize() function to IDBFileSystem to get the used file size for a compressed file * Add columnstore_info schema with utility stored procedures to access the information_schema tables
This commit is contained in:
@ -327,6 +327,9 @@ SET(CPACK_RPM_storage-engine_USER_FILELIST
|
||||
"/usr/local/mariadb/columnstore/lib/is_columnstore_tables.so"
|
||||
"/usr/local/mariadb/columnstore/lib/is_columnstore_tables.so.1"
|
||||
"/usr/local/mariadb/columnstore/lib/is_columnstore_tables.so.1.0.0"
|
||||
"/usr/local/mariadb/columnstore/lib/is_columnstore_files.so"
|
||||
"/usr/local/mariadb/columnstore/lib/is_columnstore_files.so.1"
|
||||
"/usr/local/mariadb/columnstore/lib/is_columnstore_files.so.1.0.0"
|
||||
"/usr/local/mariadb/columnstore/mysql/mysql-Columnstore"
|
||||
"/usr/local/mariadb/columnstore/mysql/install_calpont_mysql.sh"
|
||||
"/usr/local/mariadb/columnstore/mysql/syscatalog_mysql.sql"
|
||||
@ -335,6 +338,7 @@ SET(CPACK_RPM_storage-engine_USER_FILELIST
|
||||
"/usr/local/mariadb/columnstore/mysql/calsetuserpriority.sql"
|
||||
"/usr/local/mariadb/columnstore/mysql/calremoveuserpriority.sql"
|
||||
"/usr/local/mariadb/columnstore/mysql/calshowprocesslist.sql"
|
||||
"/usr/local/mariadb/columnstore/mysql/columnstore_info.sql"
|
||||
${ignored})
|
||||
|
||||
|
||||
|
@ -62,12 +62,25 @@ target_link_libraries(is_columnstore_extents ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LI
|
||||
set_target_properties(is_columnstore_extents PROPERTIES PREFIX "")
|
||||
set_target_properties(is_columnstore_extents PROPERTIES VERSION 1.0.0 SOVERSION 1)
|
||||
|
||||
install(TARGETS calmysql is_columnstore_tables is_columnstore_columns is_columnstore_extents DESTINATION ${ENGINE_LIBDIR} COMPONENT storage-engine)
|
||||
SET ( is_columnstore_files_SRCS
|
||||
is_columnstore_files.cpp
|
||||
)
|
||||
add_library(is_columnstore_files SHARED ${is_columnstore_files_SRCS})
|
||||
|
||||
target_link_libraries(is_columnstore_files ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} threadpool)
|
||||
|
||||
# Don't prepend .so file with 'lib'
|
||||
set_target_properties(is_columnstore_files PROPERTIES PREFIX "")
|
||||
set_target_properties(is_columnstore_files PROPERTIES VERSION 1.0.0 SOVERSION 1)
|
||||
|
||||
|
||||
install(TARGETS calmysql is_columnstore_tables is_columnstore_columns is_columnstore_extents is_columnstore_files DESTINATION ${ENGINE_LIBDIR} COMPONENT storage-engine)
|
||||
install(FILES syscatalog_mysql.sql
|
||||
dumpcat_mysql.sql
|
||||
calsetuserpriority.sql
|
||||
calremoveuserpriority.sql
|
||||
calshowprocesslist.sql
|
||||
columnstore_info.sql
|
||||
my.cnf
|
||||
DESTINATION ${ENGINE_MYSQLDIR} COMPONENT storage-engine)
|
||||
install(PROGRAMS install_calpont_mysql.sh mysql-Columnstore dumpcat.pl
|
||||
|
50
dbcon/mysql/columnstore_info.sql
Normal file
50
dbcon/mysql/columnstore_info.sql
Normal file
@ -0,0 +1,50 @@
|
||||
CREATE DATABASE columnstore_info;
|
||||
USE columnstore_info;
|
||||
|
||||
DELIMITER //
|
||||
CREATE FUNCTION `format_filesize`(filesize FLOAT) RETURNS varchar(20) CHARSET utf8 DETERMINISTIC
|
||||
BEGIN
|
||||
|
||||
DECLARE n INT DEFAULT 1;
|
||||
|
||||
LOOP
|
||||
IF filesize < 1024 THEN
|
||||
RETURN concat(round(filesize, 2), ' ', elt(n, 'Bytes', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB', 'BB'));
|
||||
END IF;
|
||||
SET filesize = filesize / 1024;
|
||||
SET n = n + 1;
|
||||
END LOOP;
|
||||
|
||||
END //
|
||||
|
||||
|
||||
CREATE PROCEDURE total_usage ()
|
||||
BEGIN
|
||||
SELECT
|
||||
(SELECT format_filesize(sum(data_size)) TOTAL_DATA_SIZE FROM INFORMATION_SCHEMA.COLUMNSTORE_EXTENTS) TOTAL_DATA_SIZE,
|
||||
(SELECT format_filesize(sum(file_size)) TOTAL_DISK_USAGE FROM INFORMATION_SCHEMA.COLUMNSTORE_FILES) TOTAL_DISK_USAGE;
|
||||
END //
|
||||
|
||||
CREATE PROCEDURE table_usage (IN t_name char(64))
|
||||
BEGIN
|
||||
IF t_name IS NOT NULL THEN
|
||||
SELECT TABLE_NAME, format_filesize(sum(cf.file_size)) DATA_DISK_USAGE, format_filesize(sum(IFNULL(ccf.file_size, 0))) DICT_DISK_USAGE, format_filesize(sum(cf.file_size) + sum(IFNULL(ccf.file_size, 0))) TOTAL_USAGE FROM INFORMATION_SCHEMA.COLUMNSTORE_COLUMNS cc
|
||||
JOIN INFORMATION_SCHEMA.COLUMNSTORE_FILES cf ON cc.object_id = cf.object_id
|
||||
LEFT JOIN INFORMATION_SCHEMA.COLUMNSTORE_FILES ccf ON cc.dictionary_object_id = ccf.object_id
|
||||
WHERE table_name = t_name GROUP BY table_name;
|
||||
ELSE
|
||||
SELECT TABLE_NAME, format_filesize(sum(cf.file_size)) DATA_DISK_USAGE, format_filesize(sum(IFNULL(ccf.file_size, 0))) DICT_DISK_USAGE, format_filesize(sum(cf.file_size) + sum(IFNULL(ccf.file_size, 0))) TOTAL_USAGE FROM INFORMATION_SCHEMA.COLUMNSTORE_COLUMNS cc
|
||||
JOIN INFORMATION_SCHEMA.COLUMNSTORE_FILES cf ON cc.object_id = cf.object_id
|
||||
LEFT JOIN INFORMATION_SCHEMA.COLUMNSTORE_FILES ccf ON cc.dictionary_object_id = ccf.object_id
|
||||
GROUP BY table_name;
|
||||
END IF;
|
||||
END //
|
||||
|
||||
CREATE PROCEDURE compression_ratio()
|
||||
BEGIN
|
||||
SELECT CONCAT(((1 - (sum(data_size) / sum(compressed_data_size))) * 100), '%') COMPRESSION_RATIO FROM INFORMATION_SCHEMA.COLUMNSTORE_EXTENTS ce
|
||||
JOIN INFORMATION_SCHEMA.COLUMNSTORE_FILES cf ON ce.object_id = cf.object_id
|
||||
WHERE compressed_data_size IS NOT NULL;
|
||||
END //
|
||||
|
||||
DELIMITER ;
|
@ -33,6 +33,7 @@ INSTALL PLUGIN infinidb SONAME 'libcalmysql.so';
|
||||
INSTALL PLUGIN columnstore_tables SONAME 'is_columnstore_tables.so';
|
||||
INSTALL PLUGIN columnstore_columns SONAME 'is_columnstore_columns.so';
|
||||
INSTALL PLUGIN columnstore_extents SONAME 'is_columnstore_extents.so';
|
||||
INSTALL PLUGIN columnstore_files SONAME 'is_columnstore_files.so';
|
||||
-- these are deprecated names
|
||||
DELETE FROM mysql.func WHERE name='caldisablepartition';
|
||||
DELETE FROM mysql.func WHERE name='caldroppartition';
|
||||
@ -133,6 +134,7 @@ $installdir/mysql/bin/mysql --defaults-file=$df --user=root $pwprompt mysql 2>/d
|
||||
$installdir/mysql/bin/mysql --defaults-file=$df --user=root $pwprompt mysql 2>/dev/null <$installdir/mysql/calsetuserpriority.sql
|
||||
$installdir/mysql/bin/mysql --defaults-file=$df --user=root $pwprompt mysql 2>/dev/null <$installdir/mysql/calremoveuserpriority.sql
|
||||
$installdir/mysql/bin/mysql --defaults-file=$df --user=root $pwprompt mysql 2>/dev/null <$installdir/mysql/calshowprocesslist.sql
|
||||
$installdir/mysql/bin/mysql --defaults-file=$df --user=root $pwprompt mysql 2>/dev/null <$installdir/mysql/columnstore_info.sql
|
||||
|
||||
sed -i 's/infinidb_compression_type=1/infinidb_compression_type=2/' $installdir/mysql/my.cnf >/dev/null 2>&1
|
||||
|
||||
|
@ -24,13 +24,9 @@
|
||||
#include <vector>
|
||||
#include <limits>
|
||||
|
||||
#include <boost/shared_ptr.hpp>
|
||||
#include "dbrm.h"
|
||||
#include "objectidmanager.h"
|
||||
#include "we_convertor.h"
|
||||
#include "we_define.h"
|
||||
#include "IDBPolicy.h"
|
||||
#include "we_config.h"
|
||||
|
||||
|
||||
// Required declaration as it isn't in a MairaDB include
|
||||
bool schema_table_store_record(THD *thd, TABLE *table);
|
||||
@ -53,8 +49,6 @@ ST_FIELD_INFO is_columnstore_extents_fields[] =
|
||||
{"STATE", 64, MYSQL_TYPE_STRING, 0, 0, 0, 0}, // 13
|
||||
{"STATUS", 64, MYSQL_TYPE_STRING, 0, 0, 0, 0}, // 14
|
||||
{"DATA_SIZE", 19, MYSQL_TYPE_LONGLONG, 0, 0, 0, 0}, // 15
|
||||
{"FILENAME", 1024, MYSQL_TYPE_STRING, 0, 0, 0, 0}, // 16
|
||||
{"FILE_SIZE", 19, MYSQL_TYPE_LONGLONG, 0, MY_I_S_MAYBE_NULL, 0, 0}, // 17
|
||||
{0, 0, MYSQL_TYPE_NULL, 0, 0, 0, 0}
|
||||
};
|
||||
|
||||
@ -65,9 +59,6 @@ static int is_columnstore_extents_fill(THD *thd, TABLE_LIST *tables, COND *cond)
|
||||
std::vector<struct BRM::EMEntry> entries;
|
||||
std::vector<struct BRM::EMEntry>::iterator iter;
|
||||
std::vector<struct BRM::EMEntry>::iterator end;
|
||||
char oidDirName[WriteEngine::FILE_NAME_SIZE];
|
||||
char fullFileName[WriteEngine::FILE_NAME_SIZE];
|
||||
char dbDir[WriteEngine::MAX_DB_DIR_LEVEL][WriteEngine::MAX_DB_DIR_NAME_SIZE];
|
||||
|
||||
BRM::DBRM *emp = new BRM::DBRM();
|
||||
if (!emp || !emp->isDBRMReady())
|
||||
@ -78,7 +69,7 @@ static int is_columnstore_extents_fill(THD *thd, TABLE_LIST *tables, COND *cond)
|
||||
execplan::ObjectIDManager oidm;
|
||||
BRM::OID_t MaxOID = oidm.size();
|
||||
|
||||
for(BRM::OID_t oid = 0; oid <= MaxOID; oid++)
|
||||
for(BRM::OID_t oid = 3000; oid <= MaxOID; oid++)
|
||||
{
|
||||
emp->getExtents(oid, entries, false, false, true);
|
||||
if (entries.size() == 0)
|
||||
@ -161,31 +152,20 @@ static int is_columnstore_extents_fill(THD *thd, TABLE_LIST *tables, COND *cond)
|
||||
default:
|
||||
table->field[14]->store("Unknown", strlen("Unknown"), cs);
|
||||
}
|
||||
table->field[15]->store((iter->HWM + 1) * 8192);
|
||||
|
||||
WriteEngine::Convertor::oid2FileName(oid, oidDirName, dbDir, iter->partitionNum, iter->segmentNum);
|
||||
snprintf(fullFileName, WriteEngine::FILE_NAME_SIZE, "%s/%s", WriteEngine::Config::getDBRootByNum(iter->dbRoot).c_str(), oidDirName);
|
||||
table->field[16]->store(fullFileName, strlen(fullFileName), cs);
|
||||
|
||||
if (idbdatafile::IDBPolicy::exists(fullFileName))
|
||||
{
|
||||
table->field[17]->set_notnull();
|
||||
table->field[17]->store(idbdatafile::IDBPolicy::size(fullFileName));
|
||||
}
|
||||
else
|
||||
{
|
||||
table->field[17]->set_null();
|
||||
}
|
||||
table->field[15]->store((iter->HWM - iter->blockOffset + 1) * 8192);
|
||||
|
||||
if (schema_table_store_record(thd, table))
|
||||
{
|
||||
delete emp;
|
||||
return 1;
|
||||
}
|
||||
|
||||
iter++;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
delete emp;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
163
dbcon/mysql/is_columnstore_files.cpp
Normal file
163
dbcon/mysql/is_columnstore_files.cpp
Normal file
@ -0,0 +1,163 @@
|
||||
/* c-basic-offset: 4; tab-width: 4; indent-tabs-mode: nil
|
||||
* vi: set shiftwidth=4 tabstop=4 expandtab:
|
||||
* :indentSize=4:tabSize=4:noTabs=true:
|
||||
*
|
||||
* Copyright (C) 2016 MariaDB Corporaton
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; version 2 of
|
||||
* the License.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
||||
* MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
#include "idb_mysql.h"
|
||||
|
||||
#include "dbrm.h"
|
||||
#include "objectidmanager.h"
|
||||
#include "we_convertor.h"
|
||||
#include "we_define.h"
|
||||
#include "IDBPolicy.h"
|
||||
#include "we_config.h"
|
||||
#include "we_brm.h"
|
||||
|
||||
// Required declaration as it isn't in a MairaDB include
|
||||
bool schema_table_store_record(THD *thd, TABLE *table);
|
||||
|
||||
ST_FIELD_INFO is_columnstore_files_fields[] =
|
||||
{
|
||||
{"OBJECT_ID", 11, MYSQL_TYPE_LONG, 0, 0, 0, 0},
|
||||
{"SEGMENT_ID", 11, MYSQL_TYPE_LONG, 0, 0, 0, 0},
|
||||
{"PARTITION_ID", 11, MYSQL_TYPE_LONG, 0, 0, 0, 0},
|
||||
{"FILENAME", 1024, MYSQL_TYPE_STRING, 0, 0, 0, 0},
|
||||
{"FILE_SIZE", 19, MYSQL_TYPE_LONGLONG, 0, MY_I_S_MAYBE_NULL, 0, 0},
|
||||
{"COMPRESSED_DATA_SIZE", 19, MYSQL_TYPE_LONGLONG, 0, MY_I_S_MAYBE_NULL, 0, 0},
|
||||
{0, 0, MYSQL_TYPE_NULL, 0, 0, 0, 0}
|
||||
};
|
||||
|
||||
|
||||
static bool is_columnstore_files_get_entries(THD *thd, TABLE_LIST *tables, BRM::OID_t oid, std::vector<struct BRM::EMEntry> &entries)
|
||||
{
|
||||
CHARSET_INFO *cs = system_charset_info;
|
||||
TABLE *table = tables->table;
|
||||
|
||||
char oidDirName[WriteEngine::FILE_NAME_SIZE];
|
||||
char fullFileName[WriteEngine::FILE_NAME_SIZE];
|
||||
char dbDir[WriteEngine::MAX_DB_DIR_LEVEL][WriteEngine::MAX_DB_DIR_NAME_SIZE];
|
||||
WriteEngine::Config config;
|
||||
config.initConfigCache();
|
||||
|
||||
std::vector<struct BRM::EMEntry>::const_iterator iter = entries.begin();
|
||||
while ( iter != entries.end() ) //organize extents into files
|
||||
{
|
||||
// Don't include files more than once at different block offsets
|
||||
if (iter->blockOffset > 0)
|
||||
{
|
||||
iter++;
|
||||
continue;
|
||||
}
|
||||
table->field[0]->store(oid);
|
||||
table->field[1]->store(iter->segmentNum);
|
||||
table->field[2]->store(iter->partitionNum);
|
||||
|
||||
WriteEngine::Convertor::oid2FileName(oid, oidDirName, dbDir, iter->partitionNum, iter->segmentNum);
|
||||
snprintf(fullFileName, WriteEngine::FILE_NAME_SIZE, "%s/%s", WriteEngine::Config::getDBRootByNum(iter->dbRoot).c_str(), oidDirName);
|
||||
table->field[3]->store(fullFileName, strlen(fullFileName), cs);
|
||||
|
||||
if (idbdatafile::IDBPolicy::exists(fullFileName))
|
||||
{
|
||||
table->field[4]->set_notnull();
|
||||
table->field[4]->store(idbdatafile::IDBPolicy::size(fullFileName));
|
||||
off64_t comp_size;
|
||||
if ((comp_size = idbdatafile::IDBPolicy::compressedSize(fullFileName)) != -1)
|
||||
{
|
||||
table->field[5]->set_notnull();
|
||||
table->field[5]->store(comp_size);
|
||||
}
|
||||
else
|
||||
{
|
||||
table->field[5]->set_null();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
table->field[4]->set_null();
|
||||
table->field[5]->set_null();
|
||||
}
|
||||
|
||||
if (schema_table_store_record(thd, table))
|
||||
return 1;
|
||||
iter++;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int is_columnstore_files_fill(THD *thd, TABLE_LIST *tables, COND *cond)
|
||||
{
|
||||
BRM::DBRM *emp = new BRM::DBRM();
|
||||
std::vector<struct BRM::EMEntry> entries;
|
||||
|
||||
if (!emp || !emp->isDBRMReady())
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
|
||||
execplan::ObjectIDManager oidm;
|
||||
BRM::OID_t MaxOID = oidm.size();
|
||||
|
||||
for(BRM::OID_t oid = 3000; oid <= MaxOID; oid++)
|
||||
{
|
||||
emp->getExtents(oid, entries, false, false, true);
|
||||
if (entries.size() == 0)
|
||||
continue;
|
||||
|
||||
if (is_columnstore_files_get_entries(thd, tables, oid, entries))
|
||||
{
|
||||
delete emp;
|
||||
return 1;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
delete emp;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int is_columnstore_files_plugin_init(void *p)
|
||||
{
|
||||
ST_SCHEMA_TABLE *schema = (ST_SCHEMA_TABLE*) p;
|
||||
schema->fields_info = is_columnstore_files_fields;
|
||||
schema->fill_table = is_columnstore_files_fill;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static struct st_mysql_information_schema is_columnstore_files_plugin_version =
|
||||
{ MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION };
|
||||
|
||||
maria_declare_plugin(is_columnstore_files_plugin)
|
||||
{
|
||||
MYSQL_INFORMATION_SCHEMA_PLUGIN,
|
||||
&is_columnstore_files_plugin_version,
|
||||
"COLUMNSTORE_FILES",
|
||||
"MariaDB Corporaton",
|
||||
"An information schema plugin to list ColumnStore filess",
|
||||
PLUGIN_LICENSE_GPL,
|
||||
is_columnstore_files_plugin_init,
|
||||
//is_columnstore_files_plugin_deinit,
|
||||
NULL,
|
||||
0x0100,
|
||||
NULL,
|
||||
NULL,
|
||||
"1.0",
|
||||
MariaDB_PLUGIN_MATURITY_STABLE
|
||||
}
|
||||
maria_declare_plugin_end;
|
@ -85,12 +85,14 @@ if [ -f $installdir/lib/libcalmysql.so.1.0.0 ]; then
|
||||
is_columnstore_tables=$installdir/lib/is_columnstore_tables.so.1.0.0
|
||||
is_columnstore_columns=$installdir/lib/is_columnstore_columns.so.1.0.0
|
||||
is_columnstore_extents=$installdir/lib/is_columnstore_extents.so.1.0.0
|
||||
is_columnstore_files=$installdir/lib/is_columnstore_files.so.1.0.0
|
||||
elif [ -f $installdir/lib/libcalmysql.so.1 ]; then
|
||||
libcalmysql=$installdir/lib/libcalmysql.so.1
|
||||
libudfsdk=$installdir/lib/libudf_mysql.so.1
|
||||
is_columnstore_tables=$installdir/lib/is_columnstore_tables.so.1
|
||||
is_columnstore_columns=$installdir/lib/is_columnstore_columns.so.1
|
||||
is_columnstore_extents=$installdir/lib/is_columnstore_extents.so.1
|
||||
is_columnstore_files=$installdir/lib/is_columnstore_files.so.1
|
||||
else
|
||||
libcalmysql=
|
||||
fi
|
||||
@ -104,6 +106,7 @@ if [ -n "$libcalmysql" ]; then
|
||||
ln -sf $is_columnstore_tables is_columnstore_tables.so
|
||||
ln -sf $is_columnstore_columns is_columnstore_columns.so
|
||||
ln -sf $is_columnstore_extents is_columnstore_extents.so
|
||||
ln -sf $is_columnstore_files is_columnstore_files.so
|
||||
fi
|
||||
|
||||
# cleanup previous install mysql replication files
|
||||
|
@ -84,6 +84,13 @@ public:
|
||||
*/
|
||||
virtual off64_t size(const char* path) const = 0;
|
||||
|
||||
/**
|
||||
* compressedSize() returns the decompressed size of the file
|
||||
* speicified by path.
|
||||
* Returns the size on success, -1 on error
|
||||
*/
|
||||
virtual off64_t compressedSize(const char* path) const = 0;
|
||||
|
||||
/**
|
||||
* exists() checks for the existence of a particular path.
|
||||
* Returns true if exists, false otherwise.
|
||||
|
@ -115,6 +115,7 @@ public:
|
||||
*/
|
||||
static int mkdir(const char *pathname);
|
||||
static off64_t size(const char* path);
|
||||
static off64_t compressedSize(const char* path);
|
||||
static int remove(const char *pathname);
|
||||
static int rename(const char *oldpath, const char *newpath);
|
||||
static bool exists(const char* pathname);
|
||||
@ -175,6 +176,12 @@ off64_t IDBPolicy::size(const char* path)
|
||||
return IDBPolicy::getFs( path ).size( path );
|
||||
}
|
||||
|
||||
inline
|
||||
off64_t IDBPolicy::compressedSize(const char* path)
|
||||
{
|
||||
return IDBPolicy::getFs( path ).compressedSize( path );
|
||||
}
|
||||
|
||||
inline
|
||||
int IDBPolicy::remove(const char *pathname)
|
||||
{
|
||||
|
@ -17,6 +17,7 @@
|
||||
|
||||
#include "PosixFileSystem.h"
|
||||
#include "IDBLogger.h"
|
||||
#include "idbcompress.h"
|
||||
|
||||
#include <sys/stat.h>
|
||||
#include <unistd.h>
|
||||
@ -128,6 +129,86 @@ off64_t PosixFileSystem::size(const char* path) const
|
||||
return ret;
|
||||
}
|
||||
|
||||
size_t readFillBuffer(
|
||||
idbdatafile::IDBDataFile* pFile,
|
||||
char* buffer,
|
||||
size_t bytesReq)
|
||||
{
|
||||
char* pBuf = buffer;
|
||||
ssize_t nBytes;
|
||||
size_t bytesToRead = bytesReq;
|
||||
size_t totalBytesRead = 0;
|
||||
|
||||
while (1)
|
||||
{
|
||||
nBytes = pFile->read(pBuf, bytesToRead);
|
||||
if (nBytes > 0)
|
||||
totalBytesRead += nBytes;
|
||||
else
|
||||
break;
|
||||
|
||||
if ((size_t)nBytes == bytesToRead)
|
||||
break;
|
||||
|
||||
pBuf += nBytes;
|
||||
bytesToRead = bytesToRead - (size_t)nBytes;
|
||||
}
|
||||
|
||||
return totalBytesRead;
|
||||
}
|
||||
|
||||
off64_t PosixFileSystem::compressedSize(const char *path) const
|
||||
{
|
||||
IDBDataFile *pFile = 0;
|
||||
size_t nBytes;
|
||||
off64_t dataSize = 0;
|
||||
|
||||
try
|
||||
{
|
||||
pFile = IDBDataFile::open(IDBDataFile::BUFFERED, path, "r", 0);
|
||||
|
||||
if (!pFile)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
compress::IDBCompressInterface decompressor;
|
||||
|
||||
char hdr1[compress::IDBCompressInterface::HDR_BUF_LEN];
|
||||
nBytes = readFillBuffer( pFile,hdr1,compress::IDBCompressInterface::HDR_BUF_LEN);
|
||||
if ( nBytes != compress::IDBCompressInterface::HDR_BUF_LEN )
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
int64_t ptrSecSize = decompressor.getHdrSize(hdr1) - compress::IDBCompressInterface::HDR_BUF_LEN;
|
||||
char* hdr2 = new char[ptrSecSize];
|
||||
nBytes = readFillBuffer( pFile,hdr2,ptrSecSize);
|
||||
if ( (int64_t)nBytes != ptrSecSize )
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
compress::CompChunkPtrList chunkPtrs;
|
||||
int rc = decompressor.getPtrList(hdr2, ptrSecSize, chunkPtrs);
|
||||
delete[] hdr2;
|
||||
if (rc != 0)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
unsigned k = chunkPtrs.size();
|
||||
// last header's offset + length will be the data bytes
|
||||
dataSize = chunkPtrs[k-1].first + chunkPtrs[k-1].second;
|
||||
delete pFile;
|
||||
return dataSize;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
bool PosixFileSystem::exists(const char *pathname) const
|
||||
{
|
||||
boost::filesystem::path dirPath(pathname);
|
||||
|
@ -31,6 +31,7 @@ public:
|
||||
|
||||
/* virtual */ int mkdir(const char *pathname);
|
||||
/* virtual */ off64_t size(const char* path) const;
|
||||
/* virtual */ off64_t compressedSize(const char *path) const;
|
||||
/* virtual */ int remove(const char *pathname);
|
||||
/* virtual */ int rename(const char *oldpath, const char *newpath);
|
||||
/* virtual */ bool exists(const char* pathname) const;
|
||||
|
@ -31,6 +31,7 @@
|
||||
|
||||
#include "HdfsFsCache.h"
|
||||
#include "IDBLogger.h"
|
||||
#include "idbcompress.h"
|
||||
|
||||
#include <iostream>
|
||||
#include <list>
|
||||
@ -119,6 +120,87 @@ off64_t HdfsFileSystem::size(const char* path) const
|
||||
return retval;
|
||||
}
|
||||
|
||||
size_t readFillBuffer(
|
||||
idbdatafile::IDBDataFile* pFile,
|
||||
char* buffer,
|
||||
size_t bytesReq)
|
||||
{
|
||||
char* pBuf = buffer;
|
||||
ssize_t nBytes;
|
||||
size_t bytesToRead = bytesReq;
|
||||
size_t totalBytesRead = 0;
|
||||
|
||||
while (1)
|
||||
{
|
||||
nBytes = pFile->read(pBuf, bytesToRead);
|
||||
if (nBytes > 0)
|
||||
totalBytesRead += nBytes;
|
||||
else
|
||||
break;
|
||||
|
||||
if ((size_t)nBytes == bytesToRead)
|
||||
break;
|
||||
|
||||
pBuf += nBytes;
|
||||
bytesToRead = bytesToRead - (size_t)nBytes;
|
||||
}
|
||||
|
||||
return totalBytesRead;
|
||||
}
|
||||
|
||||
|
||||
off64_t HdfsFileSystem::compressedSize(const char *path) const
|
||||
{
|
||||
IDBDataFile *pFile = 0;
|
||||
size_t nBytes;
|
||||
off64_t dataSize = 0;
|
||||
|
||||
try
|
||||
{
|
||||
pFile = IDBDataFile::open(IDBDataFile::HDFS, path, "r", 0);
|
||||
|
||||
if (!pFile)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
compress::IDBCompressInterface decompressor;
|
||||
|
||||
char hdr1[compress::IDBCompressInterface::HDR_BUF_LEN];
|
||||
nBytes = readFillBuffer( pFile,hdr1,compress::IDBCompressInterface::HDR_BUF_LEN);
|
||||
if ( nBytes != compress::IDBCompressInterface::HDR_BUF_LEN )
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
int64_t ptrSecSize = decompressor.getHdrSize(hdr1) - compress::IDBCompressInterface::HDR_BUF_LEN;
|
||||
char* hdr2 = new char[ptrSecSize];
|
||||
nBytes = readFillBuffer( pFile,hdr2,ptrSecSize);
|
||||
if ( (int64_t)nBytes != ptrSecSize )
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
compress::CompChunkPtrList chunkPtrs;
|
||||
int rc = decompressor.getPtrList(hdr2, ptrSecSize, chunkPtrs);
|
||||
delete[] hdr2;
|
||||
if (rc != 0)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
unsigned k = chunkPtrs.size();
|
||||
// last header's offset + length will be the data bytes
|
||||
dataSize = chunkPtrs[k-1].first + chunkPtrs[k-1].second;
|
||||
delete pFile;
|
||||
return dataSize;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
bool HdfsFileSystem::exists(const char *pathname) const
|
||||
{
|
||||
int ret = hdfsExists(m_fs,pathname);
|
||||
|
@ -45,6 +45,7 @@ public:
|
||||
|
||||
/* virtual */ int mkdir(const char *pathname);
|
||||
/* virtual */ off64_t size(const char* path) const;
|
||||
/* virtual */ off64_t compressedSize(const char* path) const;
|
||||
/* virtual */ int remove(const char *pathname);
|
||||
/* virtual */ int rename(const char *oldpath, const char *newpath);
|
||||
/* virtual */ bool exists(const char* pathname) const;
|
||||
|
Reference in New Issue
Block a user