From 9fe6efe84a4c8fbef5a722b42b25b41a4fbb709f Mon Sep 17 00:00:00 2001 From: Leonid Fedorov <79837786+mariadb-LeonidFedorov@users.noreply.github.com> Date: Fri, 22 Mar 2024 19:47:55 +0300 Subject: [PATCH] chore(arrow) bump apache arrow version and fix test load generator for memory usage (#3149) * bump apache arrow version and fix test load generator for memory usage * limit arrow simd by SSE4.2 --- cmake/arrow.cmake | 5 +- tools/parquetGen/main.cpp | 707 +++++++++++++++++++------------------- 2 files changed, 362 insertions(+), 350 deletions(-) diff --git a/cmake/arrow.cmake b/cmake/arrow.cmake index 531e3adc8..44da39f6b 100644 --- a/cmake/arrow.cmake +++ b/cmake/arrow.cmake @@ -31,14 +31,15 @@ set(ARROW_CMAKE_ARGS "-DCMAKE_INSTALL_PREFIX=${ARROW_PREFIX}" "-DARROW_DATASET=ON" "-DARROW_PARQUET=ON" "-DARROW_FILESYSTEM=ON" + "-DARROW_RUNTIME_SIMD_LEVEL=SSE4_2" "-DThrift_ROOT=${CMAKE_CURRENT_BINARY_DIR}/external/thrift" ) set(ARROW_INCLUDE_DIR "${ARROW_PREFIX}/include") set(ARROW_BUILD_BYPRODUCTS "${ARROW_STATIC_LIB}" "${PARQUET_STATIC_LIB}") externalproject_add(external_arrow - URL https://github.com/apache/arrow/archive/refs/tags/go/v13.0.0.tar.gz - URL_HASH SHA256=ea4a79a4103379573ecbcf19229437a4ba547c0146a7f3c3be0a7e0b3de5de6c + URL https://github.com/apache/arrow/archive/refs/tags/apache-arrow-15.0.2.tar.gz + URL_HASH SHA256=4735b349845bff1fe95ed11abbfed204eb092cabc37523aa13a80cb830fe5b5e SOURCE_SUBDIR cpp BINARY_DIR "${ARROW_BINARY_DIR}" CMAKE_ARGS "${ARROW_CMAKE_ARGS}" diff --git a/tools/parquetGen/main.cpp b/tools/parquetGen/main.cpp index 090891197..63e071f1d 100644 --- a/tools/parquetGen/main.cpp +++ b/tools/parquetGen/main.cpp @@ -5,8 +5,14 @@ #include #include #include +#include +#include #include +#include +#include +#include + static void usage(const std::string& pname) { std::cout << "usage: " << pname << " [-dalbscih]" << std::endl; @@ -23,10 +29,10 @@ static void usage(const std::string& pname) /** * generate one parquet file with INT32 data type -*/ 
+ */ void generateIntTable(std::string fileDir) { - // generate data + // generate data arrow::Int32Builder builder; int reserve_num = 30; PARQUET_THROW_NOT_OK(builder.Reserve(reserve_num)); @@ -36,31 +42,30 @@ void generateIntTable(std::string fileDir) validity[3] = 0; std::vector values; - for (int32_t i = 0; i < reserve_num-1; i++) + for (int32_t i = 0; i < reserve_num - 1; i++) values.push_back(i); values.push_back(static_cast(2147483648)); PARQUET_THROW_NOT_OK(builder.AppendValues(values, validity)); std::shared_ptr array; PARQUET_THROW_NOT_OK(builder.Finish(&array)); - std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::int32())}); - std::shared_ptr table = arrow::Table::Make(schema, {array}); + std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::int32())}); + std::shared_ptr table = arrow::Table::Make(schema, {array}); - // write to file + // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; - PARQUET_ASSIGN_OR_THROW( - outfile, arrow::io::FileOutputStream::Open(fileDir + "/int32.parquet")); + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/int32.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with INT64 data type -*/ + */ void generateInt64Table(std::string fileDir) { - // generate data + // generate data arrow::Int64Builder builder; int reserve_num = 30; PARQUET_THROW_NOT_OK(builder.Reserve(reserve_num)); @@ -69,29 +74,27 @@ void generateInt64Table(std::string fileDir) validity[2] = 0; validity[3] = 0; std::vector values; - for (int64_t i = 0; i < reserve_num-1; i++) + for (int64_t i = 0; i < reserve_num - 1; i++) values.push_back(i); values.push_back(2147483648); PARQUET_THROW_NOT_OK(builder.AppendValues(values, validity)); std::shared_ptr array; PARQUET_THROW_NOT_OK(builder.Finish(&array)); + std::shared_ptr schema = 
arrow::schema({arrow::field("col1", arrow::int64())}); + std::shared_ptr table = arrow::Table::Make(schema, {array}); - std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::int64())}); - std::shared_ptr table = arrow::Table::Make(schema, {array}); - - // write to file + // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; - PARQUET_ASSIGN_OR_THROW( - outfile, arrow::io::FileOutputStream::Open(fileDir + "/int64.parquet")); + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/int64.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with FLOAT data type -*/ + */ void generateFloatTable(std::string fileDir) { int reserve_num = 30; @@ -101,26 +104,25 @@ void generateFloatTable(std::string fileDir) validity[2] = 0; std::vector values; for (int i = 0; i < reserve_num; i++) - values.push_back(i+1.5); + values.push_back(i + 1.5); PARQUET_THROW_NOT_OK(builder.AppendValues(values, validity)); std::shared_ptr array; PARQUET_THROW_NOT_OK(builder.Finish(&array)); - std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::float32())}); - std::shared_ptr table = arrow::Table::Make(schema, {array}); + std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::float32())}); + std::shared_ptr table = arrow::Table::Make(schema, {array}); - // write to file + // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; - PARQUET_ASSIGN_OR_THROW( - outfile, arrow::io::FileOutputStream::Open(fileDir + "/float.parquet")); + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/float.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with DOUBLE data type -*/ + */ void generateDoubleTable(std::string 
fileDir) { // -----------------Float64----------------------- @@ -131,94 +133,93 @@ void generateDoubleTable(std::string fileDir) dvalidity[3] = 0; std::vector dvalues; for (int i = 0; i < reserve_num; i++) - dvalues.push_back(i+2.5); + dvalues.push_back(i + 2.5); PARQUET_THROW_NOT_OK(doublebuilder.AppendValues(dvalues, dvalidity)); std::shared_ptr doublearray; PARQUET_THROW_NOT_OK(doublebuilder.Finish(&doublearray)); - std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::float64())}); - std::shared_ptr table = arrow::Table::Make(schema, {doublearray}); + std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::float64())}); + std::shared_ptr table = arrow::Table::Make(schema, {doublearray}); - // write to file + // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; - PARQUET_ASSIGN_OR_THROW( - outfile, arrow::io::FileOutputStream::Open(fileDir + "/double.parquet")); + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/double.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with TIME data type -*/ + */ void generateTimeTable(std::string fileDir) { - int reserve_num = 30; + int reserve_num = 30; arrow::Time32Builder time32builder(arrow::time32(arrow::TimeUnit::MILLI), arrow::default_memory_pool()); // int reserve_num = 500; PARQUET_THROW_NOT_OK(time32builder.Reserve(reserve_num)); std::vector time32validity(reserve_num, true); std::vector time32values; for (int32_t i = 0; i < reserve_num; i++) - time32values.push_back(i*3605000); + time32values.push_back(i * 3605000); PARQUET_THROW_NOT_OK(time32builder.AppendValues(time32values, time32validity)); std::shared_ptr time32array; PARQUET_THROW_NOT_OK(time32builder.Finish(&time32array)); - std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::time32(arrow::TimeUnit::MILLI))}); - std::shared_ptr table 
= arrow::Table::Make(schema, {time32array}); + std::shared_ptr schema = + arrow::schema({arrow::field("col1", arrow::time32(arrow::TimeUnit::MILLI))}); + std::shared_ptr table = arrow::Table::Make(schema, {time32array}); - // write to file + // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; - PARQUET_ASSIGN_OR_THROW( - outfile, arrow::io::FileOutputStream::Open(fileDir + "/time.parquet")); + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/time.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with TIME64(microsecond) data type -*/ + */ void generateTime64Table(std::string fileDir) { - int64_t reserve_num = 30; + int64_t reserve_num = 30; arrow::Time64Builder time64builder(arrow::time64(arrow::TimeUnit::MICRO), arrow::default_memory_pool()); // int reserve_num = 500; PARQUET_THROW_NOT_OK(time64builder.Reserve(reserve_num)); std::vector time64validity(reserve_num, true); std::vector time64values; for (int64_t i = 0; i < reserve_num; i++) - time64values.push_back(i*3605001); + time64values.push_back(i * 3605001); PARQUET_THROW_NOT_OK(time64builder.AppendValues(time64values, time64validity)); std::shared_ptr time64array; PARQUET_THROW_NOT_OK(time64builder.Finish(&time64array)); - std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::time64(arrow::TimeUnit::MICRO))}); - std::shared_ptr table = arrow::Table::Make(schema, {time64array}); + std::shared_ptr schema = + arrow::schema({arrow::field("col1", arrow::time64(arrow::TimeUnit::MICRO))}); + std::shared_ptr table = arrow::Table::Make(schema, {time64array}); - // write to file + // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; - PARQUET_ASSIGN_OR_THROW( - outfile, arrow::io::FileOutputStream::Open(fileDir + "/time64.parquet")); + PARQUET_ASSIGN_OR_THROW(outfile, 
arrow::io::FileOutputStream::Open(fileDir + "/time64.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with STRING data type -*/ + */ void generateStringTable(std::string fileDir) { - const int reserve_num = 30; + const int reserve_num = 30; // ----------------- String ------------------------- arrow::StringBuilder strbuilder; PARQUET_THROW_NOT_OK(strbuilder.Reserve(reserve_num)); uint8_t validity1[reserve_num]; std::vector values1; - for (int64_t i = reserve_num-1; i >= 0; i--) + for (int64_t i = reserve_num - 1; i >= 0; i--) { // values1.push_back(std::string("hhhh")); validity1[i] = 1; @@ -256,30 +257,29 @@ void generateStringTable(std::string fileDir) values1.push_back(std::string("nD274v")); values1.push_back(std::string("6y0JyW")); - validity1[1] = 0; // set element 1 null + validity1[1] = 0; // set element 1 null PARQUET_THROW_NOT_OK(strbuilder.AppendValues(values1, validity1)); std::shared_ptr strarray; PARQUET_THROW_NOT_OK(strbuilder.Finish(&strarray)); - std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::utf8())}); - std::shared_ptr table = arrow::Table::Make(schema, {strarray}); + std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::utf8())}); + std::shared_ptr table = arrow::Table::Make(schema, {strarray}); - // write to file + // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; - PARQUET_ASSIGN_OR_THROW( - outfile, arrow::io::FileOutputStream::Open(fileDir + "/string.parquet")); + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/string.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with TIMESTAMP data type(millisecond) -*/ + */ void generateTimestampTable(std::string fileDir) { - int reserve_num = 30; + int reserve_num = 
30; // ----------------- Timestamp ------------------------- arrow::TimestampBuilder tsbuilder(arrow::timestamp(arrow::TimeUnit::MILLI), arrow::default_memory_pool()); PARQUET_THROW_NOT_OK(tsbuilder.Reserve(reserve_num)); @@ -291,24 +291,24 @@ void generateTimestampTable(std::string fileDir) std::shared_ptr tsarray; PARQUET_THROW_NOT_OK(tsbuilder.Finish(&tsarray)); - std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::timestamp(arrow::TimeUnit::MILLI))}); - std::shared_ptr table = arrow::Table::Make(schema, {tsarray}); + std::shared_ptr schema = + arrow::schema({arrow::field("col1", arrow::timestamp(arrow::TimeUnit::MILLI))}); + std::shared_ptr table = arrow::Table::Make(schema, {tsarray}); - // write to file + // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; - PARQUET_ASSIGN_OR_THROW( - outfile, arrow::io::FileOutputStream::Open(fileDir + "/ts.parquet")); + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/ts.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with TIMESTAMP data type(microsecond) -*/ + */ void generateTimestampUsTable(std::string fileDir) { - int reserve_num = 30; + int reserve_num = 30; // ----------------- Timestamp ------------------------- arrow::TimestampBuilder tsbuilder(arrow::timestamp(arrow::TimeUnit::MICRO), arrow::default_memory_pool()); PARQUET_THROW_NOT_OK(tsbuilder.Reserve(reserve_num)); @@ -320,24 +320,24 @@ void generateTimestampUsTable(std::string fileDir) std::shared_ptr tsarray; PARQUET_THROW_NOT_OK(tsbuilder.Finish(&tsarray)); - std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::timestamp(arrow::TimeUnit::MICRO))}); - std::shared_ptr table = arrow::Table::Make(schema, {tsarray}); + std::shared_ptr schema = + arrow::schema({arrow::field("col1", arrow::timestamp(arrow::TimeUnit::MICRO))}); + std::shared_ptr 
table = arrow::Table::Make(schema, {tsarray}); - // write to file + // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; - PARQUET_ASSIGN_OR_THROW( - outfile, arrow::io::FileOutputStream::Open(fileDir + "/ts.parquet")); + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/ts.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with DATE data type -*/ + */ void generateDateTable(std::string fileDir) { - int reserve_num = 30; + int reserve_num = 30; // -------------------------DATETIME arrow::Date32Builder date32builder; PARQUET_THROW_NOT_OK(date32builder.Reserve(reserve_num)); @@ -349,24 +349,23 @@ void generateDateTable(std::string fileDir) std::shared_ptr date32array; PARQUET_THROW_NOT_OK(date32builder.Finish(&date32array)); - std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::date32())}); - std::shared_ptr table = arrow::Table::Make(schema, {date32array}); + std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::date32())}); + std::shared_ptr table = arrow::Table::Make(schema, {date32array}); - // write to file + // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; - PARQUET_ASSIGN_OR_THROW( - outfile, arrow::io::FileOutputStream::Open(fileDir + "/date.parquet")); + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/date.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with INT16 data type -*/ + */ void generateInt16Table(std::string fileDir) { - int reserve_num = 30; + int reserve_num = 30; // ---------------int16 arrow::Int16Builder i16builder; PARQUET_THROW_NOT_OK(i16builder.Reserve(reserve_num)); @@ -378,24 +377,23 @@ void generateInt16Table(std::string fileDir) 
std::shared_ptr i16array; PARQUET_THROW_NOT_OK(i16builder.Finish(&i16array)); - std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::int16())}); - std::shared_ptr table = arrow::Table::Make(schema, {i16array}); + std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::int16())}); + std::shared_ptr table = arrow::Table::Make(schema, {i16array}); - // write to file + // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; - PARQUET_ASSIGN_OR_THROW( - outfile, arrow::io::FileOutputStream::Open(fileDir + "/int16.parquet")); + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/int16.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with INT8 data type -*/ + */ void generateInt8Table(std::string fileDir) { - int reserve_num = 30; + int reserve_num = 30; // ---------------int16 arrow::Int8Builder i8builder; PARQUET_THROW_NOT_OK(i8builder.Reserve(reserve_num)); @@ -407,22 +405,20 @@ void generateInt8Table(std::string fileDir) std::shared_ptr i8array; PARQUET_THROW_NOT_OK(i8builder.Finish(&i8array)); + std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::int8())}); + std::shared_ptr table = arrow::Table::Make(schema, {i8array}); - std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::int8())}); - std::shared_ptr table = arrow::Table::Make(schema, {i8array}); - - // write to file + // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; - PARQUET_ASSIGN_OR_THROW( - outfile, arrow::io::FileOutputStream::Open(fileDir + "/int8.parquet")); + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/int8.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with DECIMAL data 
type -*/ + */ void generateDecimalTable(std::string fileDir) { // ----------------------decimal @@ -437,24 +433,23 @@ void generateDecimalTable(std::string fileDir) std::shared_ptr decimalArray; PARQUET_THROW_NOT_OK(d128builder.Finish(&decimalArray)); - std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::decimal128(9, 3))}); - std::shared_ptr table = arrow::Table::Make(schema, {decimalArray}); + std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::decimal128(9, 3))}); + std::shared_ptr table = arrow::Table::Make(schema, {decimalArray}); - // write to file + // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; - PARQUET_ASSIGN_OR_THROW( - outfile, arrow::io::FileOutputStream::Open(fileDir + "/decimal.parquet")); + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/decimal.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with UNSIGNED INT data type -*/ + */ void generateUintTable(std::string fileDir) { - // generate data + // generate data arrow::UInt32Builder builder; uint reserve_num = 30; PARQUET_THROW_NOT_OK(builder.Reserve(reserve_num)); @@ -463,32 +458,30 @@ void generateUintTable(std::string fileDir) validity[2] = 0; validity[3] = 0; std::vector values; - for (uint32_t i = 0; i < reserve_num-1; i++) + for (uint32_t i = 0; i < reserve_num - 1; i++) values.push_back(i); values.push_back(2147483648); PARQUET_THROW_NOT_OK(builder.AppendValues(values, validity)); std::shared_ptr array; PARQUET_THROW_NOT_OK(builder.Finish(&array)); + std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::uint32())}); + std::shared_ptr table = arrow::Table::Make(schema, {array}); - std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::uint32())}); - std::shared_ptr table = arrow::Table::Make(schema, {array}); - - // write to file + // write to 
file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; - PARQUET_ASSIGN_OR_THROW( - outfile, arrow::io::FileOutputStream::Open(fileDir + "/uint32.parquet")); + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/uint32.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with UNSIGNED INT16 data type -*/ + */ void generateUint16Table(std::string fileDir) { - uint16_t reserve_num = 30; + uint16_t reserve_num = 30; // ---------------int16 arrow::UInt16Builder i16builder; PARQUET_THROW_NOT_OK(i16builder.Reserve(reserve_num)); @@ -500,24 +493,23 @@ void generateUint16Table(std::string fileDir) std::shared_ptr i16array; PARQUET_THROW_NOT_OK(i16builder.Finish(&i16array)); - std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::uint16())}); - std::shared_ptr table = arrow::Table::Make(schema, {i16array}); + std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::uint16())}); + std::shared_ptr table = arrow::Table::Make(schema, {i16array}); - // write to file + // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; - PARQUET_ASSIGN_OR_THROW( - outfile, arrow::io::FileOutputStream::Open(fileDir + "/uint16.parquet")); + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/uint16.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with UNSIGNED INT8 data type -*/ + */ void generateUint8Table(std::string fileDir) { - uint8_t reserve_num = 30; + uint8_t reserve_num = 30; // ---------------int16 arrow::UInt8Builder i8builder; PARQUET_THROW_NOT_OK(i8builder.Reserve(reserve_num)); @@ -529,24 +521,23 @@ void generateUint8Table(std::string fileDir) std::shared_ptr i8array; 
PARQUET_THROW_NOT_OK(i8builder.Finish(&i8array)); - std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::uint8())}); - std::shared_ptr table = arrow::Table::Make(schema, {i8array}); + std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::uint8())}); + std::shared_ptr table = arrow::Table::Make(schema, {i8array}); - // write to file + // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; - PARQUET_ASSIGN_OR_THROW( - outfile, arrow::io::FileOutputStream::Open(fileDir + "/uint8.parquet")); + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/uint8.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with UNSIGNED INT64 data type -*/ + */ void generateUint64Table(std::string fileDir) { - // generate data + // generate data arrow::UInt64Builder builder; uint64_t reserve_num = 30; PARQUET_THROW_NOT_OK(builder.Reserve(reserve_num)); @@ -555,28 +546,27 @@ void generateUint64Table(std::string fileDir) validity[2] = 0; validity[3] = 0; std::vector values; - for (uint64_t i = 0; i < reserve_num-1; i++) + for (uint64_t i = 0; i < reserve_num - 1; i++) values.push_back(i); values.push_back(2147483648); PARQUET_THROW_NOT_OK(builder.AppendValues(values, validity)); std::shared_ptr array; PARQUET_THROW_NOT_OK(builder.Finish(&array)); - std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::uint64())}); - std::shared_ptr table = arrow::Table::Make(schema, {array}); + std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::uint64())}); + std::shared_ptr table = arrow::Table::Make(schema, {array}); - // write to file + // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; - PARQUET_ASSIGN_OR_THROW( - outfile, arrow::io::FileOutputStream::Open(fileDir + "/uint64.parquet")); + PARQUET_ASSIGN_OR_THROW(outfile, 
arrow::io::FileOutputStream::Open(fileDir + "/uint64.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with BOOLEAN data type -*/ + */ void generateBoolTable(std::string fileDir) { int reserve_num = 30; @@ -592,21 +582,20 @@ void generateBoolTable(std::string fileDir) std::shared_ptr boolArray; PARQUET_THROW_NOT_OK(boolBuilder.Finish(&boolArray)); - std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::boolean())}); - std::shared_ptr table = arrow::Table::Make(schema, {boolArray}); + std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::boolean())}); + std::shared_ptr table = arrow::Table::Make(schema, {boolArray}); - // write to file + // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; - PARQUET_ASSIGN_OR_THROW( - outfile, arrow::io::FileOutputStream::Open(fileDir + "/bool.parquet")); + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/bool.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with NULL data type -*/ + */ void generateNullTable(std::string fileDir) { int reserve_num = 30; @@ -617,21 +606,20 @@ void generateNullTable(std::string fileDir) std::shared_ptr nullarray; PARQUET_THROW_NOT_OK(nullBuilder.Finish(&nullarray)); - std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::null())}); - std::shared_ptr table = arrow::Table::Make(schema, {nullarray}); + std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::null())}); + std::shared_ptr table = arrow::Table::Make(schema, {nullarray}); - // write to file + // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; - PARQUET_ASSIGN_OR_THROW( - outfile, arrow::io::FileOutputStream::Open(fileDir + "/null.parquet")); + 
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/null.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate different parquet files with one data type -*/ + */ void generateTable(std::string fileDir) { generateBoolTable(fileDir); @@ -655,7 +643,7 @@ void generateTable(std::string fileDir) /** * generate one parquet file with different data types -*/ + */ void generateAllTable(std::string fileDir) { const int reserve_num = 30; @@ -679,7 +667,7 @@ void generateAllTable(std::string fileDir) int32Validity[2] = 0; int32Validity[3] = 0; std::vector int32Values; - for (int32_t i = 0; i < reserve_num-1; i++) + for (int32_t i = 0; i < reserve_num - 1; i++) int32Values.push_back(i); int32Values.push_back(static_cast(2147483648)); PARQUET_THROW_NOT_OK(int32Builder.AppendValues(int32Values, int32Validity)); @@ -694,7 +682,7 @@ void generateAllTable(std::string fileDir) int64Validity[2] = 0; int64Validity[3] = 0; std::vector int64Values; - for (int64_t i = 0; i < reserve_num-1; i++) + for (int64_t i = 0; i < reserve_num - 1; i++) int64Values.push_back(i); int64Values.push_back(2147483648); PARQUET_THROW_NOT_OK(int64Builder.AppendValues(int64Values, int64Validity)); @@ -708,7 +696,7 @@ void generateAllTable(std::string fileDir) floatValidity[2] = 0; std::vector floatValues; for (int i = 0; i < reserve_num; i++) - floatValues.push_back(i+1.5); + floatValues.push_back(i + 1.5); PARQUET_THROW_NOT_OK(floatBuilder.AppendValues(floatValues, floatValidity)); std::shared_ptr floatArray; PARQUET_THROW_NOT_OK(floatBuilder.Finish(&floatArray)); @@ -720,7 +708,7 @@ void generateAllTable(std::string fileDir) dvalidity[3] = 0; std::vector dvalues; for (int i = 0; i < reserve_num; i++) - dvalues.push_back(i+2.5); + dvalues.push_back(i + 2.5); PARQUET_THROW_NOT_OK(doubleBuilder.AppendValues(dvalues, dvalidity)); std::shared_ptr doubleArray; 
PARQUET_THROW_NOT_OK(doubleBuilder.Finish(&doubleArray)); @@ -731,19 +719,19 @@ void generateAllTable(std::string fileDir) std::vector time32validity(reserve_num, true); std::vector time32values; for (int32_t i = 0; i < reserve_num; i++) - time32values.push_back(i*3605001); + time32values.push_back(i * 3605001); PARQUET_THROW_NOT_OK(time32builder.AppendValues(time32values, time32validity)); std::shared_ptr time32array; PARQUET_THROW_NOT_OK(time32builder.Finish(&time32array)); // time64(microsecond) - int64_t reserve_num64 = 30; + int64_t reserve_num64 = 30; arrow::Time64Builder time64builder(arrow::time64(arrow::TimeUnit::MICRO), arrow::default_memory_pool()); PARQUET_THROW_NOT_OK(time64builder.Reserve(reserve_num64)); std::vector time64validity(reserve_num, true); std::vector time64values; for (int64_t i = 0; i < reserve_num64; i++) - time64values.push_back(i*3605000001); + time64values.push_back(i * 3605000001); PARQUET_THROW_NOT_OK(time64builder.AppendValues(time64values, time64validity)); std::shared_ptr time64array; PARQUET_THROW_NOT_OK(time64builder.Finish(&time64array)); @@ -753,7 +741,7 @@ void generateAllTable(std::string fileDir) PARQUET_THROW_NOT_OK(strbuilder.Reserve(reserve_num)); uint8_t validity1[reserve_num]; std::vector values1; - for (int64_t i = reserve_num-1; i >= 0; i--) + for (int64_t i = reserve_num - 1; i >= 0; i--) { validity1[i] = 1; } @@ -789,7 +777,7 @@ void generateAllTable(std::string fileDir) values1.push_back(std::string("Iqa8Nr")); values1.push_back(std::string("nD274v")); values1.push_back(std::string("6y0JyW")); - validity1[1] = 0; // set element 1 null + validity1[1] = 0; // set element 1 null PARQUET_THROW_NOT_OK(strbuilder.AppendValues(values1, validity1)); std::shared_ptr strarray; PARQUET_THROW_NOT_OK(strbuilder.Finish(&strarray)); @@ -895,7 +883,7 @@ void generateAllTable(std::string fileDir) uint32Validity[2] = 0; uint32Validity[3] = 0; std::vector uint32Values; - for (uint32_t i = 0; i < reserve_num-1; i++) + for (uint32_t 
i = 0; i < reserve_num - 1; i++) uint32Values.push_back(i); uint32Values.push_back(2147483648); PARQUET_THROW_NOT_OK(uint32Builder.AppendValues(uint32Values, uint32Validity)); @@ -933,7 +921,7 @@ void generateAllTable(std::string fileDir) uint64Validity[2] = 0; uint64Validity[3] = 0; std::vector uint64Values; - for (uint64_t i = 0; i < ureserve_num-1; i++) + for (uint64_t i = 0; i < ureserve_num - 1; i++) uint64Values.push_back(i); uint64Values.push_back(2147483648); PARQUET_THROW_NOT_OK(uint64Builder.AppendValues(uint64Values, uint64Validity)); @@ -955,7 +943,7 @@ void generateAllTable(std::string fileDir) uint8_t binaryValidity[reserve_num]; std::vector binaryValues; - for (int32_t i = reserve_num-1; i >= 0; i--) + for (int32_t i = reserve_num - 1; i >= 0; i--) { binaryValidity[i] = 1; } @@ -1006,72 +994,46 @@ void generateAllTable(std::string fileDir) // make schema // 28 cols - std::shared_ptr schema = arrow::schema({ - arrow::field("col1", arrow::int32()), - arrow::field("col2", arrow::int64()), - arrow::field("col3", arrow::float32()), - arrow::field("col4", arrow::float64()), - arrow::field("col5", arrow::time32(arrow::TimeUnit::MILLI)), - arrow::field("col6", arrow::utf8()), - arrow::field("col7", arrow::utf8()), - arrow::field("col8", arrow::utf8()), - arrow::field("col9", arrow::utf8()), - arrow::field("col10", arrow::utf8()), - arrow::field("col11", arrow::utf8()), - arrow::field("col12", arrow::timestamp(arrow::TimeUnit::MILLI)), - arrow::field("col13", arrow::date32()), - arrow::field("col14", arrow::timestamp(arrow::TimeUnit::MILLI)), - arrow::field("col15", arrow::int16()), - arrow::field("col16", arrow::int8()), - arrow::field("col17", arrow::decimal128(9, 3)), - arrow::field("col18", arrow::uint32()), - arrow::field("col19", arrow::uint16()), - arrow::field("col20", arrow::uint8()), - arrow::field("col21", arrow::uint64()), - arrow::field("col22", arrow::boolean()), - arrow::field("col23", arrow::decimal128(38, 10)), - arrow::field("col24", 
arrow::time64(arrow::TimeUnit::MICRO)), - arrow::field("col25", arrow::timestamp(arrow::TimeUnit::MICRO)), - arrow::field("col26", arrow::timestamp(arrow::TimeUnit::MICRO)), - arrow::field("col27", arrow::binary()), - arrow::field("col28", arrow::fixed_size_binary(4)) - }); - std::shared_ptr table = arrow::Table::Make(schema, { - int32Array, - int64Array, - floatArray, - doubleArray, - time32array, - strarray, - strarray, - strarray, - strarray, - strarray, - strarray, - tsarray, - date32array, - tsarray, - i16array, - i8array, - decimalArray, - uint32Array, - ui16array, - ui8array, - uint64Array, - boolArray, - decimalArray1, - time64array, - tsarray1, - tsarray1, - binaryArray, - fixedSizeArray - }); + std::shared_ptr schema = + arrow::schema({arrow::field("col1", arrow::int32()), + arrow::field("col2", arrow::int64()), + arrow::field("col3", arrow::float32()), + arrow::field("col4", arrow::float64()), + arrow::field("col5", arrow::time32(arrow::TimeUnit::MILLI)), + arrow::field("col6", arrow::utf8()), + arrow::field("col7", arrow::utf8()), + arrow::field("col8", arrow::utf8()), + arrow::field("col9", arrow::utf8()), + arrow::field("col10", arrow::utf8()), + arrow::field("col11", arrow::utf8()), + arrow::field("col12", arrow::timestamp(arrow::TimeUnit::MILLI)), + arrow::field("col13", arrow::date32()), + arrow::field("col14", arrow::timestamp(arrow::TimeUnit::MILLI)), + arrow::field("col15", arrow::int16()), + arrow::field("col16", arrow::int8()), + arrow::field("col17", arrow::decimal128(9, 3)), + arrow::field("col18", arrow::uint32()), + arrow::field("col19", arrow::uint16()), + arrow::field("col20", arrow::uint8()), + arrow::field("col21", arrow::uint64()), + arrow::field("col22", arrow::boolean()), + arrow::field("col23", arrow::decimal128(38, 10)), + arrow::field("col24", arrow::time64(arrow::TimeUnit::MICRO)), + arrow::field("col25", arrow::timestamp(arrow::TimeUnit::MICRO)), + arrow::field("col26", arrow::timestamp(arrow::TimeUnit::MICRO)), + 
arrow::field("col27", arrow::binary()), + arrow::field("col28", arrow::fixed_size_binary(4))}); + std::shared_ptr table = arrow::Table::Make( + schema, + {int32Array, int64Array, floatArray, doubleArray, time32array, strarray, strarray, + strarray, strarray, strarray, strarray, tsarray, date32array, tsarray, + i16array, i8array, decimalArray, uint32Array, ui16array, ui8array, uint64Array, + boolArray, decimalArray1, time64array, tsarray1, tsarray1, binaryArray, fixedSizeArray}); - // write to file + // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; - PARQUET_ASSIGN_OR_THROW( - outfile, arrow::io::FileOutputStream::Open(fileDir + "/tests.parquet")); + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/tests.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 1000)); PARQUET_THROW_NOT_OK(outfile->Close()); @@ -1082,151 +1044,200 @@ void generateAllTable(std::string fileDir) std::shared_ptr nullarray; PARQUET_THROW_NOT_OK(nullBuilder.Finish(&nullarray)); - std::shared_ptr schema1 = arrow::schema({ - arrow::field("col1", arrow::null()), - arrow::field("col2", arrow::null()), - arrow::field("col3", arrow::null()), - arrow::field("col4", arrow::null()), - arrow::field("col5", arrow::null()), - arrow::field("col6", arrow::null()), - arrow::field("col7", arrow::null()), - arrow::field("col8", arrow::utf8()), - arrow::field("col9", arrow::null()), - arrow::field("col10", arrow::null()), - arrow::field("col11", arrow::utf8()), - arrow::field("col12", arrow::timestamp(arrow::TimeUnit::MILLI)), - arrow::field("col13", arrow::null()), - arrow::field("col14", arrow::null()), - arrow::field("col15", arrow::null()), - arrow::field("col16", arrow::null()), - arrow::field("col17", arrow::null()), - arrow::field("col18", arrow::null()), - arrow::field("col19", arrow::null()), - arrow::field("col20", arrow::null()), - arrow::field("col21", arrow::null()), - arrow::field("col22", 
arrow::null()), - arrow::field("col23", arrow::null()), - arrow::field("col24", arrow::null()), - arrow::field("col25", arrow::timestamp(arrow::TimeUnit::MICRO)), - arrow::field("col26", arrow::timestamp(arrow::TimeUnit::MILLI)), - arrow::field("col27", arrow::null()), - arrow::field("col28", arrow::null()) - }); - std::shared_ptr table1 = arrow::Table::Make(schema1, { - nullarray, - nullarray, - nullarray, - nullarray, - nullarray, - nullarray, - nullarray, - strarray, - nullarray, - nullarray, - strarray, - tsarray, - nullarray, - nullarray, - nullarray, - nullarray, - nullarray, - nullarray, - nullarray, - nullarray, - nullarray, - nullarray, - nullarray, - nullarray, - tsarray1, - tsarray, - nullarray, - nullarray - }); + std::shared_ptr schema1 = + arrow::schema({arrow::field("col1", arrow::null()), + arrow::field("col2", arrow::null()), + arrow::field("col3", arrow::null()), + arrow::field("col4", arrow::null()), + arrow::field("col5", arrow::null()), + arrow::field("col6", arrow::null()), + arrow::field("col7", arrow::null()), + arrow::field("col8", arrow::utf8()), + arrow::field("col9", arrow::null()), + arrow::field("col10", arrow::null()), + arrow::field("col11", arrow::utf8()), + arrow::field("col12", arrow::timestamp(arrow::TimeUnit::MILLI)), + arrow::field("col13", arrow::null()), + arrow::field("col14", arrow::null()), + arrow::field("col15", arrow::null()), + arrow::field("col16", arrow::null()), + arrow::field("col17", arrow::null()), + arrow::field("col18", arrow::null()), + arrow::field("col19", arrow::null()), + arrow::field("col20", arrow::null()), + arrow::field("col21", arrow::null()), + arrow::field("col22", arrow::null()), + arrow::field("col23", arrow::null()), + arrow::field("col24", arrow::null()), + arrow::field("col25", arrow::timestamp(arrow::TimeUnit::MICRO)), + arrow::field("col26", arrow::timestamp(arrow::TimeUnit::MILLI)), + arrow::field("col27", arrow::null()), + arrow::field("col28", arrow::null())}); + std::shared_ptr table1 = 
arrow::Table::Make(
+      schema1, {nullarray, nullarray, nullarray, nullarray, nullarray, nullarray, nullarray,
+                strarray, nullarray, nullarray, strarray, tsarray, nullarray, nullarray,
+                nullarray, nullarray, nullarray, nullarray, nullarray, nullarray, nullarray,
+                nullarray, nullarray, nullarray, tsarray1, tsarray, nullarray, nullarray});
   std::shared_ptr outfile1;
-  PARQUET_ASSIGN_OR_THROW(
-      outfile1, arrow::io::FileOutputStream::Open(fileDir + "/nulls.parquet"));
+  PARQUET_ASSIGN_OR_THROW(outfile1, arrow::io::FileOutputStream::Open(fileDir + "/nulls.parquet"));
   PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table1, pool, outfile1, 3));
   PARQUET_THROW_NOT_OK(outfile1->Close());
 }
-/**
- * generate large volume parquet files
-*/
-void generateLargeTable(int64_t reserve_num, std::string rowNum, std::string fileDir)
+
+class LargeDataProducer : public arrow::RecordBatchReader  // yields numberOfRowsToProduce rows as batches of at most chunkSize rows
 {
-  // int32
-  arrow::Int32Builder builder;
-  // int64_t reserve_num = 1000000;
-  PARQUET_THROW_NOT_OK(builder.Reserve(reserve_num));
-  std::vector<bool> validity(reserve_num, true);
-  std::vector<int32_t> values;
-  for (int32_t i = 0; i < reserve_num; i++)
-    values.push_back(i);
-  PARQUET_THROW_NOT_OK(builder.AppendValues(values, validity));
-  std::shared_ptr<arrow::Array> array;
-  PARQUET_THROW_NOT_OK(builder.Finish(&array));
-
-  // timestamp
-  arrow::TimestampBuilder tsbuilder(arrow::timestamp(arrow::TimeUnit::MILLI), arrow::default_memory_pool());
-  PARQUET_THROW_NOT_OK(tsbuilder.Reserve(reserve_num));
-  std::vector<bool> tsvalidity(reserve_num, true);
-  std::vector<int64_t> tsvalues;
-  for (int64_t i = 0; i < reserve_num; i++)
-    tsvalues.push_back(i * 10000001);
-  PARQUET_THROW_NOT_OK(tsbuilder.AppendValues(tsvalues, tsvalidity));
-  std::shared_ptr<arrow::Array> tsarray;
-  PARQUET_THROW_NOT_OK(tsbuilder.Finish(&tsarray));
-
-  // string
-  arrow::StringBuilder strbuilder;
-  PARQUET_THROW_NOT_OK(strbuilder.Reserve(reserve_num));
-  std::vector<std::string> values1;
-  for (int64_t i = reserve_num-1; i >= 0; i--)
+ public:
+  LargeDataProducer(size_t numberOfRowsToProduce, size_t chunkSize)  // total rows to emit, maximum rows per batch
+      : numberOfRowsToProduce_(numberOfRowsToProduce), chunkSize_(chunkSize)
   {
-    values1.push_back(std::string("hhhh"));
+    PARQUET_THROW_NOT_OK(i32builder_.Reserve(chunkSize));
+    PARQUET_THROW_NOT_OK(tsbuilder_.Reserve(chunkSize));
+    PARQUET_THROW_NOT_OK(d128builder_.Reserve(chunkSize));
+    PARQUET_THROW_NOT_OK(strbuilder_.Reserve(chunkSize));
+    PARQUET_THROW_NOT_OK(doublebuilder_.Reserve(chunkSize));
   }
-  PARQUET_THROW_NOT_OK(strbuilder.AppendValues(values1));
-  std::shared_ptr<arrow::Array> strarray;
-  PARQUET_THROW_NOT_OK(strbuilder.Finish(&strarray));

-  // decimal
-  auto t = arrow::Decimal128Type::Make(38, 10);
-  PARQUET_ASSIGN_OR_THROW(auto t1, t);
-  arrow::Decimal128Builder d128builder(t1, arrow::default_memory_pool());
-  for (int64_t i = 0; i < reserve_num; i++)
-    PARQUET_THROW_NOT_OK(d128builder.Append(arrow::Decimal128("1234567890987654321.12345678")));
-  std::shared_ptr<arrow::Array> decimalArray;
-  PARQUET_THROW_NOT_OK(d128builder.Finish(&decimalArray));
+  std::shared_ptr<arrow::Schema> schema() const override  // col1..col6, in the same order as the arrays built in ReadNext()
+  {
+    return arrow::schema(
+        {arrow::field("col1", arrow::int32()), arrow::field("col2", arrow::timestamp(arrow::TimeUnit::MILLI)),
+         arrow::field("col3", arrow::utf8()), arrow::field("col4", arrow::decimal128(38, 10)),
+         arrow::field("col5", arrow::float64()), arrow::field("col6", arrow::utf8())});
+  }

-  // double
-  arrow::DoubleBuilder doublebuilder;
-  PARQUET_THROW_NOT_OK(doublebuilder.Reserve(reserve_num));
-  std::vector<bool> dvalidity(reserve_num, true);
-  std::vector<double> dvalues;
-  for (int i = 0; i < reserve_num; i++)
-    dvalues.push_back(i+2.5);
-  PARQUET_THROW_NOT_OK(doublebuilder.AppendValues(dvalues, dvalidity));
-  std::shared_ptr<arrow::Array> doublearray;
-  PARQUET_THROW_NOT_OK(doublebuilder.Finish(&doublearray));
+  arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) override  // next chunk, or null once every row was produced
+  {
+    if (restChunkSize() == 0)
+    {
+      *batch = nullptr;  // write through the out-parameter; assigning to `batch` itself never reached the caller
+    }
+    else
+    {
+      *batch = arrow::RecordBatch::Make(schema(), restChunkSize(),
+                                        {ProduceInts32(), ProduceTimeStamps(), ProduceStrings(),
+                                         ProduceDecimals(), ProduceDoubles(), ProduceStrings()});
+    }
+    rowsProduced_ += restChunkSize();  // adds 0 once the stream is exhausted
+    return arrow::Status::OK();
+  }

-  std::shared_ptr<arrow::Schema> schema = arrow::schema({
-    arrow::field("col1", arrow::int32()),
-    arrow::field("col2", arrow::timestamp(arrow::TimeUnit::MILLI)),
-    arrow::field("col3", arrow::utf8()),
-    arrow::field("col4", arrow::decimal128(38, 10)),
-    arrow::field("col5", arrow::float64()),
-    arrow::field("col6", arrow::utf8())
-  });
-  std::shared_ptr<arrow::Table> table = arrow::Table::Make(schema, {array, tsarray, strarray, decimalArray, doublearray, strarray});
+ private:
+  size_t restChunkSize() const  // rows in the next batch: chunkSize_ or whatever is left, 0 when done
+  {
+    return std::min(chunkSize_, numberOfRowsToProduce_ - rowsProduced_);
+  }
+  std::shared_ptr<arrow::Array> ProduceInts32()  // col1: consecutive row numbers continuing from rowsProduced_
+  {
+    size_t num = restChunkSize();
+    std::vector<bool> validity(num, true);
+    std::vector<int32_t> values;
+    values.reserve(num);
+    for (int32_t i = static_cast<int32_t>(rowsProduced_); i < static_cast<int32_t>(rowsProduced_ + num); i++)
+      values.push_back(i);
+    PARQUET_THROW_NOT_OK(i32builder_.AppendValues(values, validity));
+    std::shared_ptr<arrow::Array> array;
+    PARQUET_THROW_NOT_OK(i32builder_.Finish(&array));
+    i32builder_.Reset();
+    return array;
+  }

-  // write to file
-  arrow::MemoryPool* pool = arrow::default_memory_pool();
+  std::shared_ptr<arrow::Array> ProduceDoubles()  // col5: global row number + 2.5
+  {
+    size_t num = restChunkSize();
+    std::vector<bool> dvalidity(num, true);
+    std::vector<double> dvalues;
+    for (size_t i = rowsProduced_; i < rowsProduced_ + num; i++)
+      dvalues.push_back(i + 2.5);
+    PARQUET_THROW_NOT_OK(doublebuilder_.AppendValues(dvalues, dvalidity));
+    std::shared_ptr<arrow::Array> doublearray;
+    PARQUET_THROW_NOT_OK(doublebuilder_.Finish(&doublearray));
+    doublebuilder_.Reset();
+    return doublearray;
+  }
+
+  std::shared_ptr<arrow::Array> ProduceStrings()  // col3 and col6: the constant string "hhhh" in every row
+  {
+    size_t num = restChunkSize();
+    std::vector<std::string> values;
+    for (size_t i = 0; i < num; i++)
+    {
+      values.push_back(std::string("hhhh"));
+    }
+    PARQUET_THROW_NOT_OK(strbuilder_.AppendValues(values));
+    std::shared_ptr<arrow::Array> strarray;
+    PARQUET_THROW_NOT_OK(strbuilder_.Finish(&strarray));
+    strbuilder_.Reset();
+    return strarray;
+  }
+
+  std::shared_ptr<arrow::Array> ProduceTimeStamps()  // col2: index-within-chunk * 10000001
+  {
+    size_t num = restChunkSize();
+    std::vector<bool> tsvalidity(num, true);
+    std::vector<int64_t> tsvalues;
+    for (int64_t i = 0; i < static_cast<int64_t>(num); i++)  // NOTE(review): restarts at 0 for every chunk, unlike col1/col5 - confirm intended
+      tsvalues.push_back(i * 10000001);
+    PARQUET_THROW_NOT_OK(tsbuilder_.AppendValues(tsvalues, tsvalidity));
+    std::shared_ptr<arrow::Array> tsarray;
+    PARQUET_THROW_NOT_OK(tsbuilder_.Finish(&tsarray));
+    tsbuilder_.Reset();
+    return tsarray;
+  }
+
+  std::shared_ptr<arrow::Array> ProduceDecimals()  // col4: the same decimal literal in every row
+  {
+    size_t num = restChunkSize();
+    for (size_t i = 0; i < num; i++)
+      PARQUET_THROW_NOT_OK(d128builder_.Append(arrow::Decimal128("1234567890987654321.12345678")));
+    std::shared_ptr<arrow::Array> decimalArray;
+    PARQUET_THROW_NOT_OK(d128builder_.Finish(&decimalArray));
+    d128builder_.Reset();
+    return decimalArray;
+  }
+
+ private:
+  arrow::Int32Builder i32builder_;
+  arrow::DoubleBuilder doublebuilder_;
+  arrow::StringBuilder strbuilder_;
+  arrow::TimestampBuilder tsbuilder_{arrow::timestamp(arrow::TimeUnit::MILLI), arrow::default_memory_pool()};
+  arrow::Decimal128Builder d128builder_{arrow::Decimal128Type::Make(38, 10).ValueOrDie()};
+  size_t numberOfRowsToProduce_ = 100;  // overwritten by the constructor
+  size_t chunkSize_ = 10;               // overwritten by the constructor
+  size_t rowsProduced_ = 0;             // rows handed out by ReadNext() so far
+};
+
+void generateLargeTable(size_t reserve_num, std::string rowNum, std::string fileDir)  // writes reserve_num rows to <fileDir>/<rowNum>MRows.parquet
+{
+  // Rows come from a RecordBatchReader (RBR) in chunks of up to 1000000 rows
+  std::shared_ptr<arrow::RecordBatchReader> batch_stream = std::make_shared<LargeDataProducer>(reserve_num, 1000000);
+
+  // Default Parquet writer properties (no option is set on the builder)
+  std::shared_ptr<parquet::WriterProperties> props = parquet::WriterProperties::Builder().build();
+
+  // Default Arrow writer properties (store_schema() is not called on the builder)
+  std::shared_ptr<parquet::ArrowWriterProperties> arrow_props = parquet::ArrowWriterProperties::Builder().build();
+
+  // Create a writer
   std::shared_ptr<arrow::io::FileOutputStream> outfile;
+  PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/" + rowNum + "MRows.parquet"));
+
+  std::unique_ptr<parquet::arrow::FileWriter> writer;
   PARQUET_ASSIGN_OR_THROW(
-      outfile, arrow::io::FileOutputStream::Open(fileDir + "/" + rowNum + "MRows.parquet"));
-  PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 100000));
-  PARQUET_THROW_NOT_OK(outfile->Close());
+      writer, parquet::arrow::FileWriter::Open(*batch_stream->schema().get(), arrow::default_memory_pool(),
+                                               outfile, props, arrow_props));
+
+  // Write each batch as a row_group
+  for (arrow::Result<std::shared_ptr<arrow::RecordBatch>> maybe_batch : *batch_stream)
+  {
+    PARQUET_ASSIGN_OR_THROW(auto batch, maybe_batch);
+    PARQUET_ASSIGN_OR_THROW(auto table, arrow::Table::FromRecordBatches(batch->schema(), {batch}));
+    PARQUET_THROW_NOT_OK(writer->WriteTable(*table.get(), batch->num_rows()));
+  }
+
+  // Write file footer and close
+  PARQUET_THROW_NOT_OK(writer->Close());
 }

 int main(int argc, char** argv)